@@ -116,7 +116,6 @@ class CustomPuppetMixin(ABC):
116116 intent: The primary IntentAPI.
117117 """
118118
119- sync_with_custom_puppets : bool = True
120119 allow_discover_url : bool = False
121120 homeserver_url_map : dict [str , URL ] = {}
122121 only_handle_own_synced_events : bool = True
@@ -135,12 +134,9 @@ class CustomPuppetMixin(ABC):
135134 custom_mxid : UserID | None
136135 access_token : str | None
137136 base_url : URL | None
138- next_batch : SyncToken | None
139137
140138 intent : IntentAPI
141139
142- _sync_task : asyncio .Task | None = None
143-
144140 @abstractmethod
145141 async def save (self ) -> None :
146142 """Save the information of this puppet. Called from :meth:`switch_mxid`"""
@@ -221,7 +217,6 @@ async def switch_mxid(
221217 the appservice-owned ID.
222218 mxid: The expected Matrix user ID of the custom account, or ``None`` when
223219 ``access_token`` is None.
224- start_sync_task: Whether or not syncing should be started after logging in.
225220 """
226221 if access_token == "auto" :
227222 access_token = await self ._login_with_shared_secret (mxid )
@@ -251,7 +246,7 @@ async def switch_mxid(
251246 self .base_url = base_url
252247 self .intent = self ._fresh_intent ()
253248
254- await self .start (start_sync_task = start_sync_task , check_e2ee_keys = True )
249+ await self .start (check_e2ee_keys = True )
255250
256251 try :
257252 del self .by_custom_mxid [prev_mxid ]
@@ -276,7 +271,6 @@ async def _invalidate_double_puppet(self) -> None:
276271 del self .by_custom_mxid [self .custom_mxid ]
277272 self .custom_mxid = None
278273 self .access_token = None
279- self .next_batch = None
280274 await self .save ()
281275 self .intent = self ._fresh_intent ()
282276
@@ -296,7 +290,7 @@ async def start(
296290 except MatrixInvalidToken as e :
297291 if retry_auto_login and self .custom_mxid and self .can_auto_login (self .custom_mxid ):
298292 self .log .debug (f"Got { e .errcode } while trying to initialize custom mxid" )
299- await self .switch_mxid ("auto" , self .custom_mxid , start_sync_task = start_sync_task )
293+ await self .switch_mxid ("auto" , self .custom_mxid )
300294 return
301295 self .log .warning (f"Got { e .errcode } while trying to initialize custom mxid" )
302296 whoami = None
@@ -319,19 +313,14 @@ async def start(
319313 if device_keys and len (device_keys .keys ) > 0 :
320314 await self ._invalidate_double_puppet ()
321315 raise EncryptionKeysFound ()
322- if self .sync_with_custom_puppets and start_sync_task :
323- if self ._sync_task :
324- self ._sync_task .cancel ()
325- self .log .info (f"Initialized custom mxid: { whoami .user_id } . Starting sync task" )
326- self ._sync_task = asyncio .create_task (self ._try_sync ())
327- else :
328- self .log .info (f"Initialized custom mxid: { whoami .user_id } . Not starting sync task" )
316+ self .log .info (f"Initialized custom mxid: { whoami .user_id } " )
329317
330318 def stop (self ) -> None :
331- """Cancel the sync task."""
332- if self ._sync_task :
333- self ._sync_task .cancel ()
334- self ._sync_task = None
319+ """
320+ No-op
321+
322+ .. deprecated:: 0.20.1
323+ """
335324
336325 async def default_puppet_should_leave_room (self , room_id : RoomID ) -> bool :
337326 """
@@ -354,112 +343,3 @@ async def _leave_rooms_with_default_user(self) -> None:
354343 await self .intent .ensure_joined (room_id )
355344 except (IntentError , MatrixRequestError ):
356345 pass
357-
358- def _create_sync_filter (self ) -> Awaitable [FilterID ]:
359- all_events = EventType .find ("*" )
360- return self .intent .create_filter (
361- Filter (
362- account_data = EventFilter (types = [all_events ]),
363- presence = EventFilter (
364- types = [EventType .PRESENCE ],
365- senders = [self .custom_mxid ] if self .only_handle_own_synced_events else None ,
366- ),
367- room = RoomFilter (
368- include_leave = False ,
369- state = StateFilter (not_types = [all_events ]),
370- timeline = RoomEventFilter (not_types = [all_events ]),
371- account_data = RoomEventFilter (not_types = [all_events ]),
372- ephemeral = RoomEventFilter (
373- types = [
374- EventType .TYPING ,
375- EventType .RECEIPT ,
376- ]
377- ),
378- ),
379- )
380- )
381-
382- def _filter_events (self , room_id : RoomID , events : list [dict ]) -> Iterator [Event ]:
383- for event in events :
384- event ["room_id" ] = room_id
385- if self .only_handle_own_synced_events :
386- # We only want events about the custom puppet user, but we can't use
387- # filters for typing and read receipt events.
388- evt_type = EventType .find (event .get ("type" , None ))
389- event .setdefault ("content" , {})
390- if evt_type == EventType .TYPING :
391- is_typing = self .custom_mxid in event ["content" ].get ("user_ids" , [])
392- event ["content" ]["user_ids" ] = [self .custom_mxid ] if is_typing else []
393- elif evt_type == EventType .RECEIPT :
394- try :
395- event_id , receipt = event ["content" ].popitem ()
396- data = receipt ["m.read" ][self .custom_mxid ]
397- event ["content" ] = {event_id : {"m.read" : {self .custom_mxid : data }}}
398- except KeyError :
399- continue
400- yield event
401-
402- def _handle_sync (self , sync_resp : dict ) -> None :
403- # Get events from rooms -> join -> [room_id] -> ephemeral -> events (array)
404- ephemeral_events = (
405- event
406- for room_id , data in sync_resp .get ("rooms" , {}).get ("join" , {}).items ()
407- for event in self ._filter_events (room_id , data .get ("ephemeral" , {}).get ("events" , []))
408- )
409-
410- # Get events from presence -> events (array)
411- presence_events = sync_resp .get ("presence" , {}).get ("events" , [])
412-
413- # Deserialize and handle all events
414- for event in chain (ephemeral_events , presence_events ):
415- background_task .create (self .mx .try_handle_sync_event (Event .deserialize (event )))
416-
417- async def _try_sync (self ) -> None :
418- try :
419- await self ._sync ()
420- except asyncio .CancelledError :
421- self .log .info (f"Syncing for { self .custom_mxid } cancelled" )
422- except Exception :
423- self .log .critical (f"Fatal error syncing { self .custom_mxid } " , exc_info = True )
424-
425- async def _sync (self ) -> None :
426- if not self .is_real_user :
427- self .log .warning ("Called sync() for non-custom puppet." )
428- return
429- custom_mxid : UserID = self .custom_mxid
430- access_token_at_start : str = self .access_token
431- errors : int = 0
432- filter_id : FilterID = await self ._create_sync_filter ()
433- self .log .debug (f"Starting syncer for { custom_mxid } with sync filter { filter_id } ." )
434- while access_token_at_start == self .access_token :
435- try :
436- cur_batch = self .next_batch
437- sync_resp = await self .intent .sync (
438- filter_id = filter_id , since = cur_batch , set_presence = PresenceState .OFFLINE
439- )
440- try :
441- self .next_batch = sync_resp .get ("next_batch" , None )
442- except Exception :
443- self .log .warning ("Failed to store next batch" , exc_info = True )
444- errors = 0
445- if cur_batch is not None :
446- self ._handle_sync (sync_resp )
447- except MatrixInvalidToken :
448- # TODO when not using syncing, we should still check this occasionally and relogin
449- self .log .warning (f"Access token for { custom_mxid } got invalidated, restarting..." )
450- await self .start (retry_auto_login = True , start_sync_task = False )
451- if self .is_real_user :
452- self .log .info ("Successfully relogined custom puppet, continuing sync" )
453- filter_id = await self ._create_sync_filter ()
454- access_token_at_start = self .access_token
455- else :
456- self .log .warning ("Something went wrong during relogin" )
457- raise
458- except (MatrixError , ClientConnectionError , asyncio .TimeoutError ) as e :
459- errors += 1
460- wait = min (errors , 11 ) ** 2
461- self .log .warning (
462- f"Syncer for { custom_mxid } errored: { e } . Waiting for { wait } seconds..."
463- )
464- await asyncio .sleep (wait )
465- self .log .debug (f"Syncer for custom puppet { custom_mxid } stopped." )
0 commit comments