Skip to content

Commit 1e7adfc

Browse files
committed
Adding an extended test for new MPITaskScheduler logic
1 parent c99e0b2 commit 1e7adfc

File tree

1 file changed

+33
-0
lines changed

1 file changed

+33
-0
lines changed

parsl/tests/test_mpi_apps/test_mpi_scheduler.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,36 @@ def test_MPISched_contention():
161161
assert task_on_worker_side['task_id'] == 2
162162
_, _, _, resource_spec = unpack_res_spec_apply_message(task_on_worker_side['buffer'])
163163
assert len(resource_spec['MPI_NODELIST'].split(',')) == 8
164+
165+
166+
167+
@pytest.mark.local
168+
def test_tiny_large_loop():
169+
"""Run a set of tiny and large tasks in a loop"""
170+
171+
task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
172+
scheduler = MPITaskScheduler(task_q, result_q)
173+
174+
assert scheduler.available_nodes
175+
assert len(scheduler.available_nodes) == 8
176+
177+
assert scheduler._free_node_counter.value == 8
178+
179+
for i in range(10):
180+
num_nodes = 2 if i % 2 == 0 else 8
181+
mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
182+
resource_specification={
183+
"num_nodes": num_nodes,
184+
"ranks_per_node": 2
185+
})
186+
task_package = {"task_id": i, "buffer": mock_task_buffer}
187+
scheduler.put_task(task_package)
188+
189+
for i in range(10):
190+
task = task_q.get(timeout=30)
191+
result_pkl = pickle.dumps(
192+
{"task_id": task["task_id"], "type": "result", "buffer": "RESULT BUF"})
193+
result_q.put(result_pkl)
194+
got_result = scheduler.get_result(True, 1)
195+
assert got_result == result_pkl
196+
logging.warning(f"Got {task=}")

0 commit comments

Comments
 (0)