diff --git a/parsl/executors/high_throughput/mpi_resource_management.py b/parsl/executors/high_throughput/mpi_resource_management.py index e610b17bee..c36f06ad8d 100644 --- a/parsl/executors/high_throughput/mpi_resource_management.py +++ b/parsl/executors/high_throughput/mpi_resource_management.py @@ -6,7 +6,7 @@ import subprocess from dataclasses import dataclass, field from enum import Enum -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple from parsl.multiprocessing import SpawnContext from parsl.serialize import pack_res_spec_apply_message, unpack_res_spec_apply_message @@ -76,6 +76,8 @@ class PrioritizedTask: # This dataclass limits comparison to the priority field priority: int task: Dict = field(compare=False) + unpacked_task: Tuple = field(compare=False) + nodes_needed: int = field(compare=False) class TaskScheduler: @@ -165,40 +167,58 @@ def _return_nodes(self, nodes: List[str]) -> None: with self._free_node_counter.get_lock(): self._free_node_counter.value += len(nodes) # type: ignore[attr-defined] - def put_task(self, task_package: dict): - """Schedule task if resources are available otherwise backlog the task""" - user_ns = locals() - user_ns.update({"__builtins__": __builtins__}) - _f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(task_package["buffer"]) - - nodes_needed = resource_spec.get("num_nodes") - tid = task_package["task_id"] + def schedule_task(self, p_task: PrioritizedTask): + """Schedule a prioritized task if resources are available, and push to backlog + otherwise.""" + nodes_needed = p_task.nodes_needed + tid = p_task.task["task_id"] if nodes_needed: try: allocated_nodes = self._get_nodes(nodes_needed) except MPINodesUnavailable: logger.info(f"Not enough resources, placing task {tid} into backlog") - self._backlog_queue.put(PrioritizedTask(nodes_needed, task_package)) + self._backlog_queue.put(p_task) return else: + f, args, kwargs, resource_spec = p_task.unpacked_task resource_spec["MPI_NODELIST"] = ",".join(allocated_nodes) self._map_tasks_to_nodes[tid] = allocated_nodes - buffer = pack_res_spec_apply_message(_f, _args, _kwargs, resource_spec) - task_package["buffer"] = buffer - task_package["resource_spec"] = resource_spec + buffer = pack_res_spec_apply_message(f, args, kwargs, resource_spec) + p_task.task["buffer"] = buffer + p_task.task["resource_spec"] = resource_spec + + self.pending_task_q.put(p_task.task) + + def put_task(self, task_package: dict): + """Schedule task if resources are available otherwise backlog the task""" + user_ns = locals() + user_ns.update({"__builtins__": __builtins__}) + _f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(task_package["buffer"]) + + nodes_needed = resource_spec.get("num_nodes") + # Prioritize large jobs + prioritized_task = PrioritizedTask(priority=-1 * nodes_needed, + task=task_package, + unpacked_task=(_f, _args, _kwargs, resource_spec), + nodes_needed=nodes_needed) - self.pending_task_q.put(task_package) + self.schedule_task(prioritized_task) def _schedule_backlog_tasks(self): """Attempt to schedule backlogged tasks""" - try: - prioritized_task = self._backlog_queue.get(block=False) - self.put_task(prioritized_task.task) - except queue.Empty: - return - else: - # Keep attempting to schedule tasks till we are out of resources - self._schedule_backlog_tasks() + + # Separate fetching tasks from the _backlog_queue and scheduling them + # since tasks that failed to schedule will be pushed to the _backlog_queue + backlogged_tasks = [] + while True: + try: + prioritized_task = self._backlog_queue.get(block=False) + backlogged_tasks.append(prioritized_task) + except queue.Empty: + break + + for backlogged_task in backlogged_tasks: + self.schedule_task(backlogged_task) def get_result(self, block: bool = True, timeout: Optional[float] = None): """Return result and relinquish provisioned nodes""" diff --git a/parsl/tests/test_mpi_apps/test_mpi_scheduler.py b/parsl/tests/test_mpi_apps/test_mpi_scheduler.py index d64520bea5..4b0e26e4fc 100644 --- a/parsl/tests/test_mpi_apps/test_mpi_scheduler.py +++ b/parsl/tests/test_mpi_apps/test_mpi_scheduler.py @@ -1,6 +1,7 @@ import logging import os import pickle +import random from unittest import mock import pytest @@ -186,3 +187,70 @@ def test_hashable_backlog_queue(): task_package = {"task_id": i, "buffer": mock_task_buffer} scheduler.put_task(task_package) assert scheduler._backlog_queue.qsize() == 2, "Expected 2 backlogged tasks" + + +@pytest.mark.local +def test_tiny_large_loop(): + """Run a set of tiny and large tasks in a loop""" + + task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue() + scheduler = MPITaskScheduler(task_q, result_q) + + assert scheduler.available_nodes + assert len(scheduler.available_nodes) == 8 + + assert scheduler._free_node_counter.value == 8 + + for i in range(10): + num_nodes = 2 if i % 2 == 0 else 8 + mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs", + resource_specification={ + "num_nodes": num_nodes, + "ranks_per_node": 2 + }) + task_package = {"task_id": i, "buffer": mock_task_buffer} + scheduler.put_task(task_package) + + for i in range(10): + task = task_q.get(timeout=30) + result_pkl = pickle.dumps( + {"task_id": task["task_id"], "type": "result", "buffer": "RESULT BUF"}) + result_q.put(result_pkl) + got_result = scheduler.get_result(True, 1) + + assert got_result == result_pkl + + +@pytest.mark.local +def test_larger_jobs_prioritized(): + """Larger jobs should be scheduled first""" + + task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue() + scheduler = MPITaskScheduler(task_q, result_q) + + max_nodes = len(scheduler.available_nodes) + + # The first task will get scheduled with all the nodes, + # and the remainder hits the backlog queue. + node_request_list = [max_nodes] + [random.randint(1, 4) for _i in range(8)] + + for task_id, num_nodes in enumerate(node_request_list): + mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs", + resource_specification={ + "num_nodes": num_nodes, + "ranks_per_node": 2 + }) + task_package = {"task_id": task_id, "buffer": mock_task_buffer} + scheduler.put_task(task_package) + + # Confirm that the tasks are sorted in decreasing order + output_priority = [] + for i in range(len(node_request_list) - 1): + p_task = scheduler._backlog_queue.get() + output_priority.append(p_task.nodes_needed) + + # Remove the first large job that blocks the nodes and forces following + # tasks into backlog + expected_priority = node_request_list[1:] + expected_priority.sort(reverse=True) + assert expected_priority == output_priority, "Expected nodes in decreasing sorted order"