Accumulators in Spark (PySpark) without global variables?

BACKGROUND

Consider the following textbook example which uses accumulators to add vectors.

from pyspark import AccumulatorParam

class VectorAccumulatorParam(AccumulatorParam):

    def zero(self, value):
        return {i: 0 for i in range(0, len(value))}

    def addInPlace(self, val1, val2):
        for i in val1.keys():
            val1[i] += val2[i]
        return val1

rdd1 = sc.parallelize([{0: 0.3, 1: 0.8, 2: 0.4},
                       {0: 0.2, 1: 0.4, 2: 0.2},
                       {0: -0.1, 1: 0.4, 2: 1.6}])

vector_acc = sc.accumulator({0: 0, 1: 0, 2: 0},
                            VectorAccumulatorParam())

def mapping_fn(x):
    global vector_acc
    vector_acc += x

rdd1.foreach(mapping_fn)
print(vector_acc.value)

This prints {0: 0.4, 1: 1.6, 2: 2.2}, i.e., the element-wise sum of the vectors.

QUESTION

The use of the global scope in mapping_fn() gnaws at me, since it's usually bad practice. Is there a simple way to illustrate how accumulators work without resorting to a global variable? This is all the more confusing since, as far as I understand, the beauty of Spark lies in a share-nothing philosophy, and global is precisely the opposite of that.

Topic pyspark apache-spark python apache-hadoop

Category Data Science


You do not need global. You can simply reference vector_acc in this function and it will be serialized along with the function; Spark will handle getting the updates made on the executors back to the driver. I am not sure what the purpose of global is here.
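For example, here is a minimal sketch of that idea, reusing rdd1 and a freshly created vector_acc from the question. Because vector_acc += x is an assignment, Python would treat the name as local without global; calling the accumulator's add() method avoids the assignment entirely, so a plain closure works:

def mapping_fn(x):
    # vector_acc is captured by the closure; add() updates the accumulator
    # without rebinding the name, so no global statement is needed.
    vector_acc.add(x)

rdd1.foreach(mapping_fn)
print(vector_acc.value)  # element-wise sum, as before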


The problem with sharing nothing is that reduce-style functions such as sum need results from multiple elements. Spark has idiomatic ways to do this (see the sketch below), and a sum is already implemented, but the easiest code to write for a reduction uses variables and loops. To write code the Spark way, you need to get used to the functional programming style that Spark relies on; functional programming also makes parallelization easier and distributed code faster.
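As one example of the idiomatic route (a sketch only, reusing rdd1 from the question): flatten each dictionary into (index, value) pairs and let reduceByKey do the per-component summing.

# Turn each {index: value} dict into (index, value) pairs,
# then sum the values per index across the whole RDD.
sums = (rdd1
        .flatMap(lambda d: d.items())
        .reduceByKey(lambda a, b: a + b)
        .collectAsMap())

print(sums)  # {0: 0.4, 1: 1.6, 2: 2.2}, up to floating-point rounding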

It would probably be faster to use Spark DataFrames for this, and in my opinion they are easier to understand than map-reduce logic; a sketch follows.
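A minimal DataFrame sketch (the column names x0, x1, x2 and the SparkSession setup are assumptions, not part of the original example):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Each vector becomes a row; the columns are the vector components.
df = spark.createDataFrame(
    [(0.3, 0.8, 0.4), (0.2, 0.4, 0.2), (-0.1, 0.4, 1.6)],
    ["x0", "x1", "x2"])

# Sum every column; first() returns the single result row, asDict() makes it a dict.
totals = df.select([F.sum(c).alias(c) for c in df.columns]).first().asDict()
print(totals)  # column-wise sums, approximately {'x0': 0.4, 'x1': 1.6, 'x2': 2.2}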

A possible map-reduce approach (this is what the whole code looks like):

rdd1 = sc.parallelize([{0: 0.3, 1: 0.8, 2: 0.4}, 
                       {0: 0.2, 1: 0.4, 2: 0.2},
                       {0: -0.1, 1: 0.4, 2: 1.6}])

def acc_fn(acc, row):
    # Element-wise sum of two vectors (dicts with the same keys).
    res = {}
    for key, value in row.items():
        res[key] = acc[key] + value
    return res

vector_acc = rdd1.reduce(acc_fn)

print(vector_acc)

vector_acc is now a plain Python dictionary rather than an Accumulator object, so you access it directly as a dictionary (no .value).
