55import os
66import tomllib
77import time
8+ from typing import Callable
89
910SPECIAL_SCRIPTS = {
1011 "ghast_consensus_test.py" : 1
1112}
1213
13- def get_num_test_nodes (py , test_dir , script ) :
14+ def get_num_test_nodes (py : str , test_dir : str , script : str ) -> int :
1415 if script in SPECIAL_SCRIPTS :
1516 return SPECIAL_SCRIPTS [script ]
1617 toml_output = subprocess .check_output (
@@ -22,7 +23,15 @@ def get_num_test_nodes(py, test_dir, script):
2223class TestScheduler :
2324 """Scheduler for managing test execution and controlling concurrency based on resource requirements"""
2425
25- def __init__ (self , task_executable , py , test_dir , max_workers , available_nodes , port_min , port_max , conflux_binary ):
26+ def __init__ (self ,
27+ task_executable : Callable [[str , str , str , int , int , int , str ], None ],
28+ py : str ,
29+ test_dir : str ,
30+ max_workers : int ,
31+ available_nodes : int ,
32+ port_min : int ,
33+ port_max : int ,
34+ conflux_binary : str ):
2635 self .task_executable = task_executable
2736 self .py = py
2837 self .test_dir = test_dir
@@ -41,7 +50,7 @@ def __init__(self, task_executable, py, test_dir, max_workers, available_nodes,
4150 self .results = []
4251 self .failed_tests = set ()
4352
44- def schedule (self , test_scripts ) :
53+ def schedule (self , test_scripts : list [ str ]) -> set [ str ] :
4554 """Schedules the execution of test scripts"""
4655
4756 # Prepare task queue
@@ -53,11 +62,11 @@ def schedule(self, test_scripts):
5362 self ._collect_results ()
5463 return self .failed_tests
5564
56- def _prepare_task_queue (self , test_scripts ) :
65+ def _prepare_task_queue (self , test_scripts : list [ str ]) -> list [ tuple [ str , int , int ]] :
5766 """Prepares a task queue with scripts and resource requirements"""
5867
59- task_queue = queue . Queue ()
60- print ("Scanning num_nodes requirement for each task." , end = " " )
68+ task_queue = []
69+ print ("Scanning num_nodes requirement for each task" )
6170 with ThreadPoolExecutor () as executor :
6271 tasks = [(executor .submit (
6372 get_num_test_nodes ,
@@ -68,40 +77,43 @@ def _prepare_task_queue(self, test_scripts):
6877 # Collect completed task results and add them to the queue
6978 for future , i , script in tasks :
7079 result = future .result ()
71- task_queue .put ((script , result , i ))
72- print (" Done" )
80+ if result > self .available_nodes :
81+ raise RuntimeError (f"Cannot run { script } because it requires { result } nodes, "
82+ f"but only max to { self .available_nodes } nodes are available"
83+ f"Please specify --max-nodes to run the test" )
84+
85+ task_queue .append ((script , result , i ))
86+ for script , nodes_needed , index in task_queue :
87+ print (f"Task { index } : { script } requires { nodes_needed } nodes" )
88+ print ("Scanning done" )
7389 return task_queue
90+
91+ def _pop_next_task (self , task_queue : list [tuple [str , int , int ]]) -> tuple [str , int , int ]:
92+ """Selects the next task to process"""
93+ if not task_queue :
94+ raise RuntimeError ("No tasks to process" )
95+ while True :
96+ for i , (script , nodes_needed , index ) in enumerate (task_queue ):
97+ if self ._try_acquire_resources (nodes_needed ):
98+ task_queue .pop (i )
99+ return script , nodes_needed , index
100+ self .resource_event .wait (timeout = 10 )
101+ self .resource_event .clear ()
74102
75- def _process_task_queue (self , executor , task_queue ):
103+ def _process_task_queue (self , executor : ThreadPoolExecutor , task_queue : list [ tuple [ str , int , int ]] ):
76104 """Processes the task queue, scheduling tests based on resource availability"""
77105
78- while not task_queue .empty ():
79- try :
80- # Retrieve task
81- script , nodes_needed , index = task_queue .get (block = False )
82-
83- # Attempt to allocate resources
84- if self ._try_acquire_resources (nodes_needed ):
85- # Enough resources available, execute test
86- future = executor .submit (
87- self ._run_test_with_cleanup ,
88- script ,
89- index ,
90- nodes_needed
91- )
92- self .results .append ((script , future ))
93-
94- # Wait for at least 1 second to avoid launch a lot of tasks
95- time .sleep (1 )
96- else :
97- # Insufficient resources, re-add to queue and wait
98- task_queue .put ((script , nodes_needed , index ))
99- self .resource_event .wait (timeout = 0.2 )
100- self .resource_event .clear ()
101- except queue .Empty :
102- break
106+ while task_queue :
107+ script , nodes_needed , index = self ._pop_next_task (task_queue )
108+ future = executor .submit (
109+ self ._run_test_with_cleanup ,
110+ script ,
111+ index ,
112+ nodes_needed
113+ )
114+ self .results .append ((script , future ))
103115
104- def _try_acquire_resources (self , nodes_needed ) :
116+ def _try_acquire_resources (self , nodes_needed : int ) -> bool :
105117 """Attempts to acquire required resources, returns True if successful"""
106118
107119 with self .resource_lock :
@@ -111,15 +123,15 @@ def _try_acquire_resources(self, nodes_needed):
111123 return True
112124 return False
113125
114- def _release_resources (self , nodes_count ):
126+ def _release_resources (self , nodes_count : int ):
115127 """Releases resources and notifies waiting threads"""
116128
117129 with self .resource_lock :
118130 self .available_nodes += nodes_count
119131 self .available_workers += 1
120132 self .resource_event .set ()
121133
122- def _run_test_with_cleanup (self , script , index , nodes_count ):
134+ def _run_test_with_cleanup (self , script : str , index : int , nodes_count : int ):
123135 """Runs a test and ensures resources are released"""
124136
125137 try :
0 commit comments