|
27 | 27 | Tuple, |
28 | 28 | ) |
29 | 29 |
|
30 | | -import attr |
31 | 30 | from prometheus_client import Counter |
32 | 31 |
|
33 | | -from twisted.internet import defer |
34 | | - |
35 | 32 | from synapse import event_auth |
36 | 33 | from synapse.api.constants import ( |
37 | 34 | EventContentFields, |
|
54 | 51 | from synapse.events import EventBase |
55 | 52 | from synapse.events.snapshot import EventContext |
56 | 53 | from synapse.federation.federation_client import InvalidResponseError |
57 | | -from synapse.logging.context import ( |
58 | | - make_deferred_yieldable, |
59 | | - nested_logging_context, |
60 | | - run_in_background, |
61 | | -) |
| 54 | +from synapse.logging.context import nested_logging_context, run_in_background |
62 | 55 | from synapse.logging.utils import log_function |
63 | 56 | from synapse.metrics.background_process_metrics import run_as_background_process |
64 | 57 | from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet |
|
75 | 68 | UserID, |
76 | 69 | get_domain_from_id, |
77 | 70 | ) |
78 | | -from synapse.util.async_helpers import Linearizer, concurrently_execute |
| 71 | +from synapse.util.async_helpers import ( |
| 72 | + Linearizer, |
| 73 | + concurrently_execute, |
| 74 | + yieldable_gather_results, |
| 75 | +) |
79 | 76 | from synapse.util.iterutils import batch_iter |
80 | 77 | from synapse.util.retryutils import NotRetryingDestination |
81 | 78 | from synapse.util.stringutils import shortstr |
|
92 | 89 | ) |
93 | 90 |
|
94 | 91 |
|
95 | | -@attr.s(slots=True, frozen=True, auto_attribs=True) |
96 | | -class _NewEventInfo: |
97 | | - """Holds information about a received event, ready for passing to _auth_and_persist_events |
98 | | -
|
99 | | - Attributes: |
100 | | - event: the received event |
101 | | -
|
102 | | - claimed_auth_event_map: a map of (type, state_key) => event for the event's |
103 | | - claimed auth_events. |
104 | | -
|
105 | | - This can include events which have not yet been persisted, in the case that |
106 | | - we are backfilling a batch of events. |
107 | | -
|
108 | | - Note: May be incomplete: if we were unable to find all of the claimed auth |
109 | | - events. Also, treat the contents with caution: the events might also have |
110 | | - been rejected, might not yet have been authorized themselves, or they might |
111 | | - be in the wrong room. |
112 | | -
|
113 | | - """ |
114 | | - |
115 | | - event: EventBase |
116 | | - claimed_auth_event_map: StateMap[EventBase] |
117 | | - |
118 | | - |
119 | 92 | class FederationEventHandler: |
120 | 93 | """Handles events that originated from federation. |
121 | 94 |
|
@@ -1203,47 +1176,27 @@ async def _auth_and_persist_fetched_events( |
1203 | 1176 | allow_rejected=True, |
1204 | 1177 | ) |
1205 | 1178 |
|
1206 | | - event_infos = [] |
1207 | | - for event in fetched_events: |
1208 | | - auth = {} |
1209 | | - for auth_event_id in event.auth_event_ids(): |
1210 | | - ae = persisted_events.get(auth_event_id) |
1211 | | - if ae: |
1212 | | - auth[(ae.type, ae.state_key)] = ae |
1213 | | - else: |
1214 | | - logger.info("Missing auth event %s", auth_event_id) |
1215 | | - |
1216 | | - event_infos.append(_NewEventInfo(event, auth)) |
1217 | | - |
1218 | | - if not event_infos: |
1219 | | - return |
1220 | | - |
1221 | | - async def prep(ev_info: _NewEventInfo) -> EventContext: |
1222 | | - event = ev_info.event |
| 1179 | + async def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]: |
1223 | 1180 | with nested_logging_context(suffix=event.event_id): |
1224 | | - res = EventContext.for_outlier() |
1225 | | - res = await self._check_event_auth( |
| 1181 | + auth = {} |
| 1182 | + for auth_event_id in event.auth_event_ids(): |
| 1183 | + ae = persisted_events.get(auth_event_id) |
| 1184 | + if ae: |
| 1185 | + auth[(ae.type, ae.state_key)] = ae |
| 1186 | + else: |
| 1187 | + logger.info("Missing auth event %s", auth_event_id) |
| 1188 | + |
| 1189 | + context = EventContext.for_outlier() |
| 1190 | + context = await self._check_event_auth( |
1226 | 1191 | origin, |
1227 | 1192 | event, |
1228 | | - res, |
1229 | | - claimed_auth_event_map=ev_info.claimed_auth_event_map, |
| 1193 | + context, |
| 1194 | + claimed_auth_event_map=auth, |
1230 | 1195 | ) |
1231 | | - return res |
1232 | | - |
1233 | | - contexts = await make_deferred_yieldable( |
1234 | | - defer.gatherResults( |
1235 | | - [run_in_background(prep, ev_info) for ev_info in event_infos], |
1236 | | - consumeErrors=True, |
1237 | | - ) |
1238 | | - ) |
| 1196 | + return event, context |
1239 | 1197 |
|
1240 | | - await self.persist_events_and_notify( |
1241 | | - room_id, |
1242 | | - [ |
1243 | | - (ev_info.event, context) |
1244 | | - for ev_info, context in zip(event_infos, contexts) |
1245 | | - ], |
1246 | | - ) |
| 1198 | + events_to_persist = await yieldable_gather_results(prep, fetched_events) |
| 1199 | + await self.persist_events_and_notify(room_id, events_to_persist) |
1247 | 1200 |
|
1248 | 1201 | async def _check_event_auth( |
1249 | 1202 | self, |
|
0 commit comments