@@ -108,6 +108,10 @@ def build_graph(self, element, parent_gp=None):
108108 def discover_devices (self ):
109109 """
110110 Discover different kinds of GPU and NIC.
111+
112+ TODO: a better design would be to discover devices, then annotate the graph
113+ nodes with device type. Then when we search for gpu or nic, we can just
114+ search by that attribute. Right now we search for PCIDev, and then filter.
111115 """
112116 # Don't assume only one vendor of GPU.
113117 for vendor , command in {"NVIDIA" : commands .nvidia_smi , "AMD" : commands .rocm_smi }.items ():
@@ -180,7 +184,7 @@ def add_latency_edges(matrix, indexes):
180184 weight = matrix [i ][j ], # The weight is the latency value from the matrix.
181185 )
182186
183- # --- First, try to parse the modern hwloc v2.x format ---
187+ # First, try to parse newer hwloc v2.x format
184188 for dist_el in root .iter ("distances2" ):
185189 try :
186190 # nbobjs is how many objects are in the matrix (e.g., 2 for 2 NUMA nodes).
@@ -274,15 +278,18 @@ def match_resources(self, jobspec, allocated_gps=None):
274278 log .debug (f"Successfully found a slot with { len (final_allocation )} objects." )
275279 return final_allocation
276280
277- def sort_by_affinity (self , candidates , affinity , allocated ):
281+ def sort_by_affinity (self , candidates , affinity , allocated , domain_gp ):
282+ """
283+ Sort list of candidates by affinity so we get closest one.
284+ """
278285 target_type = self .translate_type (affinity .get ("type" ))
279286 if not target_type :
280287 log .warning ("Affinity spec missing 'type'." )
281288 return candidates
282- machine_gp = self . find_objects ( type = "Machine" )[ 0 ][ 0 ]
283-
284- # Affinity targets can be anywhere, so search from Machine
285- targets = self .get_available_children (machine_gp , target_type , allocated )
289+
290+ # Search within the domain we were provided, not across the machine
291+ log . debug ( f" -> Searching for affinity target ' { target_type } ' within the current domain." )
292+ targets = self .get_available_children (domain_gp , target_type , allocated )
286293 if not targets :
287294 log .warning (f"Affinity target '{ target_type } ' not found." )
288295 return candidates
@@ -323,19 +330,82 @@ def translate_type(self, requested_type: str):
323330 # capitalizing the word (e.g., 'l3cache' -> 'L3cache').
324331 return mapping .get (requested_type .lower (), requested_type .capitalize ())
325332
def find_bindable_leaves(self, total_allocation, bind_level):
    """
    Transform a list of allocated resources into the final list of bindable nodes.

    Each allocated (gp, data) pair is resolved to nodes of the bind type using
    the first relationship below that yields a result:

    1. Identity: the object already has the bind type and is kept as-is.
    2. Container: the object leads to nodes of the bind type.
       - NUMANode: nodes of the bind type under the NUMA node's Package ancestor.
       - PCIDev: nodes of the bind type that share the device's "numa_os_index".
       - Any other type: hierarchy descendants of the bind type.
    3. Child: the object's own ancestor of the bind type
       (e.g., allocated PU, bind to Core).

    An object that matches none of these contributes nothing to the result.

    Parameters:
        total_allocation: list of (gp, data) pairs found by the allocator.
        bind_level: requested bind type; passed through self.translate_type.

    Returns:
        A list of (gp, data) pairs, de-duplicated by gp and sorted with
        self.get_sort_key_for_node so assignment is deterministic.
    """
    log.debug(f"Transforming {len(total_allocation)} allocated objects to bind_level '{bind_level}'...")

    bind_type_concrete = self.translate_type(bind_level)
    leaf_nodes = []

    # The graph-wide scan for nodes of the bind type does not depend on which
    # PCIDev we are looking at, so do it at most once (and only if a PCIDev
    # with NUMA locality actually shows up) instead of once per device.
    all_bindable = None

    # Iterate through every object found by the allocator.
    for gp, data in total_allocation:
        object_type = data.get("type")

        # Case 1 (Identity): The object is already the type we want to bind to.
        if object_type == bind_type_concrete:
            leaf_nodes.append((gp, data))
            continue

        # Case 2 (Container): collect nodes of the bind type reachable from this object.
        resolved = []

        if object_type == "NUMANode":
            # The relationship to a NUMANode is via its parent Package.
            package = self.get_ancestor_of_type(gp, "Package")
            if package:
                resolved = self.get_descendants(package[0], type=bind_type_concrete)

        elif object_type == "PCIDev":
            # The relationship to a PCIDev is via NUMA locality.
            numa_idx = data.get("numa_os_index")
            if numa_idx is not None:
                if all_bindable is None:
                    all_bindable = self.find_objects(type=bind_type_concrete)
                resolved = [node for node in all_bindable if node[1].get("numa_os_index") == numa_idx]

        else:
            # For all other containers (like Package), the relationship is simple hierarchy.
            resolved = self.get_descendants(gp, type=bind_type_concrete)

        # If we successfully found descendants, onto the next!
        if resolved:
            leaf_nodes.extend(resolved)
            continue

        # Last case (Child): not the right type and nothing found beneath it, so it
        # might be a child. Use its ancestor of the target type if there is one.
        ancestor = self.get_ancestor_of_type(gp, bind_type_concrete)
        if ancestor:
            leaf_nodes.append(ancestor)

    # De-duplicate by gp (dict keys keep first-seen order), then sort for
    # deterministic assignment.
    leaf_nodes = list({gp: (gp, data) for gp, data in leaf_nodes}.values())
    leaf_nodes.sort(key=self.get_sort_key_for_node)

    log.debug(f"Transformation resulted in {len(leaf_nodes)} unique bindable leaf nodes.")
    return leaf_nodes
397+
326398 def summarize (self , nodes ):
327399 """
328400 Given a set of nodes in the graph (a set of resources) print a textual visual.
329401 """
330402 for gp , data in nodes :
331403 p_info = ""
332404 if data ["type" ] in ["Core" , "PU" ]:
333- # BUGFIX: Changed self.topology to self
334405 package = self .get_ancestor_of_type (gp , "Package" )
335406 if package :
336407 p_info = f"Package:{ package [1 ].get ('os_index' )} -> "
337408 if data ["type" ] == "PU" :
338- # BUGFIX: Changed self.topology to self
339409 core = self .get_ancestor_of_type (gp , "Core" )
340410 if core :
341411 p_info += f"Core:{ core [1 ].get ('os_index' )} -> "
@@ -393,22 +463,7 @@ def get_available_children(self, parent_gp, child_type, allocated):
393463
394464 parent_type = parent_node .get ("type" )
395465 child_type_lower = child_type .lower ()
396-
397- candidate_gp_set = set ()
398- if child_type_lower == "gpu" :
399- for gpu_info in self .gpus :
400- if (pci_id := gpu_info .get ("pci_bus_id" )) and (
401- matches := self .find_objects (type = "PCIDev" , pci_busid = pci_id )
402- ):
403- candidate_gp_set .add (matches [0 ][0 ])
404- elif child_type_lower == "nic" :
405- for nic_gp , _ in self .nics :
406- candidate_gp_set .add (nic_gp )
407- else :
408- for gp , _ in self .find_objects (type = child_type ):
409- candidate_gp_set .add (gp )
410-
411- all_candidates = [(gp , self .graph .nodes [gp ]) for gp in candidate_gp_set ]
466+ all_candidates = self .find_objects (type = child_type )
412467 log .debug (
413468 f" - -> Found { len (all_candidates )} total unique system-wide candidates for '{ child_type } '."
414469 )
@@ -422,14 +477,14 @@ def get_available_children(self, parent_gp, child_type, allocated):
422477
423478 # Rule 1: Relationship to NUMA node is through shared PACKAGE parent (for Cores) or LOCALITY (for Devices)
424479 if parent_type == "NUMANode" :
425- if child_type_lower in ["gpu" , "nic" ]:
426- if data .get ("numa_os_index" ) == parent_node .get ("os_index" ):
427- is_valid_child = True
428- elif child_type_lower in ["core" , "pu" ]:
480+ if child_type_lower in ["core" , "pu" ]:
429481 package_of_numa = self .get_ancestor_of_type (parent_gp , "Package" )
430482 if package_of_numa and nx .has_path (self .hierarchy_view , package_of_numa [0 ], gp ):
431483 is_valid_child = True
432-
484+ elif child_type_lower == "pcidev" :
485+ if data .get ("numa_os_index" ) == parent_node .get ("os_index" ):
486+ is_valid_child = True
487+
433488 # Rule 2 (NEW): Relationship of a Core/PU to a Device is through shared NUMA LOCALITY
434489 elif parent_type == "PCIDev" and child_type_lower in ["core" , "pu" ]:
435490 parent_numa_idx = parent_node .get ("numa_os_index" )
@@ -449,8 +504,9 @@ def get_available_children(self, parent_gp, child_type, allocated):
449504 return available
450505
451506 def find_objects (self , ** attributes ):
452- # STOPED HERE, trace this and see if gp is globally identifier, add comments.
453- # then go back to making distances/distances2 function and add comments.
507+ """
508+ Search nodes in the graph for a specific attribute (or more than one)
509+ """
454510 return [
455511 (gp , data )
456512 for gp , data in self .graph .nodes (data = True )
@@ -468,9 +524,23 @@ def find_assignment_recursive(self, request, domain_gp, allocated, depth=0):
468524 domain_info = f"{ domain_node .get ('type' )} :{ domain_node .get ('os_index' , domain_node .get ('pci_busid' , domain_gp ))} "
469525 log .debug (f"{ indent } [ENTER] find_assignment(req={ request } , domain={ domain_info } )" )
470526
471- req_type , count = self .translate_type (request ["type" ]), request ["count" ]
527+ # This can also be gpu/nic
528+ raw_request_type = request ['type' ]
529+ req_type , count = self .translate_type (raw_request_type ), request ["count" ]
472530
531+ # If the req_type is gpu or nic, this isn't an actual type in the graph - it is PCIDev.
473532 candidates = self .get_available_children (domain_gp , req_type , allocated )
533+
534+ # Now we handle the type of the pcidev request and filter candidates to those devices.
535+ if raw_request_type .lower () == 'gpu' :
536+ gpu_bus_ids = {g ['pci_bus_id' ] for g in self .gpus }
537+ candidates = [node for node in candidates if node [1 ].get ('pci_busid' ) in gpu_bus_ids ]
538+ log .debug (f"{ indent } -> Filtered for 'gpu', { len (candidates )} candidates remain." )
539+ elif raw_request_type .lower () == 'nic' :
540+ nic_bus_ids = {n [1 ]['pci_busid' ] for n in self .nics }
541+ candidates = [node for node in candidates if node [1 ].get ('pci_busid' ) in nic_bus_ids ]
542+ log .debug (f"{ indent } -> Filtered for 'nic', { len (candidates )} candidates remain." )
543+
474544 log .debug (f"{ indent } -> Found { len (candidates )} initial candidates for '{ req_type } '." )
475545
476546 affinity_spec = request .get ("affinity" )
@@ -496,7 +566,7 @@ def find_assignment_recursive(self, request, domain_gp, allocated, depth=0):
496566 self .last_affinity_target = (target_gp , domain_node )
497567 else :
498568 log .debug (f"{ indent } -> Sorting candidates by GLOBAL affinity to { affinity_spec } " )
499- candidates = self .sort_by_affinity (candidates , affinity_spec , allocated )
569+ candidates = self .sort_by_affinity (candidates , affinity_spec , allocated , domain_gp )
500570
501571 if len (candidates ) < count :
502572 log .debug (
@@ -550,6 +620,9 @@ def find_assignment_recursive(self, request, domain_gp, allocated, depth=0):
550620 return None
551621
552622 def get_descendants (self , gp_index , ** filters ):
623+ """
624+ Given a global position index, return descendants that match a filter.
625+ """
553626 if gp_index not in self .graph :
554627 return []
555628 desc = list (nx .descendants (self .hierarchy_view , gp_index ))
@@ -648,14 +721,17 @@ def create_ordered_gpus(self):
648721 precompute_numa_affinities are done.
649722 """
650723 ordered_gpus = []
724+
725+ # The gpus we found were discovered with nvidia/rocm-smi and we need
726+ # to map to things in the graph.
651727 for gpu_info in self .gpus :
652728 pci_id = gpu_info .get ("pci_bus_id" )
653729 if not pci_id :
654730 continue
655731
656732 # Find the corresponding PCIDev object in our graph
657733 # Note: The type must match the casing stored on graph nodes (e.g. "PCIDev")
658- matches = self .find_objects (type = "pcidev " , pci_busid = pci_id )
734+ matches = self .find_objects (type = "PCIDev " , pci_busid = pci_id )
659735 if not matches :
660736 log .warning (
661737 f"Could not find a graph object for discovered GPU with PCI ID: { pci_id } "
0 commit comments