How does the ALS implementation calculate ratings when model.transform is called?

The Spark ALS model is based on the paper Collaborative Filtering for Implicit Feedback Datasets. Here, latent vectors are learnt so that instead of estimating R (the ratings matrix), they only estimate P (the preference matrix, a binary matrix based on whether the user has interacted with the item or not); R is broken down into P and C (the confidence matrix). Question: as C is not estimated, how is it possible that model.transform(dataset) accurately predicts ratings R? The implementation is a dead end …
Category: Data Science
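
A minimal sketch of the behaviour the question asks about, using Spark ML's ALS in implicit-feedback mode; the data, column names and hyperparameters are illustrative assumptions, and the prediction column produced by transform() is the dot product of the learnt user and item factors (an estimated preference), not a reconstruction of R:

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS

spark = SparkSession.builder.getOrCreate()

# Hypothetical interaction data: (user, item, interaction strength r_ui)
interactions = spark.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 5.0)],
    ["userId", "itemId", "rating"],
)

# implicitPrefs=True trains on confidences c_ui = 1 + alpha * r_ui and binary preferences p_ui
als = ALS(userCol="userId", itemCol="itemId", ratingCol="rating",
          implicitPrefs=True, alpha=1.0, rank=10, maxIter=5, regParam=0.1)
model = als.fit(interactions)

# transform() adds a "prediction" column: the dot product x_u . y_i,
# i.e. an estimate of the preference, not of the original rating scale
model.transform(interactions).show()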

Does one-hot encoding affect the chi-square test?

I am doing feature selection for a data science project, and one of the features is a high-cardinality categorical variable (for context, it's nationality). I know the chi-square test can handle a multiclass feature like mine, but I need to one-hot encode it (dividing a multiclass variable into multiple binary variables based on its values) to be able to feed it into my machine learning algorithm (Spark MLlib). My question is: does one-hot encoding affect the result of a …
Category: Data Science
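
A hedged sketch of how the two steps fit together in Spark ML, assuming Spark 3.x and a made-up toy dataset with invented column names: the categorical column is indexed, one-hot encoded into a vector, and that vector is passed to ChiSquareTest, which tests each dummy column against the label separately (which is exactly where the question's concern comes from).

from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.stat import ChiSquareTest

spark = SparkSession.builder.getOrCreate()

# Toy data: a nationality column and a binary label (both hypothetical)
df = spark.createDataFrame(
    [("FR", 1.0), ("DE", 0.0), ("FR", 1.0), ("IT", 0.0), ("DE", 1.0)],
    ["nationality", "label"],
)

indexed = StringIndexer(inputCol="nationality", outputCol="nat_idx").fit(df).transform(df)
encoded = (OneHotEncoder(inputCols=["nat_idx"], outputCols=["nat_ohe"])
           .fit(indexed).transform(indexed))

# Chi-square test on the one-hot encoded vector: one statistic per dummy column
ChiSquareTest.test(encoded, "nat_ohe", "label").show(truncate=False)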

PySpark DataFrames to pandas and ML ops: does parallel execution still hold?

If I convert a Spark DataFrame into a pandas DataFrame and subsequently apply pandas operations and sklearn models to the dataset in Databricks, will the operations from pandas and sklearn be distributed across the cluster? Or do I have to use PySpark DataFrame operations and PySpark ML packages for operations to be distributed?
Category: Data Science
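
A short sketch contrasting the two paths, under the assumption of Spark >= 3.2 (where the pandas API on Spark is bundled); the DataFrame here is synthetic:

from pyspark.sql import SparkSession
import pyspark.pandas as ps            # pandas API on Spark (Spark >= 3.2)

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(1_000_000).withColumnRenamed("id", "x")   # stand-in Spark DataFrame

# 1) toPandas() collects everything onto the driver: any pandas or sklearn code
#    applied afterwards runs on that single node, not across the cluster.
pdf = sdf.toPandas()

# 2) The pandas API on Spark keeps the data distributed while exposing
#    pandas-like syntax; operations are planned and executed by Spark.
psdf = sdf.pandas_api()
print(isinstance(psdf, ps.DataFrame))   # True: still a distributed, Spark-backed frame
print(psdf["x"].mean())                 # computed by the executors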

Continuous model update

Hello, I have a trained model for credit card fraud detection; however, I want to find a solution so I can update the model parameters with new input records, to follow changes in the fraud patterns. Are there any repositories or ideas that could help me?
Category: Data Science
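
One possible direction, not taken from the question itself: estimators that support incremental updates (for example scikit-learn's partial_fit) can be refreshed batch by batch as new transactions arrive. A sketch with purely synthetic data and an arbitrary model choice (recent scikit-learn assumed):

import numpy as np
from sklearn.linear_model import SGDClassifier

model = SGDClassifier(loss="log_loss")    # a model that supports incremental updates

rng = np.random.default_rng(0)
for _ in range(10):                        # stand-in for batches arriving over time
    X_batch = rng.normal(size=(100, 5))    # synthetic transaction features
    y_batch = rng.integers(0, 2, size=100) # synthetic fraud labels
    # partial_fit updates the weights in place using only the new records
    model.partial_fit(X_batch, y_batch, classes=[0, 1])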

Computing number of business days between start/end columns

I have two DataFrames:
facts: columns data, start_date and end_date
holidays: column holiday_date
What I want is a way to produce another DataFrame that has columns data, start_date, end_date and num_holidays, where num_holidays is computed as the number of days between start and end that are not weekends or holidays (as in the holidays table). The solution is here if we wanted to do this in PL/SQL; the crux is this part of the code: --Calculate and return the number of workdays using …
Category: Data Science
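
A sketch of one possible PySpark equivalent, assuming facts and holidays are the DataFrames described in the question and that start_date, end_date and holiday_date are date columns: each row is expanded into its calendar days, weekends and holiday dates are filtered out, and the remaining days are counted (the question names this count num_holidays).

from pyspark.sql import functions as F

business_days = (
    facts
    # one row per calendar day in [start_date, end_date]
    .withColumn("day", F.explode(F.expr("sequence(start_date, end_date, interval 1 day)")))
    # drop weekends: in Spark, dayofweek() returns 1 for Sunday and 7 for Saturday
    .filter(~F.dayofweek("day").isin(1, 7))
    # drop days that appear in the holidays table
    .join(holidays, F.col("day") == F.col("holiday_date"), "left_anti")
    .groupBy("data", "start_date", "end_date")
    .agg(F.count("*").alias("num_holidays"))
)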

How to store a subset of columns from a CSV file?

I need to create a table in Hive (or Impala) by reading from a CSV file (named file.csv). The problem is that this CSV file could have a different number of columns each time I read it. The only thing I am sure of is that it will always have three columns called A, B, and C. For example, the first CSV I get could be (the first row is the header):
------------------------
| X | Y | A | …
Topic: hive pyspark sql
Category: Data Science
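
A minimal sketch of one way to do this with PySpark, keeping only the three guaranteed columns regardless of what else the file contains; the file path and table name are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Read whatever columns the file happens to have this time
df = spark.read.csv("file.csv", header=True, inferSchema=True)

# Keep only the columns that are guaranteed to exist
subset = df.select("A", "B", "C")

# Write the subset as a Hive-managed table
subset.write.mode("overwrite").saveAsTable("my_database.my_table")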

PySpark createDataFrame() throws segmentation fault on Mac

I'm trying to learn PySpark. I finally got it installed following the tutorial here: https://sparkbyexamples.com/pyspark/install-pyspark-in-anaconda-jupyter-notebook/ However, even though I am now able to create a Spark session (I can access the GUI, create some RDDs, etc.), any time I try to create a DataFrame, no matter how tiny, I get a segmentation fault (when I'm in the shell) or my kernel crashes (when I'm in a Jupyter notebook). I'm on macOS Monterey (12.3.1), Spark v3.1.2, Python 3.9.11 (conda distribution installed via Homebrew), Java …
Topic: pyspark
Category: Data Science

PySpark: How do I specify dropna axis in PySpark transformation?

I would like to drop columns that contain all null values using dropna(). With pandas you can do this by setting the keyword argument axis='columns' in dropna(); here is an example in a GitHub post. How do I do this in PySpark? dropna() is available as a transformation in PySpark, but axis is not an available keyword. Note: I do not want to transpose my DataFrame for this to work. How would I drop the furniture column from …
Category: Data Science
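
Since dropna() in PySpark only drops rows, one workaround (a sketch, not an official axis argument) is to count the non-null values per column and then select only the columns that have at least one; df stands for the DataFrame in the question:

from pyspark.sql import functions as F

# Count the non-null values of every column in a single pass over the data
non_null_counts = df.select(
    [F.count(F.col(c)).alias(c) for c in df.columns]
).first().asDict()

# Keep only the columns that contain at least one non-null value
keep = [c for c, n in non_null_counts.items() if n > 0]
df_without_all_null_columns = df.select(keep)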

Is it possible to implement an RDD version of a for loop with map and reduce using PySpark?

I need to test an algorithm that computes a function on a DataFrame where in each execution I drop a column and compute the function. This is an example in PySpark, but without using an RDD:

df2581 = spark.sparkContext.parallelize([Row(a=1, b=3, c=5, d=7, e=9)]).toDF()
df2581.show()

wo = df2581.rdd.flatMap(lambda x: x[1:]).map(lambda a: print(type(a)))
wo.collect()

def f(x):
    list3 = []
    index = 0
    list2 = x
    for j in x:
        list = array(x)
        list.remove(list[index])
        list3 = list.copy()
        index += 1
    return list3

colu = df2581.columns

def add(x, y): return x + y …
Category: Data Science
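
A hedged sketch of the leave-one-column-out idea expressed with map and reduce on an RDD, reusing the same toy row as the question; the aggregated "function" here is just a sum of the remaining values, as a stand-in for whatever the real computation is:

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.getOrCreate()
df = spark.sparkContext.parallelize([Row(a=1, b=3, c=5, d=7, e=9)]).toDF()
n = len(df.columns)

# For every row, emit one (dropped_column_index, value_of_function_on_remaining_columns)
# pair per column; the reduce step then combines the per-row results for each dropped column.
leave_one_out = (
    df.rdd
      .flatMap(lambda row: [(i, sum(v for j, v in enumerate(row) if j != i))
                            for i in range(n)])
      .reduceByKey(lambda x, y: x + y)
)

print(sorted(leave_one_out.collect()))   # [(0, 24), (1, 22), (2, 20), (3, 18), (4, 16)]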

How to run Spark python code in Jupyter Notebook via command prompt

I am trying to import a data frame into Spark using Python's pyspark module. For this, I used a Jupyter Notebook and executed the code shown in the screenshot below. After that I want to run this from CMD, so that I can save my Python code in a text file and save it as test.py (as a Python file). Then, I run that Python file in CMD using the python test.py command, as in the screenshot below: So my task previously worked, but after 3 …
Category: Data Science
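
A minimal stand-alone script of the kind the question describes (the file name and CSV path are placeholders); the key difference from the notebook is that the script must create its own SparkSession before any DataFrame work, and it runs with python test.py as long as the pyspark package is importable in that Python environment:

# test.py -- hypothetical minimal script, runnable with `python test.py`
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("csv-import").getOrCreate()

    # In a notebook the session usually already exists; a plain script must
    # build it itself before it can load any data.
    df = spark.read.csv("data.csv", header=True, inferSchema=True)
    df.show(5)

    spark.stop()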

Customer segmentation with unbalanced data

I am trying to do a customer segmentation on my transactional data and I am struggling a little bit with the best approach. Since it is an unsupervised model I can throw it at any algorithm and get some clusters, but I am more interested in the best approach for doing it. My data has basically 3 different products. Each product has dozens of features. The issue is that one product is purchased by 95% of the customers while …
Category: Data Science

Running H2O in Databricks

I am trying to run H2O in Databricks. However, when I do the following:
hc = pysparkling.H2OContext.getOrCreate(spark)
I get the following error:
java.lang.AbstractMethodError
Does anyone know what the problem could be?
Category: Data Science

PicklingError in PySpark (PicklingError: Can't pickle <class '__main__.Person'>: attribute lookup Person on __main__ failed)

I am unable to pickle the class below. I am using Databricks 6.5 ML (includes Apache Spark 2.4.5, Scala 2.11).

import pickle

class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age

p1 = Person("John", 36)
pickle.dump(p1, open('d.pkl', 'wb'))

PicklingError: Can't pickle <class '__main__.Person'>: attribute lookup Person on __main__ failed
Category: Data Science

Group a Spark DataFrame from a starting event to an ending event

Given a series of events (with datetimes) such as: failed, failed, passed, failed, passed, passed, I want to retrieve the time from when it first "failed" to when it first "passed," resetting every time it fails again, as I want to measure the recovery time. I only succeeded in doing this with a for loop, because when I groupBy the event with min on the date I lose the order of the events, and I want to group by failed-passed pairs. Ultimately …
Category: Data Science
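
One possible window-function approach, sketched under the assumption of a DataFrame events with a string column event and a timestamp column event_time: a new cycle starts at each "failed" that follows a "passed" (or is the very first event), and the recovery time is the gap between the first "failed" and the first "passed" inside each cycle.

from pyspark.sql import functions as F, Window

# Single global ordering for illustration; add partitionBy(...) per device/entity in practice
w = Window.orderBy("event_time")

marked = (
    events
    .withColumn("prev_event", F.lag("event").over(w))
    .withColumn("new_cycle",
                ((F.col("event") == "failed") &
                 ((F.col("prev_event") == "passed") | F.col("prev_event").isNull())).cast("int"))
    # running sum of cycle starts gives a cycle id for every row
    .withColumn("cycle_id", F.sum("new_cycle").over(w))
)

recovery = (
    marked.groupBy("cycle_id")
    .agg(F.min(F.when(F.col("event") == "failed", F.col("event_time"))).alias("first_failed"),
         F.min(F.when(F.col("event") == "passed", F.col("event_time"))).alias("first_passed"))
    .withColumn("recovery_seconds",
                F.col("first_passed").cast("long") - F.col("first_failed").cast("long"))
)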

Which ML method for multiclass (non-binary) text classification should I choose (from SparkML)?

I am working on quite a big dataset that will be processed on a cluster, which is why I am using PySpark for this purpose. The presentable records of this dataset have a structure like:
+----------+------------+--------------------+--------------------+--------------------+
|         0|  07/29/2013|       Consumer Loan|        Vehicle loan|Managing the loan...|
|         1|  07/29/2013|Bank account or s...|    Checking account|Using a debit or ...|
|         2|  07/29/2013|Bank account or s...|    Checking account|Account opening, ...
After some preprocessing/data cleansing operations I would like to create and then …
Category: Data Science
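
A hedged sketch of a typical Spark ML pipeline for this kind of multiclass text problem; the column names (narrative for the complaint text, product for the class) are assumptions, and multinomial logistic regression is just one reasonable choice, with NaiveBayes or OneVsRest over a linear SVM as drop-in alternatives:

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression

tokenizer = RegexTokenizer(inputCol="narrative", outputCol="tokens", pattern="\\W+")
tf = HashingTF(inputCol="tokens", outputCol="tf", numFeatures=1 << 16)
idf = IDF(inputCol="tf", outputCol="features")
label = StringIndexer(inputCol="product", outputCol="label")

# Multinomial logistic regression handles more than two classes out of the box
clf = LogisticRegression(featuresCol="features", labelCol="label", family="multinomial")

pipeline = Pipeline(stages=[tokenizer, tf, idf, label, clf])
# model = pipeline.fit(train_df)            # train_df / test_df assumed to exist
# predictions = model.transform(test_df)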

ML modeling on data with a large number of rows

I want to do ML modeling, such as XGBoost, KNN, and similar models, on data with 9 numerical features and more than 25 million rows; the data is almost 2.5 GB in size. I would prefer to use all of the data for modeling rather than samples of it. Which platforms, such as Databricks, AWS or GCP, would you suggest for this project? Do you think it is doable on a single machine?
Category: Data Science

Accumulators in Spark (PySpark) without global variables?

BACKGROUND: Consider the following textbook example which uses accumulators to add vectors.

from pyspark import AccumulatorParam

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        dict1 = {i: 0 for i in range(0, len(value))}
        return dict1

    def addInPlace(self, val1, val2):
        for i in val1.keys():
            val1[i] += val2[i]
        return val1

rdd1 = sc.parallelize([{0: 0.3, 1: 0.8, 2: 0.4},
                       {0: 0.2, 1: 0.4, 2: 0.2},
                       {0: -0.1, 1: 0.4, 2: 1.6}])
vector_acc = sc.accumulator({0: 0, 1: 0, 2: 0}, VectorAccumulatorParam())

def mapping_fn(x):
    global vector_acc
    vector_acc += …
Category: Data Science
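
One way to avoid the global statement, offered as a sketch rather than the canonical pattern: only the augmented assignment (vector_acc += x) forces Python to treat the name as local, so calling the accumulator's add() method through a closure (or a passed-in argument) works without it. This reuses rdd1 and vector_acc from the snippet above:

def make_mapping_fn(acc):
    # The accumulator is captured in the closure; acc.add(...) avoids the
    # `global` that an augmented assignment (acc += x) would require.
    def mapping_fn(x):
        acc.add(x)
        return x
    return mapping_fn

rdd1.foreach(make_mapping_fn(vector_acc))
print(vector_acc.value)   # combined vector, available on the driver after the action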

Spark: How to run PCA in parallel? Only one thread is used

I use PySpark and set my configuration as follows:

spark = (SparkSession.builder.master("local[*]")
         .config("spark.driver.memory", "20g")
         .config("spark.executor.memory", "10g")
         .config("spark.driver.cores", "30")
         .config("spark.num.executors", "8")
         .config("spark.executor.cores", "4")
         .getOrCreate())
sc = spark.sparkContext

If I then run PCA:

from pyspark.ml.feature import PCA

pca = PCA(k=50, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(train)

only one thread is active and therefore the computation takes a long time. How can I parallelize PCA in Spark? I run on a local machine and did not configure a cluster in the configs. Also I …
Category: Data Science
