Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions parsl/executors/high_throughput/mpi_resource_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import subprocess
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple

from parsl.multiprocessing import SpawnContext
from parsl.serialize import pack_res_spec_apply_message, unpack_res_spec_apply_message
Expand Down Expand Up @@ -76,6 +76,8 @@ class PrioritizedTask:
# This dataclass limits comparison to the priority field
priority: int
task: Dict = field(compare=False)
unpacked_task: Tuple = field(compare=False)
nodes_needed: int = field(compare=False)


class TaskScheduler:
Expand Down Expand Up @@ -165,40 +167,58 @@ def _return_nodes(self, nodes: List[str]) -> None:
with self._free_node_counter.get_lock():
self._free_node_counter.value += len(nodes) # type: ignore[attr-defined]

def put_task(self, task_package: dict):
"""Schedule task if resources are available otherwise backlog the task"""
user_ns = locals()
user_ns.update({"__builtins__": __builtins__})
_f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(task_package["buffer"])

nodes_needed = resource_spec.get("num_nodes")
tid = task_package["task_id"]
def schedule_task(self, p_task: PrioritizedTask):
    """Schedule a prioritized task if resources are available, and push to backlog
    otherwise.

    A task whose ``nodes_needed`` is falsy is forwarded to
    ``pending_task_q`` unchanged. Otherwise nodes are requested through
    ``_get_nodes``: on success the allocated node names are written into
    the task's resource specification as ``MPI_NODELIST``, the task buffer
    is re-packed and the task is forwarded; on ``MPINodesUnavailable`` the
    task is placed on the backlog queue and nothing is forwarded.

    :param p_task: the task to schedule; carries the original task package
        (``task``), its unpacked contents (``unpacked_task``) and the
        number of nodes it requests (``nodes_needed``).
    """
    nodes_needed = p_task.nodes_needed
    tid = p_task.task["task_id"]
    if nodes_needed:
        try:
            allocated_nodes = self._get_nodes(nodes_needed)
        except MPINodesUnavailable:
            logger.info(f"Not enough resources, placing task {tid} into backlog")
            # Re-queue the very same PrioritizedTask object so its unpacked
            # payload does not have to be rebuilt on the next attempt.
            self._backlog_queue.put(p_task)
            return
        else:
            f, args, kwargs, resource_spec = p_task.unpacked_task
            resource_spec["MPI_NODELIST"] = ",".join(allocated_nodes)
            # Remember which nodes this task holds, keyed by task id.
            self._map_tasks_to_nodes[tid] = allocated_nodes
            buffer = pack_res_spec_apply_message(f, args, kwargs, resource_spec)
            p_task.task["buffer"] = buffer
            p_task.task["resource_spec"] = resource_spec

    self.pending_task_q.put(p_task.task)

def put_task(self, task_package: dict):
    """Schedule task if resources are available otherwise backlog the task

    The task buffer is unpacked once here; the unpacked pieces travel with
    the task inside a ``PrioritizedTask`` so that later scheduling attempts
    do not have to unpack the buffer again.

    :param task_package: task dictionary; this method reads its
        ``"buffer"`` entry (a packed function/args/kwargs/resource-spec
        message).
    """
    _f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(task_package["buffer"])

    nodes_needed = resource_spec.get("num_nodes")
    # Prioritize large jobs: the priority is the negated node count. A spec
    # without "num_nodes" yields None, which cannot be negated, so it is
    # treated as zero nodes for ordering purposes; schedule_task() forwards
    # such tasks without requesting any nodes.
    prioritized_task = PrioritizedTask(priority=-(nodes_needed or 0),
                                       task=task_package,
                                       unpacked_task=(_f, _args, _kwargs, resource_spec),
                                       nodes_needed=nodes_needed)

    self.schedule_task(prioritized_task)

def _schedule_backlog_tasks(self):
    """Attempt to schedule backlogged tasks

    Every task currently on the backlog queue is removed and handed to
    ``schedule_task`` exactly once, in the order the queue yields them.
    """
    # Separate fetching tasks from the _backlog_queue and scheduling them
    # since tasks that failed to schedule will be pushed to the _backlog_queue.
    # Scheduling while still draining could hand the same task back to us
    # over and over.
    backlogged_tasks = []
    while True:
        try:
            backlogged_tasks.append(self._backlog_queue.get(block=False))
        except queue.Empty:
            break

    for backlogged_task in backlogged_tasks:
        self.schedule_task(backlogged_task)

def get_result(self, block: bool = True, timeout: Optional[float] = None):
"""Return result and relinquish provisioned nodes"""
Expand Down
68 changes: 68 additions & 0 deletions parsl/tests/test_mpi_apps/test_mpi_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import pickle
import random
from unittest import mock

import pytest
Expand Down Expand Up @@ -186,3 +187,70 @@ def test_hashable_backlog_queue():
task_package = {"task_id": i, "buffer": mock_task_buffer}
scheduler.put_task(task_package)
assert scheduler._backlog_queue.qsize() == 2, "Expected 2 backlogged tasks"


@pytest.mark.local
def test_tiny_large_loop():
    """Alternate tiny (2-node) and large (8-node) tasks through the scheduler.

    Ten tasks are submitted up front. Each task that reaches the task queue
    is then answered with a pickled result, and the scheduler must hand
    that same pickled result back.
    """
    task_q = SpawnContext.Queue()
    result_q = SpawnContext.Queue()
    scheduler = MPITaskScheduler(task_q, result_q)

    # The scheduler under test is expected to start with 8 free nodes
    assert scheduler.available_nodes
    assert len(scheduler.available_nodes) == 8
    assert scheduler._free_node_counter.value == 8

    for task_id in range(10):
        # Even ids are tiny tasks, odd ids ask for all 8 nodes
        requested_nodes = 8 if task_id % 2 else 2
        spec = {"num_nodes": requested_nodes, "ranks_per_node": 2}
        packed = pack_res_spec_apply_message("func", "args", "kwargs",
                                             resource_specification=spec)
        scheduler.put_task({"task_id": task_id, "buffer": packed})

    for _ in range(10):
        launched = task_q.get(timeout=30)
        reply = {"task_id": launched["task_id"], "type": "result", "buffer": "RESULT BUF"}
        expected = pickle.dumps(reply)
        result_q.put(expected)

        assert scheduler.get_result(True, 1) == expected


@pytest.mark.local
def test_larger_jobs_prioritized():
    """Larger jobs should be scheduled first

    One task that requests every node is submitted first, followed by
    eight tasks with random node counts. The backlog queue must then yield
    those eight tasks in decreasing order of nodes requested.
    """

    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
    scheduler = MPITaskScheduler(task_q, result_q)

    max_nodes = len(scheduler.available_nodes)

    # The first task will get scheduled with all the nodes,
    # and the remainder hits the backlog queue.
    node_request_list = [max_nodes] + [random.randint(1, 4) for _i in range(8)]

    for task_id, num_nodes in enumerate(node_request_list):
        mock_task_buffer = pack_res_spec_apply_message("func", "args", "kwargs",
                                                       resource_specification={
                                                           "num_nodes": num_nodes,
                                                           "ranks_per_node": 2
                                                       })
        task_package = {"task_id": task_id, "buffer": mock_task_buffer}
        scheduler.put_task(task_package)

    # Everything except the first task must be waiting in the backlog.
    # Checking the size first (and draining without blocking below) makes a
    # scheduling regression fail fast instead of hanging the test run.
    backlog_size = len(node_request_list) - 1
    assert scheduler._backlog_queue.qsize() == backlog_size, \
        f"Expected {backlog_size} backlogged tasks for requests {node_request_list}"

    # Confirm that the tasks are sorted in decreasing order
    output_priority = [scheduler._backlog_queue.get(block=False).nodes_needed
                       for _ in range(backlog_size)]

    # Remove the first large job that blocks the nodes and forces following
    # tasks into backlog
    expected_priority = sorted(node_request_list[1:], reverse=True)
    # The request list is random, so report it to make failures reproducible
    assert expected_priority == output_priority, \
        f"Expected nodes in decreasing sorted order for requests {node_request_list}"