Skip to content

Commit ce2ba0d

Browse files
committed
test: adding configs for gpu-local
Signed-off-by: vsoch <[email protected]>
1 parent 28d8e05 commit ce2ba0d

File tree

4 files changed

+110
-75
lines changed

4 files changed

+110
-75
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
default:
2+
type: core
3+
on: gpu-local
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
default:
2+
type: numa
3+
on: gpu-local

fluxbind/shape/commands.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# In file: fluxbind/commands.py
2+
import re
3+
import subprocess
4+
import sys
5+
6+
7+
class Command:
8+
"""
9+
Abstract base class for a controlled command.
10+
"""
11+
12+
def run(self, command, shell: bool = False):
13+
"""
14+
Private helper to run a subprocess command.
15+
"""
16+
try:
17+
result = subprocess.run(
18+
command, shell=shell, capture_output=True, text=True, check=True
19+
)
20+
return result.stdout.strip()
21+
except subprocess.CalledProcessError as e:
22+
cmd_str = command if shell else " ".join(command)
23+
print(f"Error running '{cmd_str}': {e.stderr}", file=sys.stderr)
24+
raise RuntimeError("Command execution failed.") from e
25+
except FileNotFoundError as e:
26+
cmd_str = command[0] if isinstance(command, list) else command.split()[0]
27+
raise RuntimeError(f"Command not found: '{cmd_str}'") from e
28+
29+
30+
class HwlocCalcCommand(Command):
31+
name = "hwloc-calc"
32+
33+
def execute(self, args_list: list) -> str:
34+
"""
35+
Executes hwloc-calc with a list of arguments.
36+
This is safer as it avoids shell interpretation of the arguments.
37+
"""
38+
# A more robust validation could be added here if needed,
39+
command_list = [self.name] + args_list
40+
return self._run(command_list, shell=False)
41+
42+
43+
class NvidiaSmiCommand(Command):
44+
name = "nvidia-smi"
45+
46+
def get_pci_bus_ids(self) -> list[str]:
47+
"""
48+
Specifically queries for and returns a list of GPU PCI bus IDs.
49+
The command is hardcoded for security.
50+
"""
51+
command_str = f"{self.name} --query-gpu=pci.bus_id --format=csv,noheader"
52+
53+
# shell=True is safe here because the entire command is static and defined internally.
54+
output = self._run(command_str, shell=True)
55+
56+
# Parse the output into a clean list
57+
ids = output.strip().split("\n")
58+
return [bus_id for bus_id in ids if bus_id]
59+
60+
61+
hwloc_calc = HwlocCalcCommand()
62+
nvidia_smi = NvidiaSmiCommand()

fluxbind/shape/shape.py

Lines changed: 42 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import subprocess
33
import sys
44

5+
import fluxbind.shape.commands as commands
56
import fluxbind.utils as utils
67

78

@@ -23,18 +24,14 @@ def __init__(self, filepath, machine="machine:0"):
2324
self.num_cores = self.discover_hardware("core")
2425
self.num_pus = self.discover_hardware("pu")
2526
self.pus_per_core = self.num_pus // self.num_cores if self.num_cores > 0 else 0
26-
self.gpu_objects = self.discover_gpus()
27+
self.gpu_pci_ids = self.discover_gpus()
2728

2829
def discover_gpus(self) -> list:
29-
"""
30-
Discovers available GPU objects using hwloc.
31-
"""
30+
"""Discovers available GPU PCI bus IDs."""
3231
try:
33-
cmd = f"hwloc-calc --whole-system {self.machine} -I os --filter-by-type-name CUDA"
34-
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, check=True)
35-
return result.stdout.strip().split()
36-
except subprocess.CalledProcessError:
37-
print("Warning: Could not discover any CUDA GPUs.", file=sys.stderr)
32+
return commands.nvidia_smi.get_pci_bus_ids()
33+
except (RuntimeError, FileNotFoundError):
34+
print("Warning: Could not discover GPUs. GPU binding unavailable.", file=sys.stderr)
3835
return []
3936

4037
def load_file(self, filepath):
@@ -45,14 +42,12 @@ def load_file(self, filepath):
4542

4643
def discover_hardware(self, hw_type: str) -> int:
4744
"""
48-
Runs hwloc-calc to get the number of a specific hardware object.
45+
Gets the number of a specific hardware object.
4946
"""
5047
try:
51-
command = f"hwloc-calc --number-of {hw_type} {self.machine}"
52-
result = subprocess.run(command, shell=True, capture_output=True, text=True, check=True)
53-
return int(result.stdout.strip())
54-
except (subprocess.CalledProcessError, ValueError) as e:
55-
# Better to raise an error than to silently return a default
48+
result = commands.hwloc_calc.execute(["--number-of", hw_type, self.machine])
49+
return int(result)
50+
except (RuntimeError, ValueError) as e:
5651
raise RuntimeError(f"Failed to determine number of '{hw_type}': {e}")
5752

5853
@staticmethod
@@ -76,23 +71,10 @@ def evaluate_formula(formula_template: str, local_rank: int) -> int:
7671
7772
This assumes running on the rank where the binding is asked for.
7873
"""
79-
processed_formula = str(formula_template)
80-
substitutions = re.findall(r"\{\{([^}]+)\}\}", processed_formula)
81-
for command_to_run in set(substitutions):
82-
try:
83-
result = subprocess.run(
84-
command_to_run, shell=True, capture_output=True, text=True, check=True
85-
)
86-
placeholder = f"{{{{{command_to_run}}}}}"
87-
processed_formula = processed_formula.replace(placeholder, result.stdout.strip())
88-
except subprocess.CalledProcessError as e:
89-
raise RuntimeError(f"Error executing sub-command '{command_to_run}': {e}")
90-
91-
# Substitute local_rank and evaluate final expression
92-
final_expression = processed_formula.replace("$local_rank", str(local_rank))
93-
command = f'echo "{final_expression}"'
74+
formula = str(formula_template).replace("$local_rank", str(local_rank))
75+
command = f'echo "{formula}"'
9476
result = subprocess.run(command, shell=True, capture_output=True, text=True, check=True)
95-
return result.stdout.strip()
77+
return int(result.stdout.strip())
9678

9779
def find_matching_rule(self, rank: int, node_id: int) -> dict:
9880
"""
@@ -119,50 +101,36 @@ def find_matching_rule(self, rank: int, node_id: int) -> dict:
119101
return item["default"]
120102
return None
121103

122-
def get_gpu_binding_for_rank(self, on_domain, hwloc_type, local_rank):
104+
def get_gpu_local_binding(self, rule: dict, local_rank: int) -> str:
123105
"""
124-
Get a GPU binding for a rank. Local means a numa node close by, remote not.
106+
Calculates binding for a rank based on its proximity to an assigned GPU.
125107
"""
126-
if not self.gpu_objects:
127-
raise RuntimeError("Shape is GPU-aware, but no GPUs were discovered.")
128-
if local_rank >= len(self.gpu_objects):
129-
raise IndexError(
130-
f"local_rank {local_rank} is out of range for {len(self.gpu_objects)} GPUs."
108+
if not self.gpu_pci_ids:
109+
raise RuntimeError("Shape specifies 'on: gpu-local', but no GPUs were discovered.")
110+
111+
num_gpus = len(self.gpu_pci_ids)
112+
hwloc_type = rule.get("type")
113+
if not hwloc_type:
114+
raise ValueError(
115+
"Rule with 'on: gpu-local' must also specify a 'type' (e.g., core, numa)."
131116
)
132117

133-
my_gpu_object = self.gpu_objects[local_rank]
134-
135-
pci_bus_id_cmd = f"hwloc-pci-lookup {my_gpu_object}"
136-
cuda_devices = subprocess.run(
137-
pci_bus_id_cmd, shell=True, capture_output=True, text=True, check=True
138-
).stdout.strip()
139-
140-
local_numa_cmd = f"hwloc-calc {my_gpu_object} --ancestor numa -I"
141-
local_numa_id = int(
142-
subprocess.run(
143-
local_numa_cmd, shell=True, capture_output=True, text=True, check=True
144-
).stdout.strip()
145-
)
146-
147-
target_numa_location = ""
148-
if on_domain == "gpu-local":
149-
target_numa_location = f"numa:{local_numa_id}"
150-
else: # gpu-remote
151-
remote_numa_id = (local_numa_id + 1) % self.num_numa
152-
target_numa_location = f"numa:{remote_numa_id}"
153-
154-
# If the requested type is just 'numa', we're done.
155-
if hwloc_type == "numa":
156-
return f"{target_numa_location},{cuda_devices}"
157-
158-
# Otherwise, find the first object of the requested type WITHIN that NUMA domain.
159-
# This is a powerful composition of the two concepts.
160-
# E.g., find the first 'core' on the 'gpu-local' NUMA domain.
161-
cmd = f"hwloc-calc {target_numa_location} --intersect {hwloc_type} --first"
162-
binding_string = subprocess.run(
163-
cmd, shell=True, capture_output=True, text=True, check=True
164-
).stdout.strip()
165-
return f"{binding_string},{cuda_devices}"
118+
target_gpu_index = local_rank % num_gpus
119+
cuda_devices = str(target_gpu_index)
120+
target_gpu_pci_id = self.gpu_pci_ids[target_gpu_index]
121+
gpu_locality_cpuset = commands.hwloc_calc.execute([f"pci={target_gpu_pci_id}"])
122+
123+
if hwloc_type in ["numa", "package", "l3cache"]:
124+
cpu_binding_string = gpu_locality_cpuset
125+
elif hwloc_type in ["core", "pu", "l2cache"]:
126+
rank_index_in_gpu_group = local_rank // num_gpus
127+
cpu_binding_string = commands.HwlocCalcCommand().execute(
128+
f'"{gpu_locality_cpuset}" -I {hwloc_type}:{rank_index_in_gpu_group}'
129+
)
130+
else:
131+
raise ValueError(f"Unsupported type '{hwloc_type}' for 'on: gpu-local' binding.")
132+
133+
return f"{cpu_binding_string},{cuda_devices}"
166134

167135
def get_binding_for_rank(self, rank: int, node_id: int, local_rank: int) -> str:
168136
"""
@@ -186,10 +154,9 @@ def get_binding_for_rank(self, rank: int, node_id: int, local_rank: int) -> str:
186154
if hwloc_type is None:
187155
raise ValueError(f"Matching rule has no 'type' defined: {rule}")
188156

189-
on_domain = rule.get("on")
190-
if on_domain in ["gpu-local", "gpu-remote"]:
191-
return self.get_gpu_binding_for_rank(on_domain, hwloc_type, local_rank)
192-
157+
# Are we doing something with GPU?
158+
if rule.get("on") == "gpu-local":
159+
return self.get_gpu_local_binding(rule, local_rank)
193160
cpu_binding_string = self.get_cpu_binding(hwloc_type, rule, local_rank)
194161

195162
# Return the CPU binding and the "NONE" sentinel for the device.

0 commit comments

Comments
 (0)