There is a more common version of this question about parallelizing the pandas apply function, so this is a refreshing question :)
First, I want to mention swifter. You asked for a "packaged" solution, and it comes up in most SO questions about pandas parallelization.
But... I'd still like to share my personal gist code for it. After several years of working with DataFrames I never found a 100% parallelization solution (mainly for the apply function), and I always had to come back to my "manual" code.
Thanks to you, I made it more generic. It (theoretically) supports any DataFrame method by name, so you won't have to keep separate versions for isin, apply, etc.
I tested it with "isin", "apply" and "isna" using both Python 2.7 and 3.6.
It's only about 20 lines of logic, and I tried to follow pandas-style parameter naming such as "subset" and "njobs".
I also added a timing comparison with equivalent dask code for "isin". Dask seems to be roughly 2x slower than this gist.
It includes 2 functions:
df_multi_core - this is the one you call. It accepts:
- Your df object
- The function name you'd like to call
- The subset of columns the function should be performed on (helps reduce time and memory)
- The number of jobs to run in parallel (-1 or omit for all cores)
- Any other kwargs the df's function accepts (like "axis")
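_df_split - this is an internal helper function that has to be defined at the top level of the running module. It is "placement dependent" because multiprocessing must pickle it to send it to the worker processes, and pickle can only find module-level functions. Otherwise I'd have nested it inside df_multi_core.
Here's the code from my gist (I'll add more pandas function tests there):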
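Everything a Pool sends to its workers goes through pickle: the callable itself and every chunk of data. The sketch below shows both effects. A nested function can't be pickled, while a module-level one can. Selecting a column subset also shrinks the pickled payload. The names `top_level_worker`, `make_nested_worker` and `is_picklable` are hypothetical, made up for this illustration. The sketch assumes the snippet is run as a normal script, so module-level functions are importable by name.

```python
import pickle

import numpy as np
import pandas as pd


def top_level_worker(chunk):
    # Module-level function: pickle stores it by reference (module + qualified name)
    return chunk.isna()


def make_nested_worker():
    # Hypothetical factory returning a local (nested) function, which pickle cannot look up by name
    def nested_worker(chunk):
        return chunk.isna()
    return nested_worker


def is_picklable(obj):
    # multiprocessing.Pool pickles the callable it ships to worker processes
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


df = pd.DataFrame({'c1': np.arange(100000), 'c2': np.arange(100000) * 2.0})

print(is_picklable(top_level_worker))      # True
print(is_picklable(make_nested_worker()))  # False
# Shipping only the needed column means less data to serialize per chunk
print(len(pickle.dumps(df[['c1']])) < len(pickle.dumps(df)))  # True
```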
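```python
import multiprocessing
from functools import partial

import numpy as np
import pandas as pd


def _df_split(tup_arg, **kwargs):
    # Runs inside a worker: call the requested DataFrame method (by name) on one chunk
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))


def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()

    # Only ship the requested columns to the workers (less to pickle and copy)
    data = df[subset] if subset is not None else df
    # Split the rows by position into njobs chunks
    positions = np.array_split(np.arange(len(data)), njobs)
    splits = [data.iloc[idx] for idx in positions]

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    pool = multiprocessing.Pool(processes=njobs)
    try:
        results = pool.map(partial(_df_split, **kwargs), pool_data)
    finally:
        pool.close()
        pool.join()

    # pool.map already returns results in input order; sorting by split index is a safeguard
    results = sorted(results, key=lambda x: x[0])
    return pd.concat([split[1] for split in results])
```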
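The core trick is to call a DataFrame method by name on row chunks and then concatenate the pieces. You can check it without starting any processes. The sketch below uses a hypothetical sequential helper, `split_apply_concat` (not part of the gist), that mirrors the same split, getattr and concat steps. It also shows a caveat. This approach only reproduces the single-call result for row-wise or element-wise methods such as isin, isna, or apply with axis=1. A column-wise reduction such as sum gets computed once per chunk, so the concatenated result is not the whole-frame answer.

```python
import numpy as np
import pandas as pd


def split_apply_concat(df, df_f_name, n_chunks, **kwargs):
    # Same split -> call-by-name -> concat steps as df_multi_core, but run sequentially
    positions = np.array_split(np.arange(len(df)), n_chunks)
    chunks = [df.iloc[idx] for idx in positions]
    parts = [getattr(chunk, df_f_name)(**kwargs) for chunk in chunks]
    return pd.concat(parts)


df = pd.DataFrame({'c1': [1, 5, np.nan, 7, 2, 9, 5],
                   'c2': [3.0, 4.0, 1.0, np.nan, 8.0, 6.0, 0.0]})

# Row-wise / element-wise methods match the single whole-frame call
assert split_apply_concat(df, 'isin', 3, values=[5, 7]).equals(df.isin([5, 7]))
assert split_apply_concat(df, 'isna', 3).equals(df.isna())
assert split_apply_concat(df, 'apply', 3, func=lambda row: row.sum(), axis=1).equals(
    df.apply(lambda row: row.sum(), axis=1))

# A column-wise reduction does not: each of the 3 chunks returns its own 2 column sums
print(split_apply_concat(df, 'sum', 3).shape)  # (6,) whereas df.sum().shape is (2,)
```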
Below is test code for a parallelized isin, comparing the performance of the native call, the multi-core gist and dask.
On an i7 machine with 8 physical cores, I got around a 4x speedup. I'd love to hear what you get on your real data!
```python
from time import time

import dask.dataframe as dd

# Assumes the gist above (pd, np, multiprocessing, df_multi_core) is in the same module
if __name__ == '__main__':
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)

    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))
    t1 = time()
    res = df.isin(lookfor)
    print(res.sum())
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # subset=['c1'] means only c1 is processed (and reported) here
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print(res.sum())
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

    t5 = time()
    ddata = dd.from_pandas(df, npartitions=multiprocessing.cpu_count())
    res = ddata.map_partitions(lambda part: part.isin(lookfor)).compute(scheduler='processes')
    print(res.sum())
    t6 = time()
    print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))
```
Output:

```
testing pandas isin on (10000000, 2)
c1    953213
c2    951942
dtype: int64
time for native implementation 3.87
c1    953213
dtype: int64
time for multi core implementation 1.16
c1    953213
c2    951942
dtype: int64
time for dask implementation 2.88
```
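To turn the sample run above into ratios, the sketch below uses timings copied from that output. `speedup` is just an illustrative helper. On that run, the multi-core version works out to about 3.3x faster than native, dask to about 1.3x, and the gist to roughly 2.5x faster than dask. Keep in mind that the multi-core call only processed c1 (because of `subset`), while the native and dask runs processed both columns.

```python
def speedup(baseline_seconds, other_seconds):
    # How many times faster `other` ran compared with `baseline`
    return baseline_seconds / other_seconds


# Wall-clock seconds taken from the sample output (illustrative, single run)
timings = {'native': 3.87, 'multi core': 1.16, 'dask': 2.88}

for name in ('multi core', 'dask'):
    print(name, round(speedup(timings['native'], timings[name]), 2))
# multi core 3.34
# dask 1.34

print('dask vs multi core', round(speedup(timings['dask'], timings['multi core']), 2))
# dask vs multi core 2.48
```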