Skip to content

Commit 83363fb

Browse files
committed
Use per-process singleton threads for the local executor process reaper.
The issue is that singletons based on flags do not work properly with fork() since memory is copied to the child process. The only reasonable way to distinguish between the parent and child is to look at the PID. That said, using `fork()` with multi-threading is a bad idea. This commit is not an endorsement but an attempt to minimize damage when possible.
1 parent 3bf1f7c commit 83363fb

File tree

2 files changed

+44
-12
lines changed

2 files changed

+44
-12
lines changed

src/psij/executors/local.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
import time
88
from abc import ABC, abstractmethod
99
from types import FrameType
10-
from typing import Optional, Dict, List, Type, Tuple
10+
from typing import Optional, Dict, List, Tuple, Type, cast
1111

1212
import psutil
1313

1414
from psij import InvalidJobException, SubmitException, Launcher
1515
from psij import Job, JobSpec, JobExecutorConfig, JobState, JobStatus
1616
from psij import JobExecutor
17+
from psij.utils import SingletonThread
1718

1819
logger = logging.getLogger(__name__)
1920

@@ -117,18 +118,11 @@ def _get_env(spec: JobSpec) -> Optional[Dict[str, str]]:
117118
return spec.environment
118119

119120

120-
class _ProcessReaper(threading.Thread):
121-
_instance: Optional['_ProcessReaper'] = None
122-
_lock = threading.RLock()
121+
class _ProcessReaper(SingletonThread):
123122

124123
@classmethod
125124
def get_instance(cls: Type['_ProcessReaper']) -> '_ProcessReaper':
126-
with cls._lock:
127-
if cls._instance is None:
128-
cls._instance = _ProcessReaper()
129-
cls._instance.start()
130-
signal.signal(signal.SIGCHLD, _handle_sigchld)
131-
return cls._instance
125+
return cast('_ProcessReaper', super().get_instance())
132126

133127
def __init__(self) -> None:
134128
super().__init__(name='Local Executor Process Reaper', daemon=True)
@@ -205,8 +199,15 @@ class LocalJobExecutor(JobExecutor):
205199
This job executor is intended to be used when there is no resource manager, only
206200
the operating system. Or when there is a resource manager, but it should be ignored.
207201
208-
Limitations: in Linux, attached jobs always appear to complete with a zero exit code regardless
202+
Limitations:
203+
- In Linux, attached jobs always appear to complete with a zero exit code regardless
209204
of the actual exit code.
205+
- Instantiation of a local executor from both parent process and a `fork()`-ed process
206+
is not guaranteed to work. In general, using `fork()` and multi-threading in Linux is unsafe,
207+
as suggested by the `fork()` man page. While PSI/J attempts to minimize problems that can
208+
arise when `fork()` is combined with threads (which are used by PSI/J), no guarantees can be
209+
made and the chances of unexpected behavior are high. Please do not use PSI/J with `fork()`.
210+
If you do, please be mindful that support for using PSI/J with `fork()` will be limited.
210211
"""
211212

212213
def __init__(self, url: Optional[str] = None,

src/psij/utils.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import os
2+
import threading
13
from pathlib import Path
2-
from typing import Optional
4+
from typing import Optional, Type, Dict
35
import sys
46

57

@@ -19,3 +21,32 @@ def path_object_to_full_path(obj: Optional[object]) -> Optional[str]:
1921
sys.exit("This type " + type(obj).__name__
2022
+ " for a path is not supported, use pathlib instead")
2123
return p
24+
25+
26+
class SingletonThread(threading.Thread):
27+
"""
28+
A convenience class to return a thread that is guaranteed to be unique to this process.
29+
30+
This is intended to work with fork() to ensure that each os.getpid() value is associated with
31+
at most one thread. This is not safe. The safe thing, as pointed out by the fork() man page,
32+
is to not use fork() with threads. However, this is here in an attempt to make it slightly
33+
safer for when users really really want to take the risk against all advice.
34+
"""
35+
36+
_instances: Dict[int, 'SingletonThread'] = {}
37+
_lock = threading.RLock()
38+
39+
@classmethod
40+
def get_instance(cls: Type['SingletonThread']) -> 'SingletonThread':
41+
"""Returns a started instance of this thread.
42+
43+
The instance is guaranteed to be unique for this process. This method also guarantees
44+
that a forked process will get a separate instance of this thread from the parent.
45+
"""
46+
with cls._lock:
47+
my_pid = os.getpid()
48+
if my_pid not in cls._instances:
49+
instance = cls()
50+
cls._instances[my_pid] = instance
51+
instance.start()
52+
return cls._instances[my_pid]

0 commit comments

Comments
 (0)