Does spaCy support multiple GPUs?

I was wondering if spaCy supports multi-GPU via mpi4py?

I am currently using spaCy's nlp.pipe for Named Entity Recognition on a high-performance computing cluster that supports the MPI protocol and has many GPUs. It says here that I would need to specify the GPU to use with cupy, but with mpi4py I am not sure whether the following will work (should I import spacy after selecting the cupy device?):


from mpi4py import MPI
import cupy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = ["His friend Nicolas J. Smith is here with Bart Simpon and Fred."] * 100
else:
    data = None

unit = comm.scatter(data, root=0)

with cupy.cuda.Device(rank):
    import spacy
    from thinc.api import set_gpu_allocator, require_gpu
    set_gpu_allocator("pytorch")
    require_gpu(rank)
    nlp = spacy.load('en_core_web_lg')
    nlp.add_pipe("merge_entities")
    tmp_list = []
    for doc in nlp.pipe(unit):
        res = " ".join([t.text if not t.ent_type_ else t.ent_type_ for t in doc])
        tmp_list.append(res)

result = comm.gather(tmp_list, root=0)

if comm.rank == 0:
    print (result)
else:
    result = None

Or if I have 4 GPUs on the same machine and I do not want to use MPI, can I do the following:

from joblib import Parallel, delayed
import cupy

rank = 0

def chunker(iterable, total_length, chunksize):
    return (iterable[pos: pos + chunksize] for pos in range(0, total_length, chunksize))

def flatten(list_of_lists):
    "Flatten a list of lists to a combined list"
    return [item for sublist in list_of_lists for item in sublist]

def process_chunk(texts):
    with cupy.cuda.Device(rank):
        import spacy
        from thinc.api import set_gpu_allocator, require_gpu
        set_gpu_allocator("pytorch")
        require_gpu(rank)
        nlp = spacy.load('en_core_web_lg')
        preproc_pipe = []
        for doc in nlp.pipe(texts, batch_size=20):
            # lemmatize_pipe is assumed to be a helper defined elsewhere
            preproc_pipe.append(lemmatize_pipe(doc))
        rank+=1
        return preproc_pipe

def preprocess_parallel(texts, chunksize=100):
    executor = Parallel(n_jobs=4, backend='multiprocessing', prefer='processes')
    do = delayed(process_chunk)
    tasks = (do(chunk) for chunk in chunker(texts, len(texts), chunksize=chunksize))
    result = executor(tasks)
    return flatten(result)

preprocess_parallel(texts = ["His friend Nicolas J. Smith is here with Bart Simpon and Fred."]*100, chunksize=1000)



I ran into a similar situation where I needed to use parallelization with SpaCy on a GPU, and used a modified version of your code above.

I get an error from the ForkingPickler:

AttributeError: Can't get attribute 'process_chunk' on <module '__main__' (built-in)>

I believe it stems from this line in your preprocess_parallel function:

executor = Parallel(n_jobs=4, backend='multiprocessing', prefer='processes')

which I changed to:

executor = Parallel(n_jobs=4, prefer='threads')

The joblib documentation states that, for the Parallel class, the prefer argument is ignored when a backend argument is given. Passing prefer='threads' hints to joblib that your code can release the global interpreter lock (GIL).
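For illustration, here is a minimal sketch (with a hypothetical work function standing in for the spaCy call) contrasting the two ways of building the executor; in the first one the prefer hint is ignored because a backend is given:

from joblib import Parallel, delayed

def work(x):
    # stand-in for a task that releases the GIL (e.g. a spaCy/cupy call)
    return x * x

# backend is given explicitly, so the prefer= hint is ignored here
proc_executor = Parallel(n_jobs=4, backend='multiprocessing', prefer='threads')

# no backend given, so joblib honours the hint and runs the tasks in threads
thread_executor = Parallel(n_jobs=4, prefer='threads')

print(thread_executor(delayed(work)(i) for i in range(8)))
# [0, 1, 4, 9, 16, 25, 36, 49]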

Further, the documentation says:

If your code can release the GIL, then using a thread-based backend by passing prefer='threads' is even more efficient (than loky or multiprocessing) because it makes it possible to avoid the communication overhead of process-based parallelism.

Scientific Python libraries such as numpy, scipy, pandas and scikit-learn often release the GIL in performance critical code paths. It is therefore advised to always measure the speed of thread-based parallelism and use it when the scalability is not limited by the GIL.
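A rough way to follow that advice is a small timing script like the sketch below; the work function and chunks are dummies you would replace with your own process_chunk and data:

import time
from joblib import Parallel, delayed

def work(chunk):
    # dummy stand-in for process_chunk; replace with the real pipeline call
    return [len(text) for text in chunk]

chunks = [["some text"] * 50 for _ in range(8)]
tasks = [delayed(work)(chunk) for chunk in chunks]

def measure(executor):
    # time one full run of the prepared tasks under the given executor
    start = time.perf_counter()
    executor(tasks)
    return time.perf_counter() - start

print("threads:  ", measure(Parallel(n_jobs=4, prefer='threads')))
print("processes:", measure(Parallel(n_jobs=4, backend='multiprocessing')))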

Next, I dropped the line:
mp.set_start_method('spawn', force=True)
I think it was unnecessary. Joblib seems to create the appropriate context for you if you hint to it properly.

From what I've read, the issue appears to be related to how the GIL works, but I know too little about the GIL to comment further or start a debate on it.

Anyway, here is working code with the above changes applied.

import cupy
import spacy
import multiprocessing as mp

from joblib import Parallel, delayed
from thinc.api import set_gpu_allocator, require_gpu

# A different method for flattening a list
def flatten2d(list2d):
    from functools import reduce
    from operator import iconcat
    return reduce(iconcat, list2d, [])

def chunker(iterator, length, chunksize):
    return (iterator[pos: pos + chunksize] for pos in range(0, length, chunksize))

def process_entity(doc):
    # I need lists of sentences for my use case,  but you could do other processing
    return [s.text for s in doc.sents]

def process_chunk(docs, rank):
    with cupy.cuda.Device(rank):
        set_gpu_allocator('pytorch')
        require_gpu(rank)
        nlp = spacy.load('en_core_web_sm', disable=['parser'])
        nlp.add_pipe('sentencizer')
        preprocess_pipe = []
        for doc in nlp.pipe(docs, batch_size=20):
            preprocess_pipe.append(process_entity(doc))
        return preprocess_pipe

def preprocess_parallel(texts, jobs=2, chunksize=50):
    executor = Parallel(n_jobs=jobs, prefer='threads')
    do = delayed(process_chunk)
    tasks = []
    gpus = list(range(0, cupy.cuda.runtime.getDeviceCount()))
    rank = 0
    for chunk in chunker(texts, len(texts), chunksize):
        tasks.append(do(chunk, rank))
        rank = (rank + 1) % len(gpus)
    result = executor(tasks)
    return flatten2d(result)

if __name__ == '__main__':
    preprocessed = preprocess_parallel(
        texts = ["This is a basic sentence. This is another one."]*100,
        jobs=4,
        chunksize=25
    )
    print(preprocessed)

I would also like to note the environment I'm using, since it appears to matter. Previously, I was using a Conda environment with Python 3.9 and was having issues with certain packages on that version, as well as a PicklingError thrown by joblib.

I upgraded my Conda environment to Python 3.10:
conda create --name threeten --no-default-packages python=3.10

Edit:
It's important to activate your new conda environment if you plan to use it.
conda activate threeten

Then I re-installed these packages with conda:
conda install pytorch torchvision torchaudio cudatoolkit=11.3 -c pytorch
conda install -c conda-forge cupy cudnn cutensor nccl
conda install -c conda-forge spacy

Then I ran this to retrieve the pre-built model for SpaCy:
python -m spacy download en_core_web_sm

You should check the installation page in the documentation for each of these. This way you can tailor installation to your specific use case or machine requirements.

In my case efficiency matters, so I'm using the smaller, more efficient SpaCy model since I'll be running several pipelines at the same time. In your use case you may prefer accuracy and install the larger en_core_web_trf model. You may even require a different cudatoolkit version.
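As a quick sanity check after installation, a short script along these lines (just a sketch; adjust the imports to whatever you actually installed) can confirm that cupy, PyTorch and SpaCy all see the GPU before you start parallelizing:

import cupy
import torch
import spacy

# number of CUDA devices visible to cupy
print("cupy devices:", cupy.cuda.runtime.getDeviceCount())

# PyTorch must see CUDA for the 'pytorch' GPU allocator to work
print("torch CUDA available:", torch.cuda.is_available())

# spacy.prefer_gpu() returns True if spaCy was able to switch to the GPU
print("spaCy on GPU:", spacy.prefer_gpu())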


I think I have figured out how to do this:

The key is to use spawn, not fork, and to use cupy to select the GPU.

import multiprocessing as mp
mp.set_start_method('spawn', force=True)
from joblib import Parallel, delayed
from itertools import cycle
import cupy
import spacy
from thinc.api import set_gpu_allocator, require_gpu


def chunker(iterable, total_length, chunksize):
    return (iterable[pos: pos + chunksize] for pos in range(0, total_length, chunksize))

def flatten(list_of_lists):
    "Flatten a list of lists to a combined list"
    return [item for sublist in list_of_lists for item in sublist]

def process_entity(doc):
    super_word_ls = []
    for s in doc.sents:
        word_ls = []
        for t in s:
            if not t.ent_type_:
                if (t.text.strip()!=""):
                    word_ls.append(t.text)
            else:
                word_ls.append(t.ent_type_)
        if len(word_ls)>0:
            super_word_ls.append(" ".join(word_ls))
    return " ".join(super_word_ls)

def process_chunk(texts, rank):
    print(rank)
    with cupy.cuda.Device(rank):
        set_gpu_allocator("pytorch")
        require_gpu(rank)
        nlp = spacy.load("en_core_web_trf")
        preproc_pipe = []
        for doc in nlp.pipe(texts, batch_size=20):
            preproc_pipe.append(process_entity(doc))
        return preproc_pipe


def preprocess_parallel(texts, chunksize=100):
    executor = Parallel(n_jobs=2, backend='multiprocessing', prefer="processes")
    do = delayed(process_chunk)
    tasks = []
    gpus = list(range(0, cupy.cuda.runtime.getDeviceCount()))
    rank = 0
    for chunk in chunker(texts, len(texts), chunksize=chunksize):
        tasks.append(do(chunk, rank))
        rank = (rank+1)%len(gpus)
    result = executor(tasks)
    return flatten(result)

if __name__ == '__main__':
    print(preprocess_parallel(texts = ["His friend Nicolas J. Smith is here with Bart Simpon and Fred."]*100, chunksize=50))
