3535from synapse .logging .opentracing import SynapseTags , set_tag
3636from synapse .metrics import sent_transactions_counter
3737from synapse .metrics .background_process_metrics import run_as_background_process
38- from synapse .types import ReadReceipt
38+ from synapse .types import JsonDict , ReadReceipt
3939from synapse .util .retryutils import NotRetryingDestination , get_retry_limiter
4040from synapse .visibility import filter_events_for_server
4141
@@ -136,8 +136,11 @@ def __init__(
136136 # destination
137137 self ._pending_presence : Dict [str , UserPresenceState ] = {}
138138
139- # room_id -> receipt_type -> user_id -> receipt_dict
140- self ._pending_rrs : Dict [str , Dict [str , Dict [str , dict ]]] = {}
139+ # List of room_id -> receipt_type -> user_id -> receipt_dict,
140+ #
141+ # Each receipt can only have a single receipt per
142+ # (room ID, receipt type, user ID, thread ID) tuple.
143+ self ._pending_receipt_edus : List [Dict [str , Dict [str , Dict [str , dict ]]]] = []
141144 self ._rrs_pending_flush = False
142145
143146 # stream_id of last successfully sent to-device message.
@@ -202,17 +205,53 @@ def queue_read_receipt(self, receipt: ReadReceipt) -> None:
202205 Args:
203206 receipt: receipt to be queued
204207 """
205- self ._pending_rrs .setdefault (receipt .room_id , {}).setdefault (
206- receipt .receipt_type , {}
207- )[receipt .user_id ] = {"event_ids" : receipt .event_ids , "data" : receipt .data }
208+ serialized_receipt : JsonDict = {
209+ "event_ids" : receipt .event_ids ,
210+ "data" : receipt .data ,
211+ }
212+ if receipt .thread_id is not None :
213+ serialized_receipt ["data" ]["thread_id" ] = receipt .thread_id
214+
215+ # Find which EDU to add this receipt to. There's three situations depending
216+ # on the (room ID, receipt type, user, thread ID) tuple:
217+ #
218+ # 1. If it fully matches, clobber the information.
219+ # 2. If it is missing, add the information.
220+ # 3. If the subset tuple of (room ID, receipt type, user) matches, check
221+ # the next EDU (or add a new EDU).
222+ for edu in self ._pending_receipt_edus :
223+ receipt_content = edu .setdefault (receipt .room_id , {}).setdefault (
224+ receipt .receipt_type , {}
225+ )
226+ # If this room ID, receipt type, user ID is not in this EDU, OR if
227+ # the full tuple matches, use the current EDU.
228+ if (
229+ receipt .user_id not in receipt_content
230+ or receipt_content [receipt .user_id ].get ("thread_id" )
231+ == receipt .thread_id
232+ ):
233+ receipt_content [receipt .user_id ] = serialized_receipt
234+ break
235+
236+ # If no matching EDU was found, create a new one.
237+ else :
238+ self ._pending_receipt_edus .append (
239+ {
240+ receipt .room_id : {
241+ receipt .receipt_type : {receipt .user_id : serialized_receipt }
242+ }
243+ }
244+ )
208245
209246 def flush_read_receipts_for_room (self , room_id : str ) -> None :
210- # if we don't have any read-receipts for this room, it may be that we've already
211- # sent them out, so we don't need to flush.
212- if room_id not in self ._pending_rrs :
213- return
214- self ._rrs_pending_flush = True
215- self .attempt_new_transaction ()
247+ # If there are any pending receipts for this room then force-flush them
248+ # in a new transaction.
249+ for edu in self ._pending_receipt_edus :
250+ if room_id in edu :
251+ self ._rrs_pending_flush = True
252+ self .attempt_new_transaction ()
253+ # No use in checking remaining EDUs if the room was found.
254+ break
216255
217256 def send_keyed_edu (self , edu : Edu , key : Hashable ) -> None :
218257 self ._pending_edus_keyed [(edu .edu_type , key )] = edu
@@ -351,7 +390,7 @@ async def _transaction_transmission_loop(self) -> None:
351390 self ._pending_edus = []
352391 self ._pending_edus_keyed = {}
353392 self ._pending_presence = {}
354- self ._pending_rrs = {}
393+ self ._pending_receipt_edus = []
355394
356395 self ._start_catching_up ()
357396 except FederationDeniedError as e :
@@ -543,22 +582,27 @@ async def _catch_up_transmission_loop(self) -> None:
543582 self ._destination , last_successful_stream_ordering
544583 )
545584
546- def _get_rr_edus (self , force_flush : bool ) -> Iterable [Edu ]:
547- if not self ._pending_rrs :
585+ def _get_receipt_edus (self , force_flush : bool , limit : int ) -> Iterable [Edu ]:
586+ if not self ._pending_receipt_edus :
548587 return
549588 if not force_flush and not self ._rrs_pending_flush :
550589 # not yet time for this lot
551590 return
552591
553- edu = Edu (
554- origin = self ._server_name ,
555- destination = self ._destination ,
556- edu_type = EduTypes .RECEIPT ,
557- content = self ._pending_rrs ,
558- )
559- self ._pending_rrs = {}
560- self ._rrs_pending_flush = False
561- yield edu
592+ # Send at most limit EDUs for receipts.
593+ for content in self ._pending_receipt_edus [:limit ]:
594+ yield Edu (
595+ origin = self ._server_name ,
596+ destination = self ._destination ,
597+ edu_type = EduTypes .RECEIPT ,
598+ content = content ,
599+ )
600+ self ._pending_receipt_edus = self ._pending_receipt_edus [limit :]
601+
602+ # If there are still pending read-receipts, don't reset the pending flush
603+ # flag.
604+ if not self ._pending_receipt_edus :
605+ self ._rrs_pending_flush = False
562606
563607 def _pop_pending_edus (self , limit : int ) -> List [Edu ]:
564608 pending_edus = self ._pending_edus
@@ -645,68 +689,79 @@ class _TransactionQueueManager:
645689 async def __aenter__ (self ) -> Tuple [List [EventBase ], List [Edu ]]:
646690 # First we calculate the EDUs we want to send, if any.
647691
648- # We start by fetching device related EDUs, i.e device updates and to
649- # device messages. We have to keep 2 free slots for presence and rr_edus.
650- device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
692+ # There's a maximum number of EDUs that can be sent with a transaction,
693+ # generally device updates and to-device messages get priority, but we
694+ # want to ensure that there's room for some other EDUs as well.
695+ #
696+ # This is done by:
697+ #
698+ # * Add a presence EDU, if one exists.
699+ # * Add up-to a small limit of read receipt EDUs.
700+ # * Add to-device EDUs, but leave some space for device list updates.
701+ # * Add device list updates EDUs.
702+ # * If there's any remaining room, add other EDUs.
703+ pending_edus = []
704+
705+ # Add presence EDU.
706+ if self .queue ._pending_presence :
707+ pending_edus .append (
708+ Edu (
709+ origin = self .queue ._server_name ,
710+ destination = self .queue ._destination ,
711+ edu_type = EduTypes .PRESENCE ,
712+ content = {
713+ "push" : [
714+ format_user_presence_state (
715+ presence , self .queue ._clock .time_msec ()
716+ )
717+ for presence in self .queue ._pending_presence .values ()
718+ ]
719+ },
720+ )
721+ )
722+ self .queue ._pending_presence = {}
651723
652- # We prioritize to-device messages so that existing encryption channels
724+ # Add read receipt EDUs.
725+ pending_edus .extend (self .queue ._get_receipt_edus (force_flush = False , limit = 5 ))
726+ edu_limit = MAX_EDUS_PER_TRANSACTION - len (pending_edus )
727+
728+ # Next, prioritize to-device messages so that existing encryption channels
653729 # work. We also keep a few slots spare (by reducing the limit) so that
654730 # we can still trickle out some device list updates.
655731 (
656732 to_device_edus ,
657733 device_stream_id ,
658- ) = await self .queue ._get_to_device_message_edus (device_edu_limit - 10 )
734+ ) = await self .queue ._get_to_device_message_edus (edu_limit - 10 )
659735
660736 if to_device_edus :
661737 self ._device_stream_id = device_stream_id
662738 else :
663739 self .queue ._last_device_stream_id = device_stream_id
664740
665- device_edu_limit -= len (to_device_edus )
741+ pending_edus .extend (to_device_edus )
742+ edu_limit -= len (to_device_edus )
666743
744+ # Add device list update EDUs.
667745 device_update_edus , dev_list_id = await self .queue ._get_device_update_edus (
668- device_edu_limit
746+ edu_limit
669747 )
670748
671749 if device_update_edus :
672750 self ._device_list_id = dev_list_id
673751 else :
674752 self .queue ._last_device_list_stream_id = dev_list_id
675753
676- pending_edus = device_update_edus + to_device_edus
677-
678- # Now add the read receipt EDU.
679- pending_edus .extend (self .queue ._get_rr_edus (force_flush = False ))
680-
681- # And presence EDU.
682- if self .queue ._pending_presence :
683- pending_edus .append (
684- Edu (
685- origin = self .queue ._server_name ,
686- destination = self .queue ._destination ,
687- edu_type = EduTypes .PRESENCE ,
688- content = {
689- "push" : [
690- format_user_presence_state (
691- presence , self .queue ._clock .time_msec ()
692- )
693- for presence in self .queue ._pending_presence .values ()
694- ]
695- },
696- )
697- )
698- self .queue ._pending_presence = {}
754+ pending_edus .extend (device_update_edus )
755+ edu_limit -= len (device_update_edus )
699756
700757 # Finally add any other types of EDUs if there is room.
701- pending_edus .extend (
702- self .queue ._pop_pending_edus (MAX_EDUS_PER_TRANSACTION - len (pending_edus ))
703- )
704- while (
705- len (pending_edus ) < MAX_EDUS_PER_TRANSACTION
706- and self .queue ._pending_edus_keyed
707- ):
758+ other_edus = self .queue ._pop_pending_edus (edu_limit )
759+ pending_edus .extend (other_edus )
760+ edu_limit -= len (other_edus )
761+ while edu_limit > 0 and self .queue ._pending_edus_keyed :
708762 _ , val = self .queue ._pending_edus_keyed .popitem ()
709763 pending_edus .append (val )
764+ edu_limit -= 1
710765
711766 # Now we look for any PDUs to send, by getting up to 50 PDUs from the
712767 # queue
@@ -717,8 +772,10 @@ async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
717772
718773 # if we've decided to send a transaction anyway, and we have room, we
719774 # may as well send any pending RRs
720- if len (pending_edus ) < MAX_EDUS_PER_TRANSACTION :
721- pending_edus .extend (self .queue ._get_rr_edus (force_flush = True ))
775+ if edu_limit :
776+ pending_edus .extend (
777+ self .queue ._get_receipt_edus (force_flush = True , limit = edu_limit )
778+ )
722779
723780 if self ._pdus :
724781 self ._last_stream_ordering = self ._pdus [
0 commit comments