Parsl

Parsl is a Python library for programming and executing data-oriented workflows in parallel. Parsl has excellent documentation and hosts its software publicly on GitHub (https://github.com/Parsl/parsl) for easy review.

  • Using Parsl at NERSC is easy and just requires that you build a custom conda env.
  • Parsl works best at NERSC via the detached head mode in the second example.
  • Parsl can work both with and without MPI.

Please note that this guide is limited to deploying your Parsl workflow with suitable configurations at NERSC and not necessarily how to express your workflow in Parsl. To learn how to create your workflow in Parsl, please see the Parsl docs. For questions regarding the use of Parsl or reporting a bug, please contact:

Parsl Support <parsl-user@googlegroups.com> or connect via GitHub or Parsl’s Slack channel

How Parsl works

A great short description of how Parsl works can be found on its project page.

Parsl is designed to enable the straightforward orchestration of asynchronous tasks into dataflow-based workflows in Python. Parsl manages the parallel execution of these tasks across computation resources when dependencies (e.g., input data dependencies) are met. Developing a workflow is a two-step process:

  • Annotate functions that can be executed in parallel as Parsl apps.
  • Specify dependencies between functions using standard Python code.

In Parsl, the execution of an app yields futures. These futures can be passed to other apps as inputs, establishing a dependency. These dependencies are assembled implicitly into directed acyclic graphs, although these are never explicitly expressed. Furthermore, the dependency graph is dynamically built and then updated while the Parsl script executes. That is, the graph is not computed in advance and is only complete when the script finishes executing. Apps that have all their dependencies met are slated for execution (in parallel).
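
The futures-as-dependencies idea can be illustrated without Parsl. The following is a minimal sketch using only Python's standard concurrent.futures module. The dataflow_submit helper is hypothetical and is not how Parsl is implemented internally; it only mimics the behaviour described above, where a task receives futures as arguments and runs once those inputs are available.

```python
from concurrent.futures import Future, ThreadPoolExecutor

# Stand-in for Parsl's executor; Parsl itself is not used in this sketch.
pool = ThreadPoolExecutor(max_workers=4)


def dataflow_submit(func, *args):
    """Submit func; any argument that is a Future is resolved first.

    Hypothetical helper for illustration only.
    """
    def run():
        # Block inside the worker thread until every input future is done.
        resolved = [a.result() if isinstance(a, Future) else a for a in args]
        return func(*resolved)
    return pool.submit(run)


def double(x):
    return 2 * x


def add(a, b):
    return a + b


# No graph is declared: passing futures as inputs creates the dependencies.
left = dataflow_submit(double, 1)
right = dataflow_submit(double, 2)
total = dataflow_submit(add, left, right)

print(total.result())  # prints 6
pool.shutdown()
```

In Parsl the same pattern is written by decorating double and add as apps and calling add(double(1), double(2)); the library then takes care of waiting for the inputs.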

As can be seen in many of the examples, this design allows a user to express complex workflows in a single Python script. The process running the script also becomes the anchor point of the workflow. This is similar to running interactive jobs, which are tied to your login session.

Strengths of Parsl

  • Easy to install, no database dependencies.
  • Designed for large scale computing.
  • Good documentation.
  • Entirely Python orchestration: No bash scripts for workflows, no explicit slurm calls in the workflow.

Disadvantages of Parsl

  • Most issues with running Parsl arise from its preferred setup of a master process trying to orchestrate resources with NERSC's busy Slurm instance. Please look at the warning box in the first example to avoid pitfalls.
  • How executor, provider and launcher parameters relate to slurm parameters is a learning process.
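
As a rough orientation, the sketch below renders the kind of batch script that corresponds to the provider parameters used in the examples on this page. It is a simplified approximation written for this guide, not Parsl's actual submit template, and the helper name sketch_submit_script is hypothetical. The point is only to show which parameter ends up as which Slurm directive.

```python
def sketch_submit_script(partition, nodes_per_block, walltime,
                         scheduler_options="", worker_init=""):
    """Return an approximate batch script for the given provider parameters.

    Simplified illustration only; the script Parsl really generates
    contains more lines (job name, output files, worker launch command).
    """
    lines = [
        "#!/bin/bash",
        f"#SBATCH --partition={partition}",    # SlurmProvider partition
        f"#SBATCH --nodes={nodes_per_block}",  # nodes_per_block
        f"#SBATCH --time={walltime}",          # walltime
    ]
    if scheduler_options:
        lines.append(scheduler_options)        # inserted verbatim
    if worker_init:
        lines.append(worker_init)              # runs before the workers start
    lines.append("# ... the launcher (e.g. srun) starts the worker pool here ...")
    return "\n".join(lines)


script = sketch_submit_script(
    partition="regular",
    nodes_per_block=2,
    walltime="00:10:00",
    scheduler_options="#SBATCH -C haswell",
    worker_init="module load python; source activate parsl",
)
print(script)
```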

Scaling and scheduling

  • Parsl can schedule up to 1000-2000 apps/tasks per second. As a rule of thumb, in a high-throughput case, the ideal duration of a task is longer than 0.01 s times the number of nodes, i.e. for 100 nodes a task should run for longer than one second.
  • Parsl has been tested to scale up to 8000 nodes (https://doi.org/10.1145/3307681.3325400)
  • For logging, Parsl will create one directory for each node, and 1 manager and N workers will write separate logs to this directory. Logging is pretty minimal at two lines per task unless debug logging is turned on.
  • Bash apps communicate via files only. This may add to or suffer from file system congestion.
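
The rule of thumb from the first bullet can be written down as a one-line calculation. This is only a sketch of that arithmetic; the function name min_task_duration is made up for this illustration, and the 0.01 s factor is the one quoted above.

```python
def min_task_duration(nodes, seconds_per_node=0.01):
    """Rule-of-thumb lower bound (in seconds) on task duration."""
    if nodes < 1:
        raise ValueError("nodes must be at least 1")
    return seconds_per_node * nodes


for n in (1, 100, 1000):
    print(f"{n:5d} nodes -> tasks should run longer than "
          f"{min_task_duration(n):.2f} s")
```

Tasks that are much shorter than this bound are better batched together inside a single app call.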

How to use Parsl at NERSC

Parsl can be installed with pip or conda like any other Python package and does not require additional resources. Parsl is not part of the base environment at NERSC, so you need to create your own conda environment and install Parsl into it.

For example:

$ module load python
$ conda create --name parsl python
$ source activate parsl
(parsl) $

Install Parsl either with pip or through conda-forge:

(parsl) $ python -m pip install parsl
or
(parsl) $ conda install -c conda-forge parsl

Your workers need conda too.

When configuring Parsl you will need to ensure that the same conda environment is loaded on the worker nodes. This can be done by specifying worker_init='source activate <CONDA_ENVIRONMENT>' in the provider.

Below we outline examples tuned for NERSC resources.

The first example is considered the most common use case for Parsl: it deploys the Parsl master process on a workflow or login node and manages execution on one or more workers, which are organized in worker pools on Cori's compute nodes. The caveat with this example is that the speed at which resources are made available to the master process depends on many factors, e.g. queue policies, other jobs already submitted, and the resources requested. It can take days until an allocation becomes available while the master process patiently waits for execution.

Therefore, we provide a second example where Parsl is run in a detached mode by submitting a Parsl job to the Slurm queue. The master process then runs on the head node (node with rank 0). This example is useful for long-running jobs and when users want to avoid hosting the main Parsl process on a login node, i.e. outside their allocation. Additionally, this example can be used for interactive sessions.

The last example shows how Parsl can manage multiple multi-node MPI jobs. However, this is considered an advanced execution model and users are advised to consult the Parsl docs to get a better understanding.

Example: Creating a Parsl script for a high-throughput scenario

Below we outline steps for creating a Parsl program by configuring Parsl to manage execution on NERSC resources, writing a Parsl program by annotating Python functions as apps, and finally linking together apps to create a workflow. All these steps can be combined into a single Python script. That script can then simply be launched from a login or a workflow node and will use the SlurmProvider plugin to allocate/deallocate resources for its worker pool.

Running Parsl's master process outside the allocation

  • A Parsl script (the driver process) with SlurmProvider plugin submits jobs to slurm for its worker pool. It then has to wait for the job to become live before doing any distributed processing. This means that the main script will wait while the job is queued. If you start your driver process from a workflow or login node, please make sure that it’s lean on usage of resources as login and workflow nodes are shared with other users. Processes that take up an unreasonable amount of resources for a long time raise suspicion and might get canceled involuntarily.
  • As the worker pool is tied to the driver process, you need to keep that process alive when logging out. Consider a NoMachine session or running the process inside screen. Prior to version 1.0, Parsl had elastic scaling enabled by default which, on occasion, led to over-allocation of resources. Elastic scaling is now disabled by default and should be used with care so as to avoid submitting many jobs to Slurm.
  • Parsl might fail to cancel your allocation if something goes wrong. Check your submitted jobs in the case that the workflow didn’t execute properly. You might need to clean up your Parsl jobs manually with scancel, but they should auto-exit within a two minute time frame.
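
To check for leftover jobs, one option is to filter the output of squeue for jobs that Parsl submitted. The sketch below only parses text and prints the scancel commands it would suggest; it does not call Slurm. It assumes the output comes from something like squeue -u $USER -h -o "%i %j" (job ID and job name per line) and that Parsl's job names start with "parsl.", which you should verify against your own queue. The helper name and the sample data are made up.

```python
def leftover_job_ids(squeue_output, name_prefix="parsl."):
    """Return the IDs of jobs whose name starts with name_prefix.

    Expects one "<job id> <job name>" pair per line.
    The default prefix is an assumption, not a documented guarantee.
    """
    job_ids = []
    for line in squeue_output.splitlines():
        parts = line.split(None, 1)
        if len(parts) != 2:
            continue  # skip empty or malformed lines
        job_id, name = parts
        if name.strip().startswith(name_prefix):
            job_ids.append(job_id)
    return job_ids


# Made-up sample output, for illustration only.
sample = """\
1234567 parsl.Cori_HTEX_multinode.block-0.1588112345
1234568 my_other_job
"""

for job_id in leftover_job_ids(sample):
    print("scancel", job_id)  # prints: scancel 1234567
```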

Step 1: Specify a configuration

Parsl provides several ways of configuring resources for execution. Parsl configurations are simply Python objects that define how resources should be used (e.g., queue name, allocation name, number of workers per node). The main part of a Parsl program is config-independent, and the same program may load a different config to be executed in a different environment (via parsl.load(config)).

This example shows the most common way of deploying Parsl. Here the Parsl process is hosted on the login node and workers are deployed on 2 compute nodes (nodes_per_block). Each worker is allocated 2 cores (cores_per_worker) for running Parsl tasks.

import parsl
from parsl.config import Config
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_interface

# Uncomment this to see logging info
#import logging
#logging.basicConfig(level=logging.DEBUG)

config = Config(
    executors=[
        HighThroughputExecutor(
            label='Cori_HTEX_multinode',
            # Optional: the network interface on the login node to
            # which compute nodes can communicate
            address=address_by_interface('bond0.144'),
            cores_per_worker=2,
            provider=SlurmProvider(
                'regular',  # Partition / QOS
                nodes_per_block=2,
                # String to prepend to the #SBATCH blocks in the submit script
                scheduler_options='#SBATCH -C haswell',
                # Command to be run before starting a worker
                worker_init='module load python; source activate parsl',
                # We request all hyperthreads on a node.
                launcher=SrunLauncher(overrides='-c 64'),
                walltime='00:10:00',
                # Slurm scheduler on Cori can be slow at times,
                # increase the command timeouts
                cmd_timeout=120,
            ),
        )
    ]
)

# load the Parsl config
parsl.load(config)
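
To get a feeling for how many tasks this configuration can run at once, the sketch below estimates the worker count. It assumes that the executor starts one worker per cores_per_worker logical cores on each node, that a Cori Haswell node exposes 64 logical cores (matching the -c 64 override above), and that no max_workers limit applies. The helper name estimate_workers is hypothetical and not part of Parsl.

```python
def estimate_workers(logical_cores_per_node, cores_per_worker,
                     nodes_per_block, max_workers=None):
    """Estimate (workers per node, workers per block).

    Illustration under the assumptions stated above; memory limits and
    other executor settings are ignored.
    """
    per_node = int(logical_cores_per_node // cores_per_worker)
    if max_workers is not None:
        per_node = min(per_node, max_workers)
    return per_node, per_node * nodes_per_block


per_node, total = estimate_workers(
    logical_cores_per_node=64, cores_per_worker=2, nodes_per_block=2
)
print(f"{per_node} workers per node, {total} workers in the block")
# prints: 32 workers per node, 64 workers in the block
```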

Step 2: Define Parsl Apps

Parsl enables standard Python scripts to be annotated with simple decorators that indicate where functions may be executed concurrently (assuming their dependencies are met). Parsl provides two key building blocks: the python_app for Python functions and bash_app for wrapping external applications (e.g., binaries, scripts, and programs written in other languages).

Below we show an example of a sum function run in two ways, first as a Python function and then as remote calls to a Python program invoked via Bash. We are recycling the taskfarmer workload. Note that the bash_app here calls Python as a binary, but it can execute any command available on your allocated compute nodes.

import parsl  # needed for parsl.AUTO_LOGNAME below
from parsl import python_app, bash_app

# Here we are attempting the simple sum calculation 
@python_app
def app_argsum(a, b, c):
    return a+b+c

# Here we are actually calling an external program that will perform the same sum operation 
@bash_app
def app_argsum_bash(a, b, c, stderr=parsl.AUTO_LOGNAME, stdout=parsl.AUTO_LOGNAME):
    return 'python calc_sum.py -a {} -b {} -c {}'.format(a,b,c)

Step 3: Create a Parsl workflow

Having annotated functions with Parsl’s decorators, calls to these functions will be executed concurrently and remotely as defined by the Parsl configuration. As the functions are executed asynchronously, Parsl will return a future object in lieu of the function result. The following example shows how you can invoke the Parsl Python and Bash apps defined above using standard Python calls.

Parsl determines the workflow from input and outputs of apps.

Passing an output future from one app as input to another will establish a dependency which Parsl uses internally to create a workflow graph for execution.

# Create a list of integers
items = [(a,b,c) for a in [0,1] for b in [0,1] for c in [0,1]]

# Map phase: apply an *app* function to each item in list
mapping = []
for i in items:
    x = app_argsum(*i)
    mapping.append(x)

# same for the bash app
bashing = []
for i in items:
    x = app_argsum_bash(*i)
    bashing.append(x)

# Python app futures can send their results (pickling)
print("\n".join(["sum is %d" %r.result() for r in mapping]))

# Bash apps write their output to files (stdout/stderr), so read the result from there
for r in bashing:
    # block for execution
    r.result()
    with open(r.stdout, 'r') as f:
        print(f.read())
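
The loops above collect results in submission order. Parsl's app futures are designed to behave like standard concurrent.futures.Future objects, so results can also be handled as they finish. The sketch below demonstrates the pattern with a standard-library thread pool standing in for Parsl so that it runs anywhere; with Parsl one would pass the mapping list from above to as_completed instead. The argsum function is a plain-Python stand-in for app_argsum.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def argsum(a, b, c):
    return a + b + c


items = [(a, b, c) for a in [0, 1] for b in [0, 1] for c in [0, 1]]

results = {}
with ThreadPoolExecutor(max_workers=4) as pool:
    # Remember which inputs belong to which future.
    futures = {pool.submit(argsum, *i): i for i in items}
    # as_completed yields each future as soon as its task has finished.
    for fut in as_completed(futures):
        results[futures[fut]] = fut.result()

for i in items:
    print(i, "->", results[i])
```

Processing results in completion order is mostly useful when task durations vary widely, because fast tasks are not held up behind slow ones.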

Example: Headless workflow launched from node 0

In this configuration, we will submit a Parsl script as a job to the Slurm scheduler, rather than have Parsl do the job submission and management directly with the Slurm scheduler. This approach requires a two-step process of first defining the Parsl configuration and then submitting it as a job to the scheduler.

Step 1: Create a Parsl script with a config that only uses the Srun launcher and not the Slurm provider.

Let's call the script run_parsl_inside_allocation.py with the following contents. Though not strictly necessary with parsl>=1.0.0, we set the address field to the interface of Cori's compute nodes for a slightly speedier startup.

import parsl
from parsl import python_app
from parsl.config import Config
from parsl.providers import LocalProvider
from parsl.launchers import SrunLauncher
from parsl.channels import LocalChannel
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_interface

# Uncomment this to see logging info
#import logging
#logging.basicConfig(level=logging.DEBUG)

config = Config(
    executors=[
        HighThroughputExecutor(
            label='Cori_HTEX_headless',
            # Optional: The network interface on node 0 which compute nodes can communicate with.
            address=address_by_interface('ipogif0'),
            # one worker per manager / node
            max_workers=1,
            provider=LocalProvider(
                channel=LocalChannel(script_dir='.'),
                # make sure the nodes_per_block matches the nodes requested in the submit script in the next step
                nodes_per_block=2,
                launcher=SrunLauncher(overrides='-c 32'),
                cmd_timeout=120,
                init_blocks=1,
                max_blocks=1
            ),
        )
    ],
    strategy=None,
)

parsl.load(config)

# Here we sleep for 2 seconds and return platform information
@python_app
def platform():
    import platform
    import time
    time.sleep(2)
    return platform.uname()

calls = [platform() for i in range(5)]
print(calls)

for c in calls:
    print("Got result: ", c.result())
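
The comment in the config warns that nodes_per_block has to match the node count of the batch job. One way to avoid keeping two numbers in sync is to read the node count from the environment that Slurm sets inside an allocation. This is a sketch, assuming the SLURM_JOB_NUM_NODES variable is present in the job environment; the helper nodes_in_allocation is hypothetical and not part of Parsl.

```python
import os


def nodes_in_allocation(default=1):
    """Return the node count of the current Slurm job.

    Falls back to `default` when SLURM_JOB_NUM_NODES is not set,
    e.g. when the script is started outside of an allocation.
    """
    value = os.environ.get("SLURM_JOB_NUM_NODES")
    if value is None:
        return default
    return int(value)


print("nodes_per_block =", nodes_in_allocation())
```

In the config above one could then write nodes_per_block=nodes_in_allocation() instead of the literal 2.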

Step 2: Run the script

Here you have a choice. You can either create a bash wrapper named submit_parsl.sl:

#!/bin/bash
#SBATCH -N 2 -c 64
#SBATCH -q debug
#SBATCH -t 00:05:00
#SBATCH -C haswell


module load python
source activate parsl

python run_parsl_inside_allocation.py

and submit that script to the scheduler:

sbatch submit_parsl.sl

or you can get an interactive session and launch it right in there.

user@cori11:$ salloc -t 5 -N 2 -q interactive -C haswell
salloc: Granted job allocation 31601125
salloc: Waiting for resource configuration
salloc: Nodes nid00[272-273] are ready for job

user@nid00272:$ module load python; source activate parsl

(parsl) user@nid00272:$ python run_parsl_inside_allocation.py
[<AppFuture super=<AppFuture at 0x2aaaae18aad0 state=pending>>, <AppFuture super=<AppFuture at 0x2aaaac3c4390 state=pending>>, <AppFuture super=<AppFuture at 0x2aaaafc4cfd0 state=pending>>, <AppFuture super=<AppFuture at 0x2aaab61aa2d0 state=pending>>, <AppFuture super=<AppFuture at 0x2aaab61bb350 state=pending>>]
Got result:  uname_result(system='Linux', node='nid00272', release='4.12.14-150.17_5.0.90-cray_ari_c', version='#1 SMP Tue Apr 28 21:17:03 UTC 2020 (3e6e478)', machine='x86_64', processor='x86_64')
Got result:  uname_result(system='Linux', node='nid00273', release='4.12.14-150.17_5.0.90-cray_ari_c', version='#1 SMP Tue Apr 28 21:17:03 UTC 2020 (3e6e478)', machine='x86_64', processor='x86_64')
Got result:  uname_result(system='Linux', node='nid00272', release='4.12.14-150.17_5.0.90-cray_ari_c', version='#1 SMP Tue Apr 28 21:17:03 UTC 2020 (3e6e478)', machine='x86_64', processor='x86_64')
Got result:  uname_result(system='Linux', node='nid00273', release='4.12.14-150.17_5.0.90-cray_ari_c', version='#1 SMP Tue Apr 28 21:17:03 UTC 2020 (3e6e478)', machine='x86_64', processor='x86_64')
Got result:  uname_result(system='Linux', node='nid00272', release='4.12.14-150.17_5.0.90-cray_ari_c', version='#1 SMP Tue Apr 28 21:17:03 UTC 2020 (3e6e478)', machine='x86_64', processor='x86_

Example: Multi-node MPI execution

This final example demonstrates how to run multi-node MPI tasks in Parsl. Here we load openmpi in the scheduler options, and instead of using the SrunLauncher, we call srun in the Parsl app. The following example assumes you have compiled the mpi_hello binary (https://github.com/Parsl/parsl-tutorial/tree/master/mpi_apps). Once resources are allocated, Parsl will launch 4 worker processes on node 0 of the allocation which will, in turn, launch MPI processes with srun.

import parsl
from parsl import bash_app

from parsl.config import Config
from parsl.providers import SlurmProvider
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_interface

config = Config(
    executors=[
        HighThroughputExecutor(
            label='Cori_HTEX_multinode',
            # Optional: declare interface on masternode, 
            # with which the workers communicate
            address=address_by_interface('bond0.144'),
            # with the default launcher, these workers will all be on the head node
            max_workers=4,
            provider=SlurmProvider(
                'debug',  # Partition / QOS
                nodes_per_block=2,
                # String to prepend to the #SBATCH blocks in the submit script
                scheduler_options='#SBATCH -C haswell\nmodule load openmpi',
                worker_init='module load python; source activate parsl',
                walltime='00:10:00',
                cmd_timeout=120,
            ),
        )
    ]
)

parsl.load(config)

@bash_app
def mpi_hello(nodes, ranks, msg, stdout=parsl.AUTO_LOGNAME, stderr=parsl.AUTO_LOGNAME):
    cmd = f"srun -n {ranks} -N {nodes} mpi_hello hello {msg}"
    return cmd

def print_file(filename):
    with open(filename) as f:
        print(f.read())

futures = []
# Launch a mix of single node and 2 node tasks
for i in range(10):
    if i%2 == 0:
        x = mpi_hello(1, 4, i)
    else:
        x = mpi_hello(2, 4, i)
    futures.append(x)

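# wait for everything
for i in futures:
    # result() blocks until the task has finished and returns its exit code
    print(i.result())
    # print the path of the stdout file, then its contents
    print(i.stdout)
    print_file(i.stdout)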
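
Since the app builds a shell command from its arguments, it can help to validate them before the string reaches srun. The following sketch is one hypothetical way to do that. build_srun_command is not a Parsl function, the max_nodes default of 2 mirrors nodes_per_block in the config above, and the rank check is a conservative sanity check rather than a Slurm requirement.

```python
import shlex


def build_srun_command(nodes, ranks, binary, *args, max_nodes=2):
    """Build an srun command line after basic sanity checks.

    Hypothetical helper; arguments are shell-quoted so that messages
    containing spaces or special characters are passed through safely.
    """
    if not 1 <= nodes <= max_nodes:
        raise ValueError(f"nodes must be between 1 and {max_nodes}")
    if ranks < nodes:
        raise ValueError("need at least one rank per node")
    quoted_args = " ".join(shlex.quote(str(a)) for a in args)
    return f"srun -n {ranks} -N {nodes} {shlex.quote(binary)} {quoted_args}".rstrip()


print(build_srun_command(2, 4, "mpi_hello", "hello", 3))
# prints: srun -n 4 -N 2 mpi_hello hello 3
```

Inside the bash app, the returned string would simply replace the f-string that is assembled by hand.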