Skip to content

Commit ffb3644

Browse files
authored
Correct and check types on monitoring router and database processes (#3572)
Prior to this PR, the startup code for the monitoring router and database processes had type annotations on queues; but these types were not checked, and were incorrect - they were labelled process-local Queue instead of multiprocessing queues. This did not cause much trouble execution- and mypy-wise, as the interfaces of those two classes are similar enough, but it is confusing to read in a part of the codebase that is already confusing (that confusion is probably what lead to the incorrect annotations in the first place...) They were not checked because the informal policy of "internal stuff is checked with mypy, external interfaces are checked with typeguard" works badly here: The startup methods are launched using multiprocessing.Process, and function invocations are not type-checked by mypy across a multiprocessing Process constructor. Changed Behaviour This PR introduces typeguard decorators onto the router and database start methods so that this internal checking happens at runtime. This consequently reveals that the type annotations of these methods are incorrect, and so this PR makes those consequential changes. Further, generic types (Queue[MessageType]) are not supported on multiprocessing.Queues before Python 3.12 - so those generic indices are removed from the type annotations. That is unfortunate and weakens in-process static verification - but they could be re-introduced after Parsl drops Python 3.11 support (around 2027 in the present informal support policy)
1 parent 2067b40 commit ffb3644

File tree

2 files changed

+27
-21
lines changed

2 files changed

+27
-21
lines changed

parsl/monitoring/db_manager.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import datetime
22
import logging
3+
import multiprocessing.queues as mpq
34
import os
45
import queue
56
import threading
67
import time
78
from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, cast
89

10+
import typeguard
11+
912
from parsl.dataflow.states import States
1013
from parsl.errors import OptionalModuleMissing
1114
from parsl.log_utils import set_file_logger
@@ -305,10 +308,10 @@ def __init__(self,
305308
self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue()
306309

307310
def start(self,
308-
priority_queue: "queue.Queue[TaggedMonitoringMessage]",
309-
node_queue: "queue.Queue[MonitoringMessage]",
310-
block_queue: "queue.Queue[MonitoringMessage]",
311-
resource_queue: "queue.Queue[MonitoringMessage]") -> None:
311+
priority_queue: mpq.Queue,
312+
node_queue: mpq.Queue,
313+
block_queue: mpq.Queue,
314+
resource_queue: mpq.Queue) -> None:
312315

313316
self._kill_event = threading.Event()
314317
self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
@@ -719,11 +722,12 @@ def close(self) -> None:
719722

720723

721724
@wrap_with_logs(target="database_manager")
722-
def dbm_starter(exception_q: "queue.Queue[Tuple[str, str]]",
723-
priority_msgs: "queue.Queue[TaggedMonitoringMessage]",
724-
node_msgs: "queue.Queue[MonitoringMessage]",
725-
block_msgs: "queue.Queue[MonitoringMessage]",
726-
resource_msgs: "queue.Queue[MonitoringMessage]",
725+
@typeguard.typechecked
726+
def dbm_starter(exception_q: mpq.Queue,
727+
priority_msgs: mpq.Queue,
728+
node_msgs: mpq.Queue,
729+
block_msgs: mpq.Queue,
730+
resource_msgs: mpq.Queue,
727731
db_url: str,
728732
logdir: str,
729733
logging_level: int) -> None:

parsl/monitoring/router.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
from __future__ import annotations
22

33
import logging
4+
import multiprocessing.queues as mpq
45
import os
56
import pickle
6-
import queue
77
import socket
88
import threading
99
import time
1010
from multiprocessing.synchronize import Event
11-
from typing import Optional, Tuple, Union
11+
from typing import Optional, Tuple
1212

13+
import typeguard
1314
import zmq
1415

1516
from parsl.log_utils import set_file_logger
@@ -33,10 +34,10 @@ def __init__(self,
3334
logdir: str = ".",
3435
logging_level: int = logging.INFO,
3536
atexit_timeout: int = 3, # in seconds
36-
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
37-
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
38-
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
39-
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
37+
priority_msgs: mpq.Queue,
38+
node_msgs: mpq.Queue,
39+
block_msgs: mpq.Queue,
40+
resource_msgs: mpq.Queue,
4041
exit_event: Event,
4142
):
4243
""" Initializes a monitoring configuration class.
@@ -202,12 +203,13 @@ def start_zmq_listener(self) -> None:
202203

203204

204205
@wrap_with_logs
205-
def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
206-
exception_q: "queue.Queue[Tuple[str, str]]",
207-
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
208-
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
209-
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
210-
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
206+
@typeguard.typechecked
207+
def router_starter(comm_q: mpq.Queue,
208+
exception_q: mpq.Queue,
209+
priority_msgs: mpq.Queue,
210+
node_msgs: mpq.Queue,
211+
block_msgs: mpq.Queue,
212+
resource_msgs: mpq.Queue,
211213
exit_event: Event,
212214

213215
hub_address: str,

0 commit comments

Comments
 (0)