|
@@ -19,6 +19,7 @@
 
 from canonicaljson import encode_canonical_json
 
+from twisted.internet import defer
 from twisted.internet.interfaces import IDelayedCall
 
 from synapse import event_auth
@@ -43,14 +44,14 @@
 from synapse.events.builder import EventBuilder
 from synapse.events.snapshot import EventContext
 from synapse.events.validator import EventValidator
-from synapse.logging.context import run_in_background
+from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
-from synapse.util import json_decoder, json_encoder
-from synapse.util.async_helpers import Linearizer
+from synapse.util import json_decoder, json_encoder, log_failure
+from synapse.util.async_helpers import Linearizer, unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
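The imports added above (defer, make_deferred_yieldable, log_failure, unwrapFirstError) all serve one idiom: start several pieces of work with run_in_background, gather them with defer.gatherResults, and await the combined Deferred. Below is a minimal, Twisted-only sketch of that idiom; slow_square and unwrap_first_error are hypothetical stand-ins, and the logging-context handling that Synapse's run_in_background/make_deferred_yieldable add is deliberately omitted.

```python
# Minimal sketch: run two coroutines concurrently with gatherResults and
# unwrap Twisted's FirstError so callers see the original exception.
from twisted.internet import defer
from twisted.internet.task import react


def unwrap_first_error(failure):
    # gatherResults(consumeErrors=True) wraps the first failure in a
    # FirstError; return the underlying failure so callers get the real
    # exception rather than the wrapper.
    failure.trap(defer.FirstError)
    return failure.value.subFailure


async def slow_square(x):
    await defer.succeed(None)  # stand-in for real asynchronous work
    return x * x


def main(reactor):
    d = defer.gatherResults(
        [
            defer.ensureDeferred(slow_square(2)),
            defer.ensureDeferred(slow_square(3)),
        ],
        consumeErrors=True,
    ).addErrback(unwrap_first_error)
    return d.addCallback(print)  # prints [4, 9]


react(main)
```

Broadly, Synapse wraps the same kind of gatherResults call in run_in_background and make_deferred_yieldable so the per-request logging context survives the parallel hop; unwrapping FirstError is what its unwrapFirstError helper does.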
@@ -979,9 +980,43 @@ async def handle_new_client_event( |
             logger.exception("Failed to encode content: %r", event.content)
             raise
 
-        await self.action_generator.handle_push_actions_for_event(event, context)
+        # We now persist the event (and update the cache in parallel, since we
+        # don't want to block on it).
+        result = await make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(
+                        self._persist_event,
+                        requester=requester,
+                        event=event,
+                        context=context,
+                        ratelimit=ratelimit,
+                        extra_users=extra_users,
+                    ),
+                    run_in_background(
+                        self.cache_joined_hosts_for_event, event, context
+                    ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
+                ],
+                consumeErrors=True,
+            )
+        ).addErrback(unwrapFirstError)
+
+        return result[0]
+
+    async def _persist_event(
+        self,
+        requester: Requester,
+        event: EventBase,
+        context: EventContext,
+        ratelimit: bool = True,
+        extra_users: Optional[List[UserID]] = None,
+    ) -> EventBase:
+        """Actually persists the event. Should only be called by
+        `handle_new_client_event`, and see its docstring for documentation of
+        the arguments.
+        """
 
-        await self.cache_joined_hosts_for_event(event, context)
+        await self.action_generator.handle_push_actions_for_event(event, context)
 
         try:
             # If we're a worker we need to hit out to the master.
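Taken together, this hunk splits handle_new_client_event: the persistence work moves into a new _persist_event helper, which now runs concurrently with cache_joined_hosts_for_event. The cache task is best-effort, so it gets a log_failure errback (which logs and, by default, consumes the error), while a failure from _persist_event is unwrapped from Twisted's FirstError by unwrapFirstError and surfaces to the caller. The sketch below shows that "fail on the main task, log-and-swallow the side task" shape with hypothetical stand-ins (persist, update_cache, log_and_swallow) in place of the Synapse machinery.

```python
# Minimal sketch: a best-effort side task gets a log-and-swallow errback,
# so only the main (persist-style) task can fail the combined Deferred.
import logging

from twisted.internet import defer
from twisted.internet.task import react

logger = logging.getLogger(__name__)


def log_and_swallow(failure, msg):
    # Log the failure, then return None so the errback chain turns back into
    # a successful callback chain and gatherResults does not see an error.
    logger.error("%s: %s", msg, failure.getTraceback())
    return None


async def persist():
    return "persisted-event"


async def update_cache():
    raise RuntimeError("cache update failed")  # simulated non-fatal error


def main(reactor):
    d = defer.gatherResults(
        [
            defer.ensureDeferred(persist()),
            defer.ensureDeferred(update_cache()).addErrback(
                log_and_swallow, "update_cache failed"
            ),
        ],
        consumeErrors=True,
    )
    # Only the first result matters, mirroring `return result[0]` above.
    return d.addCallback(lambda results: print(results[0]))


react(main)
```

The design point, per the new comment in the diff, is that warming the joined-hosts cache should not block returning the persisted event; running it via run_in_background with its own errback keeps a slow or failing cache update from delaying or failing the send.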