Skip to content

Commit dcd3c77

Browse files
committed
Solve another OoB cancellation case, the bg task one
Such that we are able to (finally) detect when we should `Context._scope.cancel()` specifically when the `.parent_task` is **not** blocking on receiving from the underlying `._rx_chan`, since if the task is blocking on `.receive()` it will call `.cancel()` implicitly. This is a lot to explain with very little code actually needed for the implementation (are we like `trio` yet anyone?? XD) but the main jist is that `Context._maybe_cancel_and_set_remote_error()` needed the additional case of calling `._scope.cancel()` whenever we know that a remote-error/ctxc won't be immediately handled, bc user code is doing non `Context`-API things, and result in a similar outcome as if that task was waiting on `Context.wait_for_result()` or `.__aexite__()`. Impl details, - add a new `._is_blocked_on_rx_chan()` method which predicates whether the (new) `.parent_task` is blocking on `._rx_chan.receive()`. * see various stipulations about the current impl and how we might need to adjust for the future given `trio`'s commitment to the `Task.custom_sleep_data` attr.. - add `.parent_task`, a pub wrapper for `._task`. - check for `not ._is_blocked_on_rx_chan()` before manually cancelling the local `.parent_task` - minimize the surrounding branch case expressions. Other, - tweak a couple logs. - add a new `.cancel()` pre-started msg. - mask the `.cancel_called` setter, it's only (been) used for tracing. - todos around maybe moving the `._nursery` allocation "around" the `.start_remote_task()` call and various subsequent tweaks therein.
1 parent 36c54d1 commit dcd3c77

File tree

1 file changed

+158
-34
lines changed

1 file changed

+158
-34
lines changed

tractor/_context.py

Lines changed: 158 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -442,25 +442,25 @@ def cancel_called(self) -> bool:
442442
'''
443443
Records whether cancellation has been requested for this context
444444
by a call to `.cancel()` either due to,
445-
- either an explicit call by some local task,
445+
- an explicit call by some local task,
446446
- or an implicit call due to an error caught inside
447-
the ``Portal.open_context()`` block.
447+
the `Portal.open_context()` block.
448448
449449
'''
450450
return self._cancel_called
451451

452-
@cancel_called.setter
453-
def cancel_called(self, val: bool) -> None:
454-
'''
455-
Set the self-cancelled request `bool` value.
452+
# XXX, to debug who frickin sets it..
453+
# @cancel_called.setter
454+
# def cancel_called(self, val: bool) -> None:
455+
# '''
456+
# Set the self-cancelled request `bool` value.
456457

457-
'''
458-
# to debug who frickin sets it..
459-
# if val:
460-
# from .devx import pause_from_sync
461-
# pause_from_sync()
458+
# '''
459+
# if val:
460+
# from .devx import pause_from_sync
461+
# pause_from_sync()
462462

463-
self._cancel_called = val
463+
# self._cancel_called = val
464464

465465
@property
466466
def canceller(self) -> tuple[str, str]|None:
@@ -635,6 +635,71 @@ async def send_stop(self) -> None:
635635
'''
636636
await self.chan.send(Stop(cid=self.cid))
637637

638+
@property
639+
def parent_task(self) -> trio.Task:
640+
'''
641+
This IPC context's "owning task" which is a `trio.Task`
642+
on one of the "sides" of the IPC.
643+
644+
Note that the "parent_" prefix here refers to the local
645+
`trio` task tree using the same interface as
646+
`trio.Nursery.parent_task` whereas for IPC contexts,
647+
a different cross-actor task hierarchy exists:
648+
649+
- a "parent"-side which originally entered
650+
`Portal.open_context()`,
651+
652+
- the "child"-side which was spawned and scheduled to invoke
653+
a function decorated with `@tractor.context`.
654+
655+
This task is thus a handle to mem-domain-distinct/per-process
656+
`Nursery.parent_task` depending on in which of the above
657+
"sides" this context exists.
658+
659+
'''
660+
return self._task
661+
662+
def _is_blocked_on_rx_chan(self) -> bool:
663+
'''
664+
Predicate to indcate whether the owner `._task: trio.Task` is
665+
currently blocked (by `.receive()`-ing) on its underlying RPC
666+
feeder `._rx_chan`.
667+
668+
This knowledge is highly useful when handling so called
669+
"out-of-band" (OoB) cancellation conditions where a peer
670+
actor's task transmitted some remote error/cancel-msg and we
671+
must know whether to signal-via-cancel currently executing
672+
"user-code" (user defined code embedded in `ctx._scope`) or
673+
simply to forward the IPC-msg-as-error **without calling**
674+
`._scope.cancel()`.
675+
676+
In the latter case it is presumed that if the owner task is
677+
blocking for the next IPC msg, it will eventually receive,
678+
process and raise the equivalent local error **without**
679+
requiring `._scope.cancel()` to be explicitly called by the
680+
*delivering OoB RPC-task* (via `_deliver_msg()`).
681+
682+
'''
683+
# NOTE, see the mem-chan meth-impls for *why* this
684+
# logic works,
685+
# `trio._channel.MemoryReceiveChannel.receive[_nowait]()`
686+
#
687+
# XXX realize that this is NOT an
688+
# official/will-be-loudly-deprecated API:
689+
# - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data
690+
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled
691+
#
692+
# orig repo intro in the mem-chan change over patch:
693+
# - https://github.com/python-trio/trio/pull/586#issuecomment-414039117
694+
# |_https://github.com/python-trio/trio/pull/616
695+
# |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0
696+
#
697+
return (
698+
self._task.custom_sleep_data
699+
is
700+
self._rx_chan
701+
)
702+
638703
def _maybe_cancel_and_set_remote_error(
639704
self,
640705
error: BaseException,
@@ -787,13 +852,27 @@ def _maybe_cancel_and_set_remote_error(
787852
if self._canceller is None:
788853
log.error('Ctx has no canceller set!?')
789854

855+
cs: trio.CancelScope = self._scope
856+
857+
# ?TODO? see comment @ .start_remote_task()`
858+
#
859+
# if not cs:
860+
# from .devx import mk_pdb
861+
# mk_pdb().set_trace()
862+
# raise RuntimeError(
863+
# f'IPC ctx was not be opened prior to remote error delivery !?\n'
864+
# f'{self}\n'
865+
# f'\n'
866+
# f'`Portal.open_context()` must be entered (somewhere) beforehand!\n'
867+
# )
868+
790869
# Cancel the local `._scope`, catch that
791870
# `._scope.cancelled_caught` and re-raise any remote error
792871
# once exiting (or manually calling `.wait_for_result()`) the
793872
# `.open_context()` block.
794-
cs: trio.CancelScope = self._scope
795873
if (
796874
cs
875+
and not cs.cancel_called
797876

798877
# XXX this is an expected cancel request response
799878
# message and we **don't need to raise it** in the
@@ -802,8 +881,7 @@ def _maybe_cancel_and_set_remote_error(
802881
# if `._cancel_called` then `.cancel_acked and .cancel_called`
803882
# always should be set.
804883
and not self._is_self_cancelled()
805-
and not cs.cancel_called
806-
and not cs.cancelled_caught
884+
# and not cs.cancelled_caught
807885
):
808886
if (
809887
msgerr
@@ -814,7 +892,7 @@ def _maybe_cancel_and_set_remote_error(
814892
not self._cancel_on_msgerr
815893
):
816894
message: str = (
817-
'NOT Cancelling `Context._scope` since,\n'
895+
f'NOT Cancelling `Context._scope` since,\n'
818896
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
819897
f'AND we got a msg-type-error!\n'
820898
f'{error}\n'
@@ -824,13 +902,43 @@ def _maybe_cancel_and_set_remote_error(
824902
# `trio.Cancelled` subtype here ;)
825903
# https://github.com/goodboy/tractor/issues/368
826904
message: str = 'Cancelling `Context._scope` !\n\n'
827-
# from .devx import pause_from_sync
828-
# pause_from_sync()
829-
self._scope.cancel()
830-
else:
831-
message: str = 'NOT cancelling `Context._scope` !\n\n'
905+
cs.cancel()
906+
907+
# TODO, explicit condition for OoB (self-)cancellation?
908+
# - we called `Portal.cancel_actor()` from this actor
909+
# and the peer ctx task delivered ctxc due to it.
910+
# - currently `self._is_self_cancelled()` will be true
911+
# since the ctxc.canceller check will match us even though it
912+
# wasn't from this ctx specifically!
913+
elif (
914+
cs
915+
and self._is_self_cancelled()
916+
and not cs.cancel_called
917+
):
918+
message: str = (
919+
'Cancelling `ctx._scope` due to OoB self-cancel ?!\n'
920+
'\n'
921+
)
832922
# from .devx import mk_pdb
833923
# mk_pdb().set_trace()
924+
# TODO XXX, required to fix timeout failure in
925+
# `test_cancelled_lockacquire_in_ipctx_not_unmaskeed`
926+
#
927+
928+
# XXX NOTE XXX, this is SUPER SUBTLE!
929+
# we only want to cancel our embedded `._scope`
930+
# if the ctx's current/using task is NOT blocked
931+
# on `._rx_chan.receive()` and on some other
932+
# `trio`-checkpoint since in the former case
933+
# any `._remote_error` will be relayed through
934+
# the rx-chan and appropriately raised by the owning
935+
# `._task` directly. IF the owner task is however
936+
# blocking elsewhere we need to interrupt it **now**.
937+
if not self._is_blocked_on_rx_chan():
938+
cs.cancel()
939+
else:
940+
# rx_stats = self._rx_chan.statistics()
941+
message: str = 'NOT cancelling `Context._scope` !\n\n'
834942

835943
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
836944
if (
@@ -854,6 +962,7 @@ def _maybe_cancel_and_set_remote_error(
854962
+
855963
cs_fmt
856964
)
965+
857966
log.cancel(
858967
message
859968
+
@@ -946,8 +1055,9 @@ async def cancel(
9461055
9471056
'''
9481057
side: str = self.side
949-
# XXX for debug via the `@.setter`
950-
self.cancel_called = True
1058+
self._cancel_called = True
1059+
# ^ XXX for debug via the `@.setter`
1060+
# self.cancel_called = True
9511061

9521062
header: str = (
9531063
f'Cancelling ctx from {side!r}-side\n'
@@ -2011,6 +2121,9 @@ async def open_context_from_portal(
20112121
f'|_{portal.actor}\n'
20122122
)
20132123

2124+
# ?TODO? could we move this to inside the `tn` block?
2125+
# -> would allow doing `ctx.parent_task = tn.parent_task` ?
2126+
# -> would allow a `if not ._scope: => raise RTE` ?
20142127
ctx: Context = await portal.actor.start_remote_task(
20152128
portal.channel,
20162129
nsf=nsf,
@@ -2037,6 +2150,7 @@ async def open_context_from_portal(
20372150
scope_err: BaseException|None = None
20382151
ctxc_from_child: ContextCancelled|None = None
20392152
try:
2153+
# from .devx import pause
20402154
async with (
20412155
collapse_eg(),
20422156
trio.open_nursery() as tn,
@@ -2059,6 +2173,10 @@ async def open_context_from_portal(
20592173
# the dialog, the `Error` msg should be raised from the `msg`
20602174
# handling block below.
20612175
try:
2176+
log.runtime(
2177+
f'IPC ctx parent waiting on Started msg..\n'
2178+
f'ctx.cid: {ctx.cid!r}\n'
2179+
)
20622180
started_msg, first = await ctx._pld_rx.recv_msg(
20632181
ipc=ctx,
20642182
expect_msg=Started,
@@ -2067,16 +2185,16 @@ async def open_context_from_portal(
20672185
)
20682186
except trio.Cancelled as taskc:
20692187
ctx_cs: trio.CancelScope = ctx._scope
2188+
log.cancel(
2189+
f'IPC ctx was cancelled during "child" task sync due to\n\n'
2190+
f'.cid: {ctx.cid!r}\n'
2191+
f'.maybe_error: {ctx.maybe_error!r}\n'
2192+
)
2193+
# await pause(shield=True)
2194+
20702195
if not ctx_cs.cancel_called:
20712196
raise
20722197

2073-
# from .devx import pause
2074-
# await pause(shield=True)
2075-
2076-
log.cancel(
2077-
'IPC ctx was cancelled during "child" task sync due to\n\n'
2078-
f'{ctx.maybe_error}\n'
2079-
)
20802198
# OW if the ctx's scope was cancelled manually,
20812199
# likely the `Context` was cancelled via a call to
20822200
# `._maybe_cancel_and_set_remote_error()` so ensure
@@ -2199,7 +2317,7 @@ async def open_context_from_portal(
21992317
# documenting it as a definittive example of
22002318
# debugging the tractor-runtime itself using it's
22012319
# own `.devx.` tooling!
2202-
#
2320+
#
22032321
# await debug.pause()
22042322

22052323
# CASE 2: context was cancelled by local task calling
@@ -2272,13 +2390,16 @@ async def open_context_from_portal(
22722390
match scope_err:
22732391
case trio.Cancelled():
22742392
logmeth = log.cancel
2393+
cause: str = 'cancelled'
22752394

22762395
# XXX explicitly report on any non-graceful-taskc cases
22772396
case _:
2397+
cause: str = 'errored'
22782398
logmeth = log.exception
22792399

22802400
logmeth(
2281-
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
2401+
f'ctx {ctx.side!r}-side {cause!r} with,\n'
2402+
f'{ctx.repr_outcome()!r}\n'
22822403
)
22832404

22842405
if debug_mode():
@@ -2303,6 +2424,7 @@ async def open_context_from_portal(
23032424
# told us it's cancelled ;p
23042425
if ctxc_from_child is None:
23052426
try:
2427+
# await pause(shield=True)
23062428
await ctx.cancel()
23072429
except (
23082430
trio.BrokenResourceError,
@@ -2459,8 +2581,10 @@ async def open_context_from_portal(
24592581
log.cancel(
24602582
f'Context cancelled by local {ctx.side!r}-side task\n'
24612583
f'c)>\n'
2462-
f' |_{ctx._task}\n\n'
2463-
f'{repr(scope_err)}\n'
2584+
f' |_{ctx.parent_task}\n'
2585+
f' .cid={ctx.cid!r}\n'
2586+
f'\n'
2587+
f'{scope_err!r}\n'
24642588
)
24652589

24662590
# TODO: should we add a `._cancel_req_received`

0 commit comments

Comments
 (0)