Maestro
Maestro is a Python-based workflow orchestration tool developed at Lawrence Livermore National Laboratory. Users define multi-step workflows as YAML "study specs," and Maestro handles step ordering, dependency resolution, and Slurm job submission.
Strengths of Maestro
- DAG-based dependency resolution: Define inter-step dependencies with the `depends` key; Maestro builds a directed acyclic graph and submits each step only after its prerequisites complete.
- Built-in parameter sweeps: Declare parameter values in `global.parameters` and Maestro automatically expands them into individual job instances, with no shell loops required.
- Slurm-native on Perlmutter: The `$(LAUNCHER)` token expands to `srun`, and the `batch` block maps directly to `#SBATCH` directives.
- Lightweight installation: Maestro is pure Python; install it with `pip install maestrowf` in a virtual environment or conda environment.
- Study monitoring: `maestro status` reports per-step state (submitted, running, finished, failed) across all parameter instances.
Disadvantages of Maestro
- Driver process required: The conductor process that `maestro run` starts must remain alive for the duration of the study. If this process exits (for example, because a login session times out), the study stalls even though individual Slurm jobs may still be running.
- Slurm polling lag: Maestro polls Slurm at a configurable interval (`maestro run -s`, default 60 seconds). Short-duration steps may wait up to one polling cycle before downstream steps are submitted.
- No web UI: Study status is text-only via `maestro status`; there is no browser-based dashboard.
- Single-machine driver: The driver process runs on a single login node, so very large parameter sweeps may strain its polling loop.
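The polling lag compounds along a dependency chain. As a rough sketch, assuming each hand-off from a finished step to its dependent waits at most one full polling interval, the extra delay that polling alone can add to a linear chain is bounded as below. Queue wait and runtime are ignored, and `max_polling_delay` is a hypothetical helper, not part of Maestro.

```python
def max_polling_delay(num_steps: int, poll_interval_s: float = 60.0) -> float:
    """Rough upper bound, in seconds, on polling-induced delay for a linear chain.

    Assumes each of the num_steps - 1 hand-offs waits at most one polling
    interval before the next step is submitted. Queue wait and runtime are
    not included.
    """
    if num_steps < 1:
        raise ValueError("num_steps must be at least 1")
    return (num_steps - 1) * poll_interval_s


# A 10-step chain polled every 60 s can lose up to 9 minutes to polling alone.
print(max_polling_delay(10) / 60)  # 9.0
```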
Installing Maestro at NERSC
Maestro requires Python 3.9 or later. Install it into an isolated environment so it does not conflict with other packages.
Using venv¶
```bash
module load python
python -m venv maestro_env
source maestro_env/bin/activate
pip install maestrowf
maestro --version
```
Using conda¶
```bash
module load conda
conda create -n maestro_env "python>=3.9" -y
conda activate maestro_env
pip install maestrowf
maestro --version
```
Running Maestro on Perlmutter
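A Maestro study spec is a YAML file built around three core top-level sections: `description`, `batch`, and `study`. The `batch` block configures the Slurm scheduler; the `study` block lists the workflow steps. Optional sections such as `env` and `global.parameters` can appear alongside them.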
On Perlmutter, set `queue: ""` (empty) and specify `qos`. Steps that perform lightweight work, such as staging files or running short Python scripts, can omit `nodes`, `procs`, and `walltime` entirely; Maestro runs those steps directly on the login node without submitting a Slurm job. Steps that need compute nodes include those keys and are submitted to Slurm.
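The placement rule in the paragraph above can be written as a small predicate. The sketch below is a hypothetical helper that operates on a step already parsed into a Python dict; it treats a step as a Slurm job when its `run` block sets `nodes` or `procs`. It mirrors the rule as stated here, not Maestro's internal code, which may handle edge cases differently.

```python
from typing import Any, Dict


def placement(step: Dict[str, Any]) -> str:
    """Return where a step runs under the rule stated above (hypothetical helper)."""
    run = step.get("run", {})
    if run.get("nodes") or run.get("procs"):
        return "slurm"
    return "login node"


study = [
    {"name": "setup", "run": {"cmd": "echo 1000000 > input.txt"}},
    {"name": "compute",
     "run": {"cmd": "$(LAUNCHER) python compute.py", "nodes": 1, "procs": 64, "walltime": "00:20:00"}},
]
for step in study:
    print(f"{step['name']}: {placement(step)}")
```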
The following two-step example stages input data on the login node, then runs a parallel computation with srun. Three Maestro tokens appear throughout:
- `$(LAUNCHER)`: Expands to `srun` on Perlmutter, built from the step's `nodes` and `procs` values. Prefix any parallel executable with this token to launch it across the step's allocation.
- `$(WORKSPACE)`: The per-step output directory that Maestro creates at runtime. Write step outputs here to keep runs isolated. Use `$(step_name.workspace)` to reference another step's output directory by name.
- `$(SPECROOT)`: The directory containing the study spec YAML file. Use it to reference scripts and input data relative to the spec.
```yaml
description:
    name: perlmutter_demo
    description: Two-step CPU pipeline on Perlmutter

batch:
    type: slurm
    host: perlmutter
    bank: m1234       # Replace with your NERSC project account
    queue: ""         # Leave empty; partition is set by qos
    qos: regular      # Use "debug" for short test runs

study:
    - name: setup
      description: Generate input data on the login node
      run:
          cmd: |
              echo "1000000" > $(WORKSPACE)/input.txt
          # No nodes/procs/walltime: runs on the login node

    - name: compute
      description: Run the parallel computation with srun
      run:
          cmd: |
              #SBATCH --constraint=cpu
              $(LAUNCHER) python $(SPECROOT)/scripts/compute.py \
                  --input $(setup.workspace)/input.txt \
                  --output $(WORKSPACE)/result.txt
          depends: [setup]
          nodes: 1
          procs: 64
          walltime: "00:20:00"
```
scripts/compute.py
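```python
import argparse
import os
import random


def main():
    parser = argparse.ArgumentParser(description="Example compute script")
    parser.add_argument("--input", required=True, help="Path to input file from the setup step")
    parser.add_argument("--output", required=True, help="Path to output file")
    args = parser.parse_args()
    # srun launches one copy per task; let only task 0 write the output file
    if int(os.environ.get("SLURM_PROCID", "0")) != 0:
        return
    # The setup step wrote the number of samples to draw
    with open(args.input) as f:
        n = int(f.read().strip())
    values = [random.random() for _ in range(n)]
    with open(args.output, "w") as f:
        f.write("\n".join(f"{v:.6f}" for v in values) + "\n")


if __name__ == "__main__":
    main()
```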
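To make the token descriptions concrete, the sketch below substitutes `$(NAME)` tokens in the `compute` step's command in the way those descriptions suggest. It is an illustration, not Maestro's implementation: the paths are hypothetical, and it assumes each step's workspace is a directory named after the step inside the study's output directory. `$(LAUNCHER)` is not in the mapping, so this sketch leaves it untouched.

```python
import re
from typing import Dict

# Matches tokens such as $(WORKSPACE) or $(setup.workspace)
TOKEN = re.compile(r"\$\(([A-Za-z0-9_.]+)\)")


def resolve_tokens(cmd: str, values: Dict[str, str]) -> str:
    """Replace $(NAME) tokens found in values; leave any other token unchanged."""
    return TOKEN.sub(lambda m: values.get(m.group(1), m.group(0)), cmd)


# Hypothetical paths for one run of the perlmutter_demo study
output_path = "/path/to/study/perlmutter_demo_20240101-120000"
values = {
    "SPECROOT": "/path/to/spec",
    "WORKSPACE": f"{output_path}/compute",      # this step's own workspace
    "setup.workspace": f"{output_path}/setup",  # another step's workspace, by name
}
cmd = (
    "$(LAUNCHER) python $(SPECROOT)/scripts/compute.py "
    "--input $(setup.workspace)/input.txt --output $(WORKSPACE)/result.txt"
)
print(resolve_tokens(cmd, values))
```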
Embedding #SBATCH directives in a step's cmd
Perlmutter requires `--constraint=cpu` or `--constraint=gpu` on every Slurm job. Because the top-level `batch` block does not expose a constraint key, embed it as an `#SBATCH` directive at the top of the step's `cmd` field. Maestro writes `cmd` into the batch script it submits to Slurm, so inline `#SBATCH` lines are treated as standard directives as long as they come before the first executable command. Use the same approach for any other per-step option, such as `--gpus-per-task`.
```yaml
- name: gpu_step
  description: GPU-accelerated computation
  run:
      cmd: |
          #SBATCH --constraint=gpu
          #SBATCH --gpus-per-task=1
          $(LAUNCHER) python $(SPECROOT)/scripts/gpu_kernel.py \
              --output $(WORKSPACE)/result.dat
      nodes: 1
      procs: 4              # One task per GPU on a 4-GPU Perlmutter node
      walltime: "00:20:00"
```
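Slurm reads `#SBATCH` options only until the first line that is neither blank nor a comment; directives after that point are ignored. The hypothetical helper below is a simplified model of that rule and shows why the directives must sit at the very top of `cmd`.

```python
from typing import List


def sbatch_directives(script: str) -> List[str]:
    """Simplified model of how sbatch collects #SBATCH options from a script."""
    options = []
    for line in script.splitlines():
        stripped = line.strip()
        if not stripped:
            continue  # blank lines do not end the directive block
        if line.startswith("#SBATCH"):
            options.append(line[len("#SBATCH"):].strip())
        elif stripped.startswith("#"):
            continue  # shebang or ordinary comment
        else:
            break  # first executable line: later #SBATCH lines are ignored
    return options


good = "#!/bin/bash\n#SBATCH --constraint=gpu\n#SBATCH --gpus-per-task=1\nsrun python gpu_kernel.py\n"
bad = "#!/bin/bash\ncd $HOME\n#SBATCH --constraint=gpu\nsrun python gpu_kernel.py\n"
print(sbatch_directives(good))  # ['--constraint=gpu', '--gpus-per-task=1']
print(sbatch_directives(bad))   # []
```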
Keep the maestro run process alive for the duration of the study
The conductor process launched by `maestro run` monitors submitted jobs and submits downstream steps as dependencies complete. If this process exits (for example, because you log out or your login session times out), the study stalls even though individual Slurm jobs may still be running.
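To run long studies without keeping a terminal open, launch the driver process as a long-running Slurm job using the `workflow` QOS. Inside that job, start the study with `maestro run --fg`: by default `maestro run` hands the conductor off to a background process and returns immediately, which would let the batch job end while the study is still in progress.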
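One way to act on this, sketched under assumptions: Slurm accepts batch scripts in any interpreted language with a shebang line, and `#SBATCH` lines are ordinary comments to Python, so the driver job can itself be a small Python script. The account, time limit, and file names below are placeholders, and the `--constraint` line follows the Perlmutter requirement described earlier; check NERSC's documentation for the `workflow` QOS limits and access requirements before relying on these values. The `--fg` and `-y` options of `maestro run` keep the conductor in the foreground and answer the launch prompt automatically.

```python
#!/usr/bin/env python3
#SBATCH --qos=workflow
#SBATCH --constraint=cpu
#SBATCH --account=m1234
#SBATCH --time=24:00:00
#SBATCH --ntasks=1
# Placeholders: replace the account and set a time limit allowed by the
# workflow QOS. Submit from a shell where the Maestro environment is active.
import os
import subprocess
import sys
from typing import List


def conductor_command(spec: str) -> List[str]:
    # --fg keeps the conductor in the foreground so the batch job lives as long
    # as the study; -y answers the launch prompt automatically.
    return ["maestro", "run", "--fg", "-y", spec]


if __name__ == "__main__":
    spec = sys.argv[1] if len(sys.argv) > 1 else "study.yaml"
    if "SLURM_JOB_ID" in os.environ:
        # check=True makes the job fail if maestro exits with an error
        subprocess.run(conductor_command(spec), check=True)
    else:
        print(f"Submit with sbatch to launch: {' '.join(conductor_command(spec))}")
```

Submit it with something like `sbatch run_conductor.py study.yaml` (the file name is hypothetical); by default, `sbatch` passes the submitting shell's environment to the job, which is how the job finds the `maestro` command.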
Run, monitor, and cancel a study with these commands:
```bash
# Submit the study; Maestro creates a timestamped output directory
# named <study name>_<YYYYMMDD-HHMMSS>
maestro run study.yaml

# Check progress of a running or completed study
maestro status perlmutter_demo_20240101-120000/

# Cancel all running jobs in a study
maestro cancel perlmutter_demo_20240101-120000/
```
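Because each run creates a new timestamped directory, it can help to locate the most recent one programmatically before calling `maestro status` or `maestro cancel`. The sketch below assumes the directory name is the study name followed by an underscore and a `YYYYMMDD-HHMMSS` timestamp, as in the example path; `latest_study_dir` is a hypothetical helper, not part of Maestro.

```python
import re
from pathlib import Path
from typing import Optional

# Assumed naming: <study name>_<YYYYMMDD-HHMMSS>
STAMP = re.compile(r"^(?P<name>.+)_(?P<stamp>\d{8}-\d{6})$")


def latest_study_dir(parent: str, study_name: str) -> Optional[Path]:
    """Return the newest <study_name>_<timestamp> directory under parent, or None."""
    candidates = []
    for path in Path(parent).iterdir():
        match = STAMP.match(path.name)
        if path.is_dir() and match and match.group("name") == study_name:
            candidates.append((match.group("stamp"), path))
    if not candidates:
        return None
    # YYYYMMDD-HHMMSS stamps sort chronologically as plain strings
    return max(candidates, key=lambda item: item[0])[1]


if __name__ == "__main__":
    print(latest_study_dir(".", "perlmutter_demo"))
```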
Parameter Sweeps
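Maestro's `global.parameters` block turns a single study spec into a parameter sweep. Values are matched by position across parameters (the i-th value of every parameter forms the i-th combination), so each parameter must list the same number of values. Each combination becomes an independent job instance of every step that uses the parameters, with step outputs isolated in separate workspace directories.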
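Because Maestro pairs values by position rather than taking a Cartesian product, sweeping every combination of several independent parameters requires spelling out the expanded lists. The hypothetical helper below generates them with `itertools.product`; each resulting list can be pasted into the matching `values` entry of `global.parameters`.

```python
from itertools import product
from typing import Dict, List, Sequence


def cartesian_parameters(axes: Dict[str, Sequence]) -> Dict[str, List]:
    """Expand independent axes into equal-length, position-matched value lists."""
    names = list(axes)
    combos = list(product(*(axes[name] for name in names)))
    # Column i of the combination table becomes the value list for parameter i
    return {name: [combo[i] for combo in combos] for i, name in enumerate(names)}


lists = cartesian_parameters({"SIZE": [10, 100], "ITER": [1, 2]})
print(lists)  # {'SIZE': [10, 10, 100, 100], 'ITER': [1, 2, 1, 2]}
```

Read position by position, the two lists encode the four combinations (10, 1), (10, 2), (100, 1), and (100, 2).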
The following example sweeps over three problem sizes. Maestro expands run_simulation into three instances (SIZE.10, SIZE.100, SIZE.1000) and runs aggregate_results only after all three instances complete.
```yaml
description:
    name: size_sweep
    description: Parameter sweep over problem sizes

batch:
    type: slurm
    host: perlmutter
    bank: m1234       # Replace with your NERSC project account
    queue: ""         # Leave empty; partition is set by qos
    qos: regular

study:
    - name: run_simulation
      description: Run simulation for a given problem size
      run:
          cmd: |
              #SBATCH --constraint=cpu
              $(LAUNCHER) python $(SPECROOT)/scripts/run_simulation.py \
                  --size $(SIZE) \
                  --output $(WORKSPACE)/output.dat
          nodes: 1
          procs: 64
          walltime: "00:30:00"

    - name: aggregate_results
      description: Collect all simulation outputs into a single report
      run:
          cmd: |
              python $(SPECROOT)/scripts/aggregate_results.py \
                  --study-root $(OUTPUT_PATH) \
                  --output $(WORKSPACE)/final_report.txt
          depends: [run_simulation_*]
          # No nodes/procs/walltime: runs on the login node

global.parameters:
    SIZE:
        values: [10, 100, 1000]
        label: SIZE.%%
```
Key details:
- `global.parameters`: Declares parameter names, their values, and a `label` pattern. `SIZE.%%` produces workspace subdirectory names like `SIZE.10`, `SIZE.100`, and `SIZE.1000`.
- `$(SIZE)`: Expands to the current parameter value inside each job instance.
- `depends: [run_simulation_*]`: The `_*` suffix is Maestro's notation for depending on every parameterized instance of a step, so `aggregate_results` waits until all three `run_simulation` instances have finished.
- `$(OUTPUT_PATH)`: Expands to the timestamped study root directory (the parent of all step workspaces). The aggregation script uses it to locate outputs from all parameter instances.
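The expansion and the `_*` dependency can be modeled as a small graph. The sketch below is an illustration for the single-parameter case, not Maestro's code: it fills the `SIZE.%%` label for each value, names each instance after its assumed workspace path relative to the study root (`run_simulation/SIZE.10` and so on), makes `aggregate_results` depend on all of them, and asks Python's `graphlib.TopologicalSorter` (Python 3.9 or later) for a valid execution order.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def expand_labels(label: str, values: List) -> List[str]:
    """Fill the %% placeholder in a label with each parameter value."""
    return [label.replace("%%", str(v)) for v in values]


labels = expand_labels("SIZE.%%", [10, 100, 1000])
# Assumed workspace layout: <study root>/run_simulation/<label>
instances = [f"run_simulation/{label}" for label in labels]

# depends: [run_simulation_*] -> aggregate_results waits on every instance
graph: Dict[str, Set[str]] = {"aggregate_results": set(instances)}
for instance in instances:
    graph[instance] = set()

order = list(TopologicalSorter(graph).static_order())
print(order)
```

The three simulation instances have no dependencies on one another, so any order among them is valid; only `aggregate_results` is pinned to the end.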
scripts/run_simulation.py
```python
import argparse
import os


def main():
    parser = argparse.ArgumentParser(description="Simulation for one parameter instance")
    parser.add_argument("--size", type=int, required=True, help="Problem size")
    parser.add_argument("--output", required=True, help="Path to output file")
    args = parser.parse_args()

    # srun launches one copy per task; let only task 0 write the output file
    if int(os.environ.get("SLURM_PROCID", "0")) != 0:
        return

    result = sum(range(args.size))
    with open(args.output, "w") as f:
        f.write(f"size={args.size} result={result}\n")
    print(f"size={args.size} result={result}")


if __name__ == "__main__":
    main()
```
scripts/aggregate_results.py
```python
import argparse
import glob
import os


def main():
    parser = argparse.ArgumentParser(
        description="Aggregate results from all parameter instances"
    )
    parser.add_argument(
        "--study-root",
        required=True,
        help="Path to the timestamped study output directory ($(OUTPUT_PATH))",
    )
    parser.add_argument("--output", required=True, help="Path to the final report file")
    args = parser.parse_args()

    # Parameterized workspaces are nested as <study root>/run_simulation/<label>/
    pattern = os.path.join(args.study_root, "run_simulation", "*", "output.dat")
    paths = sorted(glob.glob(pattern))
    lines = []
    for path in paths:
        with open(path) as f:
            lines.append(f.read().strip())
    with open(args.output, "w") as f:
        f.write("\n".join(lines) + "\n")
    print(f"Aggregated {len(paths)} results to {args.output}")


if __name__ == "__main__":
    main()
```
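Before submitting the sweep, the file-discovery logic can be exercised locally against a fake study tree. This sketch builds the nested layout `<study root>/run_simulation/<label>/output.dat` that Maestro is assumed to produce for the parameterized step, in a temporary directory, and checks that all three outputs are found; if a real run shows a different layout, adjust the pattern accordingly. Note that `sorted` orders paths as strings, so a label such as `SIZE.2` would sort after `SIZE.10`.

```python
import glob
import os
import tempfile
from typing import List


def collect_outputs(study_root: str) -> List[str]:
    """Find run_simulation outputs, assuming <root>/run_simulation/<label>/output.dat."""
    pattern = os.path.join(study_root, "run_simulation", "*", "output.dat")
    return sorted(glob.glob(pattern))


with tempfile.TemporaryDirectory() as root:
    # Build a fake study tree with one workspace per parameter value
    for size in (10, 100, 1000):
        workspace = os.path.join(root, "run_simulation", f"SIZE.{size}")
        os.makedirs(workspace)
        with open(os.path.join(workspace, "output.dat"), "w") as f:
            f.write(f"size={size} result={sum(range(size))}\n")
    found = collect_outputs(root)
    labels = [os.path.basename(os.path.dirname(path)) for path in found]

print(labels)  # ['SIZE.10', 'SIZE.100', 'SIZE.1000']
```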