Commit 2207904

daniel-thom and claude authored
Support multi-node allocations in direct mode (#215)
* Support multi-node allocations in direct mode

Remove start_one_worker_per_node from Slurm example (requires direct mode) and fix rustfmt
formatting issues.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f7bd0f1 commit 2207904

File tree

17 files changed: +394 additions, −69 deletions

docs/src/core/reference/workflow-spec.md

Lines changed: 13 additions & 12 deletions
@@ -256,18 +256,19 @@ slurm_defaults:
 
 Defines conditional actions triggered by workflow or job state changes.
 
-| Name | Type | Default | Description |
-| ------------------- | -------- | ---------- | --------------------------------------------------------------------------------------------------------- |
-| `trigger_type` | string | _required_ | When to trigger: `"on_workflow_start"`, `"on_workflow_complete"`, `"on_jobs_ready"`, `"on_jobs_complete"` |
-| `action_type` | string | _required_ | What to do: `"run_commands"`, `"schedule_nodes"` |
-| `jobs` | [string] | none | For job triggers: exact job names to match |
-| `job_name_regexes` | [string] | none | For job triggers: regex patterns to match job names |
-| `commands` | [string] | none | For `run_commands`: commands to execute |
-| `scheduler` | string | none | For `schedule_nodes`: scheduler name |
-| `scheduler_type` | string | none | For `schedule_nodes`: scheduler type (`"slurm"`, `"local"`) |
-| `num_allocations` | integer | none | For `schedule_nodes`: number of node allocations |
-| `max_parallel_jobs` | integer | none | For `schedule_nodes`: maximum parallel jobs |
-| `persistent` | boolean | false | Whether the action persists and can be claimed by multiple workers |
+| Name | Type | Default | Description |
+| --------------------------- | -------- | ---------- | --------------------------------------------------------------------------------------------------------- |
+| `trigger_type` | string | _required_ | When to trigger: `"on_workflow_start"`, `"on_workflow_complete"`, `"on_jobs_ready"`, `"on_jobs_complete"` |
+| `action_type` | string | _required_ | What to do: `"run_commands"`, `"schedule_nodes"` |
+| `jobs` | [string] | none | For job triggers: exact job names to match |
+| `job_name_regexes` | [string] | none | For job triggers: regex patterns to match job names |
+| `commands` | [string] | none | For `run_commands`: commands to execute |
+| `scheduler` | string | none | For `schedule_nodes`: scheduler name |
+| `scheduler_type` | string | none | For `schedule_nodes`: scheduler type (`"slurm"`, `"local"`) |
+| `num_allocations` | integer | none | For `schedule_nodes`: number of node allocations |
+| `start_one_worker_per_node` | boolean | false | For `schedule_nodes`: launch one worker per node (direct mode only) |
+| `max_parallel_jobs` | integer | none | For `schedule_nodes`: maximum parallel jobs |
+| `persistent` | boolean | false | Whether the action persists and can be claimed by multiple workers |
 
 ## ResourceMonitorConfig

docs/src/specialized/design/workflow-actions.md

Lines changed: 3 additions & 0 deletions
@@ -256,6 +256,9 @@ Dynamically allocate compute resources from a Slurm scheduler.
 
 - `scheduler` (required) - Name of Slurm scheduler configuration (must exist in `slurm_schedulers`)
 - `scheduler_type` (required) - Must be "slurm"
 - `num_allocations` (required) - Number of Slurm allocation requests to submit
+- `start_one_worker_per_node` (optional, default: false) - Launch one worker per allocated node via
+  `srun --ntasks-per-node=1`. Use this for direct-mode workflows with single-node jobs sharing a
+  multi-node allocation. Not compatible with `execution_config.mode: slurm`.
 
 **Use cases**:
docs/src/specialized/hpc/multi-node-jobs.md

Lines changed: 60 additions & 8 deletions
@@ -18,13 +18,21 @@ nodes are in the allocation.
 
 **Use when**: You have many independent jobs that each fit on one node, and you want them to run in
 parallel across multiple nodes for throughput.
 
-**How it works**: Torc requests a multi-node Slurm allocation (e.g., 4 nodes). One worker manages
-the allocation and places each single-node job onto one node via `srun --nodes=1`. Single-node jobs
-may share a node as long as CPU, memory, and GPU limits allow. With N nodes, Torc can spread work
-across the allocation for throughput.
+**How it works**: Torc requests a multi-node Slurm allocation (e.g., 4 nodes). The behavior depends
+on the execution mode:
 
-**Example**: 100 independent analysis jobs, each needing 8 CPUs and 32 GB, across a 4-node
-allocation:
+- **Slurm mode** (default): A single worker manages the allocation and places each single-node job
+  onto a node via `srun --nodes=1`. Slurm handles resource isolation and node placement.
+- **Direct mode**: Jobs are executed directly without `srun` wrapping. To distribute work across
+  nodes, set `start_one_worker_per_node: true` on the `schedule_nodes` action. This launches one
+  worker per node via `srun --ntasks-per-node=1`, and each worker executes jobs directly on its
+  node.
+
+Single-node jobs may share a node as long as CPU, memory, and GPU limits allow. With N nodes, Torc
+can spread work across the allocation for throughput.
+
+**Example (Slurm mode)**: 100 independent analysis jobs, each needing 8 CPUs and 32 GB, across a
+4-node allocation:
 
 ```yaml
 name: parallel_analysis

@@ -58,9 +66,52 @@ actions:
     num_allocations: 1
 ```
 
+**Example (Direct mode)**: The same workload using direct execution with one worker per node:
+
+```yaml
+name: parallel_analysis_direct
+description: Run 20 analysis tasks across 2 nodes via direct execution
+
+execution_config:
+  mode: direct
+
+resource_requirements:
+  - name: analysis
+    num_cpus: 5
+    num_nodes: 1
+    memory: 2g
+    runtime: PT3M
+
+jobs:
+  - name: analyze_{i}
+    command: python analyze.py --chunk {i}
+    resource_requirements: analysis
+    scheduler: multi_node
+    parameters:
+      i: "1:20"
+
+slurm_schedulers:
+  - name: multi_node
+    account: myproject
+    nodes: 2
+    walltime: "00:10:00"
+
+actions:
+  - trigger_type: on_workflow_start
+    action_type: schedule_nodes
+    scheduler: multi_node
+    scheduler_type: slurm
+    start_one_worker_per_node: true
+    num_allocations: 1
+```
 
 Each node has 8 CPUs and 32 GB available per job. If a node has 64 CPUs total, it can run up to 8
 jobs concurrently (64 / 8 = 8). Across 4 nodes, that means up to 32 jobs running at once.
 
+> **Note:** `start_one_worker_per_node` is only supported with `execution_config.mode: direct`. It
+> is not compatible with slurm execution mode, where Torc uses a single worker with `srun`-based
+> node placement.
 
 ### Pattern 2: True Multi-Node Jobs (MPI, Distributed Training)
 
 **Use when**: A single job needs to span multiple nodes — for example, MPI applications, distributed

@@ -136,8 +187,9 @@ underlying Slurm allocations. There are two approaches, each with trade-offs.
 
 ### One multi-node allocation
 
-Request all nodes in a single `sbatch` job (e.g., `nodes: 4`). Torc runs one worker per node and
-distributes jobs across them.
+Request all nodes in a single `sbatch` job (e.g., `nodes: 4`). In slurm mode, a single worker
+distributes jobs across nodes via `srun`. In direct mode with `start_one_worker_per_node`, Torc runs
+one worker per node and each worker executes jobs locally.
 
 **Advantages:**

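The throughput arithmetic in the doc hunk above (64 / 8 = 8 jobs per node, 32 across 4 nodes) can be checked with a small helper; the function is ours for illustration, not part of Torc:

```rust
/// Concurrent single-node-job capacity when jobs share an allocation:
/// each node runs floor(cpus_per_node / cpus_per_job) jobs at once,
/// and capacity scales linearly with the number of nodes.
fn concurrent_job_capacity(nodes: u32, cpus_per_node: u32, cpus_per_job: u32) -> u32 {
    nodes * (cpus_per_node / cpus_per_job)
}
```

For the documented example, `concurrent_job_capacity(4, 64, 8)` gives 32, matching the text.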
examples/kdl/workflow_actions_simple_slurm.kdl

Lines changed: 0 additions & 1 deletion
@@ -78,7 +78,6 @@ action {
     scheduler "process_scheduler"
     scheduler_type "slurm"
     num_allocations 2
-    start_one_worker_per_node #true
 }
 
 // Allocate resources for finalization stage
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+# Test: Multi-Node Direct Execution
+#
+# 2-node allocation with direct execution mode (no srun wrapping for jobs).
+# The head worker spawns one torc-slurm-job-runner per node via
+# srun --ntasks-per-node=1, and each per-node worker executes jobs directly.
+# Tests that single-node jobs are distributed across nodes in direct mode.
+
+name: multi_node_direct
+description: 2-node allocation — 20 jobs x 5 CPUs via direct execution
+project: slurm-tests
+
+execution_config:
+  mode: direct
+
+resource_monitor:
+  enabled: true
+  granularity: time_series
+  sample_interval_seconds: 2
+
+resource_requirements:
+  - name: work_resources
+    num_cpus: 5
+    num_nodes: 1
+    memory: 2g
+    runtime: PT3M
+
+jobs:
+  - name: work_{i}
+    command: bash -c 'echo "Running on $(hostname)"; stress-ng --cpu 5 --timeout 30 --metrics-brief'
+    resource_requirements: work_resources
+    scheduler: two_node_scheduler
+    parameters:
+      i: 1:20
+
+slurm_schedulers:
+  - name: two_node_scheduler
+    account: PLACEHOLDER_ACCOUNT
+    partition: PLACEHOLDER_PARTITION
+    nodes: 2
+    walltime: "00:10:00"
+
+actions:
+  - trigger_type: "on_workflow_start"
+    action_type: "schedule_nodes"
+    scheduler: "two_node_scheduler"
+    scheduler_type: "slurm"
+    num_allocations: 1
+    start_one_worker_per_node: true

src/bin/torc-slurm-job-runner.rs

Lines changed: 7 additions & 7 deletions
@@ -235,7 +235,7 @@ mod unix_main {
             insecure: args.tls_insecure,
         };
         let mut config = Configuration::with_tls(tls);
-        config.base_path = args.url;
+        config.base_path = args.url.clone();
 
         // Set up authentication if password is provided
         if let Some(ref password) = args.password {

@@ -292,12 +292,6 @@ mod unix_main {
         let scheduled_compute_node =
             get_scheduled_compute_node(&config, args.workflow_id, &slurm_interface);
 
-        if slurm_interface.is_head_node()
-            && let Some(ref node) = scheduled_compute_node
-        {
-            set_scheduled_compute_node_status(&config, node, "active");
-        }
-
         let scheduler_id = scheduled_compute_node.as_ref().map(|node| node.id);
         let scheduler_config_id = scheduled_compute_node
             .as_ref()

@@ -332,6 +326,12 @@
             args.wait_for_healthy_database_minutes,
         );
 
+        if slurm_interface.is_head_node()
+            && let Some(ref node) = scheduled_compute_node
+        {
+            set_scheduled_compute_node_status(&config, node, "active");
+        }
+
         let node_tracker = if num_nodes > 1 && !has_multi_node_jobs {
             match slurm_interface.list_active_nodes(&job_id) {
                 Ok(node_names) => {

src/client/async_cli_command.rs

Lines changed: 0 additions & 12 deletions
@@ -33,7 +33,6 @@ use crate::models::{JobModel, JobStatus, ResourceRequirementsModel, ResultModel,
 use chrono::{DateTime, Utc};
 use log::{self, debug, error, info, warn};
 use std::fs::File;
-use std::io::BufWriter;
 use std::path::Path;
 use std::process::{Child, Command, Stdio};

@@ -160,8 +159,6 @@ pub struct AsyncCliCommand {
     return_code: Option<i64>,
     pub is_complete: bool,
     status: JobStatus,
-    stdout_fp: Option<BufWriter<File>>,
-    stderr_fp: Option<BufWriter<File>>,
 }

@@ -185,8 +182,6 @@ impl AsyncCliCommand {
             return_code: None,
             is_complete: false,
             status,
-            stdout_fp: None,
-            stderr_fp: None,
         }
     }

@@ -231,11 +226,6 @@ impl AsyncCliCommand {
         let stderr_path =
             get_job_stderr_path(output_dir, workflow_id, self.job_id, run_id, attempt_id);
 
-        let stdout_file = File::create(&stdout_path)?;
-        let stderr_file = File::create(&stderr_path)?;
-        self.stdout_fp = Some(BufWriter::new(stdout_file));
-        self.stderr_fp = Some(BufWriter::new(stderr_file));
-
         let command_str = if let Some(ref invocation_script) = self.job.invocation_script {
             format!("{} {}", invocation_script, self.job.command)
         } else {

@@ -680,8 +670,6 @@ impl AsyncCliCommand {
             (self.completion_time.unwrap() - self.start_time).num_milliseconds() as f64 / 1000.0;
         self.status = status;
         self.return_code = Some(return_code);
-        self.stdout_fp = None;
-        self.stderr_fp = None;
         self.handle = None;
 
         // Collect Slurm accounting stats via sacct when running inside an allocation.

src/client/commands/slurm.rs

Lines changed: 25 additions & 2 deletions
@@ -57,7 +57,9 @@ use crate::client::hpc::hpc_interface::HpcInterface;
 use crate::client::utils;
 use crate::client::workflow_graph::WorkflowGraph;
 use crate::client::workflow_manager::WorkflowManager;
-use crate::client::workflow_spec::{ResourceRequirementsSpec, SlurmDefaultsSpec, WorkflowSpec};
+use crate::client::workflow_spec::{
+    ExecutionConfig, ExecutionMode, ResourceRequirementsSpec, SlurmDefaultsSpec, WorkflowSpec,
+};
 use crate::config::TorcConfig;
 use crate::models;
 use tabled::Tabled;

@@ -389,6 +391,10 @@ EXAMPLES:
         /// Workflow ID
         #[arg()]
         workflow_id: Option<i64>,
+        /// Start one worker per allocated node.
+        /// Use this for direct-mode single-node jobs sharing a multi-node allocation.
+        #[arg(long, default_value = "false")]
+        start_one_worker_per_node: bool,
         /// Job prefix for the Slurm job names
         #[arg(short, long, default_value = "")]
         job_prefix: String,

@@ -1178,6 +1184,7 @@ pub fn handle_slurm_commands(config: &Configuration, command: &SlurmCommands, fo
         }
         SlurmCommands::ScheduleNodes {
             workflow_id,
+            start_one_worker_per_node,
             job_prefix,
             keep_submission_scripts,
             max_parallel_jobs,

@@ -1256,6 +1263,7 @@ pub fn handle_slurm_commands(config: &Configuration, command: &SlurmCommands, fo
                 wf_id,
                 sched_config_id,
                 *num_hpc_jobs,
+                *start_one_worker_per_node,
                 job_prefix,
                 output,
                 effective_poll_interval,

@@ -1433,6 +1441,7 @@ pub fn schedule_slurm_nodes(
     workflow_id: i64,
     scheduler_config_id: i64,
     num_hpc_jobs: i32,
+    start_one_worker_per_node: bool,
     job_prefix: &str,
     output: &str,
     poll_interval: i32,

@@ -1461,6 +1470,12 @@ pub fn schedule_slurm_nodes(
             return Err(format!("Failed to get workflow: {}", e).into());
         }
     };
+    let execution_config = ExecutionConfig::from_workflow_model(&workflow);
+    if start_one_worker_per_node && execution_config.mode != ExecutionMode::Direct {
+        return Err(
+            "start_one_worker_per_node requires execution_config.mode to be 'direct'".into(),
+        );
+    }
 
     let slurm_interface = match crate::client::hpc::slurm_interface::SlurmInterface::new() {
         Ok(interface) => interface,

@@ -1539,6 +1554,7 @@ pub fn schedule_slurm_nodes(
         max_parallel_jobs,
         Path::new(&script_path),
         &config_map,
+        start_one_worker_per_node,
         tls_ca_cert,
         tls_insecure,
     ) {

@@ -1633,7 +1649,13 @@ pub fn create_node_resources(
     };
 
     let num_gpus = interface.get_num_gpus() as i64;
-    let num_nodes = interface.get_num_nodes() as i64;
+    // When running as a subtask (one worker per node), each worker manages
+    // only its own node regardless of the total allocation size.
+    let num_nodes = if is_subtask {
+        1
+    } else {
+        interface.get_num_nodes() as i64
+    };
 
     // Return per-node resource values. The job runner is responsible for
     // multiplying by num_nodes to compute total allocation capacity.

@@ -3914,6 +3936,7 @@ fn handle_regenerate(
         workflow_id,
         scheduler_info.id,
         scheduler_info.num_allocations as i32,
+        false, // start_one_worker_per_node
        "",
         output_dir.to_str().unwrap_or("torc_output"),
         effective_poll_interval,

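The direct-mode guard added to `schedule_slurm_nodes` can be distilled into a standalone sketch. The real code uses the crate's `ExecutionConfig`/`ExecutionMode` types imported in the hunk above; the miniature enum and function here are assumptions made for self-containment:

```rust
// Minimal stand-in for the crate's ExecutionMode (sketch only).
#[derive(Debug, PartialEq)]
enum ExecutionMode {
    Direct,
    Slurm,
}

/// Reject `start_one_worker_per_node` outside direct mode, mirroring the
/// validation this commit adds before submitting Slurm allocations.
fn validate_worker_per_node(
    start_one_worker_per_node: bool,
    mode: ExecutionMode,
) -> Result<(), String> {
    if start_one_worker_per_node && mode != ExecutionMode::Direct {
        return Err(
            "start_one_worker_per_node requires execution_config.mode to be 'direct'".to_string(),
        );
    }
    Ok(())
}
```

Failing fast here, before any `sbatch` submission, avoids burning an allocation on a configuration that cannot distribute work.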
src/client/hpc/hpc_interface.rs

Lines changed: 2 additions & 0 deletions
@@ -54,6 +54,7 @@ pub trait HpcInterface: Send + Sync {
     /// * `max_parallel_jobs` - Optional maximum number of parallel jobs
     /// * `filename` - Path where the submission script should be written
     /// * `config` - Configuration parameters for the HPC scheduler
+    /// * `start_one_worker_per_node` - Whether to launch one worker per node via srun
     /// * `tls_ca_cert` - Optional path to a PEM-encoded CA certificate
     /// * `tls_insecure` - Whether to skip certificate verification
     #[allow(clippy::too_many_arguments)]

@@ -67,6 +68,7 @@ pub trait HpcInterface: Send + Sync {
         max_parallel_jobs: Option<i32>,
         filename: &Path,
         config: &HashMap<String, String>,
+        start_one_worker_per_node: bool,
         tls_ca_cert: Option<&str>,
         tls_insecure: bool,
     ) -> Result<()>;

src/client/hpc/hpc_manager.rs

Lines changed: 2 additions & 0 deletions
@@ -133,6 +133,7 @@ impl HpcManager {
         workflow_id: i64,
         poll_interval: i32,
         max_parallel_jobs: Option<i32>,
+        start_one_worker_per_node: bool,
         keep_submission_script: bool,
         tls_ca_cert: Option<&str>,
         tls_insecure: bool,

@@ -148,6 +149,7 @@ impl HpcManager {
             max_parallel_jobs,
             &filename,
             &self.config,
+            start_one_worker_per_node,
             tls_ca_cert,
             tls_insecure,
         )?;

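The `is_subtask` branch added to `create_node_resources` reduces a per-node worker's view to a single node. A hedged standalone sketch (the helper name is ours, not the crate's):

```rust
/// Nodes a worker should manage: a per-node subtask worker sees only its
/// own node, while the head worker sees the whole allocation. Mirrors the
/// `is_subtask` branch this commit adds to create_node_resources.
fn managed_node_count(is_subtask: bool, allocation_nodes: i64) -> i64 {
    if is_subtask { 1 } else { allocation_nodes }
}
```

This keeps per-node resource accounting correct when `start_one_worker_per_node` fans workers out: each worker multiplies per-node capacity by its own node count, not the allocation's.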