55import os
66import tomllib
77import time
8+ from typing import Callable
89
910SPECIAL_SCRIPTS = {
1011 "ghast_consensus_test.py" : 1
1112}
1213
13- def get_num_test_nodes (py , test_dir , script ) :
14+ def get_num_test_nodes (py : str , test_dir : str , script : str ) -> int :
1415 if script in SPECIAL_SCRIPTS :
1516 return SPECIAL_SCRIPTS [script ]
1617 toml_output = subprocess .check_output (
@@ -22,7 +23,15 @@ def get_num_test_nodes(py, test_dir, script):
2223class TestScheduler :
2324 """Scheduler for managing test execution and controlling concurrency based on resource requirements"""
2425
25- def __init__ (self , task_executable , py , test_dir , max_workers , available_nodes , port_min , port_max , conflux_binary ):
26+ def __init__ (self ,
27+ task_executable : Callable [[str , str , str , int , int , int , str ], None ],
28+ py : str ,
29+ test_dir : str ,
30+ max_workers : int ,
31+ available_nodes : int ,
32+ port_min : int ,
33+ port_max : int ,
34+ conflux_binary : str ):
2635 self .task_executable = task_executable
2736 self .py = py
2837 self .test_dir = test_dir
@@ -41,7 +50,7 @@ def __init__(self, task_executable, py, test_dir, max_workers, available_nodes,
4150 self .results = []
4251 self .failed_tests = set ()
4352
44- def schedule (self , test_scripts ) :
53+ def schedule (self , test_scripts : list [ str ]) -> set [ str ] :
4554 """Schedules the execution of test scripts"""
4655
4756 # Prepare task queue
@@ -53,7 +62,7 @@ def schedule(self, test_scripts):
5362 self ._collect_results ()
5463 return self .failed_tests
5564
56- def _prepare_task_queue (self , test_scripts ) :
65+ def _prepare_task_queue (self , test_scripts : list [ str ]) -> queue . Queue [ tuple [ str , int , int ]] :
5766 """Prepares a task queue with scripts and resource requirements"""
5867
5968 task_queue = queue .Queue ()
@@ -77,7 +86,7 @@ def _prepare_task_queue(self, test_scripts):
7786 print (" Done" )
7887 return task_queue
7988
80- def _process_task_queue (self , executor , task_queue ):
89+ def _process_task_queue (self , executor : ThreadPoolExecutor , task_queue : queue . Queue [ tuple [ str , int , int ]] ):
8190 """Processes the task queue, scheduling tests based on resource availability"""
8291
8392 while not task_queue .empty ():
@@ -106,7 +115,7 @@ def _process_task_queue(self, executor, task_queue):
106115 except queue .Empty :
107116 break
108117
109- def _try_acquire_resources (self , nodes_needed ) :
118+ def _try_acquire_resources (self , nodes_needed : int ) -> bool :
110119 """Attempts to acquire required resources, returns True if successful"""
111120
112121 with self .resource_lock :
@@ -116,15 +125,15 @@ def _try_acquire_resources(self, nodes_needed):
116125 return True
117126 return False
118127
119- def _release_resources (self , nodes_count ):
128+ def _release_resources (self , nodes_count : int ):
120129 """Releases resources and notifies waiting threads"""
121130
122131 with self .resource_lock :
123132 self .available_nodes += nodes_count
124133 self .available_workers += 1
125134 self .resource_event .set ()
126135
127- def _run_test_with_cleanup (self , script , index , nodes_count ):
136+ def _run_test_with_cleanup (self , script : str , index : int , nodes_count : int ):
128137 """Runs a test and ensures resources are released"""
129138
130139 try :
0 commit comments