@@ -466,6 +466,8 @@ async def do_invite_join(
466466            )
467467
468468            if  ret .partial_state :
469+                 # TODO(faster_joins): roll this back if we don't manage to start the 
470+                 #   background resync (eg process_remote_join fails) 
469471                await  self .store .store_partial_state_room (room_id , ret .servers_in_room )
470472
471473            max_stream_id  =  await  self ._federation_event_handler .process_remote_join (
@@ -478,6 +480,18 @@ async def do_invite_join(
478480                partial_state = ret .partial_state ,
479481            )
480482
483+             if  ret .partial_state :
484+                 # Kick off the process of asynchronously fetching the state for this 
485+                 # room. 
486+                 # 
487+                 # TODO(faster_joins): pick this up again on restart 
488+                 run_as_background_process (
489+                     desc = "sync_partial_state_room" ,
490+                     func = self ._sync_partial_state_room ,
491+                     destination = origin ,
492+                     room_id = room_id ,
493+                 )
494+ 
481495            # We wait here until this instance has seen the events come down 
482496            # replication (if we're using replication) as the below uses caches. 
483497            await  self ._replication .wait_for_stream_position (
@@ -1370,3 +1384,64 @@ async def get_room_complexity(
13701384        # We fell off the bottom, couldn't get the complexity from anyone. Oh 
13711385        # well. 
13721386        return  None 
1387+ 
1388+     async  def  _sync_partial_state_room (
1389+         self ,
1390+         destination : str ,
1391+         room_id : str ,
1392+     ) ->  None :
1393+         """Background process to resync the state of a partial-state room 
1394+ 
1395+         Args: 
1396+             destination: homeserver to pull the state from 
1397+             room_id: room to be resynced 
1398+         """ 
1399+ 
1400+         # TODO(faster_joins): do we need to lock to avoid races? What happens if other 
1401+         #   worker processes kick off a resync in parallel? Perhaps we should just elect 
1402+         #   a single worker to do the resync. 
1403+         # 
1404+         # TODO(faster_joins): what happens if we leave the room during a resync? if we 
1405+         #   really leave, that might mean we have difficulty getting the room state over 
1406+         #   federation. 
1407+         # 
1408+         # TODO(faster_joins): try other destinations if the one we have fails 
1409+ 
1410+         logger .info ("Syncing state for room %s via %s" , room_id , destination )
1411+ 
1412+         # we work through the queue in order of increasing stream ordering. 
1413+         while  True :
1414+             batch  =  await  self .store .get_partial_state_events_batch (room_id )
1415+             if  not  batch :
1416+                 # all the events are updated, so we can update current state and 
1417+                 # clear the lazy-loading flag. 
1418+                 logger .info ("Updating current state for %s" , room_id )
1419+                 assert  (
1420+                     self .storage .persistence  is  not None 
1421+                 ), "TODO(faster_joins): support for workers" 
1422+                 await  self .storage .persistence .update_current_state (room_id )
1423+ 
1424+                 logger .info ("Clearing partial-state flag for %s" , room_id )
1425+                 success  =  await  self .store .clear_partial_state_room (room_id )
1426+                 if  success :
1427+                     logger .info ("State resync complete for %s" , room_id )
1428+ 
1429+                     # TODO(faster_joins) update room stats and user directory? 
1430+                     return 
1431+ 
1432+                 # we raced against more events arriving with partial state. Go round 
1433+                 # the loop again. We've already logged a warning, so no need for more. 
1434+                 # TODO(faster_joins): there is still a race here, whereby incoming events which raced 
1435+                 #   with us will fail to be persisted after the call to `clear_partial_state_room` due to 
1436+                 #   having partial state. 
1437+                 continue 
1438+ 
1439+             events  =  await  self .store .get_events_as_list (
1440+                 batch ,
1441+                 redact_behaviour = EventRedactBehaviour .AS_IS ,
1442+                 allow_rejected = True ,
1443+             )
1444+             for  event  in  events :
1445+                 await  self ._federation_event_handler .update_state_for_partial_state_event (
1446+                     destination , event 
1447+                 )
0 commit comments