Skip to content

Commit 2e4a699

Browse files
authored
Merge pull request #2064 from apache/jbilleter/child-watcher
scheduler.py: Replace asyncio child watcher with our own watcher thread
2 parents 65995af + 4c96557 commit 2e4a699

File tree

1 file changed

+17
-21
lines changed

1 file changed

+17
-21
lines changed

src/buildstream/_scheduler/scheduler.py

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import datetime
2626
import multiprocessing.forkserver
2727
import sys
28+
import threading
2829
from concurrent.futures import ThreadPoolExecutor
2930

3031
# Local imports
@@ -138,12 +139,6 @@ def run(self, queues, casd_process_manager):
138139
# Hold on to the queues to process
139140
self.queues = queues
140141

141-
# NOTE: Enforce use of `SafeChildWatcher` as we generally don't want
142-
# background threads.
143-
# In Python 3.8+, `ThreadedChildWatcher` is the default watcher, and
144-
# not `SafeChildWatcher`.
145-
asyncio.set_child_watcher(asyncio.SafeChildWatcher()) # pylint: disable=deprecated-class
146-
147142
# Ensure that we have a fresh new event loop, in case we want
148143
# to run another test in this thread.
149144
self.loop = asyncio.new_event_loop()
@@ -160,12 +155,7 @@ def run(self, queues, casd_process_manager):
160155

161156
# Watch casd while running to ensure it doesn't die
162157
self._casd_process = casd_process_manager.process
163-
_watcher = asyncio.get_child_watcher()
164-
165-
def abort_casd(pid, returncode):
166-
asyncio.get_event_loop().call_soon(self._abort_on_casd_failure, pid, returncode)
167-
168-
_watcher.add_child_handler(self._casd_process.pid, abort_casd)
158+
threading.Thread(target=self._watch_casd, name="watch-casd", daemon=True).start()
169159

170160
# Start the profiler
171161
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -185,8 +175,6 @@ def abort_casd(pid, returncode):
185175
# Invoke the ticker callback a final time to render pending messages
186176
self._ticker_callback()
187177

188-
# Stop watching casd
189-
_watcher.remove_child_handler(self._casd_process.pid)
190178
self._casd_process = None
191179

192180
# Stop handling unix signals
@@ -309,16 +297,24 @@ def job_completed(self, job, status):
309297
# This will terminate immediately all jobs, since buildbox-casd is dead,
310298
# we can't do anything with them anymore.
311299
#
312-
# Args:
313-
# pid (int): the process id under which buildbox-casd was running
314-
# returncode (int): the return code with which buildbox-casd exited
315-
#
316-
def _abort_on_casd_failure(self, pid, returncode):
300+
def _abort_on_casd_failure(self):
317301
self.context.messenger.bug("buildbox-casd died while the pipeline was active.")
318-
319-
self._casd_process.returncode = returncode
320302
self.terminate()
321303

304+
# _watch_casd()
305+
#
306+
# This runs in a separate thread to detect casd exiting while the loop is
307+
# still running.
308+
#
309+
def _watch_casd(self):
310+
loop = self.loop
311+
proc = self._casd_process
312+
if loop and proc:
313+
# This sets the `returncode` attribute
314+
proc.wait()
315+
if not loop.is_closed():
316+
loop.call_soon_threadsafe(self._abort_on_casd_failure)
317+
322318
# _start_job()
323319
#
324320
# Spanws a job

0 commit comments

Comments
 (0)