Skip to content

Commit 10a6a00

Browse files
authored
Remove monitoring router modification of node message (#3567)
Prior to this PR, the monitoring router would add a run_id field to every NODE_INFO message that it received. These are messages from the interchange describing worker pools. The monitoring router does not modify any other messages.

This PR sets the run_id at the point of message origination inside the interchange (in _send_monitoring_info), and makes the router leave NODE_INFO messages unmodified (like the other message types).

This is part of work to make the router less aware of message types by removing a bunch of message-type specific handling.

This PR brings in a bunch of rewiring to get the run id into the interchange rather than into the monitoring router.

* Changed Behaviour

This should not change any workflow-user-facing behaviour. Globus Compute (or anyone else building a fake Parsl environment) may have to change how they fake their Parsl implementation to pass in a run id (the executor.run_id part of dfk.add_executors).
1 parent d8e8d4b commit 10a6a00

File tree

6 files changed

+10
-10
lines changed

6 files changed

+10
-10
lines changed

parsl/dataflow/dflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def __init__(self, config: Config) -> None:
116116
if self.monitoring:
117117
if self.monitoring.logdir is None:
118118
self.monitoring.logdir = self.run_dir
119-
self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir)
119+
self.monitoring.start(self.run_dir, self.config.run_dir)
120120

121121
self.time_began = datetime.datetime.now()
122122
self.time_completed: Optional[datetime.datetime] = None

parsl/executors/high_throughput/executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,7 @@ def _start_local_interchange_process(self) -> None:
551551
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO,
552552
"cert_dir": self.cert_dir,
553553
"manager_selector": self.manager_selector,
554+
"run_id": self.run_id,
554555
}
555556

556557
config_pickle = pickle.dumps(interchange_config)

parsl/executors/high_throughput/interchange.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def __init__(self,
5555
poll_period: int,
5656
cert_dir: Optional[str],
5757
manager_selector: ManagerSelector,
58+
run_id: str,
5859
) -> None:
5960
"""
6061
Parameters
@@ -125,6 +126,8 @@ def __init__(self,
125126
self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2]))
126127
logger.info("Connected to client")
127128

129+
self.run_id = run_id
130+
128131
self.hub_address = hub_address
129132
self.hub_zmq_port = hub_zmq_port
130133

@@ -227,6 +230,7 @@ def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender
227230
d: Dict = cast(Dict, manager.copy())
228231
d['timestamp'] = datetime.datetime.now()
229232
d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat'])
233+
d['run_id'] = self.run_id
230234

231235
monitoring_radio.send((MessageType.NODE_INFO, d))
232236

parsl/monitoring/monitoring.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def __init__(self,
106106
self.resource_monitoring_enabled = resource_monitoring_enabled
107107
self.resource_monitoring_interval = resource_monitoring_interval
108108

109-
def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
109+
def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
110110

111111
logger.debug("Starting MonitoringHub")
112112

@@ -161,7 +161,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
161161
"zmq_port_range": self.hub_port_range,
162162
"logdir": self.logdir,
163163
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
164-
"run_id": run_id
165164
},
166165
name="Monitoring-Router-Process",
167166
daemon=True,

parsl/monitoring/router.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ def __init__(self,
3131

3232
monitoring_hub_address: str = "127.0.0.1",
3333
logdir: str = ".",
34-
run_id: str,
3534
logging_level: int = logging.INFO,
3635
atexit_timeout: int = 3, # in seconds
3736
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
@@ -71,7 +70,6 @@ def __init__(self,
7170

7271
self.hub_address = hub_address
7372
self.atexit_timeout = atexit_timeout
74-
self.run_id = run_id
7573

7674
self.loop_freq = 10.0 # milliseconds
7775

@@ -172,7 +170,6 @@ def start_zmq_listener(self) -> None:
172170
msg_0 = (msg, 0)
173171

174172
if msg[0] == MessageType.NODE_INFO:
175-
msg[1]['run_id'] = self.run_id
176173
self.node_msgs.put(msg_0)
177174
elif msg[0] == MessageType.RESOURCE_INFO:
178175
self.resource_msgs.put(msg_0)
@@ -218,16 +215,14 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
218215
zmq_port_range: Tuple[int, int],
219216

220217
logdir: str,
221-
logging_level: int,
222-
run_id: str) -> None:
218+
logging_level: int) -> None:
223219
setproctitle("parsl: monitoring router")
224220
try:
225221
router = MonitoringRouter(hub_address=hub_address,
226222
udp_port=udp_port,
227223
zmq_port_range=zmq_port_range,
228224
logdir=logdir,
229225
logging_level=logging_level,
230-
run_id=run_id,
231226
priority_msgs=priority_msgs,
232227
node_msgs=node_msgs,
233228
block_msgs=block_msgs,

parsl/tests/test_htex/test_zmq_binding.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[s
2525
logdir=".",
2626
logging_level=logging.INFO,
2727
manager_selector=RandomManagerSelector(),
28-
poll_period=10)
28+
poll_period=10,
29+
run_id="test_run_id")
2930

3031

3132
@pytest.fixture

0 commit comments

Comments
 (0)