Skip to content

Commit 4f139c2

Browse files
authored
Factor interchange monitoring code into a ZMQRadioSender (#3556)
From an interchange perspective: this is a refactoring intended to clarify that the interchange isn't doing anything special with respect to monitoring messages, and that it can send monitoring messages in the same way that remote workers can. From a monitoring perspective: this pulls the ZMQ sender code out of the interchange and puts it in a place that is more natural for ongoing development. For example, a potential future use with Work Queue and TaskVine is that workers would also benefit from using ZMQ to send monitoring messages. In some potential use cases, it might be desirable to configure the radio used by the interchange instead of using the hard-coded ZMQRadioSender. Ongoing work in draft PR #3315 addresses configuration of different types of radio, and that work should be relevant here too.
1 parent 2b01411 commit 4f139c2

File tree

2 files changed

+47
-34
lines changed

2 files changed

+47
-34
lines changed

parsl/executors/high_throughput/interchange.py

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from parsl.executors.high_throughput.manager_record import ManagerRecord
2121
from parsl.executors.high_throughput.manager_selector import ManagerSelector
2222
from parsl.monitoring.message_type import MessageType
23+
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
2324
from parsl.process_loggers import wrap_with_logs
2425
from parsl.serialize import serialize as serialize_object
2526
from parsl.utils import setproctitle
@@ -219,36 +220,27 @@ def task_puller(self) -> NoReturn:
219220
task_counter += 1
220221
logger.debug(f"Fetched {task_counter} tasks so far")
221222

222-
def _create_monitoring_channel(self) -> Optional[zmq.Socket]:
223-
if self.hub_address and self.hub_zmq_port:
224-
logger.info("Connecting to MonitoringHub")
225-
# This is a one-off because monitoring is unencrypted
226-
hub_channel = zmq.Context().socket(zmq.DEALER)
227-
hub_channel.set_hwm(0)
228-
hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_zmq_port))
229-
logger.info("Connected to MonitoringHub")
230-
return hub_channel
231-
else:
232-
return None
233-
234-
def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None:
235-
if hub_channel:
223+
def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None:
224+
if monitoring_radio:
236225
logger.info("Sending message {} to MonitoringHub".format(manager))
237226

238227
d: Dict = cast(Dict, manager.copy())
239228
d['timestamp'] = datetime.datetime.now()
240229
d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat'])
241230

242-
hub_channel.send_pyobj((MessageType.NODE_INFO, d))
231+
monitoring_radio.send((MessageType.NODE_INFO, d))
243232

244233
@wrap_with_logs(target="interchange")
245234
def _command_server(self) -> NoReturn:
246235
""" Command server to run async command to the interchange
247236
"""
248237
logger.debug("Command Server Starting")
249238

250-
# Need to create a new ZMQ socket for command server thread
251-
hub_channel = self._create_monitoring_channel()
239+
if self.hub_address is not None and self.hub_zmq_port is not None:
240+
logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port)
241+
monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
242+
else:
243+
monitoring_radio = None
252244

253245
reply: Any # the type of reply depends on the command_req received (aka this needs dependent types...)
254246

@@ -298,7 +290,7 @@ def _command_server(self) -> NoReturn:
298290
if manager_id in self._ready_managers:
299291
m = self._ready_managers[manager_id]
300292
m['active'] = False
301-
self._send_monitoring_info(hub_channel, m)
293+
self._send_monitoring_info(monitoring_radio, m)
302294
else:
303295
logger.warning("Worker to hold was not in ready managers list")
304296

@@ -333,9 +325,14 @@ def start(self) -> None:
333325
# parent-process-inheritance problems.
334326
signal.signal(signal.SIGTERM, signal.SIG_DFL)
335327

336-
logger.info("Incoming ports bound")
328+
logger.info("Starting main interchange method")
337329

338-
hub_channel = self._create_monitoring_channel()
330+
if self.hub_address is not None and self.hub_zmq_port is not None:
331+
logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port)
332+
monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
333+
logger.debug("Created monitoring radio")
334+
else:
335+
monitoring_radio = None
339336

340337
poll_period = self.poll_period
341338

@@ -366,10 +363,10 @@ def start(self) -> None:
366363
while not kill_event.is_set():
367364
self.socks = dict(poller.poll(timeout=poll_period))
368365

369-
self.process_task_outgoing_incoming(interesting_managers, hub_channel, kill_event)
370-
self.process_results_incoming(interesting_managers, hub_channel)
371-
self.expire_bad_managers(interesting_managers, hub_channel)
372-
self.expire_drained_managers(interesting_managers, hub_channel)
366+
self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
367+
self.process_results_incoming(interesting_managers, monitoring_radio)
368+
self.expire_bad_managers(interesting_managers, monitoring_radio)
369+
self.expire_drained_managers(interesting_managers, monitoring_radio)
373370
self.process_tasks_to_send(interesting_managers)
374371

375372
self.zmq_context.destroy()
@@ -380,7 +377,7 @@ def start(self) -> None:
380377
def process_task_outgoing_incoming(
381378
self,
382379
interesting_managers: Set[bytes],
383-
hub_channel: Optional[zmq.Socket],
380+
monitoring_radio: Optional[MonitoringRadioSender],
384381
kill_event: threading.Event
385382
) -> None:
386383
"""Process one message from manager on the task_outgoing channel.
@@ -434,7 +431,7 @@ def process_task_outgoing_incoming(
434431
m.update(msg) # type: ignore[typeddict-item]
435432

436433
logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
437-
self._send_monitoring_info(hub_channel, m)
434+
self._send_monitoring_info(monitoring_radio, m)
438435

439436
if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
440437
msg['parsl_v'] != self.current_platform['parsl_v']):
@@ -465,7 +462,7 @@ def process_task_outgoing_incoming(
465462
logger.error(f"Unexpected message type received from manager: {msg['type']}")
466463
logger.debug("leaving task_outgoing section")
467464

468-
def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
465+
def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
469466

470467
for manager_id in list(interesting_managers):
471468
# is it always true that a draining manager will be in interesting managers?
@@ -478,7 +475,7 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel:
478475
self._ready_managers.pop(manager_id)
479476

480477
m['active'] = False
481-
self._send_monitoring_info(hub_channel, m)
478+
self._send_monitoring_info(monitoring_radio, m)
482479

483480
def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
484481
# Check if there are tasks that could be sent to managers
@@ -521,7 +518,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
521518
else:
522519
logger.debug("either no interesting managers or no tasks, so skipping manager pass")
523520

524-
def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
521+
def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
525522
# Receive any results and forward to client
526523
if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN:
527524
logger.debug("entering results_incoming section")
@@ -541,11 +538,11 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
541538
elif r['type'] == 'monitoring':
542539
# the monitoring code makes the assumption that no
543540
# monitoring messages will be received if monitoring
544-
# is not configured, and that hub_channel will only
541+
# is not configured, and that monitoring_radio will only
545542
# be None when monitoring is not configurated.
546-
assert hub_channel is not None
543+
assert monitoring_radio is not None
547544

548-
hub_channel.send_pyobj(r['payload'])
545+
monitoring_radio.send(r['payload'])
549546
elif r['type'] == 'heartbeat':
550547
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
551548
b_messages.append((p_message, r))
@@ -589,15 +586,15 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
589586
interesting_managers.add(manager_id)
590587
logger.debug("leaving results_incoming section")
591588

592-
def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
589+
def expire_bad_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
593590
bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if
594591
time.time() - m['last_heartbeat'] > self.heartbeat_threshold]
595592
for (manager_id, m) in bad_managers:
596593
logger.debug("Last: {} Current: {}".format(m['last_heartbeat'], time.time()))
597594
logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager")
598595
if m['active']:
599596
m['active'] = False
600-
self._send_monitoring_info(hub_channel, m)
597+
self._send_monitoring_info(monitoring_radio, m)
601598

602599
logger.warning(f"Cancelling htex tasks {m['tasks']} on removed manager")
603600
for tid in m['tasks']:

parsl/monitoring/radios.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from multiprocessing.queues import Queue
88
from typing import Optional
99

10+
import zmq
11+
1012
from parsl.serialize import serialize
1113

1214
_db_manager_excepts: Optional[Exception]
@@ -186,3 +188,17 @@ def __init__(self, queue: Queue) -> None:
186188

187189
def send(self, message: object) -> None:
188190
self.queue.put((message, 0))
191+
192+
193+
class ZMQRadioSender(MonitoringRadioSender):
194+
"""A monitoring radio which connects over ZMQ. This radio is not
195+
thread-safe, because its use of ZMQ is not thread-safe.
196+
"""
197+
198+
def __init__(self, hub_address: str, hub_zmq_port: int) -> None:
199+
self._hub_channel = zmq.Context().socket(zmq.DEALER)
200+
self._hub_channel.set_hwm(0)
201+
self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}")
202+
203+
def send(self, message: object) -> None:
204+
self._hub_channel.send_pyobj(message)

0 commit comments

Comments
 (0)