Skip to content

Commit 2981a28

Browse files
authored
Move monitoring router parameters into object attributes (#3521)
This is to support rearrangement of the structure of the router code into multiple threads and methods, without having to manually wire all of the multiprocessing objects between the new methods and threads. These objects are part of the context of the router object, rather than parameters to individual methods which might change, and they are all multiprocessing objects which are thread-safe.
1 parent 5eb30f1 commit 2981a28

File tree

1 file changed

+32
-17
lines changed

1 file changed

+32
-17
lines changed

parsl/monitoring/router.py

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@ def __init__(self,
3232
logdir: str = ".",
3333
run_id: str,
3434
logging_level: int = logging.INFO,
35-
atexit_timeout: int = 3 # in seconds
35+
atexit_timeout: int = 3, # in seconds
36+
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
37+
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
38+
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
39+
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
40+
exit_event: Event,
3641
):
3742
""" Initializes a monitoring configuration class.
3843
@@ -51,7 +56,11 @@ def __init__(self,
5156
Logging level as defined in the logging module. Default: logging.INFO
5257
atexit_timeout : float, optional
5358
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
59+
*_msgs : Queue
60+
Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag.
5461
62+
exit_event : Event
63+
An event that the main Parsl process will set to signal that the monitoring router should shut down.
5564
"""
5665
os.makedirs(logdir, exist_ok=True)
5766
self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
@@ -93,19 +102,20 @@ def __init__(self,
93102
min_port=zmq_port_range[0],
94103
max_port=zmq_port_range[1])
95104

96-
def start(self,
97-
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
98-
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
99-
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
100-
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
101-
exit_event: Event) -> None:
105+
self.priority_msgs = priority_msgs
106+
self.node_msgs = node_msgs
107+
self.block_msgs = block_msgs
108+
self.resource_msgs = resource_msgs
109+
self.exit_event = exit_event
110+
111+
def start(self) -> None:
102112
try:
103-
while not exit_event.is_set():
113+
while not self.exit_event.is_set():
104114
try:
105115
data, addr = self.udp_sock.recvfrom(2048)
106116
resource_msg = pickle.loads(data)
107117
self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
108-
resource_msgs.put((resource_msg, addr))
118+
self.resource_msgs.put((resource_msg, addr))
109119
except socket.timeout:
110120
pass
111121

@@ -125,15 +135,15 @@ def start(self,
125135

126136
if msg[0] == MessageType.NODE_INFO:
127137
msg[1]['run_id'] = self.run_id
128-
node_msgs.put(msg_0)
138+
self.node_msgs.put(msg_0)
129139
elif msg[0] == MessageType.RESOURCE_INFO:
130-
resource_msgs.put(msg_0)
140+
self.resource_msgs.put(msg_0)
131141
elif msg[0] == MessageType.BLOCK_INFO:
132-
block_msgs.put(msg_0)
142+
self.block_msgs.put(msg_0)
133143
elif msg[0] == MessageType.TASK_INFO:
134-
priority_msgs.put(msg_0)
144+
self.priority_msgs.put(msg_0)
135145
elif msg[0] == MessageType.WORKFLOW_INFO:
136-
priority_msgs.put(msg_0)
146+
self.priority_msgs.put(msg_0)
137147
else:
138148
# There is a type: ignore here because if msg[0]
139149
# is of the correct type, this code is unreachable,
@@ -158,7 +168,7 @@ def start(self,
158168
data, addr = self.udp_sock.recvfrom(2048)
159169
msg = pickle.loads(data)
160170
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
161-
resource_msgs.put((msg, addr))
171+
self.resource_msgs.put((msg, addr))
162172
last_msg_received_time = time.time()
163173
except socket.timeout:
164174
pass
@@ -191,7 +201,12 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
191201
zmq_port_range=zmq_port_range,
192202
logdir=logdir,
193203
logging_level=logging_level,
194-
run_id=run_id)
204+
run_id=run_id,
205+
priority_msgs=priority_msgs,
206+
node_msgs=node_msgs,
207+
block_msgs=block_msgs,
208+
resource_msgs=resource_msgs,
209+
exit_event=exit_event)
195210
except Exception as e:
196211
logger.error("MonitoringRouter construction failed.", exc_info=True)
197212
comm_q.put(f"Monitoring router construction failed: {e}")
@@ -200,7 +215,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
200215

201216
router.logger.info("Starting MonitoringRouter in router_starter")
202217
try:
203-
router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event)
218+
router.start()
204219
except Exception as e:
205220
router.logger.exception("router.start exception")
206221
exception_q.put(('Hub', str(e)))

0 commit comments

Comments
 (0)