Skip to content

Commit e01816e

Browse files
committed
Make UDP radio receiver shutdown period configurable
.. and use that new parameter in stderr/out tests which don't make use of the UDP radio.
1 parent 2b7e7f9 commit e01816e

File tree

3 files changed

+18
-5
lines changed

3 files changed

+18
-5
lines changed

parsl/monitoring/monitoring.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ def __init__(self,
4747
logdir: Optional[str] = None,
4848
monitoring_debug: bool = False,
4949
resource_monitoring_enabled: bool = True,
50-
resource_monitoring_interval: float = 30): # in seconds
50+
resource_monitoring_interval: float = 30, # in seconds
51+
udp_atexit_timeout: float = 3):
5152
"""
5253
Parameters
5354
----------
@@ -86,6 +87,10 @@ def __init__(self,
8687
If set to 0, only start and end information will be logged, and no periodic monitoring will
8788
be made.
8889
Default: 30 seconds
90+
udp_atexit_timeout : float
91+
The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK
92+
workflow message is received.
93+
8994
"""
9095

9196
if _db_manager_excepts:
@@ -105,6 +110,8 @@ def __init__(self,
105110
self.resource_monitoring_enabled = resource_monitoring_enabled
106111
self.resource_monitoring_interval = resource_monitoring_interval
107112

113+
self.udp_atexit_timeout = udp_atexit_timeout
114+
108115
def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> int:
109116

110117
logger.debug("Starting MonitoringHub")
@@ -160,7 +167,8 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
160167
"zmq_port_range": self.hub_port_range,
161168
"logdir": self.logdir,
162169
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
163-
"run_id": run_id
170+
"run_id": run_id,
171+
"udp_atexit_timeout": self.udp_atexit_timeout
164172
},
165173
name="Monitoring-Router-Process",
166174
daemon=True,

parsl/monitoring/router.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def __init__(self,
3232
logdir: str = ".",
3333
run_id: str,
3434
logging_level: int = logging.INFO,
35-
atexit_timeout: int = 3 # in seconds
35+
atexit_timeout: float
3636
):
3737
""" Initializes a monitoring configuration class.
3838
@@ -50,7 +50,8 @@ def __init__(self,
5050
logging_level : int
5151
Logging level as defined in the logging module. Default: logging.INFO
5252
atexit_timeout : float, optional
53-
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
53+
The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK
54+
workflow message is received.
5455
5556
"""
5657
os.makedirs(logdir, exist_ok=True)
@@ -181,6 +182,8 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
181182
udp_port: Optional[int],
182183
zmq_port_range: Tuple[int, int],
183184

185+
udp_atexit_timeout: float,
186+
184187
logdir: str,
185188
logging_level: int,
186189
run_id: str) -> None:
@@ -191,7 +194,8 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
191194
zmq_port_range=zmq_port_range,
192195
logdir=logdir,
193196
logging_level=logging_level,
194-
run_id=run_id)
197+
run_id=run_id,
198+
atexit_timeout=udp_atexit_timeout)
195199
except Exception as e:
196200
logger.error("MonitoringRouter construction failed.", exc_info=True)
197201
comm_q.put(f"Monitoring router construction failed: {e}")

parsl/tests/test_monitoring/test_stdouterr.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ def fresh_config(run_dir):
3838
monitoring=MonitoringHub(
3939
hub_address="localhost",
4040
hub_port=55055,
41+
udp_atexit_timeout=0
4142
)
4243
)
4344

0 commit comments

Comments
 (0)