Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/socketio/async_pubsub_manager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import base64
from functools import partial
import uuid

from engineio import json

from .async_manager import AsyncManager
from .packet import Packet


class AsyncPubSubManager(AsyncManager):
Expand Down Expand Up @@ -64,8 +66,12 @@ async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
callback = (room, namespace, id)
else:
callback = None
binary = Packet.data_is_binary(data)
if binary:
data, attachments = Packet.deconstruct_binary(data)
data = [data, *[base64.b64encode(a).decode() for a in attachments]]
message = {'method': 'emit', 'event': event, 'data': data,
'namespace': namespace, 'room': room,
'binary': binary, 'namespace': namespace, 'room': room,
'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id}
await self._handle_emit(message) # handle in this host
Expand Down Expand Up @@ -145,7 +151,11 @@ async def _handle_emit(self, message):
*remote_callback)
else:
callback = None
await super().emit(message['event'], message['data'],
data = message['data']
if message.get('binary'):
attachments = [base64.b64decode(a) for a in data[1:]]
data = Packet.reconstruct_binary(data[0], attachments)
await super().emit(message['event'], data,
namespace=message.get('namespace'),
room=message.get('room'),
skip_sid=message.get('skip_sid'),
Expand Down
40 changes: 22 additions & 18 deletions src/socketio/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, packet_type=EVENT, data=None, namespace=None, id=None,
self.namespace = namespace
self.id = id
if self.uses_binary_events and \
(binary or (binary is None and self._data_is_binary(
(binary or (binary is None and self.data_is_binary(
self.data))):
if self.packet_type == EVENT:
self.packet_type = BINARY_EVENT
Expand All @@ -51,7 +51,7 @@ def encode(self):
"""
encoded_packet = str(self.packet_type)
if self.packet_type == BINARY_EVENT or self.packet_type == BINARY_ACK:
data, attachments = self._deconstruct_binary(self.data)
data, attachments = self.deconstruct_binary(self.data)
encoded_packet += str(len(attachments)) + '-'
else:
data = self.data
Expand Down Expand Up @@ -119,61 +119,65 @@ def add_attachment(self, attachment):
raise ValueError('Unexpected binary attachment')
self.attachments.append(attachment)
if self.attachment_count == len(self.attachments):
self.reconstruct_binary(self.attachments)
self.data = self.reconstruct_binary(self.data, self.attachments)
return True
return False

def reconstruct_binary(self, attachments):
@classmethod
def reconstruct_binary(cls, data, attachments):
"""Reconstruct a decoded packet using the given list of binary
attachments.
"""
self.data = self._reconstruct_binary_internal(self.data,
self.attachments)
return cls._reconstruct_binary_internal(data, attachments)

def _reconstruct_binary_internal(self, data, attachments):
@classmethod
def _reconstruct_binary_internal(cls, data, attachments):
if isinstance(data, list):
return [self._reconstruct_binary_internal(item, attachments)
return [cls._reconstruct_binary_internal(item, attachments)
for item in data]
elif isinstance(data, dict):
if data.get('_placeholder') and 'num' in data:
return attachments[data['num']]
else:
return {key: self._reconstruct_binary_internal(value,
attachments)
return {key: cls._reconstruct_binary_internal(value,
attachments)
for key, value in data.items()}
else:
return data

def _deconstruct_binary(self, data):
@classmethod
def deconstruct_binary(cls, data):
"""Extract binary components in the packet."""
attachments = []
data = self._deconstruct_binary_internal(data, attachments)
data = cls._deconstruct_binary_internal(data, attachments)
return data, attachments

def _deconstruct_binary_internal(self, data, attachments):
@classmethod
def _deconstruct_binary_internal(cls, data, attachments):
if isinstance(data, bytes):
attachments.append(data)
return {'_placeholder': True, 'num': len(attachments) - 1}
elif isinstance(data, list):
return [self._deconstruct_binary_internal(item, attachments)
return [cls._deconstruct_binary_internal(item, attachments)
for item in data]
elif isinstance(data, dict):
return {key: self._deconstruct_binary_internal(value, attachments)
return {key: cls._deconstruct_binary_internal(value, attachments)
for key, value in data.items()}
else:
return data

def _data_is_binary(self, data):
@classmethod
def data_is_binary(cls, data):
"""Check if the data contains binary components."""
if isinstance(data, bytes):
return True
elif isinstance(data, list):
return functools.reduce(
lambda a, b: a or b, [self._data_is_binary(item)
lambda a, b: a or b, [cls.data_is_binary(item)
for item in data], False)
elif isinstance(data, dict):
return functools.reduce(
lambda a, b: a or b, [self._data_is_binary(item)
lambda a, b: a or b, [cls.data_is_binary(item)
for item in data.values()],
False)
else:
Expand Down
14 changes: 12 additions & 2 deletions src/socketio/pubsub_manager.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import base64
from functools import partial
import uuid

from engineio import json

from .manager import Manager
from .packet import Packet


class PubSubManager(Manager):
Expand Down Expand Up @@ -61,8 +63,12 @@ def emit(self, event, data, namespace=None, room=None, skip_sid=None,
callback = (room, namespace, id)
else:
callback = None
binary = Packet.data_is_binary(data)
if binary:
data, attachments = Packet.deconstruct_binary(data)
data = [data, *[base64.b64encode(a).decode() for a in attachments]]
message = {'method': 'emit', 'event': event, 'data': data,
'namespace': namespace, 'room': room,
'binary': binary, 'namespace': namespace, 'room': room,
'skip_sid': skip_sid, 'callback': callback,
'host_id': self.host_id}
self._handle_emit(message) # handle in this host
Expand Down Expand Up @@ -141,7 +147,11 @@ def _handle_emit(self, message):
*remote_callback)
else:
callback = None
super().emit(message['event'], message['data'],
data = message['data']
if message.get('binary'):
attachments = [base64.b64decode(a) for a in data[1:]]
data = Packet.reconstruct_binary(data[0], attachments)
super().emit(message['event'], data,
namespace=message.get('namespace'),
room=message.get('room'),
skip_sid=message.get('skip_sid'), callback=callback)
Expand Down
67 changes: 67 additions & 0 deletions tests/async/test_pubsub_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ async def test_emit(self):
{
'method': 'emit',
'event': 'foo',
'binary': False,
'data': 'bar',
'namespace': '/',
'room': None,
Expand All @@ -66,13 +67,44 @@ async def test_emit(self):
}
)

async def test_emit_binary(self):
await self.pm.emit('foo', b'bar')
self.pm._publish.assert_awaited_once_with(
{
'method': 'emit',
'event': 'foo',
'binary': True,
'data': [{'_placeholder': True, 'num': 0}, 'YmFy'],
'namespace': '/',
'room': None,
'skip_sid': None,
'callback': None,
'host_id': '123456',
}
)
await self.pm.emit('foo', {'foo': b'bar'})
self.pm._publish.assert_awaited_with(
{
'method': 'emit',
'event': 'foo',
'binary': True,
'data': [{'foo': {'_placeholder': True, 'num': 0}}, 'YmFy'],
'namespace': '/',
'room': None,
'skip_sid': None,
'callback': None,
'host_id': '123456',
}
)

async def test_emit_with_to(self):
sid = 'room-mate'
await self.pm.emit('foo', 'bar', to=sid)
self.pm._publish.assert_awaited_once_with(
{
'method': 'emit',
'event': 'foo',
'binary': False,
'data': 'bar',
'namespace': '/',
'room': sid,
Expand All @@ -88,6 +120,7 @@ async def test_emit_with_namespace(self):
{
'method': 'emit',
'event': 'foo',
'binary': False,
'data': 'bar',
'namespace': '/baz',
'room': None,
Expand All @@ -103,6 +136,7 @@ async def test_emit_with_room(self):
{
'method': 'emit',
'event': 'foo',
'binary': False,
'data': 'bar',
'namespace': '/',
'room': 'baz',
Expand All @@ -118,6 +152,7 @@ async def test_emit_with_skip_sid(self):
{
'method': 'emit',
'event': 'foo',
'binary': False,
'data': 'bar',
'namespace': '/',
'room': None,
Expand All @@ -136,6 +171,7 @@ async def test_emit_with_callback(self):
{
'method': 'emit',
'event': 'foo',
'binary': False,
'data': 'bar',
'namespace': '/',
'room': 'baz',
Expand Down Expand Up @@ -241,6 +277,37 @@ async def test_handle_emit(self):
callback=None,
)

async def test_handle_emit_binary(self):
with mock.patch.object(
async_manager.AsyncManager, 'emit'
) as super_emit:
await self.pm._handle_emit({
'event': 'foo',
'binary': True,
'data': [{'_placeholder': True, 'num': 0}, 'YmFy'],
})
super_emit.assert_awaited_once_with(
'foo',
b'bar',
namespace=None,
room=None,
skip_sid=None,
callback=None,
)
await self.pm._handle_emit({
'event': 'foo',
'binary': True,
'data': [{'foo': {'_placeholder': True, 'num': 0}}, 'YmFy'],
})
super_emit.assert_awaited_with(
'foo',
{'foo': b'bar'},
namespace=None,
room=None,
skip_sid=None,
callback=None,
)

async def test_handle_emit_with_namespace(self):
with mock.patch.object(
async_manager.AsyncManager, 'emit'
Expand Down
24 changes: 16 additions & 8 deletions tests/common/test_packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,24 @@ def test_decode_dash_in_payload(self):
assert pkt.data["a"] == "0123456789-"
assert pkt.attachment_count == 0

def test_deconstruct_binary(self):
datas = [b'foo', [b'foo', b'bar'], ['foo', b'bar'], {'foo': b'bar'},
{'foo': 'bar', 'baz': b'qux'}, {'foo': [b'bar']}]
for data in datas:
bdata, attachments = packet.Packet.deconstruct_binary(data)
rdata = packet.Packet.reconstruct_binary(bdata, attachments)
assert data == rdata

def test_data_is_binary_list(self):
pkt = packet.Packet()
assert not pkt._data_is_binary(['foo'])
assert not pkt._data_is_binary([])
assert pkt._data_is_binary([b'foo'])
assert pkt._data_is_binary(['foo', b'bar'])
assert not pkt.data_is_binary(['foo'])
assert not pkt.data_is_binary([])
assert pkt.data_is_binary([b'foo'])
assert pkt.data_is_binary(['foo', b'bar'])

def test_data_is_binary_dict(self):
pkt = packet.Packet()
assert not pkt._data_is_binary({'a': 'foo'})
assert not pkt._data_is_binary({})
assert pkt._data_is_binary({'a': b'foo'})
assert pkt._data_is_binary({'a': 'foo', 'b': b'bar'})
assert not pkt.data_is_binary({'a': 'foo'})
assert not pkt.data_is_binary({})
assert pkt.data_is_binary({'a': b'foo'})
assert pkt.data_is_binary({'a': 'foo', 'b': b'bar'})
Loading
Loading