Skip to content

Commit 0e6c2f3

Browse files
authored
Split MonitoringRouter into ZMQ and UDP processes (#3796)
This is to facilitate further work where the ZMQ and UDP router processes can have different lifetimes (along with the already separated filesystem router process and any future plugins). The ZMQ router process is only needed when HTEX is being used, to route messages from the interchange. The UDP router process is only needed when resource monitoring is enabled, and an executor is configured with the udp radio mode. This PR does not change when these two processes run, but is only intended to help future PRs make this change. # Changed Behaviour one more process at runtime when monitoring is enabled ## Type of change - Code maintenance/cleanup
1 parent 686039e commit 0e6c2f3

File tree

4 files changed

+248
-111
lines changed

4 files changed

+248
-111
lines changed

parsl/monitoring/monitoring.py

Lines changed: 71 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
from parsl.log_utils import set_file_logger
1616
from parsl.monitoring.errors import MonitoringHubStartError
1717
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
18-
from parsl.monitoring.router import router_starter
18+
from parsl.monitoring.radios.udp_router import udp_router_starter
19+
from parsl.monitoring.radios.zmq_router import zmq_router_starter
1920
from parsl.monitoring.types import TaggedMonitoringMessage
2021
from parsl.multiprocessing import ForkProcess, SizedQueue
2122
from parsl.process_loggers import wrap_with_logs
@@ -121,11 +122,14 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
121122
# in the future, Queue will allow runtime subscripts.
122123

123124
if TYPE_CHECKING:
124-
comm_q: Queue[Union[Tuple[int, int], str]]
125+
zmq_comm_q: Queue[Union[int, str]]
126+
udp_comm_q: Queue[Union[int, str]]
125127
else:
126-
comm_q: Queue
128+
zmq_comm_q: Queue
129+
udp_comm_q: Queue
127130

128-
comm_q = SizedQueue(maxsize=10)
131+
zmq_comm_q = SizedQueue(maxsize=10)
132+
udp_comm_q = SizedQueue(maxsize=10)
129133

130134
self.exception_q: Queue[Tuple[str, str]]
131135
self.exception_q = SizedQueue(maxsize=10)
@@ -136,21 +140,35 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
136140
self.router_exit_event: ms.Event
137141
self.router_exit_event = Event()
138142

139-
self.router_proc = ForkProcess(target=router_starter,
140-
kwargs={"comm_q": comm_q,
141-
"exception_q": self.exception_q,
142-
"resource_msgs": self.resource_msgs,
143-
"exit_event": self.router_exit_event,
144-
"hub_address": self.hub_address,
145-
"udp_port": self.hub_port,
146-
"zmq_port_range": self.hub_port_range,
147-
"run_dir": dfk_run_dir,
148-
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
149-
},
150-
name="Monitoring-Router-Process",
151-
daemon=True,
152-
)
153-
self.router_proc.start()
143+
self.zmq_router_proc = ForkProcess(target=zmq_router_starter,
144+
kwargs={"comm_q": zmq_comm_q,
145+
"exception_q": self.exception_q,
146+
"resource_msgs": self.resource_msgs,
147+
"exit_event": self.router_exit_event,
148+
"hub_address": self.hub_address,
149+
"zmq_port_range": self.hub_port_range,
150+
"run_dir": dfk_run_dir,
151+
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
152+
},
153+
name="Monitoring-ZMQ-Router-Process",
154+
daemon=True,
155+
)
156+
self.zmq_router_proc.start()
157+
158+
self.udp_router_proc = ForkProcess(target=udp_router_starter,
159+
kwargs={"comm_q": udp_comm_q,
160+
"exception_q": self.exception_q,
161+
"resource_msgs": self.resource_msgs,
162+
"exit_event": self.router_exit_event,
163+
"hub_address": self.hub_address,
164+
"udp_port": self.hub_port,
165+
"run_dir": dfk_run_dir,
166+
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
167+
},
168+
name="Monitoring-UDP-Router-Process",
169+
daemon=True,
170+
)
171+
self.udp_router_proc.start()
154172

155173
self.dbm_proc = ForkProcess(target=dbm_starter,
156174
args=(self.exception_q, self.resource_msgs,),
@@ -162,7 +180,8 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
162180
daemon=True,
163181
)
164182
self.dbm_proc.start()
165-
logger.info("Started the router process %s and DBM process %s", self.router_proc.pid, self.dbm_proc.pid)
183+
logger.info("Started ZMQ router process %s, UDP router process %s and DBM process %s",
184+
self.zmq_router_proc.pid, self.udp_router_proc.pid, self.dbm_proc.pid)
166185

167186
self.filesystem_proc = ForkProcess(target=filesystem_receiver,
168187
args=(self.resource_msgs, dfk_run_dir),
@@ -175,25 +194,36 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
175194
self.radio = MultiprocessingQueueRadioSender(self.resource_msgs)
176195

177196
try:
178-
comm_q_result = comm_q.get(block=True, timeout=120)
179-
comm_q.close()
180-
comm_q.join_thread()
197+
zmq_comm_q_result = zmq_comm_q.get(block=True, timeout=120)
198+
zmq_comm_q.close()
199+
zmq_comm_q.join_thread()
181200
except queue.Empty:
182-
logger.error("MonitoringRouter has not reported ports in 120s. Aborting")
201+
logger.error("Monitoring ZMQ Router has not reported port in 120s. Aborting")
183202
raise MonitoringHubStartError()
184203

185-
if isinstance(comm_q_result, str):
186-
logger.error("MonitoringRouter sent an error message: %s", comm_q_result)
187-
raise RuntimeError(f"MonitoringRouter failed to start: {comm_q_result}")
204+
if isinstance(zmq_comm_q_result, str):
205+
logger.error("MonitoringRouter sent an error message: %s", zmq_comm_q_result)
206+
raise RuntimeError(f"MonitoringRouter failed to start: {zmq_comm_q_result}")
207+
208+
self.hub_zmq_port = zmq_comm_q_result
188209

189-
udp_port, zmq_port = comm_q_result
210+
try:
211+
udp_comm_q_result = udp_comm_q.get(block=True, timeout=120)
212+
udp_comm_q.close()
213+
udp_comm_q.join_thread()
214+
except queue.Empty:
215+
logger.error("Monitoring UDP router has not reported port in 120s. Aborting")
216+
raise MonitoringHubStartError()
190217

218+
if isinstance(udp_comm_q_result, str):
219+
logger.error("MonitoringRouter sent an error message: %s", udp_comm_q_result)
220+
raise RuntimeError(f"MonitoringRouter failed to start: {udp_comm_q_result}")
221+
222+
udp_port = udp_comm_q_result
191223
self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
192224

193225
logger.info("Monitoring Hub initialized")
194226

195-
self.hub_zmq_port = zmq_port
196-
197227
def send(self, message: TaggedMonitoringMessage) -> None:
198228
logger.debug("Sending message type %s", message[0])
199229
self.radio.send(message)
@@ -216,14 +246,21 @@ def close(self) -> None:
216246
exception_msg[0],
217247
exception_msg[1]
218248
)
219-
self.router_proc.terminate()
249+
self.zmq_router_proc.terminate()
250+
self.udp_router_proc.terminate()
220251
self.dbm_proc.terminate()
221252
self.filesystem_proc.terminate()
222253
logger.info("Setting router termination event")
223254
self.router_exit_event.set()
224-
logger.info("Waiting for router to terminate")
225-
self.router_proc.join()
226-
self.router_proc.close()
255+
256+
logger.info("Waiting for ZMQ router to terminate")
257+
self.zmq_router_proc.join()
258+
self.zmq_router_proc.close()
259+
260+
logger.info("Waiting for UDP router to terminate")
261+
self.udp_router_proc.join()
262+
self.udp_router_proc.close()
263+
227264
logger.debug("Finished waiting for router termination")
228265
if len(exception_msgs) == 0:
229266
logger.debug("Sending STOP to DBM")
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
import multiprocessing.queues as mpq
5+
import os
6+
import pickle
7+
import socket
8+
import threading
9+
import time
10+
from multiprocessing.synchronize import Event
11+
from typing import Optional
12+
13+
import typeguard
14+
15+
from parsl.log_utils import set_file_logger
16+
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
17+
from parsl.process_loggers import wrap_with_logs
18+
from parsl.utils import setproctitle
19+
20+
logger = logging.getLogger(__name__)
21+
22+
23+
class MonitoringRouter:
24+
25+
def __init__(self,
26+
*,
27+
hub_address: str,
28+
udp_port: Optional[int] = None,
29+
30+
monitoring_hub_address: str = "127.0.0.1",
31+
run_dir: str = ".",
32+
logging_level: int = logging.INFO,
33+
atexit_timeout: int = 3, # in seconds
34+
resource_msgs: mpq.Queue,
35+
exit_event: Event,
36+
):
37+
""" Initializes a monitoring configuration class.
38+
39+
Parameters
40+
----------
41+
hub_address : str
42+
The ip address at which the workers will be able to reach the Hub.
43+
udp_port : int
44+
The specific port at which workers will be able to reach the Hub via UDP. Default: None
45+
run_dir : str
46+
Parsl log directory paths. Logs and temp files go here. Default: '.'
47+
logging_level : int
48+
Logging level as defined in the logging module. Default: logging.INFO
49+
atexit_timeout : float, optional
50+
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
51+
resource_msgs : multiprocessing.Queue
52+
A multiprocessing queue to receive messages to be routed onwards to the database process
53+
exit_event : Event
54+
An event that the main Parsl process will set to signal that the monitoring router should shut down.
55+
"""
56+
os.makedirs(run_dir, exist_ok=True)
57+
self.logger = set_file_logger(f"{run_dir}/monitoring_udp_router.log",
58+
name="monitoring_router",
59+
level=logging_level)
60+
self.logger.debug("Monitoring router starting")
61+
62+
self.hub_address = hub_address
63+
self.atexit_timeout = atexit_timeout
64+
65+
self.loop_freq = 10.0 # milliseconds
66+
67+
# Initialize the UDP socket
68+
self.udp_sock = socket.socket(socket.AF_INET,
69+
socket.SOCK_DGRAM,
70+
socket.IPPROTO_UDP)
71+
72+
# We are trying to bind to all interfaces with 0.0.0.0
73+
if not udp_port:
74+
self.udp_sock.bind(('0.0.0.0', 0))
75+
self.udp_port = self.udp_sock.getsockname()[1]
76+
else:
77+
self.udp_port = udp_port
78+
try:
79+
self.udp_sock.bind(('0.0.0.0', self.udp_port))
80+
except Exception as e:
81+
raise RuntimeError(f"Could not bind to udp_port {udp_port} because: {e}")
82+
self.udp_sock.settimeout(self.loop_freq / 1000)
83+
self.logger.info("Initialized the UDP socket on 0.0.0.0:{}".format(self.udp_port))
84+
85+
self.target_radio = MultiprocessingQueueRadioSender(resource_msgs)
86+
self.exit_event = exit_event
87+
88+
@wrap_with_logs(target="monitoring_router")
89+
def start(self) -> None:
90+
self.logger.info("Starting UDP listener thread")
91+
udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener, daemon=True)
92+
udp_radio_receiver_thread.start()
93+
94+
self.logger.info("Joining on UDP listener thread")
95+
udp_radio_receiver_thread.join()
96+
self.logger.info("Joined on both ZMQ and UDP listener threads")
97+
98+
@wrap_with_logs(target="monitoring_router")
99+
def start_udp_listener(self) -> None:
100+
try:
101+
while not self.exit_event.is_set():
102+
try:
103+
data, addr = self.udp_sock.recvfrom(2048)
104+
resource_msg = pickle.loads(data)
105+
self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
106+
self.target_radio.send(resource_msg)
107+
except socket.timeout:
108+
pass
109+
110+
self.logger.info("UDP listener draining")
111+
last_msg_received_time = time.time()
112+
while time.time() - last_msg_received_time < self.atexit_timeout:
113+
try:
114+
data, addr = self.udp_sock.recvfrom(2048)
115+
msg = pickle.loads(data)
116+
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
117+
self.target_radio.send(msg)
118+
last_msg_received_time = time.time()
119+
except socket.timeout:
120+
pass
121+
122+
self.logger.info("UDP listener finishing normally")
123+
finally:
124+
self.logger.info("UDP listener finished")
125+
126+
127+
@wrap_with_logs
128+
@typeguard.typechecked
129+
def udp_router_starter(*,
130+
comm_q: mpq.Queue,
131+
exception_q: mpq.Queue,
132+
resource_msgs: mpq.Queue,
133+
exit_event: Event,
134+
135+
hub_address: str,
136+
udp_port: Optional[int],
137+
138+
run_dir: str,
139+
logging_level: int) -> None:
140+
setproctitle("parsl: monitoring UDP router")
141+
try:
142+
router = MonitoringRouter(hub_address=hub_address,
143+
udp_port=udp_port,
144+
run_dir=run_dir,
145+
logging_level=logging_level,
146+
resource_msgs=resource_msgs,
147+
exit_event=exit_event)
148+
except Exception as e:
149+
logger.error("MonitoringRouter construction failed.", exc_info=True)
150+
comm_q.put(f"Monitoring router construction failed: {e}")
151+
else:
152+
comm_q.put(router.udp_port)
153+
154+
router.logger.info("Starting MonitoringRouter in router_starter")
155+
try:
156+
router.start()
157+
except Exception as e:
158+
router.logger.exception("UDP router start exception")
159+
exception_q.put(('Hub', str(e)))

0 commit comments

Comments
 (0)