@@ -376,48 +376,68 @@ <h1 class="title">Module <code>slack_sdk.socket_mode.aiohttp</code></h1>
376376 return self.build_session_id(self.current_session)
377377
378378 async def connect(self):
379- old_session: Optional[ClientWebSocketResponse] = None if self.current_session is None else self.current_session
380- if self.wss_uri is None:
381- # If the underlying WSS URL does not exist,
382- # acquiring a new active WSS URL from the server-side first
383- self.wss_uri = await self.issue_new_wss_url()
384-
385- self.current_session = await self.aiohttp_client_session.ws_connect(
386- self.wss_uri,
387- autoping=False,
388- heartbeat=self.ping_interval,
389- proxy=self.proxy,
390- ssl=self.web_client.ssl,
391- )
392- session_id: str = await self.session_id()
393- self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
394- self.stale = False
395- self.logger.info(f"A new session ({session_id}) has been established")
396-
397- # The first ping from the new connection
398- if self.logger.level <= logging.DEBUG:
399- self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...")
400- t = time.time()
401- await self.current_session.ping(f"sdk-ping-pong:{t}")
402-
403- if self.current_session_monitor is not None:
404- self.current_session_monitor.cancel()
379+ # This loop is used to ensure when a new session is created,
380+ # a new monitor and a new message receiver are also created.
381+ # If a new session is created but we failed to create the new
382+ # monitor or the new message, we should try it.
383+ while True:
384+ try:
385+ old_session: Optional[ClientWebSocketResponse] = (
386+ None if self.current_session is None else self.current_session
387+ )
405388
406- self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session())
407- if self.logger.level <= logging.DEBUG:
408- self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}")
389+ # If the old session is broken (e.g. reset by peer), it might fail to close it.
390+ # We don't want to retry when this kind of cases happen.
391+ try:
392+ # We should close old session before create a new one. Because when disconnect
393+ # reason is `too_many_websockets`, we need to close the old one first to
394+ # to decrease the number of connections.
395+ self.auto_reconnect_enabled = False
396+ if old_session is not None:
397+ await old_session.close()
398+ old_session_id = self.build_session_id(old_session)
399+ self.logger.info(f"The old session ({old_session_id}) has been abandoned")
400+ except Exception as e:
401+ self.logger.exception(f"Failed to close the old session : {e}")
402+
403+ if self.wss_uri is None:
404+ # If the underlying WSS URL does not exist,
405+ # acquiring a new active WSS URL from the server-side first
406+ self.wss_uri = await self.issue_new_wss_url()
407+
408+ self.current_session = await self.aiohttp_client_session.ws_connect(
409+ self.wss_uri,
410+ autoping=False,
411+ heartbeat=self.ping_interval,
412+ proxy=self.proxy,
413+ ssl=self.web_client.ssl,
414+ )
415+ session_id: str = await self.session_id()
416+ self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
417+ self.stale = False
418+ self.logger.info(f"A new session ({session_id}) has been established")
409419
410- if self.message_receiver is not None:
411- self.message_receiver.cancel()
420+ # The first ping from the new connection
421+ if self.logger.level <= logging.DEBUG:
422+ self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...")
423+ t = time.time()
424+ await self.current_session.ping(f"sdk-ping-pong:{t}")
412425
413- self.message_receiver = asyncio.ensure_future(self.receive_messages())
414- if self.logger.level <= logging.DEBUG:
415- self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}")
426+ if self.current_session_monitor is not None:
427+ self.current_session_monitor.cancel()
428+ self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session())
429+ if self.logger.level <= logging.DEBUG:
430+ self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}")
416431
417- if old_session is not None:
418- await old_session.close()
419- old_session_id = self.build_session_id(old_session)
420- self.logger.info(f"The old session ({old_session_id}) has been abandoned")
432+ if self.message_receiver is not None:
433+ self.message_receiver.cancel()
434+ self.message_receiver = asyncio.ensure_future(self.receive_messages())
435+ if self.logger.level <= logging.DEBUG:
436+ self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}")
437+ break
438+ except Exception as e:
439+ self.logger.exception(f"Failed to connect (error: {e}); Retrying...")
440+ await asyncio.sleep(self.ping_interval)
421441
422442 async def disconnect(self):
423443 if self.current_session is not None:
@@ -832,48 +852,68 @@ <h2 id="args">Args</h2>
832852 return self.build_session_id(self.current_session)
833853
834854 async def connect(self):
835- old_session: Optional[ClientWebSocketResponse] = None if self.current_session is None else self.current_session
836- if self.wss_uri is None:
837- # If the underlying WSS URL does not exist,
838- # acquiring a new active WSS URL from the server-side first
839- self.wss_uri = await self.issue_new_wss_url()
840-
841- self.current_session = await self.aiohttp_client_session.ws_connect(
842- self.wss_uri,
843- autoping=False,
844- heartbeat=self.ping_interval,
845- proxy=self.proxy,
846- ssl=self.web_client.ssl,
847- )
848- session_id: str = await self.session_id()
849- self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
850- self.stale = False
851- self.logger.info(f"A new session ({session_id}) has been established")
852-
853- # The first ping from the new connection
854- if self.logger.level <= logging.DEBUG:
855- self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...")
856- t = time.time()
857- await self.current_session.ping(f"sdk-ping-pong:{t}")
858-
859- if self.current_session_monitor is not None:
860- self.current_session_monitor.cancel()
855+ # This loop is used to ensure when a new session is created,
856+ # a new monitor and a new message receiver are also created.
857+ # If a new session is created but we failed to create the new
858+ # monitor or the new message, we should try it.
859+ while True:
860+ try:
861+ old_session: Optional[ClientWebSocketResponse] = (
862+ None if self.current_session is None else self.current_session
863+ )
861864
862- self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session())
863- if self.logger.level <= logging.DEBUG:
864- self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}")
865+ # If the old session is broken (e.g. reset by peer), it might fail to close it.
866+ # We don't want to retry when this kind of cases happen.
867+ try:
868+ # We should close old session before create a new one. Because when disconnect
869+ # reason is `too_many_websockets`, we need to close the old one first to
870+ # to decrease the number of connections.
871+ self.auto_reconnect_enabled = False
872+ if old_session is not None:
873+ await old_session.close()
874+ old_session_id = self.build_session_id(old_session)
875+ self.logger.info(f"The old session ({old_session_id}) has been abandoned")
876+ except Exception as e:
877+ self.logger.exception(f"Failed to close the old session : {e}")
878+
879+ if self.wss_uri is None:
880+ # If the underlying WSS URL does not exist,
881+ # acquiring a new active WSS URL from the server-side first
882+ self.wss_uri = await self.issue_new_wss_url()
883+
884+ self.current_session = await self.aiohttp_client_session.ws_connect(
885+ self.wss_uri,
886+ autoping=False,
887+ heartbeat=self.ping_interval,
888+ proxy=self.proxy,
889+ ssl=self.web_client.ssl,
890+ )
891+ session_id: str = await self.session_id()
892+ self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
893+ self.stale = False
894+ self.logger.info(f"A new session ({session_id}) has been established")
865895
866- if self.message_receiver is not None:
867- self.message_receiver.cancel()
896+ # The first ping from the new connection
897+ if self.logger.level <= logging.DEBUG:
898+ self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...")
899+ t = time.time()
900+ await self.current_session.ping(f"sdk-ping-pong:{t}")
868901
869- self.message_receiver = asyncio.ensure_future(self.receive_messages())
870- if self.logger.level <= logging.DEBUG:
871- self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}")
902+ if self.current_session_monitor is not None:
903+ self.current_session_monitor.cancel()
904+ self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session())
905+ if self.logger.level <= logging.DEBUG:
906+ self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}")
872907
873- if old_session is not None:
874- await old_session.close()
875- old_session_id = self.build_session_id(old_session)
876- self.logger.info(f"The old session ({old_session_id}) has been abandoned")
908+ if self.message_receiver is not None:
909+ self.message_receiver.cancel()
910+ self.message_receiver = asyncio.ensure_future(self.receive_messages())
911+ if self.logger.level <= logging.DEBUG:
912+ self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}")
913+ break
914+ except Exception as e:
915+ self.logger.exception(f"Failed to connect (error: {e}); Retrying...")
916+ await asyncio.sleep(self.ping_interval)
877917
878918 async def disconnect(self):
879919 if self.current_session is not None:
@@ -1082,48 +1122,68 @@ <h3>Methods</h3>
10821122< span > Expand source code</ span >
10831123</ summary >
10841124< pre > < code class ="python "> async def connect(self):
1085- old_session: Optional[ClientWebSocketResponse] = None if self.current_session is None else self.current_session
1086- if self.wss_uri is None:
1087- # If the underlying WSS URL does not exist,
1088- # acquiring a new active WSS URL from the server-side first
1089- self.wss_uri = await self.issue_new_wss_url()
1090-
1091- self.current_session = await self.aiohttp_client_session.ws_connect(
1092- self.wss_uri,
1093- autoping=False,
1094- heartbeat=self.ping_interval,
1095- proxy=self.proxy,
1096- ssl=self.web_client.ssl,
1097- )
1098- session_id: str = await self.session_id()
1099- self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
1100- self.stale = False
1101- self.logger.info(f"A new session ({session_id}) has been established")
1102-
1103- # The first ping from the new connection
1104- if self.logger.level <= logging.DEBUG:
1105- self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...")
1106- t = time.time()
1107- await self.current_session.ping(f"sdk-ping-pong:{t}")
1108-
1109- if self.current_session_monitor is not None:
1110- self.current_session_monitor.cancel()
1125+ # This loop is used to ensure when a new session is created,
1126+ # a new monitor and a new message receiver are also created.
1127+ # If a new session is created but we failed to create the new
1128+ # monitor or the new message, we should try it.
1129+ while True:
1130+ try:
1131+ old_session: Optional[ClientWebSocketResponse] = (
1132+ None if self.current_session is None else self.current_session
1133+ )
11111134
1112- self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session())
1113- if self.logger.level <= logging.DEBUG:
1114- self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}")
1135+ # If the old session is broken (e.g. reset by peer), it might fail to close it.
1136+ # We don't want to retry when this kind of cases happen.
1137+ try:
1138+ # We should close old session before create a new one. Because when disconnect
1139+ # reason is `too_many_websockets`, we need to close the old one first to
1140+ # to decrease the number of connections.
1141+ self.auto_reconnect_enabled = False
1142+ if old_session is not None:
1143+ await old_session.close()
1144+ old_session_id = self.build_session_id(old_session)
1145+ self.logger.info(f"The old session ({old_session_id}) has been abandoned")
1146+ except Exception as e:
1147+ self.logger.exception(f"Failed to close the old session : {e}")
1148+
1149+ if self.wss_uri is None:
1150+ # If the underlying WSS URL does not exist,
1151+ # acquiring a new active WSS URL from the server-side first
1152+ self.wss_uri = await self.issue_new_wss_url()
1153+
1154+ self.current_session = await self.aiohttp_client_session.ws_connect(
1155+ self.wss_uri,
1156+ autoping=False,
1157+ heartbeat=self.ping_interval,
1158+ proxy=self.proxy,
1159+ ssl=self.web_client.ssl,
1160+ )
1161+ session_id: str = await self.session_id()
1162+ self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
1163+ self.stale = False
1164+ self.logger.info(f"A new session ({session_id}) has been established")
11151165
1116- if self.message_receiver is not None:
1117- self.message_receiver.cancel()
1166+ # The first ping from the new connection
1167+ if self.logger.level <= logging.DEBUG:
1168+ self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...")
1169+ t = time.time()
1170+ await self.current_session.ping(f"sdk-ping-pong:{t}")
11181171
1119- self.message_receiver = asyncio.ensure_future(self.receive_messages())
1120- if self.logger.level <= logging.DEBUG:
1121- self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}")
1172+ if self.current_session_monitor is not None:
1173+ self.current_session_monitor.cancel()
1174+ self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session())
1175+ if self.logger.level <= logging.DEBUG:
1176+ self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}")
11221177
1123- if old_session is not None:
1124- await old_session.close()
1125- old_session_id = self.build_session_id(old_session)
1126- self.logger.info(f"The old session ({old_session_id}) has been abandoned")</ code > </ pre >
1178+ if self.message_receiver is not None:
1179+ self.message_receiver.cancel()
1180+ self.message_receiver = asyncio.ensure_future(self.receive_messages())
1181+ if self.logger.level <= logging.DEBUG:
1182+ self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}")
1183+ break
1184+ except Exception as e:
1185+ self.logger.exception(f"Failed to connect (error: {e}); Retrying...")
1186+ await asyncio.sleep(self.ping_interval)</ code > </ pre >
11271187</ details >
11281188</ dd >
11291189< dt id ="slack_sdk.socket_mode.aiohttp.SocketModeClient.disconnect "> < code class ="name flex ">
0 commit comments