How to use parallelism in Python

Since you are running at NERSC, you may be interested in parallelizing your Python code and/or its I/O. This is a detailed topic, but we will provide a short overview of several options.

Multiprocessing

Python's standard library provides a multiprocessing package that supports spawning of processes. Multiprocessing can be used to achieve some level of parallelism within a single compute node. It cannot be used to achieve parallelism across compute nodes. For more information, please see the official Python multiprocessing docs.
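
As a quick illustration, here is a minimal sketch of farming work out to a pool of processes on a single node (the square function and pool size are placeholders for your own workload):

#!/usr/bin/env python
import multiprocessing

def square(x):
    # Stand-in for real per-item work
    return x * x

if __name__ == '__main__':
    # Spawn 4 worker processes on this node and map the work across them
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, range(16))
    print(results)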

If your multiprocessing code makes calls to a threaded library like NumPy with threaded MKL support, then you need to consider oversubscription of threads. While process affinity can be controlled to some degree in certain contexts (e.g. Python distributions that implement os.sched_{get,set}affinity), it is generally easier to reduce the number of threads used by each process; the most advisable setting is usually a single thread per process. In particular, for OpenMP:

export OMP_NUM_THREADS=1
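
You can also set this from within Python, as long as you do so before the threaded library is first imported; a minimal sketch:

import os
# Must be set before NumPy (or another OpenMP-threaded library) is imported
os.environ['OMP_NUM_THREADS'] = '1'
import numpy as np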

Furthermore, if you use Python multiprocessing on KNL, you are advised to specify:

export KMP_AFFINITY=disabled

mpi4py

mpi4py provides bindings to the MPI standard for the Python programming language. Documentation on mpi4py is available here.

Here is an example of how to use mpi4py on Cori:

#!/usr/bin/env python
from mpi4py import MPI
mpi_rank = MPI.COMM_WORLD.Get_rank()
mpi_size = MPI.COMM_WORLD.Get_size()
print(mpi_rank, mpi_size)

This program will initialize MPI, find each MPI task's rank in the global communicator, find the total number of ranks in the global communicator, print out these two results, and exit. Finalizing MPI with mpi4py is not necessary; it happens automatically when the program exits.

Suppose we put this program into a file called "mympi.py". To run it on the Haswell nodes on Cori, we could create the following batch script, called "myjob.sh", in the same directory as our Python script:

#!/bin/bash
#SBATCH --constraint=haswell
#SBATCH --nodes=3
#SBATCH --time=5

module load python
srun -n 96 -c 2 python mympi.py

To run "mympi.py" in batch on Cori, we submit the batch script from the command line using sbatch, and wait for it to run:

sbatch myjob.sh
Submitted batch job 987654321

After the job finishes, the output will be found in the file "slurm-987654321.out":

% cat slurm-987654321.out
...
91 96
44 96
31 96
...
0 96
...
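
Beyond querying rank and size, mpi4py supports the standard MPI communication patterns. Here is a minimal sketch of broadcasting a Python dictionary from rank 0 to all other ranks (the dictionary contents are hypothetical):

#!/usr/bin/env python
from mpi4py import MPI
comm = MPI.COMM_WORLD
if comm.Get_rank() == 0:
    data = {'mesh_size': 128, 'dt': 0.01}  # hypothetical parameters
else:
    data = None
# The lowercase bcast method communicates pickled Python objects
data = comm.bcast(data, root=0)
print(comm.Get_rank(), data)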

mpi4py in your custom conda environment

If you would like to use mpi4py in a custom conda environment, you will need to build and install it inside that environment.

Do NOT conda/pip install mpi4py

These tools can install mpi4py without any warnings, but the resulting mpi4py programs simply won't work. To use Cori's Cray MPICH MPI, you'll need to build mpi4py yourself using the Cray compiler wrappers, which link in the Cray MPICH libraries.

You can build mpi4py and install it into a conda environment on Cori using a recipe like the following:

wget https://bitbucket.org/mpi4py/mpi4py/downloads/mpi4py-3.0.3.tar.gz
tar zxvf mpi4py-3.0.3.tar.gz
cd mpi4py-3.0.3
module swap PrgEnv-intel PrgEnv-gnu
python setup.py build --mpicc="$(which cc) -shared"
python setup.py install
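
As a quick sanity check (a suggestion, not an official step), you can print the MPI library version string from within a job allocation; it should mention Cray MPICH:

srun -n 1 python -c "from mpi4py import MPI; print(MPI.Get_library_version())"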

New experimental option: you can clone the lazy-mpi4py conda environment we provide at NERSC and add your own packages on top:

module load python
conda create --name myenv --clone lazy-mpi4py
source activate myenv
conda install numpy scipy

If you have questions or feedback about this method, please let us know at help.nersc.gov.

Using mpi4py in a Shifter container

When a large number of Python tasks start simultaneously, especially when launched with MPI, many tasks try to open the same files at the same time, causing contention and degrading performance. Python applications running at the scale of a few hundred or a thousand tasks may take an unacceptably long time simply to start up.

To overcome this problem, NERSC strongly advises users to build a Docker image containing their Python stack and use Shifter to run it. This is the best approach to overcome the at-scale import bottleneck at NERSC.

Shifter is a technology developed at NERSC to provide scalable Linux container deployment in a high-performance computing environment. Shifter is very similar to Docker, but it runs without root privileges, which is necessary in a shared environment like NERSC.

The idea is to package up an application and its entire software stack into a Linux container and distribute these containers to the compute nodes. This localizes the modules and any associated shared object libraries to the compute nodes, eliminating contention on the shared file system. Using Shifter results in tremendous speed-ups for launching larger process-parallel Python applications.
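
As a sketch, a batch script that runs the earlier "mympi.py" inside a Shifter container might look like the following, assuming you have pushed a (hypothetical) image named myuser/mypystack:latest to Docker Hub:

#!/bin/bash
#SBATCH --image=docker:myuser/mypystack:latest
#SBATCH --constraint=haswell
#SBATCH --nodes=3
#SBATCH --time=5

srun -n 96 -c 2 shifter python mympi.py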

For more information about how to build and use mpi4py in a Shifter container, please see here.

Dask

Dask is a task-based system in which a scheduler assigns work to workers. It is robust to failure and provides a nice bokeh-based application dashboard. It can be used to scale to multinode CPU and GPU systems. You can find more information about using Dask at NERSC here.
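
As a minimal single-node sketch (the function and worker count are placeholders), you can start a local Dask cluster and hand it work through the distributed client:

from dask.distributed import Client, LocalCluster

def square(x):
    # Stand-in for real per-task work
    return x * x

if __name__ == '__main__':
    # Start a scheduler plus 4 worker processes on this node
    cluster = LocalCluster(n_workers=4)
    client = Client(cluster)
    futures = client.map(square, range(16))
    print(client.gather(futures))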

Parallel I/O with h5py

You can use h5py for either serial or parallel I/O.
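
Serial I/O needs no special build; here is a minimal sketch of writing and reading a dataset with stock h5py (the file and dataset names are arbitrary):

import h5py
import numpy as np

# Write a small array to an HDF5 file
with h5py.File('serial_test.hdf5', 'w') as f:
    f.create_dataset('test', data=np.arange(10))

# Read it back
with h5py.File('serial_test.hdf5', 'r') as f:
    print(f['test'][:])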

For more general information about HDF5 at NERSC please see this page.

If you would like to use h5py for parallel I/O, you will have to build h5py against mpi4py in your custom conda environment.

We will provide the directions for building an h5py-parallel enabled conda environment below. These directions are based on those found here and here.

You will first need a conda environment with mpi4py built and installed for NERSC. You can follow our directions here OR you can try cloning our lazy-mpi4py conda environment where we have already built mpi4py for you:

module load python
conda create -n h5pyenv --clone lazy-mpi4py

Activate your environment:

source activate h5pyenv

Load and configure your modules:

module load cray-hdf5-parallel
module swap PrgEnv-intel PrgEnv-gnu

Clone the h5py github repository:

cd $SCRATCH
git clone https://github.com/h5py/h5py
cd h5py

Configure your build environment:

export HDF5_MPI="ON"
export CC=/opt/cray/pe/craype/2.6.2/bin/cc

Configure your h5py build:

python setup.py configure

The output should look like:

********************************************************************************
                       Summary of the h5py configuration

HDF5 include dirs: [
  '/opt/cray/pe/hdf5/1.10.5.2/GNU/8.2/include'
]
HDF5 library dirs: [
  '/opt/cray/pe/hdf5/1.10.5.2/GNU/8.2/lib'
]
     HDF5 Version: '1.10.5'
      MPI Enabled: True
 Rebuild Required: True

********************************************************************************

Now build:

python setup.py build

Once the build completes, you'll need to install h5py as a Python package:

pip install --no-binary=h5py h5py

Now we will test our h5py-parallel enabled conda environment on a compute node, since mpi4py will not work on login nodes. Get an interactive compute node:

salloc -N 1 -t 20 -C haswell -q interactive
module load python
source activate h5pyenv

We'll use the test program described in the h5py docs; save it in a file called "test_h5pyparallel.py":

from mpi4py import MPI
import h5py

rank = MPI.COMM_WORLD.rank  # The process ID (integer 0-3 for 4-process run)

f = h5py.File('parallel_test.hdf5', 'w', driver='mpio', comm=MPI.COMM_WORLD)

dset = f.create_dataset('test', (4,), dtype='i')
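# Each rank writes its own rank number into its slot of the dataset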
dset[rank] = rank

f.close()

We can run this test with 4 MPI ranks:

srun -n 4 python test_h5pyparallel.py

Let's look at the file we wrote by running h5dump parallel_test.hdf5. The output should look like this:

HDF5 "parallel_test.hdf5" {
GROUP "/" {
   DATASET "test" {
      DATATYPE  H5T_STD_I32LE
      DATASPACE  SIMPLE { ( 4 ) / ( 4 ) }
      DATA {
      (0): 0, 1, 2, 3
      }
   }
}
}

Great! Our 4 MPI ranks each wrote part of this HDF5 file.
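
As a follow-up sketch, you can read the file back collectively by reopening it with the mpio driver, with each rank reading its own entry:

from mpi4py import MPI
import h5py

rank = MPI.COMM_WORLD.rank

# Reopen the file in parallel; each rank reads the element it wrote
with h5py.File('parallel_test.hdf5', 'r', driver='mpio', comm=MPI.COMM_WORLD) as f:
    print(rank, f['test'][rank])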