22 changes: 11 additions & 11 deletions docs/server.rst
@@ -1096,17 +1096,17 @@ For a production deployment there are a few recommendations to keep your
application secure.

First of all, the message queue should never be listening on a public network
interface, to ensure that external clients never connect to it. The use of a
private network (VPC), where the communication between servers can happen
privately is highly recommended.

In addition, all message queues support authentication and encryption.
Authentication ensures that only the Socket.IO servers and related processes
have access, while encryption prevents data to be collected by a third-party
listening on the network.

Access credentials can be included in the connection URLs that are passed to the
client managers.
interface, to ensure that external clients never connect to it. For a single
node deployment, the queue should only listen on `localhost`. For a multi-node
system, the use of a private network (VPC), where the communication between
servers can happen privately, is highly recommended.

In addition, all message queues support authentication and encryption, which
can strengthen the security of the deployment. Authentication ensures that only
the Socket.IO servers and related processes have access, while encryption
prevents data from being collected by a third-party that is listening on the
network. Access credentials can be included in the connection URLs that are
passed to the client managers.

Horizontal Scaling
~~~~~~~~~~~~~~~~~~
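As a side note on the credential URLs mentioned in the documentation change
above, here is a minimal sketch, assuming a Redis-backed queue reachable on the
private network; the host, user name and password are placeholders, not values
from this repository:

    import socketio

    # Placeholders only: substitute the address and credentials of your own
    # private Redis instance. The credentials travel inside the URL that is
    # handed to the client manager.
    mgr = socketio.RedisManager('redis://user:secret@10.0.0.5:6379/0')
    sio = socketio.Server(client_manager=mgr)
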
6 changes: 3 additions & 3 deletions src/socketio/async_aiopika_manager.py
@@ -1,6 +1,6 @@
import asyncio
import pickle

from engineio import json
from .async_pubsub_manager import AsyncPubSubManager

try:
@@ -82,7 +82,7 @@ async def _publish(self, data):
try:
await self.publisher_exchange.publish(
aio_pika.Message(
body=pickle.dumps(data),
body=json.dumps(data),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
), routing_key='*',
)
@@ -113,7 +113,7 @@ async def _listen(self):
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
yield pickle.loads(message.body)
yield message.body
retry_sleep = 1
except aio_pika.AMQPException:
self._get_logger().error(
15 changes: 4 additions & 11 deletions src/socketio/async_pubsub_manager.py
@@ -3,7 +3,6 @@
import uuid

from engineio import json
import pickle

from .async_manager import AsyncManager

@@ -202,16 +201,10 @@ async def _thread(self):
if isinstance(message, dict):
data = message
else:
if isinstance(message, bytes): # pragma: no cover
try:
data = pickle.loads(message)
except:
pass
if data is None:
try:
data = json.loads(message)
except:
pass
try:
data = json.loads(message)
except:
pass
if data and 'method' in data:
self._get_logger().debug('pubsub message: {}'.format(
data['method']))
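To make the simplified fallback in the hunk above concrete: a minimal sketch,
assuming _listen() can yield a ready-made dict, a JSON string, or raw bytes, of
the decoding that _thread() now performs once the pickle branch is gone (the
decode helper is illustrative, not part of the module):

    import json

    def decode(message):
        # Dicts from the queue driver pass through untouched.
        if isinstance(message, dict):
            return message
        try:
            # json.loads accepts both str and bytes payloads.
            return json.loads(message)
        except (ValueError, TypeError):
            # Anything that is not valid JSON is dropped.
            return None

    print(decode({'method': 'emit', 'host_id': 'x'}))  # passes through
    print(decode(b'{"method": "close_room"}'))         # parsed from bytes
    print(decode(b'bad data'))                         # None, message ignored
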
4 changes: 2 additions & 2 deletions src/socketio/async_redis_manager.py
@@ -1,5 +1,4 @@
import asyncio
import pickle
from urllib.parse import urlparse

try: # pragma: no cover
@@ -20,6 +19,7 @@
valkey = None
ValkeyError = None

from engineio import json
from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url

@@ -108,7 +108,7 @@ async def _publish(self, data):
if not retry:
self._redis_connect()
return await self.redis.publish(
self.channel, pickle.dumps(data))
self.channel, json.dumps(data))
except error as exc:
if retry:
self._get_logger().error(
6 changes: 3 additions & 3 deletions src/socketio/kafka_manager.py
@@ -1,11 +1,11 @@
import logging
import pickle

try:
import kafka
except ImportError:
kafka = None

from engineio import json
from .pubsub_manager import PubSubManager

logger = logging.getLogger('socketio')
@@ -53,7 +53,7 @@ def __init__(self, url='kafka://localhost:9092', channel='socketio',
bootstrap_servers=self.kafka_urls)

def _publish(self, data):
self.producer.send(self.channel, value=pickle.dumps(data))
self.producer.send(self.channel, value=json.dumps(data))
self.producer.flush()

def _kafka_listen(self):
@@ -62,4 +62,4 @@ def _kafka_listen(self):
def _listen(self):
for message in self._kafka_listen():
if message.topic == self.channel:
yield pickle.loads(message.value)
yield message.value
4 changes: 2 additions & 2 deletions src/socketio/kombu_manager.py
@@ -1,4 +1,3 @@
import pickle
import time
import uuid

@@ -7,6 +6,7 @@
except ImportError:
kombu = None

from engineio import json
from .pubsub_manager import PubSubManager


@@ -102,7 +102,7 @@ def _publish(self, data):
try:
producer_publish = self._producer_publish(
self.publisher_connection)
producer_publish(pickle.dumps(data))
producer_publish(json.dumps(data))
break
except (OSError, kombu.exceptions.KombuError):
if retry:
15 changes: 4 additions & 11 deletions src/socketio/pubsub_manager.py
@@ -2,7 +2,6 @@
import uuid

from engineio import json
import pickle

from .manager import Manager

@@ -196,16 +195,10 @@ def _thread(self):
if isinstance(message, dict):
data = message
else:
if isinstance(message, bytes): # pragma: no cover
try:
data = pickle.loads(message)
except:
pass
if data is None:
try:
data = json.loads(message)
except:
pass
try:
data = json.loads(message)
except:
pass
if data and 'method' in data:
self._get_logger().debug('pubsub message: {}'.format(
data['method']))
4 changes: 2 additions & 2 deletions src/socketio/redis_manager.py
@@ -1,5 +1,4 @@
import logging
import pickle
import time
from urllib.parse import urlparse

@@ -17,6 +16,7 @@
valkey = None
ValkeyError = None

from engineio import json
from .pubsub_manager import PubSubManager

logger = logging.getLogger('socketio')
@@ -145,7 +145,7 @@ def _publish(self, data):
try:
if not retry:
self._redis_connect()
return self.redis.publish(self.channel, pickle.dumps(data))
return self.redis.publish(self.channel, json.dumps(data))
except error as exc:
if retry:
logger.error(
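For context on the redis_manager change above, a minimal sketch, assuming a
local Redis server and the redis-py package, of what _publish() now places on
the channel: a plain JSON document readable by any subscriber, rather than a
pickle blob (the channel name and payload keys are illustrative):

    import json
    import redis

    r = redis.Redis()  # assumes a Redis server on localhost:6379
    payload = {'method': 'emit', 'event': 'my event', 'data': 'hello',
               'namespace': '/', 'host_id': 'abc123'}
    r.publish('socketio', json.dumps(payload))
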
10 changes: 5 additions & 5 deletions src/socketio/zmq_manager.py
@@ -1,6 +1,6 @@
import pickle
import re

from engineio import json
from .pubsub_manager import PubSubManager


@@ -75,14 +75,14 @@ def __init__(self, url='zmq+tcp://localhost:5555+5556',
self.channel = channel

def _publish(self, data):
pickled_data = pickle.dumps(
packed_data = json.dumps(
{
'type': 'message',
'channel': self.channel,
'data': data
}
)
return self.sink.send(pickled_data)
).encode()
return self.sink.send(packed_data)

def zmq_listen(self):
while True:
@@ -94,7 +94,7 @@ def _listen(self):
for message in self.zmq_listen():
if isinstance(message, bytes):
try:
message = pickle.loads(message)
message = json.loads(message)
except Exception:
pass
if isinstance(message, dict) and \
13 changes: 6 additions & 7 deletions tests/async/test_pubsub_manager.py
@@ -1,5 +1,6 @@
import asyncio
import functools
import json
from unittest import mock

import pytest
@@ -482,31 +483,29 @@ async def test_background_thread(self):
host_id = self.pm.host_id

async def messages():
import pickle

yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'}
yield {'missing': 'method', 'host_id': 'x'}
yield '{"method": "callback", "value": "bar", "host_id": "x"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': 'x'}
yield {'method': 'bogus', 'host_id': 'x'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'})
yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'})
yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield 'bad json'
yield b'bad pickled'
yield b'bad data'

# these should not publish anything on the queue, as they come from
# the same host
yield {'method': 'emit', 'value': 'foo', 'host_id': host_id}
yield {'method': 'callback', 'value': 'bar', 'host_id': host_id}
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': host_id}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': host_id})
yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': host_id})

self.pm._listen = messages
await self.pm._thread()
13 changes: 6 additions & 7 deletions tests/common/test_pubsub_manager.py
@@ -1,4 +1,5 @@
import functools
import json
import logging
from unittest import mock

@@ -465,31 +466,29 @@ def test_background_thread(self):
host_id = self.pm.host_id

def messages():
import pickle

yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'}
yield {'missing': 'method', 'host_id': 'x'}
yield '{"method": "callback", "value": "bar", "host_id": "x"}'
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': 'x'}
yield {'method': 'bogus', 'host_id': 'x'}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'})
yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': 'x'})
yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo',
'room': 'room', 'host_id': 'x'}
yield 'bad json'
yield b'bad pickled'
yield b'bad data'

# these should not publish anything on the queue, as they come from
# the same host
yield {'method': 'emit', 'value': 'foo', 'host_id': host_id}
yield {'method': 'callback', 'value': 'bar', 'host_id': host_id}
yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo',
'host_id': host_id}
yield pickle.dumps({'method': 'close_room', 'value': 'baz',
'host_id': host_id})
yield json.dumps({'method': 'close_room', 'value': 'baz',
'host_id': host_id})

self.pm._listen = mock.MagicMock(side_effect=messages)
try: