I'll try to answer based on my experience with parallel computing in production for DS/ML models.
High-level answers to your questions:
- Does the simple program above give you better performance with increasing n_jobs when you run it?
answer: Yes, as the results below show.
- On what OS / setup?
answer: OS: Ubuntu, 2 CPUs x 16 cores + 512 GB RAM, with python=3.7, joblib>=0.14.1 and sklearn>=0.22.1
- Is there something that must be tweaked for this to work properly?
answer: Yes: force a parallel_backend other than the sequential one. This requires the joblib approach with a registered parallel_backend; you can use sklearn.utils.parallel_backend. I also tried wrapping the sklearn model you have (with n_jobs=-1) in joblib Parallel and saw a huge improvement when I scaled to 100 million samples on my machine. I still need to check the results for correctness, but it is worth testing, since I was amazed by the performance with a predefined backend.
My conda setup:
scikit-learn 0.22.1 py37hd81dba3_0
ipython 7.12.0 py37h5ca1d4c_0
ipython_genutils 0.2.0 py37_0
msgpack-python 0.6.1 py37hfd86e86_1
python 3.7.6 h357f687_2 conda-forge
python-dateutil 2.8.1 py_0
python_abi 3.7 1_cp37m conda-forge
joblib 0.14.1 py_0
If you are on a personal machine or workstation, try to leave 1 core free for the machine by using n_jobs=-2. You can also increase your data size, because large data is what joblib is optimized for (not all algorithms support this approach, but that is out of scope here). Also change the backend, because by default no parallel tasks are performed and only the sequential backend is used. Maybe with more data some automatic mode kicks in, but I am not sure about that: I tested with 1k, 10k, 100k, 1 million and 10 million samples, and without the loky backend ElasticNetCV never left the sequential backend.
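A minimal sketch of that backend change, assuming synthetic data from make_regression and arbitrary cv and n_jobs values (none of these come from the question's program):

```python
from joblib import parallel_backend
from sklearn.datasets import make_regression
from sklearn.linear_model import ElasticNetCV

# Synthetic stand-in data; the shape is an assumption for illustration.
X, y = make_regression(n_samples=1000, n_features=30, noise=1.0, random_state=0)

# n_jobs=-2 asks for all CPUs but one.
model = ElasticNetCV(cv=5, n_jobs=-2)

# Inside the with block, the joblib.Parallel calls made by sklearn
# pick up loky instead of their default backend choice.
with parallel_backend("loky"):
    model.fit(X, y)

print(model.alpha_, model.mse_path_.shape)
```

Whether this helps on a given machine depends on data size and core count, so it is worth timing both variants.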
Joblib is optimized to be fast and robust on large data in particular and has specific optimizations for numpy arrays.
As an explanation, here is how the number of workers is calculated (from the joblib documentation):
For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all CPUs but one are used. None is a marker for ‘unset’ that will be interpreted as n_jobs=1 (sequential execution).
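As a small sketch of that rule, the helper below reproduces the arithmetic for a given CPU count. The name resolve_n_jobs is hypothetical and is not a joblib function:

```python
def resolve_n_jobs(n_jobs, n_cpus):
    """Illustrative only: mirror the documented n_jobs rule for n_cpus CPUs."""
    if n_jobs is None:
        # 'unset' is interpreted as sequential execution.
        return 1
    if n_jobs == 0:
        raise ValueError("n_jobs == 0 has no meaning")
    if n_jobs < 0:
        # -1 -> all CPUs, -2 -> all but one, and so on (never below 1).
        return max(n_cpus + 1 + n_jobs, 1)
    return n_jobs


for value in (None, 1, 8, -1, -2):
    print(value, "->", resolve_n_jobs(value, n_cpus=32))
```

On the 32-core machine described above this gives 32 workers for n_jobs=-1 and 31 for n_jobs=-2. In real code, joblib.effective_n_jobs does this resolution for the active backend.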
Your code performs badly with n_jobs=-1, so try n_jobs=-2, for the following reasons:
- n_jobs=-1 uses all your CPU cores (per the documentation); you can switch to threads by registering a joblib parallel_backend on your machine. Using every core is slow and degrades performance when other processes also need CPU threads/cores. That is what happens in your case: the OS and other processes need CPU time to run. It also does not take full advantage of "threading", so judging by your performance issue it uses "cores". As an example, n_jobs=-1 makes sense in cluster mode, where each worker is a container with its own allocated cores and can take advantage of the parallel approach to distribute the optimization or computation part.
- You run out of CPU resources in this case. Also don't forget that parallelism isn't free: the SAME data is copied for each "job", so all of that "allocation" happens at the same time.
- The sklearn parallel implementation isn't perfect, so in your case I would try n_jobs=-2; if you use joblib directly, you have more room for optimizing the algorithm. Your CV part is where the performance degrades.
I will add the following from the joblib documentation to better explain how it works in your case, along with its limitations and differences:
backend: str, ParallelBackendBase instance or None, default: ‘loky’
Specify the parallelization backend implementation. Supported backends are:
“loky” used by default, can induce some communication and memory overhead when exchanging input and output data with the worker Python processes.
“multiprocessing” previous process-based backend based on multiprocessing.Pool. Less robust than loky.
“threading” is a very low-overhead backend but it suffers from the Python Global Interpreter Lock if the called function relies a lot on Python objects. “threading” is mostly useful when the execution bottleneck is a compiled extension that explicitly releases the GIL (for instance a Cython loop wrapped in a “with nogil” block or an expensive call to a library such as NumPy).
finally, you can register backends by calling register_parallel_backend. This will allow you to implement a backend of your liking.
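To make the backend options concrete, here is a small sketch of the two ways to select one: the backend argument of Parallel, and the parallel_backend context manager. The square-root workload is a toy chosen only for illustration:

```python
from math import sqrt

from joblib import Parallel, delayed, parallel_backend

# Option 1: name the backend directly on the Parallel call.
roots_direct = Parallel(n_jobs=2, backend="threading")(
    delayed(sqrt)(i ** 2) for i in range(5)
)

# Option 2: set the backend (and n_jobs) for every Parallel call in the block.
with parallel_backend("threading", n_jobs=2):
    roots_context = Parallel()(delayed(sqrt)(i ** 2) for i in range(5))

print(roots_direct)
print(roots_context)
```

The context-manager form is the useful one for sklearn estimators, because their internal Parallel calls cannot be edited from user code.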
From the source code, I see that sklearn uses cores; threads are preferred for some algorithms, but that is not the default for all of them. From _joblib.py:
import warnings as _warnings

with _warnings.catch_warnings():
    # joblib imports may raise DeprecationWarning on certain Python
    # versions
    import joblib
    from joblib import logger
    from joblib import dump, load
    from joblib import __version__
    from joblib import effective_n_jobs
    from joblib import hash
    from joblib import cpu_count, Parallel, Memory, delayed
    from joblib import parallel_backend, register_parallel_backend

__all__ = ["parallel_backend", "register_parallel_backend", "cpu_count",
           "Parallel", "Memory", "delayed", "effective_n_jobs", "hash",
           "logger", "dump", "load", "joblib", "__version__"]
But the CV part of the Elastic Net model you listed uses "threads" as the preferred backend (_joblib_parallel_args(prefer="threads")), and there seems to be a bug on Windows where only cores are considered:
mse_paths = Parallel(n_jobs=self.n_jobs, verbose=self.verbose,
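joblib documents prefer as a soft hint that a parallel_backend context can override. One quick way to see this, as a sketch independent of sklearn, is to compare worker process IDs with and without the context:

```python
import os

from joblib import Parallel, delayed, parallel_backend

parent_pid = os.getpid()

# With only the soft hint, tasks run in threads of the current process.
thread_pids = Parallel(n_jobs=2, prefer="threads")(
    delayed(os.getpid)() for _ in range(4)
)

# Inside the context, the same call is dispatched to loky worker processes.
with parallel_backend("loky"):
    process_pids = Parallel(n_jobs=2, prefer="threads")(
        delayed(os.getpid)() for _ in range(4)
    )

print("threads share the parent pid:", all(pid == parent_pid for pid in thread_pids))
print("loky uses other processes:", all(pid != parent_pid for pid in process_pids))
```

This is why wrapping the fit in a with parallel_backend('loky') block can change how the CV folds are executed even though the estimator itself only asks for threads.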
Note: This answer comes from my daily work with sparkjoblib and parallel_backend('spark'). It scales and runs fast as expected, but don't forget that each executor I have has basically 4 cores and 4-32 GB RAM, so with n_jobs=-1 the parallel joblib tasks run inside each executor, and the copying of the SAME data goes unnoticed because it is distributed. The CV and fit parts run perfectly, and I use n_jobs=-1 when performing them.
My results with the OP's default setup:
# Without progress tracking (execution is faster; progress output is added further below for clarity):
n_jobs = None, perf_counter = 1.4849148329813033 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 1, perf_counter = 1.4728297910187393 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 2, perf_counter = 1.470994730014354 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 4, perf_counter = 1.490676686167717 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 8, perf_counter = 1.465600558090955 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 12, perf_counter = 1.463360101915896 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 16, perf_counter = 1.4638906640466303 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 20, perf_counter = 1.4602260519750416 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 24, perf_counter = 1.4646347570233047 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = 28, perf_counter = 1.4710926250554621 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = -1, perf_counter = 1.468439529882744 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
n_jobs = -2, perf_counter = 1.4649679311551154 ([Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.)
# With progress tracking and verbose output, added for clarity:
0%| | 0/12 [00:00<?, ?it/s][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.9s finished
8%|▊ | 1/12 [00:02<00:31, 2.88s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = None, perf_counter = 2.8790326060261577
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.8s finished
17%|█▋ | 2/12 [00:05<00:28, 2.87s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 1, perf_counter = 2.83856769092381
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.8s finished
25%|██▌ | 3/12 [00:08<00:25, 2.85s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 2, perf_counter = 2.8207667160313576
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.8s finished
33%|███▎ | 4/12 [00:11<00:22, 2.84s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 4, perf_counter = 2.8043343869503587
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.7s finished
42%|████▏ | 5/12 [00:14<00:19, 2.81s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 8, perf_counter = 2.730375789105892
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.9s finished
50%|█████ | 6/12 [00:16<00:16, 2.82s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 12, perf_counter = 2.8604282720480114
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.8s finished
58%|█████▊ | 7/12 [00:19<00:14, 2.83s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 16, perf_counter = 2.847634136909619
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.8s finished
67%|██████▋ | 8/12 [00:22<00:11, 2.84s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 20, perf_counter = 2.8461739809717983
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.9s finished
75%|███████▌ | 9/12 [00:25<00:08, 2.85s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 24, perf_counter = 2.8684673600364476
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 2.9s finished
83%|████████▎ | 10/12 [00:28<00:05, 2.87s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = 28, perf_counter = 2.9122865139506757
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 3.1s finished
92%|█████████▏| 11/12 [00:31<00:02, 2.94s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = -1, perf_counter = 3.1204342890996486
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 3.3s finished
100%|██████████| 12/12 [00:34<00:00, 2.91s/it]
n_jobs = -2, perf_counter = 3.347235122928396
HERE the MAGIC starts:
I would say this is actually the bug: even when n_jobs is specified, it does not take effect, and the fit still runs as with 'None' or '1'. The small differences in time are probably due to caching of results with joblib.Memory and checkpointing, but I need to look at this part of the source code more (I bet it is implemented, because otherwise CV would be expensive to perform).
As a reference, here are the results when the parallel part is done through joblib with parallel_backend('loky'), which sets the default backend used by Parallel inside the with block instead of the 'auto' mode:
# Without progress tracking (execution is faster; progress output is added further below for clarity):
n_jobs = None, perf_counter = 1.7306506633758545, sec
n_jobs = 1, perf_counter = 1.7046034336090088, sec
n_jobs = 2, perf_counter = 2.1097865104675293, sec
n_jobs = 4, perf_counter = 1.4976494312286377, sec
n_jobs = 8, perf_counter = 1.380277156829834, sec
n_jobs = 12, perf_counter = 1.3992164134979248, sec
n_jobs = 16, perf_counter = 0.7542541027069092, sec
n_jobs = 20, perf_counter = 1.9196209907531738, sec
n_jobs = 24, perf_counter = 0.6940577030181885, sec
n_jobs = 28, perf_counter = 0.780998945236206, sec
n_jobs = -1, perf_counter = 0.7055854797363281, sec
n_jobs = -2, perf_counter = 0.4049191474914551, sec
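To read the timings above as speedups, this short sketch divides the n_jobs=1 time by each of the other times. The numbers are copied from the list above; timings of a workload this short are noisy, so treat the ratios as rough:

```python
# (n_jobs, seconds) pairs copied from the loky run above.
timings = [
    (1, 1.7046034336090088),
    (2, 2.1097865104675293),
    (4, 1.4976494312286377),
    (8, 1.380277156829834),
    (12, 1.3992164134979248),
    (16, 0.7542541027069092),
    (20, 1.9196209907531738),
    (24, 0.6940577030181885),
    (28, 0.780998945236206),
    (-1, 0.7055854797363281),
    (-2, 0.4049191474914551),
]

baseline = dict(timings)[1]
speedups = {n_jobs: baseline / seconds for n_jobs, seconds in timings}

for n_jobs, ratio in speedups.items():
    print(f"n_jobs = {n_jobs:>3}: {ratio:.2f}x")

best_n_jobs = max(speedups, key=speedups.get)
print("fastest setting:", best_n_jobs)
```

The best ratio is roughly 4x on 31 workers, which is far from linear scaling and fits the overhead discussed in this answer.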
The output below explains EVERYTHING about the limitations you have, the impression of "parallelism expected vs. parallelism actually done in the sklearn algorithm you have", and in general what is executing and how many workers are assigned:
# With progress tracking and verbose output, added for clarity:
0%| | 0/12 [00:00<?, ?it/s][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 3.4s finished
8%|▊ | 1/12 [00:03<00:37, 3.44s/it][Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
n_jobs = None, perf_counter = 3.4446191787719727, sec
[Parallel(n_jobs=1)]: Done 100 out of 100 | elapsed: 3.5s finished
17%|█▋ | 2/12 [00:06<00:34, 3.45s/it]
n_jobs = 1, perf_counter = 3.460832357406616, sec
[Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done 100 out of 100 | elapsed: 2.0s finished
25%|██▌ | 3/12 [00:09<00:27, 3.09s/it]
n_jobs = 2, perf_counter = 2.2389445304870605, sec
[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done 100 out of 100 | elapsed: 1.7s finished
33%|███▎ | 4/12 [00:10<00:21, 2.71s/it]
n_jobs = 4, perf_counter = 1.8393192291259766, sec
[Parallel(n_jobs=8)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done 34 tasks | elapsed: 1.0s
[Parallel(n_jobs=8)]: Done 100 out of 100 | elapsed: 1.3s finished
42%|████▏ | 5/12 [00:12<00:16, 2.36s/it]
n_jobs = 8, perf_counter = 1.517085075378418, sec
[Parallel(n_jobs=12)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=12)]: Done 26 tasks | elapsed: 1.3s
[Parallel(n_jobs=12)]: Done 77 out of 100 | elapsed: 1.5s remaining: 0.4s
[Parallel(n_jobs=12)]: Done 100 out of 100 | elapsed: 1.6s finished
50%|█████ | 6/12 [00:14<00:13, 2.17s/it]
n_jobs = 12, perf_counter = 1.7410166263580322, sec
[Parallel(n_jobs=16)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=16)]: Done 18 tasks | elapsed: 0.1s
[Parallel(n_jobs=16)]: Done 100 out of 100 | elapsed: 0.7s finished
58%|█████▊ | 7/12 [00:15<00:09, 1.81s/it]
n_jobs = 16, perf_counter = 0.9577205181121826, sec
[Parallel(n_jobs=20)]: Using backend LokyBackend with 20 concurrent workers.
[Parallel(n_jobs=20)]: Done 10 tasks | elapsed: 1.6s
[Parallel(n_jobs=20)]: Done 100 out of 100 | elapsed: 1.9s finished
67%|██████▋ | 8/12 [00:17<00:07, 1.88s/it]
n_jobs = 20, perf_counter = 2.0630648136138916, sec
[Parallel(n_jobs=24)]: Using backend LokyBackend with 24 concurrent workers.
[Parallel(n_jobs=24)]: Done 2 tasks | elapsed: 0.0s
[Parallel(n_jobs=24)]: Done 100 out of 100 | elapsed: 0.5s finished
75%|███████▌ | 9/12 [00:18<00:04, 1.55s/it]
n_jobs = 24, perf_counter = 0.7588121891021729, sec
[Parallel(n_jobs=28)]: Using backend LokyBackend with 28 concurrent workers.
[Parallel(n_jobs=28)]: Done 100 out of 100 | elapsed: 0.6s finished
83%|████████▎ | 10/12 [00:18<00:02, 1.34s/it]
n_jobs = 28, perf_counter = 0.8542406558990479, sec
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 32 concurrent workers.
[Parallel(n_jobs=-1)]: Done 100 out of 100 | elapsed: 0.7s finished
92%|█████████▏| 11/12 [00:19<00:01, 1.21s/it][Parallel(n_jobs=-2)]: Using backend LokyBackend with 31 concurrent workers.
n_jobs = -1, perf_counter = 0.8903687000274658, sec
[Parallel(n_jobs=-2)]: Done 100 out of 100 | elapsed: 0.5s finished
100%|██████████| 12/12 [00:20<00:00, 1.69s/it]
n_jobs = -2, perf_counter = 0.544947624206543, sec
# Here I show what happens behind the scenes, to explain the differences in times, including the 'None' vs '1' execution times (it is all about the pickling process and the Memory caching implementation for parallel execution).
[Parallel(n_jobs=-2)]: Done 71 out of 100 | elapsed: 0.9s remaining: 0.4s
Pickling array (shape=(900,), dtype=int64).
Pickling array (shape=(100,), dtype=int64).
Pickling array (shape=(100,), dtype=float64).
Pickling array (shape=(1000, 30), dtype=float64).
Pickling array (shape=(1000,), dtype=float64).
Pickling array (shape=(900,), dtype=int64).
Pickling array (shape=(100,), dtype=int64).
Pickling array (shape=(100,), dtype=float64).
Pickling array (shape=(1000, 30), dtype=float64).
Pickling array (shape=(1000,), dtype=float64).
Pickling array (shape=(900,), dtype=int64).
Pickling array (shape=(100,), dtype=int64).
Pickling array (shape=(100,), dtype=float64).
Pickling array (shape=(1000, 30), dtype=float64).
Pickling array (shape=(1000,), dtype=float64).
Pickling array (shape=(900,), dtype=int64).
Pickling array (shape=(100,), dtype=int64).
Pickling array (shape=(100,), dtype=float64).
[Parallel(n_jobs=-2)]: Done 73 out of 100 | elapsed: 0.9s remaining: 0.3s
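As a rough back-of-the-envelope sketch of the copying cost, the snippet below adds up the array sizes from one repeated group in the log. It assumes that each group of five arrays corresponds to one dispatched task and that all 100 tasks ship the same arrays; pickle framing overhead is ignored:

```python
from math import prod

# Shapes from one repeated group in the log above. int64 and float64
# both take 8 bytes per element.
ITEMSIZE = 8
shapes_per_task = [(900,), (100,), (100,), (1000, 30), (1000,)]

bytes_per_task = sum(prod(shape) * ITEMSIZE for shape in shapes_per_task)

# "Done 100 out of 100" in the log: 100 tasks in total (assumed identical).
n_tasks = 100
total_bytes = bytes_per_task * n_tasks

print(f"{bytes_per_task} bytes per task")
print(f"{total_bytes / 2**20:.1f} MiB serialized over the whole run")
```

For data this small the cost is modest, but it grows linearly with both the data size and the number of tasks, which is the "same data copied for each job" effect described earlier.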
