Skip to content

Commit 15e6d19

Browse files
committed
Catch SIGCHLD to figure out when a child process completes instead of
polling at fixed intervals of 0.2s. Without it, `/bin/sleep 0` would take a minimum of 0.2s to complete, which is unnecessary.
1 parent c143685 commit 15e6d19

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

src/psij/executors/local.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
"""This module contains the local :class:`~psij.JobExecutor`."""
22
import logging
33
import os
4+
import signal
45
import subprocess
56
import threading
67
import time
78
from abc import ABC, abstractmethod
9+
from types import FrameType
810
from typing import Optional, Dict, List, Type, Tuple
911

1012
import psutil
@@ -16,6 +18,11 @@
1618
logger = logging.getLogger(__name__)
1719

1820

21+
def _handle_sigchld(signum: int, frame: Optional[FrameType]) -> None:
22+
_ProcessReaper.get_instance()._handle_sigchld()
23+
24+
25+
signal.signal(signal.SIGCHLD, _handle_sigchld)
1926
_REAPER_SLEEP_TIME = 0.2
2027

2128

@@ -120,6 +127,7 @@ def __init__(self) -> None:
120127
super().__init__(name='Local Executor Process Reaper', daemon=True)
121128
self._jobs: Dict[Job, _ProcessEntry] = {}
122129
self._lock = threading.RLock()
130+
self._cvar = threading.Condition()
123131

124132
def register(self, entry: _ProcessEntry) -> None:
125133
logger.debug('Registering process %s', entry)
@@ -134,7 +142,12 @@ def run(self) -> None:
134142
self._check_processes()
135143
except Exception as ex:
136144
logger.error('Error polling for process status', ex)
137-
time.sleep(_REAPER_SLEEP_TIME)
145+
with self._cvar:
146+
self._cvar.wait(_REAPER_SLEEP_TIME)
147+
148+
def _handle_sigchld(self) -> None:
149+
with self._cvar:
150+
self._cvar.notify_all()
138151

139152
def _check_processes(self) -> None:
140153
done: List[_ProcessEntry] = []

0 commit comments

Comments
 (0)