Skip to content

Commit 6715018

Browse files
committed
Adding two tests for MPITaskScheduler
* test_larger_jobs_prioritized checks to confirm the ordering of jobs in the backlog queue * test_hashable_backlog_queue tests to confirm that the PrioritizedTask dataclass avoid the priority queue failing to hash tasks with the same priority.
1 parent 1e7adfc commit 6715018

File tree

1 file changed

+57
-3
lines changed

1 file changed

+57
-3
lines changed

parsl/tests/test_mpi_apps/test_mpi_scheduler.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import logging
21
import os
32
import pickle
3+
import random
44
from unittest import mock
55

66
import pytest
@@ -163,6 +163,60 @@ def test_MPISched_contention():
163163
assert len(resource_spec['MPI_NODELIST'].split(',')) == 8
164164

165165

166+
@pytest.mark.local
167+
def test_hashable_backlog_queue():
168+
"""Run multiple large tasks that to force entry into backlog_queue
169+
where queue.PriorityQueue expects hashability/comparability
170+
"""
171+
172+
task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
173+
scheduler = MPITaskScheduler(task_q, result_q)
174+
175+
assert scheduler.available_nodes
176+
assert len(scheduler.available_nodes) == 8
177+
178+
assert scheduler._free_node_counter.value == 8
179+
180+
for i in range(3):
181+
mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
182+
resource_specification={
183+
"num_nodes": 8,
184+
"ranks_per_node": 2
185+
})
186+
task_package = {"task_id": i, "buffer": mock_task_buffer}
187+
scheduler.put_task(task_package)
188+
assert scheduler._backlog_queue.qsize() == 2, "Expected 2 backlogged tasks"
189+
190+
191+
@pytest.mark.local
192+
def test_larger_jobs_prioritized():
193+
"""Larger jobs should be scheduled first"""
194+
195+
task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
196+
scheduler = MPITaskScheduler(task_q, result_q)
197+
198+
max_nodes = len(scheduler.available_nodes)
199+
200+
# The first task will get scheduled with all the nodes,
201+
# and the remainder hits the backlog queue.
202+
counter = 0
203+
node_request_list = [max_nodes] + [random.randint(1, 4) for _i in range(8)]
204+
205+
for task_id, num_nodes in enumerate(node_request_list):
206+
mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
207+
resource_specification={
208+
"num_nodes": num_nodes,
209+
"ranks_per_node": 2
210+
})
211+
task_package = {"task_id": task_id, "buffer": mock_task_buffer}
212+
scheduler.put_task(task_package)
213+
214+
# Confirm that the tasks are sorted in decreasing order
215+
prev_priority = 0
216+
for i in range(len(node_request_list) - 1):
217+
p_task = scheduler._backlog_queue.get()
218+
assert p_task.priority < 0
219+
assert p_task.priority <= prev_priority
166220

167221
@pytest.mark.local
168222
def test_tiny_large_loop():
@@ -192,5 +246,5 @@ def test_tiny_large_loop():
192246
{"task_id": task["task_id"], "type": "result", "buffer": "RESULT BUF"})
193247
result_q.put(result_pkl)
194248
got_result = scheduler.get_result(True, 1)
195-
assert got_result == result_pkl
196-
logging.warning(f"Got {task=}")
249+
250+
assert got_result == result_pkl

0 commit comments

Comments
 (0)