Skip to content

Commit f245191

Browse files
sam-moslehmiguelgrinberg
authored andcommitted
Migrate async Redis client manager to aioredis 2.x (Fixes #771)
1 parent 4f5bf1e commit f245191

File tree

6 files changed

+76
-157
lines changed

6 files changed

+76
-157
lines changed

src/socketio/asyncio_aiopika_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ async def _listen(self):
9494
async with self.listener_queue.iterator() as queue_iter:
9595
async for message in queue_iter:
9696
with message.process():
97-
return pickle.loads(message.body)
97+
yield pickle.loads(message.body)
9898
except Exception:
9999
self._get_logger().error('Cannot receive from rabbitmq... '
100100
'retrying in '

src/socketio/asyncio_pubsub_manager.py

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -148,35 +148,34 @@ async def _handle_close_room(self, message):
148148
async def _thread(self):
149149
while True:
150150
try:
151-
message = await self._listen()
151+
async for message in self._listen(): # pragma: no branch
152+
data = None
153+
if isinstance(message, dict):
154+
data = message
155+
else:
156+
if isinstance(message, bytes): # pragma: no cover
157+
try:
158+
data = pickle.loads(message)
159+
except:
160+
pass
161+
if data is None:
162+
try:
163+
data = json.loads(message)
164+
except:
165+
pass
166+
if data and 'method' in data:
167+
self._get_logger().info('pubsub message: {}'.format(
168+
data['method']))
169+
if data['method'] == 'emit':
170+
await self._handle_emit(data)
171+
elif data['method'] == 'callback':
172+
await self._handle_callback(data)
173+
elif data['method'] == 'disconnect':
174+
await self._handle_disconnect(data)
175+
elif data['method'] == 'close_room':
176+
await self._handle_close_room(data)
152177
except asyncio.CancelledError: # pragma: no cover
153178
break
154-
except:
179+
except: # pragma: no cover
155180
import traceback
156181
traceback.print_exc()
157-
break
158-
data = None
159-
if isinstance(message, dict):
160-
data = message
161-
else:
162-
if isinstance(message, bytes): # pragma: no cover
163-
try:
164-
data = pickle.loads(message)
165-
except:
166-
pass
167-
if data is None:
168-
try:
169-
data = json.loads(message)
170-
except:
171-
pass
172-
if data and 'method' in data:
173-
self._get_logger().info('pubsub message: {}'.format(
174-
data['method']))
175-
if data['method'] == 'emit':
176-
await self._handle_emit(data)
177-
elif data['method'] == 'callback':
178-
await self._handle_callback(data)
179-
elif data['method'] == 'disconnect':
180-
await self._handle_disconnect(data)
181-
elif data['method'] == 'close_room':
182-
await self._handle_close_room(data)
Lines changed: 42 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import asyncio
22
import pickle
3-
from urllib.parse import urlparse
43

54
try:
65
import aioredis
@@ -10,34 +9,18 @@
109
from .asyncio_pubsub_manager import AsyncPubSubManager
1110

1211

13-
def _parse_redis_url(url):
14-
p = urlparse(url)
15-
if p.scheme not in {'redis', 'rediss'}:
16-
raise ValueError('Invalid redis url')
17-
ssl = p.scheme == 'rediss'
18-
host = p.hostname or 'localhost'
19-
port = p.port or 6379
20-
password = p.password
21-
if p.path:
22-
db = int(p.path[1:])
23-
else:
24-
db = 0
25-
return host, port, password, db, ssl
26-
27-
2812
class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
2913
"""Redis based client manager for asyncio servers.
3014
3115
This class implements a Redis backend for event sharing across multiple
32-
processes. Only kept here as one more example of how to build a custom
33-
backend, since the kombu backend is perfectly adequate to support a Redis
34-
message queue.
16+
processes.
3517
36-
To use a Redis backend, initialize the :class:`Server` instance as
18+
To use a Redis backend, initialize the :class:`AsyncServer` instance as
3719
follows::
3820
39-
server = socketio.Server(client_manager=socketio.AsyncRedisManager(
40-
'redis://hostname:port/0'))
21+
url = 'redis://hostname:port/0'
22+
server = socketio.AsyncServer(
23+
client_manager=socketio.AsyncRedisManager(url))
4124
4225
:param url: The connection URL for the Redis server. For a default Redis
4326
store running on the same host, use ``redis://``. To use an
@@ -47,62 +30,73 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
4730
:param write_only: If set to ``True``, only initialize to emit events. The
4831
default of ``False`` initializes the class for emitting
4932
and receiving.
33+
:param redis_options: additional keyword arguments to be passed to
34+
``aioredis.from_url()``.
5035
"""
5136
name = 'aioredis'
5237

5338
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
54-
write_only=False, logger=None):
39+
write_only=False, logger=None, redis_options=None):
5540
if aioredis is None:
5641
raise RuntimeError('Redis package is not installed '
5742
'(Run "pip install aioredis" in your '
5843
'virtualenv).')
59-
(
60-
self.host, self.port, self.password, self.db, self.ssl
61-
) = _parse_redis_url(url)
62-
self.pub = None
63-
self.sub = None
44+
if not hasattr(aioredis.Redis, 'from_url'):
45+
raise RuntimeError('Version 2 of aioredis package is required.')
46+
self.redis_url = url
47+
self.redis_options = redis_options or {}
48+
self._redis_connect()
6449
super().__init__(channel=channel, write_only=write_only, logger=logger)
6550

51+
def _redis_connect(self):
52+
self.redis = aioredis.Redis.from_url(self.redis_url,
53+
**self.redis_options)
54+
self.pubsub = self.redis.pubsub()
55+
6656
async def _publish(self, data):
6757
retry = True
6858
while True:
6959
try:
70-
if self.pub is None:
71-
self.pub = await aioredis.create_redis(
72-
(self.host, self.port), db=self.db,
73-
password=self.password, ssl=self.ssl
74-
)
75-
return await self.pub.publish(self.channel,
76-
pickle.dumps(data))
77-
except (aioredis.RedisError, OSError):
60+
if not retry:
61+
self._redis_connect()
62+
return await self.redis.publish(
63+
self.channel, pickle.dumps(data))
64+
except aioredis.exceptions.RedisError:
7865
if retry:
7966
self._get_logger().error('Cannot publish to redis... '
8067
'retrying')
81-
self.pub = None
8268
retry = False
8369
else:
8470
self._get_logger().error('Cannot publish to redis... '
8571
'giving up')
8672
break
8773

88-
async def _listen(self):
74+
async def _redis_listen_with_retries(self):
8975
retry_sleep = 1
76+
connect = False
9077
while True:
9178
try:
92-
if self.sub is None:
93-
self.sub = await aioredis.create_redis(
94-
(self.host, self.port), db=self.db,
95-
password=self.password, ssl=self.ssl
96-
)
97-
self.ch = (await self.sub.subscribe(self.channel))[0]
98-
retry_sleep = 1
99-
return await self.ch.get()
100-
except (aioredis.RedisError, OSError):
79+
if connect:
80+
self._redis_connect()
81+
await self.pubsub.subscribe(self.channel)
82+
retry_sleep = 1
83+
async for message in self.pubsub.listen():
84+
yield message
85+
except aioredis.exceptions.RedisError:
10186
self._get_logger().error('Cannot receive from redis... '
10287
'retrying in '
10388
'{} secs'.format(retry_sleep))
104-
self.sub = None
89+
connect = True
10590
await asyncio.sleep(retry_sleep)
10691
retry_sleep *= 2
10792
if retry_sleep > 60:
10893
retry_sleep = 60
94+
95+
async def _listen(self):
96+
channel = self.channel.encode('utf-8')
97+
await self.pubsub.subscribe(self.channel)
98+
async for message in self._redis_listen_with_retries():
99+
if message['channel'] == channel and \
100+
message['type'] == 'message' and 'data' in message:
101+
yield message['data']
102+
await self.pubsub.unsubscribe(self.channel)

src/socketio/redis_manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ class RedisManager(PubSubManager): # pragma: no cover
2727
server = socketio.Server(client_manager=socketio.RedisManager(url))
2828
2929
:param url: The connection URL for the Redis server. For a default Redis
30-
store running on the same host, use ``redis://``.
30+
store running on the same host, use ``redis://``. To use an
31+
SSL connection, use ``rediss://``.
3132
:param channel: The channel name on which the server sends and receives
3233
notifications. Must be the same in all the servers.
3334
:param write_only: If set to ``True``, only initialize to emit events. The

tests/asyncio/test_asyncio_pubsub_manager.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ def test_background_thread(self):
417417
self.pm._handle_disconnect = AsyncMock()
418418
self.pm._handle_close_room = AsyncMock()
419419

420-
def messages():
420+
async def messages():
421421
import pickle
422422

423423
yield {'method': 'emit', 'value': 'foo'}
@@ -428,12 +428,10 @@ def messages():
428428
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
429429
yield 'bad json'
430430
yield b'bad pickled'
431+
raise asyncio.CancelledError() # force the thread to exit
431432

432-
self.pm._listen = AsyncMock(side_effect=list(messages()))
433-
try:
434-
_run(self.pm._thread())
435-
except StopIteration:
436-
pass
433+
self.pm._listen = messages
434+
_run(self.pm._thread())
437435

438436
self.pm._handle_emit.mock.assert_called_once_with(
439437
{'method': 'emit', 'value': 'foo'}

tests/asyncio/test_asyncio_redis_manager.py

Lines changed: 0 additions & 73 deletions
This file was deleted.

0 commit comments

Comments
 (0)