Skip to content

Commit fb7a941

Browse files
committed
Exit monitoring router exit on multiprocessing event, not exit message
Prior to this PR, the monitoring router exited due to receiving a WORKFLOW_INFO message with an exit_now field set to True, but only if that message was received through a specific path. This PR removes that exit_now field, and makes the monitoring router exit on a multiprocessing event. This removes the need for the exit message to arrive through that specific path into the router, which makes message handling more consistent, and opens up opportunities to feed messages into monitoring through different paths. Slowly ongoing work has been trying to make all the different monitoring message paths behave the same with a goal of eliminating some of them, and this change also works towards that.
1 parent e3df4db commit fb7a941

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

parsl/dataflow/dflow.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,8 +1227,7 @@ def cleanup(self) -> None:
12271227
'tasks_completed_count': self.task_state_counts[States.exec_done],
12281228
"time_began": self.time_began,
12291229
'time_completed': self.time_completed,
1230-
'run_id': self.run_id, 'rundir': self.run_dir,
1231-
'exit_now': True})
1230+
'run_id': self.run_id, 'rundir': self.run_dir})
12321231

12331232
logger.info("Terminating monitoring")
12341233
self.monitoring.close()

parsl/monitoring/monitoring.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
import os
44
import time
55
import logging
6+
import multiprocessing.synchronize as ms
67
import typeguard
78
import zmq
89

910
import queue
1011

1112
from parsl.multiprocessing import ForkProcess, SizedQueue
1213
from multiprocessing import Process
14+
from multiprocessing import Event
1315
from multiprocessing.queues import Queue
1416
from parsl.log_utils import set_file_logger
1517
from parsl.utils import RepresentationMixin
@@ -157,8 +159,12 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
157159
self.block_msgs: Queue[AddressedMonitoringMessage]
158160
self.block_msgs = SizedQueue()
159161

162+
self.router_exit_event: ms.Event
163+
self.router_exit_event = Event()
164+
160165
self.router_proc = ForkProcess(target=router_starter,
161-
args=(comm_q, self.exception_q, self.priority_msgs, self.node_msgs, self.block_msgs, self.resource_msgs),
166+
args=(comm_q, self.exception_q, self.priority_msgs, self.node_msgs,
167+
self.block_msgs, self.resource_msgs, self.router_exit_event),
162168
kwargs={"hub_address": self.hub_address,
163169
"udp_port": self.hub_port,
164170
"zmq_port_range": self.hub_port_range,
@@ -249,6 +255,8 @@ def close(self) -> None:
249255
self.router_proc.terminate()
250256
self.dbm_proc.terminate()
251257
self.filesystem_proc.terminate()
258+
logger.info("Setting router termination event")
259+
self.router_exit_event.set()
252260
logger.info("Waiting for router to terminate")
253261
self.router_proc.join()
254262
logger.debug("Finished waiting for router termination")

parsl/monitoring/router.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
from parsl.monitoring.message_type import MessageType
1717
from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage
18+
19+
from multiprocessing.synchronize import Event
1820
from typing import Optional, Tuple, Union
1921

2022

@@ -98,10 +100,10 @@ def start(self,
98100
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
99101
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
100102
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
101-
resource_msgs: "queue.Queue[AddressedMonitoringMessage]") -> None:
103+
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
104+
exit_event: Event) -> None:
102105
try:
103-
router_keep_going = True
104-
while router_keep_going:
106+
while not exit_event.is_set():
105107
try:
106108
data, addr = self.udp_sock.recvfrom(2048)
107109
resource_msg = pickle.loads(data)
@@ -135,8 +137,6 @@ def start(self,
135137
priority_msgs.put(msg_0)
136138
elif msg[0] == MessageType.WORKFLOW_INFO:
137139
priority_msgs.put(msg_0)
138-
if 'exit_now' in msg[1] and msg[1]['exit_now']:
139-
router_keep_going = False
140140
else:
141141
# There is a type: ignore here because if msg[0]
142142
# is of the correct type, this code is unreachable,
@@ -178,6 +178,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
178178
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
179179
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
180180
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
181+
exit_event: Event,
181182

182183
hub_address: str,
183184
udp_port: Optional[int],
@@ -202,7 +203,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
202203

203204
router.logger.info("Starting MonitoringRouter in router_starter")
204205
try:
205-
router.start(priority_msgs, node_msgs, block_msgs, resource_msgs)
206+
router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event)
206207
except Exception as e:
207208
router.logger.exception("router.start exception")
208209
exception_q.put(('Hub', str(e)))

0 commit comments

Comments
 (0)