
Commit 30ad40a

gpus: add remote shape

committed by vsoch
Signed-off-by: vsoch <[email protected]>
1 parent 8b06e73

File tree: 6 files changed, +195 −66 lines

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+default:
+  type: numa
+  locality: gpu-remote
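For orientation: once loaded, this shape file yields a single default rule that applies to every rank. A minimal sketch of the parsed structure, assuming fluxbind.utils.read_yaml wraps standard PyYAML:

    import yaml

    text = """
    default:
      type: numa
      locality: gpu-remote
    """
    data = yaml.safe_load(text)
    # -> {'default': {'type': 'numa', 'locality': 'gpu-remote'}}
    # find_matching_rule() returns item["default"], i.e. the inner dict,
    # so every rank is bound at NUMA granularity, remote from its GPU.
    print(data["default"])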

fluxbind/bind/bind.py
Lines changed: 2 additions & 0 deletions

@@ -195,6 +195,8 @@ def set_flags(self, cmd):
             cmd += ["-o", f"gpu-affinity={self.gpu_affinity}"]
         if self.cores_per_task is not None:
             cmd += ["--cores-per-task", str(self.cores_per_task)]
+        if self.gpus_per_task is not None:
+            cmd += ["--gpus-per-task", str(self.gpus_per_task)]
         if self.tasks_per_core is not None:
             cmd += ["--tasks-per-core", str(self.tasks_per_core)]
         if self.taskmap is not None:
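set_flags appends a flag only when the corresponding option is set; with the new lines, gpus_per_task now flows through as well. A minimal sketch of the effect (Opts is a hypothetical stand-in for the real object, which carries more state):

    class Opts:
        cores_per_task = 2
        gpus_per_task = 1   # newly honored by set_flags
        tasks_per_core = None

    opts, cmd = Opts(), []
    if opts.cores_per_task is not None:
        cmd += ["--cores-per-task", str(opts.cores_per_task)]
    if opts.gpus_per_task is not None:
        cmd += ["--gpus-per-task", str(opts.gpus_per_task)]
    if opts.tasks_per_core is not None:
        cmd += ["--tasks-per-core", str(opts.tasks_per_core)]
    print(cmd)  # ['--cores-per-task', '2', '--gpus-per-task', '1']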

fluxbind/scripts/run_mapping.sh
Lines changed: 1 addition & 0 deletions

@@ -24,6 +24,7 @@ fi
 # Call the fluxbind helper script to get the target location string (e.g., "core:0" or "UNBOUND")
 # It ALWAYS returns a single line in the format: BIND_LOCATION,CUDA_DEVICE_ID
 # For CPU jobs, CUDA_DEVICE_ID will be the string "NONE".
+echo fluxbind shape --file "$JOB_SHAPE_FILE" --rank "$rank" --node-id "$node_id" --local-rank "$local_rank"
 BIND_INFO=$(fluxbind shape --file "$JOB_SHAPE_FILE" --rank "$rank" --node-id "$node_id" --local-rank "$local_rank")

 # Exit if the helper script failed
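Downstream consumers can rely on the single-line BIND_LOCATION,CUDA_DEVICE_ID contract documented above. A hedged sketch of parsing it (function name is illustrative; splitting from the right matters because the location itself may contain commas, e.g. a multi-word cpuset):

    def parse_bind_info(line: str):
        """Split 'BIND_LOCATION,CUDA_DEVICE_ID' from the right."""
        location, device = line.strip().rsplit(",", 1)
        cuda = None if device == "NONE" else device
        return location, cuda

    print(parse_bind_info("core:3,1"))     # ('core:3', '1')
    print(parse_bind_info("core:0,NONE"))  # ('core:0', None)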

fluxbind/shape/commands.py
Lines changed: 54 additions & 8 deletions

@@ -29,17 +29,63 @@ def run(self, command, shell: bool = False):
 class HwlocCalcCommand(Command):
     name = "hwloc-calc"

-    def execute(self, args_list: list) -> str:
-        """
-        Executes hwloc-calc with a list of arguments.
-        This is safer as it avoids shell interpretation of the arguments.
-        """
-        if isinstance(args_list, str):
-            args_list = shlex.split(args_list)
-
-        # A more robust validation could be added here if needed,
-        command_list = [self.name] + args_list
-        return self.run(command_list, shell=False)
+    def count(self, hw_type: str, within: str = "machine:0") -> int:
+        """
+        Returns the total number of a specific hardware object.
+
+        Args:
+            hw_type: The type of object to count (e.g., "core", "numa").
+            within: Optional object to restrict the count to (e.g., "numa:0").
+        """
+        try:
+            args = ["--number-of", hw_type, within]
+            result_stdout = self.run([self.name] + args)
+            return int(result_stdout)
+        except (RuntimeError, ValueError) as e:
+            raise RuntimeError(f"Failed to count number of '{hw_type}': {e}")
+
+    def list_cpusets(self, hw_type: str, within: str = "machine:0") -> list[str]:
+        """
+        Returns a list of cpuset strings for each object of a given type.
+
+        Args:
+            hw_type: The type of object to list (e.g., "numa").
+            within: Optional object to restrict the list to.
+        """
+        try:
+            # Get the indices of all objects of this type
+            args_intersect = ["--intersect", hw_type, within]
+            indices_str = self.run([self.name] + args_intersect)
+            indices = indices_str.split(",")
+
+            # Cut out early
+            if not indices or not indices[0]:
+                return []
+
+            # For each index, get its specific cpuset
+            return [self.run([self.name, f"{hw_type}:{i}"]) for i in indices]
+        except (RuntimeError, ValueError) as e:
+            raise RuntimeError(f"Failed to list cpusets for '{hw_type}': {e}")
+
+    def get_cpuset(self, location: str) -> str:
+        """
+        Gets the cpuset for a single, specific location string (e.g., "pci=...", "core:0").
+        """
+        return self.run([self.name, location])
+
+    def get_object_in_set(self, cpuset: str, obj_type: str, index: int) -> str:
+        """
+        Gets the Nth object of a type within a given cpuset,
+        e.g., find the 1st 'core' within cpuset '0x00ff'.
+        """
+        # This uses the robust two-step process internally
+        all_objects_str = self.run([self.name, cpuset, "--intersect", obj_type])
+        available_indices = all_objects_str.split(",")
+        try:
+            target_index = available_indices[index]
+            return f"{obj_type}:{target_index}"
+        except IndexError:
+            raise ValueError(f"Cannot find the {index}-th '{obj_type}' in cpuset {cpuset}.")


 class NvidiaSmiCommand(Command):
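Taken together, the new helpers wrap four hwloc-calc invocation patterns. A sketch of how they map to the CLI, assuming the module-level hwloc_calc singleton that shape.py uses (commands.hwloc_calc); outputs depend on the machine and the PCI id here is made up:

    from fluxbind.shape.commands import hwloc_calc

    # hwloc-calc --number-of core machine:0
    ncores = hwloc_calc.count("core")            # e.g., 64

    # hwloc-calc --intersect numa machine:0, then hwloc-calc numa:<i> per index
    numa_sets = hwloc_calc.list_cpusets("numa")  # e.g., ["0x000000ff", "0xff000000"]

    # hwloc-calc pci=0000:17:00.0
    local = hwloc_calc.get_cpuset("pci=0000:17:00.0")

    # hwloc-calc <cpuset> --intersect core, then pick the Nth reported index
    first_core = hwloc_calc.get_object_in_set(local, "core", 0)  # e.g., "core:4"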

fluxbind/shape/gpu.py
Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class GPUAssignment:
+    """
+    Data structure to hold information about a rank's assigned GPU.
+    """
+
+    index: int  # The logical index of the GPU
+    pci_id: str  # The PCI bus ID of the GPU
+    cuda_devices: str  # CUDA_VISIBLE_DEVICES
+
+    @classmethod
+    def for_rank(cls, local_rank, gpu_pci_ids):
+        """
+        A factory method that assigns a GPU to a given local rank
+        using a round-robin strategy.
+        """
+        if not gpu_pci_ids:
+            raise RuntimeError("Attempted to assign a GPU, but no GPUs were discovered.")
+
+        num_gpus = len(gpu_pci_ids)
+        target_gpu_index = local_rank % num_gpus
+
+        # Return the assignment
+        return cls(
+            index=target_gpu_index,
+            pci_id=gpu_pci_ids[target_gpu_index],
+            cuda_devices=str(target_gpu_index),
+        )
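A small usage sketch of the round-robin factory (the PCI ids are made up):

    from fluxbind.shape.gpu import GPUAssignment

    pci_ids = ["0000:17:00.0", "0000:65:00.0"]
    for local_rank in range(4):
        a = GPUAssignment.for_rank(local_rank, pci_ids)
        print(local_rank, a.index, a.pci_id, a.cuda_devices)

    # 0 0 0000:17:00.0 0
    # 1 1 0000:65:00.0 1
    # 2 0 0000:17:00.0 0
    # 3 1 0000:65:00.0 1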

fluxbind/shape/shape.py
Lines changed: 104 additions & 58 deletions

@@ -1,8 +1,8 @@
-import re
 import subprocess
 import sys

 import fluxbind.shape.commands as commands
+import fluxbind.shape.gpu as gpus
 import fluxbind.utils as utils


@@ -21,35 +21,24 @@ def __init__(self, filepath, machine="machine:0"):
         self.data = self.load_file(filepath)
         # This discovers and caches hardware properties on init
         # The expectation is that this is running from the node (task)
-        self.num_cores = self.discover_hardware("core")
-        self.num_pus = self.discover_hardware("pu")
+        self.num_cores = commands.hwloc_calc.count("core", within=self.machine)
+        self.num_pus = commands.hwloc_calc.count("pu", within=self.machine)
+        self.numa_node_cpusets = commands.hwloc_calc.list_cpusets("numa", within=self.machine)
         self.pus_per_core = self.num_pus // self.num_cores if self.num_cores > 0 else 0
         self.gpu_pci_ids = self.discover_gpus()

     def discover_gpus(self) -> list:
-        """Discovers available GPU PCI bus IDs."""
-        try:
-            return commands.nvidia_smi.get_pci_bus_ids()
-        except (RuntimeError, FileNotFoundError):
-            print("Warning: Could not discover GPUs. GPU binding unavailable.", file=sys.stderr)
-            return []
+        """
+        Discovers available GPU PCI bus IDs.
+        """
+        return commands.nvidia_smi.get_pci_bus_ids()

     def load_file(self, filepath):
         """
         Loads and parses the YAML shape file.
         """
         return utils.read_yaml(filepath)

-    def discover_hardware(self, hw_type: str) -> int:
-        """
-        Gets the number of a specific hardware object.
-        """
-        try:
-            result = commands.hwloc_calc.execute(["--number-of", hw_type, self.machine])
-            return int(result)
-        except (RuntimeError, ValueError) as e:
-            raise RuntimeError(f"Failed to determine number of '{hw_type}': {e}")
-
     @staticmethod
     def parse_range(range_str: str) -> set:
         """

@@ -101,65 +90,119 @@ def find_matching_rule(self, rank: int, node_id: int) -> dict:
                 return item["default"]
         return None

-    def get_gpu_local_binding(self, rule: dict, local_rank: int) -> str:
-        """
-        Calculates binding for a rank based on its proximity to an assigned GPU.
-        Supports an optional prefer key for user-preferred object selection.
-        """
-        if not self.gpu_pci_ids:
-            raise RuntimeError(
-                "Shape specifies 'locality: gpu-local', but no GPUs were discovered."
-            )
-
-        num_gpus = len(self.gpu_pci_ids)
-        hwloc_type = rule.get("type")
-        if not hwloc_type:
-            raise ValueError("Rule with 'locality: gpu-local' must also specify a 'type'.")
-
-        # 1. Assign a GPU to this rank (round-robin)
-        target_gpu_index = local_rank % num_gpus
-        cuda_devices = str(target_gpu_index)
-        target_gpu_pci_id = self.gpu_pci_ids[target_gpu_index]
-
-        # 2. Get the cpuset for the GPU's locality domain
-        gpu_locality_cpuset = commands.hwloc_calc.execute([f"pci={target_gpu_pci_id}"])
-        cpu_binding_string = ""
-
-        # 3. Determine the final CPU binding
-        if hwloc_type in ["numa", "package", "l3cache"]:
-            cpu_binding_string = gpu_locality_cpuset
-
-        elif hwloc_type in ["core", "pu", "l2cache"]:
-            all_objects_in_domain_str = commands.hwloc_calc.execute(
-                [gpu_locality_cpuset, "--intersect", hwloc_type]
-            )
-            available_indices = all_objects_in_domain_str.split(",")
-            target_object_index = None
-
-            # Is the user asking to prefer a specific identifier?
-            if "prefer" in rule:
-                requested_index = str(rule["prefer"])
-                if requested_index in available_indices:
-                    target_object_index = requested_index
-
-            if target_object_index is None:
-                rank_index_in_gpu_group = local_rank // num_gpus
-                try:
-                    target_object_index = available_indices[rank_index_in_gpu_group]
-                except IndexError:
-                    raise ValueError(
-                        f"Cannot find the {rank_index_in_gpu_group}-th '{hwloc_type}' for local_rank {local_rank} "
-                        f"within the GPU's locality. Only {len(available_indices)} are available."
-                    )
-
-            cpu_binding_string = f"{hwloc_type}:{target_object_index}"
-        else:
-            raise ValueError(f"Unsupported type '{hwloc_type}' for 'locality: gpu-local' binding.")
-
-        if not cpu_binding_string:
-            raise RuntimeError("Failed to calculate a valid cpu_binding_string.")
-
-        return f"{cpu_binding_string},{cuda_devices}"
+    def get_gpu_remote_binding(self, rule: dict, local_rank: int) -> str:
+        """
+        Calculate a binding that is deliberately remote from an assigned GPU.
+        """
+        if not self.gpu_pci_ids or not self.numa_node_cpusets:
+            raise RuntimeError("GPU/NUMA discovery failed, cannot perform remote binding.")
+        if len(self.numa_node_cpusets) < 2:
+            raise RuntimeError("'locality: gpu-remote' requires a multi-NUMA system.")
+
+        num_gpus = len(self.gpu_pci_ids)
+
+        # This assumes round-robin assignment
+        target_gpu_index = local_rank % num_gpus
+        cuda_devices = str(target_gpu_index)
+        target_gpu_pci_id = self.gpu_pci_ids[target_gpu_index]
+
+        # Figure out the remote NUMA nodes that aren't in the local set
+        local_cpuset = commands.hwloc_calc.get_cpuset(f"pci={target_gpu_pci_id}")
+        remote_numa_cpusets = [cs for cs in self.numa_node_cpusets if cs != local_cpuset]
+        if not remote_numa_cpusets:
+            raise RuntimeError(f"Could not find a NUMA node remote from GPU {target_gpu_index}.")
+
+        # Honor an offset if the user asks for one. Otherwise, just take the first
+        offset = rule.get("offset", 0)
+        if offset >= len(remote_numa_cpusets):
+            raise ValueError(f"Offset {offset} is out of range.")
+        target_remote_cpuset = remote_numa_cpusets[offset]
+        return self.get_binding_within_domain(rule, local_rank, target_remote_cpuset, cuda_devices)
+
+    def get_gpu_local_binding(self, rule: dict, local_rank: int) -> str:
+        """
+        Calculate binding for a rank based on its proximity to an assigned GPU.
+        """
+        # Get the assignment.
+        assignment = gpus.GPUAssignment.for_rank(local_rank, self.gpu_pci_ids)
+
+        # Find the LOCAL cpuset for this assigned GPU
+        local_cpuset = commands.hwloc_calc.get_cpuset(f"pci={assignment.pci_id}")
+
+        # This is shared logic for binding within a domain
+        return self.get_binding_within_domain(
+            rule, local_rank, local_cpuset, assignment.cuda_devices
+        )
+
+    def get_gpu_remote_binding(self, rule: dict, local_rank: int) -> str:
+        """
+        Calculate a binding that is deliberately remote from an assigned GPU.
+        """
+        if len(self.numa_node_cpusets) < 2:
+            raise RuntimeError("'locality: gpu-remote' requires a multi-NUMA system.")
+
+        assignment = gpus.GPUAssignment.for_rank(local_rank, self.gpu_pci_ids)
+        local_cpuset = commands.hwloc_calc.get_cpuset(f"pci={assignment.pci_id}")
+
+        # Find all REMOTE NUMA domains
+        remote_numa_cpusets = [cs for cs in self.numa_node_cpusets if cs != local_cpuset]
+        if not remote_numa_cpusets:
+            raise RuntimeError(
+                f"Could not find a NUMA node remote from the one for GPU {assignment.index}."
            )
+
+        # Allow for an offset, default to 0 (the first in the list, no offset)
+        offset = rule.get("offset", 0)
+        if offset >= len(remote_numa_cpusets):
+            raise ValueError(f"Offset {offset} is out of range.")
+        target_remote_cpuset = remote_numa_cpusets[offset]
+
+        # Delegate to the common logic, passing the REMOTE cpuset
+        return self.get_binding_within_domain(
+            rule, local_rank, target_remote_cpuset, assignment.cuda_devices
+        )
+
+    def get_binding_within_domain(self, rule, local_rank, domain_cpuset, cuda_devices):
+        """
+        Helper to calculate a CPU binding within a given cpuset domain.
+        """
+        hwloc_type = rule.get("type")
+        num_gpus = len(self.gpu_pci_ids)
+
+        if hwloc_type in ["numa", "package", "l3cache"]:
+            return f"{domain_cpuset},{cuda_devices}"
+
+        elif hwloc_type in ["core", "pu", "l2cache"]:
+            target_object_index_str = None
+            if "prefer" in rule:
+                try:
+                    requested_index = int(rule["prefer"])
+                except (ValueError, TypeError):
+                    raise ValueError(
+                        f"The 'prefer' key must be a simple integer, but got: {rule['prefer']}"
+                    )
+
+                # Attempt to get the preferred object. If it fails, we know it's not valid.
+                try:
+                    binding_obj = commands.hwloc_calc.get_object_in_set(
+                        domain_cpuset, hwloc_type, requested_index
+                    )
+                    target_object_index_str = binding_obj
+                except Exception:
+                    print(
+                        f"Warning: Preferred index '{requested_index}' is not available in domain {domain_cpuset}. Falling back."
+                    )
+
+            # If no preference was given, or the preference was invalid, fall back to default.
+            if target_object_index_str is None:
+                rank_index_in_group = local_rank // num_gpus
+                target_object_index_str = commands.hwloc_calc.get_object_in_set(
+                    domain_cpuset, hwloc_type, rank_index_in_group
+                )
+
+            return f"{target_object_index_str},{cuda_devices}"
+        else:
+            raise ValueError(f"Unsupported type '{hwloc_type}' for GPU locality binding.")

     def get_binding_for_rank(self, rank: int, node_id: int, local_rank: int) -> str:
         """

@@ -186,6 +229,9 @@ def get_binding_for_rank(self, rank: int, node_id: int, local_rank: int) -> str:
         # Are we doing something with GPU?
         if rule.get("bind") == "gpu-local":
             return self.get_gpu_local_binding(rule, local_rank)
+        if rule.get("bind") == "gpu-remote":
+            return self.get_gpu_remote_binding(rule, local_rank)
+
         cpu_binding_string = self.get_cpu_binding(hwloc_type, rule, local_rank)

         # Return the CPU binding and the "NONE" sentinel for the device.
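Putting the pieces together: a hedged walkthrough of how a 'gpu-remote' rule resolves, assuming a hypothetical node with two NUMA domains and two GPUs (cpuset values are illustrative):

    # rule = {"type": "numa", "locality": "gpu-remote"}, local_rank = 0
    # 1. GPUAssignment.for_rank(0, pci_ids)  -> GPU 0, cuda_devices "0"
    # 2. hwloc_calc.get_cpuset("pci=...")    -> "0x000000ff" (the GPU-local cpuset)
    # 3. remote_numa_cpusets                 -> ["0xff000000"] (every other NUMA cpuset)
    # 4. type is "numa", so get_binding_within_domain returns the whole
    #    remote cpuset: "0xff000000,0"
    # run_mapping.sh then binds the task to 0xff000000 and exposes only
    # CUDA device 0, giving a deliberately non-local CPU placement.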
