- 
                Notifications
    You must be signed in to change notification settings 
- Fork 404
          Remove sentinel logcontext in Clock utilities (looping_call, looping_call_now, call_later)
          #18907
        
          New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 27 commits
6170762
              5d2e4c4
              414b178
              788cd19
              77b3228
              91f7bb3
              2e88eb3
              fbf5946
              3809f3f
              442dbc0
              13b938f
              04825eb
              f4ad07d
              a8e66e7
              0780183
              c535d8a
              aec7065
              2aa15b0
              1c001c9
              13a5a36
              eae83e8
              3797515
              9116e74
              57cc675
              45e6b78
              4145160
              f2ae33a
              160eb63
              c8d0f97
              774d25f
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Remove `sentinel` logcontext usage in `Clock` utilities like `looping_call` and `call_later`. | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -41,6 +41,7 @@ | |
| from typing_extensions import ParamSpec | ||
|  | ||
| from twisted.internet import defer, task | ||
| from twisted.internet.defer import Deferred | ||
| from twisted.internet.interfaces import IDelayedCall, IReactorTime | ||
| from twisted.internet.task import LoopingCall | ||
| from twisted.python.failure import Failure | ||
|  | @@ -121,6 +122,8 @@ class Clock: | |
|  | ||
| async def sleep(self, seconds: float) -> None: | ||
| d: defer.Deferred[float] = defer.Deferred() | ||
| # Start task in the `sentinel` logcontext, to avoid leaking the current context | ||
| # into the reactor once it finishes. | ||
| with context.PreserveLoggingContext(): | ||
| self._reactor.callLater(seconds, d.callback, seconds) | ||
| await d | ||
|  | @@ -149,8 +152,9 @@ def looping_call( | |
| this functionality thanks to this function being a thin wrapper around | ||
| `twisted.internet.task.LoopingCall`. | ||
|  | ||
| Note that the function will be called with no logcontext, so if it is anything | ||
| other than trivial, you probably want to wrap it in run_as_background_process. | ||
| Note that the function will be called with generic `looping_call` logcontext, so | ||
| if it is anything other than a trivial task, you probably want to wrap it in | ||
| `run_as_background_process` to give it more specific label and track metrics. | ||
|  | ||
| Args: | ||
| f: The function to call repeatedly. | ||
|  | @@ -172,8 +176,9 @@ def looping_call_now( | |
| As with `looping_call`: subsequent calls are not scheduled until after the | ||
| the Awaitable returned by a previous call has finished. | ||
|  | ||
| Also as with `looping_call`: the function is called with no logcontext and | ||
| you probably want to wrap it in `run_as_background_process`. | ||
| Note that the function will be called with generic `looping_call` logcontext, so | ||
| if it is anything other than a trivial task, you probably want to wrap it in | ||
| `run_as_background_process` to give it more specific label and track metrics. | ||
|  | ||
| Args: | ||
| f: The function to call repeatedly. | ||
|  | @@ -192,9 +197,44 @@ def _looping_call_common( | |
| **kwargs: P.kwargs, | ||
| ) -> LoopingCall: | ||
| """Common functionality for `looping_call` and `looping_call_now`""" | ||
| call = task.LoopingCall(f, *args, **kwargs) | ||
|  | ||
| def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: | ||
| assert context.current_context() is context.SENTINEL_CONTEXT, ( | ||
| "Expected `looping_call` callback from the reactor to start with the sentinel logcontext " | ||
| f"but saw {context.current_context()}. In other words, another task shouldn't have " | ||
| "leaked their logcontext to us." | ||
| ) | ||
|  | ||
| # Because this is a callback from the reactor, we will be using the | ||
| # `sentinel` log context at this point. We want the function to log with | ||
| # some logcontext as we want to know which server the logs came from. | ||
| # | ||
| # We use `PreserveLoggingContext` to prevent our new `looping_call` | ||
| # logcontext from finishing as soon as we exit this function, in case `f` | ||
| # returns an awaitable/deferred which would continue running and may try to | ||
| # restore the `loop_call` context when it's done (because it's trying to | ||
| # adhere to the Synapse logcontext rules.) | ||
| # | ||
| # This also ensures that we return to the `sentinel` context when we exit | ||
| # this function and yield control back to the reactor to avoid leaking the | ||
| # current logcontext to the reactor (which would then get picked up and | ||
| # associated with the next thing the reactor does) | ||
| with context.PreserveLoggingContext(context.LoggingContext("looping_call")): | ||
| # We use `run_in_background` to reset the logcontext after `f` (or the | ||
| # awaitable returned by `f`) completes to avoid leaking the current | ||
| # logcontext to the reactor | ||
| return context.run_in_background(f, *args, **kwargs) | ||
|  | ||
| call = task.LoopingCall(wrapped_f, *args, **kwargs) | ||
| call.clock = self._reactor | ||
| d = call.start(msec / 1000.0, now=now) | ||
| # If `now=true`, the function will be called here immediately so we need to be | ||
| # in the sentinel context now. | ||
| # | ||
| # We want to start the task in the `sentinel` logcontext, to avoid leaking the | ||
| # current context into the reactor after the function finishes. TODO: Or perhaps | ||
| # someone cancels the looping call (does this matter?). | ||
| with context.PreserveLoggingContext(): | ||
| d = call.start(msec / 1000.0, now=now) | ||
| d.addErrback(log_failure, "Looping call died", consumeErrors=False) | ||
| return call | ||
|  | ||
|  | @@ -203,8 +243,9 @@ def call_later( | |
| ) -> IDelayedCall: | ||
| """Call something later | ||
|  | ||
| Note that the function will be called with no logcontext, so if it is anything | ||
| other than trivial, you probably want to wrap it in run_as_background_process. | ||
| Note that the function will be called with generic `call_later` logcontext, so | ||
| if it is anything other than a trivial task, you probably want to wrap it in | ||
| `run_as_background_process` to give it more specific label and track metrics. | ||
|  | ||
| Args: | ||
| delay: How long to wait in seconds. | ||
|  | @@ -214,11 +255,33 @@ def call_later( | |
| """ | ||
|  | ||
| def wrapped_callback(*args: Any, **kwargs: Any) -> None: | ||
| with context.PreserveLoggingContext(): | ||
| callback(*args, **kwargs) | ||
|  | ||
| with context.PreserveLoggingContext(): | ||
|          | ||
| return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) | ||
| assert context.current_context() is context.SENTINEL_CONTEXT, ( | ||
| "Expected `call_later` callback from the reactor to start with the sentinel logcontext " | ||
| f"but saw {context.current_context()}. In other words, another task shouldn't have " | ||
| "leaked their logcontext to us." | ||
| ) | ||
|  | ||
| # Because this is a callback from the reactor, we will be using the | ||
| # `sentinel` log context at this point. We want the function to log with | ||
| # some logcontext as we want to know which server the logs came from. | ||
| # | ||
| # We use `PreserveLoggingContext` to prevent our new `call_later` | ||
| # logcontext from finishing as soon as we exit this function, in case `f` | ||
| # returns an awaitable/deferred which would continue running and may try to | ||
| # restore the `loop_call` context when it's done (because it's trying to | ||
| # adhere to the Synapse logcontext rules.) | ||
| # | ||
| # This also ensures that we return to the `sentinel` context when we exit | ||
| # this function and yield control back to the reactor to avoid leaking the | ||
| # current logcontext to the reactor (which would then get picked up and | ||
| # associated with the next thing the reactor does) | ||
| with context.PreserveLoggingContext(context.LoggingContext("call_later")): | ||
| # We use `run_in_background` to reset the logcontext after `f` (or the | ||
| # awaitable returned by `f`) completes to avoid leaking the current | ||
| # logcontext to the reactor | ||
| context.run_in_background(callback, *args, **kwargs) | ||
|  | ||
| return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) | ||
|  | ||
| def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None: | ||
| try: | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -31,6 +31,7 @@ | |
|  | ||
| import synapse.rest.admin | ||
| from synapse.api.errors import Codes, SynapseError | ||
| from synapse.logging.context import make_deferred_yieldable | ||
| from synapse.push.emailpusher import EmailPusher | ||
| from synapse.rest.client import login, room | ||
| from synapse.rest.synapse.client.unsubscribe import UnsubscribeResource | ||
|  | @@ -89,7 +90,7 @@ def sendmail(*args: Any, **kwargs: Any) -> Deferred: | |
| # This mocks out synapse.reactor.send_email._sendmail. | ||
| d: Deferred = Deferred() | ||
| self.email_attempts.append((d, args, kwargs)) | ||
| return d | ||
| return make_deferred_yieldable(d) | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We were leaking the logcontext into the reactor This took some debug diving to find the culprit but turns out we already had the same fix in the HTTP pusher tests since a267c2e (matrix-org/synapse#4204) Previously, this was causing some tests to fail: 
 
 We caught this because we added that new  | ||
|  | ||
| hs.get_send_email_handler()._sendmail = sendmail # type: ignore[assignment] | ||
|  | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not your problem to deal with, but I've often wondered why we don't just make
looping_calltake a name and do this wrapping itself. I can't think of a genericlooping_callctx ever being the desirable outcome.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A possible future 👍