Commit a6b159a

Convert DFK/monitoring message.send() to multiprocessing Queue (#3835)
This consolidates the API that the MonitoringHub offers for sending a monitoring message to be via a multiprocessing Queue. Prior to this PR, the MonitoringHub exposed both a multiprocessing Queue (since #3818) and a .send() method for use in the original process. This is part of ongoing work to move external communication for monitoring out of the MonitoringHub, except via this multiprocessing.Queue. This removes another constraint on what a replacement MonitoringHub looks like - something that is partially driven by Globus Compute's desire for some monitoring information via an API rather than via the existing database implementation.

This PR splits the DFK's monitoring state in two: a configured MonitoringHub (self.monitoring), which comes from the Config object as before, and a new piece of internal state, self.monitoring_radio, which is now managed separately and configured from self.monitoring. This is extra complexity for the DFK, accepted as a tradeoff for a cleaner MonitoringHub interface, and it reflects the DFK's dual role as "thing that deals with tasks" and "God-object that binds everything together".

# Changed Behaviour

None.

## Type of change

- Code maintenance/cleanup
1 parent a95ed58 commit a6b159a
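Since the DFK now reaches monitoring only through that queue, a replacement MonitoringHub needs to expose little more than a multiprocessing.Queue plus start/close lifecycle hooks. The following is a minimal sketch of that narrowed contract; QueueBackedHub and QueueRadioSender are hypothetical stand-ins, with only the resource_msgs/start/close names and the (MessageType, payload) tuple shape taken from the diff below.

```python
# Hedged sketch of the narrowed MonitoringHub contract after this PR.
# QueueBackedHub and QueueRadioSender are hypothetical; resource_msgs,
# start() and close() mirror names that appear in the diff below.
import multiprocessing
from typing import Any


class QueueRadioSender:
    """Stand-in for parsl's MultiprocessingQueueRadioSender: wraps a
    multiprocessing Queue and forwards each message onto it."""

    def __init__(self, queue: multiprocessing.Queue) -> None:
        self._queue = queue

    def send(self, message: Any) -> None:
        self._queue.put(message)


class QueueBackedHub:
    """Hypothetical replacement hub: all the DFK needs from it is the
    queue and the lifecycle hooks."""

    def __init__(self) -> None:
        self.resource_msgs: multiprocessing.Queue = multiprocessing.Queue()

    def start(self, dfk_run_dir: str, config_run_dir: str) -> None:
        pass  # a real hub would start router/consumer processes here

    def close(self) -> None:
        pass  # ...and stop them here, after draining resource_msgs


# DFK-side wiring, mirroring the dflow.py change below:
hub = QueueBackedHub()
hub.start("runinfo/000", "runinfo/000")
radio = QueueRadioSender(hub.resource_msgs)
radio.send(("WORKFLOW_INFO", {"host": "example-host"}))
print(hub.resource_msgs.get())  # a consumer process would do this
hub.close()
```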

File tree

2 files changed: +17 -19 lines changed


parsl/dataflow/dflow.py

Lines changed: 17 additions & 12 deletions

@@ -45,6 +45,7 @@
 from parsl.jobs.job_status_poller import JobStatusPoller
 from parsl.monitoring import MonitoringHub
 from parsl.monitoring.message_type import MessageType
+from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
 from parsl.monitoring.remote import monitor_wrapper
 from parsl.process_loggers import wrap_with_logs
 from parsl.usage_tracking.usage import UsageTracker
@@ -110,8 +111,11 @@ def __init__(self, config: Config) -> None:
         self.monitoring: Optional[MonitoringHub]
         self.monitoring = config.monitoring
 
+        self.monitoring_radio = None
+
         if self.monitoring:
             self.monitoring.start(self.run_dir, self.config.run_dir)
+            self.monitoring_radio = MultiprocessingQueueRadioSender(self.monitoring.resource_msgs)
 
         self.time_began = datetime.datetime.now()
         self.time_completed: Optional[datetime.datetime] = None
@@ -156,9 +160,9 @@ def __init__(self, config: Config) -> None:
             'host': gethostname(),
         }
 
-        if self.monitoring:
-            self.monitoring.send((MessageType.WORKFLOW_INFO,
-                                  workflow_info))
+        if self.monitoring_radio:
+            self.monitoring_radio.send((MessageType.WORKFLOW_INFO,
+                                        workflow_info))
 
         if config.checkpoint_files is not None:
             checkpoint_files = config.checkpoint_files
@@ -231,9 +235,9 @@ def __exit__(self, exc_type, exc_value, traceback) -> None:
             raise InternalConsistencyError(f"Exit case for {mode} should be unreachable, validated by typeguard on Config()")
 
     def _send_task_log_info(self, task_record: TaskRecord) -> None:
-        if self.monitoring:
+        if self.monitoring_radio:
             task_log_info = self._create_task_log_info(task_record)
-            self.monitoring.send((MessageType.TASK_INFO, task_log_info))
+            self.monitoring_radio.send((MessageType.TASK_INFO, task_log_info))
 
     def _create_task_log_info(self, task_record: TaskRecord) -> Dict[str, Any]:
         """
@@ -1215,15 +1219,16 @@ def cleanup(self) -> None:
         logger.info("Terminated executors")
         self.time_completed = datetime.datetime.now()
 
-        if self.monitoring:
+        if self.monitoring_radio:
             logger.info("Sending final monitoring message")
-            self.monitoring.send((MessageType.WORKFLOW_INFO,
-                                  {'tasks_failed_count': self.task_state_counts[States.failed],
-                                   'tasks_completed_count': self.task_state_counts[States.exec_done],
-                                   "time_began": self.time_began,
-                                   'time_completed': self.time_completed,
-                                   'run_id': self.run_id, 'rundir': self.run_dir}))
+            self.monitoring_radio.send((MessageType.WORKFLOW_INFO,
+                                        {'tasks_failed_count': self.task_state_counts[States.failed],
+                                         'tasks_completed_count': self.task_state_counts[States.exec_done],
+                                         "time_began": self.time_began,
+                                         'time_completed': self.time_completed,
+                                         'run_id': self.run_id, 'rundir': self.run_dir}))
 
+        if self.monitoring:
             logger.info("Terminating monitoring")
             self.monitoring.close()
             logger.info("Terminated monitoring")

parsl/monitoring/monitoring.py

Lines changed: 0 additions & 7 deletions

@@ -12,7 +12,6 @@
 
 from parsl.monitoring.errors import MonitoringHubStartError
 from parsl.monitoring.radios.filesystem_router import filesystem_router_starter
-from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
 from parsl.monitoring.radios.udp_router import udp_router_starter
 from parsl.monitoring.types import TaggedMonitoringMessage
 from parsl.multiprocessing import (
@@ -180,8 +179,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
         self.filesystem_proc.start()
         logger.info("Started filesystem radio receiver process %s", self.filesystem_proc.pid)
 
-        self.radio = MultiprocessingQueueRadioSender(self.resource_msgs)
-
         try:
             udp_comm_q_result = udp_comm_q.get(block=True, timeout=120)
             udp_comm_q.close()
@@ -199,10 +196,6 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
 
         logger.info("Monitoring Hub initialized")
 
-    def send(self, message: TaggedMonitoringMessage) -> None:
-        logger.debug("Sending message type %s", message[0])
-        self.radio.send(message)
-
     def close(self) -> None:
         logger.info("Terminating Monitoring Hub")
         if self.monitoring_hub_active:
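With MonitoringHub.send() deleted, any in-process caller migrates to the queue-wrapping sender, roughly as follows. This is a sketch, not code from the commit: it assumes `hub` is an already-started MonitoringHub, and the payload dict is illustrative.

```python
# Migration sketch for in-process callers of the removed MonitoringHub.send().
# Assumes `hub` is a started MonitoringHub; the payload is illustrative.
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender

workflow_info = {"host": "example-host"}  # illustrative payload

# Before this PR:
#     hub.send((MessageType.WORKFLOW_INFO, workflow_info))

# After this PR, the caller owns a sender wrapping the hub's queue:
radio = MultiprocessingQueueRadioSender(hub.resource_msgs)
radio.send((MessageType.WORKFLOW_INFO, workflow_info))
```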
