-
Notifications
You must be signed in to change notification settings - Fork 404
Fix run_coroutine_in_background(...)
incorrectly handling logcontext
#18964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
00a4076
29b121e
c1f2431
2feae9a
ba22b73
7fcec49
ade9853
de7d49d
b60cd31
c6e8141
2ae72a6
caaa6a5
c9d5cda
8e5935c
ffbf188
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix `run_coroutine_in_background(...)` incorrectly handling logcontext. | ||
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -802,13 +802,15 @@ def run_in_background( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
deferred returned by the function completes. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
To explain how the log contexts work here: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- When `run_in_background` is called, the current context is stored ("original"), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
we kick off the background task in the current context, and we restore that | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
original context before returning | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- When the background task finishes, we don't want to leak our context into the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
reactor which would erroneously get attached to the next operation picked up by | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
the event loop. We add a callback to the deferred which will clear the logging | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
context after it finishes and yields control back to the reactor. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- When `run_in_background` is called, the calling logcontext is stored | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
("original"), we kick off the background task in the current context, and we | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
restore that original context before returning. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- For a completed deferred, that's the end of the story. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- For an incomplete deferred, when the background task finishes, we don't want to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
leak our context into the reactor which would erroneously get attached to the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
next operation picked up by the event loop. We add a callback to the deferred | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
which will clear the logging context after it finishes and yields control back to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
the reactor. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+805
to
+813
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on the explanation laid out in #18900 (comment) |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Useful for wrapping functions that return a deferred or coroutine, which you don't | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
yield or await on (for instance because you want to pass it to | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -857,8 +859,11 @@ def run_in_background( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# The deferred has already completed | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if d.called and not d.paused: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# The function should have maintained the logcontext, so we can | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# optimise out the messing about | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# The function should have maintained the calling logcontext, so we can avoid | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# messing with it further. Additionally, if the deferred has already completed, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# then it would be a mistake to then add a deferred callback (below) to reset | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# the logcontext to the sentinel logcontext as that would run immediately | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# (remember our goal is to maintain the calling logcontext when we return). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return d | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# The function may have reset the context before returning, so we need to restore it | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
**Rules for functions returning awaitables:** | |
> - If the awaitable is already complete, the function returns with the | |
> same logcontext it started with. | |
> - If the awaitable is incomplete, the function clears the logcontext | |
> before returning; when the awaitable completes, it restores the | |
> logcontext before running any callbacks. |
I've updated the comments with some further details:
synapse/synapse/logging/context.py
Lines 860 to 899 in ffbf188
# The deferred has already completed | |
if d.called and not d.paused: | |
# If the function messes with logcontexts, we can assume it follows the Synapse | |
# logcontext rules (Rules for functions returning awaitables: "If the awaitable | |
# is already complete, the function returns with the same logcontext it started | |
# with."). If it function doesn't touch logcontexts at all, we can also assume | |
# the logcontext is unchanged. | |
# | |
# Either way, the function should have maintained the calling logcontext, so we | |
# can avoid messing with it further. Additionally, if the deferred has already | |
# completed, then it would be a mistake to then add a deferred callback (below) | |
# to reset the logcontext to the sentinel logcontext as that would run | |
# immediately (remember our goal is to maintain the calling logcontext when we | |
# return). | |
return d | |
# Since the function we called may follow the Synapse logcontext rules (Rules for | |
# functions returning awaitables: "If the awaitable is incomplete, the function | |
# clears the logcontext before returning"), the function may have reset the | |
# logcontext before returning, so we need to restore the calling logcontext now | |
# before we return ourselves. | |
# | |
# Our goal is to have the caller logcontext unchanged after firing off the | |
# background task and returning. | |
set_current_context(calling_context) | |
# If the function we called is playing nice and following the Synapse logcontext | |
# rules, it will restore original calling logcontext when the deferred completes; | |
# but there is nothing waiting for it, so it will get leaked into the reactor (which | |
# would then get picked up by the next thing the reactor does). We therefore need to | |
# reset the logcontext here (set the `sentinel` logcontext) before yielding control | |
# back to the reactor. | |
# | |
# (If this feels asymmetric, consider it this way: we are | |
# effectively forking a new thread of execution. We are | |
# probably currently within a ``with LoggingContext()`` block, | |
# which is supposed to have a single entry and exit point. But | |
# by spawning off another deferred, we are effectively | |
# adding a new exit point.) | |
d.addBoth(_set_context_cb, SENTINEL_CONTEXT) |
Let me know if that makes more sense otherwise we can continue iterating in another PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, so there's a difference in the potential current logcontext dependent on whether the awaitable is complete or incomplete. That makes sense why we don't need to bother when it has completed.
Thanks for the clarification and the extra comments!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this last sentence because this isn't true on multiple levels:
- Calling an
async
function will run immediately until it yields (hits anawait
for an incomplete awaitable) - Calling an
async
function can change the logcontext (and happens all the time). This is exactly why we set the logcontext back to thecalling_context
before returning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To fix the root problem, all we need to do is add this same fix that run_in_background
already has for already completed deferreds:
synapse/synapse/logging/context.py
Lines 858 to 862 in 9680804
# The deferred has already completed | |
if d.called and not d.paused: | |
# The function should have maintained the logcontext, so we can | |
# optimise out the messing about | |
return d |
But instead of duplicating all of this specialty logic and context into run_coroutine_in_background(...)
, we can just simplify to using run_in_background(...)
. Especially when run_coroutine_in_background(...)
is just an ergonomic wrapper around run_in_background(...)
See #18900 (comment) for more information on how this shortcut and the logcontext logic works for run_in_background(...)
Related conversation where I asked why we even have run_coroutine_in_background(...)
-> #18900 (comment)
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,6 @@ | |
import logging | ||
from typing import Callable, Generator, cast | ||
|
||
import twisted.python.failure | ||
from twisted.internet import defer, reactor as _reactor | ||
|
||
from synapse.logging.context import ( | ||
|
@@ -33,6 +32,7 @@ | |
current_context, | ||
make_deferred_yieldable, | ||
nested_logging_context, | ||
run_coroutine_in_background, | ||
run_in_background, | ||
) | ||
from synapse.types import ISynapseReactor | ||
|
@@ -249,88 +249,192 @@ async def competing_callback() -> None: | |
# Back to the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
def _test_run_in_background(self, function: Callable[[], object]) -> defer.Deferred: | ||
sentinel_context = current_context() | ||
async def _test_run_in_background(self, function: Callable[[], object]) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
clock = Clock(reactor) | ||
|
||
# Sanity check that we start in the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
callback_completed = False | ||
callback_finished = False | ||
|
||
with LoggingContext("foo"): | ||
# fire off function, but don't wait on it. | ||
d2 = run_in_background(function) | ||
# Fire off the function, but don't wait on it. | ||
deferred = run_in_background(function) | ||
self._check_test_key("foo") | ||
|
||
def cb(res: object) -> object: | ||
nonlocal callback_completed | ||
callback_completed = True | ||
return res | ||
def callback(result: object) -> object: | ||
nonlocal callback_finished | ||
callback_finished = True | ||
# Pass through the result | ||
return result | ||
|
||
d2.addCallback(cb) | ||
# We `addBoth` because when exceptions happen, we still want to mark the | ||
# callback as finished so that the test can complete and we see the | ||
# underlying error. | ||
deferred.addBoth(callback) | ||
|
||
self._check_test_key("foo") | ||
|
||
# now wait for the function under test to have run, and check that | ||
# the logcontext is left in a sane state. | ||
d2 = defer.Deferred() | ||
|
||
def check_logcontext() -> None: | ||
if not callback_completed: | ||
reactor.callLater(0.01, check_logcontext) | ||
return | ||
# Now wait for the function under test to have run, and check that | ||
# the logcontext is left in a sane state. | ||
while not callback_finished: | ||
await clock.sleep(0) | ||
self._check_test_key("foo") | ||
|
||
# make sure that the context was reset before it got thrown back | ||
# into the reactor | ||
try: | ||
self.assertIs(current_context(), sentinel_context) | ||
d2.callback(None) | ||
except BaseException: | ||
d2.errback(twisted.python.failure.Failure()) | ||
|
||
reactor.callLater(0.01, check_logcontext) | ||
self.assertTrue( | ||
callback_finished, | ||
"Callback never finished which means the test probably didn't wait long enough", | ||
) | ||
|
||
# test is done once d2 finishes | ||
return d2 | ||
# Back to the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
@logcontext_clean | ||
def test_run_in_background_with_blocking_fn(self) -> defer.Deferred: | ||
async def test_run_in_background_with_blocking_fn(self) -> None: | ||
async def blocking_function() -> None: | ||
await Clock(reactor).sleep(0) | ||
|
||
return self._test_run_in_background(blocking_function) | ||
await self._test_run_in_background(blocking_function) | ||
|
||
@logcontext_clean | ||
def test_run_in_background_with_non_blocking_fn(self) -> defer.Deferred: | ||
async def test_run_in_background_with_non_blocking_fn(self) -> None: | ||
@defer.inlineCallbacks | ||
def nonblocking_function() -> Generator["defer.Deferred[object]", object, None]: | ||
with PreserveLoggingContext(): | ||
yield defer.succeed(None) | ||
|
||
return self._test_run_in_background(nonblocking_function) | ||
await self._test_run_in_background(nonblocking_function) | ||
|
||
@logcontext_clean | ||
def test_run_in_background_with_chained_deferred(self) -> defer.Deferred: | ||
async def test_run_in_background_with_chained_deferred(self) -> None: | ||
# a function which returns a deferred which looks like it has been | ||
# called, but is actually paused | ||
def testfunc() -> defer.Deferred: | ||
return make_deferred_yieldable(_chained_deferred_function()) | ||
|
||
return self._test_run_in_background(testfunc) | ||
await self._test_run_in_background(testfunc) | ||
|
||
@logcontext_clean | ||
def test_run_in_background_with_coroutine(self) -> defer.Deferred: | ||
async def test_run_in_background_with_coroutine(self) -> None: | ||
""" | ||
Test `run_in_background` with a coroutine that yields control back to the | ||
reactor. | ||
|
||
This will stress the logic around incomplete deferreds in `run_in_background`. | ||
""" | ||
|
||
async def testfunc() -> None: | ||
self._check_test_key("foo") | ||
d = defer.ensureDeferred(Clock(reactor).sleep(0)) | ||
self.assertIs(current_context(), SENTINEL_CONTEXT) | ||
await d | ||
self._check_test_key("foo") | ||
|
||
return self._test_run_in_background(testfunc) | ||
await self._test_run_in_background(testfunc) | ||
|
||
@logcontext_clean | ||
def test_run_in_background_with_nonblocking_coroutine(self) -> defer.Deferred: | ||
async def test_run_in_background_with_nonblocking_coroutine(self) -> None: | ||
""" | ||
Test `run_in_background` with a "nonblocking" coroutine (never yields control | ||
back to the reactor). | ||
|
||
This will stress the logic around completed deferreds in `run_in_background`. | ||
""" | ||
|
||
async def testfunc() -> None: | ||
self._check_test_key("foo") | ||
|
||
return self._test_run_in_background(testfunc) | ||
await self._test_run_in_background(testfunc) | ||
|
||
@logcontext_clean | ||
async def test_run_coroutine_in_background(self) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New We could use a similar pattern to |
||
""" | ||
Test `run_coroutine_in_background` with a coroutine that yields control back to the | ||
reactor. | ||
|
||
This will stress the logic around incomplete deferreds in `run_coroutine_in_background`. | ||
""" | ||
clock = Clock(reactor) | ||
|
||
# Sanity check that we start in the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
callback_finished = False | ||
|
||
async def competing_callback() -> None: | ||
nonlocal callback_finished | ||
try: | ||
# The callback should have the same logcontext as the caller | ||
self._check_test_key("foo") | ||
|
||
with LoggingContext("competing"): | ||
await clock.sleep(0) | ||
self._check_test_key("competing") | ||
|
||
self._check_test_key("foo") | ||
finally: | ||
# When exceptions happen, we still want to mark the callback as finished | ||
# so that the test can complete and we see the underlying error. | ||
callback_finished = True | ||
|
||
with LoggingContext("foo"): | ||
run_coroutine_in_background(competing_callback()) | ||
self._check_test_key("foo") | ||
await clock.sleep(0) | ||
self._check_test_key("foo") | ||
|
||
self.assertTrue( | ||
callback_finished, | ||
"Callback never finished which means the test probably didn't wait long enough", | ||
) | ||
|
||
# Back to the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
@logcontext_clean | ||
async def test_run_coroutine_in_background_with_nonblocking_coroutine(self) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not in love with the "nonblocking" terminology used here but I've aligned with the prior art here ( |
||
""" | ||
Test `run_coroutine_in_background` with a "nonblocking" coroutine (never yields control | ||
back to the reactor). | ||
|
||
This will stress the logic around completed deferreds in `run_coroutine_in_background`. | ||
""" | ||
# Sanity check that we start in the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
callback_finished = False | ||
|
||
async def competing_callback() -> None: | ||
nonlocal callback_finished | ||
try: | ||
# The callback should have the same logcontext as the caller | ||
self._check_test_key("foo") | ||
|
||
with LoggingContext("competing"): | ||
# We `await` here but there is nothing to wait for here since the | ||
# deferred is already complete so we should immediately continue | ||
# executing in the same context. | ||
await defer.succeed(None) | ||
|
||
self._check_test_key("competing") | ||
|
||
self._check_test_key("foo") | ||
finally: | ||
# When exceptions happen, we still want to mark the callback as finished | ||
# so that the test can complete and we see the underlying error. | ||
callback_finished = True | ||
|
||
with LoggingContext("foo"): | ||
run_coroutine_in_background(competing_callback()) | ||
self._check_test_key("foo") | ||
|
||
self.assertTrue( | ||
callback_finished, | ||
"Callback never finished which means the test probably didn't wait long enough", | ||
) | ||
|
||
# Back to the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
@logcontext_clean | ||
@defer.inlineCallbacks | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given this technically regressed in #18900 which is part of
1.139.0rc1
, we should land another RC with this PR(The PR description here at the top explains the regression)
cc @anoadragon453