diff --git a/docs/server.rst b/docs/server.rst index 508bd46f..9261a425 100644 --- a/docs/server.rst +++ b/docs/server.rst @@ -1096,17 +1096,17 @@ For a production deployment there are a few recommendations to keep your application secure. First of all, the message queue should never be listening on a public network -interface, to ensure that external clients never connect to it. The use of a -private network (VPC), where the communication between servers can happen -privately is highly recommended. - -In addition, all message queues support authentication and encryption. -Authentication ensures that only the Socket.IO servers and related processes -have access, while encryption prevents data to be collected by a third-party -listening on the network. - -Access credentials can be included in the connection URLs that are passed to the -client managers. +interface, to ensure that external clients never connect to it. For a single +node deployment, the queue should only listen on `localhost`. For a multi-node +system the use of a private network (VPC), where the communication between +servers can happen privately is highly recommended. + +In addition, all message queues support authentication and encryption, which +can strengthen the security of the deployment. Authentication ensures that only +the Socket.IO servers and related processes have access, while encryption +prevents data from being collected by a third-party that is listening on the +network. Access credentials can be included in the connection URLs that are +passed to the client managers. 
Horizontal Scaling ~~~~~~~~~~~~~~~~~~ diff --git a/src/socketio/async_aiopika_manager.py b/src/socketio/async_aiopika_manager.py index 003b67bc..1485d37a 100644 --- a/src/socketio/async_aiopika_manager.py +++ b/src/socketio/async_aiopika_manager.py @@ -1,6 +1,6 @@ import asyncio -import pickle +from engineio import json from .async_pubsub_manager import AsyncPubSubManager try: @@ -82,7 +82,7 @@ async def _publish(self, data): try: await self.publisher_exchange.publish( aio_pika.Message( - body=pickle.dumps(data), + body=json.dumps(data), delivery_mode=aio_pika.DeliveryMode.PERSISTENT ), routing_key='*', ) @@ -113,7 +113,7 @@ async def _listen(self): async with queue.iterator() as queue_iter: async for message in queue_iter: async with message.process(): - yield pickle.loads(message.body) + yield message.body retry_sleep = 1 except aio_pika.AMQPException: self._get_logger().error( diff --git a/src/socketio/async_pubsub_manager.py b/src/socketio/async_pubsub_manager.py index 72946eb2..9ccc3382 100644 --- a/src/socketio/async_pubsub_manager.py +++ b/src/socketio/async_pubsub_manager.py @@ -3,7 +3,6 @@ import uuid from engineio import json -import pickle from .async_manager import AsyncManager @@ -202,16 +201,10 @@ async def _thread(self): if isinstance(message, dict): data = message else: - if isinstance(message, bytes): # pragma: no cover - try: - data = pickle.loads(message) - except: - pass - if data is None: - try: - data = json.loads(message) - except: - pass + try: + data = json.loads(message) + except: + pass if data and 'method' in data: self._get_logger().debug('pubsub message: {}'.format( data['method'])) diff --git a/src/socketio/async_redis_manager.py b/src/socketio/async_redis_manager.py index b37e9059..4f9e3264 100644 --- a/src/socketio/async_redis_manager.py +++ b/src/socketio/async_redis_manager.py @@ -1,5 +1,4 @@ import asyncio -import pickle from urllib.parse import urlparse try: # pragma: no cover @@ -20,6 +19,7 @@ valkey = None ValkeyError = None 
+from engineio import json from .async_pubsub_manager import AsyncPubSubManager from .redis_manager import parse_redis_sentinel_url @@ -108,7 +108,7 @@ async def _publish(self, data): if not retry: self._redis_connect() return await self.redis.publish( - self.channel, pickle.dumps(data)) + self.channel, json.dumps(data)) except error as exc: if retry: self._get_logger().error( diff --git a/src/socketio/kafka_manager.py b/src/socketio/kafka_manager.py index 11b87ad8..a9f1a075 100644 --- a/src/socketio/kafka_manager.py +++ b/src/socketio/kafka_manager.py @@ -1,11 +1,11 @@ import logging -import pickle try: import kafka except ImportError: kafka = None +from engineio import json from .pubsub_manager import PubSubManager logger = logging.getLogger('socketio') @@ -53,7 +53,7 @@ def __init__(self, url='kafka://localhost:9092', channel='socketio', bootstrap_servers=self.kafka_urls) def _publish(self, data): - self.producer.send(self.channel, value=pickle.dumps(data)) + self.producer.send(self.channel, value=json.dumps(data)) self.producer.flush() def _kafka_listen(self): @@ -62,4 +62,4 @@ def _kafka_listen(self): def _listen(self): for message in self._kafka_listen(): if message.topic == self.channel: - yield pickle.loads(message.value) + yield message.value diff --git a/src/socketio/kombu_manager.py b/src/socketio/kombu_manager.py index 09e260c9..bc84bbb9 100644 --- a/src/socketio/kombu_manager.py +++ b/src/socketio/kombu_manager.py @@ -1,4 +1,3 @@ -import pickle import time import uuid @@ -7,6 +6,7 @@ except ImportError: kombu = None +from engineio import json from .pubsub_manager import PubSubManager @@ -102,7 +102,7 @@ def _publish(self, data): try: producer_publish = self._producer_publish( self.publisher_connection) - producer_publish(pickle.dumps(data)) + producer_publish(json.dumps(data)) break except (OSError, kombu.exceptions.KombuError): if retry: diff --git a/src/socketio/pubsub_manager.py b/src/socketio/pubsub_manager.py index 3270b4cb..80744f2f 100644 --- 
a/src/socketio/pubsub_manager.py +++ b/src/socketio/pubsub_manager.py @@ -2,7 +2,6 @@ import uuid from engineio import json -import pickle from .manager import Manager @@ -196,16 +195,10 @@ def _thread(self): if isinstance(message, dict): data = message else: - if isinstance(message, bytes): # pragma: no cover - try: - data = pickle.loads(message) - except: - pass - if data is None: - try: - data = json.loads(message) - except: - pass + try: + data = json.loads(message) + except: + pass if data and 'method' in data: self._get_logger().debug('pubsub message: {}'.format( data['method'])) diff --git a/src/socketio/redis_manager.py b/src/socketio/redis_manager.py index 2e68c31c..4f701b92 100644 --- a/src/socketio/redis_manager.py +++ b/src/socketio/redis_manager.py @@ -1,5 +1,4 @@ import logging -import pickle import time from urllib.parse import urlparse @@ -17,6 +16,7 @@ valkey = None ValkeyError = None +from engineio import json from .pubsub_manager import PubSubManager logger = logging.getLogger('socketio') @@ -145,7 +145,7 @@ def _publish(self, data): try: if not retry: self._redis_connect() - return self.redis.publish(self.channel, pickle.dumps(data)) + return self.redis.publish(self.channel, json.dumps(data)) except error as exc: if retry: logger.error( diff --git a/src/socketio/zmq_manager.py b/src/socketio/zmq_manager.py index aa5a49a2..a71b869c 100644 --- a/src/socketio/zmq_manager.py +++ b/src/socketio/zmq_manager.py @@ -1,6 +1,6 @@ -import pickle import re +from engineio import json from .pubsub_manager import PubSubManager @@ -75,14 +75,14 @@ def __init__(self, url='zmq+tcp://localhost:5555+5556', self.channel = channel def _publish(self, data): - pickled_data = pickle.dumps( + packed_data = json.dumps( { 'type': 'message', 'channel': self.channel, 'data': data } - ) - return self.sink.send(pickled_data) + ).encode() + return self.sink.send(packed_data) def zmq_listen(self): while True: @@ -94,7 +94,7 @@ def _listen(self): for message in self.zmq_listen(): 
if isinstance(message, bytes): try: - message = pickle.loads(message) + message = json.loads(message) except Exception: pass if isinstance(message, dict) and \ diff --git a/tests/async/test_pubsub_manager.py b/tests/async/test_pubsub_manager.py index 71d948a6..4a7012ee 100644 --- a/tests/async/test_pubsub_manager.py +++ b/tests/async/test_pubsub_manager.py @@ -1,5 +1,6 @@ import asyncio import functools +import json from unittest import mock import pytest @@ -482,22 +483,20 @@ async def test_background_thread(self): host_id = self.pm.host_id async def messages(): - import pickle - yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'} yield {'missing': 'method', 'host_id': 'x'} yield '{"method": "callback", "value": "bar", "host_id": "x"}' yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', 'host_id': 'x'} yield {'method': 'bogus', 'host_id': 'x'} - yield pickle.dumps({'method': 'close_room', 'value': 'baz', - 'host_id': 'x'}) + yield json.dumps({'method': 'close_room', 'value': 'baz', + 'host_id': 'x'}) yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo', 'room': 'room', 'host_id': 'x'} yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo', 'room': 'room', 'host_id': 'x'} yield 'bad json' - yield b'bad pickled' + yield b'bad data' # these should not publish anything on the queue, as they come from # the same host @@ -505,8 +504,8 @@ async def messages(): yield {'method': 'callback', 'value': 'bar', 'host_id': host_id} yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', 'host_id': host_id} - yield pickle.dumps({'method': 'close_room', 'value': 'baz', - 'host_id': host_id}) + yield json.dumps({'method': 'close_room', 'value': 'baz', + 'host_id': host_id}) self.pm._listen = messages await self.pm._thread() diff --git a/tests/common/test_pubsub_manager.py b/tests/common/test_pubsub_manager.py index 6d8eda75..32c7d41d 100644 --- a/tests/common/test_pubsub_manager.py +++ b/tests/common/test_pubsub_manager.py @@ -1,4 
+1,5 @@ import functools +import json import logging from unittest import mock @@ -465,22 +466,20 @@ def test_background_thread(self): host_id = self.pm.host_id def messages(): - import pickle - yield {'method': 'emit', 'value': 'foo', 'host_id': 'x'} yield {'missing': 'method', 'host_id': 'x'} yield '{"method": "callback", "value": "bar", "host_id": "x"}' yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', 'host_id': 'x'} yield {'method': 'bogus', 'host_id': 'x'} - yield pickle.dumps({'method': 'close_room', 'value': 'baz', - 'host_id': 'x'}) + yield json.dumps({'method': 'close_room', 'value': 'baz', + 'host_id': 'x'}) yield {'method': 'enter_room', 'sid': '123', 'namespace': '/foo', 'room': 'room', 'host_id': 'x'} yield {'method': 'leave_room', 'sid': '123', 'namespace': '/foo', 'room': 'room', 'host_id': 'x'} yield 'bad json' - yield b'bad pickled' + yield b'bad data' # these should not publish anything on the queue, as they come from # the same host @@ -488,8 +487,8 @@ def messages(): yield {'method': 'callback', 'value': 'bar', 'host_id': host_id} yield {'method': 'disconnect', 'sid': '123', 'namespace': '/foo', 'host_id': host_id} - yield pickle.dumps({'method': 'close_room', 'value': 'baz', - 'host_id': host_id}) + yield json.dumps({'method': 'close_room', 'value': 'baz', + 'host_id': host_id}) self.pm._listen = mock.MagicMock(side_effect=messages) try: