
Commit 7342925

feat: add amd gpu support (rocm-smi)
Signed-off-by: vsoch <[email protected]>
1 parent ef0f985 commit 7342925

4 files changed: +162, -58 lines

Lines changed: 1 addition & 1 deletion
@@ -1,3 +1,3 @@
 default:
   type: numa
-  locality: gpu-remote
+  bind: gpu-remote
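This hunk renames the shape-file key from locality to bind, keeping the gpu-remote value. As a quick illustration only (assuming PyYAML is available and that the rest of the shape schema is unchanged; how fluxbind dispatches on the value is not shown in this file), the rule now loads as:

import yaml  # PyYAML, assumed to be available alongside fluxbind

shape_text = """
default:
  type: numa
  bind: gpu-remote
"""

rule = yaml.safe_load(shape_text)["default"]
print(rule["type"])      # numa
print(rule.get("bind"))  # gpu-remote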

fluxbind/scripts/run_mapping.sh

Lines changed: 5 additions & 8 deletions
@@ -28,7 +28,7 @@ gpus_per_task=${GPUS_PER_TASK:-1}
 # For CPU jobs, CUDA_DEVICE_ID will be the string "NONE".
 echo fluxbind shape --file "$JOB_SHAPE_FILE" --rank "$rank" --node-id "$node_id" --local-rank "$local_rank" --gpus-per-task "$gpus_per_task"
 BIND_INFO=$(fluxbind shape --file "$JOB_SHAPE_FILE" --rank "$rank" --node-id "$node_id" --local-rank "$local_rank" --gpus-per-task "$gpus_per_task")
-echo $BIND_INFO
+echo

 # Exit if the helper script failed
 if [ $? -ne 0 ]; then
@@ -40,9 +40,6 @@ fi
 BIND_LOCATION="${BIND_INFO%;*}"
 CUDA_DEVICES="${BIND_INFO#*;}"

-echo "BIND_LOCATION: ${BIND_LOCATION}"
-echo "CUDA_DEVICES: ${CUDA_DEVICES}"
-
 if [[ "$CUDA_DEVICES" != "NONE" ]]; then
     export CUDA_VISIBLE_DEVICES=$CUDA_DEVICES
 fi
@@ -56,9 +53,9 @@ if [[ "${BIND_LOCATION}" == "UNBOUND" ]]; then
 else
     # For a bound task, calculate the mask and lists from the target location string.
     binding_source=${BIND_LOCATION}
-    cpuset_mask=$(hwloc-calc "${BIND_LOCATION}")
-    logical_cpu_list=$(hwloc-calc "${BIND_LOCATION}" --intersect PU 2>/dev/null)
-    physical_core_list=$(hwloc-calc "${BIND_LOCATION}" --intersect core 2>/dev/null)
+    cpuset_mask=$(hwloc-calc ${BIND_LOCATION})
+    logical_cpu_list=$(hwloc-calc ${BIND_LOCATION} --intersect PU 2>/dev/null)
+    physical_core_list=$(hwloc-calc ${BIND_LOCATION} --intersect core 2>/dev/null)
 fi

 if [[ "$FLUXBIND_NOCOLOR" != "1" ]]
@@ -110,5 +107,5 @@ if [[ "${BIND_LOCATION}" == "UNBOUND" ]]; then
     # Execute the command directly without changing affinity.
     exec "$@"
 else
-    exec hwloc-bind "${BIND_LOCATION}" -- "$@"
+    exec hwloc-bind ${BIND_LOCATION} -- "$@"
 fi
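For context on what the script parses: fluxbind shape prints a single "location;devices" string (shape.py returns f"{cpu_binding_string};{cuda_devices}"), and the bash expansions ${BIND_INFO%;*} and ${BIND_INFO#*;} split it. A minimal Python sketch of the same split, with illustrative values:

# Example BIND_INFO as emitted by `fluxbind shape` (values are made up for illustration).
bind_info = "numa:0;0,1"

# ${BIND_INFO%;*} keeps everything before the last ';'
bind_location = bind_info.rsplit(";", 1)[0]  # "numa:0"
# ${BIND_INFO#*;} keeps everything after the first ';'
cuda_devices = bind_info.split(";", 1)[1]    # "0,1"

# A CPU-only job signals "no GPUs" with the literal string NONE.
if cuda_devices != "NONE":
    print(f"would export CUDA_VISIBLE_DEVICES={cuda_devices}")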

fluxbind/shape/commands.py

Lines changed: 28 additions & 6 deletions
@@ -1,6 +1,6 @@
+import json
 import subprocess
 import sys
-from itertools import zip_longest


 class Command:
@@ -19,7 +19,6 @@ def run(self, command, shell: bool = False):
             return result.stdout.strip()
         except subprocess.CalledProcessError as e:
             cmd_str = command if shell else " ".join(command)
-            print(f"Error running '{cmd_str}': {e.stderr}", file=sys.stderr)
             raise RuntimeError("Command execution failed.") from e
         except FileNotFoundError as e:
             cmd_str = command[0] if isinstance(command, list) else command.split()[0]
@@ -29,15 +28,15 @@ def run(self, command, shell: bool = False):
 class HwlocCalcCommand(Command):
     name = "hwloc-calc"

-    def _parse_cpuset_to_list(self, cpuset_str: str) -> list[int]:
+    def parse_cpuset_to_list(self, cpuset_str: str) -> list[int]:
         """
         Convert a potentially comma-separated hex string into a list of integers.
         """
         if not cpuset_str or cpuset_str.lower() in ["0x0", "0"]:
             return [0]
         return [int(chunk, 16) for chunk in cpuset_str.strip().split(",")]

-    def _operate_on_lists(self, list_a: list[int], list_b: list[int], operator: str) -> list[int]:
+    def operate_on_lists(self, list_a: list[int], list_b: list[int], operator: str) -> list[int]:
         """
         Perform a bitwise operation on two lists of cpuset integers.
         """
@@ -112,8 +111,8 @@ def union_of_locations(self, locations: list[str]) -> str:

         for loc in locations:
             loc_cpuset_str = self.get_cpuset(loc)
-            loc_cpuset_list = self._parse_cpuset_to_list(loc_cpuset_str)
-            union_mask_list = self._union_of_lists(union_mask_list, loc_cpuset_list)
+            loc_cpuset_list = self.parse_cpuset_to_list(loc_cpuset_str)
+            union_mask_list = self.operate_on_lists(union_mask_list, loc_cpuset_list, "+")
         return " ".join([hex(chunk) for chunk in union_mask_list])


@@ -135,5 +134,28 @@ def get_pci_bus_ids(self) -> list[str]:
         return [bus_id for bus_id in ids if bus_id]


+class RocmSmiCommand(Command):
+    name = "rocm-smi"
+
+    def get_pci_bus_ids(self) -> list[str]:
+        """
+        Specifically queries for and returns a list of GPU PCI bus IDs.
+        """
+        # The '--showbus' and '--json' flags provide a reliable, machine-readable output.
+        command_str = f"{self.name} --showbus --json"
+
+        # {"card0": {"PCI Bus": "0000:03:00.0"}, "card1": ..., "card7": {"PCI Bus": "0000:E3:00.0"}}
+        output = self.run(command_str, shell=True)
+        data = json.loads(output)
+
+        pci_ids = []
+        # I'm choosing not to sort so the devices are read in the order provided.
+        for card_key in data.keys():
+            card_info = data[card_key]
+            pci_ids.append(card_info.get("PCI Bus"))
+        return pci_ids
+
+
 hwloc_calc = HwlocCalcCommand()
 nvidia_smi = NvidiaSmiCommand()
+rocm_smi = RocmSmiCommand()
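The new RocmSmiCommand relies on rocm-smi --showbus --json for machine-readable output. A standalone sketch of the same parsing against a payload shaped like the in-code comment (the bus values are illustrative, and exact key names can vary between ROCm releases):

import json

# Sample output shaped like the comment in RocmSmiCommand (illustrative bus values).
sample = '{"card0": {"PCI Bus": "0000:03:00.0"}, "card1": {"PCI Bus": "0000:E3:00.0"}}'
data = json.loads(sample)

# Preserve the reported card order rather than sorting, matching the commit's choice.
pci_ids = [info.get("PCI Bus") for info in data.values()]
print(pci_ids)  # ['0000:03:00.0', '0000:E3:00.0']

The comprehension is equivalent to the loop in the class; both keep the card order that rocm-smi reports.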

fluxbind/shape/shape.py

Lines changed: 128 additions & 43 deletions
@@ -26,13 +26,53 @@ def __init__(self, filepath, machine="machine:0"):
         self.num_pus = commands.hwloc_calc.count("pu", within=self.machine)
         self.numa_node_cpusets = commands.hwloc_calc.list_cpusets("numa", within=self.machine)
         self.pus_per_core = self.num_pus // self.num_cores if self.num_cores > 0 else 0
-        self.gpu_pci_ids = self.discover_gpus()
+        # For GPU topology, we care about NUMA nodes.
+        self.gpus_by_numa = self.discover_gpus()

-    def discover_gpus(self) -> list:
+    def discover_gpus(self):
         """
         Discovers available GPU PCI bus IDs.
         """
-        return commands.nvidia_smi.get_pci_bus_ids()
+        all_pci_ids = []
+
+        # Try nvidia and then rocm
+        for command in [commands.nvidia_smi.get_pci_bus_ids, commands.rocm_smi.get_pci_bus_ids]:
+            try:
+                # These are PCI addresses ACROSS numa nodes
+                all_pci_ids = command()
+            except Exception:
+                pass
+
+        gpus_by_numa = {}
+
+        # For each GPU, find out which NUMA node it belongs to
+        for pci_id in all_pci_ids:
+            # Ask hwloc for the cpuset where the pci device lives.
+            # I'm not sure if this will work for nvidia if it doesn't show in lstopo
+            gpu_cpuset = commands.hwloc_calc.get_cpuset(f"pci={pci_id}")
+            found_numa = False
+            for i, numa_cpuset in enumerate(self.numa_node_cpusets):
+                # Check if the GPU's cpuset is a subset of this NUMA node's cpuset
+                intersection = commands.hwloc_calc.get_cpuset(f"'{gpu_cpuset}' x '{numa_cpuset}'")
+
+                if intersection == gpu_cpuset:
+                    if i not in gpus_by_numa:
+                        gpus_by_numa[i] = []
+                    gpus_by_numa[i].append(pci_id)
+                    found_numa = True
+                    break
+
+            # Raise an error - I want to know about this case.
+            if not found_numa:
+                raise ValueError(f"Could not determine NUMA locality for GPU {pci_id}")
+
+        # Keep an ordered list as well, for easy index-based access
+        self.ordered_gpus = []
+        for numa_idx in sorted(gpus_by_numa.keys()):
+            for pci_id in gpus_by_numa[numa_idx]:
+                self.ordered_gpus.append({"pci_id": pci_id, "numa_index": numa_idx})
+
+        return gpus_by_numa

     def load_file(self, filepath):
         """
@@ -92,61 +132,106 @@ def find_matching_rule(self, rank: int, node_id: int) -> dict:
         return None

     def get_gpu_local_binding(self, rule: dict, local_rank: int, gpus_per_task: int) -> str:
-        assignment = gpus.GPUAssignment.for_rank(local_rank, gpus_per_task, self.gpu_pci_ids)
-        pci_locations = [f"pci={pci_id}" for pci_id in assignment.pci_ids]
-        domain_cpuset = commands.hwloc_calc.union_of_locations(pci_locations)
-        cpu_binding_string = self.get_gpu_cpu_binding(
-            rule, local_rank, gpus_per_task, domain_cpuset
-        )
-        return f"{cpu_binding_string};{assignment.cuda_devices}"
+        """
+        Calculate a 'gpu-local' binding using the topology-aware ordered GPU list.
+        """
+        if not self.ordered_gpus:
+            raise RuntimeError("Shape specifies 'bind: gpu-local', but no GPUs were discovered.")
+
+        # Assign a slice of GPUs from the canonical, ordered list.
+        start_idx = local_rank * gpus_per_task
+        end_idx = start_idx + gpus_per_task
+        if end_idx > len(self.ordered_gpus):
+            raise ValueError(f"Not enough total GPUs to satisfy request for rank {local_rank}.")
+
+        assigned_gpu_slice = self.ordered_gpus[start_idx:end_idx]
+        cuda_devices = ",".join([str(start_idx + i) for i, _ in enumerate(assigned_gpu_slice)])
+
+        # The CPU domain is the union of NUMA nodes for the assigned GPUs.
+        local_numa_indices = sorted(list({gpu["numa_index"] for gpu in assigned_gpu_slice}))
+        domain_locations = [f"numa:{i}" for i in local_numa_indices]
+        domain = " ".join(domain_locations)  # e.g., "numa:0" or "numa:0 numa:1"
+
+        # Get the final CPU binding WITHIN that domain.
+        cpu_binding_string = self.get_binding_in_gpu_domain(rule, local_rank, gpus_per_task, domain)
+        return f"{cpu_binding_string};{cuda_devices}"

     def get_gpu_remote_binding(self, rule: dict, local_rank: int, gpus_per_task: int) -> str:
+        """
+        Calculate a 'gpu-remote' binding using the topology-aware ordered GPU list.
+        """
         if len(self.numa_node_cpusets) < 2:
             raise RuntimeError("'bind: gpu-remote' is invalid on a single-NUMA system.")
-        assignment = gpus.GPUAssignment.for_rank(local_rank, gpus_per_task, self.gpu_pci_ids)
-        primary_gpu_pci_id = assignment.pci_ids[0]
-        primary_gpu_numa_cpuset = commands.hwloc_calc.get_cpuset(f"numaof:pci={primary_gpu_pci_id}")
-        remote_numa_cpusets = [cs for cs in self.numa_node_cpusets if cs != primary_gpu_numa_cpuset]
-        if not remote_numa_cpusets:
+        if not self.ordered_gpus:
+            raise RuntimeError("Shape specifies 'bind: gpu-remote', but no GPUs were discovered.")
+
+        # Assign a slice of GPUs to determine the local NUMA domains.
+        start_idx = local_rank * gpus_per_task
+        end_idx = start_idx + gpus_per_task
+        if end_idx > len(self.ordered_gpus):
+            raise ValueError(f"Not enough total GPUs to satisfy request for rank {local_rank}.")
+
+        assigned_gpu_slice = self.ordered_gpus[start_idx:end_idx]
+        cuda_devices = ",".join([str(start_idx + i) for i, _ in enumerate(assigned_gpu_slice)])
+
+        # Find the set of all local NUMA domains for this rank's GPUs.
+        local_numa_indices = {gpu["numa_index"] for gpu in assigned_gpu_slice}
+
+        # Find all remote NUMA domains.
+        all_numa_indices = set(range(len(self.numa_node_cpusets)))
+        remote_numa_indices = sorted(list(all_numa_indices - local_numa_indices))
+
+        if not remote_numa_indices:
             raise RuntimeError(
-                f"Could not find a NUMA node remote from GPU {assignment.indices[0]}."
+                f"Cannot find a remote NUMA node for rank {local_rank}; its GPUs span all NUMA domains."
             )
+
+        # Select the target remote domain.
         offset = rule.get("offset", 0)
-        domain = remote_numa_cpusets[offset]
-        cpu_binding_string = self.get_gpu_cpu_binding(rule, local_rank, gpus_per_task, domain)
-        return f"{cpu_binding_string};{assignment.cuda_devices}"
+        if offset >= len(remote_numa_indices):
+            raise ValueError(f"Offset {offset} is out of range for remote NUMA domains.")
+
+        target_remote_numa_idx = remote_numa_indices[offset]
+        domain = f"numa:{target_remote_numa_idx}"

-    def get_gpu_cpu_binding(
+        # Get the final CPU binding WITHIN that remote domain.
+        cpu_binding_string = self.get_binding_in_gpu_domain(rule, local_rank, gpus_per_task, domain)
+        return f"{cpu_binding_string};{cuda_devices}"
+
+    def get_binding_in_gpu_domain(
         self, rule: dict, local_rank: int, gpus_per_task: int, domain: str
     ) -> str:
+        """
+        A dedicated binding engine for GPU jobs. It applies user preferences within a calculated
+        domain (e.g., "numa:0" or "numa:0 numa:1").
+        """
         hwloc_type = rule.get("type")
         if not hwloc_type:
             raise ValueError("Rule with GPU binding must have a 'type'.")

         if hwloc_type in ["numa", "package", "l3cache", "machine"]:
-            # The user wants to bind to the entire domain, try to get location for it
-            return commands.hwloc_calc.get_cpuset(f"'{domain}'")
-
-        elif hwloc_type in ["core", "pu", "l2cache"]:
-            # This logic returns a simple name like "core:5", which is correct.
-            if "prefer" in rule:
-                try:
-                    requested_index = int(rule["prefer"])
-                    return commands.hwloc_calc.get_object_in_set(
-                        domain, hwloc_type, requested_index
-                    )
-                except (ValueError, RuntimeError, TypeError):
-                    print(
-                        f"Warning: Preferred index '{rule['prefer']}' invalid/not in domain '{domain}'. Falling back.",
-                        file=sys.stderr,
-                    )
-
-            rank_index_in_gpu_group = local_rank // gpus_per_task
-            return commands.hwloc_calc.get_object_in_set(
-                domain, hwloc_type, rank_index_in_gpu_group
-            )
-        else:
-            raise ValueError(f"Unsupported type '{hwloc_type}' for GPU locality binding.")
+            # If a broad type is requested, the binding is the domain itself.
+            return domain
+
+        if "prefer" in rule:
+            try:
+                requested_index = int(rule["prefer"])
+                # Validate by attempting to get the object.
+                return commands.hwloc_calc.get_object_in_set(domain, hwloc_type, requested_index)
+            except (ValueError, RuntimeError, TypeError):
+                print(
+                    f"Warning: Preferred index '{rule['prefer']}' invalid/not in domain '{domain}'. Falling back.",
+                    file=sys.stderr,
+                )
+
+        # Default assignment: the rank's Nth turn for a resource of this type within its GPU group.
+        # This is the correct index for packing sub-objects within a domain.
+        index = local_rank // gpus_per_task if gpus_per_task > 0 else local_rank
+
+        # For patterns like interleave or spread, the index calculation would need to be
+        # more complex, but this is the logic for a simple packed pattern.
+        # Assume packed placement for now, since patterns are not yet implemented here.
+        return commands.hwloc_calc.get_object_in_set(domain, hwloc_type, index)

     def get_binding_for_rank(self, rank, node_id, local_rank, gpus_per_task=None) -> str:
         """
