Skip to content

Commit 55786c2

Browse files
authored
Improve the stability of SocketModeClient implementations (#1114)
* Improve the stability of SocketModeClient implementations * Fix typo and apply test improvements
1 parent 12188d3 commit 55786c2

File tree

10 files changed

+290
-16
lines changed

10 files changed

+290
-16
lines changed

slack_sdk/socket_mode/aiohttp/__init__.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ async def monitor_current_session(self) -> None:
144144
t = time.time()
145145
if self.last_ping_pong_time is None:
146146
self.last_ping_pong_time = float(t)
147-
await self.current_session.ping(f"ping-pong:{t}")
147+
await self.current_session.ping(f"sdk-ping-pong:{t}")
148148

149149
if self.auto_reconnect_enabled:
150150
should_reconnect = False
@@ -226,7 +226,10 @@ async def receive_messages(self) -> None:
226226
if message.data is not None:
227227
str_message_data = message.data.decode("utf-8")
228228
elements = str_message_data.split(":")
229-
if len(elements) == 2:
229+
if (
230+
len(elements) == 2
231+
and elements[0] == "sdk-ping-pong"
232+
):
230233
try:
231234
self.last_ping_pong_time = float(elements[1])
232235
except Exception as e:
@@ -296,7 +299,30 @@ async def disconnect(self):
296299
async def send_message(self, message: str):
297300
if self.logger.level <= logging.DEBUG:
298301
self.logger.debug(f"Sending a message: {message}")
299-
await self.current_session.send_str(message)
302+
try:
303+
await self.current_session.send_str(message)
304+
except ConnectionError as e:
305+
# We rarely get this exception while replacing the underlying WebSocket connections.
306+
# We can do one more try here as the self.current_session should be ready now.
307+
if self.logger.level <= logging.DEBUG:
308+
self.logger.debug(
309+
f"Failed to send a message (error: {e}, message: {message})"
310+
" as the underlying connection was replaced. Retrying the same request only one time..."
311+
)
312+
# Although acquiring self.connect_operation_lock also for the first method call is the safest way,
313+
# we avoid synchronizing a lot for better performance. That's why we are doing a retry here.
314+
try:
315+
await self.connect_operation_lock.acquire()
316+
if await self.is_connected():
317+
await self.current_session.send_str(message)
318+
else:
319+
self.logger.warning(
320+
"The current session is no longer active. Failed to send a message"
321+
)
322+
raise e
323+
finally:
324+
if self.connect_operation_lock.locked() is True:
325+
self.connect_operation_lock.release()
300326

301327
async def close(self):
302328
self.closed = True

slack_sdk/socket_mode/builtin/client.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from slack_sdk.web import WebClient
2121
from .connection import Connection, ConnectionState
2222
from ..interval_runner import IntervalRunner
23-
from ...errors import SlackClientConfigurationError
23+
from ...errors import SlackClientConfigurationError, SlackClientNotConnectedError
2424
from ...proxy_env_variable_loader import load_http_proxy_from_env
2525

2626

@@ -206,7 +206,27 @@ def send_message(self, message: str) -> None:
206206
self.logger.debug(
207207
f"Sending a message (session id: {self.session_id()}, message: {message})"
208208
)
209-
self.current_session.send(message)
209+
try:
210+
self.current_session.send(message)
211+
except SlackClientNotConnectedError as e:
212+
# We rarely get this exception while replacing the underlying WebSocket connections.
213+
# We can do one more try here as the self.current_session should be ready now.
214+
if self.logger.level <= logging.DEBUG:
215+
self.logger.debug(
216+
f"Failed to send a message (session id: {self.session_id()}, error: {e}, message: {message})"
217+
" as the underlying connection was replaced. Retrying the same request only one time..."
218+
)
219+
# Although acquiring self.connect_operation_lock also for the first method call is the safest way,
220+
# we avoid synchronizing a lot for better performance. That's why we are doing a retry here.
221+
with self.connect_operation_lock:
222+
if self.is_connected():
223+
self.current_session.send(message)
224+
else:
225+
self.logger.warning(
226+
f"The current session (session id: {self.session_id()}) is no longer active. "
227+
"Failed to send a message"
228+
)
229+
raise e
210230

211231
def close(self):
212232
self.closed = True

slack_sdk/socket_mode/builtin/connection.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,13 @@ def connect(self) -> None:
176176

177177
def disconnect(self) -> None:
178178
if self.sock is not None:
179-
self.sock.close()
180-
self.sock = None
179+
with self.sock_send_lock:
180+
with self.sock_receive_lock:
181+
# Synchronize before closing this instance's socket
182+
self.sock.close()
183+
self.sock = None
184+
# After this, all operations using self.sock will be skipped
185+
181186
self.logger.info(
182187
f"The connection has been closed (session id: {self.session_id})"
183188
)
@@ -198,7 +203,13 @@ def ping(self, payload: Union[str, bytes] = "") -> None:
198203
)
199204
data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PING)
200205
with self.sock_send_lock:
201-
self.sock.send(data)
206+
if self.sock is not None:
207+
self.sock.send(data)
208+
else:
209+
if self.ping_pong_trace_enabled:
210+
self.logger.debug(
211+
"Skipped sending a ping message as the underlying socket is no longer available."
212+
)
202213

203214
def pong(self, payload: Union[str, bytes] = "") -> None:
204215
if self.trace_enabled and self.ping_pong_trace_enabled:
@@ -210,7 +221,13 @@ def pong(self, payload: Union[str, bytes] = "") -> None:
210221
)
211222
data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PONG)
212223
with self.sock_send_lock:
213-
self.sock.send(data)
224+
if self.sock is not None:
225+
self.sock.send(data)
226+
else:
227+
if self.ping_pong_trace_enabled:
228+
self.logger.debug(
229+
"Skipped sending a pong message as the underlying socket is no longer available."
230+
)
214231

215232
def send(self, payload: str) -> None:
216233
if self.trace_enabled:
@@ -222,7 +239,17 @@ def send(self, payload: str) -> None:
222239
)
223240
data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_TEXT)
224241
with self.sock_send_lock:
225-
self.sock.send(data)
242+
try:
243+
self.sock.send(data)
244+
except Exception as e:
245+
# In most cases, we want to retry this operation with a newly established connection.
246+
# Getting this exception means that this connection has been replaced with a new one
247+
# and it's no longer usable.
248+
# The SocketModeClient implementation can do one retry when it gets this exception.
249+
raise SlackClientNotConnectedError(
250+
f"Failed to send a message as the connection is no longer active "
251+
f"(session_id: {self.session_id}, error: {e})"
252+
)
226253

227254
def check_state(self) -> None:
228255
try:

slack_sdk/socket_mode/websocket_client/__init__.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from typing import Union, Optional, List, Callable, Tuple
1414

1515
import websocket
16-
from websocket import WebSocketApp
16+
from websocket import WebSocketApp, WebSocketException
1717

1818
from slack_sdk.socket_mode.client import BaseSocketModeClient
1919
from slack_sdk.socket_mode.interval_runner import IntervalRunner
@@ -212,7 +212,27 @@ def disconnect(self) -> None:
212212
def send_message(self, message: str) -> None:
213213
if self.logger.level <= logging.DEBUG:
214214
self.logger.debug(f"Sending a message: {message}")
215-
self.current_session.send(message)
215+
try:
216+
self.current_session.send(message)
217+
except WebSocketException as e:
218+
# We rarely get this exception while replacing the underlying WebSocket connections.
219+
# We can do one more try here as the self.current_session should be ready now.
220+
if self.logger.level <= logging.DEBUG:
221+
self.logger.debug(
222+
f"Failed to send a message (error: {e}, message: {message})"
223+
" as the underlying connection was replaced. Retrying the same request only one time..."
224+
)
225+
# Although acquiring self.connect_operation_lock also for the first method call is the safest way,
226+
# we avoid synchronizing a lot for better performance. That's why we are doing a retry here.
227+
with self.connect_operation_lock:
228+
if self.is_connected():
229+
self.current_session.send(message)
230+
else:
231+
self.logger.warning(
232+
f"The current session (session id: {self.session_id()}) is no longer active. "
233+
"Failed to send a message"
234+
)
235+
raise e
216236

217237
def close(self):
218238
self.closed = True

slack_sdk/socket_mode/websockets/__init__.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import websockets
1616
from websockets.client import WebSocketClientProtocol
17+
from websockets.exceptions import WebSocketException
1718

1819
from slack_sdk.socket_mode.async_client import AsyncBaseSocketModeClient
1920
from slack_sdk.socket_mode.async_listeners import (
@@ -177,7 +178,29 @@ async def disconnect(self):
177178
async def send_message(self, message: str):
178179
if self.logger.level <= logging.DEBUG:
179180
self.logger.debug(f"Sending a message: {message}")
180-
await self.current_session.send(message)
181+
try:
182+
await self.current_session.send(message)
183+
except WebSocketException as e:
184+
# We rarely get this exception while replacing the underlying WebSocket connections.
185+
# We can do one more try here as the self.current_session should be ready now.
186+
if self.logger.level <= logging.DEBUG:
187+
self.logger.debug(
188+
f"Failed to send a message (error: {e}, message: {message})"
189+
" as the underlying connection was replaced. Retrying the same request only one time..."
190+
)
191+
# Although acquiring self.connect_operation_lock also for the first method call is the safest way,
192+
# we avoid synchronizing a lot for better performance. That's why we are doing a retry here.
193+
try:
194+
if await self.is_connected():
195+
await self.current_session.send(message)
196+
else:
197+
self.logger.warning(
198+
"The current session is no longer active. Failed to send a message"
199+
)
200+
raise e
201+
finally:
202+
if self.connect_operation_lock.locked() is True:
203+
self.connect_operation_lock.release()
181204

182205
async def close(self):
183206
self.closed = True

tests/slack_sdk/socket_mode/test_interactions_builtin.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from random import randint
55
from threading import Thread
66

7-
from slack_sdk.errors import SlackClientConfigurationError
7+
from slack_sdk.errors import SlackClientConfigurationError, SlackClientNotConnectedError
88
from slack_sdk.socket_mode.request import SocketModeRequest
99

1010
from slack_sdk.socket_mode.client import BaseSocketModeClient
@@ -126,7 +126,45 @@ def socket_mode_request_handler(
126126
self.logger.info(f"Passed with buffer size: {buffer_size}")
127127

128128
finally:
129+
client.close()
129130
self.server.stop()
130131
self.server.close()
131132

132133
self.logger.info(f"Passed with buffer size: {buffer_size_list}")
134+
135+
def test_send_message_while_disconnection(self):
136+
if is_ci_unstable_test_skip_enabled():
137+
return
138+
t = Thread(target=start_socket_mode_server(self, 3011))
139+
t.daemon = True
140+
t.start()
141+
time.sleep(2) # wait for the server
142+
143+
try:
144+
self.reset_sever_state()
145+
client = SocketModeClient(
146+
app_token="xapp-A111-222-xyz",
147+
web_client=self.web_client,
148+
auto_reconnect_enabled=False,
149+
trace_enabled=True,
150+
)
151+
client.wss_uri = "ws://0.0.0.0:3011/link"
152+
client.connect()
153+
time.sleep(1) # wait for the connection
154+
client.send_message("foo")
155+
156+
client.disconnect()
157+
time.sleep(1) # wait for the connection
158+
try:
159+
client.send_message("foo")
160+
self.fail("SlackClientNotConnectedError is expected here")
161+
except SlackClientNotConnectedError as _:
162+
pass
163+
164+
client.connect()
165+
time.sleep(1) # wait for the connection
166+
client.send_message("foo")
167+
finally:
168+
client.close()
169+
self.server.stop()
170+
self.server.close()

tests/slack_sdk/socket_mode/test_interactions_websocket_client.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from random import randint
55
from threading import Thread
66

7+
from websocket import WebSocketException
8+
79
from slack_sdk.socket_mode.client import BaseSocketModeClient
810

911
from slack_sdk.socket_mode.request import SocketModeRequest
@@ -101,3 +103,40 @@ def socket_mode_request_handler(
101103
client.close()
102104
self.server.stop()
103105
self.server.close()
106+
107+
def test_send_message_while_disconnection(self):
108+
if is_ci_unstable_test_skip_enabled():
109+
return
110+
t = Thread(target=start_socket_mode_server(self, 3012))
111+
t.daemon = True
112+
t.start()
113+
time.sleep(2) # wait for the server
114+
115+
try:
116+
self.reset_sever_state()
117+
client = SocketModeClient(
118+
app_token="xapp-A111-222-xyz",
119+
web_client=self.web_client,
120+
auto_reconnect_enabled=False,
121+
trace_enabled=True,
122+
)
123+
client.wss_uri = "ws://0.0.0.0:3012/link"
124+
client.connect()
125+
time.sleep(1) # wait for the connection
126+
client.send_message("foo")
127+
128+
client.disconnect()
129+
time.sleep(1) # wait for the connection
130+
try:
131+
client.send_message("foo")
132+
self.fail("WebSocketException is expected here")
133+
except WebSocketException as _:
134+
pass
135+
136+
client.connect()
137+
time.sleep(1) # wait for the connection
138+
client.send_message("foo")
139+
finally:
140+
client.close()
141+
self.server.stop()
142+
self.server.close()

tests/slack_sdk_async/socket_mode/test_interactions_aiohttp.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,41 @@ async def socket_mode_listener(
105105
await client.close()
106106
self.server.stop()
107107
self.server.close()
108+
109+
@async_test
110+
async def test_send_message_while_disconnection(self):
111+
if is_ci_unstable_test_skip_enabled():
112+
return
113+
t = Thread(target=start_socket_mode_server(self, 3001))
114+
t.daemon = True
115+
t.start()
116+
117+
client = SocketModeClient(
118+
app_token="xapp-A111-222-xyz",
119+
web_client=self.web_client,
120+
auto_reconnect_enabled=False,
121+
trace_enabled=True,
122+
)
123+
124+
try:
125+
time.sleep(1) # wait for the server
126+
client.wss_uri = "ws://0.0.0.0:3001/link"
127+
await client.connect()
128+
await asyncio.sleep(1) # wait for the message receiver
129+
await client.send_message("foo")
130+
131+
await client.disconnect()
132+
await asyncio.sleep(1) # wait for the message receiver
133+
try:
134+
await client.send_message("foo")
135+
self.fail("ConnectionError is expected here")
136+
except ConnectionError as _:
137+
pass
138+
139+
await client.connect()
140+
await asyncio.sleep(1) # wait for the message receiver
141+
await client.send_message("foo")
142+
finally:
143+
await client.close()
144+
self.server.stop()
145+
self.server.close()

0 commit comments

Comments
 (0)