Skip to content

Commit dafdef0

Browse files
authored
Fix #1110 Socket Mode disconnection issue with the aiohttp-based client (#1112)
* Fix #1110 Socket Mode disconnection issue with the aiohttp-based client
* Fix lint errors
* Make info logs more consistent
* Revert log change and improve details
1 parent 9f3240d commit dafdef0

File tree

8 files changed

+172
-74
lines changed

8 files changed

+172
-74
lines changed

integration_tests/samples/socket_mode/aiohttp_example.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ async def main():
1717
web_client=AsyncWebClient(
1818
token=os.environ.get("SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN")
1919
),
20+
trace_enabled=True,
2021
)
2122

2223
async def process(client: SocketModeClient, req: SocketModeRequest):

slack_sdk/socket_mode/aiohttp/__init__.py

Lines changed: 133 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""aiohttp bassd Socket Mode client
1+
"""aiohttp based Socket Mode client
22
33
* https://api.slack.com/apis/connections/socket
44
* https://slack.dev/python-slack-sdk/socket-mode/
@@ -7,6 +7,7 @@
77
"""
88
import asyncio
99
import logging
10+
import time
1011
from asyncio import Future, Lock
1112
from asyncio import Queue
1213
from logging import Logger
@@ -52,12 +53,16 @@ class SocketModeClient(AsyncBaseSocketModeClient):
5253

5354
proxy: Optional[str]
5455
ping_interval: float
56+
trace_enabled: bool
57+
58+
last_ping_pong_time: Optional[float]
5559
current_session: Optional[ClientWebSocketResponse]
5660
current_session_monitor: Optional[Future]
5761

5862
auto_reconnect_enabled: bool
5963
default_auto_reconnect_enabled: bool
6064
closed: bool
65+
stale: bool
6166
connect_operation_lock: Lock
6267

6368
on_message_listeners: List[Callable[[WSMessage], Awaitable[None]]]
@@ -71,7 +76,8 @@ def __init__(
7176
web_client: Optional[AsyncWebClient] = None,
7277
proxy: Optional[str] = None,
7378
auto_reconnect_enabled: bool = True,
74-
ping_interval: float = 10,
79+
ping_interval: float = 5,
80+
trace_enabled: bool = False,
7581
on_message_listeners: Optional[List[Callable[[WSMessage], None]]] = None,
7682
on_error_listeners: Optional[List[Callable[[WSMessage], None]]] = None,
7783
on_close_listeners: Optional[List[Callable[[WSMessage], None]]] = None,
@@ -84,6 +90,7 @@ def __init__(
8490
web_client: Web API client
8591
auto_reconnect_enabled: True if automatic reconnection is enabled (default: True)
8692
ping_interval: interval for ping-pong with Slack servers (seconds)
93+
trace_enabled: True if more verbose logs to see what's happening under the hood
8794
proxy: the HTTP proxy URL
8895
on_message_listeners: listener functions for on_message
8996
on_error_listeners: listener functions for on_error
@@ -93,6 +100,7 @@ def __init__(
93100
self.logger = logger or logging.getLogger(__name__)
94101
self.web_client = web_client or AsyncWebClient()
95102
self.closed = False
103+
self.stale = False
96104
self.connect_operation_lock = Lock()
97105
self.proxy = proxy
98106
if self.proxy is None or len(self.proxy.strip()) == 0:
@@ -103,6 +111,8 @@ def __init__(
103111
self.default_auto_reconnect_enabled = auto_reconnect_enabled
104112
self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
105113
self.ping_interval = ping_interval
114+
self.trace_enabled = trace_enabled
115+
self.last_ping_pong_time = None
106116

107117
self.wss_uri = None
108118
self.message_queue = Queue()
@@ -126,70 +136,124 @@ def __init__(
126136
self.message_processor = asyncio.ensure_future(self.process_messages())
127137

128138
async def monitor_current_session(self) -> None:
    """Periodically ping the current session and reconnect when it looks dead.

    Every ``self.ping_interval`` seconds this coroutine:

    1. sends a ``ping-pong:<timestamp>`` ping over the current session
       (only while the session is still open),
    2. when ``self.auto_reconnect_enabled`` is set, reconnects via
       ``self.connect_to_new_endpoint()`` if the session is missing or
       closed, if ``self.last_ping_pong_time`` is at least four ping
       intervals old (the connection is then marked ``stale``), or if
       ``self.is_connected()`` reports False.

    The loop ends once ``self.closed`` becomes true.

    Raises:
        asyncio.CancelledError: re-raised when the running task is cancelled.
    """
    try:
        while not self.closed:
            try:
                await asyncio.sleep(self.ping_interval)

                session = self.current_session
                # Do not ping a session that is already closed: the write
                # would fail and there is nothing to learn from it.
                if session is not None and not session.closed:
                    now = time.time()
                    if self.last_ping_pong_time is None:
                        self.last_ping_pong_time = float(now)
                    try:
                        await session.ping(f"ping-pong:{now}")
                    except Exception as e:
                        # A failed ping must not skip the reconnect check
                        # below; otherwise a broken session is never replaced.
                        self.logger.warning(
                            "Failed to send a ping message "
                            f"(error: {type(e).__name__}, message: {e})"
                        )

                if self.auto_reconnect_enabled:
                    should_reconnect = False
                    if session is None or session.closed:
                        self.logger.info(
                            "The session seems to be already closed. Reconnecting..."
                        )
                        should_reconnect = True

                    if self.last_ping_pong_time is not None:
                        disconnected_seconds = int(
                            time.time() - self.last_ping_pong_time
                        )
                        if disconnected_seconds >= (self.ping_interval * 4):
                            self.logger.info(
                                "The connection seems to be stale. Reconnecting..."
                                f" (reason: disconnected for {disconnected_seconds}+ seconds)"
                            )
                            self.stale = True
                            self.last_ping_pong_time = None
                            should_reconnect = True

                    if should_reconnect or not await self.is_connected():
                        await self.connect_to_new_endpoint()

            except Exception as e:
                self.logger.error(
                    "Failed to check the current session or reconnect to the server "
                    f"(error: {type(e).__name__}, message: {e})"
                )
    except asyncio.CancelledError:
        if self.trace_enabled:
            self.logger.debug(
                "The running monitor_current_session task is now cancelled"
            )
        raise
144184

145185
async def receive_messages(self) -> None:
    """Receive WebSocket messages from the current session until closed.

    Each received message is dispatched by its type:

    * TEXT: the payload is enqueued and the on_message listeners run.
    * CLOSE: reconnects when auto reconnection is enabled, then runs the
      on_close listeners.
    * ERROR: runs the on_error listeners.
    * CLOSED: sleeps for ``self.ping_interval`` and polls again.
    * PING: answers with a pong carrying the same payload.
    * PONG: parses the ``<label>:<timestamp>`` payload and stores the
      timestamp in ``self.last_ping_pong_time``.

    Failures are logged and followed by a sleep: ``self.ping_interval``
    seconds for ``ClientConnectionError``, otherwise one second per
    consecutive error.

    Raises:
        asyncio.CancelledError: re-raised when the running task is cancelled.
    """
    try:
        consecutive_error_count = 0
        while not self.closed:
            try:
                message: WSMessage = await self.current_session.receive()
                if message is not None:
                    if self.trace_enabled and self.logger.level <= logging.DEBUG:
                        # Do not bind a local named `type` here: it would
                        # shadow the builtin for the whole function and
                        # break `type(e)` in the error handler below.
                        message_type = WSMsgType(message.type).name
                        message_data = message.data
                        if isinstance(message_data, bytes):
                            message_data = message_data.decode("utf-8")
                        # To skip the empty message that Slack server-side often sends.
                        # Non-text payloads (None, close codes) are still logged.
                        if message_data != "":
                            self.logger.debug(
                                f"Received message (type: {message_type}, data: {message_data}, extra: {message.extra})"
                            )

                    if message.type == WSMsgType.TEXT:
                        message_data = message.data
                        await self.enqueue_message(message_data)
                        for listener in self.on_message_listeners:
                            await listener(message)
                    elif message.type == WSMsgType.CLOSE:
                        if self.auto_reconnect_enabled:
                            self.logger.info(
                                "Received CLOSE event. Reconnecting..."
                            )
                            await self.connect_to_new_endpoint()
                        for listener in self.on_close_listeners:
                            await listener(message)
                    elif message.type == WSMsgType.ERROR:
                        for listener in self.on_error_listeners:
                            await listener(message)
                    elif message.type == WSMsgType.CLOSED:
                        await asyncio.sleep(self.ping_interval)
                        continue
                    elif message.type == WSMsgType.PING:
                        await self.current_session.pong(message.data)
                        continue
                    elif message.type == WSMsgType.PONG:
                        pong_data = message.data
                        if pong_data is not None:
                            if isinstance(pong_data, bytes):
                                pong_data = pong_data.decode("utf-8")
                            str_message_data = str(pong_data)
                            elements = str_message_data.split(":")
                            if len(elements) == 2:
                                try:
                                    self.last_ping_pong_time = float(elements[1])
                                except Exception as e:
                                    self.logger.warning(
                                        f"Failed to parse the last_ping_pong_time value from {str_message_data}"
                                        f" - error : {e}"
                                    )
                        continue
                consecutive_error_count = 0
            except Exception as e:
                consecutive_error_count += 1
                self.logger.error(
                    f"Failed to receive or enqueue a message: {type(e).__name__}, {e}"
                )
                if isinstance(e, ClientConnectionError):
                    await asyncio.sleep(self.ping_interval)
                else:
                    # Back off a little longer after each consecutive failure.
                    await asyncio.sleep(consecutive_error_count)
    except asyncio.CancelledError:
        if self.trace_enabled:
            self.logger.debug("The running receive_messages task is now cancelled")
        raise
189252

190253
async def is_connected(self) -> bool:
    """Tell whether this client currently holds a usable session.

    Returns:
        True only when the client is neither closed nor marked stale and
        the current session exists and is not closed.
    """
    if self.closed or self.stale:
        return False
    session = self.current_session
    if session is None:
        return False
    return not session.closed
@@ -200,19 +264,25 @@ async def connect(self):
200264
self.wss_uri = await self.issue_new_wss_url()
201265
self.current_session = await self.aiohttp_client_session.ws_connect(
202266
self.wss_uri,
267+
autoping=False,
203268
heartbeat=self.ping_interval,
204269
proxy=self.proxy,
205270
)
206271
self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
272+
self.stale = False
207273
self.logger.info("A new session has been established")
208274

209-
if self.current_session_monitor is None:
210-
self.current_session_monitor = asyncio.ensure_future(
211-
self.monitor_current_session()
212-
)
275+
if self.current_session_monitor is not None:
276+
self.current_session_monitor.cancel()
277+
278+
self.current_session_monitor = asyncio.ensure_future(
279+
self.monitor_current_session()
280+
)
281+
282+
if self.message_receiver is not None:
283+
self.message_receiver.cancel()
213284

214-
if self.message_receiver is None:
215-
self.message_receiver = asyncio.ensure_future(self.receive_messages())
285+
self.message_receiver = asyncio.ensure_future(self.receive_messages())
216286

217287
if old_session is not None:
218288
await old_session.close()
@@ -232,7 +302,8 @@ async def close(self):
232302
self.closed = True
233303
self.auto_reconnect_enabled = False
234304
await self.disconnect()
235-
self.message_processor.cancel()
305+
if self.message_processor is not None:
306+
self.message_processor.cancel()
236307
if self.current_session_monitor is not None:
237308
self.current_session_monitor.cancel()
238309
if self.message_receiver is not None:

slack_sdk/socket_mode/async_client.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class AsyncBaseSocketModeClient:
2222
app_token: str
2323
wss_uri: str
2424
auto_reconnect_enabled: bool
25+
trace_enabled: bool
2526
closed: bool
2627
connect_operation_lock: Lock
2728

@@ -72,12 +73,20 @@ async def disconnect(self):
7273
async def connect_to_new_endpoint(self, force: bool = False):
    """Establish a new connection while holding ``connect_operation_lock``.

    A new WSS URL is issued and ``self.connect()`` is called only when
    ``force`` is true or ``self.is_connected()`` reports False.

    Args:
        force: reconnect even if the client currently looks connected.
    """
    # Acquire before entering the try block: if this call is cancelled or
    # fails while still waiting for the lock, the finally clause must not
    # release a lock that another task is holding.
    await self.connect_operation_lock.acquire()
    try:
        if self.trace_enabled:
            self.logger.debug(
                "For reconnection, the connect_operation_lock was acquired"
            )
        if force or not await self.is_connected():
            self.wss_uri = await self.issue_new_wss_url()
            await self.connect()
    finally:
        self.connect_operation_lock.release()
        if self.trace_enabled:
            self.logger.debug(
                "The connect_operation_lock for reconnection was released"
            )
8190

8291
async def close(self):
8392
self.closed = True
@@ -103,11 +112,16 @@ async def enqueue_message(self, message: str):
103112
)
104113

105114
async def process_messages(self):
    """Keep processing queued messages until the client is closed.

    Any exception raised by ``self.process_message()`` is logged with its
    traceback and the loop carries on with the next message.

    Raises:
        asyncio.CancelledError: re-raised when the running task is cancelled.
    """
    try:
        while True:
            if self.closed:
                break
            try:
                await self.process_message()
            except Exception as err:
                self.logger.exception(f"Failed to process a message: {err}")
    except asyncio.CancelledError:
        if self.trace_enabled:
            self.logger.debug("The running process_messages task is now cancelled")
        raise
111125

112126
async def process_message(self):
113127
raw_message = await self.message_queue.get()

slack_sdk/socket_mode/builtin/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ def _on_close(self, code: int, reason: Optional[str] = None):
238238
self.logger.debug(f"on_close invoked (session id: {self.session_id()})")
239239
if self.auto_reconnect_enabled:
240240
self.logger.info(
241-
"Received CLOSE event. Going to reconnect... "
241+
"Received CLOSE event. Reconnecting... "
242242
f"(session id: {self.session_id()})"
243243
)
244244
self.connect_to_new_endpoint()
@@ -274,7 +274,7 @@ def _monitor_current_session(self):
274274
self.current_session is None or not self.current_session.is_active()
275275
):
276276
self.logger.info(
277-
"The session seems to be already closed. Going to reconnect... "
277+
"The session seems to be already closed. Reconnecting... "
278278
f"(session id: {self.session_id()})"
279279
)
280280
self.connect_to_new_endpoint()

slack_sdk/socket_mode/websocket_client/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ def on_close(
177177
f"on_close invoked: (code: {close_status_code}, message: {close_msg})"
178178
)
179179
if self.auto_reconnect_enabled:
180-
self.logger.info("Received CLOSE event. Going to reconnect...")
180+
self.logger.info("Received CLOSE event. Reconnecting...")
181181
self.connect_to_new_endpoint()
182182
for listener in self.on_close_listeners:
183183
listener(ws)
@@ -246,7 +246,7 @@ def _monitor_current_session(self):
246246
self.current_session is None or self.current_session.sock is None
247247
):
248248
self.logger.info(
249-
"The session seems to be already closed. Going to reconnect..."
249+
"The session seems to be already closed. Reconnecting..."
250250
)
251251
self.connect_to_new_endpoint()
252252
except Exception as e:

0 commit comments

Comments
 (0)