Skip to content

Commit d8e8d4b

Browse files
authored
Split monitoring router into two radio-specific receiver threads (#3558)
This is part of two ongoing strands of development: * Preparation for arbitrary monitoring radio receivers, where UDP is not special compared to other methods. This code separates out the UDP specific code making it easier to move around later (see development in PR #3315) * Avoiding poll-one-thing, poll-another-thing tight polling loops in favour of multiple blocking loops. The two router receiver sections used to run in a single tight non-blocking loop, each component waiting loop_freq (= 10 ms) for something to happen. After this PR, the separated loops are more amenable to longer blocking times - they only need to discover when exit event is set which probably can be more on the order of 1 second.
1 parent 2f6a185 commit d8e8d4b

File tree

1 file changed

+40
-14
lines changed

1 file changed

+40
-14
lines changed

parsl/monitoring/router.py

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import pickle
66
import queue
77
import socket
8+
import threading
89
import time
910
from multiprocessing.synchronize import Event
1011
from typing import Optional, Tuple, Union
@@ -108,7 +109,24 @@ def __init__(self,
108109
self.resource_msgs = resource_msgs
109110
self.exit_event = exit_event
110111

112+
@wrap_with_logs(target="monitoring_router")
111113
def start(self) -> None:
114+
self.logger.info("Starting UDP listener thread")
115+
udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener, daemon=True)
116+
udp_radio_receiver_thread.start()
117+
118+
self.logger.info("Starting ZMQ listener thread")
119+
zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener, daemon=True)
120+
zmq_radio_receiver_thread.start()
121+
122+
self.logger.info("Joining on ZMQ listener thread")
123+
zmq_radio_receiver_thread.join()
124+
self.logger.info("Joining on UDP listener thread")
125+
udp_radio_receiver_thread.join()
126+
self.logger.info("Joined on both ZMQ and UDP listener threads")
127+
128+
@wrap_with_logs(target="monitoring_router")
129+
def start_udp_listener(self) -> None:
112130
try:
113131
while not self.exit_event.is_set():
114132
try:
@@ -119,6 +137,26 @@ def start(self) -> None:
119137
except socket.timeout:
120138
pass
121139

140+
self.logger.info("UDP listener draining")
141+
last_msg_received_time = time.time()
142+
while time.time() - last_msg_received_time < self.atexit_timeout:
143+
try:
144+
data, addr = self.udp_sock.recvfrom(2048)
145+
msg = pickle.loads(data)
146+
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
147+
self.resource_msgs.put((msg, addr))
148+
last_msg_received_time = time.time()
149+
except socket.timeout:
150+
pass
151+
152+
self.logger.info("UDP listener finishing normally")
153+
finally:
154+
self.logger.info("UDP listener finished")
155+
156+
@wrap_with_logs(target="monitoring_router")
157+
def start_zmq_listener(self) -> None:
158+
try:
159+
while not self.exit_event.is_set():
122160
try:
123161
dfk_loop_start = time.time()
124162
while time.time() - dfk_loop_start < 1.0: # TODO make configurable
@@ -161,21 +199,9 @@ def start(self) -> None:
161199
# thing to do.
162200
self.logger.warning("Failure processing a ZMQ message", exc_info=True)
163201

164-
self.logger.info("Monitoring router draining")
165-
last_msg_received_time = time.time()
166-
while time.time() - last_msg_received_time < self.atexit_timeout:
167-
try:
168-
data, addr = self.udp_sock.recvfrom(2048)
169-
msg = pickle.loads(data)
170-
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
171-
self.resource_msgs.put((msg, addr))
172-
last_msg_received_time = time.time()
173-
except socket.timeout:
174-
pass
175-
176-
self.logger.info("Monitoring router finishing normally")
202+
self.logger.info("ZMQ listener finishing normally")
177203
finally:
178-
self.logger.info("Monitoring router finished")
204+
self.logger.info("ZMQ listener finished")
179205

180206

181207
@wrap_with_logs

0 commit comments

Comments
 (0)