What is Dask?
Dask is a task-based parallelization framework for Python. It allows you to distribute your work among a collection of workers controlled by a central scheduler.
Dask is well-documented, flexible, and currently under active development. A good way to start learning about Dask is to read through or try some of the Dask tutorials. You can also watch a 15-minute Dask demo here.
Using Dask is experimental at NERSC
This means that recommended best practices will change. Ideally over time it will become easier and easier to use Dask at NERSC. We welcome your feedback!
Dask has three important parts:
- The scheduler, which coordinates your Dask job.
- The workers, which actually get your work done in parallel.
- The client, the means by which you communicate with the scheduler: it submits your work and retrieves the results.
Advantages of Dask
- Dask can run on small systems like your laptop all the way up to large systems like Cori. The number of workers can be easily adjusted or even automatically scaled.
- It is resilient. A nanny process can revive dead workers and the job can continue.
- It has a useful dashboard for profiling, monitoring, and debugging.
Disadvantages of Dask
- Dask will not scale as well as MPI.
- Dask is stable but is also under active development and changes quickly.
- Dask communicates over TCP, which is slow.
You can use Dask with Jupyter at NERSC. Here is a small collection of example notebooks that use Dask. These are very much a work in progress and as we improve Dask support at NERSC they should become more streamlined.
Dask Tips and Best Practices
Run your Dask jobs on $SCRATCH
It is better to run your Dask jobs on $SCRATCH. Dask will try to lock the files associated with each worker, which works automatically on $SCRATCH. On $HOME, however, file locking causes errors and you will see many error messages that look like:
distributed.diskutils - ERROR - Could not acquire workspace lock on path: /global/u1/s/stephey/worker-klsptdq3.dirlock .Continuing without lock. This may result in workspaces not being cleaned up
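As a minimal sketch of keeping worker files off $HOME, the helper below builds a worker directory under $SCRATCH. The function name `dask_worker_dir`, the `dask-workers` subdirectory, and the fallback to the system temporary directory are assumptions made for illustration, not NERSC recommendations. The resulting path could then be handed to Dask, for example through the `local_directory` argument of `dask.distributed.LocalCluster` or the `--local-directory` option of `dask-worker`.

```python
import os
import tempfile
from pathlib import Path


def dask_worker_dir(subdir="dask-workers"):
    """Return a directory for Dask worker files, preferring $SCRATCH.

    Falls back to the system temporary directory when $SCRATCH is not set
    (an assumption made so the sketch also runs off NERSC systems).
    """
    base = os.environ.get("SCRATCH") or tempfile.gettempdir()
    path = Path(base) / subdir
    path.mkdir(parents=True, exist_ok=True)
    return path


# Print the directory that would be used on this machine
print(dask_worker_dir())
```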
Each Dask task has about 1 ms of overhead. If you have a lot of tasks, this overhead can add up. For example, having 10,000 tasks can result in roughly 10 seconds of overhead for each operation. For this reason it is a good idea to give each task more than a few seconds of work.
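The overhead arithmetic can be sketched in a few lines. This is a rough estimate only: it assumes the roughly 1 ms per-task figure quoted above and treats overhead as strictly proportional to the number of tasks. The function names are illustrative and are not part of Dask.

```python
def scheduling_overhead_seconds(n_tasks, per_task_overhead_s=1e-3):
    """Estimate total overhead, assuming a fixed cost per task."""
    return n_tasks * per_task_overhead_s


def overhead_fraction(task_seconds, per_task_overhead_s=1e-3):
    """Fraction of a task's wall time spent on overhead rather than work."""
    return per_task_overhead_s / (task_seconds + per_task_overhead_s)


# Total overhead for 10,000 tasks at about 1 ms each
print(f"{scheduling_overhead_seconds(10_000):.1f} s of overhead")

# Compare very short tasks with multi-second tasks
for task_seconds in (0.001, 0.1, 5.0):
    print(f"{task_seconds} s task: {overhead_fraction(task_seconds):.2%} overhead")
```

The longer each task runs, the smaller the share of time lost to overhead, which is the motivation for giving each task a substantial amount of work.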
To better understand how your program is performing, check out this page. For a video demo that shows how to group your work into fewer, more substantial tasks, see here. This might mean that you call lazy operations all at once instead of individually. It might also mean repartitioning your dataframe(s).
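One way to get fewer, larger tasks is to batch many small inputs so that each batch becomes a single task. The sketch below is plain Python: `chunked` and `process_batch` are hypothetical helpers, and the batch size is arbitrary. In a Dask program, each call to `process_batch` is what you would wrap in `dask.delayed` or submit through the client, rather than each individual item.

```python
def chunked(items, batch_size):
    """Split a list into consecutive batches of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def process_batch(batch):
    """Stand-in for real work: handle a whole batch inside one task."""
    return sum(x * x for x in batch)


inputs = list(range(10_000))
batches = chunked(inputs, batch_size=1_000)

# 10 batches instead of 10,000 items: per-task overhead is paid 10 times
partial_sums = [process_batch(batch) for batch in batches]
total = sum(partial_sums)
print(len(batches), total)
```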
A good rule of thumb for choosing the number of threads per Dask worker is to take the square root of the number of cores per processor. For KNL, for example, this means you could assign 8 threads per worker. In general, more threads per worker are good for a program that spends most of its time in NumPy, SciPy, Numba, etc., and fewer threads per worker are better for simpler programs that spend most of their time in the Python interpreter.
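The rule of thumb can be written as a small calculation. This sketch assumes 68 cores for a KNL processor and rounds the square root down. The helper names are illustrative only; the resulting numbers are the kind of values you might pass as `threads_per_worker` and `n_workers` when creating a `dask.distributed.LocalCluster`.

```python
import math


def threads_per_worker(cores):
    """Rule of thumb: integer square root of the core count (at least 1)."""
    return max(1, math.isqrt(cores))


def workers_per_node(cores):
    """Workers that fit when each uses threads_per_worker(cores) threads."""
    return max(1, cores // threads_per_worker(cores))


knl_cores = 68  # assumption: cores on one KNL processor
print(threads_per_worker(knl_cores), workers_per_node(knl_cores))
```

Treat the result as a starting point and adjust it depending on whether your program spends its time in compiled libraries or in the Python interpreter.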
The Dask scheduler runs on a single thread, so assigning it its own node is a waste.
For more information about which kind of Dask API to choose (Dask Delayed, Dask Futures, or Dask Bag), see this page. For beginners, Dask Bag is an easy ("harder to shoot yourself in the foot with") but less configurable solution. Dask Delayed and Dask Futures are more powerful but also more complex. Users should be aware of all three options so they can choose which one is best suited to their application.
There is no hard limit on Dask scaling. However, task overhead will eventually start to swamp your calculation, depending on how long each task takes to compute. For 1 ms of overhead per task and 1 second of work per task, the scheduler begins to be swamped at around 5000 workers.
To automatically clean up workers, you can set
so that workers will die if they haven't communicated with the scheduler in the last 60 seconds. This will help keep your directory squeaky clean.