Dask at NERSC¶
What is Dask?¶
Dask is a task-based parallelization framework for Python. It allows you to distribute your work among a collection of workers controlled by a central scheduler. Dask can enable internode and intranode scaling on both CPUs and GPUs and is a central part of the NVIDIA RAPIDS ecosystem. Users who want to scale their work but may not want to use options like mpi4py may be interested in Dask.
To learn how to use Dask at NERSC, please explore our example notebooks. They include both CPU and GPU examples that you can run yourself on Cori and Cori-GPU.
Dask is well-documented, flexible, and currently under active development. A good way to start learning about Dask is to read through or try some of its tutorials. You can also watch a 15-minute Dask demo. The Dask architecture consists of a scheduler and workers, which can be either CPU or GPU workers.
Strengths of Dask¶
- Dask can run on small systems like your laptop all the way up to large systems like Cori. The number of workers can be easily adjusted.
- It is resilient. A nanny process can revive dead workers and the job can continue.
- It has a useful dashboard for profiling, monitoring, and debugging.
- Dask can scale both CPU and GPU code.
Disadvantages of Dask¶
- Dask will not scale as well as MPI.
- Dask is stable but is also under active development and does change.
- Some kinds of Dask communication (like TCP, the default) can be slow.
- Like other frameworks (including MPI), it is best to avoid moving large amounts of data.
Dask Tips and Best Practices at NERSC¶
Run your Dask jobs on $SCRATCH
It is better to run your Dask jobs on $SCRATCH. Dask will try to lock the files associated with each worker, which works automatically on $SCRATCH. On $HOME, however, file locking causes errors, and you will see many error messages that look like:
distributed.diskutils - ERROR - Could not acquire workspace lock on path: /global/u1/s/elvis/worker-klsptdq3.dirlock .Continuing without lock. This may result in workspaces not being cleaned up
Each Dask task has about 1 ms of overhead. If you have a lot of tasks, this overhead can add up. For example, 10,000 tasks at 1 ms each add up to about 10 seconds of overhead for each operation. For this reason it is a good idea to give each task more than a few seconds of work.
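The arithmetic behind this advice can be sketched in a few lines. This is only a back-of-the-envelope model: the 1 ms figure is the rough value quoted above rather than a measured constant, and the function names are hypothetical.

```python
# Rough per-task overhead quoted in the text (an approximation, not a measurement).
OVERHEAD_PER_TASK_S = 1e-3


def scheduling_overhead_s(n_tasks, overhead_per_task_s=OVERHEAD_PER_TASK_S):
    """Total scheduling overhead, in seconds, for n_tasks tasks."""
    return n_tasks * overhead_per_task_s


def overhead_fraction(task_work_s, overhead_per_task_s=OVERHEAD_PER_TASK_S):
    """Fraction of a task's total time spent on overhead rather than work."""
    return overhead_per_task_s / (task_work_s + overhead_per_task_s)


if __name__ == "__main__":
    # Overhead for 10,000 tasks, in seconds.
    print(scheduling_overhead_s(10_000))
    # A task doing only 1 ms of work spends half of its time on overhead...
    print(overhead_fraction(0.001))
    # ...while a task doing 5 s of work spends a negligible share on it.
    print(overhead_fraction(5.0))
```

Under this model the overhead share drops quickly as tasks get longer, which is why tasks lasting several seconds are a comfortable target.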
Dask provides some advice about understanding performance. You can also group your work into fewer, more substantial tasks. This might mean that you call lazy operations at once instead of individually. This might also mean repartitioning your dataframe(s).
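As a minimal sketch of grouping work into fewer, larger tasks, the example below batches many small items so that each Dask Delayed task handles a whole batch, and then evaluates all lazy tasks in a single compute call. The helper names (process_item, process_batch, chunked, run_batched) are hypothetical, and the synchronous scheduler is used only to keep the example self-contained; a real job would normally run against a distributed cluster.

```python
import dask


def process_item(x):
    """Stand-in for one small unit of real work (hypothetical)."""
    return x * x


def process_batch(batch):
    """Process a whole batch inside one task, so overhead is paid once per batch."""
    return [process_item(x) for x in batch]


def chunked(items, batch_size):
    """Split a list into consecutive batches of at most batch_size items."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def run_batched(items, batch_size):
    # One lazy task per batch instead of one per item.
    tasks = [dask.delayed(process_batch)(batch) for batch in chunked(items, batch_size)]
    # Evaluate all lazy tasks together in a single compute call.
    results = dask.compute(*tasks, scheduler="synchronous")
    # Flatten the per-batch results back into one list, preserving order.
    return [y for batch in results for y in batch]


if __name__ == "__main__":
    print(run_batched(list(range(10)), batch_size=4))
```

The batch size is a tuning knob: larger batches mean fewer tasks and less overhead, but also less parallelism to spread across workers.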
A good rule of thumb for choosing the number of threads per Dask worker is to choose the square root of the number of cores per processor. For KNL, for example, this would mean that you could assign 8 threads per worker. In general, more threads per worker are good for a program that spends most of its time in NumPy, SciPy, Numba, etc., and fewer threads per worker are better for simpler programs that spend most of their time in the Python interpreter.
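The square-root rule of thumb is easy to compute. The sketch below assumes the result is rounded down to a whole number of threads, and it uses 68 cores for a Cori KNL processor; both the rounding choice and the function name are assumptions made for illustration.

```python
import math


def threads_per_worker(cores_per_processor):
    """Rule-of-thumb thread count: the integer square root of the core count."""
    # Rounding down is an assumption; never return fewer than one thread.
    return max(1, math.isqrt(cores_per_processor))


if __name__ == "__main__":
    # 68 cores is the assumed core count of a Cori KNL processor.
    print(threads_per_worker(68))
    # A hypothetical 16-core processor for comparison.
    print(threads_per_worker(16))
```

Treat the result as a starting point and adjust it after profiling, since the best value depends on how much time the program spends outside the Python interpreter.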
The Dask scheduler runs on a single thread, so assigning it its own node is a waste.
There are several types of Dask API options, including Dask Delayed, Dask Futures, and Dask Bag. For beginners, Dask Bag is an easy ("harder to shoot yourself in the foot with") but less configurable solution. Dask Delayed and Dask Futures are more powerful but also more complex. Users should be aware of all three options so they can choose which one is best suited to their application.
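To make the comparison concrete, here is a minimal sketch that computes the same sum of squares with each of the three interfaces. It assumes that both dask and distributed are installed. The function names are hypothetical, and the threaded scheduler and in-process Client are used only so the example runs on a single machine; they are not a recommended NERSC configuration.

```python
import dask
import dask.bag as db
from dask.distributed import Client


def square(x):
    return x * x


def sum_squares_bag(values):
    """Dask Bag: a high-level map/reduce over a collection of items."""
    bag = db.from_sequence(values, npartitions=2)
    return bag.map(square).sum().compute(scheduler="threads")


def sum_squares_delayed(values):
    """Dask Delayed: build a lazy task graph by hand, then compute it."""
    squares = [dask.delayed(square)(v) for v in values]
    total = dask.delayed(sum)(squares)
    return total.compute(scheduler="threads")


def sum_squares_futures(values):
    """Dask Futures: submit work eagerly to a running cluster."""
    # An in-process cluster with the dashboard disabled, for illustration only.
    with Client(processes=False, dashboard_address=None) as client:
        futures = client.map(square, values)
        return sum(client.gather(futures))


if __name__ == "__main__":
    values = list(range(5))
    print(sum_squares_bag(values))
    print(sum_squares_delayed(values))
    print(sum_squares_futures(values))
```

Bag hides the task graph entirely, Delayed makes it explicit but lazy, and Futures start running as soon as they are submitted, which is where the extra flexibility and complexity come from.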
There is no hard limit on Dask scaling. However, the task overhead will eventually start to swamp your calculation, depending on how long each task takes to compute. With 1 ms of overhead per task and 1 second of work per task, the overhead begins to swamp the calculation at around 5000 workers.
To automatically clean up workers, you can set --death-timeout 60 so that workers will die if they haven't communicated with the scheduler in the last 60 seconds. This will help keep your directory squeaky clean.