Skip to content

Commit 5eb30f1

Browse files
authored
Add Modular Manager Selector Interface (#3547)
Added a Manager Selector interface which allows users to choose an algorithm to sort the interesting managers. This will allow for flexible testing and implementation of manager selection strategies to optimize efficiency of the interchange.
1 parent 4da6657 commit 5eb30f1

File tree

5 files changed

+40
-4
lines changed

5 files changed

+40
-4
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
from parsl.executors.errors import BadMessage, ScalingFailed
2121
from parsl.executors.high_throughput import zmq_pipes
2222
from parsl.executors.high_throughput.errors import CommandClientTimeoutError
23+
from parsl.executors.high_throughput.manager_selector import (
24+
ManagerSelector,
25+
RandomManagerSelector,
26+
)
2327
from parsl.executors.high_throughput.mpi_prefix_composer import (
2428
VALID_LAUNCHERS,
2529
validate_resource_spec,
@@ -261,6 +265,7 @@ def __init__(self,
261265
worker_logdir_root: Optional[str] = None,
262266
enable_mpi_mode: bool = False,
263267
mpi_launcher: str = "mpiexec",
268+
manager_selector: ManagerSelector = RandomManagerSelector(),
264269
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
265270
encrypted: bool = False):
266271

@@ -276,6 +281,7 @@ def __init__(self,
276281
self.prefetch_capacity = prefetch_capacity
277282
self.address = address
278283
self.address_probe_timeout = address_probe_timeout
284+
self.manager_selector = manager_selector
279285
if self.address:
280286
self.all_addresses = address
281287
else:
@@ -544,6 +550,7 @@ def _start_local_interchange_process(self) -> None:
544550
"poll_period": self.poll_period,
545551
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO,
546552
"cert_dir": self.cert_dir,
553+
"manager_selector": self.manager_selector,
547554
}
548555

549556
config_pickle = pickle.dumps(interchange_config)

parsl/executors/high_throughput/interchange.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import pickle
77
import platform
88
import queue
9-
import random
109
import signal
1110
import sys
1211
import threading
@@ -19,6 +18,7 @@
1918
from parsl.app.errors import RemoteExceptionWrapper
2019
from parsl.executors.high_throughput.errors import ManagerLost, VersionMismatch
2120
from parsl.executors.high_throughput.manager_record import ManagerRecord
21+
from parsl.executors.high_throughput.manager_selector import ManagerSelector
2222
from parsl.monitoring.message_type import MessageType
2323
from parsl.process_loggers import wrap_with_logs
2424
from parsl.serialize import serialize as serialize_object
@@ -53,6 +53,7 @@ def __init__(self,
5353
logging_level: int,
5454
poll_period: int,
5555
cert_dir: Optional[str],
56+
manager_selector: ManagerSelector,
5657
) -> None:
5758
"""
5859
Parameters
@@ -160,6 +161,8 @@ def __init__(self,
160161

161162
self.heartbeat_threshold = heartbeat_threshold
162163

164+
self.manager_selector = manager_selector
165+
163166
self.current_platform = {'parsl_v': PARSL_VERSION,
164167
'python_v': "{}.{}.{}".format(sys.version_info.major,
165168
sys.version_info.minor,
@@ -485,8 +488,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
485488
interesting=len(interesting_managers)))
486489

487490
if interesting_managers and not self.pending_task_queue.empty():
488-
shuffled_managers = list(interesting_managers)
489-
random.shuffle(shuffled_managers)
491+
shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers)
490492

491493
while shuffled_managers and not self.pending_task_queue.empty(): # cf. the if statement above...
492494
manager_id = shuffled_managers.pop()
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import random
2+
from abc import ABCMeta, abstractmethod
3+
from typing import Dict, List, Set
4+
5+
from parsl.executors.high_throughput.manager_record import ManagerRecord
6+
7+
8+
class ManagerSelector(metaclass=ABCMeta):
9+
10+
@abstractmethod
11+
def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]:
12+
""" Sort a given list of managers.
13+
14+
Any operations pertaining to the sorting and rearrangement of the
15+
interesting_managers Set should be performed here.
16+
"""
17+
pass
18+
19+
20+
class RandomManagerSelector(ManagerSelector):
21+
22+
def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]:
23+
c_manager_list = list(manager_list)
24+
random.shuffle(c_manager_list)
25+
return c_manager_list

parsl/tests/test_htex/test_zmq_binding.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from parsl import curvezmq
1111
from parsl.executors.high_throughput.interchange import Interchange
12+
from parsl.executors.high_throughput.manager_selector import RandomManagerSelector
1213

1314

1415
def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[str]) -> Interchange:
@@ -23,6 +24,7 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[s
2324
heartbeat_threshold=60,
2425
logdir=".",
2526
logging_level=logging.INFO,
27+
manager_selector=RandomManagerSelector(),
2628
poll_period=10)
2729

2830

parsl/tests/test_mpi_apps/test_mpiex.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def test_init():
4444

4545
new_kwargs = {'max_workers_per_block'}
4646
excluded_kwargs = {'available_accelerators', 'enable_mpi_mode', 'cores_per_worker', 'max_workers_per_node',
47-
'mem_per_worker', 'cpu_affinity', 'max_workers'}
47+
'mem_per_worker', 'cpu_affinity', 'max_workers', 'manager_selector'}
4848

4949
# Get the kwargs from both HTEx and MPIEx
5050
htex_kwargs = set(signature(HighThroughputExecutor.__init__).parameters)

0 commit comments

Comments
 (0)