Skip to content

Commit 1ea41f8

Browse files
authored
Move ManagerLost and VersionMismatch to errors.py (#3496)
Per the analysis in #3495, defining the `ManagerLost` and `VersionMismatch` errors in `interchange.py` became a problem in #3463, where the interchange now runs as `__main__`. This makes it difficult for Dill to get the serde (serialization/deserialization) correct. The organizational fix is simply to move these classes to an importable location. That follows the expectation that classes are available in both local and remote locations, which defining them in `__main__` can't easily guarantee. Fixes: #3495
1 parent a128fdc commit 1ea41f8

File tree

3 files changed

+81
-26
lines changed

3 files changed

+81
-26
lines changed

parsl/executors/high_throughput/errors.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,36 @@
1+
import time
2+
3+
4+
class ManagerLost(Exception):
    """Task lost due to manager loss.

    A manager is considered lost when multiple heartbeats have been missed.

    Instances carry ``manager_id`` and ``hostname`` as given to the
    constructor, plus ``tstamp``, the ``time.time()`` value taken when the
    exception object was created.
    """

    def __init__(self, manager_id: bytes, hostname: str) -> None:
        # Forward both values positionally so ``self.args`` is populated no
        # matter how the caller spelled the arguments. Exception copying and
        # pickling rebuild the object as ``cls(*self.args)``; without this, a
        # keyword-constructed instance would have empty ``args`` and fail to
        # round-trip.
        super().__init__(manager_id, hostname)
        self.manager_id = manager_id
        self.tstamp = time.time()
        self.hostname = hostname

    def __str__(self) -> str:
        # ``manager_id`` is bytes, so decode it for a readable message.
        return (
            f"Task failure due to loss of manager {self.manager_id.decode()} on"
            f" host {self.hostname}"
        )
19+
20+
21+
class VersionMismatch(Exception):
    """Manager and Interchange versions do not match.

    Instances carry ``interchange_version`` and ``manager_version`` exactly
    as given to the constructor.
    """

    def __init__(self, interchange_version: str, manager_version: str):
        # Forward both values positionally so ``self.args`` is populated even
        # for keyword-style construction. Exception copying and pickling
        # rebuild the object as ``cls(*self.args)``.
        super().__init__(interchange_version, manager_version)
        self.interchange_version = interchange_version
        self.manager_version = manager_version

    def __str__(self) -> str:
        return (
            f"Manager version info {self.manager_version} does not match interchange"
            f" version info {self.interchange_version}, causing a critical failure"
        )
32+
33+
134
class WorkerLost(Exception):
235
"""Exception raised when a worker is lost
336
"""

parsl/executors/high_throughput/interchange.py

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from parsl import curvezmq
1919
from parsl.app.errors import RemoteExceptionWrapper
20+
from parsl.executors.high_throughput.errors import ManagerLost, VersionMismatch
2021
from parsl.executors.high_throughput.manager_record import ManagerRecord
2122
from parsl.monitoring.message_type import MessageType
2223
from parsl.process_loggers import wrap_with_logs
@@ -31,32 +32,6 @@
3132
logger = logging.getLogger(LOGGER_NAME)
3233

3334

34-
class ManagerLost(Exception):
35-
''' Task lost due to manager loss. Manager is considered lost when multiple heartbeats
36-
have been missed.
37-
'''
38-
def __init__(self, manager_id: bytes, hostname: str) -> None:
39-
self.manager_id = manager_id
40-
self.tstamp = time.time()
41-
self.hostname = hostname
42-
43-
def __str__(self) -> str:
44-
return "Task failure due to loss of manager {} on host {}".format(self.manager_id.decode(), self.hostname)
45-
46-
47-
class VersionMismatch(Exception):
48-
''' Manager and Interchange versions do not match
49-
'''
50-
def __init__(self, interchange_version: str, manager_version: str):
51-
self.interchange_version = interchange_version
52-
self.manager_version = manager_version
53-
54-
def __str__(self) -> str:
55-
return "Manager version info {} does not match interchange version info {}, causing a critical failure".format(
56-
self.manager_version,
57-
self.interchange_version)
58-
59-
6035
class Interchange:
6136
""" Interchange is a task orchestrator for distributed systems.
6237
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import os
2+
import signal
3+
4+
import pytest
5+
6+
import parsl
7+
from parsl import Config, HighThroughputExecutor
8+
9+
10+
@parsl.python_app
def get_manager_pgid():
    """Return the process group id of the process that executes this app."""
    # Imported inside the body, as in the original app definition.
    import os

    own_pid = os.getpid()
    return os.getpgid(own_pid)
14+
15+
16+
@parsl.python_app
def lose_manager():
    """Send SIGSTOP to the parent of the process that executes this app.

    The parent process is the one the test treats as the manager; stopping
    it (rather than killing it) leaves it resumable with SIGCONT.
    """
    import os
    import signal

    os.kill(os.getppid(), signal.SIGSTOP)
23+
24+
25+
@pytest.mark.local
def test_manager_lost_system_failure(tmpd_cwd):
    """Regression test for issue 3495.

    Stops the manager process from inside a task and checks that any
    exception surfaced through the task's future does not mention
    "ManagerLost" in its message.
    """
    executor = HighThroughputExecutor(
        label="htex_local",
        address="127.0.0.1",
        max_workers_per_node=2,
        cores_per_worker=1,
        worker_logdir_root=str(tmpd_cwd),
        heartbeat_period=1,
        heartbeat_threshold=1,
    )
    config = Config(executors=[executor], strategy='simple', strategy_period=0.1)

    with parsl.load(config):
        # Record the process group first, while the manager is still running.
        manager_pgid = get_manager_pgid().result()
        try:
            lose_manager().result()
        except Exception as e:
            assert "ManagerLost" not in str(e), f"Issue 3495: {e}"
        finally:
            # Allow process to clean itself up: resume it, then terminate it.
            for sig in (signal.SIGCONT, signal.SIGTERM):
                os.killpg(manager_pgid, sig)

0 commit comments

Comments
 (0)