Skip to content

Commit 6dfaa10

Browse files
committed
Make UDP radio receiver shutdown period configurable
.. and use that new parameter in stderr/out tests which don't make use of the UDP radio.
1 parent 925705f commit 6dfaa10

File tree

3 files changed

+18
-5
lines changed

3 files changed

+18
-5
lines changed

parsl/monitoring/monitoring.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ def __init__(self,
5151
logdir: Optional[str] = None,
5252
monitoring_debug: bool = False,
5353
resource_monitoring_enabled: bool = True,
54-
resource_monitoring_interval: float = 30): # in seconds
54+
resource_monitoring_interval: float = 30, # in seconds
55+
udp_atexit_timeout: float = 3):
5556
"""
5657
Parameters
5758
----------
@@ -90,6 +91,10 @@ def __init__(self,
9091
If set to 0, only start and end information will be logged, and no periodic monitoring will
9192
be made.
9293
Default: 30 seconds
94+
udp_atexit_timeout : float
95+
The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK
96+
workflow message is received.
97+
9398
"""
9499

95100
if _db_manager_excepts:
@@ -109,6 +114,8 @@ def __init__(self,
109114
self.resource_monitoring_enabled = resource_monitoring_enabled
110115
self.resource_monitoring_interval = resource_monitoring_interval
111116

117+
self.udp_atexit_timeout = udp_atexit_timeout
118+
112119
def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> int:
113120

114121
logger.debug("Starting MonitoringHub")
@@ -164,7 +171,8 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
164171
"zmq_port_range": self.hub_port_range,
165172
"logdir": self.logdir,
166173
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
167-
"run_id": run_id
174+
"run_id": run_id,
175+
"udp_atexit_timeout": self.udp_atexit_timeout
168176
},
169177
name="Monitoring-Router-Process",
170178
daemon=True,

parsl/monitoring/router.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def __init__(self,
3535
logdir: str = ".",
3636
run_id: str,
3737
logging_level: int = logging.INFO,
38-
atexit_timeout: int = 3 # in seconds
38+
atexit_timeout: float
3939
):
4040
""" Initializes a monitoring configuration class.
4141
@@ -53,7 +53,8 @@ def __init__(self,
5353
logging_level : int
5454
Logging level as defined in the logging module. Default: logging.INFO
5555
atexit_timeout : float, optional
56-
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
56+
The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK
57+
workflow message is received.
5758
5859
"""
5960
os.makedirs(logdir, exist_ok=True)
@@ -184,6 +185,8 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
184185
udp_port: Optional[int],
185186
zmq_port_range: Tuple[int, int],
186187

188+
udp_atexit_timeout: float,
189+
187190
logdir: str,
188191
logging_level: int,
189192
run_id: str) -> None:
@@ -194,7 +197,8 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
194197
zmq_port_range=zmq_port_range,
195198
logdir=logdir,
196199
logging_level=logging_level,
197-
run_id=run_id)
200+
run_id=run_id,
201+
atexit_timeout=udp_atexit_timeout)
198202
except Exception as e:
199203
logger.error("MonitoringRouter construction failed.", exc_info=True)
200204
comm_q.put(f"Monitoring router construction failed: {e}")

parsl/tests/test_monitoring/test_stdouterr.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def fresh_config(run_dir):
3838
monitoring=MonitoringHub(
3939
hub_address="localhost",
4040
hub_port=55055,
41+
udp_atexit_timeout=0
4142
)
4243
)
4344

0 commit comments

Comments
 (0)