
Commit e2b4176

desc-sponsored monitoring untangling that fits in here
I think if I try using the dfk.monitoring.send channel, this is going to be very thread-unsafe. But I think that's already the case, because there's nothing thread-specific about who can call dfk.monitoring.send... it's just going to happen more often.
1 parent fb7a941 commit e2b4176
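
In the diffs below, the per-component ZMQ DEALER sockets on the submit side are replaced by a single radio whose send() is a put onto a multiprocessing queue. As a rough sketch of that shape (QueueRadio and the demo message here are illustrative stand-ins, not Parsl's actual classes — the real class added by this commit is parsl.monitoring.radios.MultiprocessingQueueRadio), the thread-safety remark in the message comes down to multiprocessing.Queue.put being safe to call from multiple threads, so concurrent submit-side senders no longer each need their own channel:

# Illustrative sketch only, not Parsl code.
import multiprocessing
from multiprocessing.queues import Queue


class QueueRadio:
    """send() is just a put onto a shared multiprocessing queue."""

    def __init__(self, queue: Queue) -> None:
        self.queue = queue

    def send(self, message: object) -> None:
        # multiprocessing.Queue.put is thread- and process-safe, so many
        # callers can share one radio instead of one ZMQ socket each.
        self.queue.put((message, 0))


if __name__ == "__main__":
    block_msgs: Queue = multiprocessing.Queue()
    radio = QueueRadio(block_msgs)
    radio.send(("BLOCK_INFO", {"block_id": "0", "status": "RUNNING"}))
    print(block_msgs.get())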

File tree: 5 files changed (+39, -46 lines)

parsl/dataflow/dflow.py

Lines changed: 2 additions & 1 deletion
@@ -177,10 +177,11 @@ def __init__(self, config: Config) -> None:
 
         # this must be set before executors are added since add_executors calls
         # job_status_poller.add_executors.
+        radio = self.monitoring.radio if self.monitoring else None
         self.job_status_poller = JobStatusPoller(strategy=self.config.strategy,
                                                  strategy_period=self.config.strategy_period,
                                                  max_idletime=self.config.max_idletime,
-                                                 dfk=self)
+                                                 monitoring=radio)
 
         self.executors: Dict[str, ParslExecutor] = {}
 

parsl/jobs/job_status_poller.py

Lines changed: 7 additions & 22 deletions
@@ -1,7 +1,6 @@
 import logging
 import parsl
 import time
-import zmq
 from typing import Dict, List, Sequence, Optional, Union
 
 from parsl.jobs.states import JobStatus, JobState
@@ -17,25 +16,11 @@
 
 
 class PolledExecutorFacade:
-    def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None):
+    def __init__(self, executor: BlockProviderExecutor, monitoring: Optional["parsl.monitoring.radios.MonitoringRadio"] = None):
         self._executor = executor
         self._last_poll_time = 0.0
         self._status = {}  # type: Dict[str, JobStatus]
-
-        # Create a ZMQ channel to send poll status to monitoring
-
-        self.hub_channel: Optional[zmq.Socket]
-
-        if dfk and dfk.monitoring is not None:
-            hub_address = dfk.hub_address
-            hub_port = dfk.hub_zmq_port
-            context = zmq.Context()
-            self.hub_channel = context.socket(zmq.DEALER)
-            self.hub_channel.set_hwm(0)
-            self.hub_channel.connect("tcp://{}:{}".format(hub_address, hub_port))
-            logger.info("Monitoring enabled on job status poller")
-        else:
-            self.hub_channel = None
+        self._monitoring = monitoring
 
     def poll(self) -> None:
         now = time.time()
@@ -54,10 +39,10 @@ def poll(self) -> None:
 
     def send_monitoring_info(self, status: Dict) -> None:
         # Send monitoring info for HTEX when monitoring enabled
-        if self.hub_channel:
+        if self._monitoring:
            msg = self._executor.create_monitoring_info(status)
            logger.debug("Sending message {} to hub from job status poller".format(msg))
-            self.hub_channel.send_pyobj((MessageType.BLOCK_INFO, msg))
+            self._monitoring.send((MessageType.BLOCK_INFO, msg))
 
     @property
     def status(self) -> Dict[str, JobStatus]:
@@ -107,9 +92,9 @@ def __repr__(self) -> str:
 class JobStatusPoller(Timer):
     def __init__(self, *, strategy: Optional[str], max_idletime: float,
                  strategy_period: Union[float, int],
-                 dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None) -> None:
+                 monitoring: Optional["parsl.monitoring.radios.MonitoringRadio"] = None) -> None:
         self._executor_facades = []  # type: List[PolledExecutorFacade]
-        self.dfk = dfk
+        self.monitoring = monitoring
         self._strategy = Strategy(strategy=strategy,
                                   max_idletime=max_idletime)
         super().__init__(self.poll, interval=strategy_period, name="JobStatusPoller")
@@ -131,7 +116,7 @@ def add_executors(self, executors: Sequence[BlockProviderExecutor]) -> None:
         for executor in executors:
             if executor.status_polling_interval > 0:
                 logger.debug("Adding executor {}".format(executor.label))
-                self._executor_facades.append(PolledExecutorFacade(executor, self.dfk))
+                self._executor_facades.append(PolledExecutorFacade(executor, self.monitoring))
         self._strategy.add_executors(executors)
 
     def close(self):
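
One consequence of PolledExecutorFacade taking a MonitoringRadio instead of a whole DataFlowKernel is that anything exposing a send() method can stand in for monitoring. A hypothetical test-double sketch of that shape (Facade and RecordingRadio below are illustrative names, not Parsl classes):

# Hypothetical sketch: stand-in names showing the new facade interface,
# not Parsl's actual implementation.
from typing import Dict, List, Optional


class RecordingRadio:
    """Test double: records every message passed to send()."""

    def __init__(self) -> None:
        self.messages: List[object] = []

    def send(self, message: object) -> None:
        self.messages.append(message)


class Facade:
    """Mimics send_monitoring_info() after this commit: no hub_channel, just a radio."""

    def __init__(self, monitoring: Optional[RecordingRadio] = None) -> None:
        self._monitoring = monitoring

    def send_monitoring_info(self, status: Dict) -> None:
        if self._monitoring:
            self._monitoring.send(("BLOCK_INFO", status))


radio = RecordingRadio()
Facade(radio).send_monitoring_info({"0": "RUNNING"})
print(radio.messages)  # [('BLOCK_INFO', {'0': 'RUNNING'})]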

parsl/monitoring/monitoring.py

Lines changed: 5 additions & 22 deletions
@@ -5,7 +5,6 @@
 import logging
 import multiprocessing.synchronize as ms
 import typeguard
-import zmq
 
 import queue
 
@@ -20,6 +19,7 @@
 
 from parsl.serialize import deserialize
 
+from parsl.monitoring.radios import MultiprocessingQueueRadio
 from parsl.monitoring.router import router_starter
 from parsl.monitoring.message_type import MessageType
 from parsl.monitoring.types import AddressedMonitoringMessage
@@ -92,12 +92,6 @@ def __init__(self,
                  Default: 30 seconds
         """
 
-        # Any is used to disable typechecking on uses of _dfk_channel,
-        # because it is used in the code as if it points to a channel, but
-        # the static type is that it can also be None. The code relies on
-        # .start() being called and initialising this to a real channel.
-        self._dfk_channel = None  # type: Any
-
         if _db_manager_excepts:
             raise _db_manager_excepts
 
@@ -197,6 +191,8 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
         self.filesystem_proc.start()
         logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")
 
+        self.radio = MultiprocessingQueueRadio(self.block_msgs)
+
         try:
             comm_q_result = comm_q.get(block=True, timeout=120)
         except queue.Empty:
@@ -211,26 +207,14 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
 
         self.monitoring_hub_url = "udp://{}:{}".format(self.hub_address, udp_port)
 
-        context = zmq.Context()
-        self.dfk_channel_timeout = 10000  # in milliseconds
-        self._dfk_channel = context.socket(zmq.DEALER)
-        self._dfk_channel.setsockopt(zmq.LINGER, 0)
-        self._dfk_channel.set_hwm(0)
-        self._dfk_channel.setsockopt(zmq.SNDTIMEO, self.dfk_channel_timeout)
-        self._dfk_channel.connect("tcp://{}:{}".format(self.hub_address, zmq_port))
-
         logger.info("Monitoring Hub initialized")
 
         return zmq_port
 
     # TODO: tighten the Any message format
     def send(self, mtype: MessageType, message: Any) -> None:
         logger.debug("Sending message type {}".format(mtype))
-        try:
-            self._dfk_channel.send_pyobj((mtype, message))
-        except zmq.Again:
-            logger.exception(
-                "The monitoring message sent from DFK to router timed-out after {}ms".format(self.dfk_channel_timeout))
+        self.radio.send((mtype, message))
 
     def close(self) -> None:
         logger.info("Terminating Monitoring Hub")
@@ -241,9 +225,8 @@ def close(self) -> None:
                 logger.error("There was a queued exception (Either router or DBM process got exception much earlier?)")
             except queue.Empty:
                 break
-        if self._dfk_channel and self.monitoring_hub_active:
+        if self.monitoring_hub_active:
             self.monitoring_hub_active = False
-            self._dfk_channel.close()
         if exception_msgs:
             for exception_msg in exception_msgs:
                 logger.error(

parsl/monitoring/radios.py

Lines changed: 15 additions & 0 deletions
@@ -6,6 +6,7 @@
 
 from abc import ABCMeta, abstractmethod
 
+from multiprocessing.queues import Queue
 from typing import Optional
 
 from parsl.serialize import serialize
@@ -173,3 +174,17 @@ def send(self, message: object) -> None:
             logging.error("Could not send message within timeout limit")
             return
         return
+
+
+class MultiprocessingQueueRadio(MonitoringRadio):
+    """A monitoring radio intended which connects over a multiprocessing Queue.
+    This radio is intended to be used on the submit side, where components
+    in the submit process, or processes launched by multiprocessing, will have
+    access to a Queue shared with the monitoring database code (bypassing the
+    monitoring router).
+    """
+    def __init__(self, queue: Queue) -> None:
+        self.queue = queue
+
+    def send(self, message: object) -> None:
+        self.queue.put((message, 0))
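
The docstring above notes that the queue is shared with processes launched by multiprocessing, bypassing the monitoring router. A small sketch of that sharing pattern under that assumption (the consumer function below is illustrative and is not Parsl's database-manager code):

# Illustrative only: shows the shared-queue pattern the docstring describes.
import multiprocessing
from multiprocessing.queues import Queue


def consumer(q: Queue) -> None:
    # a child process reads what submit-side senders put on the queue
    message, _ = q.get()
    print("received:", message)


if __name__ == "__main__":
    block_msgs: Queue = multiprocessing.Queue()
    # equivalent to MultiprocessingQueueRadio(block_msgs).send(msg)
    block_msgs.put((("BLOCK_INFO", {"status": "RUNNING"}), 0))

    proc = multiprocessing.Process(target=consumer, args=(block_msgs,))
    proc.start()
    proc.join()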

parsl/tests/test_monitoring/test_fuzz_zmq.py

Lines changed: 10 additions & 1 deletion
@@ -4,6 +4,7 @@
 import pytest
 import socket
 import time
+import zmq
 
 logger = logging.getLogger(__name__)
 
@@ -48,8 +49,16 @@ def test_row_counts():
     s.connect((hub_address, hub_zmq_port))
     s.sendall(b'fuzzing\r')
 
+    context = zmq.Context()
+    channel_timeout = 10000  # in milliseconds
+    hub_channel = context.socket(zmq.DEALER)
+    hub_channel.setsockopt(zmq.LINGER, 0)
+    hub_channel.set_hwm(0)
+    hub_channel.setsockopt(zmq.SNDTIMEO, channel_timeout)
+    hub_channel.connect("tcp://{}:{}".format(hub_address, hub_zmq_port))
+
     # this will send a non-object down the DFK's existing ZMQ connection
-    parsl.dfk().monitoring._dfk_channel.send(b'FuzzyByte\rSTREAM')
+    hub_channel.send(b'FuzzyByte\rSTREAM')
 
     # This following attack is commented out, because monitoring is not resilient
     # to this.

0 commit comments
