Skip to content

Commit 4ac2db3

Browse files
committed
feat: improve task scheduler
1 parent c9fffe0 commit 4ac2db3

File tree

1 file changed

+30
-31
lines changed

1 file changed

+30
-31
lines changed

tests/test_utils/test_scheduler.py

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ def schedule(self, test_scripts: list[str]) -> set[str]:
6262
self._collect_results()
6363
return self.failed_tests
6464

65-
def _prepare_task_queue(self, test_scripts: list[str]) -> queue.Queue[tuple[str, int, int]]:
65+
def _prepare_task_queue(self, test_scripts: list[str]) -> list[tuple[str, int, int]]:
6666
"""Prepares a task queue with scripts and resource requirements"""
6767

68-
task_queue = queue.Queue()
69-
print("Scanning num_nodes requirement for each task.", end = "")
68+
task_queue = []
69+
print("Scanning num_nodes requirement for each task")
7070
with ThreadPoolExecutor() as executor:
7171
tasks = [(executor.submit(
7272
get_num_test_nodes,
@@ -82,38 +82,37 @@ def _prepare_task_queue(self, test_scripts: list[str]) -> queue.Queue[tuple[str,
8282
f"but only max to {self.available_nodes} nodes are available"
8383
f"Please specify --max-nodes to run the test")
8484

85-
task_queue.put((script, result, i))
86-
print(" Done")
85+
task_queue.append((script, result, i))
86+
task_queue.sort(key=lambda x: x[1], reverse=True)
87+
for script, nodes_needed, index in task_queue:
88+
print(f"Task {index}: {script} requires {nodes_needed} nodes")
89+
print("Scanning done")
8790
return task_queue
91+
92+
def _pop_next_task(self, task_queue: list[tuple[str, int, int]]) -> tuple[str, int, int]:
93+
"""Selects the next task to process"""
94+
if not task_queue:
95+
raise RuntimeError("No tasks to process")
96+
while True:
97+
for i, (script, nodes_needed, index) in enumerate(task_queue):
98+
if self._try_acquire_resources(nodes_needed):
99+
task_queue.pop(i)
100+
return script, nodes_needed, index
101+
self.resource_event.wait(timeout=10)
102+
self.resource_event.clear()
88103

89-
def _process_task_queue(self, executor: ThreadPoolExecutor, task_queue: queue.Queue[tuple[str, int, int]]):
104+
def _process_task_queue(self, executor: ThreadPoolExecutor, task_queue: list[tuple[str, int, int]]):
90105
"""Processes the task queue, scheduling tests based on resource availability"""
91106

92-
while not task_queue.empty():
93-
try:
94-
# Retrieve task
95-
script, nodes_needed, index = task_queue.get(block=False)
96-
97-
# Attempt to allocate resources
98-
if self._try_acquire_resources(nodes_needed):
99-
# Enough resources available, execute test
100-
future = executor.submit(
101-
self._run_test_with_cleanup,
102-
script,
103-
index,
104-
nodes_needed
105-
)
106-
self.results.append((script, future))
107-
108-
# Wait for at least 1 second to avoid launch a lot of tasks
109-
time.sleep(1)
110-
else:
111-
# Insufficient resources, re-add to queue and wait
112-
task_queue.put((script, nodes_needed, index))
113-
self.resource_event.wait(timeout=0.2)
114-
self.resource_event.clear()
115-
except queue.Empty:
116-
break
107+
while task_queue:
108+
script, nodes_needed, index = self._pop_next_task(task_queue)
109+
future = executor.submit(
110+
self._run_test_with_cleanup,
111+
script,
112+
index,
113+
nodes_needed
114+
)
115+
self.results.append((script, future))
117116

118117
def _try_acquire_resources(self, nodes_needed: int) -> bool:
119118
"""Attempts to acquire required resources, returns True if successful"""

0 commit comments

Comments
 (0)