11"""This module contains the local :class:`~psij.JobExecutor`."""
22import logging
33import os
4+ import signal
45import subprocess
56import threading
67import time
78from abc import ABC , abstractmethod
9+ from types import FrameType
810from typing import Optional , Dict , List , Type , Tuple
911
1012import psutil
1618logger = logging .getLogger (__name__ )
1719
1820
def _handle_sigchld(signum: int, frame: Optional[FrameType]) -> None:
    """Module-level signal handler that forwards the event to the process reaper.

    The signature is the one required by :func:`signal.signal`; neither argument
    is used.

    :param signum: The number of the signal that was delivered.
    :param frame: The stack frame that was interrupted by the signal, if any.
    """
    reaper = _ProcessReaper.get_instance()
    reaper._handle_sigchld()
23+
24+
# Install the SIGCHLD handler as a side effect of importing this module.
# NOTE(review): signal.signal() may only be called from the main thread of the
# main interpreter and replaces any SIGCHLD handler installed earlier - confirm
# that this module is always first imported from the main thread.
signal .signal (signal .SIGCHLD , _handle_sigchld )
# Timeout, in seconds, that the reaper thread passes to Condition.wait() between
# two rounds of process checks.
_REAPER_SLEEP_TIME = 0.2
2027
2128
@@ -120,6 +127,7 @@ def __init__(self) -> None:
120127 super ().__init__ (name = 'Local Executor Process Reaper' , daemon = True )
121128 self ._jobs : Dict [Job , _ProcessEntry ] = {}
122129 self ._lock = threading .RLock ()
130+ self ._cvar = threading .Condition ()
123131
124132 def register (self , entry : _ProcessEntry ) -> None :
125133 logger .debug ('Registering process %s' , entry )
@@ -134,7 +142,25 @@ def run(self) -> None:
134142 self ._check_processes ()
135143 except Exception as ex :
136144 logger .error ('Error polling for process status' , ex )
137- time .sleep (_REAPER_SLEEP_TIME )
145+ with self ._cvar :
146+ self ._cvar .wait (_REAPER_SLEEP_TIME )
147+
def _handle_sigchld(self) -> None:
    """Wake up the reaper loop so that it re-checks its processes right away.

    This is invoked by the module-level SIGCHLD handler. ``run()`` waits on
    ``self._cvar`` for at most ``_REAPER_SLEEP_TIME`` between checks; notifying
    the condition variable cuts that wait short.

    A ``RuntimeError`` raised by ``notify_all()`` is logged and otherwise
    ignored.
    """
    with self._cvar:
        try:
            self._cvar.notify_all()
        except RuntimeError as ex:
            # In what looks like rare cases, notify_all(), seemingly when combined with
            # signal handling, raises `RuntimeError: release unlocked lock`.
            # There appears to be an unresolved Python bug about this:
            # https://bugs.python.org/issue34486
            # We catch the exception here and log it. It is hard to tell whether that can
            # lead to further issues. It would seem like it shouldn't: all we are doing is
            # making sure we don't sleep too much and, even if we do, the consequence is a
            # small delay in processing a completed job. However, since this exception seems
            # to be a logical impossibility when looking at the code in threading.Condition,
            # there is really no telling what else could go wrong.
            #
            # The exception must be bound to a %s placeholder: passing it as a bare extra
            # argument makes the logging module fail to format the record.
            logger.warning('Exception in Condition.notify_all(): %s', ex, exc_info=True)
138164
139165 def _check_processes (self ) -> None :
140166 done : List [_ProcessEntry ] = []
0 commit comments