@@ -108,6 +108,10 @@ def build_graph(self, element, parent_gp=None):
     def discover_devices(self):
         """
        Discover different kinds of GPU and NIC.
+
+        TODO: a better design would be to discover devices, then annotate the graph
+        nodes with device type. Then when we search for gpu or nic, we can just
+        search by that attribute. Right now we search for PCIDev, and then filter.
         """
         # Don't assume only one vendor of GPU.
         for vendor, command in {"NVIDIA": commands.nvidia_smi, "AMD": commands.rocm_smi}.items():
@@ -180,7 +184,7 @@ def add_latency_edges(matrix, indexes):
                 weight=matrix[i][j],  # The weight is the latency value from the matrix.
             )
 
-        # --- First, try to parse the modern hwloc v2.x format ---
+        # First, try to parse the newer hwloc v2.x format.
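+        # A v2.x distances element looks roughly like this (illustrative sketch,
+        # not verbatim hwloc output):
+        #   <distances2 nbobjs="2" kind="...">
+        #     <indexes>0 1</indexes>
+        #     <u64values>10 21 21 10</u64values>
+        #   </distances2>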
         for dist_el in root.iter("distances2"):
             try:
                 # nbobjs is how many objects are in the matrix (e.g., 2 for 2 NUMA nodes).
@@ -274,15 +278,18 @@ def match_resources(self, jobspec, allocated_gps=None):
         log.debug(f"Successfully found a slot with {len(final_allocation)} objects.")
         return final_allocation
 
-    def sort_by_affinity(self, candidates, affinity, allocated):
+    def sort_by_affinity(self, candidates, affinity, allocated, domain_gp):
+        """
+        Sort the list of candidates by affinity so we get the closest one first.
+        """
         target_type = self.translate_type(affinity.get("type"))
         if not target_type:
             log.warning("Affinity spec missing 'type'.")
             return candidates
-        machine_gp = self.find_objects(type="Machine")[0][0]
-
-        # Affinity targets can be anywhere, so search from Machine
-        targets = self.get_available_children(machine_gp, target_type, allocated)
+
+        # Search within the domain we were provided, not across the whole machine.
+        log.debug(f" -> Searching for affinity target '{target_type}' within the current domain.")
+        targets = self.get_available_children(domain_gp, target_type, allocated)
         if not targets:
             log.warning(f"Affinity target '{target_type}' not found.")
             return candidates
@@ -330,12 +337,10 @@ def summarize(self, nodes):
         for gp, data in nodes:
             p_info = ""
             if data["type"] in ["Core", "PU"]:
-                # BUGFIX: Changed self.topology to self
                 package = self.get_ancestor_of_type(gp, "Package")
                 if package:
                     p_info = f"Package:{package[1].get('os_index')} -> "
             if data["type"] == "PU":
-                # BUGFIX: Changed self.topology to self
                 core = self.get_ancestor_of_type(gp, "Core")
                 if core:
                     p_info += f"Core:{core[1].get('os_index')} -> "
@@ -393,22 +398,7 @@ def get_available_children(self, parent_gp, child_type, allocated):
 
         parent_type = parent_node.get("type")
         child_type_lower = child_type.lower()
-
-        candidate_gp_set = set()
-        if child_type_lower == "gpu":
-            for gpu_info in self.gpus:
-                if (pci_id := gpu_info.get("pci_bus_id")) and (
-                    matches := self.find_objects(type="PCIDev", pci_busid=pci_id)
-                ):
-                    candidate_gp_set.add(matches[0][0])
-        elif child_type_lower == "nic":
-            for nic_gp, _ in self.nics:
-                candidate_gp_set.add(nic_gp)
-        else:
-            for gp, _ in self.find_objects(type=child_type):
-                candidate_gp_set.add(gp)
-
-        all_candidates = [(gp, self.graph.nodes[gp]) for gp in candidate_gp_set]
+        all_candidates = self.find_objects(type=child_type)
         log.debug(
             f" - -> Found {len(all_candidates)} total unique system-wide candidates for '{child_type}'."
         )
@@ -422,14 +412,14 @@ def get_available_children(self, parent_gp, child_type, allocated):
 
         # Rule 1: Relationship to NUMA node is through shared PACKAGE parent (for Cores) or LOCALITY (for Devices)
         if parent_type == "NUMANode":
-            if child_type_lower in ["gpu", "nic"]:
-                if data.get("numa_os_index") == parent_node.get("os_index"):
-                    is_valid_child = True
-            elif child_type_lower in ["core", "pu"]:
+            if child_type_lower in ["core", "pu"]:
                 package_of_numa = self.get_ancestor_of_type(parent_gp, "Package")
                 if package_of_numa and nx.has_path(self.hierarchy_view, package_of_numa[0], gp):
                     is_valid_child = True
-
+            elif child_type_lower == "pcidev":
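+                # Devices carry numa_os_index (the OS index of their closest
+                # NUMA node), so locality here is a direct index comparison.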
+                if data.get("numa_os_index") == parent_node.get("os_index"):
+                    is_valid_child = True
+
         # Rule 2 (NEW): Relationship of a Core/PU to a Device is through shared NUMA LOCALITY
         elif parent_type == "PCIDev" and child_type_lower in ["core", "pu"]:
             parent_numa_idx = parent_node.get("numa_os_index")
@@ -449,8 +439,9 @@ def get_available_children(self, parent_gp, child_type, allocated):
         return available
 
     def find_objects(self, **attributes):
-        # STOPED HERE, trace this and see if gp is globally identifier, add comments.
-        # then go back to making distances/distances2 function and add comments.
+        """
+        Search nodes in the graph for a specific attribute (or more than one).
+        """
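+        # Example (attribute values are illustrative):
+        #   self.find_objects(type="NUMANode", os_index=0)
+        #   self.find_objects(type="PCIDev", pci_busid="0000:17:00.0")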
         return [
             (gp, data)
             for gp, data in self.graph.nodes(data=True)
@@ -468,9 +459,23 @@ def find_assignment_recursive(self, request, domain_gp, allocated, depth=0):
         domain_info = f"{domain_node.get('type')}:{domain_node.get('os_index', domain_node.get('pci_busid', domain_gp))}"
         log.debug(f"{indent}[ENTER] find_assignment(req={request}, domain={domain_info})")
 
-        req_type, count = self.translate_type(request["type"]), request["count"]
+        # This can also be gpu/nic.
+        raw_request_type = request['type']
+        req_type, count = self.translate_type(raw_request_type), request["count"]
 
+        # If the req_type is gpu or nic, this isn't an actual type in the graph - it is PCIDev.
         candidates = self.get_available_children(domain_gp, req_type, allocated)
+
+        # Now we handle the type of the pcidev request and filter candidates to those devices.
+        if raw_request_type.lower() == 'gpu':
+            gpu_bus_ids = {g['pci_bus_id'] for g in self.gpus}
+            candidates = [node for node in candidates if node[1].get('pci_busid') in gpu_bus_ids]
+            log.debug(f"{indent} -> Filtered for 'gpu', {len(candidates)} candidates remain.")
+        elif raw_request_type.lower() == 'nic':
+            nic_bus_ids = {n[1]['pci_busid'] for n in self.nics}
+            candidates = [node for node in candidates if node[1].get('pci_busid') in nic_bus_ids]
+            log.debug(f"{indent} -> Filtered for 'nic', {len(candidates)} candidates remain.")
+
         log.debug(f"{indent} -> Found {len(candidates)} initial candidates for '{req_type}'.")
 
         affinity_spec = request.get("affinity")
@@ -496,7 +501,7 @@ def find_assignment_recursive(self, request, domain_gp, allocated, depth=0):
                 self.last_affinity_target = (target_gp, domain_node)
             else:
                 log.debug(f"{indent} -> Sorting candidates by GLOBAL affinity to {affinity_spec}")
-                candidates = self.sort_by_affinity(candidates, affinity_spec, allocated)
+                candidates = self.sort_by_affinity(candidates, affinity_spec, allocated, domain_gp)
 
         if len(candidates) < count:
             log.debug(
@@ -550,6 +555,9 @@ def find_assignment_recursive(self, request, domain_gp, allocated, depth=0):
         return None
 
     def get_descendants(self, gp_index, **filters):
+        """
+        Given a global position index, return descendants that match a filter.
+        """
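+        # Example (assuming Core objects exist below the given node):
+        #   self.get_descendants(package_gp, type="Core") -> all Core nodes under that Package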
         if gp_index not in self.graph:
             return []
         desc = list(nx.descendants(self.hierarchy_view, gp_index))
@@ -648,14 +656,17 @@ def create_ordered_gpus(self):
         precompute_numa_affinities are done.
         """
         ordered_gpus = []
+
+        # The gpus we found were discovered with nvidia-smi/rocm-smi, and we need
+        # to map them to objects in the graph.
         for gpu_info in self.gpus:
             pci_id = gpu_info.get("pci_bus_id")
             if not pci_id:
                 continue
 
             # Find the corresponding PCIDev object in our graph
             # Note: We now store and search for types in lowercase
-            matches = self.find_objects(type="pcidev", pci_busid=pci_id)
+            matches = self.find_objects(type="PCIDev", pci_busid=pci_id)
             if not matches:
                 log.warning(
                     f"Could not find a graph object for discovered GPU with PCI ID: {pci_id}"
@@ -682,3 +693,60 @@ def create_ordered_gpus(self):
         # Finally, create the attribute that the rest of the code expects.
         self.ordered_gpus = ordered_gpus
         log.info(f"Created an ordered list of {len(self.ordered_gpus)} GPUs for assignment.")
+
+
+    def find_bindable_leaves(self, total_allocation, bind_level):
+        """
+        Transforms a list of allocated resources into a final list of bindable
+        nodes by first choosing a strategy based on the allocation contents,
+        then executing that single, correct strategy.
+        """
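+        # Illustrative example (names assumed): if total_allocation contains a
+        # NUMANode, find_bindable_leaves(total_allocation, "core") returns the
+        # Core nodes under that node's Package, de-duplicated and sorted.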
+        leaf_nodes = []
+        log.debug(f"Transforming {len(total_allocation)} allocated objects to bind_level '{bind_level}'...")
+
+        bind_type_concrete = self.translate_type(bind_level)
+
+        # Check for high-level structural containers. Their presence dictates the entire strategy.
+        high_level_containers = [node for node in total_allocation if node[1]['type'] in ['Package', 'NUMANode']]
+
+        if high_level_containers:
+            # If we find a Package or NUMANode, we IGNORE all other items in the allocation
+            # and bind to the contents of this container ONLY. We have to do this because
+            # hwloc-calc can report that some CPU/PU are closer to the OTHER NUMA node, or
+            # in other words, the physical layout of the XML != what hwloc-calc reports.
+            # So here we use get_ancestor_of_type to JUST use the hardware layout (which
+            # is more predictable).
+            container_gp, container_data = high_level_containers[0]
+            container_type = container_data.get("type")
+            log.debug(f"High-level container '{container_type}' found. Binding exclusively to its physical contents.")
+            if container_type == "Package":
+                package_gp = container_gp
+            else:
+                # get_ancestor_of_type can return None, so guard before indexing.
+                package = self.get_ancestor_of_type(container_gp, "Package")
+                package_gp = package[0] if package else None
+            if package_gp:
+                leaf_nodes = self.get_descendants(package_gp, type=bind_type_concrete)
+        else:
+            # No high-level containers - we can safely process each object individually.
+            # This is the logic that correctly handles the simple Core, PU, and device-affinity tests.
+            log.debug("No high-level containers found. Processing each allocated object individually.")
+            for gp, data in total_allocation:
+
+                # Case 1: The object is already the type we want to bind to.
+                if data.get('type') == bind_type_concrete:
+                    leaf_nodes.append((gp, data))
+                    continue
+
+                # Case 2 (Container): The object is a low-level container (Core or PCIDev).
+                descendants = self.get_descendants(gp, type=bind_type_concrete)
+                if descendants:
+                    leaf_nodes.extend(descendants)
+                    continue
+
+                # Case 3 (Child): The object is a child of the target type (e.g., PU -> Core).
+                ancestor = self.get_ancestor_of_type(gp, bind_type_concrete)
+                if ancestor:
+                    leaf_nodes.append(ancestor)
+
+        # De-duplicate the final list and sort for deterministic assignment.
+        unique_nodes = list({gp: (gp, data) for gp, data in leaf_nodes}.values())
+        unique_nodes.sort(key=self.get_sort_key_for_node)
+
+        log.debug(f"Transformation resulted in {len(unique_nodes)} unique bindable leaf nodes.")
+        return unique_nodes