1818
1919from prometheus_client import Counter
2020
21- from twisted .internet import defer
22-
2321import synapse .metrics
2422from synapse .api .presence import UserPresenceState
2523from synapse .events import EventBase
2624from synapse .federation .sender .per_destination_queue import PerDestinationQueue
2725from synapse .federation .sender .transaction_manager import TransactionManager
2826from synapse .federation .units import Edu
2927from synapse .handlers .presence import get_interested_remotes
30- from synapse .logging .context import (
31- make_deferred_yieldable ,
32- preserve_fn ,
33- run_in_background ,
34- )
28+ from synapse .logging .context import preserve_fn
3529from synapse .metrics import (
3630 LaterGauge ,
3731 event_processing_loop_counter ,
3832 event_processing_loop_room_count ,
3933 events_processed_counter ,
4034)
4135from synapse .metrics .background_process_metrics import run_as_background_process
42- from synapse .types import JsonDict , ReadReceipt , RoomStreamToken
36+ from synapse .types import Collection , JsonDict , ReadReceipt , RoomStreamToken
4337from synapse .util .metrics import Measure , measure_func
4438
4539if TYPE_CHECKING :
@@ -276,15 +270,27 @@ async def _process_event_queue_loop(self) -> None:
276270 if not events and next_token >= self ._last_poked_id :
277271 break
278272
279- async def handle_event (event : EventBase ) -> None :
273+ async def get_destinations_for_event (
274+ event : EventBase ,
275+ ) -> Collection [str ]:
276+ """Computes the destinations to which this event must be sent.
277+
278+ This returns an empty tuple when there are no destinations to send to,
279+ or if this event is not from this homeserver and it is not sending
280+ it on behalf of another server.
281+
282+ Will also filter out destinations which this sender is not responsible for,
283+ if multiple federation senders exist.
284+ """
285+
280286 # Only send events for this server.
281287 send_on_behalf_of = event .internal_metadata .get_send_on_behalf_of ()
282288 is_mine = self .is_mine_id (event .sender )
283289 if not is_mine and send_on_behalf_of is None :
284- return
290+ return ()
285291
286292 if not event .internal_metadata .should_proactively_send ():
287- return
293+ return ()
288294
289295 destinations = None # type: Optional[Set[str]]
290296 if not event .prev_event_ids ():
@@ -319,7 +325,7 @@ async def handle_event(event: EventBase) -> None:
319325 "Failed to calculate hosts in room for event: %s" ,
320326 event .event_id ,
321327 )
322- return
328+ return ()
323329
324330 destinations = {
325331 d
@@ -329,42 +335,45 @@ async def handle_event(event: EventBase) -> None:
329335 )
330336 }
331337
338+ destinations .discard (self .server_name )
339+
332340 if send_on_behalf_of is not None :
333341 # If we are sending the event on behalf of another server
334342 # then it already has the event and there is no reason to
335343 # send the event to it.
336344 destinations .discard (send_on_behalf_of )
337345
338- logger .debug ("Sending %s to %r" , event , destinations )
339-
340346 if destinations :
341- await self ._send_pdu (event , destinations )
342-
343347 now = self .clock .time_msec ()
344348 ts = await self .store .get_received_ts (event .event_id )
345349
346350 synapse .metrics .event_processing_lag_by_event .labels (
347351 "federation_sender"
348352 ).observe ((now - ts ) / 1000 )
349353
350- async def handle_room_events (events : Iterable [EventBase ]) -> None :
351- with Measure (self .clock , "handle_room_events" ):
352- for event in events :
353- await handle_event (event )
354-
355- events_by_room = {} # type: Dict[str, List[EventBase]]
356- for event in events :
357- events_by_room .setdefault (event .room_id , []).append (event )
358-
359- await make_deferred_yieldable (
360- defer .gatherResults (
361- [
362- run_in_background (handle_room_events , evs )
363- for evs in events_by_room .values ()
364- ],
365- consumeErrors = True ,
366- )
367- )
354+ return destinations
355+ return ()
356+
357+ async def get_federatable_events_and_destinations (
358+ events : Iterable [EventBase ],
359+ ) -> List [Tuple [EventBase , Collection [str ]]]:
360+ with Measure (self .clock , "get_destinations_for_events" ):
361+ # Fetch federation destinations per event,
362+ # skip if get_destinations_for_event returns an empty collection,
363+ # return list of event->destinations pairs.
364+ return [
365+ (event , dests )
366+ for (event , dests ) in [
367+ (event , await get_destinations_for_event (event ))
368+ for event in events
369+ ]
370+ if dests
371+ ]
372+
373+ events_and_dests = await get_federatable_events_and_destinations (events )
374+
375+ # Send corresponding events to each destination queue
376+ await self ._distribute_events (events_and_dests )
368377
369378 await self .store .update_federation_out_pos ("events" , next_token )
370379
@@ -382,7 +391,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
382391 events_processed_counter .inc (len (events ))
383392
384393 event_processing_loop_room_count .labels ("federation_sender" ).inc (
385- len (events_by_room )
394+ len ({ event . room_id for event in events } )
386395 )
387396
388397 event_processing_loop_counter .labels ("federation_sender" ).inc ()
@@ -394,34 +403,53 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
394403 finally :
395404 self ._is_processing = False
396405
397- async def _send_pdu (self , pdu : EventBase , destinations : Iterable [str ]) -> None :
398- # We loop through all destinations to see whether we already have
399- # a transaction in progress. If we do, stick it in the pending_pdus
400- # table and we'll get back to it later.
406+ async def _distribute_events (
407+ self ,
408+ events_and_dests : Iterable [Tuple [EventBase , Collection [str ]]],
409+ ) -> None :
410+ """Distribute events to the respective per_destination queues.
401411
402- destinations = set (destinations )
403- destinations .discard (self .server_name )
404- logger .debug ("Sending to: %s" , str (destinations ))
412+ Also persists last-seen per-room stream_ordering to 'destination_rooms'.
405413
406- if not destinations :
407- return
414+ Args:
415+ events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]).
416+ Every event is paired with its intended destinations (in federation).
417+ """
418+ # Tuples of room_id + destination to their max-seen stream_ordering
419+ room_with_dest_stream_ordering = {} # type: Dict[Tuple[str, str], int]
408420
409- sent_pdus_destination_dist_total . inc ( len ( destinations ))
410- sent_pdus_destination_dist_count . inc ()
421+ # List of events to send to each destination
422+ events_by_dest = {} # type: Dict[str, List[EventBase]]
411423
412- assert pdu .internal_metadata .stream_ordering
424+ # For each event-destinations pair...
425+ for event , destinations in events_and_dests :
413426
414- # track the fact that we have a PDU for these destinations,
415- # to allow us to perform catch-up later on if the remote is unreachable
416- # for a while.
417- await self .store .store_destination_rooms_entries (
418- destinations ,
419- pdu .room_id ,
420- pdu .internal_metadata .stream_ordering ,
427+ # (we got this from the database, it's filled)
428+ assert event .internal_metadata .stream_ordering
429+
430+ sent_pdus_destination_dist_total .inc (len (destinations ))
431+ sent_pdus_destination_dist_count .inc ()
432+
433+ # ...iterate over those destinations..
434+ for destination in destinations :
435+ # ...update their stream-ordering...
436+ room_with_dest_stream_ordering [(event .room_id , destination )] = max (
437+ event .internal_metadata .stream_ordering ,
438+ room_with_dest_stream_ordering .get ((event .room_id , destination ), 0 ),
439+ )
440+
441+ # ...and add the event to each destination queue.
442+ events_by_dest .setdefault (destination , []).append (event )
443+
444+ # Bulk-store destination_rooms stream_ids
445+ await self .store .bulk_store_destination_rooms_entries (
446+ room_with_dest_stream_ordering
421447 )
422448
423- for destination in destinations :
424- self ._get_per_destination_queue (destination ).send_pdu (pdu )
449+ for destination , pdus in events_by_dest .items ():
450+ logger .debug ("Sending %d pdus to %s" , len (pdus ), destination )
451+
452+ self ._get_per_destination_queue (destination ).send_pdus (pdus )
425453
426454 async def send_read_receipt (self , receipt : ReadReceipt ) -> None :
427455 """Send a RR to any other servers in the room
0 commit comments