@@ -104,7 +104,14 @@ class LinkedInMessaging:
104104 session : aiohttp .ClientSession
105105 two_factor_payload : dict [str , Any ]
106106 event_listeners : defaultdict [
107- str , list [Callable [[RealTimeEventStreamEvent ], Awaitable [None ]]]
107+ str ,
108+ list [
109+ Union [
110+ Callable [[RealTimeEventStreamEvent ], Awaitable [None ]],
111+ Callable [[asyncio .exceptions .TimeoutError ], Awaitable [None ]],
112+ Callable [[Exception ], Awaitable [None ]],
113+ ]
114+ ],
108115 ]
109116
110117 def __init__ (self ):
@@ -483,8 +490,20 @@ async def download_profile_picture(self, picture: Picture) -> bytes:
483490 def add_event_listener (
484491 self ,
485492 payload_key : str ,
486- fn : Callable [[RealTimeEventStreamEvent ], Awaitable [None ]],
493+ fn : Union [
494+ Callable [[RealTimeEventStreamEvent ], Awaitable [None ]],
495+ Callable [[asyncio .exceptions .TimeoutError ], Awaitable [None ]],
496+ Callable [[Exception ], Awaitable [None ]],
497+ ],
487498 ):
499+ """
500+ There are three special event types:
501+
502+ * ALL_EVENTS - an event fired on every event, and which contains the entirety of
503+ the raw event payload
504+ * TIMEOUT - an event fired if the event listener connection times out
505+ * STREAM_ERROR - an event fired if the event stream errors for any other reason
506+ """
488507 self .event_listeners [payload_key ].append (fn )
489508
490509 async def _fire (self , payload_key : str , event : Any ):
@@ -510,6 +529,11 @@ async def _listen_to_event_stream(self):
510529 continue
511530 data = json .loads (line .decode ("utf-8" )[6 :])
512531
532+ # Special handling for ALL_EVENTS handler.
533+ if all_events_handlers := self .event_listeners .get ("ALL_EVENTS" ):
534+ for handler in all_events_handlers :
535+ await handler (data )
536+
513537 event_payload = data .get (
514538 "com.linkedin.realtimefrontend.DecoratedEvent" , {}
515539 ).get ("payload" , {})
@@ -526,10 +550,20 @@ async def start_listener(self):
526550 while True :
527551 try :
528552 await self ._listen_to_event_stream ()
529- except asyncio .exceptions .TimeoutError :
553+ except asyncio .exceptions .TimeoutError as te :
554+ # Special handling for TIMEOUT handler.
555+ if all_events_handlers := self .event_listeners .get ("TIMEOUT" ):
556+ for handler in all_events_handlers :
557+ await handler (te )
558+ await asyncio .sleep (1 )
530559 continue
531560 except Exception as e :
532561 logging .exception (f"Error listening to event stream: { e } " )
562+ # Special handling for STREAM_ERROR handler.
563+ if all_events_handlers := self .event_listeners .get ("STREAM_ERROR" ):
564+ for handler in all_events_handlers :
565+ await handler (e )
566+ await asyncio .sleep (1 )
533567 continue
534568
535569 # endregion
0 commit comments