Skip to content

Commit 5e5c607

Browse files
authored
Move filesystem monitoring receiver to own file (#3798)
This is consistent with ZMQ and UDP receivers being in their own files. # Changed Behaviour none ## Type of change - Code maintenance/cleanup
1 parent a65b5b8 commit 5e5c607

File tree

2 files changed

+56
-45
lines changed

2 files changed

+56
-45
lines changed

parsl/monitoring/monitoring.py

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,21 @@
33
import logging
44
import multiprocessing.synchronize as ms
55
import os
6-
import pickle
76
import queue
8-
import time
97
from multiprocessing import Event
108
from multiprocessing.queues import Queue
11-
from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union, cast
9+
from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union
1210

1311
import typeguard
1412

15-
from parsl.log_utils import set_file_logger
1613
from parsl.monitoring.errors import MonitoringHubStartError
14+
from parsl.monitoring.radios.filesystem_router import filesystem_router_starter
1715
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
1816
from parsl.monitoring.radios.udp_router import udp_router_starter
1917
from parsl.monitoring.radios.zmq_router import zmq_router_starter
2018
from parsl.monitoring.types import TaggedMonitoringMessage
2119
from parsl.multiprocessing import ForkProcess, SizedQueue
22-
from parsl.process_loggers import wrap_with_logs
23-
from parsl.utils import RepresentationMixin, setproctitle
20+
from parsl.utils import RepresentationMixin
2421

2522
_db_manager_excepts: Optional[Exception]
2623

@@ -183,7 +180,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
183180
logger.info("Started ZMQ router process %s, UDP router process %s and DBM process %s",
184181
self.zmq_router_proc.pid, self.udp_router_proc.pid, self.dbm_proc.pid)
185182

186-
self.filesystem_proc = ForkProcess(target=filesystem_receiver,
183+
self.filesystem_proc = ForkProcess(target=filesystem_router_starter,
187184
args=(self.resource_msgs, dfk_run_dir),
188185
name="Monitoring-Filesystem-Process",
189186
daemon=True
@@ -285,41 +282,3 @@ def close(self) -> None:
285282
self.resource_msgs.close()
286283
self.resource_msgs.join_thread()
287284
logger.info("Closed monitoring multiprocessing queues")
288-
289-
290-
@wrap_with_logs
291-
def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None:
292-
logger = set_file_logger(f"{run_dir}/monitoring_filesystem_radio.log",
293-
name="monitoring_filesystem_radio",
294-
level=logging.INFO)
295-
296-
logger.info("Starting filesystem radio receiver")
297-
setproctitle("parsl: monitoring filesystem receiver")
298-
base_path = f"{run_dir}/monitor-fs-radio/"
299-
tmp_dir = f"{base_path}/tmp/"
300-
new_dir = f"{base_path}/new/"
301-
logger.debug("Creating new and tmp paths under %s", base_path)
302-
303-
target_radio = MultiprocessingQueueRadioSender(q)
304-
305-
os.makedirs(tmp_dir, exist_ok=True)
306-
os.makedirs(new_dir, exist_ok=True)
307-
308-
while True: # this loop will end on process termination
309-
logger.debug("Start filesystem radio receiver loop")
310-
311-
# iterate over files in new_dir
312-
for filename in os.listdir(new_dir):
313-
try:
314-
logger.info("Processing filesystem radio file %s", filename)
315-
full_path_filename = f"{new_dir}/{filename}"
316-
with open(full_path_filename, "rb") as f:
317-
message = pickle.load(f)
318-
logger.debug("Message received is: %s", message)
319-
assert isinstance(message, tuple)
320-
target_radio.send(cast(TaggedMonitoringMessage, message))
321-
os.remove(full_path_filename)
322-
except Exception:
323-
logger.exception("Exception processing %s - probably will be retried next iteration", filename)
324-
325-
time.sleep(1) # whats a good time for this poll?
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
import os
5+
import pickle
6+
import time
7+
from multiprocessing.queues import Queue
8+
from typing import cast
9+
10+
from parsl.log_utils import set_file_logger
11+
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
12+
from parsl.monitoring.types import TaggedMonitoringMessage
13+
from parsl.process_loggers import wrap_with_logs
14+
from parsl.utils import setproctitle
15+
16+
17+
@wrap_with_logs
18+
def filesystem_router_starter(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None:
19+
logger = set_file_logger(f"{run_dir}/monitoring_filesystem_radio.log",
20+
name="monitoring_filesystem_radio",
21+
level=logging.INFO)
22+
23+
logger.info("Starting filesystem radio receiver")
24+
setproctitle("parsl: monitoring filesystem receiver")
25+
base_path = f"{run_dir}/monitor-fs-radio/"
26+
tmp_dir = f"{base_path}/tmp/"
27+
new_dir = f"{base_path}/new/"
28+
logger.debug("Creating new and tmp paths under %s", base_path)
29+
30+
target_radio = MultiprocessingQueueRadioSender(q)
31+
32+
os.makedirs(tmp_dir, exist_ok=True)
33+
os.makedirs(new_dir, exist_ok=True)
34+
35+
while True: # this loop will end on process termination
36+
logger.debug("Start filesystem radio receiver loop")
37+
38+
# iterate over files in new_dir
39+
for filename in os.listdir(new_dir):
40+
try:
41+
logger.info("Processing filesystem radio file %s", filename)
42+
full_path_filename = f"{new_dir}/{filename}"
43+
with open(full_path_filename, "rb") as f:
44+
message = pickle.load(f)
45+
logger.debug("Message received is: %s", message)
46+
assert isinstance(message, tuple)
47+
target_radio.send(cast(TaggedMonitoringMessage, message))
48+
os.remove(full_path_filename)
49+
except Exception:
50+
logger.exception("Exception processing %s - probably will be retried next iteration", filename)
51+
52+
time.sleep(1) # whats a good time for this poll?

0 commit comments

Comments
 (0)