Skip to content

Commit 308b0c8

Browse files
v5 protocol: handle per-namespace sids in base manager
1 parent 49822e6 commit 308b0c8

File tree

8 files changed

+229
-230
lines changed

8 files changed

+229
-230
lines changed

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
platforms='any',
3131
install_requires=[
3232
'six>=1.9.0',
33+
'bidict>=0.21.0',
3334
'python-engineio>=3.13.0,<4'
3435
],
3536
extras_require={

socketio/asyncio_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ async def _handle_error(self, namespace, data):
353353
if namespace in self.namespaces:
354354
self.namespaces.remove(namespace)
355355
if namespace == '/':
356-
self.namespaces = []
356+
self.namespaces = {}
357357
self.connected = False
358358

359359
async def _trigger_event(self, event, namespace, *args):
@@ -456,7 +456,7 @@ async def _handle_eio_disconnect(self):
456456
if self.connected:
457457
for n in self.namespaces:
458458
await self._trigger_event('disconnect', namespace=n)
459-
self.namespaces = []
459+
self.namespaces = {}
460460
self.connected = False
461461
self.callbacks = {}
462462
self._binary_packet = None

socketio/asyncio_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ async def emit(self, event, data, namespace, room=None, skip_sid=None,
2020
tasks = []
2121
if not isinstance(skip_sid, list):
2222
skip_sid = [skip_sid]
23-
for sid in self.get_participants(namespace, room):
23+
for sid, eio_sid in self.get_participants(namespace, room):
2424
if sid not in skip_sid:
2525
if callback is not None:
2626
id = self._generate_ack_id(sid, namespace, callback)
2727
else:
2828
id = None
29-
tasks.append(self.server._emit_internal(sid, event, data,
29+
tasks.append(self.server._emit_internal(eio_sid, event, data,
3030
namespace, id))
3131
if tasks == []: # pragma: no cover
3232
return

socketio/asyncio_server.py

Lines changed: 55 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -336,13 +336,11 @@ async def disconnect(self, sid, namespace=None, ignore_queue=False):
336336
delete_it = await self.manager.can_disconnect(sid, namespace)
337337
if delete_it:
338338
self.logger.info('Disconnecting %s [%s]', sid, namespace)
339-
self.manager.pre_disconnect(sid, namespace=namespace)
340-
await self._send_packet(sid, packet.Packet(packet.DISCONNECT,
341-
namespace=namespace))
339+
eio_sid = self.manager.pre_disconnect(sid, namespace=namespace)
340+
await self._send_packet(eio_sid, packet.Packet(
341+
packet.DISCONNECT, namespace=namespace))
342342
await self._trigger_event('disconnect', namespace, sid)
343343
self.manager.disconnect(sid, namespace=namespace)
344-
if namespace == '/':
345-
await self.eio.disconnect(sid)
346344

347345
async def handle_request(self, *args, **kwargs):
348346
"""Handle an HTTP request from the client.
@@ -396,67 +394,61 @@ async def _emit_internal(self, sid, event, data, namespace=None, id=None):
396394
await self._send_packet(sid, packet.Packet(
397395
packet.EVENT, namespace=namespace, data=[event] + data, id=id))
398396

399-
async def _send_packet(self, sid, pkt):
397+
async def _send_packet(self, eio_sid, pkt):
400398
"""Send a Socket.IO packet to a client."""
401399
encoded_packet = pkt.encode()
402400
if isinstance(encoded_packet, list):
403401
for ep in encoded_packet:
404-
await self.eio.send(sid, ep)
402+
await self.eio.send(eio_sid, ep)
405403
else:
406-
await self.eio.send(sid, encoded_packet)
404+
await self.eio.send(eio_sid, encoded_packet)
407405

408-
async def _handle_connect(self, sid, namespace):
406+
async def _handle_connect(self, eio_sid, namespace):
409407
"""Handle a client connection request."""
410408
namespace = namespace or '/'
411-
self.manager.connect(sid, namespace)
409+
sid = self.manager.connect(eio_sid, namespace)
412410
if self.always_connect:
413-
await self._send_packet(sid, packet.Packet(packet.CONNECT,
414-
namespace=namespace))
411+
await self._send_packet(eio_sid, packet.Packet(
412+
packet.CONNECT, {'sid': sid}, namespace=namespace))
415413
fail_reason = None
416414
try:
417415
success = await self._trigger_event('connect', namespace, sid,
418-
self.environ[sid])
416+
self.environ[eio_sid])
419417
except exceptions.ConnectionRefusedError as exc:
420418
fail_reason = exc.error_args
421419
success = False
422420

423421
if success is False:
424422
if self.always_connect:
425423
self.manager.pre_disconnect(sid, namespace)
426-
await self._send_packet(sid, packet.Packet(
424+
await self._send_packet(eio_sid, packet.Packet(
427425
packet.DISCONNECT, data=fail_reason, namespace=namespace))
428426
elif namespace != '/':
429-
await self._send_packet(sid, packet.Packet(
427+
await self._send_packet(eio_sid, packet.Packet(
430428
packet.CONNECT_ERROR, data=fail_reason,
431429
namespace=namespace))
432430
self.manager.disconnect(sid, namespace)
433-
if namespace == '/' and sid in self.environ: # pragma: no cover
434-
del self.environ[sid]
431+
if namespace == '/' and \
432+
eio_sid in self.environ: # pragma: no cover
433+
del self.environ[eio_sid]
435434
return fail_reason or False
436435
elif not self.always_connect:
437-
await self._send_packet(sid, packet.Packet(packet.CONNECT,
438-
namespace=namespace))
436+
await self._send_packet(eio_sid, packet.Packet(
437+
packet.CONNECT, {'sid': sid}, namespace=namespace))
439438

440-
async def _handle_disconnect(self, sid, namespace):
439+
async def _handle_disconnect(self, eio_sid, namespace):
441440
"""Handle a client disconnect."""
442441
namespace = namespace or '/'
443-
if namespace == '/':
444-
namespace_list = list(self.manager.get_namespaces())
445-
else:
446-
namespace_list = [namespace]
447-
for n in namespace_list:
448-
if n != '/' and self.manager.is_connected(sid, n):
449-
self.manager.pre_disconnect(sid, namespace=n)
450-
await self._trigger_event('disconnect', n, sid)
451-
self.manager.disconnect(sid, n)
452-
if namespace == '/' and self.manager.is_connected(sid, namespace):
453-
self.manager.pre_disconnect(sid, namespace='/')
454-
await self._trigger_event('disconnect', '/', sid)
455-
self.manager.disconnect(sid, '/')
456-
457-
async def _handle_event(self, sid, namespace, id, data):
442+
sid = self.manager.sid_from_eio_sid(eio_sid, namespace)
443+
if self.manager.is_connected(sid, namespace):
444+
self.manager.pre_disconnect(sid, namespace=namespace)
445+
await self._trigger_event('disconnect', namespace, sid)
446+
self.manager.disconnect(sid, namespace)
447+
448+
async def _handle_event(self, eio_sid, namespace, id, data):
458449
"""Handle an incoming client event."""
459450
namespace = namespace or '/'
451+
sid = self.manager.sid_from_eio_sid(eio_sid, namespace)
460452
self.logger.info('received event "%s" from %s [%s]', data[0], sid,
461453
namespace)
462454
if not self.manager.is_connected(sid, namespace):
@@ -465,11 +457,13 @@ async def _handle_event(self, sid, namespace, id, data):
465457
return
466458
if self.async_handlers:
467459
self.start_background_task(self._handle_event_internal, self, sid,
468-
data, namespace, id)
460+
eio_sid, data, namespace, id)
469461
else:
470-
await self._handle_event_internal(self, sid, data, namespace, id)
462+
await self._handle_event_internal(self, sid, eio_sid, data,
463+
namespace, id)
471464

472-
async def _handle_event_internal(self, server, sid, data, namespace, id):
465+
async def _handle_event_internal(self, server, sid, eio_sid, data,
466+
namespace, id):
473467
r = await server._trigger_event(data[0], namespace, sid, *data[1:])
474468
if id is not None:
475469
# send ACK packet with the response returned by the handler
@@ -480,13 +474,13 @@ async def _handle_event_internal(self, server, sid, data, namespace, id):
480474
data = list(r)
481475
else:
482476
data = [r]
483-
await server._send_packet(sid, packet.Packet(packet.ACK,
484-
namespace=namespace,
485-
id=id, data=data))
477+
await server._send_packet(eio_sid, packet.Packet(
478+
packet.ACK, namespace=namespace, id=id, data=data))
486479

487-
async def _handle_ack(self, sid, namespace, id, data):
480+
async def _handle_ack(self, eio_sid, namespace, id, data):
488481
"""Handle ACK packets from the client."""
489482
namespace = namespace or '/'
483+
sid = self.manager.sid_from_eio_sid(eio_sid, namespace)
490484
self.logger.info('received ack from %s [%s]', sid, namespace)
491485
await self.manager.trigger_callback(sid, namespace, id, data)
492486

@@ -509,48 +503,50 @@ async def _trigger_event(self, event, namespace, *args):
509503
return await self.namespace_handlers[namespace].trigger_event(
510504
event, *args)
511505

512-
async def _handle_eio_connect(self, sid, environ):
506+
async def _handle_eio_connect(self, eio_sid, environ):
513507
"""Handle the Engine.IO connection event."""
514508
if not self.manager_initialized:
515509
self.manager_initialized = True
516510
self.manager.initialize()
517-
self.environ[sid] = environ
511+
self.environ[eio_sid] = environ
518512

519-
async def _handle_eio_message(self, sid, data):
513+
async def _handle_eio_message(self, eio_sid, data):
520514
"""Dispatch Engine.IO messages."""
521-
if sid in self._binary_packet:
522-
pkt = self._binary_packet[sid]
515+
if eio_sid in self._binary_packet:
516+
pkt = self._binary_packet[eio_sid]
523517
if pkt.add_attachment(data):
524-
del self._binary_packet[sid]
518+
del self._binary_packet[eio_sid]
525519
if pkt.packet_type == packet.BINARY_EVENT:
526-
await self._handle_event(sid, pkt.namespace, pkt.id,
520+
await self._handle_event(eio_sid, pkt.namespace, pkt.id,
527521
pkt.data)
528522
else:
529-
await self._handle_ack(sid, pkt.namespace, pkt.id,
523+
await self._handle_ack(eio_sid, pkt.namespace, pkt.id,
530524
pkt.data)
531525
else:
532526
pkt = packet.Packet(encoded_packet=data)
533527
if pkt.packet_type == packet.CONNECT:
534-
await self._handle_connect(sid, pkt.namespace)
528+
await self._handle_connect(eio_sid, pkt.namespace)
535529
elif pkt.packet_type == packet.DISCONNECT:
536-
await self._handle_disconnect(sid, pkt.namespace)
530+
await self._handle_disconnect(eio_sid, pkt.namespace)
537531
elif pkt.packet_type == packet.EVENT:
538-
await self._handle_event(sid, pkt.namespace, pkt.id, pkt.data)
532+
await self._handle_event(eio_sid, pkt.namespace, pkt.id,
533+
pkt.data)
539534
elif pkt.packet_type == packet.ACK:
540-
await self._handle_ack(sid, pkt.namespace, pkt.id, pkt.data)
535+
await self._handle_ack(eio_sid, pkt.namespace, pkt.id,
536+
pkt.data)
541537
elif pkt.packet_type == packet.BINARY_EVENT or \
542538
pkt.packet_type == packet.BINARY_ACK:
543-
self._binary_packet[sid] = pkt
539+
self._binary_packet[eio_sid] = pkt
544540
elif pkt.packet_type == packet.CONNECT_ERROR:
545541
raise ValueError('Unexpected CONNECT_ERROR packet.')
546542
else:
547543
raise ValueError('Unknown packet type.')
548544

549-
async def _handle_eio_disconnect(self, sid):
545+
async def _handle_eio_disconnect(self, eio_sid):
550546
"""Handle Engine.IO disconnect event."""
551-
await self._handle_disconnect(sid, '/')
552-
if sid in self.environ:
553-
del self.environ[sid]
547+
await self._handle_disconnect(eio_sid, '/')
548+
if eio_sid in self.environ:
549+
del self.environ[eio_sid]
554550

555551
def _engineio_server_class(self):
556552
return engineio.AsyncServer

socketio/base_manager.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import itertools
22
import logging
33

4+
from bidict import bidict
45
import six
56

67
default_logger = logging.getLogger('socketio')
@@ -18,7 +19,8 @@ class BaseManager(object):
1819
def __init__(self):
1920
self.logger = None
2021
self.server = None
21-
self.rooms = {}
22+
self.rooms = {} # self.rooms[namespace][room][sio_sid] = eio_sid
23+
self.eio_to_sid = {}
2224
self.callbacks = {}
2325
self.pending_disconnect = {}
2426

@@ -37,13 +39,15 @@ def get_namespaces(self):
3739

3840
def get_participants(self, namespace, room):
3941
"""Return an iterable with the active participants in a room."""
40-
for sid, active in six.iteritems(self.rooms[namespace][room].copy()):
41-
yield sid
42+
for sid, eio_sid in self.rooms[namespace][room].copy().items():
43+
yield sid, eio_sid
4244

43-
def connect(self, sid, namespace):
45+
def connect(self, eio_sid, namespace):
4446
"""Register a client connection to a namespace."""
45-
self.enter_room(sid, namespace, None)
46-
self.enter_room(sid, namespace, sid)
47+
sid = self.server.eio.generate_id()
48+
self.enter_room(sid, namespace, None, eio_sid=eio_sid)
49+
self.enter_room(sid, namespace, sid, eio_sid=eio_sid)
50+
return sid
4751

4852
def is_connected(self, sid, namespace):
4953
if namespace in self.pending_disconnect and \
@@ -55,6 +59,9 @@ def is_connected(self, sid, namespace):
5559
except KeyError:
5660
pass
5761

62+
def sid_from_eio_sid(self, eio_sid, namespace):
63+
return self.rooms[namespace][None].inverse.get(eio_sid)
64+
5865
def can_disconnect(self, sid, namespace):
5966
return self.is_connected(sid, namespace)
6067

@@ -68,6 +75,7 @@ def pre_disconnect(self, sid, namespace):
6875
if namespace not in self.pending_disconnect:
6976
self.pending_disconnect[namespace] = []
7077
self.pending_disconnect[namespace].append(sid)
78+
return self.rooms[namespace][None].get(sid)
7179

7280
def disconnect(self, sid, namespace):
7381
"""Register a client disconnect from a namespace."""
@@ -89,13 +97,15 @@ def disconnect(self, sid, namespace):
8997
if len(self.pending_disconnect[namespace]) == 0:
9098
del self.pending_disconnect[namespace]
9199

92-
def enter_room(self, sid, namespace, room):
100+
def enter_room(self, sid, namespace, room, eio_sid=None):
93101
"""Add a client to a room."""
94102
if namespace not in self.rooms:
95103
self.rooms[namespace] = {}
96104
if room not in self.rooms[namespace]:
97-
self.rooms[namespace][room] = {}
98-
self.rooms[namespace][room][sid] = True
105+
self.rooms[namespace][room] = bidict()
106+
if eio_sid is None:
107+
eio_sid = self.rooms[namespace][None][sid]
108+
self.rooms[namespace][room][sid] = eio_sid
99109

100110
def leave_room(self, sid, namespace, room):
101111
"""Remove a client from a room."""
@@ -111,7 +121,7 @@ def leave_room(self, sid, namespace, room):
111121
def close_room(self, room, namespace):
112122
"""Remove all participants from a room."""
113123
try:
114-
for sid in self.get_participants(namespace, room):
124+
for sid, _ in self.get_participants(namespace, room):
115125
self.leave_room(sid, namespace, room)
116126
except KeyError:
117127
pass
@@ -121,7 +131,7 @@ def get_rooms(self, sid, namespace):
121131
r = []
122132
try:
123133
for room_name, room in six.iteritems(self.rooms[namespace]):
124-
if room_name is not None and sid in room and room[sid]:
134+
if room_name is not None and sid in room:
125135
r.append(room_name)
126136
except KeyError:
127137
pass
@@ -135,13 +145,13 @@ def emit(self, event, data, namespace, room=None, skip_sid=None,
135145
return
136146
if not isinstance(skip_sid, list):
137147
skip_sid = [skip_sid]
138-
for sid in self.get_participants(namespace, room):
148+
for sid, eio_sid in self.get_participants(namespace, room):
139149
if sid not in skip_sid:
140150
if callback is not None:
141151
id = self._generate_ack_id(sid, namespace, callback)
142152
else:
143153
id = None
144-
self.server._emit_internal(sid, event, data, namespace, id)
154+
self.server._emit_internal(eio_sid, event, data, namespace, id)
145155

146156
def trigger_callback(self, sid, namespace, id, data):
147157
"""Invoke an application callback."""

socketio/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ def _handle_error(self, namespace, data):
533533
if namespace in self.namespaces:
534534
del self.namespaces[namespace]
535535
if namespace == '/':
536-
self.namespaces = []
536+
self.namespaces = {}
537537
self.connected = False
538538

539539
def _trigger_event(self, event, namespace, *args):
@@ -625,7 +625,7 @@ def _handle_eio_disconnect(self):
625625
if self.connected:
626626
for n in self.namespaces:
627627
self._trigger_event('disconnect', namespace=n)
628-
self.namespaces = []
628+
self.namespaces = {}
629629
self.connected = False
630630
self.callbacks = {}
631631
self._binary_packet = None

0 commit comments

Comments
 (0)