Skip to content

How to use parallelism in Python

Since you are running at NERSC you may be interested in parallelizing your Python code and/or its I/O. This is a detailed topic but we will provide a short overview of several options.

If you intend to to run your code at scale, please see our discussion on scaling Python that provides a brief overview of best file system practices for scaling up.

Multiprocessing

Python's standard library provides a multiprocessing package that supports spawning of processes. Multiprocessing can be used to achieve some level of parallelism within a single compute node. Currently it cannot be used to achieve parallelism across compute nodes.

Launching programs

Since multiprocessing jobs are single node only, in general you will not need Slurm srun. You can run your jobs on a compute node via

python my_multiprocessing_script.py

When using Slurm srun, use --cpu-bind=none for multiprocessing

If you do want or need to use Slurm srun, make sure you set

srun -n 1 --cpu-bind=none python my_multiprocessing_script.py

to ensure that your single task is able to use all cores on the node. Note that this is different than the advice you may get from our NERSC jobscript generator as this configuration is somewhat unusual. Using --cpu-bind=cores will bind your single task to a single physical core instead of allowing your multiprocessing code to use all resources on the node.

Choosing the number of processes to use

The optimal number of processes to use may vary depending on the application and the number of cpu resources to the application at run time. By default, multiprocessing.Pool will start os.cpu_count() number of processes. The number returned by os.cpu_count() is the total number of cpus ("hardware threads"). This may oversuscribe the cpu resources available to the application at run time and hinder performance.

This is especially an issue when the number of cpus available to an application is restricted. For example, a sconjob running in the cron qos on perlmutter will be allocated a single physical core but os.cpu_count() will report the total number of logical cores on the node.

The following script demonstrates how to get a count of the total and available number of threads and cores in a Python application.

import multiprocessing as mp
import os
import psutil

nthreads = psutil.cpu_count(logical=True)
ncores = psutil.cpu_count(logical=False)
nthreads_per_core = nthreads // ncores
nthreads_available = len(os.sched_getaffinity(0))
ncores_available = nthreads_available // nthreads_per_core

assert nthreads == os.cpu_count()
assert nthreads == mp.cpu_count()

print(f'{nthreads=}')
print(f'{ncores=}')
print(f'{nthreads_per_core=}')
print(f'{nthreads_available=}')
print(f'{ncores_available=}')

Running that script as a scronjob in the cron qos on perlmutter would report:

nthreads=256
ncores=128
nthreads_per_core=2
nthreads_available=2
ncores_available=1

NumPy and nested threading

If your multiprocessing code makes calls to a threaded library like NumPy with threaded MKL support then you need to consider oversubscription of threads. While process affinity can be controlled to some degrees in certain contexts (e.g. Python distributions that implement os.sched_{get,set}affinity) it is generally easier to reduce the number of threads used by each process. Actually it is most advisable to set it to a single thread. In particular for OpenMP:

export OMP_NUM_THREADS=1

When using Python multiprocessing on KNL you are advised to set:

export KMP_AFFINITY=disabled

Use the spawn start method

The default method used by multiprocessing to start processes on Linux is fork. We advise using the spawn start method as it is typically more robust. The following script demonstrates how to set the start method safely inside the main entrypoint (under if __name__ == '__main__':) of a Python program:

import multiprocessing as mp

def addone(x):
    return x + 1

if __name__ == '__main__':
    mp.set_start_method('spawn')
    with mp.Pool(4) as pool:
        pool.map(addone, range(100))

mpi4py

mpi4py provides MPI standard bindings to the Python programming language.

Here is an example of how to use mpi4py on Cori:

#!/usr/bin/env python
from mpi4py import MPI
mpi_rank = MPI.COMM_WORLD.Get_rank()
mpi_size = MPI.COMM_WORLD.Get_size()
print(mpi_rank, mpi_size)

This program will initialize MPI, find each MPI task's rank in the global communicator, find the total number of ranks in the global communicator, print out these two results, and exit. Finalizing MPI with mpi4py is not necessary; it happens automatically when the program exits.

Suppose we put this program into a file called "mympi.py." To run it on the Haswell nodes on Cori, we could create the following batch script in the same directory as our Python script, that we call "myjob.sh:"

#!/bin/bash
#SBATCH --constraint=haswell
#SBATCH --nodes=3
#SBATCH --time=5

module load python
srun -n 96 -c 2 python mympi.py

applications using mpi4py must be launched via srun

To run "mympi.py" in batch on Cori, we submit the batch script from the command line using sbatch, and wait for it to run:

$ sbatch myjob.sh
Submitted batch job 987654321

After the job finishes, the output will be found in the file "slurm-987654321.out:"

$ cat slurm-987654321.out
...
91 96
44 96
31 96
...
0 96
...

mpi4py in your custom conda environment

Do NOT conda/pip install mpi4py at NERSC

You can install mpi4py using these tools without any warnings, but your mpi4py programs just won't work at NERSC. To use Cori's MPICH MPI, you'll need to build it yourself using the Cray compiler wrappers that link in Cray MPICH libraries.

We offer three separate options to use mpi4py in a conda environment at NERSC:

  • build mpi4py in your custom conda environment:

    module load python
    conda create -n my_mpi4py_env python=3.8
    conda activate my_mpi4py_env
    module swap PrgEnv-${PE_ENV,,} PrgEnv-gnu
    MPICC="cc -shared" pip install --force-reinstall --no-cache-dir --no-binary=mpi4py mpi4py
    
  • use the lazy-mpi4py conda environment that we provide, built against PrgEnv-gnu, which you can clone if you want to add more packages:

    module load python
    conda create --name my_mpi4py_env --clone lazy-mpi4py
    
  • use the conda-forge dummy mpich package. If you use this option, you will need to add the location of the appropriate libmpi.so.12 to your LD_LIBRARY_PATH. On Cori for GNU this is currently located at /opt/cray/pe/mpt/7.7.10/gni/mpich-gnu-abi/8.2/lib, although it may change with future versions. If you have questions about using this method, please don't hesitate to submit a ticket to help.nersc.gov.

Testing mpi4py interactively

This is a common question. To use mpi4py in an interactive Python interpreter, you have two options. You can start a session in the following way:

module load python
salloc -N 1 -t 30 -C haswell -q interactive
elvis@nid00195:~> srun --pty python
Python 3.8.5 (default, Sep  4 2020, 07:30:14) 
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from mpi4py import MPI
>>> MPI.Get_library_version()
'MPI VERSION    : CRAY MPICH version 7.7.10 (ANL base 3.2)\nMPI BUILD INFO : Built Mon Aug 19 15:12:38 2019 (git hash ff5d439) MT-G\n'
>>> 

Alternatively, you can test using a command line function. For example:

module load python
salloc -N 1 -t 30 -C haswell -q interactive
elvis@nid00195:~> srun python -c "from mpi4py import MPI;print(MPI.Get_library_version())"
MPI VERSION    : CRAY MPICH version 7.7.10 (ANL base 3.2)
MPI BUILD INFO : Built Mon Aug 19 15:12:38 2019 (git hash ff5d439) MT-G

Using mpi4py in a Shifter container

When a large number of Python tasks are simultaneously launched with mpi4py, the result is many tasks trying to open the same files at the same time, causing filesystem contention and performance degradation. mpi4py applications running at the scale of a few hundred or a thousand tasks may take an unacceptable amount of time simply starting up.

Using mpi4py in a Shifter container is our recommended solution to this problem. We provide Python in Shifter examples and a specific mpi4py in Shifter example.

Using mpi4py.futures

Cray MPICH does not currently support Dynamic Process Management capabilities in their optimized MPI. This means that mpi4py MPIPoolExecutor is not supported.

However, mpi4py MPICommExecutor is supported. In this case a user must still start all their MPI processes in advance, and can then use these processes in a task-queue fashion.

Dask

Dask is a task-based system in which a scheduler assigns work to workers. It is robust to failure and provides a nice bokeh-based application dashboard. It can be used to scale to multinode CPU and GPU systems. You can find more information on the NERSC Dask page.

Parallel I/O with h5py

You can use h5py for either serial or parallel I/O.

For more general information about HDF5 at NERSC please see the NERSC HDF5 page.

If you would like to use h5py for parallel I/O, you have two choices:

  • build h5py against mpi4py in your custom conda environment;
  • use the lazy-h5py conda environment that we provide, or clone it if you want to add other packages.

Both options are based on packages built against the GNU compiler and Cray MPICH, so if you want a build with a different compiler or MPI library, you'll have to recompile both mpi4py and h5py from scratch.

HDF5 File Locking

You will have to disable file locking if you are writing to a filesystem other than $SCRATCH.

export HDF5_USE_FILE_LOCKING=FALSE

Pre-built h5py conda environment

To use the conda environment that we provide:

module load python
conda activate lazy-h5py

And you are good to go; you can test it with a smoketest, see below.

You cannot install other packages in that conda environment, though, since it's read-only, but you can clone it and work on it:

module load python
conda create --name h5pyenv --clone lazy-h5py
conda activate h5pyenv

conda install ...

Building h5py from source

If you instead prefer to build a h5py that is capable of running in parallel at NERSC we can follow the directions from the official documentation.

You will first need a conda environment with mpi4py built from source for the NERSC environment, as shown in our directions above, or you can clone our lazy-mpi4py conda environment where we have already built mpi4py against PrgEnv-gnu:

module load python
conda create -n h5pyenv --clone lazy-mpi4py

Activate the environment:

conda activate h5pyenv

Load and configure your modules:

module load cray-hdf5-parallel
module swap PrgEnv-${PE_ENV,,} PrgEnv-gnu

Install h5py dependencies. To get an MKL-optimized numpy from the defaults conda channel; we override the channel selection to avoid installing a less-optimized numpy e.g. from conda-forge (see this note):

conda install -c defaults --override-channels numpy cython

You can also find some tips for performance optimization on our Python on AMD page for running MKL-optimized code on AMD CPUs, such as those on Perlmutter.

And finally use pip to build the parallel h5py from source:

HDF5_MPI=ON CC=cc pip install -v --force-reinstall --no-cache-dir --no-binary=h5py --no-build-isolation --no-deps h5py

We used a few pip install flags to help achieve the desired outcome in this case. Not all were strictly necessary but they may come in handy in case you run into any issues.

Smoketest for h5py

To test a parallel build of h5py we need to use the compute nodes, since mpi4py doesn't work on login nodes. Let's get 2 interactive compute nodes, with 2 processors each and activate the h5py environment we just cloned or built:

salloc -N 2 --ntasks-per-node 2 -t 10 -C knl -q interactive
module load python
conda activate h5pyenv

We'll use this test program described in the h5py docs:

from mpi4py import MPI
import h5py

rank = MPI.COMM_WORLD.rank  # The process ID (integer 0-3 for a 4-process job)

with h5py.File('test.h5', 'w', driver='mpio', comm=MPI.COMM_WORLD) as f:
  dset = f.create_dataset('test', (4,), dtype='i')
  dset[rank] = rank

We can run this test with the 4 mpi ranks requested:

srun python test_h5py.py

If we now look at the file we wrote with h5dump test.h5 it should look like this:

HDF5 "test.h5" {
GROUP "/" {
   DATASET "test" {
      DATATYPE  H5T_STD_I32LE
      DATASPACE  SIMPLE { ( 4 ) / ( 4 ) }
      DATA {
      (0): 0, 1, 2, 3
      }
   }
}
}

Great! Our 4 mpi ranks each wrote part of the HDF5 file.

PyOMP

PyOMP is a new project to allow Python programmers to use OpenMP directives in their Python code. These directives can be used to achieve low-level thread parallelism in loops, for example. Right now PyOMP provides CPU-only support but eventually plans to add GPU support.

Users can install a development version of numba with PyOMP support from the drtodd13 conda channel using the following commands.

> module load python
> conda create -n pyomp python numba -c drtodd13 -c conda-forge --override-channels

Confirm that numba was installed from the drtodd13 conda channel:

> conda activate pyomp
(pyomp)> conda list | grep numba
# Name   Version         Build                                 Channel
...
numba    0.55.0dev0      np1.11py3.7hc13618b_gcd8f3afa0_488    drtodd13
...

Known issue on Cori

On Cori, you may encounter the following error message when you try to import this development version of numba.

OSError: Could not load shared object file: libllvmlite.so

At this time, we suggest you try PyOMP on Perlmutter or use it via Shifter instead.

PyOMP in Shifter

Users can also use a NERSC Shifter image that already contains PyOMP. The image registry.nersc.gov/library/nersc/pyomp:0.55 is available on both Cori and Perlmutter. To test on a login node, a user can

elvis@cori12:~> OMP_NUM_THREADS=2 spyomp hello_par.py

To use in a job

elvis@cori12:~> cat job2.sh
#!/bin/bash
#SBATCH --image=registry.nersc.gov/library/nersc/pyomp:0.55
#SBATCH --qos=debug
#SBATCH -N 1
#SBATCH -C knl

export OMP_NUM_THREADS=4
spyomp hello_par.py
elvis@cori12:~> sbatch job2.sh

where the hello_par.py Python script is:

from numba import njit
from numba.openmp import openmp_context as openmp

@njit
def hello():
    with openmp ("parallel"):
        print("hello")
        print("world")

hello()

Note both of these examples make use of the spyomp wrapper, which is a simple alias to the registry.nersc.gov/library/nersc/pyomp:0.55 image and Shifter command, which is accessible to all users:

elvis@cori03:~> more `which spyomp`
#!/bin/bash
if echo $* | grep -e "-h" -q
then
  echo 'Usage: spyomp is a NERSC wrapper to use a PyOMP shifter image to run PyOMP application codes'
  echo 'spyomp pyomp_code.py'
  echo 'or OMP_NUM_THREADS=<value> spyomp pyomp_code.py'
else
  shifter --image=registry.nersc.gov/library/nersc/pyomp:0.55 python "$@"
fi

For more information about using Shifter at NERSC, please see our Shifter documentation.

PyOMP Example

Here is a simple example using openmp in numba based on an example from the PyOMP project repo.

from numba import njit
from numba.openmp import openmp_context as openmp

@njit
def pi_numba_openmp(num_steps):
    step = 1.0/num_steps
    sum = 0.0
    with openmp("parallel for private(x) reduction(+:sum)"):
        for i in range(num_steps):
            x = (i+0.5)*step
            sum += 4.0/(1.0 + x*x)
    pi = step*sum
    return pi

pi = pi_numba_openmp(1_000_000_000)
print(pi)

Aftering creating and activiting your pyomp conda environment, you can run the code with:

(pyomp)> OMP_NUM_THREADS=8 python pi_loop_openmp.py
3.1415926535897687

where we use the environment variable OMP_NUM_THREADS to specify the number threads to use for parallel regions.

If you have questions or feedback about PyOMP, please let us know by submitting a ticket at help.nersc.gov.