There is a more common version of this question regarding parallelization of the pandas apply function, so this is a refreshing question :)
First, I want to mention swifter, since you asked for a "packaged" solution, and it appears on most SO questions regarding pandas parallelization.
But... I'd still like to share my personal gist code for it, since after several years of working with DataFrames I never found a 100% parallelization solution (mainly for the apply function), and I always had to come back to my "manual" code.
Thanks to you, I made it more generic to support (theoretically) any DataFrame method by its name (so you won't have to keep separate versions for isin, apply, etc.).
I tested it on the "isin", "apply" and "isna" functions using both Python 2.7 and 3.6.
It's under 20 lines, and I followed pandas naming conventions like "subset" and "njobs".
I also added a time comparison with the equivalent dask code for "isin", and it seems ~2x slower than this gist.
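As a reference for the swifter route mentioned above, its usage is essentially a one-line accessor around apply (a minimal sketch with a placeholder column and function; check the swifter docs for how it chooses between vectorizing, dask and plain apply):

import pandas as pd
import swifter  # importing swifter registers the ".swifter" accessor on pandas objects

df = pd.DataFrame({'c1': range(1000)})
# swifter decides internally whether to vectorize, parallelize via dask, or fall back to a plain apply
res = df['c1'].swifter.apply(lambda x: x ** 2)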
It includes 2 functions:
df_multi_core - this is the one you call. It accepts:
- Your df object
- The function name you'd like to call
- The subset of columns the function can be performed upon (helps reduce time / memory)
- The number of jobs to run in parallel (-1 or omit for all cores)
- Any other kwargs the df's function accepts (like "axis")
_df_split - this is an internal helper function that has to be defined at module level (Pool.map can only pickle top-level functions), otherwise I'd have kept it internal.
Here's the code from my gist (I'll add more pandas function tests there), followed by a quick usage sketch:
import pandas as pd
import numpy as np
import multiprocessing
from functools import partial
def _df_split(tup_arg, **kwargs):
    # unpack the chunk index, the chunk itself and the DataFrame method name to run
    split_ind, df_split, df_f_name = tup_arg
    # getattr lets us call any DataFrame method by name, forwarding the extra kwargs
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    # split the (optionally column-subsetted) frame into njobs chunks
    splits = np.array_split(df[subset] if subset is not None else df, njobs)

    # keep each chunk's index so results can be re-ordered after the parallel run
    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()

    # re-assemble the chunk results in their original order
    results = sorted(results, key=lambda x: x[0])
    results = pd.concat([split[1] for split in results])
    return results
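And the promised usage sketch for a parallel row-wise apply through it (row_sum and the column names are just placeholders for illustration, not part of the gist; the function must live at module level so Pool can pickle it):

def row_sum(row):
    # placeholder row-wise function; must be defined at module level to be picklable
    return row['c1'] + row['c2']

if __name__ == '__main__':
    df = pd.DataFrame({'c1': np.arange(1000), 'c2': np.arange(1000)})
    # the extra kwargs (func, axis) are forwarded to DataFrame.apply on every chunk
    res = df_multi_core(df=df, df_f_name='apply', subset=['c1', 'c2'], njobs=-1,
                        func=row_sum, axis=1)
    print(res.head())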
Below is the test code for a parallelized isin, comparing the native, multi-core gist and dask performance.
On an i7 machine with 8 physical cores, I got around a 4x speedup. I'd love to hear what you get on your real data!
from time import time
import dask.dataframe as dd

if __name__ == '__main__':
    sep = '-' * 50

    # isin test
    N = 10000000
    df = pd.DataFrame({'c1': np.random.randint(low=1, high=N, size=N), 'c2': np.arange(N)})
    lookfor = np.random.randint(low=1, high=N, size=1000000)
    njobs = multiprocessing.cpu_count()
    print('{}\ntesting pandas isin on {}\n{}'.format(sep, df.shape, sep))

    # native, single-core isin
    t1 = time()
    print('result\n{}'.format(df.isin(lookfor).sum()))
    t2 = time()
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    # multi-core gist isin (on the 'c1' subset only)
    t3 = time()
    res = df_multi_core(df=df, df_f_name='isin', subset=['c1'], njobs=-1, values=lookfor)
    print('result\n{}'.format(res.sum()))
    t4 = time()
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

    # dask equivalent of isin, run with the multiprocessing scheduler
    t5 = time()
    ddata = dd.from_pandas(df, npartitions=njobs)
    res = ddata.map_partitions(lambda d: d.isin(lookfor)).compute(scheduler='processes')
    t6 = time()
    print('result\n{}'.format(res.sum()))
    print('time for dask implementation {}\n{}'.format(round(t6 - t5, 2), sep))
--------------------------------------------------
testing pandas isin on (10000000, 2)
--------------------------------------------------
result
c1 953213
c2 951942
dtype: int64
time for native implementation 3.87
--------------------------------------------------
result
c1 953213
dtype: int64
time for multi core implementation 1.16
--------------------------------------------------
result
c1 953213
c2 951942
dtype: int64
time for dask implementation 2.88