Skip to content

Commit 01378ef

Browse files
Support client disconnects with multiple servers (Fixes miguelgrinberg/Flask-SocketIO#1174)
1 parent a0abd7a commit 01378ef

File tree

9 files changed

+101
-6
lines changed

9 files changed

+101
-6
lines changed

socketio/asyncio_manager.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55

66
class AsyncManager(BaseManager):
77
"""Manage a client list for an asyncio server."""
8+
async def can_disconnect(self, sid, namespace):
9+
return self.is_connected(sid, namespace)
10+
811
async def emit(self, event, data, namespace, room=None, skip_sid=None,
912
callback=None, **kwargs):
1013
"""Emit a message to a single client, a room, or all the clients

socketio/asyncio_pubsub_manager.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,19 @@ async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
6969
'skip_sid': skip_sid, 'callback': callback,
7070
'host_id': self.host_id})
7171

72+
async def can_disconnect(self, sid, namespace):
73+
await self._publish({'method': 'disconnect', 'sid': sid,
74+
'namespace': namespace or '/'})
75+
76+
async def disconnect(self, sid, namespace=None):
77+
"""Disconnect a client."""
78+
# this is a bit weird, the can_disconnect call on pubsub managers just
79+
# issues a disconnect request to the message queue and returns None,
80+
# indicating that the client cannot disconnect immediately. The
81+
# server(s) listening on the queue will get this request and carry out
82+
# the disconnect appropriately.
83+
await self.can_disconnect(sid, namespace)
84+
7285
async def close_room(self, room, namespace=None):
7386
await self._publish({'method': 'close_room', 'room': room,
7487
'namespace': namespace or '/'})
@@ -128,6 +141,11 @@ async def _return_callback(self, host_id, sid, namespace, callback_id,
128141
'sid': sid, 'namespace': namespace,
129142
'id': callback_id, 'args': args})
130143

144+
async def _handle_disconnect(self, message):
145+
await self.server.disconnect(sid=message.get('sid'),
146+
namespace=message.get('namespace'),
147+
ignore_queue=True)
148+
131149
async def _handle_close_room(self, message):
132150
await super().close_room(
133151
room=message.get('room'), namespace=message.get('namespace'))
@@ -155,9 +173,13 @@ async def _thread(self):
155173
except:
156174
pass
157175
if data and 'method' in data:
176+
self._get_logger().info('pubsub message: {}'.format(
177+
data['method']))
158178
if data['method'] == 'emit':
159179
await self._handle_emit(data)
160180
elif data['method'] == 'callback':
161181
await self._handle_callback(data)
182+
elif data['method'] == 'disconnect':
183+
await self._handle_disconnect(data)
162184
elif data['method'] == 'close_room':
163185
await self._handle_close_room(data)

socketio/asyncio_server.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,17 +297,23 @@ async def __aexit__(self, *args):
297297

298298
return _session_context_manager(self, sid, namespace)
299299

300-
async def disconnect(self, sid, namespace=None):
300+
async def disconnect(self, sid, namespace=None, ignore_queue=False):
301301
"""Disconnect a client.
302302
303303
:param sid: Session ID of the client.
304304
:param namespace: The Socket.IO namespace to disconnect. If this
305305
argument is omitted the default namespace is used.
306+
:param ignore_queue: Only used when a message queue is configured. If
307+
set to ``True``, the disconnect is processed
308+
locally, without broadcasting on the queue. It is
309+
recommended to always leave this parameter with
310+
its default value of ``False``.
306311
307312
Note: this method is a coroutine.
308313
"""
309314
namespace = namespace or '/'
310-
if self.manager.is_connected(sid, namespace=namespace):
315+
if (ignore_queue and self.manager.is_connected(sid, namespace)) or \
316+
await self.manager.can_disconnect(sid, namespace):
311317
self.logger.info('Disconnecting %s [%s]', sid, namespace)
312318
self.manager.pre_disconnect(sid, namespace=namespace)
313319
await self._send_packet(sid, packet.Packet(packet.DISCONNECT,

socketio/base_manager.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ def is_connected(self, sid, namespace):
5555
except KeyError:
5656
pass
5757

58+
def can_disconnect(self, sid, namespace):
59+
return self.is_connected(sid, namespace)
60+
5861
def pre_disconnect(self, sid, namespace):
5962
"""Put the client in the to-be-disconnected list.
6063

socketio/pubsub_manager.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,19 @@ def emit(self, event, data, namespace=None, room=None, skip_sid=None,
6767
'skip_sid': skip_sid, 'callback': callback,
6868
'host_id': self.host_id})
6969

70+
def can_disconnect(self, sid, namespace):
71+
self._publish({'method': 'disconnect', 'sid': sid,
72+
'namespace': namespace or '/'})
73+
74+
def disconnect(self, sid, namespace=None):
75+
"""Disconnect a client."""
76+
# this is a bit weird, the can_disconnect call on pubsub managers just
77+
# issues a disconnect request to the message queue and returns None,
78+
# indicating that the client cannot disconnect immediately. The
79+
# server(s) listening on the queue will get this request and carry out
80+
# the disconnect appropriately.
81+
self.can_disconnect(sid, namespace)
82+
7083
def close_room(self, room, namespace=None):
7184
self._publish({'method': 'close_room', 'room': room,
7285
'namespace': namespace or '/'})
@@ -125,6 +138,11 @@ def _return_callback(self, host_id, sid, namespace, callback_id, *args):
125138
'sid': sid, 'namespace': namespace, 'id': callback_id,
126139
'args': args})
127140

141+
def _handle_disconnect(self, message):
142+
self.server.disconnect(sid=message.get('sid'),
143+
namespace=message.get('namespace'),
144+
ignore_queue=True)
145+
128146
def _handle_close_room(self, message):
129147
super(PubSubManager, self).close_room(
130148
room=message.get('room'), namespace=message.get('namespace'))
@@ -146,9 +164,13 @@ def _thread(self):
146164
except:
147165
pass
148166
if data and 'method' in data:
167+
self._get_logger().info('pubsub message: {}'.format(
168+
data['method']))
149169
if data['method'] == 'emit':
150170
self._handle_emit(data)
151171
elif data['method'] == 'callback':
152172
self._handle_callback(data)
173+
elif data['method'] == 'disconnect':
174+
self._handle_disconnect(data)
153175
elif data['method'] == 'close_room':
154176
self._handle_close_room(data)

socketio/server.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,15 +492,21 @@ def __exit__(self, *args):
492492

493493
return _session_context_manager(self, sid, namespace)
494494

495-
def disconnect(self, sid, namespace=None):
495+
def disconnect(self, sid, namespace=None, ignore_queue=False):
496496
"""Disconnect a client.
497497
498498
:param sid: Session ID of the client.
499499
:param namespace: The Socket.IO namespace to disconnect. If this
500500
argument is omitted the default namespace is used.
501+
:param ignore_queue: Only used when a message queue is configured. If
502+
set to ``True``, the disconnect is processed
503+
locally, without broadcasting on the queue. It is
504+
recommended to always leave this parameter with
505+
its default value of ``False``.
501506
"""
502507
namespace = namespace or '/'
503-
if self.manager.is_connected(sid, namespace=namespace):
508+
if (ignore_queue and self.manager.is_connected(sid, namespace)) or \
509+
self.manager.can_disconnect(sid, namespace):
504510
self.logger.info('Disconnecting %s [%s]', sid, namespace)
505511
self.manager.pre_disconnect(sid, namespace=namespace)
506512
self._send_packet(sid, packet.Packet(packet.DISCONNECT,

tests/asyncio/test_asyncio_pubsub_manager.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class TestAsyncPubSubManager(unittest.TestCase):
3434
def setUp(self):
3535
mock_server = mock.MagicMock()
3636
mock_server._emit_internal = AsyncMock()
37+
mock_server.disconnect = AsyncMock()
3738
self.pm = asyncio_pubsub_manager.AsyncPubSubManager()
3839
self.pm._publish = AsyncMock()
3940
self.pm.set_server(mock_server)
@@ -115,6 +116,11 @@ def test_emit_with_ignore_queue(self):
115116
self.pm.server._emit_internal.mock.assert_called_once_with(
116117
'123', 'foo', 'bar', '/', None)
117118

119+
def test_disconnect(self):
120+
_run(self.pm.disconnect('123', '/foo'))
121+
self.pm._publish.mock.assert_called_once_with(
122+
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})
123+
118124
def test_close_room(self):
119125
_run(self.pm.close_room('foo'))
120126
self.pm._publish.mock.assert_called_once_with(
@@ -142,7 +148,7 @@ def test_handle_emit_with_namespace(self):
142148
self.pm, 'foo', 'bar', namespace='/baz', room=None,
143149
skip_sid=None, callback=None)
144150

145-
def test_handle_emiti_with_room(self):
151+
def test_handle_emit_with_room(self):
146152
with mock.patch.object(asyncio_manager.AsyncManager, 'emit',
147153
new=AsyncMock()) as super_emit:
148154
_run(self.pm._handle_emit({'event': 'foo', 'data': 'bar',
@@ -216,6 +222,12 @@ def test_handle_callback_missing_args(self):
216222
'host_id': host_id}))
217223
self.assertEqual(trigger.mock.call_count, 0)
218224

225+
def test_handle_disconnect(self):
226+
_run(self.pm._handle_disconnect({'method': 'disconnect', 'sid': '123',
227+
'namespace': '/foo'}))
228+
self.pm.server.disconnect.mock.assert_called_once_with(
229+
sid='123', namespace='/foo', ignore_queue=True)
230+
219231
def test_handle_close_room(self):
220232
with mock.patch.object(asyncio_manager.AsyncManager, 'close_room',
221233
new=AsyncMock()) as super_close_room:
@@ -236,13 +248,15 @@ def test_handle_close_room_with_namespace(self):
236248
def test_background_thread(self):
237249
self.pm._handle_emit = AsyncMock()
238250
self.pm._handle_callback = AsyncMock()
251+
self.pm._handle_disconnect = AsyncMock()
239252
self.pm._handle_close_room = AsyncMock()
240253

241254
def messages():
242255
import pickle
243256
yield {'method': 'emit', 'value': 'foo'}
244257
yield {'missing': 'method'}
245258
yield '{"method": "callback", "value": "bar"}'
259+
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
246260
yield {'method': 'bogus'}
247261
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
248262
yield 'bad json'
@@ -258,5 +272,7 @@ def messages():
258272
{'method': 'emit', 'value': 'foo'})
259273
self.pm._handle_callback.mock.assert_called_once_with(
260274
{'method': 'callback', 'value': 'bar'})
275+
self.pm._handle_disconnect.mock.assert_called_once_with(
276+
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})
261277
self.pm._handle_close_room.mock.assert_called_once_with(
262278
{'method': 'close_room', 'value': 'baz'})

tests/asyncio/test_asyncio_server.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def tearDown(self):
4242

4343
def _get_mock_manager(self):
4444
mgr = mock.MagicMock()
45+
mgr.can_disconnect = AsyncMock()
4546
mgr.emit = AsyncMock()
4647
mgr.close_room = AsyncMock()
4748
mgr.trigger_callback = AsyncMock()

tests/common/test_pubsub_manager.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ def test_emit_with_ignore_queue(self):
112112
self.pm.server._emit_internal.assert_called_once_with('123', 'foo',
113113
'bar', '/', None)
114114

115+
def test_disconnect(self):
116+
self.pm.disconnect('123', '/foo')
117+
self.pm._publish.assert_called_once_with(
118+
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})
119+
115120
def test_close_room(self):
116121
self.pm.close_room('foo')
117122
self.pm._publish.assert_called_once_with(
@@ -137,7 +142,7 @@ def test_handle_emit_with_namespace(self):
137142
room=None, skip_sid=None,
138143
callback=None)
139144

140-
def test_handle_emiti_with_room(self):
145+
def test_handle_emit_with_room(self):
141146
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
142147
self.pm._handle_emit({'event': 'foo', 'data': 'bar',
143148
'room': 'baz'})
@@ -204,6 +209,13 @@ def test_handle_callback_missing_args(self):
204209
'host_id': host_id})
205210
self.assertEqual(trigger.call_count, 0)
206211

212+
def test_handle_disconnect(self):
213+
self.pm._handle_disconnect({'method': 'disconnect', 'sid': '123',
214+
'namespace': '/foo'})
215+
self.pm.server.disconnect.assert_called_once_with(sid='123',
216+
namespace='/foo',
217+
ignore_queue=True)
218+
207219
def test_handle_close_room(self):
208220
with mock.patch.object(base_manager.BaseManager, 'close_room') \
209221
as super_close_room:
@@ -223,13 +235,15 @@ def test_handle_close_room_with_namespace(self):
223235
def test_background_thread(self):
224236
self.pm._handle_emit = mock.MagicMock()
225237
self.pm._handle_callback = mock.MagicMock()
238+
self.pm._handle_disconnect = mock.MagicMock()
226239
self.pm._handle_close_room = mock.MagicMock()
227240

228241
def messages():
229242
import pickle
230243
yield {'method': 'emit', 'value': 'foo'}
231244
yield {'missing': 'method'}
232245
yield '{"method": "callback", "value": "bar"}'
246+
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo'}
233247
yield {'method': 'bogus'}
234248
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
235249
yield 'bad json'
@@ -245,5 +259,7 @@ def messages():
245259
{'method': 'emit', 'value': 'foo'})
246260
self.pm._handle_callback.assert_called_once_with(
247261
{'method': 'callback', 'value': 'bar'})
262+
self.pm._handle_disconnect.assert_called_once_with(
263+
{'method': 'disconnect', 'sid': '123', 'namespace': '/foo'})
248264
self.pm._handle_close_room.assert_called_once_with(
249265
{'method': 'close_room', 'value': 'baz'})

0 commit comments

Comments
 (0)