Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions src/socketio/async_redis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
RedisError = None

from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url


class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
Expand All @@ -29,15 +30,18 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
client_manager=socketio.AsyncRedisManager(url))

:param url: The connection URL for the Redis server. For a default Redis
store running on the same host, use ``redis://``. To use an
SSL connection, use ``rediss://``.
store running on the same host, use ``redis://``. To use a
TLS connection, use ``rediss://``. To use Redis Sentinel, use
``redis+sentinel://`` with a comma-separated list of hosts
and the service name after the db in the URL path. Example:
``redis+sentinel://user:pw@host1:1234,host2:2345/0/myredis``.
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
:param redis_options: additional keyword arguments to be passed to
``aioredis.from_url()``.
``Redis.from_url()`` or ``Sentinel()``.
"""
name = 'aioredis'

Expand All @@ -54,8 +58,16 @@ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
super().__init__(channel=channel, write_only=write_only, logger=logger)

def _redis_connect(self):
self.redis = aioredis.Redis.from_url(self.redis_url,
**self.redis_options)
if not self.redis_url.startswith('redis+sentinel://'):
self.redis = aioredis.Redis.from_url(self.redis_url,
**self.redis_options)
else:
sentinels, service_name, connection_kwargs = \
parse_redis_sentinel_url(self.redis_url)
kwargs = self.redis_options
kwargs.update(connection_kwargs)
sentinel = aioredis.sentinel.Sentinel(sentinels, **kwargs)
self.redis = sentinel.master_for(service_name or self.channel)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)

async def _publish(self, data):
Expand Down
48 changes: 43 additions & 5 deletions src/socketio/redis_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import pickle
import time
from urllib.parse import urlparse

try:
import redis
Expand All @@ -12,6 +13,32 @@
logger = logging.getLogger('socketio')


def parse_redis_sentinel_url(url):
"""Parse a Redis Sentinel URL with the format:
redis+sentinel://[:password]@host1:port1,host2:port2,.../db/service_name
"""
parsed_url = urlparse(url)
if parsed_url.scheme != 'redis+sentinel':
raise ValueError('Invalid Redis Sentinel URL')
sentinels = []
for host_port in parsed_url.netloc.split('@')[-1].split(','):
host, port = host_port.rsplit(':', 1)
sentinels.append((host, int(port)))
kwargs = {}
if parsed_url.username:
kwargs['username'] = parsed_url.username
if parsed_url.password:
kwargs['password'] = parsed_url.password
service_name = None
if parsed_url.path:
parts = parsed_url.path.split('/')
if len(parts) >= 2 and parts[1] != '':
kwargs['db'] = int(parts[1])
if len(parts) >= 3 and parts[2] != '':
service_name = parts[2]
return sentinels, service_name, kwargs


class RedisManager(PubSubManager): # pragma: no cover
"""Redis based client manager.

Expand All @@ -27,15 +54,18 @@ class RedisManager(PubSubManager): # pragma: no cover
server = socketio.Server(client_manager=socketio.RedisManager(url))

:param url: The connection URL for the Redis server. For a default Redis
store running on the same host, use ``redis://``. To use an
SSL connection, use ``rediss://``.
store running on the same host, use ``redis://``. To use a
TLS connection, use ``rediss://``. To use Redis Sentinel, use
``redis+sentinel://`` with a comma-separated list of hosts
and the service name after the db in the URL path. Example:
``redis+sentinel://user:pw@host1:1234,host2:2345/0/myredis``.
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
:param redis_options: additional keyword arguments to be passed to
``Redis.from_url()``.
``Redis.from_url()`` or ``Sentinel()``.
"""
name = 'redis'

Expand Down Expand Up @@ -66,8 +96,16 @@ def initialize(self):
'with ' + self.server.async_mode)

def _redis_connect(self):
self.redis = redis.Redis.from_url(self.redis_url,
**self.redis_options)
if not self.redis_url.startswith('redis+sentinel://'):
self.redis = redis.Redis.from_url(self.redis_url,
**self.redis_options)
else:
sentinels, service_name, connection_kwargs = \
parse_redis_sentinel_url(self.redis_url)
kwargs = self.redis_options
kwargs.update(connection_kwargs)
sentinel = redis.sentinel.Sentinel(sentinels, **kwargs)
self.redis = sentinel.master_for(service_name or self.channel)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)

def _publish(self, data):
Expand Down
38 changes: 38 additions & 0 deletions tests/common/test_redis_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import pytest

from socketio.redis_manager import parse_redis_sentinel_url


class TestPubSubManager:
def test_sentinel_url_parser(self):
with pytest.raises(ValueError):
parse_redis_sentinel_url('redis://localhost:6379/0')

assert parse_redis_sentinel_url(
'redis+sentinel://localhost:6379'
) == (
[('localhost', 6379)],
None,
{}
)
assert parse_redis_sentinel_url(
'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
) == (
[('192.168.0.1', 6379), ('192.168.0.2', 6379)],
None,
{}
)
assert parse_redis_sentinel_url(
'redis+sentinel://h1:6379,h2:6379/0'
) == (
[('h1', 6379), ('h2', 6379)],
None,
{'db': 0}
)
assert parse_redis_sentinel_url(
'redis+sentinel://user:password@h1:6379,h2:6379,h1:6380/0/myredis'
) == (
[('h1', 6379), ('h2', 6379), ('h1', 6380)],
'myredis',
{'username': 'user', 'password': 'password', 'db': 0}
)
Loading