Skip to content

Commit a95ed58

Browse files
authored
Add monitoring info for tasks given to a manager (#3833)
# Description Update any interested listeners to the state of the manager when new tasks are assigned. Based on `.start()`, this now covers most (all?) relevant changes to a manager's info block that will be shared. # Changed Behaviour Adds a monitoring message per manager in the interchange loop, for any outstanding tasks that move from the interchange to a worker manager. I don't think any existing workflows will be impacted, but there will now be an extra NODEINFO record in the sqlite database per task. ## Type of change - New feature
1 parent 7558fd0 commit a95ed58

File tree

2 files changed

+4
-3
lines changed

2 files changed

+4
-3
lines changed

parsl/executors/high_throughput/interchange.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ def start(self) -> None:
328328
self.process_results_incoming(interesting_managers, monitoring_radio)
329329
self.expire_bad_managers(interesting_managers, monitoring_radio)
330330
self.expire_drained_managers(interesting_managers, monitoring_radio)
331-
self.process_tasks_to_send(interesting_managers)
331+
self.process_tasks_to_send(interesting_managers, monitoring_radio)
332332

333333
self.zmq_context.destroy()
334334
delta = time.time() - start
@@ -452,7 +452,7 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r
452452
m['active'] = False
453453
self._send_monitoring_info(monitoring_radio, m)
454454

455-
def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
455+
def process_tasks_to_send(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
456456
# Check if there are tasks that could be sent to managers
457457

458458
logger.debug(
@@ -488,6 +488,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
488488
else:
489489
logger.debug("Manager %r is now saturated", manager_id)
490490
interesting_managers.remove(manager_id)
491+
self._send_monitoring_info(monitoring_radio, m)
491492
else:
492493
interesting_managers.remove(manager_id)
493494
# logger.debug("Nothing to send to manager {}".format(manager_id))

parsl/tests/test_monitoring/test_basic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def test_row_counts(tmpd_cwd, fresh_config):
120120
# Two entries: one showing manager active, one inactive
121121
result = connection.execute(text("SELECT COUNT(*) FROM node"))
122122
(c, ) = result.first()
123-
assert c == 2
123+
assert c == 3
124124

125125
# There should be one block polling status
126126
# local provider has a status_polling_interval of 5s

0 commit comments

Comments
 (0)