Skip to content

Commit 01e7a62

Browse files
committed
cli: add support for gpu affinity and tasks
Signed-off-by: vsoch <[email protected]>
1 parent ce2ba0d commit 01e7a62

File tree

7 files changed

+92
-55
lines changed

7 files changed

+92
-55
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
default:
22
type: core
3-
on: gpu-local
3+
bind: gpu-local
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
default:
22
type: numa
3-
on: gpu-local
3+
bind: gpu-local

fluxbind/bind/bind.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ def __init__(self, **kwargs):
108108
"exclusive",
109109
"cores_per_task",
110110
"tasks_per_core",
111+
"gpus_per_task",
112+
"gpu_affinity",
111113
"cpu_affinity",
112114
"taskmap",
113115
"command",
@@ -172,19 +174,31 @@ def get_custom_command(self):
172174
A custom command uses flux to ask for a specific binding
173175
"""
174176
cmd = ["flux", "run", "-N", str(self.nodes)]
177+
cmd = self.set_envars(cmd)
178+
cmd = self.set_flags(cmd)
179+
180+
# This is the main difference between a shape and not command.
181+
# We don't have to ask for exclusive here. For shape, we do.
182+
if self.exclusive:
183+
cmd.append("--exclusive")
184+
return cmd
185+
186+
def set_flags(self, cmd):
187+
"""
188+
Set command flags.
189+
"""
175190
if self.tasks is not None:
176191
cmd += ["-n", str(self.tasks)]
177192
if self.cpu_affinity is not None:
178193
cmd += ["-o", f"cpu-affinity={self.cpu_affinity}"]
179-
if self.exclusive:
180-
cmd.append("--exclusive")
194+
if self.gpu_affinity is not None:
195+
cmd += ["-o", f"gpu-affinity={self.gpu_affinity}"]
181196
if self.cores_per_task is not None:
182197
cmd += ["--cores-per-task", str(self.cores_per_task)]
183198
if self.tasks_per_core is not None:
184199
cmd += ["--tasks-per-core", str(self.tasks_per_core)]
185200
if self.taskmap is not None:
186201
cmd += [f"--taskmap={self.taskmap}"]
187-
cmd = self.set_envars(cmd)
188202
return cmd
189203

190204
def set_envars(self, cmd):
@@ -209,6 +223,8 @@ def get_shape_command(self):
209223
"""
210224
A shape command requires exclusive (for now) and then exports
211225
(provides) the JOB_SHAPE_FILE to the job.
226+
227+
TODO combine this into one command.
212228
"""
213229
cmd = [
214230
"flux",
@@ -218,16 +234,7 @@ def get_shape_command(self):
218234
"--exclusive",
219235
]
220236
cmd = self.set_envars(cmd)
221-
if self.tasks is not None:
222-
cmd += ["-n", str(self.tasks)]
223-
if self.tasks_per_core is not None:
224-
cmd += ["--tasks-per-core", str(self.tasks_per_core)]
225-
if self.cpu_affinity is not None:
226-
cmd += ["-o", f"cpu-affinity={self.cpu_affinity}"]
227-
if self.env is not None:
228-
for envar in self.env:
229-
cmd += ["--env", envar]
230-
return cmd
237+
return self.set_flags(cmd)
231238

232239
def execute(self, script):
233240
"""

fluxbind/cli/__init__.py

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ def get_parser():
2929
action="store_true",
3030
)
3131

32+
parser.add_argument(
33+
"--quiet",
34+
dest="quiet",
35+
help="suppress additional output.",
36+
default=False,
37+
action="store_true",
38+
)
3239
parser.add_argument(
3340
"--version",
3441
dest="version",
@@ -64,7 +71,11 @@ def get_parser():
6471
"--cpu-affinity",
6572
default=None,
6673
help="Add cpu-affinity",
67-
# choices=["none", "per-task"],
74+
)
75+
run.add_argument(
76+
"--gpu-affinity",
77+
default=None,
78+
help="Add gpu-affinity",
6879
)
6980
run.add_argument("-N", "--nodes", type=int, default=1, help="The number of nodes (default: 1).")
7081
run.add_argument(
@@ -93,32 +104,12 @@ def get_parser():
93104
help="The number of CORES (not PUs) to bind per task.",
94105
)
95106
run.add_argument(
96-
"--tasks-per-core",
107+
"-g",
108+
"--gpus-per-task",
97109
type=int,
98110
default=None,
99-
help="The number of tasks per core.",
100-
)
101-
run.add_argument(
102-
"--silent",
103-
dest="silent",
104-
help="no additional output.",
105-
default=False,
106-
action="store_true",
107-
)
108-
run.add_argument(
109-
"--quiet",
110-
dest="quiet",
111-
help="suppress additional output (only print fluxbind mapping)",
112-
default=False,
113-
action="store_true",
114-
)
115-
run.add_argument(
116-
"--nocolor",
117-
help="suppress color output (e.g., piping to log)",
118-
default=False,
119-
action="store_true",
111+
help="The number of GPUs per task.",
120112
)
121-
122113
predict = subparsers.add_parser(
123114
"predict",
124115
formatter_class=argparse.RawTextHelpFormatter,
@@ -186,7 +177,7 @@ def help(return_code=0):
186177
sys.exit(0)
187178

188179
setup_logger(
189-
quiet=False,
180+
quiet=args.quiet,
190181
debug=args.debug,
191182
)
192183

fluxbind/scripts/run_mapping.sh

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fi
2424
# Call the fluxbind helper script to get the target location string (e.g., "core:0" or "UNBOUND")
2525
# It ALWAYS returns a single line in the format: BIND_LOCATION,CUDA_DEVICE_ID
2626
# For CPU jobs, CUDA_DEVICE_ID will be the string "NONE".
27-
BIND_LOCATION=$(fluxbind shape --file "$JOB_SHAPE_FILE" --rank "$rank" --node-id "$node_id" --local-rank "$local_rank")
27+
BIND_INFO=$(fluxbind shape --file "$JOB_SHAPE_FILE" --rank "$rank" --node-id "$node_id" --local-rank "$local_rank")
2828

2929
# Exit if the helper script failed
3030
if [ $? -ne 0 ]; then
@@ -80,6 +80,9 @@ if [[ "$FLUXBIND_QUIET" != "1" ]]
8080
echo -e "${prefix}: Effective Cpuset Mask: ${CYAN}$cpuset_mask${RESET}"
8181
echo -e "${prefix}: Logical CPUs (PUs): ${BLUE}${logical_cpu_list:-none}${RESET}"
8282
echo -e "${prefix}: Physical Cores: ${ORANGE}${physical_core_list:-none}${RESET}"
83+
if [[ "$CUDA_DEVICE" != "NONE" ]]; then
84+
echo -e "${prefix}: CUDA Devices: ${YELLOW}${CUDA_DEVICE}${RESET}"
85+
fi
8386
echo
8487
fi
8588

@@ -92,6 +95,11 @@ if [[ "${BIND_LOCATION}" == "UNBOUND" ]]; then
9295
exec "$@"
9396
else
9497
# Use hwloc-bind to set the affinity and then execute the command.
95-
if [[ "$FLUXBIND_SILENT" != "1" ]]; then echo -e "${GREEN}fluxbind${RESET}: Rank ${rank} is bound to ${BIND_LOCATION} to execute: $@" >&2; fi
98+
if [[ "$FLUXBIND_SILENT" != "1" ]]; then
99+
if [[ "$CUDA_DEVICE" != "NONE" ]]; then
100+
echo -e "${GREEN}fluxbind${RESET}: Rank ${rank} is bound to ${BIND_LOCATION} cuda:${CUDA_DEVICE} to execute: $@" >&2
101+
else
102+
echo -e "${GREEN}fluxbind${RESET}: Rank ${rank} is bound to ${BIND_LOCATION} to execute: $@" >&2
103+
fi
fi
96104
exec hwloc-bind "${BIND_LOCATION}" -- "$@"
97105
fi

fluxbind/shape/commands.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
# In file: fluxbind/commands.py
2-
import re
1+
import shlex
32
import subprocess
43
import sys
54

@@ -35,9 +34,12 @@ def execute(self, args_list: list) -> str:
3534
Executes hwloc-calc with a list of arguments.
3635
This is safer as it avoids shell interpretation of the arguments.
3736
"""
37+
if isinstance(args_list, str):
38+
args_list = shlex.split(args_list)
39+
3840
# A more robust validation could be added here if needed,
3941
command_list = [self.name] + args_list
40-
return self._run(command_list, shell=False)
42+
return self.run(command_list, shell=False)
4143

4244

4345
class NvidiaSmiCommand(Command):
@@ -51,7 +53,7 @@ def get_pci_bus_ids(self) -> list[str]:
5153
command_str = f"{self.name} --query-gpu=pci.bus_id --format=csv,noheader"
5254

5355
# shell=True is safe here because the entire command is static and defined internally.
54-
output = self._run(command_str, shell=True)
56+
output = self.run(command_str, shell=True)
5557

5658
# Parse the output into a clean list
5759
ids = output.strip().split("\n")

fluxbind/shape/shape.py

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,31 +104,60 @@ def find_matching_rule(self, rank: int, node_id: int) -> dict:
104104
def get_gpu_local_binding(self, rule: dict, local_rank: int) -> str:
105105
"""
106106
Calculates binding for a rank based on its proximity to an assigned GPU.
107+
Supports an optional prefer key for user-preferred object selection.
107108
"""
108109
if not self.gpu_pci_ids:
109-
raise RuntimeError("Shape specifies 'on: gpu-local', but no GPUs were discovered.")
110+
raise RuntimeError(
111+
"Shape specifies 'bind: gpu-local', but no GPUs were discovered."
112+
)
110113

111114
num_gpus = len(self.gpu_pci_ids)
112115
hwloc_type = rule.get("type")
113116
if not hwloc_type:
114-
raise ValueError(
115-
"Rule with 'on: gpu-local' must also specify a 'type' (e.g., core, numa)."
116-
)
117+
raise ValueError("Rule with 'bind: gpu-local' must also specify a 'type'.")
117118

119+
# 1. Assign a GPU to this rank (round-robin)
118120
target_gpu_index = local_rank % num_gpus
119121
cuda_devices = str(target_gpu_index)
120122
target_gpu_pci_id = self.gpu_pci_ids[target_gpu_index]
123+
124+
# 2. Get the cpuset for the GPU's locality domain
121125
gpu_locality_cpuset = commands.hwloc_calc.execute([f"pci={target_gpu_pci_id}"])
126+
cpu_binding_string = ""
122127

128+
# 3. Determine the final CPU binding
123129
if hwloc_type in ["numa", "package", "l3cache"]:
124130
cpu_binding_string = gpu_locality_cpuset
131+
125132
elif hwloc_type in ["core", "pu", "l2cache"]:
126-
rank_index_in_gpu_group = local_rank // num_gpus
127-
cpu_binding_string = commands.HwlocCalcCommand().execute(
128-
f'"{gpu_locality_cpuset}" -I {hwloc_type}:{rank_index_in_gpu_group}'
133+
all_objects_in_domain_str = commands.hwloc_calc.execute(
134+
[gpu_locality_cpuset, "--intersect", hwloc_type]
129135
)
136+
available_indices = all_objects_in_domain_str.split(",")
137+
target_object_index = None
138+
139+
# Is the user asking to prefer a specific identifier?
140+
if "prefer" in rule:
141+
requested_index = str(rule["prefer"])
142+
if requested_index in available_indices:
143+
target_object_index = requested_index
144+
145+
if target_object_index is None:
146+
rank_index_in_gpu_group = local_rank // num_gpus
147+
try:
148+
target_object_index = available_indices[rank_index_in_gpu_group]
149+
except IndexError:
150+
raise ValueError(
151+
f"Cannot find the {rank_index_in_gpu_group}-th '{hwloc_type}' for local_rank {local_rank} "
152+
f"within the GPU's locality. Only {len(available_indices)} are available."
153+
)
154+
155+
cpu_binding_string = f"{hwloc_type}:{target_object_index}"
130156
else:
131-
raise ValueError(f"Unsupported type '{hwloc_type}' for 'on: gpu-local' binding.")
157+
raise ValueError(f"Unsupported type '{hwloc_type}' for 'bind: gpu-local' binding.")
158+
159+
if not cpu_binding_string:
160+
raise RuntimeError("Failed to calculate a valid cpu_binding_string.")
132161

133162
return f"{cpu_binding_string},{cuda_devices}"
134163

@@ -155,7 +184,7 @@ def get_binding_for_rank(self, rank: int, node_id: int, local_rank: int) -> str:
155184
raise ValueError(f"Matching rule has no 'type' defined: {rule}")
156185

157186
# Are we doing something with GPU?
158-
if rule.get("on") == "gpu-local":
187+
if rule.get("bind") == "gpu-local":
159188
return self.get_gpu_local_binding(rule, local_rank)
160189
cpu_binding_string = self.get_cpu_binding(hwloc_type, rule, local_rank)
161190

0 commit comments

Comments
 (0)