Spark ALS-WR giving the same recommended items for all users

We are trying to build a recommendation system for a supermarket with diverse item types (ranging from fast-moving grocery items to slow-moving electronics). Some items are purchased frequently in high volume and some items are purchased only once. We have four months of purchase history data from 25K+ customers across 30K+ SKUs from 100+ departments. We ran ALS-WR in Spark to generate recommendations. To our surprise, the top 15 recommendations for each customer are quite generic, without much …
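
One direction, sketched with the DataFrame-based ALS: purchase counts are implicit feedback, and running ALS in explicit-rating mode on raw counts often yields popularity-dominated, near-identical top-N lists. This is a hedged sketch, not the asker's code; the column names (customer_id, sku_id, purchase_count) and the purchases DataFrame are illustrative.

```python
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol="customer_id",
    itemCol="sku_id",
    ratingCol="purchase_count",
    implicitPrefs=True,    # treat counts as confidence, not as ratings
    alpha=40.0,            # confidence scaling; worth tuning
    rank=50,
    regParam=0.1,
    coldStartStrategy="drop",
)
model = als.fit(purchases)              # purchases: (customer_id, sku_id, purchase_count)
top15 = model.recommendForAllUsers(15)  # per-customer top-15 items
```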
Category: Data Science

How to predict/forecast a street's traffic based on previous values?

I have a dataset which has the following 5 columns: date, hour, day_of_week, street_id, counts. My dataset has information about the number of cars that each street (in the same city) has in a given hour of a certain date, and I want to predict the traffic count that a certain street has in a given hour of a certain date. I think I could use certain variables depending on the day and hour that I want to predict, for example, if …
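
A hedged sketch of one common feature setup for this kind of problem: lagged counts per street, built with window functions. It assumes a DataFrame df with exactly the five columns named above; the lag choices are illustrative.

```python
from pyspark.sql import functions as F, Window

# order each street's rows chronologically, then look back 1 hour, 1 day, 1 week
w = Window.partitionBy("street_id").orderBy("date", "hour")
features = (
    df.withColumn("count_lag_1h", F.lag("counts", 1).over(w))
      .withColumn("count_lag_24h", F.lag("counts", 24).over(w))
      .withColumn("count_lag_168h", F.lag("counts", 168).over(w))
)
```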
Category: Data Science

How to compute the median of a Date-type column in Spark (Java)

I have extracted a column from a dataset that contains Date-type values:

+-------------------+
| Created_datetime  |
+-------------------+
|2019-10-12 17:09:18|
|2019-12-03 07:02:07|
|2020-01-16 23:10:08|
+-------------------+

The type of the column is StringType in Spark. I want to compute the median of these dates; in the above case it would be 2019-12-03 07:02:07, since it is the middle date of the three. How can I achieve that in Spark with Java? I tried dataset.select(org.apache.spark.sql.functions.avg(dataset.col("Created_datetime").cast("timestamp"))).first().getDouble(0), but as it is …
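
A hedged sketch of one way to get a true median: convert to an epoch number, take the (approximate) 50th percentile, and convert back. Shown in PySpark for brevity; the same percentile_approx expression can be invoked from the Java API via functions.expr.

```python
from pyspark.sql import functions as F

median_dt = (
    dataset
    .withColumn("ts", F.unix_timestamp("Created_datetime"))          # seconds since epoch
    .select(F.expr("percentile_approx(ts, 0.5)").alias("median_ts")) # median as a number
    .select(F.from_unixtime("median_ts").alias("median_datetime"))   # back to a datetime string
)
median_dt.show()
```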
Category: Data Science

PySpark DataFrames to pandas and ML ops: does parallel execution hold?

If I convert a Spark DataFrame into a pandas DataFrame and subsequently apply pandas operations and sklearn models to the dataset in Databricks, will the operations from pandas and sklearn be distributed across the cluster? Or do I have to use PySpark DataFrame operations and PySpark ML packages for operations to be distributed?
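
For context, a hedged illustration of the distinction: toPandas() collects all data to the driver, so anything pandas or sklearn does afterwards is single-node. The pandas API on Spark (Spark 3.2+) is one distributed alternative; the column name below is illustrative.

```python
# single-node from here on: the whole dataset now lives in driver memory
pdf = sdf.toPandas()

# distributed alternative (Spark 3.2+): pandas-like syntax that still runs as Spark jobs
psdf = sdf.pandas_api()
psdf.groupby("some_column").mean()
```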
Category: Data Science

How to construct the document-topic matrix using the word-topic and topic-word matrix calculated using Latent Dirichlet Allocation?

How can I construct the document-topic matrix using the word-topic and topic-word matrices calculated with Latent Dirichlet Allocation? I cannot seem to find it anywhere, not even from the author of LDA, David M. Blei. Gensim and sklearn just work, but I want to know how to use the two matrices to construct the document-topic matrix (Spark MLlib LDA only gives me the two matrices and not the document-topic matrix).
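
A hedged numpy sketch of one rough "fold-in" approximation (not LDA's exact posterior inference): normalize the topic-word matrix per word to get p(topic | word), then weight those distributions by each document's word counts. topic_word and doc_term are assumed arrays.

```python
import numpy as np

# topic_word: (K topics, V words) weights; doc_term: (D docs, V words) counts
p_topic_given_word = topic_word / topic_word.sum(axis=0, keepdims=True)  # (K, V)
doc_topic = doc_term @ p_topic_given_word.T                              # (D, K)
doc_topic /= doc_topic.sum(axis=1, keepdims=True)                        # rows sum to 1
```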
Category: Data Science

Computing number of business days between start/end columns

I have two DataFrames:

facts: columns data, start_date, and end_date
holidays: column holiday_date

What I want is a way to produce another DataFrame that has columns data, start_date, end_date, and num_holidays, where num_holidays is computed as the number of days between start and end that are not weekends or holidays (as in the holidays table). The solution is here if we wanted to do this in PL/SQL. The crux is this part of the code: --Calculate and return the number of workdays using …
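
A hedged PySpark translation sketch (column names as given above; sequence() needs Spark 2.4+). Ranges with zero qualifying days drop out of the groupBy and may need to be joined back onto facts.

```python
from pyspark.sql import functions as F

num_days = (
    facts
    .withColumn("d", F.explode(F.sequence("start_date", "end_date")))  # one row per day
    .where(~F.dayofweek("d").isin(1, 7))                               # 1=Sunday, 7=Saturday
    .join(holidays, F.col("d") == F.col("holiday_date"), "left_anti")  # drop holiday days
    .groupBy("data", "start_date", "end_date")
    .agg(F.count("*").alias("num_holidays"))
)
```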
Category: Data Science

Are Spark ALS item features comparable between several runs?

I am using Spark's als.train() to build my user-item recommendation system. The problem is that I want to cover more item features, so I need to input 7 days of user action data. But ALS training becomes slower than with just 1 day of data. So, is it possible to input just 1 day of data at a time and compare item similarities between runs (every run using just 1 day)?
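
One caution, with a hedged sketch: latent factors from independent ALS runs generally live in different (randomly initialized, rotated) bases, so raw factors are not directly comparable across runs; item-item similarities are normally computed within a single run's factors. itemFactors below is the DataFrame API's output (the RDD API's productFeatures is analogous); item_a and item_b are illustrative IDs.

```python
import numpy as np

factors = {row["id"]: np.array(row["features"])
           for row in model.itemFactors.collect()}  # factors from ONE run

def cosine(a, b):
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

sim = cosine(factors[item_a], factors[item_b])
```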
Category: Data Science

Is it possible to implement an RDD version of a for loop with map and reduce using PySpark?

I need to test an algorithm that computes a function on a dataframe, where in each execution I drop a column and compute the function. This is an example in Python (PySpark), but without using RDDs:

df2581 = spark.sparkContext.parallelize([Row(a=1, b=3, c=5, d=7, e=9)]).toDF()
df2581.show()

wo = df2581.rdd.flatMap(lambda x: x[1:]).map(lambda a: print(type(a)))
wo.collect()

def f(x):
    list3 = []
    index = 0
    list2 = x
    for j in x:
        list = array(x)
        list.remove(list[index])
        list3 = list.copy()
        index += 1
    return list3

colu = df2581.columns

def add(x, y):
    return x + y
…
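
If the goal is just "drop one column per run and recompute", a hedged sketch with plain DataFrame operations (no RDD needed); compute_metric is a placeholder for the function under test.

```python
results = {}
for col in df2581.columns:
    reduced = df2581.drop(col)              # DataFrame without this one column
    results[col] = compute_metric(reduced)  # hypothetical user-supplied function
```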
Category: Data Science

Retrieve user features in real time from UserId for prediction

Let's say I'm building an app like Uber and I want to predict the user's most likely destination based on the user's past history, current latitude/longitude, and time/date. Here is the proposed architecture: let's say I have a pre-trained model hosted as a service. The part I'm struggling with is: how do I get the user features from the database in real time, from the RiderID, to be used by the prediction service (XGBoost model)? I'm guessing a lookup in …
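
A purely illustrative sketch of the lookup pattern (every name and the key layout are assumptions): precomputed user features in a low-latency store such as Redis, fetched by RiderID at request time and concatenated with the live request features.

```python
import json
import numpy as np
import redis
import xgboost as xgb

r = redis.Redis(host="localhost", port=6379)

def predict_destination(rider_id, lat, lon, hour, booster: xgb.Booster):
    raw = r.get(f"rider:{rider_id}:features")   # hypothetical key scheme
    user = json.loads(raw) if raw else {}       # fall back to defaults on a miss
    row = np.array([[user.get("trips_30d", 0.0),
                     user.get("home_lat", 0.0),
                     user.get("home_lon", 0.0),
                     lat, lon, hour]])
    return booster.predict(xgb.DMatrix(row))
```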
Category: Data Science

Lightweight execution of Spark MLlib models

I have some training data in a Hive database which I am using to build a Spark MLlib model. I am using simple linear regression models and the PySpark API. I have code set up to train this model every day to get the most up-to-date model. (The real-world use case is that I am predicting vehicle unloading times, and my model must always be recently trained, since the characteristics of the vehicles and locations change over …
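
For plain linear regression, one lightweight option (a hedged sketch, assuming a fitted PySpark lr_model): the model reduces to a weight vector plus an intercept, which can be exported and scored anywhere without a SparkSession.

```python
import numpy as np

weights = np.array(lr_model.coefficients.toArray())  # learned weights
intercept = float(lr_model.intercept)

def predict(features: np.ndarray) -> float:
    # same dot product the Spark model computes, minus the cluster
    return float(weights @ features) + intercept
```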
Category: Data Science

How to run Spark Python code in Jupyter Notebook via the command prompt

I am trying to import a data frame into Spark using Python's pyspark module. For this, I used Jupyter Notebook and executed the code shown in the screenshot below. After that, I want to run this in CMD so that I can save my Python code in a text file, saved as test.py (a Python file). Then, I run that Python file in CMD using the python test.py command, as in the screenshot below. So my task previously worked, but after 3 …
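
A hedged sketch of what a standalone test.py typically needs: unlike a notebook, where a spark object is often pre-created, a script run with python test.py (or spark-submit) must build its own SparkSession. The file path is illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()  # no pre-built session in a script
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df.show()
spark.stop()
```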
Category: Data Science

Why is Spark's LinearRegressionWithSGD very slow locally?

I have been trying to run the linear regression with SGD that is found in Spark MLlib for some time, and I am experiencing huge performance problems. All the examples that I looked at have the number of iterations set to 100 and say that Spark MLlib can run very fast for big data. This is the code I am having problems with:

def train(data: RDD[TrainFeature], scalerModel: StandardScalerModel): LinearRegressionModel = {
  val labeledData = data map { trainFeature =>
    LabeledPoint(trainFeature.relevance.value, toVector(trainFeature.feature, scalerModel))
  }
  labeledData.cache()
  …
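
One commonly suggested alternative, sketched in PySpark for brevity: the SGD-based regressor is deprecated, and the DataFrame-based LinearRegression (L-BFGS or normal-equation solver) usually converges without hand-tuning a step size. train_df is an assumed DataFrame of (features, label).

```python
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="label",
                      maxIter=100, regParam=0.0, solver="auto")
model = lr.fit(train_df)
print(model.coefficients, model.intercept)
```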
Category: Data Science

PicklingError in pyspark (PicklingError: Can't pickle <class '__main__.Person'>: attribute lookup Person on __main__ failed)

I am unable to pickle the class below. I am using Databricks 6.5 ML (includes Apache Spark 2.4.5, Scala 2.11).

import pickle

class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age

p1 = Person("John", 36)
pickle.dump(p1, open('d.pkl', 'wb'))

PicklingError: Can't pickle <class '__main__.Person'>: attribute lookup Person on __main__ failed
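
A hedged workaround sketch: the stdlib pickle looks classes up by module path, which fails for classes defined in a notebook's __main__; cloudpickle (commonly available alongside PySpark) serializes the class definition itself.

```python
import cloudpickle

class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age

p1 = Person("John", 36)
with open("d.pkl", "wb") as f:
    cloudpickle.dump(p1, f)  # succeeds where pickle.dump raises PicklingError
```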
Category: Data Science

Group a spark dataframe by a starting event to an ending event

Given a series of events (with datetimes) such as: failed, failed, passed, failed, passed, passed, I want to retrieve the time from when it first "failed" to when it first "passed," resetting every time it fails again, since I want to measure the recovery time. I only succeeded in doing this with a for loop: when I groupBy the event with min on the date, I lose the order of events, whereas I want to group by failed-passed pairs. Ultimately …
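
A hedged window-function sketch (the column names status and event_time and a grouping key id are all assumptions): mark each "failed" that follows a non-failure as the start of an episode, number episodes with a running sum, then take the first fail/pass per episode.

```python
from pyspark.sql import functions as F, Window

w = Window.partitionBy("id").orderBy("event_time")
episodes = (
    events
    .withColumn("prev", F.lag("status").over(w))
    .withColumn("is_start", ((F.col("status") == "failed") &
                             ((F.col("prev") != "failed") | F.col("prev").isNull())).cast("int"))
    .withColumn("episode", F.sum("is_start").over(w))  # running episode number
)
recovery = (
    episodes.groupBy("id", "episode")
    .agg(F.min(F.when(F.col("status") == "failed", F.col("event_time"))).alias("first_failed"),
         F.min(F.when(F.col("status") == "passed", F.col("event_time"))).alias("first_passed"))
    .withColumn("recovery_secs",
                F.col("first_passed").cast("long") - F.col("first_failed").cast("long"))
)
```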
Category: Data Science

What data/analytics tools I need to use at my current e-commerce workplace?

I recently started a new position as a data scientist at an e-commerce company. The company was founded about 4-5 years ago and is new to many data-related areas. Specifically, I'm their first data science employee. So I have to take care of both data analysis tasks as well as bringing new technologies to the company. They have used Elasticsearch (and Kibana) to build reporting dashboards on their daily purchases and users' interactions on their e-commerce website. They also …
Category: Data Science

Accumulators in Spark (PySpark) without global variables?

BACKGROUND: Consider the following textbook example which uses accumulators to add vectors.

from pyspark import AccumulatorParam

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        dict1 = {i: 0 for i in range(0, len(value))}
        return dict1

    def addInPlace(self, val1, val2):
        for i in val1.keys():
            val1[i] += val2[i]
        return val1

rdd1 = sc.parallelize([{0: 0.3, 1: 0.8, 2: 0.4},
                       {0: 0.2, 1: 0.4, 2: 0.2},
                       {0: -0.1, 1: 0.4, 2: 1.6}])
vector_acc = sc.accumulator({0: 0, 1: 0, 2: 0}, VectorAccumulatorParam())

def mapping_fn(x):
    global vector_acc
    vector_acc += …
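
A hedged variant without the global: capture the accumulator in a closure; Spark serializes the closure to the executors, so no module-level name is required.

```python
def make_mapping_fn(acc):
    def mapping_fn(x):
        acc.add(x)   # same effect as `vector_acc += x`, no `global` needed
    return mapping_fn

rdd1.foreach(make_mapping_fn(vector_acc))  # an action triggers the updates
print(vector_acc.value)
```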
Category: Data Science

Spark: How to run PCA parallelized? Only one thread used

I use PySpark and set my configuration as follows:

spark = (SparkSession.builder.master("local[*]")
         .config("spark.driver.memory", "20g")
         .config("spark.executor.memory", "10g")
         .config("spark.driver.cores", "30")
         .config("spark.num.executors", "8")
         .config("spark.executor.cores", "4")
         .getOrCreate())
sc = spark.sparkContext

If I then run PCA:

from pyspark.ml.feature import PCA

pca = PCA(k=50, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(train)

only one thread is active, and therefore the computation takes a long time. How can I parallelize PCA in Spark? I run on a local machine and did not configure a cluster in the configs. Also I …
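
A hedged guess at one mitigation: in local mode, task parallelism follows the number of partitions of the input, so a single-partition dataset can serialize much of the fit. (Parts of MLlib's PCA, such as the driver-side eigendecomposition, remain single-threaded regardless.)

```python
# spread the input across the available cores before fitting
train = train.repartition(sc.defaultParallelism)
model = pca.fit(train)
```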
Category: Data Science

Storage of N-dimensional matrices (tensors) as part of machine learning pipelines

I'm an infra person working on a storage product. I've been googling quite a bit to find an answer to the following question but have been unable to do so. Hence, I am attempting to ask the question here. I am aware that relational or structured data can often be represented in 2-dimensional tables like DataFrames, and that those can be used as ML training input. If we want to store the DataFrames, they can easily be stored as tables in a …
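
One storage pattern, sketched as an assumption rather than a recommendation: flatten each tensor and store it alongside its shape (here in Parquet), so it can be reconstructed on read. All names are illustrative.

```python
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

tensor = np.random.rand(3, 224, 224).astype(np.float32)
table = pa.table({
    "shape": [list(tensor.shape)],        # keep the shape so readers can rebuild
    "values": [tensor.ravel().tolist()],  # flattened payload
})
pq.write_table(table, "tensor.parquet")

read = pq.read_table("tensor.parquet")
restored = np.array(read["values"][0].as_py(),
                    dtype=np.float32).reshape(read["shape"][0].as_py())
```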
Category: Data Science

Generalized Additive Modeling Apache Spark implementation

Does Spark MLlib support Generalized Additive Modeling, and how does one go about implementing GAM (generalized additive model) models in Spark? Based on my research on online forums, I could not find an implementation of GAM models on Spark. Has anyone in this community attempted this? https://reposit.haw-hamburg.de/bitstream/20.500.12738/7778/1/BachelorarbeitKaiBrusch.pdf This document shows how to manipulate GLMs on Spark MLlib but does not have an actual GAM-on-Spark implementation.
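
For reference, a hedged sketch of the closest built-in: Spark ships GeneralizedLinearRegression (GLM), and one rough GAM-like approximation is to expand features with basis functions (buckets, splines, polynomial terms) before fitting it. train is an assumed DataFrame with features and label columns.

```python
from pyspark.ml.regression import GeneralizedLinearRegression

glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10)
model = glr.fit(train)  # a GLM, not a true GAM: smooth terms must be hand-built
```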
Category: Data Science
