diff --git a/README.md b/README.md
index 39b7e83..d439250 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,29 @@
 ![img/fluxbind.png](img/fluxbind-small.png)
 
+## How does this work?
+
+OK, I think I know what we might do. The top-level description of a job's resources is the jobspec ("job specification"), which might look like this:
+
+```yaml
+resources:
+- type: slot
+  count: 4
+  with:
+  - type: core
+    count: 8
+```
+
+Flux run/submit uses flux-sched (or another scheduler) to assign jobs to nodes, and then goes into the flux exec shell and executes some number of tasks per node. Each of those tasks is what hits and executes our bash script, with a view of the entire node, and with the need to run `fluxbind shape` to derive the binding for the task. We can technically derive a shapefile from a jobspec: it is the same, but only needs to describe the shape of one slot, and the task that receives it is responsible for some number N of slots. So a shapefile that describes the shape of a slot looks like this:
+
+```yaml
+resources:
+- type: core
+  count: 8
+```
+
+That is to say, "On this node, we are breaking resources down into this slot shape." We calculate the number of slots that the task is handling from `FLUX_` environment variables. For now this assumes exclusive resources per node, so if we are slightly off it's not a huge deal, but in the future (given a slice of a node for a slot) we will need to be right, because we might see an entire node with hwloc but already be in a cpuset. Right now I'm also assuming that `fluxbind run` matches the topology of the shapefile. If you do something that doesn't match, it probably can't be satisfied and you'll get an error, but that's not guaranteed.
+
 ## Run
 
 Use fluxbind to run a job binding to specific cores. For flux, this means we require exclusive, and then for each node customize the binding exactly as we want it. We do this via a shape file.
@@ -14,11 +37,15 @@ Use fluxbind to run a job binding to specific cores.
For flux, this means we req
 
 ### Basic Examples
 
 ```bash
-# Start with a first match policy
+# Start with a first match policy (I think this only works for one node)
 flux start --config ./examples/config/match-first.toml
 
+# This works for >1 node
+flux alloc --conf ./examples/config/match-first.toml
+
 # 1. Bind each task to a unique physical core, starting from core:0 (common case)
-fluxbind run -n 8 --quiet --shape ./examples/shape/1node/packed-cores-shapefile.yaml sleep 1
+# STOPPED HERE - get this working with run! I don't think --graph is being passed.
+fluxbind run -n 8 --quiet --shape ./examples/shape-graph/single-node/simple_cores/shape.yaml --graph sleep 1
 
 # 2. Reverse it!
 fluxbind run -n 8 --quiet --shape ./examples/shape/1node/packed-cores-reversed-shapefile.yaml sleep 1
@@ -61,6 +88,14 @@ fluxbind run -N 1 --tasks-per-core 2 --shape ./examples/shape/kripke/packed-pus-
 fluxbind run -N 1 -n 2 --env OMP_NUM_THREADS=4 --env OMP_PLACES=cores --shape ./examples/shape/kripke/hybrid-l3-shapefile.yaml kripke --zones 16,16,16 --niter 500 --procs 2,1,1 --layout GZD
 ```
 
+## Shape
+
+The run command generates a shape under the hood, but we can also test the shape command directly by providing a shapefile (basically, a modified jobspec with a pattern and optional affinity). Currently the basic shape works as expected, and we are still working on the `--graph` implementation (which takes a Flux jobspec and builds it into a graph of nodes).
+
+```bash
+fluxbind shape --file examples/shape-graph/basic/4-cores-anywhere/shape.yaml --rank 0 --node-id 1 --local-rank 0 --gpus-per-task 0 --graph --global-ranks $(nproc) --nodes 1
+```
+
 ## Predict
@@ -75,6 +110,7 @@ fluxbind predict core:0-7
 fluxbind predict --xml ./examples/topology/corona.xml numa:0,1 x core:0-2
 ```
 
+
 ## License
 
 fluxbind is distributed under the terms of the MIT license.
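The slot-to-shapefile derivation described in the "How does this work?" section above can be sketched in a few lines of Python. This is a sketch only; `derive_shapefile` is a hypothetical name, not part of fluxbind's API:

```python
def derive_shapefile(jobspec):
    """Reduce a jobspec to the shape of a single slot.

    Hypothetical sketch: the shapefile keeps only the resources inside
    one slot; how many slots a task handles is computed at runtime from
    FLUX_ environment variables, not from the shapefile itself.
    """
    for resource in jobspec.get("resources", []):
        if resource.get("type") == "slot":
            return {"resources": resource.get("with", [])}
    # No slot wrapper: the request already describes one slot's shape.
    return {"resources": jobspec.get("resources", [])}


jobspec = {
    "resources": [
        {"type": "slot", "count": 4, "with": [{"type": "core", "count": 8}]}
    ]
}
print(derive_shapefile(jobspec))
# {'resources': [{'type': 'core', 'count': 8}]}
```

The slot count (4) is deliberately dropped: as described above, each task derives its share of slots at runtime rather than from the shapefile.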
diff --git a/docs/README.md b/docs/README.md
new file mode 100644
index 0000000..7096d7a
--- /dev/null
+++ b/docs/README.md
@@ -0,0 +1,180 @@
+# fluxbind
+
+## Binding
+
+How does fluxbind handle the cpuset calculation? I realize that when we bind to PUs (processing units) we are doing something akin to SMT; choosing to bind to `Core` is without that. The objects above those are containers - we don't really bind to them, we select them and then bind to a child PU or Core (as I understand it). Since we are controlling the binding in the library, we need to think about both how the user specifies this, and what the defaults are if they do not. We will implement a hierarchy of rules (checks) that the library walks through to determine what to do.
+
+### Highest Priority: Explicit Request
+
+The shapefile can carry an explicit request from the user - "This is my shape, but bind to PU/Core."
+For this request, the shape.yaml file can have an options block with `bind`.
+
+```yaml
+# Avoid SMT and bind to physical cores.
+options:
+  bind: core
+
+resources:
+  - type: l3cache
+    count: 1
+```
+
+If the `options.bind` key exists, we honor it no matter what. This selection has to be Core or PU (someone tell me if I'm off about this - I'm pretty sure the cpusets in the hwloc containers are going to select the lower levels).
+
+
+### Second level: Implicit Intent
+
+This comes from the resource request. If a user specifies a lowest level, we can assume that is what they want to bind to.
This would say "Bind to Core":
+
+```yaml
+resources:
+- type: socket
+  count: 1
+  with:
+  - type: core
+    count: 4
+```
+
+This would say "Bind to PU" (and the user is required to know the count):
+
+```yaml
+resources:
+- type: socket
+  count: 1
+  with:
+  - type: core
+    count: 4
+    with:
+    - type: process
+      count: 4
+```
+
+If they don't know the count, they can use the first strategy and request it explicitly:
+
+```yaml
+options:
+  bind: process
+
+resources:
+  - type: l3cache
+    count: 1
+```
+
+And note that I'm mapping "process" to "pu" because I don't think most users are familiar with "pu". Probably I should support both.
+In other words, if there is no `options.bind`, we inspect `resources` and check whether the final (most granular) level is Core or PU. If yes, we assume that is what we bind to.
+
+
+### Lowest Priority: HPC Default
+
+If we don't have an explicit request for binding and the lowest level is not PU or Core, we have to assume some default, e.g., "Start with this container and bind to `` under it." Since most HPC workloads run single threaded, I think we should assume Core. People that want SMT need to specify something special. Here is an example where we cannot know:
+
+```yaml
+resources:
+- type: l3cache
+  count: 1
+```
+
+We will allocate one `L3Cache` object, and when it's time to bind, we won't have a bind directive or a PU/Core at the lowest level. We have to assume the default, which will be Core.
+
+### Special Cases
+
+#### Unbound
+
+A special case is unbound. I didn't add this at first because I figured if the user didn't want binding, they wouldn't use the tool. But the exception is devices! I might want to be close to a GPU or NIC but not actually bind any processes. In that case I would use fluxbind and specify the shape, but ask for unbound:
+
+
+```yaml
+options:
+  bind: none
+
+resources:
+  - type: core
+    count: 4
+    affinity:
+      type: gpu
+      count: 1
+```
+
+Note that the affinity spec above is still a WIP.
I have something implemented for my first approach but am still working on this new graph one. The affinity spec above is subject to change, but it illustrates the point - we don't want to bind processes, but we want the cores to have affinity with (be close to) a GPU.
+
+#### GPU
+
+This might be an alternative to the above - I'm not decided yet. GPU affinity means we want a GPU that is local (same NUMA node) or remote (a different NUMA node). I haven't tested this yet, but it will look like this:
+
+```yaml
+options:
+  bind: gpu-local
+
+resources:
+  - type: core
+    count: 4
+```
+
+Right now I have this request akin to `bind` (as a bind type, I mean) because then the pattern defaults to `packed`. I think that is OK. I like this maybe a little better than the one before because we don't change the jobspec too much... :)
+
+
+### Examples
+
+Here are examples for different scenarios.
+
+| `shape.yaml` | Logic Used | Final Binding Unit |
+| :--- | :--- | :--- |
+| **`options: {bind: process}`**, `resources: [{type: socket}]` | Explicit Request | `pu` |
+| **`options: {bind: core}`**, `resources: [{type: socket}]` | Explicit Request | `core` |
+| No options, `resources: [{type: core, count: 4}]` | Implicit Intent | `core` |
+| No options, `resources: [{type: pu, count: 4}]` | Implicit Intent | `pu` |
+| No options, `resources: [{type: l3cache, count: 1}]` | HPC Default | `core` |
+| No options, `resources: [{type: numanode, count: 1}]` | HPC Default | `core` |
+| `options: {bind: process}`, `resources: [{type: core, count: 2}]` | Explicit Request | `pu` |
+
+
+## Patterns
+
+The binding rules determine *what* kind of hardware to bind to (physical cores vs. hardware threads), and patterns determine *how* a total pool of those resources is distributed among the tasks on a node.
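Before moving on to patterns: taken together, the three binding rules above (explicit request, implicit intent, HPC default) could be resolved with logic along these lines. This is a rough sketch with a hypothetical function name, not fluxbind's actual implementation:

```python
def resolve_bind_unit(shape):
    """Pick the binding unit for a shapefile using the rule hierarchy.

    Sketch only: an explicit options.bind wins, then the most granular
    resource level, then the HPC default of physical cores.
    """
    aliases = {"process": "pu", "pu": "pu", "core": "core"}

    # Highest priority: an explicit options.bind request (including "none").
    explicit = (shape.get("options") or {}).get("bind")
    if explicit:
        return aliases.get(explicit, explicit)

    # Second priority: implicit intent from the deepest resource level.
    def deepest(resources):
        last = None
        for resource in resources:
            last = resource
            if resource.get("with"):
                last = deepest(resource["with"])
        return last

    leaf = deepest(shape.get("resources", []))
    if leaf and leaf.get("type") in aliases:
        return aliases[leaf["type"]]

    # Lowest priority: the single-threaded HPC default.
    return "core"


print(resolve_bind_unit({"resources": [{"type": "l3cache", "count": 1}]}))
# core
```

This reproduces the Examples table above: `{bind: process}` resolves to `pu` regardless of resources, and a bare `l3cache` or `numanode` request falls through to `core`.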
When a `shape.yaml` describes a total pool of resources (e.g., `core: 8`) and a job is launched with multiple tasks on the node (e.g., `local_size=4`), `fluxbind` must have a deterministic strategy to give each task its own unique slice of the total pool. This strategy is controlled by the `pattern` key.
+
+### packed
+
+> Default
+
+The packed pattern assigns resources in contiguous, dense blocks. This is the default behavior if no pattern is specified, because I think it is what generally would be wanted: the cores in each block are physically close. As an example, given 8 available cores and 4 tasks, packed assigns resources like this:
+ * `local_rank=0` gets `[Core:0, Core:1]`
+ * `local_rank=1` gets `[Core:2, Core:3]`
+ * `local_rank=2` gets `[Core:4, Core:5]`
+ * `local_rank=3` gets `[Core:6, Core:7]`
+
+```yaml
+# pattern: packed is optional as it's the default, so you could leave this out.
+resources:
+  - type: core
+    count: 8
+    pattern: packed
+```
+
+### scatter (spread)
+
+> The pattern that makes you think of peanut butter
+
+The scatter pattern distributes resources with the largest possible stride, like dealing out cards to each task. I think this is similar to [cyclic](https://hpc.llnl.gov/sites/default/files/distributions_0.gif) or round-robin distribution. We'd want this for memory-intensive tasks, where cores should be physically far apart so each gets its own memory (L2/L3 caches).
+
+```yaml
+# 'spread' is an alias for 'scatter'.
+resources:
+  - type: core
+    count: 8
+    pattern: spread
+```
+
+Right now I'm calling this interleaved, but I think they are actually different, and if we want that case we need to add it. Interleaved would be like filling the first PU on every core before going back and filling the other PUs. Like filling cookies with jam, but only every other cookie.
+
+## Modifiers
+
+### reverse
+
+The reverse modifier is a boolean (true/false) that can be combined with any pattern.
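The packed and scatter behavior above (plus the reverse modifier) can be sketched concretely. This is illustrative only, with a hypothetical helper name, and it assumes the pool divides evenly among the local tasks:

```python
def slice_for_rank(pool, local_rank, local_size, pattern="packed", reverse=False):
    """Give one task its unique slice of the node-wide resource pool.

    Sketch of the pattern semantics described above; assumes len(pool)
    divides evenly by local_size.
    """
    resources = list(reversed(pool)) if reverse else list(pool)
    per_task = len(resources) // local_size
    if pattern == "packed":
        # Contiguous, dense blocks of physically-close resources.
        start = local_rank * per_task
        return resources[start:start + per_task]
    if pattern in ("scatter", "spread"):
        # Largest possible stride: deal resources out like cards.
        return resources[local_rank::local_size]
    raise ValueError(f"unknown pattern: {pattern}")


cores = [f"Core:{i}" for i in range(8)]
print(slice_for_rank(cores, local_rank=1, local_size=4))
# ['Core:2', 'Core:3']
print(slice_for_rank(cores, local_rank=1, local_size=4, pattern="scatter"))
# ['Core:1', 'Core:5']
```

Note how the packed result matches the `local_rank=1` example in the list above, while scatter gives the same rank cores 1 and 5, the largest stride apart.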
The modifier simply reverses the canonical list of available resources before the distribution pattern is applied. I'm not sure when it's useful, but maybe we'd want to test one end and then "the other end."
+
+```yaml
+resources:
+  - type: core
+    count: 8
+    reverse: true
+```
diff --git a/examples/topology/corona.xml b/examples/topology/corona.xml
index 4264f59..bde44b2 100644
--- a/examples/topology/corona.xml
+++ b/examples/topology/corona.xml
[large regenerated hwloc topology XML diff omitted: the element markup is unreadable here. The only recoverable content is a new hwloc v2 NUMA distance matrix with indexes `0 1` and values `10 32 32 10`.]
diff --git a/fluxbind/bind/bind.py b/fluxbind/bind/bind.py
index a9a64cb..841cc0d 100644
--- a/fluxbind/bind/bind.py
+++ b/fluxbind/bind/bind.py
@@ -113,6 +113,7 @@ def __init__(self, **kwargs):
             "cpu_affinity",
             "taskmap",
             "command",
+            "graph",
             "shape",
             "env",
             "quiet",
@@ -222,6 +223,8 @@ def set_envars(self, cmd):
         if self.env is not None:
             for envar in
self.env:
                 cmd += ["--env", envar]
+        if self.graph:
+            cmd += ["--env", "FLUXBIND_GRAPH=1"]
         return cmd
 
     def get_shape_command(self):
diff --git a/fluxbind/cli/__init__.py b/fluxbind/cli/__init__.py
index eb7e8db..424a089 100644
--- a/fluxbind/cli/__init__.py
+++ b/fluxbind/cli/__init__.py
@@ -29,13 +29,6 @@ def get_parser():
         action="store_true",
     )
 
-    parser.add_argument(
-        "--quiet",
-        dest="quiet",
-        help="suppress additional output.",
-        default=False,
-        action="store_true",
-    )
     parser.add_argument(
         "--version",
         dest="version",
@@ -136,18 +129,34 @@
     shape.add_argument(
         "--local-rank", required=True, type=int, help="Rank of the process on the local node."
     )
+    shape.add_argument(
+        "--local-size", required=True, type=int, help="Local size (tasks on the node)."
+    )
     shape.add_argument(
         "--gpus-per-task", dest="gpus_per_task", type=int, help="Number of GPUs per task."
    )
+    for command in [run, shape]:
+        command.add_argument(
+            "--graph",
+            default=False,
+            action="store_true",
+            help="Use the graph hwloc parser instead",
+        )
+        command.add_argument(
+            "--quiet",
+            dest="quiet",
+            help="suppress additional output.",
+            default=False,
+            action="store_true",
+        )
 
-    for command in [predict, run]:
+    for command in [predict, run, shape]:
         command.add_argument(
             "--xml",
             "--topology-file",
             dest="topology_file",
             help="Path to a lstopo XML file.\nIf not provided, runs 'lstopo' to detect.",
         )
-
     return parser
diff --git a/fluxbind/cli/shape.py b/fluxbind/cli/shape.py
index 86e3d19..11dc877 100644
--- a/fluxbind/cli/shape.py
+++ b/fluxbind/cli/shape.py
@@ -1,3 +1,4 @@
+from fluxbind.graph import Shape as GraphShape
 from fluxbind.shape import Shape
 
 
@@ -5,15 +6,23 @@ def main(args, extra, **kwargs):
     """
     Parse a shape file to return the binding for a specific rank (local)
     """
-    shape = Shape(args.file)
+    # This input file is the shapefile
+    if args.graph:
+        # This is the (mostly) Flux jobspec with pattern, etc.
+ shape = GraphShape(args.file, debug=args.debug) + else: + # This is a simple, custom design. + shape = Shape(args.file) - # 2. Call the public method to get the final binding string + # Call the public method to get the final binding string binding_string = shape.get_binding_for_rank( rank=args.rank, node_id=args.node_id, local_rank=args.local_rank, + local_size=args.local_size, gpus_per_task=args.gpus_per_task, + xml_file=args.topology_file, ) - # 3. Print the result to stdout for the wrapper script + # Print the result to stdout for the wrapper script print(binding_string) diff --git a/fluxbind/graph/__init__.py b/fluxbind/graph/__init__.py new file mode 100644 index 0000000..8384fb2 --- /dev/null +++ b/fluxbind/graph/__init__.py @@ -0,0 +1,3 @@ +from .shape import Shape + +assert Shape diff --git a/fluxbind/graph/graph.py b/fluxbind/graph/graph.py new file mode 100644 index 0000000..c41586c --- /dev/null +++ b/fluxbind/graph/graph.py @@ -0,0 +1,763 @@ +import io +import logging +from itertools import combinations + +import networkx as nx + +import fluxbind.graph.worker as worker +import fluxbind.shape.commands as commands +import fluxbind.utils as utils + +log = logging.getLogger(__name__) + + +class HwlocTopology: + """ + An HwlocTopology maps xml from lstopo into a graph. + """ + + def __init__(self, xml_input=None, max_workers=None): + self.graph = nx.DiGraph() + self.gpus = [] + self.nics = [] + self.load(xml_input, max_workers) + + @property + def node_count(self): + return self.graph.number_of_nodes() + + def load(self, xml_input, max_workers=None): + """ + Load the graph, including distances, and pre-calculate + entire set of affinities for objects. + """ + self.last_affinity_target = None + + # If we don't have an xml file, derive from system + if not xml_input: + xml_input = commands.lstopo.get_xml() + root = utils.read_xml(xml_input) + + # I think this is required - I haven't seen one without it. 
+ top_level_object = root.find("object") + if top_level_object is not None: + self.build_graph(top_level_object) + else: + raise ValueError('Could not find a top-level "object" in the hwloc XML.') + log.debug(f"Graph built successfully using XML tree with {self.node_count} nodes.") + + # Use nvidia/rocm-smi to find devices, since not always in hwloc + self.discover_devices() + + # Get the distances or distances2 matrix + self.add_distance_edges(root) + self.hierarchy_view = nx.subgraph_view( + self.graph, filter_edge=lambda u, v: self.graph.edges[u, v].get("type") == "contains" + ) + + # Cache affinities on startup so we only do it once. + # This assumes the node cannot change (I assume it cannot) + self.precompute_affinities(max_workers=max_workers) + self.create_ordered_gpus() + + def build_graph(self, element, parent_gp=None): + """ + Recursively builds the topology graph by parsing the XML tree structure directly. + + gp == global peristent index. pci == peripheral component interconnect. + """ + # Process current element and add it as a node + node_data = element.attrib.copy() + + # This is a peristent identifier that includes location, bus, and device function + if "pci_busid" in node_data: + node_data["pci_busid"] = node_data["pci_busid"].lower() + for info in element.findall("info"): + node_data[info.get("name")] = info.get("value") + + for key in ["os_index", "gp_index", "depth", "cache_size", "local_memory"]: + if key in node_data: + try: + node_data[key] = int(node_data[key]) + except (ValueError, TypeError): + pass + + # Use the element's memory address as a guaranteed unique graph pointer (gp) + current_gp = id(element) + node_data["gp_index"] = current_gp + + # Also parse cpuset if it exists, as it's useful data, but don't use it for hierarchy. 
+ if "cpuset" in node_data: + try: + full_hex = "0x" + node_data["cpuset"].replace("0x", "").replace(",", "") + node_data["cpuset_val"] = int(full_hex, 16) + except ValueError: + node_data["cpuset_val"] = None + + # Add the node to the graph + self.graph.add_node(current_gp, **node_data) + + # If there's a parent, add a 'contains' edge to represent the hierarchy + if parent_gp is not None: + self.graph.add_edge(parent_gp, current_gp, type="contains") + + # Recurse for all direct child 'object' elements + for child_element in element.findall("./object"): + self.build_graph(child_element, parent_gp=current_gp) + + def discover_devices(self): + """ + Discovers GPUs and NICs and enriches their corresponding PCIDev nodes + in the graph with a 'device_type' attribute. I originally didn't do this, + but it's better to get the nodes back and use the device type to color + in a graph, etc. + """ + # Find GPUs first. We add annotation for gpu and the vendor + self.gpus = [] + for vendor, command in {"NVIDIA": commands.nvidia_smi, "AMD": commands.rocm_smi}.items(): + try: + pci_ids = command.get_pci_bus_ids() + for bus_id in pci_ids: + matches = self.find_objects(type="PCIDev", pci_busid=bus_id.lower()) + if matches: + gp, data = matches[0] + # This is the key: add the new attribute directly to the graph node. + self.graph.nodes[gp]["device_type"] = "gpu" + self.graph.nodes[gp]["vendor"] = vendor + self.gpus.append((gp, self.graph.nodes[gp])) # Append the updated node data + except RuntimeError: + pass + + # NICs! 
This isn't perfect with keywords, but should handle early cases + self.nics = [] + nic_keywords = ["ethernet", "infiniband", "connectx", "mellanox", "network connection"] + + # Find all PCIDevs that haven't been claimed as GPUs yet + for gp, data in self.find_objects(type="PCIDev"): + if data.get("device_type") == "gpu": + continue + + is_nic = False + if (os_dev := data.get("OSDev")) and any( + os_dev.startswith(p) for p in ["eth", "en", "ib"] + ): + is_nic = True + elif (pci_type := data.get("pci_type")) and pci_type.split(" ")[0].startswith("02"): + is_nic = True + elif (pci_name := data.get("PCIDevice")) and any( + k in pci_name.lower() for k in nic_keywords + ): + is_nic = True + + if is_nic: + # Add the new attribute directly to the graph node. + self.graph.nodes[gp]["device_type"] = "nic" + self.nics.append((gp, self.graph.nodes[gp])) + + if self.nics: + unique_nics = {data["pci_busid"]: (gp, data) for gp, data in self.nics} + self.nics = list(unique_nics.values()) + + log.info( + f"Successfully discovered and tagged {len(self.gpus)} GPU(s) and {len(self.nics)} NIC(s) in the graph." + ) + + def add_distance_edges(self, root): + """ + Hwloc can have a distances or distances2 section that has a NUMA + node distance matrix. distances2 is for hwloc 2.x and distances for + hwloc 1.x + """ + + def add_latency_edges(matrix, indexes): + """ + Helper funcion to convert Python matrix to add edges to graph + """ + # Create a fast lookup table to convert from a NUMA node's OS index + # (e.g., 0, 1, 2) to its unique graph pointer (a large integer). + os_to_gp = { + d["os_index"]: gp for gp, d in self.find_objects(type="NUMANode") if "os_index" in d + } + + # Iterate through every cell (i, j) in the distance matrix. + for i, from_os in enumerate(indexes): + for j, to_os in enumerate(indexes): + # Find the graph pointers for the 'from' and 'to' NUMA nodes. + from_gp, to_gp = os_to_gp.get(from_os), os_to_gp.get(to_os) + + # If both nodes were found in our graph... 
+ if from_gp and to_gp: + # ...add a special directed edge between them. + self.graph.add_edge( + from_gp, + to_gp, + type="latency_link", # Mark this as a special non-hierarchical edge. + weight=matrix[i][j], # The weight is the latency value from the matrix. + ) + + # First, try to parse newer hwloc v2.x format + for dist_el in root.iter("distances2"): + try: + # nbobjs is how many objects are in the matrix (e.g., 2 for 2 NUMA nodes). + num = int(dist_el.get("nbobjs")) + # is the OS indexes of the NUMA nodes (e.g., "0 1"). + idxs = [int(v) for v in dist_el.find("indexes").text.strip().split()] + # is latency values as a flat list (e.g., "10 21 21 10"). + vals = [float(v) for v in dist_el.find("u64values").text.strip().split()] + + # Sanity check: a 2x2 matrix should have 4 values. + if len(vals) == num * num and len(idxs) == num: + # Convert the flat list vals into a 2D list-of-lists (our matrix). + matrix = [vals[i * num : (i + 1) * num] for i in range(num)] + add_latency_edges(matrix, idxs) + log.debug("Parsed hwloc v2 distances.") + return # Success! We are done, so exit the function. + + except (ValueError, TypeError, AttributeError): + # If anything goes wrong (e.g., missing tags, bad text), just skip it. + continue + + # Fall back to the legacy hwloc v1.x format + for dist_el in root.iter("distances"): + try: + num = int(dist_el.get("nbobjs")) + # In the old format, the values are relative to a base latency. + latency_base = float(dist_el.get("latency_base")) + # The values are stored in a 'value' attribute as a space-separated string. + # We multiply each value by the base to get the true latency. + vals = [float(v) * latency_base for v in dist_el.get("value").strip().split()] + + # The indexes are found the same way as in v2. 
+ idxs = [int(v) for v in dist_el.find("indexes").text.strip().split()] + + if len(vals) == num * num and len(idxs) == num: + matrix = [vals[i * num : (i + 1) * num] for i in range(num)] + add_latency_edges(matrix, idxs) + log.debug("Parsed legacy hwloc v1 distances.") + return # Success! We are done. + except (ValueError, TypeError, AttributeError): + continue + + def precompute_affinities(self, max_workers=None): + """ + Precomputes the NUMA affinity for all relevant objects in the graph. + This version is corrected to work with the enriched graph model. + """ + # The self.gpus and self.nics lists have actual (gp, data) tuples + # from the graph. We can add them directly to the list of objects to locate. + objects_to_locate = ( + self.find_objects(type="PU") + self.find_objects(type="Core") + self.nics + self.gpus + ) + w = worker.AffinityCalculator(max_workers) + for result in w.calculate_numa_affinity(objects_to_locate): + if result: + self.graph.nodes[result[0]]["numa_os_index"] = result[1] + + def match_resources(self, jobspec, allocated_gps=None): + """ + Finds the single, next available allocation that satisfies the jobspec. + """ + if allocated_gps is None: + allocated_gps = set() + + # Pass the set of already-taken resources to the search. + # The search function will use copies so the original set is not modified. 
+ + job_requests = jobspec.get("resources") or jobspec.get("resource") + if not job_requests: + raise ValueError("Jobspec does not contain a 'resources' section.") + + final_allocation = [] + machine_gp, _ = self.find_objects(type="Machine")[0] + + # Use a temporary set to track allocations within this single search + temp_allocations = allocated_gps.copy() + + for request in job_requests: + assignment = self.find_assignment_recursive( + request, machine_gp, temp_allocations, depth=1 + ) + + if assignment is None: + log.debug(f"Failed to find a match for request: {request}") + return None # Indicate that no valid slot could be found + + final_allocation.extend(assignment) + temp_allocations.update({gp for gp, _ in assignment}) + + log.debug(f"Successfully found a slot with {len(final_allocation)} objects.") + return final_allocation + + def sort_by_affinity(self, candidates, affinity, allocated, domain_gp): + """ + Sort list of candidates by affinity so we get closest one. + """ + target_type = self.translate_type(affinity.get("type")) + if not target_type: + log.warning("Affinity spec missing 'type'.") + return candidates + + # Search within the domain we were provided, not across the machine + log.debug( + f" -> Searching for affinity target '{target_type}' within the current domain." 
+ ) + targets = self.get_available_children(domain_gp, target_type, allocated) + if not targets: + log.warning(f"Affinity target '{target_type}' not found.") + return candidates + target_gp, target_data = targets[0] + self.last_affinity_target = (target_gp, target_data) + log.debug( + f" -> Distances to target {target_data.get('type')}:{target_data.get('PCIDevice') or target_data.get('os_index')}" + ) + decorated = sorted( + [(self.get_distance(c[0], target_gp), c) for c in candidates], + key=lambda x: x[0], + ) + return [item for _, item in decorated] + + def translate_type(self, requested_type: str): + """ + Translates a user-friendly type string from the shapefile into the + exact string used by the hwloc graph. This is the single source of + truth for type name mapping. + """ + # A dictionary to map all known aliases and lowercase variants + # to the official hwloc type string. + mapping = { + "process": "PU", + "pu": "PU", + "core": "Core", + "socket": "Package", + "package": "Package", + "numanode": "NUMANode", + "l1cache": "L1Cache", + "l2cache": "L2Cache", + "l3cache": "L3Cache", + "machine": "Machine", + "nic": "PCIDev", + "gpu": "PCIDev", + } + + # capitalizing the word (e.g., 'l3cache' -> 'L3cache'). + return mapping.get(requested_type.lower(), requested_type.capitalize()) + + def summarize(self, nodes): + """ + Given a set of nodes in the graph (a set of resources) print a textual visual. + """ + for gp, data in nodes: + p_info = "" + if data["type"] in ["Core", "PU"]: + package = self.get_ancestor_of_type(gp, "Package") + if package: + p_info = f"Package:{package[1].get('os_index')} -> " + if data["type"] == "PU": + core = self.get_ancestor_of_type(gp, "Core") + if core: + p_info += f"Core:{core[1].get('os_index')} -> " + log.info(f" -> {p_info}{data['type']}:{data.get('os_index', data.get('pci_busid'))}") + + def calculate_bindable_nodes(self, total_allocation): + """ + Given an allocation, get nodes in the graph we can bind to. 
+ """ + log.info( + "No explicit CPU resources requested. Deriving a binding from the allocation context." + ) + leaf_nodes = [] + + # Find the NUMA domain(s) from the allocation context. This is correct. + numa_gps = {node[0] for node in total_allocation if node[1].get("type") == "NUMANode"} + if not numa_gps: + for _, data in total_allocation: + if (numa_idx := data.get("numa_os_index")) is not None: + if numa_matches := self.find_objects(type="NUMANode", os_index=numa_idx): + numa_gps.add(numa_matches[0][0]) + + # This should not happen - throw up if it does. + if not numa_gps: + raise RuntimeError( + "Allocation succeeded but could not determine a NUMA domain for CPU binding." + ) + + # Find the parent Package(s) of the NUMA domains. + package_gps = set() + for numa_gp in numa_gps: + package = self.get_ancestor_of_type(numa_gp, "Package") + if package: + package_gps.add(package[0]) + + if not package_gps: + raise RuntimeError(f"Could not find parent Package for NUMA nodes: {list(numa_gps)}") + + package_indices = [self.graph.nodes[gp].get("os_index") for gp in package_gps] + log.info(f"Binding to all PUs within parent Package(s): {package_indices}") + + # Find all PUs that are hierarchical children of those Package(s). + for package_gp in package_gps: + pus_in_package = self.get_descendants(package_gp, type="PU") + leaf_nodes.extend(pus_in_package) + return leaf_nodes + + def get_available_children(self, parent_gp, child_type, allocated): + """ + Given a parent and child type, find the child type! 
+ """ + parent_node = self.graph.nodes[parent_gp] + parent_info = f"{parent_node.get('type')}:{parent_node.get('os_index', parent_node.get('pci_busid', parent_gp))}" + log.debug(f" - get_children(child='{child_type}', parent={parent_info})") + + parent_type = parent_node.get("type") + child_type_lower = child_type.lower() + all_candidates = self.find_objects(type=child_type) + log.debug( + f" - -> Found {len(all_candidates)} total unique system-wide candidates for '{child_type}'." + ) + + available = [] + for gp, data in all_candidates: + if gp in allocated: + continue + + is_valid_child = False + + # Rule 1: Relationship to NUMA node is through shared PACKAGE parent (for Cores) or LOCALITY (for Devices) + if parent_type == "NUMANode": + if child_type_lower in ["core", "pu"]: + package_of_numa = self.get_ancestor_of_type(parent_gp, "Package") + if package_of_numa and nx.has_path(self.hierarchy_view, package_of_numa[0], gp): + is_valid_child = True + elif child_type_lower == "pcidev": + if data.get("numa_os_index") == parent_node.get("os_index"): + is_valid_child = True + + # Rule 2 (NEW): Relationship of a Core/PU to a Device is through shared NUMA LOCALITY + elif parent_type == "PCIDev" and child_type_lower in ["core", "pu"]: + parent_numa_idx = parent_node.get("numa_os_index") + if data.get("numa_os_index") == parent_numa_idx: + is_valid_child = True + + # Default Rule: Strict HIERARCHY for all other cases + else: + if nx.has_path(self.hierarchy_view, parent_gp, gp): + is_valid_child = True + + if is_valid_child: + available.append((gp, data)) + + available.sort(key=lambda item: self.get_sort_key_for_node(item)) + log.debug(f" - -> Returning {len(available)} available children.") + return available + + def find_objects(self, **attributes): + """ + Search nodes in the graph for a specific attribute (or more than one) + """ + return [ + (gp, data) + for gp, data in self.graph.nodes(data=True) + if all(data.get(k) == v for k, v in attributes.items()) + ] + + def 
find_assignment_recursive(self, request, domain_gp, allocated, depth=0): + """ + Given a "with" section at a particular depth, determine if the request is satisfied. + + We call this function recursively until the graph "with" sections terminate. + """ + indent = " " * depth + domain_node = self.graph.nodes[domain_gp] + domain_info = f"{domain_node.get('type')}:{domain_node.get('os_index', domain_node.get('pci_busid', domain_gp))}" + log.debug(f"{indent}[ENTER] find_assignment(req={request}, domain={domain_info})") + + # This can also be gpu/nic + raw_request_type = request["type"] + req_type, count = self.translate_type(raw_request_type), request["count"] + + # If the req_type is gpu or nic, this isn't an actual type in the graph - it is PCIDev. + candidates = self.get_available_children(domain_gp, req_type, allocated) + + # Add GPU and NIC - both have the same structure! + if raw_request_type.lower() in ["gpu", "nic"]: + + # Map the abstract type string to the actual list attribute on the object. + device_list_map = {"gpu": self.gpus, "nic": self.nics} + source_device_list = device_list_map[raw_request_type.lower()] + + # This generic logic now works for both GPUs and NICs. + device_bus_ids = {device[1].get("pci_busid") for device in source_device_list} + candidates = [node for node in candidates if node[1].get("pci_busid") in device_bus_ids] + log.debug( + f"{indent} -> Filtered for '{raw_request_type.lower()}', {len(candidates)} candidates remain." + ) + + log.debug(f"{indent} -> Found {len(candidates)} initial candidates for '{req_type}'.") + + # Affinity is asking to try to be physically close to something. + affinity_spec = request.get("affinity") + if affinity_spec: + affinity_type_from_yaml = affinity_spec.get("type", "").lower() + domain_type_from_hwloc = domain_node.get("type", "").lower() + + # A local affinity search is needed if: + # 1. The affinity type and domain type match exactly (e.g., numanode) + # 2.
Or, if the affinity is for a gpu/nic and the domain is a pcidev. + is_local_affinity_target = (affinity_type_from_yaml == domain_type_from_hwloc) or ( + affinity_type_from_yaml in ["gpu", "nic"] and domain_type_from_hwloc == "pcidev" + ) + + if is_local_affinity_target: + log.debug(f"{indent} -> Applying LOCAL affinity to parent domain {domain_info}") + target_gp = domain_gp + decorated = sorted( + [(self.get_distance(c[0], target_gp), c) for c in candidates], + key=lambda x: x[0], + ) + candidates = [item for _, item in decorated] + self.last_affinity_target = (target_gp, domain_node) + else: + log.debug(f"{indent} -> Sorting candidates by GLOBAL affinity to {affinity_spec}") + candidates = self.sort_by_affinity(candidates, affinity_spec, allocated, domain_gp) + + if len(candidates) < count: + log.debug( + f"{indent}[FAIL] Not enough candidates available. Found {len(candidates)}, need {count}." + ) + return None + + for i, combo in enumerate(combinations(candidates, count)): + combo_info = ", ".join( + [f"{d.get('type')}:{d.get('os_index', d.get('pci_busid'))}" for _, d in combo] + ) + log.debug(f"{indent} -> Trying Combination #{i+1}: ({combo_info})") + + path_allocations = allocated.copy() + full_solution_for_combo = list(combo) + for gp, _ in combo: + path_allocations.add(gp) + + all_children_found = True + if "with" in request: + for parent_gp, _ in combo: + for child_req in request["with"]: + child_assignment = self.find_assignment_recursive( + child_req, parent_gp, path_allocations, depth + 1 + ) + if child_assignment is None: + all_children_found = False + break + if not all_children_found: + break + + if all_children_found: + log.debug(f"{indent}[SUCCESS] Combination #{i+1} satisfied all constraints.") + if "with" in request: + temp_alloc = allocated.copy() + for gp, _ in combo: + temp_alloc.add(gp) + for parent_gp, _ in combo: + for child_req in request["with"]: + child_assignment = self.find_assignment_recursive( + child_req, parent_gp, temp_alloc, depth 
+ 1 + ) + if child_assignment: # Ensure we don't extend with None + full_solution_for_combo.extend(child_assignment) + for c_gp, _ in child_assignment: + temp_alloc.add(c_gp) + log.debug(f"{indent}[EXIT] find_assignment -> returning solution") + return full_solution_for_combo + + log.debug(f"{indent}[FAIL] Exhausted all {i+1 if 'i' in locals() else 0} combinations.") + return None + + def get_descendants(self, gp_index, **filters): + """ + Given a global position index, return descendants that match a filter. + """ + if gp_index not in self.graph: + return [] + desc = list(nx.descendants(self.hierarchy_view, gp_index)) + return [ + (di, self.graph.nodes[di]) + for di in desc + if all(self.graph.nodes[di].get(k) == v for k, v in filters.items()) + ] + + def get_ancestor_of_type(self, start_node_gp, ancestor_type): + """ + Given a starting node, return the nearest ancestor of a given type, or None. + """ + current_gp = start_node_gp + + # Walk up the hierarchy tree one parent at a time. + while current_gp in self.hierarchy_view: + # Get the parent (should only be one in a tree) + parents = list(self.hierarchy_view.predecessors(current_gp)) + if not parents: + break + parent_gp = parents[0] + parent_data = self.graph.nodes[parent_gp] + if parent_data.get("type") == ancestor_type: + return (parent_gp, parent_data) + current_gp = parent_gp + return None + + def get_sort_key_for_node(self, leaf_node): + """ + Return tuple sorting key e.g., (0, package_id, core_id) -> e.g., (0, 0, 5) + """ + gp, data = leaf_node + + # TYPE_ORDER: CPU types < PCI types < Other OS types < Nameless types + # This ensures consistent sorting across different object types.
+ + # Handle CPU-like objects (Cores, PUs) + if data.get("type") in ["Core", "PU"]: + package = self.get_ancestor_of_type(gp, "Package") + package_idx = package[1].get("os_index", -1) if package else -1 + return (0, int(package_idx), int(data.get("os_index", -1))) + + # Handle PCI devices (GPUs, NICs) + elif "pci_busid" in data: + try: + # Convert '0000:c4:00.0' into a sortable tuple of integers + pci_tuple = tuple( + int(p, 16) for p in data["pci_busid"].replace(":", ".").split(".") + ) + # Returns (1, (0, 196, 0, 0)) + return (1, pci_tuple) + except (ValueError, TypeError): + # Fallback for weirdly formatted pci_busid + return (1, data["pci_busid"]) + + # Handle other objects with an os_index (like NUMANodes, if they were leaves) + elif "os_index" in data: + # Returns (2, os_index) -> e.g., (2, 1) + return (2, int(data.get("os_index", -1))) + + # Fallback for any other object type + else: + # Returns (3, gp_index) as a last resort for stable sorting + return (3, gp) + + def get_distance(self, node1_gp, node2_gp): + node1, node2 = self.graph.nodes.get(node1_gp), self.graph.nodes.get(node2_gp) + if not node1 or not node2: + return float("inf") + numa1, numa2 = node1.get("numa_os_index"), node2.get("numa_os_index") + try: + if numa1 is None or numa2 is None or numa1 == numa2: + lca = nx.lowest_common_ancestor(self.hierarchy_view, node1_gp, node2_gp) + return nx.shortest_path_length( + self.hierarchy_view, lca, node1_gp + ) + nx.shortest_path_length(self.hierarchy_view, lca, node2_gp) + else: + numa1_gp = self.find_objects(type="NUMANode", os_index=numa1)[0][0] + numa2_gp = self.find_objects(type="NUMANode", os_index=numa2)[0][0] + return ( + self.get_distance(node1_gp, numa1_gp) + + self.graph.edges[numa1_gp, numa2_gp].get("weight", float("inf")) + + self.get_distance(node2_gp, numa2_gp) + ) + except (nx.NetworkXError, KeyError, IndexError): + return float("inf") + + def create_ordered_gpus(self): + """ + Creates a deterministically sorted list of GPU information, 
which is + essential for assigning GPUs to ranks consistently. This version correctly + handles the enriched graph's (gp, data) tuple format. + """ + # self.gpus is a list of (gp, data) tuples populated by discover_devices. + # This list is not guaranteed to be in a stable order. + gpus_to_sort = [] + for _, data in self.gpus: + numa_idx = data.get("numa_os_index") + pci_id = data.get("pci_busid") + + # A GPU might not have a NUMA index if hwloc-calc failed for it. + # We must handle this gracefully for sorting (a large inf places it at end) + if numa_idx is None: + log.warning( + f"GPU {pci_id} is missing NUMA affinity information. It will be sorted last." + ) + numa_idx = float("inf") + + # The GPUAssignment class expects a list of simple dictionaries. + gpus_to_sort.append({"pci_id": pci_id, "numa_index": numa_idx}) + + # Sort the list to create a stable, deterministic order. + # The primary sort key is the NUMA node index. This groups GPUs by locality. + # The secondary sort key is the PCI bus ID. This ensures a consistent + # order for GPUs within the same NUMA node. + self.ordered_gpus = sorted(gpus_to_sort, key=lambda g: (g["numa_index"], g["pci_id"])) + log.info(f"Created an ordered list of {len(self.ordered_gpus)} GPUs for assignment.") + + def find_bindable_leaves(self, total_allocation, bind_level): + """ + Transforms a list of allocated resources into a final list of bindable + nodes by first choosing a strategy based on the allocation contents, + then executing that single, correct strategy. + """ + leaf_nodes = [] + log.debug( + f"Transforming {len(total_allocation)} allocated objects to bind_level '{bind_level}'..." + ) + + bind_type_concrete = self.translate_type(bind_level) + + # Check for high-level structural containers. Their presence dictates the entire strategy. 
+ high_level_containers = [ + node for node in total_allocation if node[1]["type"] in ["Package", "NUMANode"] + ] + + if high_level_containers: + # If we find a Package or NUMANode, we IGNORE all other items in the allocation + # and bind to the contents of this container ONLY. We have to do this because + # hwloc-calc can report that some CPU/PU are closer to the OTHER Numa node, or + # in other words, the physical layout of the xml != what hwloc-calc reports. + # So here we use get_ancestor_of_type to JUST use the hardware layout (which + # is more predictable). + container_gp, container_data = high_level_containers[0] + container_type = container_data.get("type") + log.debug( + f"High-level container '{container_type}' found. Binding exclusively to its physical contents." + ) + package_gp = ( + container_gp + if container_type == "Package" + else self.get_ancestor_of_type(container_gp, "Package")[0] + ) + if package_gp: + leaf_nodes = self.get_descendants(package_gp, type=bind_type_concrete) + else: + # No high-level containers - we can safely process each object individually. + # This is the logic that correctly handles the simple Core, PU, and device-affinity tests. + log.debug( + "No high-level containers found. Processing each allocated object individually." + ) + for gp, data in total_allocation: + + # Case 1: The object is already the type we want to bind to. + if data.get("type") == bind_type_concrete: + leaf_nodes.append((gp, data)) + continue + + # Case 2 (Container): The object is a low-level container (Core or PCIDev). + descendants = self.get_descendants(gp, type=bind_type_concrete) + if descendants: + leaf_nodes.extend(descendants) + continue + + # Case 3 (Child): The object is a child of the target type (e.g., PU -> Core). + ancestor = self.get_ancestor_of_type(gp, bind_type_concrete) + if ancestor: + leaf_nodes.append(ancestor) + + # De-duplicate the final list and sort for deterministic assignment.
+ unique_nodes = list({gp: (gp, data) for gp, data in leaf_nodes}.values()) + unique_nodes.sort(key=self.get_sort_key_for_node) + + log.debug(f"Transformation resulted in {len(unique_nodes)} unique bindable leaf nodes.") + return unique_nodes diff --git a/fluxbind/graph/graphic.py b/fluxbind/graph/graphic.py new file mode 100644 index 0000000..b5ec548 --- /dev/null +++ b/fluxbind/graph/graphic.py @@ -0,0 +1,98 @@ +import logging + +import networkx as nx + +try: + import matplotlib.pyplot as plt + import pydot + + VISUALIZATION_ENABLED = True +except ImportError: + VISUALIZATION_ENABLED = False + +log = logging.getLogger(__name__) + + +class TopologyVisualizer: + """ + Creates a visual representation of a hardware allocation on a topology graph. + This version draws the ENTIRE hardware topology as a stable background. + """ + + def __init__(self, topology, assigned_nodes: list, affinity_target=None): + if not VISUALIZATION_ENABLED: + raise ImportError("Visualization libraries (matplotlib, pydot) are not installed.") + + self.topology = topology + self.assigned_gps = {gp for gp, _ in assigned_nodes} + self.affinity_target_gp = affinity_target[0] if affinity_target else None + self.title = "Hardware Allocation" + + def draw(self, filename, width=12, height=8): + """ + Generates and saves the allocation graph to a file, drawing the + entire topology and highlighting the assigned resources. + """ + log.info(f"Generating allocation graphic at '{filename}'...") + + # 1. Start with the complete hierarchy view as the base. + subgraph = self.topology.hierarchy_view + + # 2. Create a clean copy of the graph to avoid modifying the original. + clean_subgraph = subgraph.copy() + + # 3. Iterate through all nodes in the copy and remove the conflicting 'name' attribute. + # This is a known issue when interfacing networkx with pydot.
+ for _, data in clean_subgraph.nodes(data=True): + if "name" in data: + del data["name"] + + # All subsequent drawing operations will use this sanitized graph copy. + subgraph = clean_subgraph + + # Make it pink! Err, green and blue and gray... :) + labels, colors = {}, {} + for g, d in subgraph.nodes(data=True): + labels[g] = f"{d.get('type')}" + if "os_index" in d: + labels[g] += f":{d['os_index']}" + elif "pci_busid" in d: + labels[g] += f"\n{d.get('pci_busid', '')[:10]}" + + if g == self.affinity_target_gp: + colors[g] = "gold" + elif g in self.assigned_gps: + node_type_key = d.get("device_type") or d.get("type") + color_map = { + "gpu": "orange", + "nic": "violet", + "Core": "lightgreen", + "PU": "lightgreen", + } + colors[g] = color_map.get(node_type_key, "lightgreen") + elif d["type"] == "NUMANode": + colors[g] = "skyblue" + else: + colors[g] = "lightgray" + + node_colors = [colors.get(node, "lightgray") for node in subgraph.nodes()] + edge_colors = ["black" for _, _, d in subgraph.edges(data=True)] + + try: + pos = nx.drawing.nx_pydot.graphviz_layout(subgraph, prog="dot") + except Exception as e: + log.warning(f"graphviz_layout failed: {e}. 
Falling back to a simpler layout.") + pos = nx.spring_layout(subgraph, seed=42) + + plt.figure(figsize=(width, height)) + nx.draw_networkx_nodes( + subgraph, pos, node_color=node_colors, node_size=1500, edgecolors="black" + ) + nx.draw_networkx_edges(subgraph, pos, edge_color=edge_colors, arrows=False, width=1.0) + nx.draw_networkx_labels(subgraph, pos, labels=labels, font_size=8) + + plt.title(self.title, fontsize=20) + plt.box(False) + plt.savefig(filename, bbox_inches="tight") + plt.close() + log.info("...graphic saved successfully.") diff --git a/fluxbind/graph/shape.py b/fluxbind/graph/shape.py new file mode 100644 index 0000000..f344b4b --- /dev/null +++ b/fluxbind/graph/shape.py @@ -0,0 +1,344 @@ +import logging +import sys +from dataclasses import dataclass + +import fluxbind.shape.commands as commands +import fluxbind.utils as utils +from fluxbind.graph.graph import HwlocTopology +from fluxbind.graph.graphic import TopologyVisualizer +from fluxbind.shape.gpu import GPUAssignment + +log = logging.getLogger(__name__) + + +@dataclass +class TopologyResult: + """ + A simple dataclass to hold the results of an allocation run. + """ + + topo: HwlocTopology = None + nodes: list = None + mask: str = None + gpu_string: str = "NONE" + + +class Shape: + """ + Finds a hardware binding for a single task by interpreting a shapefile + according to a hierarchy of rules. This class implements the "pool division" + model, where a total set of resources on a node is divided among the + local tasks. + """ + + valid_bind_modes = ["core", "pu", "process", "none", "gpu-local", "gpu-remote"] + bind_default = "core" + + def __init__(self, jobspec, debug=False): + """ + Initializes the Shape object, parsing the jobspec and any binding options. + """ + self._setup_logging(debug) + self.load(jobspec) + self.hwloc_calc = commands.HwlocCalcCommand() + self.set_bind_preference() + + def load(self, jobspec): + """ + Load the jobspec, or if already loaded, just set. 
+ """ + if isinstance(jobspec, dict): + self.jobspec = jobspec + else: + self.jobspec = utils.read_yaml(jobspec) + + def set_bind_preference(self): + """ + Get the binding preference. + """ + options = self.jobspec.get("options", {}) + bind_mode = options.get("bind") + if bind_mode: + bind_mode = bind_mode.lower() + if bind_mode not in self.valid_bind_modes: + raise ValueError( + f"Invalid 'bind' option: {bind_mode}. Must be one of {self.valid_bind_modes}." + ) + if bind_mode == "process": + bind_mode = "pu" + self.bind_mode = bind_mode + + def _setup_logging(self, debug=False): + """ + Setup logging, honoring debug if user provides from client. + """ + logging.basicConfig( + level=logging.DEBUG if debug else logging.INFO, + format="[%(levelname)s] %(message)s", + force=True, + ) + + def get_binding_for_rank( + self, + rank, + local_rank, + local_size, + gpus_per_task=None, + xml_file=None, + graphic=None, + **kwargs, + ): + """ + Main entrypoint. Calculates a binding for a rank given the job geometry. + Unused arguments like 'node_id' are captured by kwargs and discarded. + TODO: we should make everything lower() from the getgo. + """ + log.info( + f"Processing request for global_rank={rank}, with local_rank={local_rank} of {local_size}" + ) + if local_rank >= local_size: + raise ValueError(f"local-rank ({local_rank}) must be < local_size ({local_size}).") + + mapping = self.run(xml_file, local_size, local_rank, gpus_per_task) + + if graphic and mapping.nodes: + visualizer = TopologyVisualizer( + mapping.topo, mapping.nodes, affinity_target=mapping.topo.last_affinity_target + ) + visualizer.draw(graphic) + + final_output = f"{mapping.mask};{mapping.gpu_string}" + log.info( + f"\nSUCCESS! 
The final cpuset binding mask for rank {local_rank} is: {final_output}" + ) + + if mapping.mask: + print(final_output) + sys.exit(0 if mapping.mask else 1) + + def get_bind_type(self, total_allocation): + """ + Finds a binding by interpreting a shapefile according to a hierarchy of rules: + 1. Explicit options.bind in the shapefile. + 2. Implicit intent from the most granular resource type requested. + 3. A default of core for HPC. + """ + # See https://gist.github.com/vsoch/be2d1ec712e33ec157bab2dc9a36b10a + # User explicit preference takes priority. We check here because the gpu-* modes + # are handled separately and cannot trigger this branch. + if self.bind_mode in ["core", "pu", "none"]: + log.info( + f"Using explicit binding preference from shapefile options: '{self.bind_mode}'." + ) + return self.bind_mode + + # Try to infer implicit intent + if total_allocation: + most_granular_type = ( + max(total_allocation, key=lambda item: item[1].get("depth", -1))[1] + .get("type") + .lower() + ) + if most_granular_type in ["core", "pu"]: + log.info( + f"Using implicit binding preference from resource request: '{most_granular_type}'." + ) + return most_granular_type + + # Fall back to a safe default for HPC. + log.info("No explicit or implicit preference. Using safe HPC default: 'core'.") + return self.bind_default + + def get_gpu_binding(self, topology, local_rank, gpus_per_task): + """ + Handles GPU-specific binding logic. + + This method determines which GPUs are assigned to the current rank and then + calculates the appropriate CPU search domain (a set of parent Packages) + based on the GPU's NUMA locality. + + Returns: + A tuple containing the GPUAssignment object and a set of graph pointers + to the Package(s) that should be used for the CPU search. + """ + gpus_per_task = gpus_per_task or 0 + if gpus_per_task <= 0: + raise ValueError(f"'bind: {self.bind_mode}' requires --gpus-per-task to be > 0.") + + # Use the GPUAssignment dataclass to get this rank's slice of GPUs.
+ gpu_assignment = GPUAssignment.for_rank(local_rank, gpus_per_task, topology.ordered_gpus) + + # Determine the target NUMA domains based on the bind mode. + if self.bind_mode == "gpu-local": + log.info( + f"Binding to CPUs local to assigned GPUs (NUMA domains: {list(gpu_assignment.numa_indices)})." + ) + target_numa_indices = gpu_assignment.numa_indices + + else: # gpu-remote + all_numa_indices = { + data.get("os_index") for _, data in topology.find_objects(type="NUMANode") + } + remote_numa_indices = all_numa_indices - gpu_assignment.numa_indices + + if not remote_numa_indices: + raise RuntimeError( + f"Cannot find a remote NUMA node for rank {local_rank}; assigned GPUs span all NUMA domains." + ) + + # For simplicity, we target the first available remote NUMA domain. + target_numa_indices = {sorted(list(remote_numa_indices))[0]} + log.info( + f"Binding to CPUs remote to assigned GPUs (target NUMA domains: {list(target_numa_indices)})." + ) + + # Find the graph pointers for the NUMA objects corresponding to our target indices. + domain_numa_gps = { + gp + for gp, data in topology.find_objects(type="NUMANode") + if data.get("os_index") in target_numa_indices + } + + # Now, find the parent Packages of these NUMA domains to define the final CPU search space. + cpu_binding_domain_gps = set() + for numa_gp in domain_numa_gps: + package = topology.get_ancestor_of_type(numa_gp, "Package") + if package: + cpu_binding_domain_gps.add(package[0]) + + if not cpu_binding_domain_gps: + raise RuntimeError(f"Could not find a parent Package for the target NUMA domains.") + + return gpu_assignment, cpu_binding_domain_gps + + def run(self, xml_file, local_size, local_rank, gpus_per_task=None): + """ + Finds a binding by applying the hierarchy of rules to the shapefile and topology. + This is the definitive version, returning a single, complete list of assigned nodes. 
+ """ + topology = HwlocTopology(xml_file) + gpu_assignment = None + total_allocation = None + + if self.bind_mode in ["gpu-local", "gpu-remote"]: + gpu_assignment, cpu_domain_gps = self.get_gpu_binding( + topology, local_rank, gpus_per_task + ) + total_allocation = [(gp, topology.graph.nodes[gp]) for gp in cpu_domain_gps] + else: + log.info(f"Finding the total resource pool for all {local_size} ranks on this node...") + total_allocation = topology.match_resources(self.jobspec) + + if total_allocation is None: + raise RuntimeError( + "Failed to find any resources on the node that match the requested shape." + ) + + # Determine the target binding level. No binding? Then we probably just wanted GPUs. + bind_level = self.get_bind_type(total_allocation) + if bind_level == "none": + return TopologyResult( + nodes=total_allocation, + mask="UNBOUND", + gpu_string=gpu_assignment.cuda_devices if gpu_assignment else "NONE", + ) + + # Change the allocation into a list of bindable nodes. + log.info(f"Deriving bindable resources with final preference: '{bind_level}'.") + leaf_nodes = topology.find_bindable_leaves(total_allocation, bind_level) + if not leaf_nodes: + raise RuntimeError( + f"Could not find any bindable resources of type '{bind_level}' for the allocation." + ) + + # Apply a pattern of distribution (e.g., packed/scatter). + final_nodes = self.apply_binding_pattern(leaf_nodes, local_size, local_rank) + log.info( + f"\nAssigning {len(final_nodes)} '{bind_level}' resources for local rank {local_rank}:" + ) + if not final_nodes: + return TopologyResult(nodes=[], mask="0x0", topo=topology) + + topology.summarize(final_nodes) + + # Now we need the actual cpusets which is what does the binding. 
+ cpusets = [] + if bind_level == "pu": + cpusets = [d["cpuset"] for _, d in final_nodes if "cpuset" in d] + elif bind_level == "core": + for core_gp, _ in final_nodes: + pus = sorted( + topology.get_descendants(core_gp, type=topology.translate_type("pu")), + key=lambda x: x[1].get("os_index", 0), + ) + if pus: + first_pu_cpuset = pus[0][1].get("cpuset") + if first_pu_cpuset: + cpusets.append(first_pu_cpuset) + raw_mask = self.hwloc_calc.get_cpuset(" ".join(cpusets)) if cpusets else "0x0" + mask = raw_mask.replace(",,", ",") + + # We need to add devices (GPU,NIC) to the final nodes. + # This is mostly for the graphic visualization + assigned_devices = [] + if self.bind_mode in ["gpu-local", "gpu-remote"]: + + # For GPU modes, the assigned GPUs are in the `gpu_assignment` object. + if gpu_assignment and gpu_assignment.pci_ids: + for pci_id in gpu_assignment.pci_ids: + matches = topology.find_objects(type="PCIDev", pci_busid=pci_id.lower()) + if matches: + assigned_devices.extend(matches) + else: + # For CPU-driven jobs, the assigned devices are any PCIDevs part of the allocation. + assigned_devices = [ + node for node in total_allocation if node[1].get("type") == "PCIDev" + ] + + # THE FINAL LIST dun dun dun + all_assigned_nodes = final_nodes + assigned_devices + + return TopologyResult( + # This contains both CPUs and Devices + nodes=all_assigned_nodes, + mask=mask, + topo=topology, + gpu_string=gpu_assignment.cuda_devices if gpu_assignment else "NONE", + ) + + def apply_binding_pattern(self, leaf_nodes, local_size, local_rank): + """ + Given a set of chosen leaf nodes (typically Core/PU), apply a binding pattern. + + The binding pattern is an option in the jobspec.
+ """ + main_request = self.jobspec.get("resources", [{}])[0] + pattern = main_request.get("pattern", "packed").lower() + reverse = main_request.get("reverse", False) + log.info(f"Applying distribution pattern: '{pattern}' (reverse={reverse}).") + if reverse: + leaf_nodes.reverse() + + # This block is correct. It divides the pool and finds the CPU slice for this rank. + items_per_rank = len(leaf_nodes) // local_size + + # This will hold the assigned CPUs. + final_nodes = [] + if items_per_rank == 0 and local_size > 0: + if local_rank < len(leaf_nodes): + final_nodes = [leaf_nodes[local_rank]] + + # Pack em up! Like little weiner hotdogs in plastic! + elif pattern == "packed": + start_index = local_rank * items_per_rank + end_index = start_index + items_per_rank + final_nodes = leaf_nodes[start_index:end_index] + + # I think interleaved is a little different, but I feel lazy right + # now and don't want to think about it. + elif pattern in ["scatter", "spread", "interleaved"]: + strided_slice = leaf_nodes[local_rank::local_size] + final_nodes = strided_slice[:items_per_rank] + else: + raise ValueError(f"Unknown pattern '{pattern}'.") + return final_nodes diff --git a/fluxbind/graph/worker.py b/fluxbind/graph/worker.py new file mode 100644 index 0000000..a479b77 --- /dev/null +++ b/fluxbind/graph/worker.py @@ -0,0 +1,77 @@ +import logging +import multiprocessing +import subprocess +from concurrent.futures import ThreadPoolExecutor, as_completed + +log = logging.getLogger(__name__) + + +def get_numa_affinity(gp_index, data): + """ + Worker function to get NUMA affinity for an object of interest. + E.g., the data is from hwloc. It will have a type and/or pci_busis. + If it has a type, we can use the os_index to get distance. Note that + this is functionally and numerically correct, but in practice we can + sometimes see that a Core is reported closer to ANOTHER NUMA node. This + becomes a choice of predictibility (trust the hardware layout, the hwloc + xml) vs. 
peak performance (trust the output of hwloc calc). I think + for a tool like this we need to trust predictability. + """ + hwloc_obj_str = "" + if data.get("type") in ["Core", "PU", "NUMANode"] and "os_index" in data: + hwloc_obj_str = f"{data['type'].lower()}:{data['os_index']}" + elif data.get("pci_busid"): + hwloc_obj_str = f"pci={data['pci_busid']}" + + if not hwloc_obj_str: + return None + + try: + cmd = ["hwloc-calc", hwloc_obj_str, "--nodelist"] + result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=5) + output = result.stdout.strip() + + if ":" in output: + # Handles "numa:0", "node:0", etc. + numa_str = output.split(":")[-1] + else: + # Handles just "0" + numa_str = output + + if not numa_str: + log.debug( + f"hwloc-calc for {hwloc_obj_str} gave empty nodelist string: '{result.stdout}'" + ) + return None + + numa_index = int(numa_str) + return gp_index, numa_index + + except ( + subprocess.CalledProcessError, + FileNotFoundError, + ValueError, + IndexError, + subprocess.TimeoutExpired, + ) as e: + # This catches genuine errors, not simple parsing failures. + log.debug(f"hwloc-calc failed for {hwloc_obj_str}: {e}") + return None + + +class AffinityCalculator: + def __init__(self, max_workers=None): + self.max_workers = max_workers or int(multiprocessing.cpu_count() / 2) + + def calculate_numa_affinity(self, objects): + """ + Calculate NUMA affinities for objects using a thread pool executor. + This runs much faster than it does in serial!
+ """ + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + future_to_gp = { + executor.submit(get_numa_affinity, gp, data): gp for gp, data in objects + } + for future in as_completed(future_to_gp): + if result := future.result(): + yield result diff --git a/fluxbind/scripts/run_mapping.sh b/fluxbind/scripts/run_mapping.sh index c6a4709..8f9ea54 100644 --- a/fluxbind/scripts/run_mapping.sh +++ b/fluxbind/scripts/run_mapping.sh @@ -11,9 +11,17 @@ fi rank=${FLUX_TASK_RANK:-0} local_rank=${FLUX_TASK_LOCAL_ID:-0} node=$(hostname) +total_ranks=${FLUX_JOB_SIZE:-1} +total_nodes=${FLUX_JOB_NNODES:-1} # Get the logical node id using the arithmetic approach node_id=$(flux job taskmap --nodeid=${rank} ${FLUX_JOB_ID}) +local_size=$(flux job taskmap --ntasks=${node_id} ${FLUX_JOB_ID}) + +if [ -z "$rank" ] || [ -z "$local_rank" ] || [ -z "$node_id" ]; then + echo "Error: Required job task environment variables are not set." >&2 + exit 1 +fi # The user provides the path to the shape file in the environment. if [ -z "$JOB_SHAPE_FILE" ]; then @@ -21,13 +29,25 @@ if [ -z "$JOB_SHAPE_FILE" ]; then exit 1 fi +# If we want to use the graph parser +grapharg="" +if [ ! -z "$FLUXBIND_GRAPH" ]; then + grapharg="--graph" +fi + gpus_per_task=${GPUS_PER_TASK:-0} # Call the fluxbind helper script to get the target location string (e.g., "core:0" or "UNBOUND") # It ALWAYS returns a single line in the format: BIND_LOCATION,CUDA_DEVICE_ID # For CPU jobs, CUDA_DEVICE_ID will be the string "NONE". 
-echo fluxbind shape --file "$JOB_SHAPE_FILE" --rank "$rank" --node-id "$node_id" --local-rank "$local_rank" --gpus-per-task "$gpus_per_task" -BIND_INFO=$(fluxbind shape --file "$JOB_SHAPE_FILE" --rank "$rank" --node-id "$node_id" --local-rank "$local_rank" --gpus-per-task "$gpus_per_task") +echo "Executing: fluxbind shape --file \"$JOB_SHAPE_FILE\" --rank \"$rank\" --node-id \"$node_id\" --local-rank \"$local_rank\" --local-size \"$local_size\" --gpus-per-task \"$gpus_per_task\" $grapharg" +BIND_INFO=$(fluxbind shape --file "$JOB_SHAPE_FILE" \ + --rank "$rank" \ + --node-id "$node_id" \ + --local-rank "$local_rank" \ + --local-size "$local_size" \ + --nodes "$total_nodes" \ + --gpus-per-task "$gpus_per_task" $grapharg) echo # Exit if the helper script failed diff --git a/fluxbind/shape/commands.py b/fluxbind/shape/commands.py index 3aa568a..911416f 100644 --- a/fluxbind/shape/commands.py +++ b/fluxbind/shape/commands.py @@ -1,6 +1,5 @@ import json import subprocess -import sys class Command: @@ -25,6 +24,13 @@ def run(self, command, shell: bool = False): raise RuntimeError(f"Command not found: '{cmd_str}'") from e +class LstopoCommand(Command): + name = "lstopo" + + def get_xml(self): + return self.run(f"{self.name} -p --output-format xml", shell=True) + + class HwlocCalcCommand(Command): name = "hwloc-calc" @@ -164,3 +170,4 @@ def get_pci_bus_ids(self) -> list[str]: hwloc_calc = HwlocCalcCommand() nvidia_smi = NvidiaSmiCommand() rocm_smi = RocmSmiCommand() +lstopo = LstopoCommand() diff --git a/fluxbind/shape/shape.py b/fluxbind/shape/shape.py index 462d263..3ea9fc4 100644 --- a/fluxbind/shape/shape.py +++ b/fluxbind/shape/shape.py @@ -239,7 +239,7 @@ def get_binding_in_gpu_domain( else: raise ValueError(f"Unsupported type '{hwloc_type}' for GPU binding.") - def get_binding_for_rank(self, rank, node_id, local_rank, gpus_per_task=None) -> str: + def get_binding_for_rank(self, rank, node_id, local_rank, gpus_per_task=None, **kwargs) -> str: """ The main method to 
get the final hwloc binding string for a process. diff --git a/fluxbind/utils/fileio.py b/fluxbind/utils/fileio.py index 6c2d982..b88df88 100644 --- a/fluxbind/utils/fileio.py +++ b/fluxbind/utils/fileio.py @@ -5,6 +5,7 @@ import stat import subprocess import tempfile +import xml.etree.ElementTree as ET from contextlib import contextmanager import yaml @@ -17,6 +18,22 @@ def get_local_cluster(): return platform.node().split("-")[0] +def read_xml(xml_input): + """ + Read an xml file or string + """ + try: + if os.path.exists(xml_input): + with open(xml_input, "r") as f: + xml_content = f.read() + root = ET.fromstring(xml_content) + else: + root = ET.fromstring(xml_input) + except ET.ParseError as e: + raise ValueError(f"Failed to parse XML: {e}") + return root + + def read_json(filename): """ Read json from file diff --git a/fluxbind/version.py b/fluxbind/version.py index 9a7d85d..b014b31 100644 --- a/fluxbind/version.py +++ b/fluxbind/version.py @@ -14,6 +14,7 @@ # Note that the spack / environment modules plugins are installed automatically. # This doesn't need to be the case. 
INSTALL_REQUIRES = (
+    ("networkx", {"min_version": None}),
     ("jsonschema", {"min_version": None}),
     ("Jinja2", {"min_version": None}),
     # Yeah, probably overkill, just being used for printing the scripts
diff --git a/tests/corona.xml b/tests/corona.xml
new file mode 100644
index 0000000..bde44b2
--- /dev/null
+++ b/tests/corona.xml
@@ -0,0 +1,1133 @@
+[1,133 lines of hwloc topology XML for the corona test machine omitted]
+[remaining corona.xml topology elements omitted; the NUMA distance matrix values were: 0 1 / 10 32 32 10]
diff --git a/tests/img/gpu/01_gpu_local_rank0.png b/tests/img/gpu/01_gpu_local_rank0.png new file mode 100644 index 0000000..ea64844 Binary files /dev/null and b/tests/img/gpu/01_gpu_local_rank0.png differ diff --git a/tests/img/gpu/01_gpu_local_rank1.png b/tests/img/gpu/01_gpu_local_rank1.png new file mode 100644 index 0000000..ad9b1d8 Binary files /dev/null and b/tests/img/gpu/01_gpu_local_rank1.png differ diff --git a/tests/img/gpu/01_gpu_local_rank2.png b/tests/img/gpu/01_gpu_local_rank2.png new file mode 100644 index 0000000..e8cb82b Binary files /dev/null and b/tests/img/gpu/01_gpu_local_rank2.png differ diff --git a/tests/img/gpu/01_gpu_local_rank3.png b/tests/img/gpu/01_gpu_local_rank3.png new file mode 100644 index 0000000..e1fc51c Binary files /dev/null and b/tests/img/gpu/01_gpu_local_rank3.png differ diff --git a/tests/img/gpu/01_gpu_local_rank4.png b/tests/img/gpu/01_gpu_local_rank4.png new file mode 100644 index 0000000..ff97f46 Binary files /dev/null and b/tests/img/gpu/01_gpu_local_rank4.png differ diff --git a/tests/img/gpu/01_gpu_local_rank5.png b/tests/img/gpu/01_gpu_local_rank5.png new file mode 100644 index 0000000..23bb850 Binary files /dev/null and b/tests/img/gpu/01_gpu_local_rank5.png differ diff --git a/tests/img/gpu/01_gpu_local_rank6.png b/tests/img/gpu/01_gpu_local_rank6.png new file mode 100644 index
0000000..2c3a95a Binary files /dev/null and b/tests/img/gpu/01_gpu_local_rank6.png differ diff --git a/tests/img/gpu/01_gpu_local_rank7.png b/tests/img/gpu/01_gpu_local_rank7.png new file mode 100644 index 0000000..d56cd59 Binary files /dev/null and b/tests/img/gpu/01_gpu_local_rank7.png differ diff --git a/tests/img/gpu/02_gpu_remote_rank0.png b/tests/img/gpu/02_gpu_remote_rank0.png new file mode 100644 index 0000000..39e1b2b Binary files /dev/null and b/tests/img/gpu/02_gpu_remote_rank0.png differ diff --git a/tests/img/gpu/02_gpu_remote_rank1.png b/tests/img/gpu/02_gpu_remote_rank1.png new file mode 100644 index 0000000..b0c1fa8 Binary files /dev/null and b/tests/img/gpu/02_gpu_remote_rank1.png differ diff --git a/tests/img/gpu/02_gpu_remote_rank2.png b/tests/img/gpu/02_gpu_remote_rank2.png new file mode 100644 index 0000000..61e4d85 Binary files /dev/null and b/tests/img/gpu/02_gpu_remote_rank2.png differ diff --git a/tests/img/gpu/02_gpu_remote_rank3.png b/tests/img/gpu/02_gpu_remote_rank3.png new file mode 100644 index 0000000..ed45a56 Binary files /dev/null and b/tests/img/gpu/02_gpu_remote_rank3.png differ diff --git a/tests/img/gpu/02_gpu_remote_rank4.png b/tests/img/gpu/02_gpu_remote_rank4.png new file mode 100644 index 0000000..be00180 Binary files /dev/null and b/tests/img/gpu/02_gpu_remote_rank4.png differ diff --git a/tests/img/gpu/02_gpu_remote_rank5.png b/tests/img/gpu/02_gpu_remote_rank5.png new file mode 100644 index 0000000..748e3ef Binary files /dev/null and b/tests/img/gpu/02_gpu_remote_rank5.png differ diff --git a/tests/img/gpu/02_gpu_remote_rank6.png b/tests/img/gpu/02_gpu_remote_rank6.png new file mode 100644 index 0000000..6852100 Binary files /dev/null and b/tests/img/gpu/02_gpu_remote_rank6.png differ diff --git a/tests/img/gpu/02_gpu_remote_rank7.png b/tests/img/gpu/02_gpu_remote_rank7.png new file mode 100644 index 0000000..33f85f2 Binary files /dev/null and b/tests/img/gpu/02_gpu_remote_rank7.png differ diff --git 
a/tests/img/gpu/03_contextual_affinity.png b/tests/img/gpu/03_contextual_affinity.png new file mode 100644 index 0000000..c52cb43 Binary files /dev/null and b/tests/img/gpu/03_contextual_affinity.png differ diff --git a/tests/img/gpu/04_gpu_and_nic.png b/tests/img/gpu/04_gpu_and_nic.png new file mode 100644 index 0000000..0f5f8f5 Binary files /dev/null and b/tests/img/gpu/04_gpu_and_nic.png differ diff --git a/tests/img/gpu/05_multi_gpu_rank0.png b/tests/img/gpu/05_multi_gpu_rank0.png new file mode 100644 index 0000000..631be1d Binary files /dev/null and b/tests/img/gpu/05_multi_gpu_rank0.png differ diff --git a/tests/img/gpu/05_multi_gpu_rank1.png b/tests/img/gpu/05_multi_gpu_rank1.png new file mode 100644 index 0000000..d35340d Binary files /dev/null and b/tests/img/gpu/05_multi_gpu_rank1.png differ diff --git a/tests/img/gpu/05_multi_gpu_rank2.png b/tests/img/gpu/05_multi_gpu_rank2.png new file mode 100644 index 0000000..437fd90 Binary files /dev/null and b/tests/img/gpu/05_multi_gpu_rank2.png differ diff --git a/tests/img/gpu/05_multi_gpu_rank3.png b/tests/img/gpu/05_multi_gpu_rank3.png new file mode 100644 index 0000000..8ece026 Binary files /dev/null and b/tests/img/gpu/05_multi_gpu_rank3.png differ diff --git a/tests/img/single-node/01_simple_cores.gif b/tests/img/single-node/01_simple_cores.gif new file mode 100644 index 0000000..40481b1 Binary files /dev/null and b/tests/img/single-node/01_simple_cores.gif differ diff --git a/tests/img/single-node/01_simple_cores_rank0.png b/tests/img/single-node/01_simple_cores_rank0.png new file mode 100644 index 0000000..4fa27f9 Binary files /dev/null and b/tests/img/single-node/01_simple_cores_rank0.png differ diff --git a/tests/img/single-node/01_simple_cores_rank1.png b/tests/img/single-node/01_simple_cores_rank1.png new file mode 100644 index 0000000..9be55d9 Binary files /dev/null and b/tests/img/single-node/01_simple_cores_rank1.png differ diff --git a/tests/img/single-node/02_explicit_pu_rank0.png 
b/tests/img/single-node/02_explicit_pu_rank0.png new file mode 100644 index 0000000..a116cda Binary files /dev/null and b/tests/img/single-node/02_explicit_pu_rank0.png differ diff --git a/tests/img/single-node/02_explicit_pu_rank1.png b/tests/img/single-node/02_explicit_pu_rank1.png new file mode 100644 index 0000000..23066ab Binary files /dev/null and b/tests/img/single-node/02_explicit_pu_rank1.png differ diff --git a/tests/img/single-node/02_explicit_rank.gif b/tests/img/single-node/02_explicit_rank.gif new file mode 100644 index 0000000..3d5b755 Binary files /dev/null and b/tests/img/single-node/02_explicit_rank.gif differ diff --git a/tests/img/single-node/03_implicit_core.png b/tests/img/single-node/03_implicit_core.png new file mode 100644 index 0000000..02f3845 Binary files /dev/null and b/tests/img/single-node/03_implicit_core.png differ diff --git a/tests/img/single-node/04_default_core_container.png b/tests/img/single-node/04_default_core_container.png new file mode 100644 index 0000000..387e1f9 Binary files /dev/null and b/tests/img/single-node/04_default_core_container.png differ diff --git a/tests/img/single-node/06_scatter.gif b/tests/img/single-node/06_scatter.gif new file mode 100644 index 0000000..c72161f Binary files /dev/null and b/tests/img/single-node/06_scatter.gif differ diff --git a/tests/img/single-node/06_scatter_rank0.png b/tests/img/single-node/06_scatter_rank0.png new file mode 100644 index 0000000..e7c9aee Binary files /dev/null and b/tests/img/single-node/06_scatter_rank0.png differ diff --git a/tests/img/single-node/06_scatter_rank1.png b/tests/img/single-node/06_scatter_rank1.png new file mode 100644 index 0000000..f1385f0 Binary files /dev/null and b/tests/img/single-node/06_scatter_rank1.png differ diff --git a/tests/img/single-node/06_scatter_rank2.png b/tests/img/single-node/06_scatter_rank2.png new file mode 100644 index 0000000..a6c083b Binary files /dev/null and b/tests/img/single-node/06_scatter_rank2.png differ diff --git 
a/tests/img/single-node/06_scatter_rank3.png b/tests/img/single-node/06_scatter_rank3.png new file mode 100644 index 0000000..13abece Binary files /dev/null and b/tests/img/single-node/06_scatter_rank3.png differ diff --git a/tests/img/single-node/07_reverse_rank.gif b/tests/img/single-node/07_reverse_rank.gif new file mode 100644 index 0000000..25ff595 Binary files /dev/null and b/tests/img/single-node/07_reverse_rank.gif differ diff --git a/tests/img/single-node/07_reverse_rank0.png b/tests/img/single-node/07_reverse_rank0.png new file mode 100644 index 0000000..f612648 Binary files /dev/null and b/tests/img/single-node/07_reverse_rank0.png differ diff --git a/tests/img/single-node/07_reverse_rank1.png b/tests/img/single-node/07_reverse_rank1.png new file mode 100644 index 0000000..6bd88a2 Binary files /dev/null and b/tests/img/single-node/07_reverse_rank1.png differ diff --git a/tests/img/single-node/08_explicit_core_rank.gif b/tests/img/single-node/08_explicit_core_rank.gif new file mode 100644 index 0000000..628528e Binary files /dev/null and b/tests/img/single-node/08_explicit_core_rank.gif differ diff --git a/tests/img/single-node/08_explicit_core_rank0.png b/tests/img/single-node/08_explicit_core_rank0.png new file mode 100644 index 0000000..5901900 Binary files /dev/null and b/tests/img/single-node/08_explicit_core_rank0.png differ diff --git a/tests/img/single-node/08_explicit_core_rank1.png b/tests/img/single-node/08_explicit_core_rank1.png new file mode 100644 index 0000000..f3f2c5c Binary files /dev/null and b/tests/img/single-node/08_explicit_core_rank1.png differ diff --git a/tests/img/single-node/09_explicit_pu_reverse_rank0.png b/tests/img/single-node/09_explicit_pu_reverse_rank0.png new file mode 100644 index 0000000..66a0c67 Binary files /dev/null and b/tests/img/single-node/09_explicit_pu_reverse_rank0.png differ diff --git a/tests/img/single-node/09_explicit_pu_reverse_rank1.png b/tests/img/single-node/09_explicit_pu_reverse_rank1.png new file 
mode 100644 index 0000000..c720300 Binary files /dev/null and b/tests/img/single-node/09_explicit_pu_reverse_rank1.png differ diff --git a/tests/single-node.xml b/tests/single-node.xml new file mode 100644 index 0000000..efb1844 --- /dev/null +++ b/tests/single-node.xml @@ -0,0 +1,203 @@
+[203 lines of hwloc topology XML for the single-node test machine omitted]
diff --git a/tests/test_gpu_node.py b/tests/test_gpu_node.py new file mode 100644 index 0000000..cadb2e8 --- /dev/null +++ b/tests/test_gpu_node.py @@ -0,0 +1,280 @@
+import os
+import unittest
+
+import yaml
+
+from fluxbind.graph.graphic import TopologyVisualizer
+from fluxbind.graph.shape import Shape, TopologyResult
+
+here = os.path.dirname(os.path.abspath(__file__))
+
+TEST_GPU_XML_FILE = os.path.join(here, "corona.xml")
+GRAPHICS_OUTPUT_DIR = os.path.join(here, "img", "gpu")
+
+
+@unittest.skipUnless(
+ os.path.exists(TEST_GPU_XML_FILE), f"Skipping GPU tests, {TEST_GPU_XML_FILE} not found."
+)
+class TestGpuShapeAllocator(unittest.TestCase):
+ """
+ Tests for GPU-aware allocation, requiring a multi-NUMA topology (corona.xml).
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ os.makedirs(GRAPHICS_OUTPUT_DIR, exist_ok=True)
+
+ def run_test_case(
+ self,
+ shape_yaml_str: str,
+ local_rank: int,
+ local_size: int,
+ title: str,
+ output_filename: str,
+ width=30,
+ height=24,
+ **kwargs,
+ ) -> TopologyResult:
+ """
+ A helper function to run a single test case and generate a graphic.
+ Defaults to a smaller image size, but can be overridden.
+ """ + shape_allocator = Shape(yaml.safe_load(shape_yaml_str)) + result = shape_allocator.run( + xml_file=TEST_GPU_XML_FILE, local_size=local_size, local_rank=local_rank, **kwargs + ) + + if output_filename and result.nodes: + filepath = os.path.join(GRAPHICS_OUTPUT_DIR, output_filename) + visualizer = TopologyVisualizer( + result.topo, + result.nodes, # This single list contains all assigned resources + affinity_target=result.topo.last_affinity_target, + ) + visualizer.title = title + # Pass the width and height to the draw method + visualizer.draw(filepath, width=width, height=height) + + return result + + def test_01_gpu_local_binding(self): + """ + Test: Assign 1 GPU per task and bind to 2 local cores. + This test now generates a graphic for all 8 ranks. + """ + print("\n--- Testing: `bind: gpu-local` (Multi-NUMA) ---") + shape_yaml = """ +options: + bind: gpu-local +resources: + - type: core + count: 2 +""" + local_size = 8 + gpus_per_task = 1 + + for i in range(local_size): + title = f"GPU Local: Rank {i} of {local_size}" + output_filename = f"01_gpu_local_rank{i}.png" + print(f" -> Testing and generating graphic for local_rank {i}...") + + result = self.run_test_case( + shape_yaml, + local_rank=i, + local_size=local_size, + gpus_per_task=gpus_per_task, + title=title, + output_filename=output_filename, + width=36, + height=16, + ) + + self.assertEqual(result.gpu_string, str(i)) + expected_package = 0 if i < 4 else 1 + + self.assertTrue(result.nodes, f"Rank {i} was not assigned any CPU nodes.") + # We only check the CPU nodes for package affinity + cpu_nodes = [node for node in result.nodes if node[1].get("type") in ["Core", "PU"]] + for gp, _ in cpu_nodes: + package = result.topo.get_ancestor_of_type(gp, "Package") + self.assertIsNotNone( + package, f"Could not find parent package for core on rank {i}." 
+ ) + self.assertEqual( + package[1].get("os_index"), + expected_package, + f"Rank {i} bound to wrong package.", + ) + + def test_02_gpu_remote_binding(self): + """ + Test: Assign a GPU on one NUMA node, but bind to cores on the other. + This test now generates a graphic for all 8 ranks. + """ + print("\n--- Testing: `bind: gpu-remote` (Multi-NUMA) ---") + shape_yaml = """ +options: + bind: gpu-remote +resources: + - type: core + count: 4 +""" + local_size = 8 + gpus_per_task = 1 + + for i in range(local_size): + title = f"GPU Remote: Rank {i} of {local_size}" + output_filename = f"02_gpu_remote_rank{i}.png" + print(f" -> Testing and generating graphic for local_rank {i}...") + + result = self.run_test_case( + shape_yaml, + local_rank=i, + local_size=local_size, + gpus_per_task=gpus_per_task, + title=title, + output_filename=output_filename, + width=36, + height=16, + ) + self.assertEqual(result.gpu_string, str(i)) + + expected_package = 1 if i < 4 else 0 + + cpu_nodes = [node for node in result.nodes if node[1].get("type") in ["Core", "PU"]] + self.assertTrue(cpu_nodes, f"Rank {i} was not assigned any CPU nodes.") + for gp, _ in cpu_nodes: + package = result.topo.get_ancestor_of_type(gp, "Package") + self.assertIsNotNone( + package, f"Could not find parent package for core on rank {i}." + ) + self.assertEqual( + package[1].get("os_index"), + expected_package, + f"Rank {i} bound to wrong package.", + ) + + def test_03_contextual_affinity_to_gpu(self): + """ + Test: Find a GPU, then find cores with affinity to *that specific* GPU. 
+ """ + print("\n--- Testing: Contextual Affinity to specific GPU ---") + shape_yaml = """ +resources: + - type: numanode + count: 1 + with: + - type: gpu + count: 1 + with: + - type: core + count: 2 + affinity: + type: gpu +""" + result = self.run_test_case( + shape_yaml, + local_rank=0, + local_size=1, + title="Contextual Affinity", + output_filename="03_contextual_affinity.png", + width=36, + height=16, + ) + self.assertIsNotNone(result.topo.last_affinity_target, "Affinity target was not set") + target_pkg = result.topo.get_ancestor_of_type( + result.topo.last_affinity_target[0], "Package" + ) + self.assertIsNotNone(target_pkg, "Could not find parent package for affinity target.") + + cpu_nodes = [node for node in result.nodes if node[1].get("type") in ["Core", "PU"]] + for gp, _ in cpu_nodes: + package = result.topo.get_ancestor_of_type(gp, "Package") + self.assertIsNotNone(package, "Could not find parent package for allocated core.") + self.assertEqual(package[1].get("os_index"), target_pkg[1].get("os_index")) + + def test_04_multi_resource_gpu_and_nic(self): + """ + Test: Find a NUMA node that contains BOTH a GPU and a NIC. + """ + print("\n--- Testing: Find NUMA node with GPU and NIC ---") + shape_yaml = """ +resources: + - type: numanode + count: 1 + with: + - type: nic + count: 1 + - type: gpu + count: 1 +""" + result = self.run_test_case( + shape_yaml, + local_rank=0, + local_size=1, + title="Find NUMA with GPU+NIC", + output_filename="04_gpu_and_nic.png", + width=36, + height=16, + ) + # IMPORTANT: we have 24 actual result, and 1 GPU and 1 NIC here. + self.assertEqual(len(result.nodes), 26) + for gp, _ in result.nodes: + package = result.topo.get_ancestor_of_type(gp, "Package") + self.assertIsNotNone(package, "Could not find parent package for allocated core.") + self.assertEqual(package[1].get("os_index"), 1) + + def test_05_multi_gpu_task(self): + """ + Test: Assign 2 GPUs per task and bind locally. 
+ """ + print("\n--- Testing: Multi-GPU Task (2 GPUs per rank) ---") + shape_yaml = """ +options: + bind: gpu-local +resources: + - type: core + count: 4 +""" + local_size = 4 + gpus_per_task = 2 + + for i in range(local_size): + title = f"Multi-GPU: Rank {i} of {local_size}" + output_filename = f"05_multi_gpu_rank{i}.png" + print(f" -> Testing and generating graphic for local_rank {i}...") + + result = self.run_test_case( + shape_yaml, + local_rank=i, + local_size=local_size, + gpus_per_task=gpus_per_task, + title=title, + output_filename=output_filename, + width=36, + height=16, + ) + + gpu_start = i * gpus_per_task + gpu_end = gpu_start + gpus_per_task + expected_gpus = ",".join(map(str, range(gpu_start, gpu_end))) + self.assertEqual(result.gpu_string, expected_gpus) + + expected_package = 0 if i < 2 else 1 + + cpu_nodes = [node for node in result.nodes if node[1].get("type") in ["Core", "PU"]] + self.assertTrue(cpu_nodes, f"Rank {i} was not assigned any CPU nodes.") + for gp, _ in cpu_nodes: + package = result.topo.get_ancestor_of_type(gp, "Package") + self.assertIsNotNone( + package, f"Could not find parent package for core on rank {i}." 
+ ) + self.assertEqual( + package[1].get("os_index"), + expected_package, + f"Rank {i} bound to wrong package.", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_single_node.py b/tests/test_single_node.py new file mode 100644 index 0000000..5487939 --- /dev/null +++ b/tests/test_single_node.py @@ -0,0 +1,313 @@ +import os +import unittest + +import yaml + +from fluxbind.graph.graphic import TopologyVisualizer +from fluxbind.graph.shape import Shape, TopologyResult + +here = os.path.dirname(os.path.abspath(__file__)) + +TEST_XML_FILE = os.path.join(here, "single-node.xml") +TEST_GPU_XML_FILE = os.path.join(here, "corona.xml") +GRAPHICS_OUTPUT_DIR = os.path.join(here, "img", "single-node") + +# To generate gifs +# convert -delay 50 -loop 0 08_explicit_core_rank* 08_explicit_core_rank.gif + + +class TestShapeAllocator(unittest.TestCase): + + @classmethod + def setUpClass(cls): + """ + This method runs once before any tests. It ensures our test XML file exists. + """ + if not os.path.exists(TEST_XML_FILE): + raise FileNotFoundError( + f"Test XML file not found: {TEST_XML_FILE}. Please generate it first." + ) + os.makedirs(GRAPHICS_OUTPUT_DIR, exist_ok=True) + + def run_test_case( + self, + shape, + local_rank: int, + local_size: int, + output_filename: str = None, + title: str = None, + gpus_per_task=None, + **kwargs, + ) -> TopologyResult: + """ + A helper function that runs an allocation and optionally generates a graphic. 
+ """ + shape_allocator = Shape(yaml.safe_load(shape)) + + # Use the correct XML file based on kwargs + xml_file = kwargs.get("xml_file", TEST_XML_FILE) + result = shape_allocator.run( + xml_file=xml_file, + local_size=local_size, + local_rank=local_rank, + gpus_per_task=gpus_per_task, + ) + + # If a filename is provided, generate the visualization + if output_filename and result.nodes and result.topo: + filepath = os.path.join(GRAPHICS_OUTPUT_DIR, output_filename) + visualizer = TopologyVisualizer( + result.topo, result.nodes, affinity_target=result.topo.last_affinity_target + ) + # We can customize the title for the plot + visualizer.title = title or f"Allocation for local_rank {local_rank}" + visualizer.draw(filepath) + + return result + + def test_01_simple_cores_multi_rank(self): + print("\n--- Testing: Simple Core Pool (Multi-Rank) ---") + shape_yaml = """ +resources: + - type: core + count: 8 +""" + result_rank0 = self.run_test_case( + shape_yaml, + local_rank=0, + local_size=2, + output_filename="01_simple_cores_rank0.png", + title="Simple Cores: Rank 1 of 2", + ) + self.assertEqual(result_rank0.mask, "0x00000055") + + result_rank1 = self.run_test_case( + shape_yaml, + local_rank=1, + local_size=2, + output_filename="01_simple_cores_rank1.png", + title="Simple Cores: Rank 2 of 2", + ) + self.assertEqual(result_rank1.mask, "0x00005500") + + def test_02_explicit_bind_pu_multi_rank(self): + print("\n--- Testing: Explicit `bind: pu` (Multi-Rank) ---") + shape_yaml = """ +options: + bind: pu +resources: + - type: core + count: 4 +""" + result_rank0 = self.run_test_case( + shape_yaml, + local_rank=0, + local_size=2, + output_filename="02_explicit_pu_rank0.png", + title="Explicit PU Binding: Rank 1 of 2", + ) + self.assertEqual(result_rank0.mask, "0x0000000f") + + result_rank1 = self.run_test_case( + shape_yaml, + local_rank=1, + local_size=2, + output_filename="02_explicit_pu_rank1.png", + title="Explicit PU Binding: Rank 2 of 2", + ) + 
self.assertEqual(result_rank1.mask, "0x000000f0") + + def test_03_implicit_bind_core(self): + print("\n--- Testing: Implicit `bind: core` (Rule 2) ---") + shape_yaml = "resources:\n - type: core\n count: 2" + self.run_test_case( + shape_yaml, + local_rank=0, + local_size=1, + output_filename="03_implicit_core.png", + title="Implicit Core Binding", + ) + + def test_04_default_bind_core_from_container(self): + print("\n--- Testing: Default `bind: core` from Container (Rule 3) ---") + shape_yaml = "resources:\n - type: l3cache\n count: 1" + self.run_test_case( + shape_yaml, + local_rank=0, + local_size=1, + output_filename="04_default_core_container.png", + title="Default Core Binding from L3Cache", + ) + + def test_05_bind_none(self): + print("\n--- Testing: `bind: none` ---") + shape_yaml = "options:\n bind: none\nresources:\n - type: core\n count: 2" + result = self.run_test_case( + shape_yaml, + local_rank=0, + local_size=1, + output_filename="05_bind_none.png", + title="Bind None (shows found resources)", + ) + self.assertEqual(result.mask, "UNBOUND") + + def test_06_pattern_scatter_multi_rank(self): + print("\n--- Testing: `pattern: scatter` (Multi-Rank) ---") + shape_yaml = "resources:\n - type: core\n count: 8\n pattern: scatter" + self.run_test_case( + shape_yaml, + local_rank=0, + local_size=4, + output_filename="06_scatter_rank0.png", + title="Scatter Pattern: Rank 1 of 4", + ) + self.run_test_case( + shape_yaml, + local_rank=1, + local_size=4, + output_filename="06_scatter_rank1.png", + title="Scatter Pattern: Rank 2 of 4", + ) + self.run_test_case( + shape_yaml, + local_rank=2, + local_size=4, + output_filename="06_scatter_rank2.png", + title="Scatter Pattern: Rank 3 of 4", + ) + self.run_test_case( + shape_yaml, + local_rank=3, + local_size=4, + output_filename="06_scatter_rank3.png", + title="Scatter Pattern: Rank 4 of 4", + ) + + def test_07_pattern_reverse_multi_rank(self): + print("\n--- Testing: `reverse: true` (Multi-Rank) ---") + shape_yaml = 
"resources:\n - type: core\n count: 8\n reverse: true" + self.run_test_case( + shape_yaml, + local_rank=0, + local_size=2, + output_filename="07_reverse_rank0.png", + title="Reverse Pattern: Rank 1 of 2", + ) + self.run_test_case( + shape_yaml, + local_rank=1, + local_size=2, + output_filename="07_reverse_rank1.png", + title="Reverse Pattern: Rank 2 of 2", + ) + + def test_08_explicit_core_binding_multi_rank(self): + """ + Test: A multi-rank job where the shape explicitly asks for cores. + This validates Rule 2 (Implicit Intent) in a distribution scenario. + """ + print("\n--- Testing: Explicit `bind: core` (Multi-Rank, Rule 2) ---") + shape_yaml = """ +# No options block is provided. The script should infer the binding +# preference from the resource type requested. +resources: + - type: core + count: 4 +""" + # We are asking for a total pool of 4 cores, to be divided among 2 ranks. + # Each rank should get 2 cores. + + # We expect this rank to get the first 2 cores: Core:0, Core:1 + # The binding is implicitly core so we bind to the first PU of each. + # Mask = PU:0 (0x1) | PU:2 (0x4) = 0x5 + result_rank0 = self.run_test_case( + shape_yaml, + local_rank=0, + local_size=2, + output_filename="08_explicit_core_rank0.png", + title="Explicit Core Binding (2 per rank): Rank 1 of 2", + ) + self.assertEqual(result_rank0.mask, "0x00000005") + self.assertEqual(len(result_rank0.nodes), 2) + + # We expect this rank to get the next 2 cores: Core:2, Core:3 + # Binding to the first PU of each. + # Mask = PU:4 (0x10) | PU:6 (0x40) = 0x50 + result_rank1 = self.run_test_case( + shape_yaml, + local_rank=1, + local_size=2, + output_filename="08_explicit_core_rank1.png", + title="Explicit Core Binding (2 per rank): Rank 2 of 2", + ) + self.assertEqual(result_rank1.mask, "0x00000050") + self.assertEqual(len(result_rank1.nodes), 2) + + def test_09_explicit_bind_pu_reverse_multi_rank(self): + """ + Test: Ask for 4 cores, bind to PUs in REVERSE, divide among 2 ranks. 
+ This validates the `reverse: true` modifier with `bind: pu`.
+ """
+ print("\n--- Testing: Explicit `bind: pu` with `reverse: true` (Multi-Rank) ---")
+ shape_yaml = """
+options:
+ bind: pu
+resources:
+ - type: core
+ count: 4
+ reverse: true
+"""
+ # The total pool of resources will be all PUs on the first 4 cores,
+ # but the list will be reversed.
+ # Initial PUs: [PU0, PU1, PU2, PU3, PU4, PU5, PU6, PU7]
+ # Reversed PUs: [PU7, PU6, PU5, PU4, PU3, PU2, PU1, PU0]
+ # local_size = 2, so items_per_rank = 8 // 2 = 4.
+
+ # We expect this rank to get the first 4 PUs from the REVERSED list:
+ # PUs 7, 6, 5, 4.
+ # The mask for these is 0x80 | 0x40 | 0x20 | 0x10 = 0xF0
+ result_rank0 = self.run_test_case(
+ shape_yaml,
+ local_rank=0,
+ local_size=2,
+ output_filename="09_explicit_pu_reverse_rank0.png",
+ title="Explicit PU Binding, Reversed: Rank 1 of 2",
+ )
+ self.assertEqual(result_rank0.mask, "0x000000f0")
+ self.assertEqual(len(result_rank0.nodes), 4)
+
+ # We expect this rank to get the next 4 PUs from the REVERSED list:
+ # PUs 3, 2, 1, 0.
+ # The mask for these is 0x8 | 0x4 | 0x2 | 0x1 = 0xF
+ result_rank1 = self.run_test_case(
+ shape_yaml,
+ local_rank=1,
+ local_size=2,
+ output_filename="09_explicit_pu_reverse_rank1.png",
+ title="Explicit PU Binding, Reversed: Rank 2 of 2",
+ )
+ self.assertEqual(result_rank1.mask, "0x0000000f")
+ self.assertEqual(len(result_rank1.nodes), 4)
+
+
+@unittest.skipUnless(
+ os.path.exists(TEST_GPU_XML_FILE), f"Skipping GPU tests, {TEST_GPU_XML_FILE} not found."
+)
+class TestGpuShapeAllocator(TestShapeAllocator): # Inherit the helper
+ """
+ Tests for GPU-aware allocation when no GPUs are present.
+ """
+
+ def test_01_gpu_remote_fails_on_single_numa(self):
+ print("\n--- Testing: `bind: gpu-remote` (Single-NUMA, Expect Fail) ---")
+ shape_yaml = "options:\n bind: gpu-remote\nresources:\n - type: core\n count: 1"
+
+ # This test is expected to raise an error, so no graphic will be generated.
+ with self.assertRaises(RuntimeError): + self.run_test_case( + shape_yaml, local_rank=0, local_size=1, gpus_per_task=1, xml_file=TEST_XML_FILE + ) + + +if __name__ == "__main__": + unittest.main()
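
The single-node tests above assert hexadecimal cpumask strings (for example, `0x000000f0` for the reversed PUs 7, 6, 5, 4). As a sanity check of that arithmetic, here is a minimal sketch; `cpumask` is a hypothetical helper for illustration, not fluxbind's actual implementation:

```python
# Sketch of the mask arithmetic asserted in the tests above: OR one bit
# per PU os_index, then format as a 32-bit hwloc-style hex string.
# NOTE: `cpumask` is a hypothetical helper, not part of fluxbind.
def cpumask(pu_indices):
    mask = 0
    for pu in pu_indices:
        mask |= 1 << pu
    return f"0x{mask:08x}"

# Test 08, rank 0: Core:0 and Core:1 bound via their first PUs (0 and 2).
print(cpumask([0, 2]))        # -> 0x00000005
# Test 09, rank 0: the reversed PU list yields PUs 7, 6, 5, 4.
print(cpumask([7, 6, 5, 4]))  # -> 0x000000f0
```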