Skip to content

Commit 3eb7e93

Browse files
authored
Make ZMQ, UDP and filesystem monitoring routers send via radios (#3700)
This PR is intended to consolidate monitoring message sending in the monitoring radio code. This is a step towards removing Python multiprocessing from the monitoring code base (see issue #2343) by making it clearer how to change to a different message send implementation (by swapping out the radio implementation and configuration) Compare to how the interchange forwards HTEXRadio messages onwards via some other radio (which right now is always the ZMQRadioSender) -- rather than having its own ZMQ code. # Changed Behaviour none ## Type of change - Code maintenance/cleanup
1 parent 5cb58d1 commit 3eb7e93

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

parsl/monitoring/monitoring.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,8 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir:
270270
new_dir = f"{base_path}/new/"
271271
logger.debug("Creating new and tmp paths under %s", base_path)
272272

273+
target_radio = MultiprocessingQueueRadioSender(q)
274+
273275
os.makedirs(tmp_dir, exist_ok=True)
274276
os.makedirs(new_dir, exist_ok=True)
275277

@@ -285,7 +287,7 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir:
285287
message = pickle.load(f)
286288
logger.debug("Message received is: %s", message)
287289
assert isinstance(message, tuple)
288-
q.put(cast(TaggedMonitoringMessage, message))
290+
target_radio.send(cast(TaggedMonitoringMessage, message))
289291
os.remove(full_path_filename)
290292
except Exception:
291293
logger.exception("Exception processing %s - probably will be retried next iteration", filename)

parsl/monitoring/router.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import zmq
1515

1616
from parsl.log_utils import set_file_logger
17+
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
1718
from parsl.monitoring.types import TaggedMonitoringMessage
1819
from parsl.process_loggers import wrap_with_logs
1920
from parsl.utils import setproctitle
@@ -55,7 +56,6 @@ def __init__(self,
5556
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
5657
resource_msgs : multiprocessing.Queue
5758
A multiprocessing queue to receive messages to be routed onwards to the database process
58-
5959
exit_event : Event
6060
An event that the main Parsl process will set to signal that the monitoring router should shut down.
6161
"""
@@ -98,7 +98,7 @@ def __init__(self,
9898
min_port=zmq_port_range[0],
9999
max_port=zmq_port_range[1])
100100

101-
self.resource_msgs = resource_msgs
101+
self.target_radio = MultiprocessingQueueRadioSender(resource_msgs)
102102
self.exit_event = exit_event
103103

104104
@wrap_with_logs(target="monitoring_router")
@@ -125,7 +125,7 @@ def start_udp_listener(self) -> None:
125125
data, addr = self.udp_sock.recvfrom(2048)
126126
resource_msg = pickle.loads(data)
127127
self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
128-
self.resource_msgs.put(resource_msg)
128+
self.target_radio.send(resource_msg)
129129
except socket.timeout:
130130
pass
131131

@@ -136,7 +136,7 @@ def start_udp_listener(self) -> None:
136136
data, addr = self.udp_sock.recvfrom(2048)
137137
msg = pickle.loads(data)
138138
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
139-
self.resource_msgs.put(msg)
139+
self.target_radio.send(msg)
140140
last_msg_received_time = time.time()
141141
except socket.timeout:
142142
pass
@@ -160,7 +160,7 @@ def start_zmq_listener(self) -> None:
160160
assert len(msg) >= 1, "ZMQ Receiver expects tuples of length at least 1, got {}".format(msg)
161161
assert len(msg) == 2, "ZMQ Receiver expects message tuples of exactly length 2, got {}".format(msg)
162162

163-
self.resource_msgs.put(msg)
163+
self.target_radio.send(msg)
164164
except zmq.Again:
165165
pass
166166
except Exception:

0 commit comments

Comments
 (0)