
Commit 0fc966f

Collapse 4 monitoring router to db queues into 1 queue (#3593)
Prior to this PR, there were four multiprocessing queues from the monitoring router process to the database manager process. (These queues are also used by the submit process via MultiprocessingQueueRadioSender, but that is not so relevant for this PR.) Each message arriving at the router goes into MonitoringRouter.start_zmq_listener, where it is dispatched based on tag type into one of these four queues towards the monitoring database.

In the monitoring database code, no matter which queue the messages arrive on, they are all passed into DatabaseManager._dispatch_to_internal. The four queues therefore don't provide much functionality - their main effect is some non-deterministic message order shuffling. This PR collapses those four queues into a single queue.

# Changed Behaviour

Messages will arrive at the database manager in possibly different orders. This might flush out more race conditions.

The monitoring router would previously validate that a message tag was one of 5 known message tags (as part of choosing which queue to dispatch to). This PR removes that validation; it now happens at the receiving end of the (now single) queue, in DatabaseManager._dispatch_to_internal. Error messages related to invalid tags (which should only arise during development of new message types) will now appear in the database manager process, rather than the router process.
1 parent bdfbb26 commit 0fc966f
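
The shape of the change, independent of Parsl's actual classes, is roughly the following minimal sketch: the producer side stops routing by tag, and the consumer side takes over tag validation. All names here (Tag, route, dispatch) are illustrative stand-ins, not Parsl identifiers.

```python
import multiprocessing
from enum import Enum


class Tag(Enum):
    TASK_INFO = 1
    WORKFLOW_INFO = 2
    NODE_INFO = 3
    BLOCK_INFO = 4
    RESOURCE_INFO = 5


def route(msg, q: multiprocessing.Queue) -> None:
    # Router side: no per-tag dispatch any more - everything goes on one queue.
    q.put(msg)


def dispatch(q: multiprocessing.Queue) -> None:
    # Consumer side: tags are validated here, at the single receiving end.
    msg = q.get()
    tag = msg[0]
    if tag in (Tag.TASK_INFO, Tag.WORKFLOW_INFO, Tag.NODE_INFO,
               Tag.BLOCK_INFO, Tag.RESOURCE_INFO):
        print(f"processing {tag}")
    else:
        print(f"discarding message with unknown tag {tag!r}")


if __name__ == "__main__":
    q: multiprocessing.Queue = multiprocessing.Queue()
    route((Tag.NODE_INFO, {"hostname": "worker1"}), q)
    dispatch(q)
```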

File tree

3 files changed: +12 -91 lines changed

parsl/monitoring/db_manager.py

Lines changed: 4 additions & 35 deletions
@@ -308,35 +308,9 @@ def __init__(self,
         self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue()

     def start(self,
-              priority_queue: mpq.Queue,
-              node_queue: mpq.Queue,
-              block_queue: mpq.Queue,
               resource_queue: mpq.Queue) -> None:

         self._kill_event = threading.Event()
-        self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
-                                                            args=(
-                                                                priority_queue, self._kill_event,),
-                                                            name="Monitoring-migrate-priority",
-                                                            daemon=True,
-                                                            )
-        self._priority_queue_pull_thread.start()
-
-        self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
-                                                        args=(
-                                                            node_queue, self._kill_event,),
-                                                        name="Monitoring-migrate-node",
-                                                        daemon=True,
-                                                        )
-        self._node_queue_pull_thread.start()
-
-        self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
-                                                         args=(
-                                                             block_queue, self._kill_event,),
-                                                         name="Monitoring-migrate-block",
-                                                         daemon=True,
-                                                         )
-        self._block_queue_pull_thread.start()

         self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
                                                             args=(
@@ -372,20 +346,18 @@ def start(self,
         while (not self._kill_event.is_set() or
                self.pending_priority_queue.qsize() != 0 or self.pending_resource_queue.qsize() != 0 or
                self.pending_node_queue.qsize() != 0 or self.pending_block_queue.qsize() != 0 or
-               priority_queue.qsize() != 0 or resource_queue.qsize() != 0 or
-               node_queue.qsize() != 0 or block_queue.qsize() != 0):
+               resource_queue.qsize() != 0):

             """
             WORKFLOW_INFO and TASK_INFO messages (i.e. priority messages)

             """
             try:
-                logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}, {}, {}, {}""".format(
+                logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}""".format(
                     self._kill_event.is_set(),
                     self.pending_priority_queue.qsize() != 0, self.pending_resource_queue.qsize() != 0,
                     self.pending_node_queue.qsize() != 0, self.pending_block_queue.qsize() != 0,
-                    priority_queue.qsize() != 0, resource_queue.qsize() != 0,
-                    node_queue.qsize() != 0, block_queue.qsize() != 0))
+                    resource_queue.qsize() != 0))

             # This is the list of resource messages which can be reprocessed as if they
             # had just arrived because the corresponding first task message has been
@@ -707,9 +679,6 @@ def close(self) -> None:
 @wrap_with_logs(target="database_manager")
 @typeguard.typechecked
 def dbm_starter(exception_q: mpq.Queue,
-                priority_msgs: mpq.Queue,
-                node_msgs: mpq.Queue,
-                block_msgs: mpq.Queue,
                 resource_msgs: mpq.Queue,
                 db_url: str,
                 logdir: str,
@@ -726,7 +695,7 @@ def dbm_starter(exception_q: mpq.Queue,
                           logdir=logdir,
                           logging_level=logging_level)
         logger.info("Starting dbm in dbm starter")
-        dbm.start(priority_msgs, node_msgs, block_msgs, resource_msgs)
+        dbm.start(resource_msgs)
     except KeyboardInterrupt:
         logger.exception("KeyboardInterrupt signal caught")
         dbm.close()
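
For orientation, the pull-thread pattern that the surviving resource_queue thread follows can be sketched as below. This is a hypothetical reduction with made-up message content, not Parsl's actual _migrate_logs_to_internal: one daemon thread drains one multiprocessing queue until the kill event is set and the queue is empty.

```python
import multiprocessing
import queue
import threading


def migrate(q: multiprocessing.Queue, kill_event: threading.Event) -> None:
    # Keep draining until asked to stop AND nothing is left on the queue.
    while not kill_event.is_set() or q.qsize() != 0:
        try:
            msg = q.get(timeout=0.1)  # short timeout so the kill event is re-checked
        except queue.Empty:
            continue
        print(f"dispatching {msg}")


if __name__ == "__main__":
    resource_queue: multiprocessing.Queue = multiprocessing.Queue()
    kill = threading.Event()
    t = threading.Thread(target=migrate, args=(resource_queue, kill),
                         name="Monitoring-migrate-resource", daemon=True)
    t.start()
    resource_queue.put(("RESOURCE_INFO", {"psutil_process_pid": 1234}))
    kill.set()
    t.join()
```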

parsl/monitoring/monitoring.py

Lines changed: 5 additions & 23 deletions
@@ -7,7 +7,7 @@
 import time
 from multiprocessing import Event, Process
 from multiprocessing.queues import Queue
-from typing import TYPE_CHECKING, Any, Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast

 import typeguard

@@ -138,27 +138,15 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
         self.exception_q: Queue[Tuple[str, str]]
         self.exception_q = SizedQueue(maxsize=10)

-        self.priority_msgs: Queue[Tuple[Any, int]]
-        self.priority_msgs = SizedQueue()
-
-        self.resource_msgs: Queue[AddressedMonitoringMessage]
+        self.resource_msgs: Queue[Union[AddressedMonitoringMessage, Tuple[Literal["STOP"], Literal[0]]]]
         self.resource_msgs = SizedQueue()

-        self.node_msgs: Queue[AddressedMonitoringMessage]
-        self.node_msgs = SizedQueue()
-
-        self.block_msgs: Queue[AddressedMonitoringMessage]
-        self.block_msgs = SizedQueue()
-
         self.router_exit_event: ms.Event
         self.router_exit_event = Event()

         self.router_proc = ForkProcess(target=router_starter,
                                        kwargs={"comm_q": comm_q,
                                                "exception_q": self.exception_q,
-                                               "priority_msgs": self.priority_msgs,
-                                               "node_msgs": self.node_msgs,
-                                               "block_msgs": self.block_msgs,
                                                "resource_msgs": self.resource_msgs,
                                                "exit_event": self.router_exit_event,
                                                "hub_address": self.hub_address,
@@ -173,7 +161,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
         self.router_proc.start()

         self.dbm_proc = ForkProcess(target=dbm_starter,
-                                    args=(self.exception_q, self.priority_msgs, self.node_msgs, self.block_msgs, self.resource_msgs,),
+                                    args=(self.exception_q, self.resource_msgs,),
                                     kwargs={"logdir": self.logdir,
                                             "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
                                             "db_url": self.logging_endpoint,
@@ -192,7 +180,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
         self.filesystem_proc.start()
         logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")

-        self.radio = MultiprocessingQueueRadioSender(self.block_msgs)
+        self.radio = MultiprocessingQueueRadioSender(self.resource_msgs)

         try:
             comm_q_result = comm_q.get(block=True, timeout=120)
@@ -249,7 +237,7 @@ def close(self) -> None:
         logger.debug("Finished waiting for router termination")
         if len(exception_msgs) == 0:
             logger.debug("Sending STOP to DBM")
-            self.priority_msgs.put(("STOP", 0))
+            self.resource_msgs.put(("STOP", 0))
         else:
             logger.debug("Not sending STOP to DBM, because there were DBM exceptions")
         logger.debug("Waiting for DB termination")
@@ -267,14 +255,8 @@ def close(self) -> None:
         logger.info("Closing monitoring multiprocessing queues")
         self.exception_q.close()
         self.exception_q.join_thread()
-        self.priority_msgs.close()
-        self.priority_msgs.join_thread()
         self.resource_msgs.close()
         self.resource_msgs.join_thread()
-        self.node_msgs.close()
-        self.node_msgs.join_thread()
-        self.block_msgs.close()
-        self.block_msgs.join_thread()
         logger.info("Closed monitoring multiprocessing queues")
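
The widened queue type above can be illustrated with a small self-contained sketch: the single queue now carries either a regular addressed message or the literal ("STOP", 0) shutdown sentinel, so the element type is a Union involving typing.Literal. MonitoringMessage and AddressedMessage below are stand-in aliases, not Parsl's actual definitions in parsl.monitoring.types.

```python
from typing import Any, Dict, Literal, Tuple, Union

MonitoringMessage = Tuple[Any, Dict[str, Any]]    # (tag, payload) stand-in
AddressedMessage = Tuple[MonitoringMessage, int]  # message plus source address
QueueElement = Union[AddressedMessage, Tuple[Literal["STOP"], Literal[0]]]


def handle(item: QueueElement) -> None:
    # The consumer distinguishes the sentinel from real traffic by value.
    if item == ("STOP", 0):
        print("shutting down")
    else:
        print(f"dispatching {item}")


handle((("RESOURCE_INFO", {"pid": 1234}), 0))
handle(("STOP", 0))
```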

parsl/monitoring/router.py

Lines changed: 3 additions & 33 deletions
@@ -14,7 +14,6 @@
 import zmq

 from parsl.log_utils import set_file_logger
-from parsl.monitoring.message_type import MessageType
 from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage
 from parsl.process_loggers import wrap_with_logs
 from parsl.utils import setproctitle
@@ -34,9 +33,6 @@ def __init__(self,
                  logdir: str = ".",
                  logging_level: int = logging.INFO,
                  atexit_timeout: int = 3,  # in seconds
-                 priority_msgs: mpq.Queue,
-                 node_msgs: mpq.Queue,
-                 block_msgs: mpq.Queue,
                  resource_msgs: mpq.Queue,
                  exit_event: Event,
                  ):
@@ -57,8 +53,8 @@ def __init__(self,
             Logging level as defined in the logging module. Default: logging.INFO
         atexit_timeout : float, optional
             The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
-        *_msgs : Queue
-            Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag.
+        resource_msgs : multiprocessing.Queue
+            A multiprocessing queue to receive messages to be routed onwards to the database process

         exit_event : Event
             An event that the main Parsl process will set to signal that the monitoring router should shut down.
@@ -102,9 +98,6 @@ def __init__(self,
                                                    min_port=zmq_port_range[0],
                                                    max_port=zmq_port_range[1])

-        self.priority_msgs = priority_msgs
-        self.node_msgs = node_msgs
-        self.block_msgs = block_msgs
         self.resource_msgs = resource_msgs
         self.exit_event = exit_event

@@ -170,24 +163,7 @@ def start_zmq_listener(self) -> None:
                     msg_0: AddressedMonitoringMessage
                     msg_0 = (msg, 0)

-                    if msg[0] == MessageType.NODE_INFO:
-                        self.node_msgs.put(msg_0)
-                    elif msg[0] == MessageType.RESOURCE_INFO:
-                        self.resource_msgs.put(msg_0)
-                    elif msg[0] == MessageType.BLOCK_INFO:
-                        self.block_msgs.put(msg_0)
-                    elif msg[0] == MessageType.TASK_INFO:
-                        self.priority_msgs.put(msg_0)
-                    elif msg[0] == MessageType.WORKFLOW_INFO:
-                        self.priority_msgs.put(msg_0)
-                    else:
-                        # There is a type: ignore here because if msg[0]
-                        # is of the correct type, this code is unreachable,
-                        # but there is no verification that the message
-                        # received from zmq_receiver_channel.recv_pyobj() is actually
-                        # of that type.
-                        self.logger.error("Discarding message "  # type: ignore[unreachable]
-                                          f"from interchange with unknown type {msg[0].value}")
+                    self.resource_msgs.put(msg_0)
             except zmq.Again:
                 pass
             except Exception:
@@ -207,9 +183,6 @@ def start_zmq_listener(self) -> None:
 def router_starter(*,
                    comm_q: mpq.Queue,
                    exception_q: mpq.Queue,
-                   priority_msgs: mpq.Queue,
-                   node_msgs: mpq.Queue,
-                   block_msgs: mpq.Queue,
                    resource_msgs: mpq.Queue,
                    exit_event: Event,

@@ -226,9 +199,6 @@ def router_starter(*,
                                    zmq_port_range=zmq_port_range,
                                    logdir=logdir,
                                    logging_level=logging_level,
-                                   priority_msgs=priority_msgs,
-                                   node_msgs=node_msgs,
-                                   block_msgs=block_msgs,
                                    resource_msgs=resource_msgs,
                                    exit_event=exit_event)
     except Exception as e:
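
Taken together, the router's receive loop after this PR reduces to something like the following rough sketch. This is hypothetical and simplified from the diff above (made-up port number and socket setup; Parsl's actual listener differs in details such as socket type, port selection, and exit-event handling): every well-formed ZMQ message is forwarded unconditionally onto the single resource_msgs queue, with no per-tag dispatch or tag validation on the router side.

```python
import multiprocessing

import zmq


def zmq_listener(resource_msgs: multiprocessing.Queue, port: int = 55055) -> None:
    context = zmq.Context()
    sock = context.socket(zmq.PULL)
    sock.bind(f"tcp://*:{port}")
    sock.RCVTIMEO = 100  # milliseconds, so an exit event could be polled between receives
    while True:
        try:
            msg = sock.recv_pyobj()
        except zmq.Again:
            continue  # timed out with no message; loop again
        # Forward everything; the database manager validates tags now.
        resource_msgs.put((msg, 0))
```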
