A lightweight, scalable tool for launching and orchestrating task ensembles across HPC clusters with intelligent resource management and hierarchical execution.
- Features
- Installation
- Quick Start
- Architecture
- Configuration
- Execution Modes
- Examples
- Performance Tuning
- API Reference
- Testing
- Contributing
- Support
- Flexible Execution: Support for serial, MPI, and mixed workloads
- Intelligent Scheduling: Automatic resource allocation with customizable policies
- Hierarchical Architecture: Efficient master-worker patterns for large-scale deployments (1-2048+ nodes)
- Multiple Communication Backends: Choose between Python multiprocessing, ZMQ, or DragonHPC for performance at scale
- Resource Pinning: Fine-grained CPU and GPU affinity control
- Real-time Monitoring: Track task execution with configurable status updates
- Fault Tolerance: Graceful handling of task failures with detailed error reporting
- Python & Shell Support: Execute Python callables or shell commands seamlessly
- Python 3.6+
- numpy
- matplotlib
- scienceplots
- pytest
- cloudpickle
- pydantic
- pyzmq
- MPI implementation (for distributed execution via `mpirun` or `mpiexec`)
- DragonHPC (for extreme-scale deployment on HPC systems)
- mcp and [paramiko](https://www.paramiko.org/) (for hosting an MCP server on HPC compute nodes)

```bash
git clone https://github.com/argonne-lcf/ensemble_launcher.git
cd ensemble_launcher
python3 -m pip install .
```

Create a JSON configuration file describing your task ensemble:

```json
{
"ensembles": {
"example_ensemble": {
"nnodes": 1,
"ppn": 1,
"cmd_template": "./exe -a {arg1} -b {arg2}",
"arg1": "linspace(0, 10, 5)",
"arg2": "linspace(0, 1, 5)",
"relation": "one-to-one"
}
}
}
```

The configuration specifies an ensemble with:
- Tasks running on a single node with a single process per node
- Tasks executed with `./exe -a {arg1} -b {arg2}`, taking two input arguments
- The values of the two input arguments are defined as 5 linearly spaced numbers between 0-10 and 0-1 for `arg1` and `arg2`, respectively
- The relationship between the values of the two arguments is set to `one-to-one`, meaning the ensemble consists of 5 tasks, one for each pair of values
Supported Relations:
- `one-to-one`: Pair parameters element-wise (N tasks)
- `many-to-many`: Cartesian product of parameters (N×M tasks)
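
The sketch below (an illustration, not the launcher's internal code) shows how the two relation modes expand the `arg1`/`arg2` values above into per-task argument pairs:

```python
import itertools
import numpy as np

arg1 = np.linspace(0, 10, 5)   # "linspace(0, 10, 5)" in the JSON configuration
arg2 = np.linspace(0, 1, 5)    # "linspace(0, 1, 5)"

# one-to-one: element-wise pairing -> 5 tasks
one_to_one = list(zip(arg1, arg2))

# many-to-many: Cartesian product -> 5 x 5 = 25 tasks
many_to_many = list(itertools.product(arg1, arg2))
```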

Launch the ensemble with a short script:

```python
from ensemble_launcher import EnsembleLauncher

if __name__ == '__main__':
    # Auto-configure based on system and workload
    el = EnsembleLauncher("config.json")
    results = el.run()

    # Write results to file
    from ensemble_launcher import write_results_to_json
    write_results_to_json(results, "results.json")
```

Run the script:

```bash
python3 launcher_script.py
```

Ensemble Launcher provides a command-line interface for quick execution without writing launcher scripts.

After installation, use the `el` command:

```bash
el config.json
```

Alternatively, run as a Python module:

```bash
python -m ensemble_launcher.cli config.json
```

To see the full list of options:

```bash
el --help
```

Available Options:

- `--ensemble-file` (required): Path to the ensemble configuration JSON file
- `--system-config-file` (optional): Path to the system configuration JSON file
- `--launcher-config-file` (optional): Path to the launcher configuration JSON file
- `--nodes-str` (optional): Comma-separated list of compute nodes (e.g., "node-001,node-002,node-003")
- `--pin-resources` / `--no-pin-resources`: Enable/disable CPU/GPU resource pinning (default: enabled)
- `--async-orchestrator` / `--no-async-orchestrator`: Use event-driven orchestrator (default: disabled, only works with ZMQ)
Simple execution with default settings:

```bash
el my_ensemble.json
```

With custom configurations:

```bash
el my_ensemble.json \
--system-config-file system.json \
--launcher-config-file launcher.json
```

Specify compute nodes:

```bash
el my_ensemble.json \
--nodes-str "node-001,node-002,node-003,node-004"
```

Use async orchestrator with ZMQ:

```bash
el my_ensemble.json \
--async-orchestrator
```

Disable resource pinning:

```bash
el my_ensemble.json \
--no-pin-resources
```

System Configuration (`system.json`):

```json
{
"name": "my_cluster",
"ncpus": 104,
"ngpus": 12,
"cpus": [0, 1, 2, 3, 4],
"gpus": [0, 1, 2, 3]
}
```

Launcher Configuration (`launcher.json`):

```json
{
"child_executor_name": "mpi",
"task_executor_name": "mpi",
"comm_name": "zmq",
"nlevels": 2,
"report_interval": 10.0,
"return_stdout": true,
"worker_logs": true,
"master_logs": true
}
```

- EnsembleLauncher: Main API entry point with auto-configuration
- Global/Local Master: Orchestrates workers, handles task distribution and aggregation
- Worker: Executes tasks using configured executor
- Scheduler: Allocates resources across cluster nodes with intelligent policies
- Executors: Backend task launching engines (Python multiprocessing, MPI, DragonHPC)
- Communication Layer: ZMQ or Python multiprocessing pipes
The master-worker architecture scales from single nodes to thousands of nodes:
- Single Node (nlevels=0): Direct execution without master overhead
- Small Scale (nlevels=1): Global master coordinates workers directly
- Large Scale (nlevels=2): Global master → Local masters → Workers for thousands of tasks
- Extreme Scale (nlevels=3): Deep hierarchy for supercomputer-scale deployments
The launcher automatically configures itself based on your workload and system:

```python
from ensemble_launcher import EnsembleLauncher

el = EnsembleLauncher(
ensemble_file="config.json",
Nodes=["node-001", "node-002"], # Optional: auto-detects from PBS_NODEFILE, works only on PBS
pin_resources=True, # Enable CPU/GPU pinning
)
```

For fine-grained control, explicitly configure system and launcher settings:

```python
from ensemble_launcher import EnsembleLauncher
from ensemble_launcher.config import SystemConfig, LauncherConfig
# Define system resources
system_config = SystemConfig(
name="my_cluster",
ncpus=104, # CPUs per node
ngpus=12, # GPUs per node
cpus=list(range(104)), # Specific CPU IDs (optional)
gpus=list(range(12)) # Specific GPU IDs (optional)
)
# Configure launcher behavior
launcher_config = LauncherConfig(
child_executor_name="mpi", # multiprocessing, mpi, dragon
task_executor_name="mpi", # Executor for tasks
comm_name="zmq", # multiprocessing, zmq, dragon
nlevels=2, # Hierarchy depth (auto-computed if None)
report_interval=10.0, # Status update frequency (seconds)
return_stdout=True, # Capture stdout
worker_logs=True, # Enable worker logging
master_logs=True # Enable master logging
)
el = EnsembleLauncher(
ensemble_file="config.json",
system_config=system_config,
launcher_config=launcher_config,
pin_resources=True,
async_orchestrator=False  # use event-driven orchestrator (only for zmq communication backend)
)
results = el.run()
```

Pin tasks to specific CPUs and GPUs for optimal performance:

```json
{
"ensembles": {
"pinned_ensemble": {
"nnodes": 1,
"ppn": 4,
"cmd_template": "./gpu_code",
"cpu_affinity": "0,1,2,3",
"gpu_affinity": "0,1,2,3",
"ngpus_per_process": 1
}
}
}
```

Resources are pinned using the `gpu_selector` option in the `LauncherConfig` (it defaults to `"ZE_AFFINITY_MASK"` for Intel GPUs). The specific value it is set to depends on the `SystemConfig`. For example, setting:

```python
system_config = SystemConfig(
name="my_cluster",
cpus=list(range(104)), # Specific CPU IDs (optional)
gpus=['0','0','1','1','2','3'] # Specific GPU IDs (optional)
)
```

will overload GPUs 0 and 1, and the Scheduler will assume the node has 6 GPUs instead of 4.
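
On other GPU vendors' systems you would presumably point `gpu_selector` at the corresponding device-selection variable. A minimal sketch, assuming `gpu_selector` simply names the environment variable the launcher sets for each pinned task (e.g. `CUDA_VISIBLE_DEVICES` on NVIDIA nodes; this is an assumption, not a documented guarantee):

```python
from ensemble_launcher.config import SystemConfig, LauncherConfig

# Assumption: gpu_selector names the environment variable used to expose the
# assigned GPU IDs to each task (ZE_AFFINITY_MASK on Intel GPUs by default).
system_config = SystemConfig(
    name="gpu_cluster",
    ncpus=64,
    ngpus=4,
    gpus=['0', '1', '2', '3'],            # one entry per schedulable GPU slot
)

launcher_config = LauncherConfig(
    gpu_selector="CUDA_VISIBLE_DEVICES",  # hypothetical value for NVIDIA nodes
)
```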
Execute Python functions directly:

```python
def my_simulation(param_a, param_b):
    # Your simulation code
    return result

from ensemble_launcher.ensemble import Task
tasks = {
"task-1": Task(
task_id="task-1",
nnodes=1,
ppn=1,
executable=my_simulation,
args=(10, 0.5)
)
}
el = EnsembleLauncher(
ensemble_file=tasks, # Pass dict directly
)
results = el.run()
```

Note that, internally, the dictionary definition of the ensemble is converted to a collection of `Task()`s.
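
For example, the Quick Start ensemble above (five `one-to-one` parameter pairs) corresponds roughly to the following dict of `Task` objects; this sketch reuses only the `Task` fields shown above and substitutes the Python callable `my_simulation` for the shell command:

```python
import numpy as np

from ensemble_launcher.ensemble import Task

# Five one-to-one tasks: the i-th task pairs arg1[i] with arg2[i].
arg1 = np.linspace(0, 10, 5)
arg2 = np.linspace(0, 1, 5)

tasks = {
    f"task-{i}": Task(
        task_id=f"task-{i}",
        nnodes=1,
        ppn=1,
        executable=my_simulation,   # the callable defined in the example above
        args=(a, b),
    )
    for i, (a, b) in enumerate(zip(arg1, arg2))
}
```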

Execute binaries and shell commands with files as inputs:

```json
{
"ensembles": {
"shell_ensemble": {
"nnodes": 1,
"ppn": 1,
"cmd_template": "./simulation --config {config_file}",
"config_file": ["config1.json", "config2.json", "config3.json"],
"relation": "one-to-one"
}
}
}
```

This ensemble is launched using the following script:

```python
from ensemble_launcher import EnsembleLauncher

if __name__ == '__main__':
    # Auto-configure based on system and workload
    el = EnsembleLauncher("config.json")
    results = el.run()

    # Write results to file
    from ensemble_launcher import write_results_to_json
    write_results_to_json(results, "results.json")
```

Transform an `@mcp.tool` into an ensemble tool that can perform an ensemble of tool executions from a single AI tool call:

```python
from ensemble_launcher.mcp import Server
from sim_script import sim
mcp = Server(port=9276)
tool = mcp.ensemble_tool(sim)
"""
or
@mcp.ensemble_tool
def sim(a: float, b: float) -> str:
    return "Done sim"
or
from ensemble_launcher.config import LauncherConfig, SystemConfig
@mcp.ensemble_tool(launcher_config = LauncherConfig(...), system_config = SystemConfig(...))
def sim(a: float, b: float) -> str:
    return "Done sim"
"""
if __name__ == "__main__":
    mcp.run(transport="streamable-http")
```

We also provide some tooling for port forwarding between compute and login nodes. In the client script, do the following:

```python
import asyncio

from ensemble_launcher.mcp import start_tunnel, stop_tunnel

if __name__ == "__main__":
    # Forward local port 9276 to port 9276 on the job's head node
    ret = start_tunnel("<User name>", "<Job head node host name>", 9276, 9276)
    asyncio.run(main())   # main() is your MCP client coroutine (see the sketch below)
    stop_tunnel(*ret)
```
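
The tunnel script above expects a `main()` coroutine that talks to the server through the forwarded port. A minimal sketch, assuming the standard MCP Python SDK streamable-HTTP client and that the server exposes the default `/mcp` endpoint on the forwarded port; the tool name and argument schema for the wrapped `sim` function are assumptions:

```python
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def main():
    # Connect through the locally forwarded port (9276 in the tunnel example).
    async with streamablehttp_client("http://localhost:9276/mcp") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print("Available tools:", [t.name for t in tools.tools])
            # Hypothetical call; the actual argument schema depends on how
            # ensemble_tool wraps the sim(a, b) signature.
            result = await session.call_tool("sim", arguments={"a": 1.0, "b": 2.0})
            print(result)
```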

See the examples directory for complete workflow samples:

- `examples/c++/workflow_pattern1.py` - Basic parallel execution
- `examples/c++/workflow_pattern2.py` - Parameter sweeps
- `examples/c++/workflow_pattern3.py` - Complex dependencies
- `examples/mcp/combustion_agent` - A simple combustion agent

| Backend | Best For | Nodes |
|---|---|---|
| `multiprocessing` | Single node, small ensembles | 1 |
| `zmq` | Multi-node, large scale | 2-2048+ |
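
For example, a multi-node run would typically select the ZMQ backend while keeping MPI executors (a sketch built only from the `LauncherConfig` options documented below):

```python
from ensemble_launcher import EnsembleLauncher
from ensemble_launcher.config import LauncherConfig

# ZMQ communication backend for multi-node ensembles; tasks launched via MPI.
launcher_config = LauncherConfig(
    comm_name="zmq",
    child_executor_name="mpi",
    task_executor_name="mpi",
)

el = EnsembleLauncher(
    ensemble_file="config.json",
    launcher_config=launcher_config,
)
results = el.run()
```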
The launcher automatically determines hierarchy depth based on node count, but you can override it with:

```python
launcher_config = LauncherConfig(
    nlevels=0,   # Direct worker execution (single node)
    # nlevels=1: Master + Workers (up to ~64 nodes)
    # nlevels=2: Master + Sub-masters + Workers (64-2048 nodes)
    # nlevels=3: Deep hierarchy (2048+ nodes)
)
```

Auto-computed hierarchy:
- 1 node: `nlevels=0` (worker only)
- 2-64 nodes: `nlevels=1` (master + workers)
- 65-2048 nodes: `nlevels=2` (master + sub-masters + workers)
- 2048+ nodes: `nlevels=3` (deep hierarchy)
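
These thresholds can be summarized in a small helper (an illustration of the documented defaults, not the launcher's internal logic):

```python
def auto_nlevels(nnodes: int) -> int:
    """Map a node count to the documented default hierarchy depth."""
    if nnodes <= 1:
        return 0   # worker only
    if nnodes <= 64:
        return 1   # master + workers
    if nnodes <= 2048:
        return 2   # master + sub-masters + workers
    return 3       # deep hierarchy
```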
Enable logging for detailed execution traces:

```python
# import logging
# logging.basicConfig(level=logging.INFO)

launcher_config = LauncherConfig(
    worker_logs=True,
    master_logs=True,
    report_interval=5.0,   # Report status every 5 seconds
    profile="basic",       # or "timeline": "basic" outputs the communication latencies and task
                           # runtimes; "timeline" outputs the mean, std, sum, and counts of various
                           # events in the orchestrator
)
```

Logs are written to `logs/master-*.log` and `logs/worker-*.log`. Profiles are written to `profiles/*`.

```python
EnsembleLauncher(
    ensemble_file: Union[str, Dict[str, Dict]],
    system_config: SystemConfig = SystemConfig(name="local"),
    launcher_config: Optional[LauncherConfig] = None,
    Nodes: Optional[List[str]] = None,
    pin_resources: bool = True,
    async_orchestrator: bool = False
)
```

Parameters:
- `ensemble_file`: Path to JSON config or dict of task definitions
- `system_config`: System resource configuration
- `launcher_config`: Launcher behavior configuration (auto-configured if None)
- `Nodes`: List of compute nodes (auto-detected if None)
- `pin_resources`: Enable CPU/GPU affinity
- `async_orchestrator`: Use event-driven orchestrator (only for ZMQ backend)
Methods:
- `run()`: Execute ensemble and return results

```python
SystemConfig(
    name: str,
    ncpus: int = mp.cpu_count(),
    ngpus: int = 0,
    cpus: List[int] = [],
    gpus: List[Union[str, int]] = []
)
```

```python
LauncherConfig(
    child_executor_name: Literal["multiprocessing","dragon","mpi"] = "multiprocessing",
    task_executor_name: Literal["multiprocessing","dragon","mpi"] = "multiprocessing",
    comm_name: Literal["multiprocessing","zmq","dragon"] = "multiprocessing",
    report_interval: float = 10.0,
    nlevels: int = 1,
    return_stdout: bool = False,
    worker_logs: bool = False,
    master_logs: bool = False,
    nchildren: Optional[int] = None,  # Forces number of children at every level
    profile: Optional[Literal["basic","timeline"]] = None,
    gpu_selector: str = "ZE_AFFINITY_MASK"
)
```

Run the test suite:

```bash
pytest tests/
```

Run specific tests:

```bash
pytest tests/test_el.py # End-to-end tests
pytest tests/test_executor.py # Executor tests
pytest tests/test_master.py # Master/Worker tests
```

We welcome contributions! Please:
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit changes (`git commit -m 'Add amazing feature'`)
- Push to branch (`git push origin feature/amazing-feature`)
- Open a Pull Request

To set up a development environment:

```bash
git clone https://github.com/argonne-lcf/ensemble_launcher.git
cd ensemble_launcher
python3 -m pip install -e ".[dev]"
pytest tests/
```

- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Documentation: See the `examples` directory
This work was supported by the U.S. Department of Energy, Office of Science, under contract DE-AC02-06CH11357.
If you use Ensemble Launcher in your research, please cite:

```bibtex
@software{ensemble_launcher,
title = {Ensemble Launcher: Scalable Task Orchestration for HPC},
author = {Argonne National Laboratory},
year = {2025},
url = {https://github.com/argonne-lcf/ensemble_launcher}
}
```