-
Notifications
You must be signed in to change notification settings - Fork 17
Description
Currently, if you cancel the deferred returned by callRemote, the child process gets re-added to the ready set.
This makes it possible for ampoule to unexpectedly execute two operations in a single child process.
There are a few possible cancellation behaviors which may be correct/desired in different situations.
-
Keep going, don't return process to the pool until the underlying task has completed.
-
Hard shutdown, if the deferred is cancelled just immediately kill and replace the worker process.
-
Soft shutdown, send the worker process the shutdown message, if it doesn't respond after some interval do 2.
-
Cancellation aware workers, send the worker process a new Cancel message, which can be used to propagate cancellation to the pending work within the process, allowing work to stop and the worker be returned to the pool
Of these 1. is the only one that can be implemented outside ampoule, we have started using the following helper for that.
def block_cancellation(blocked_d: Deferred[_T]) -> Deferred[_T]:
"""
Return a new deferred that will fire when the given deferred fires but
will not propagate cancellation to the given deferred.
"""
cancelled = False
def _cancelled(d: Deferred[_T]) -> None:
nonlocal cancelled
cancelled = True
return
# Create a new deferred that has a canceller.
# Since all our canceller does is write down that it was cancelled
# this Deferred will errback with twisted.internet.defer.CancelledError.
# That will allow upstream callers to behave exactly as they do today.
blocker_d: Deferred[_T] = Deferred(_cancelled)
def _fireIfNotCancelled(result: _T | Failure) -> _T | Failure:
# If the canceller has cancelled us then we don't fire the blocker deferred.
# This prevents us from ending up with an unhandled error in deferred for
# AlreadyCalled. Since blocker_d will fire when it is cancelled.
if cancelled:
return result
# If the blocked deferred has a Failure we want errback, so that any errors raised
# propagate as normal.
if isinstance(result, Failure):
blocker_d.errback(result)
else:
# If the blocked deferred succeeds then the blocker succeeds.
blocker_d.callback(result)
# Return the result in case someone else has a handle to this deferred and adds
# callbacks after.
return result
blocked_d.addBoth(_fireIfNotCancelled)
return blocker_d