@@ -148,6 +148,41 @@ def __init__(self, hs: "HomeServer"):
148148
149149 self ._room_prejoin_state_types = hs .config .api .room_prejoin_state
150150
151+ # Whether we have started handling old events in the staging area.
152+ self ._started_handling_of_staged_events = False
153+
154+ @wrap_as_background_process ("_handle_old_staged_events" )
155+ async def _handle_old_staged_events (self ) -> None :
156+ """Handle old staged events by fetching all rooms that have staged
157+ events and start the processing of each of those rooms.
158+ """
159+
160+ # Get all the rooms IDs with staged events.
161+ room_ids = await self .store .get_all_rooms_with_staged_incoming_events ()
162+
163+ # We then shuffle them so that if there are multiple instances doing
164+ # this work they're less likely to collide.
165+ random .shuffle (room_ids )
166+
167+ for room_id in room_ids :
168+ room_version = await self .store .get_room_version (room_id )
169+
170+ # Try and acquire the processing lock for the room, if we get it start a
171+ # background process for handling the events in the room.
172+ lock = await self .store .try_acquire_lock (
173+ _INBOUND_EVENT_HANDLING_LOCK_NAME , room_id
174+ )
175+ if lock :
176+ logger .info ("Handling old staged inbound events in %s" , room_id )
177+ self ._process_incoming_pdus_in_room_inner (
178+ room_id ,
179+ room_version ,
180+ lock ,
181+ )
182+
183+ # We pause a bit so that we don't start handling all rooms at once.
184+ await self ._clock .sleep (random .uniform (0 , 0.1 ))
185+
151186 async def on_backfill_request (
152187 self , origin : str , room_id : str , versions : List [str ], limit : int
153188 ) -> Tuple [int , Dict [str , Any ]]:
@@ -166,6 +201,12 @@ async def on_backfill_request(
166201 async def on_incoming_transaction (
167202 self , origin : str , transaction_data : JsonDict
168203 ) -> Tuple [int , Dict [str , Any ]]:
204+ # If we receive a transaction we should make sure that kick off handling
205+ # any old events in the staging area.
206+ if not self ._started_handling_of_staged_events :
207+ self ._started_handling_of_staged_events = True
208+ self ._handle_old_staged_events ()
209+
169210 # keep this as early as possible to make the calculated origin ts as
170211 # accurate as possible.
171212 request_time = self ._clock .time_msec ()
@@ -882,32 +923,38 @@ async def _process_incoming_pdus_in_room_inner(
882923 room_id : str ,
883924 room_version : RoomVersion ,
884925 lock : Lock ,
885- latest_origin : str ,
886- latest_event : EventBase ,
926+ latest_origin : Optional [ str ] = None ,
927+ latest_event : Optional [ EventBase ] = None ,
887928 ) -> None :
888929 """Process events in the staging area for the given room.
889930
890931 The latest_origin and latest_event args are the latest origin and event
891- received.
932+ received (or None to simply pull the next event from the database) .
892933 """
893934
894935 # The common path is for the event we just received be the only event in
895936 # the room, so instead of pulling the event out of the DB and parsing
896937 # the event we just pull out the next event ID and check if that matches.
897- next_origin , next_event_id = await self .store .get_next_staged_event_id_for_room (
898- room_id
899- )
900- if next_origin == latest_origin and next_event_id == latest_event .event_id :
901- origin = latest_origin
902- event = latest_event
903- else :
938+ if latest_event is not None and latest_origin is not None :
939+ (
940+ next_origin ,
941+ next_event_id ,
942+ ) = await self .store .get_next_staged_event_id_for_room (room_id )
943+ if next_origin != latest_origin or next_event_id != latest_event .event_id :
944+ latest_origin = None
945+ latest_event = None
946+
947+ if latest_origin is None or latest_event is None :
904948 next = await self .store .get_next_staged_event_for_room (
905949 room_id , room_version
906950 )
907951 if not next :
908952 return
909953
910954 origin , event = next
955+ else :
956+ origin = latest_origin
957+ event = latest_event
911958
912959 # We loop round until there are no more events in the room in the
913960 # staging area, or we fail to get the lock (which means another process
0 commit comments