Skip to content

Commit ef0c845

Browse files
authored
Add command to get installed packages on managers (#3773)
# Description Added a `MANAGERS_PACKAGES` HTEX interchange command and associated HTEX method to retrieve a dict mapping each manager ID to a dict of installed packages and their versions. This is useful for debugging and monitoring worker environments. We gather the package information from the manager upon registration, but do not include it in the standard `MANAGERS` command, which runs every 5 seconds. This avoids unnecessary overhead. ## Type of change - New feature
1 parent 91c9b48 commit ef0c845

File tree

5 files changed

+35
-2
lines changed

5 files changed

+35
-2
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,12 @@ def connected_managers(self) -> List[Dict[str, typing.Any]]:
617617
"""
618618
return self.command_client.run("MANAGERS")
619619

620+
def connected_managers_packages(self) -> Dict[str, Dict[str, str]]:
621+
"""Returns a dict mapping each manager ID to a dict of installed
622+
packages and their versions
623+
"""
624+
return self.command_client.run("MANAGERS_PACKAGES")
625+
620626
def connected_blocks(self) -> List[str]:
621627
"""List of connected block ids"""
622628
return self.command_client.run("CONNECTED_BLOCKS")

parsl/executors/high_throughput/interchange.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,13 @@ def process_command(self, monitoring_radio: Optional[MonitoringRadioSender]) ->
257257
'draining': m['draining']}
258258
reply.append(resp)
259259

260+
elif command_req == "MANAGERS_PACKAGES":
261+
reply = {}
262+
for manager_id in self._ready_managers:
263+
m = self._ready_managers[manager_id]
264+
manager_id_str = manager_id.decode('utf-8')
265+
reply[manager_id_str] = m["packages"]
266+
260267
elif command_req.startswith("HOLD_WORKER"):
261268
cmd, s_manager = command_req.split(';')
262269
manager_id = s_manager.encode('utf-8')

parsl/executors/high_throughput/manager_record.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import datetime
2-
from typing import Any, List, Optional
2+
from typing import Any, Dict, List, Optional
33

44
from typing_extensions import TypedDict
55

@@ -18,3 +18,4 @@ class ManagerRecord(TypedDict, total=False):
1818
timestamp: datetime
1919
parsl_version: str
2020
python_version: str
21+
packages: Dict[str, str]

parsl/executors/high_throughput/process_worker_pool.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import threading
1515
import time
1616
import uuid
17+
from importlib.metadata import distributions
1718
from multiprocessing.managers import DictProxy
1819
from multiprocessing.sharedctypes import Synchronized
1920
from typing import Dict, List, Optional, Sequence
@@ -265,6 +266,7 @@ def create_reg_message(self):
265266
'python_v': "{}.{}.{}".format(sys.version_info.major,
266267
sys.version_info.minor,
267268
sys.version_info.micro),
269+
'packages': {dist.metadata['Name']: dist.version for dist in distributions()},
268270
'worker_count': self.worker_count,
269271
'uid': self.uid,
270272
'block_id': self.block_id,

parsl/tests/test_htex/test_managers_command.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import logging
21
import sys
32

3+
import packaging.version
44
import pytest
55

66
import parsl
@@ -34,3 +34,20 @@ def test_connected_managers():
3434
assert 'parsl_version' in manager_info
3535
assert manager_info['parsl_version'] == parsl.__version__
3636
assert manager_info['python_version'] == f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
37+
38+
39+
@pytest.mark.local
40+
def test_connected_managers_packages():
41+
# Run dummy function to ensure a manager is online
42+
f = dummy()
43+
assert f.result() is None
44+
45+
htex: parsl.HighThroughputExecutor = parsl.dfk().executors['htex_local']
46+
managers_info_list = htex.connected_managers()
47+
managers_packages = htex.connected_managers_packages()
48+
49+
assert len(managers_packages) == len(managers_info_list) == 1
50+
manager_id, packages = list(managers_packages.items())[0]
51+
assert manager_id == managers_info_list[0]['manager']
52+
normalized_version = str(packaging.version.parse(parsl.__version__))
53+
assert packages['parsl'] == normalized_version

0 commit comments

Comments
 (0)