Skip to content

Commit 6e2d0de

Browse files
Restore support for rediss:// URLs, and add valkeys://
1 parent a8deb3a commit 6e2d0de

File tree

5 files changed

+263
-39
lines changed

5 files changed

+263
-39
lines changed

src/socketio/async_redis_manager.py

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
from urllib.parse import urlparse
33

4-
try: # pragma: no cover
4+
try:
55
from redis import asyncio as aioredis
66
from redis.exceptions import RedisError
77
except ImportError: # pragma: no cover
@@ -12,19 +12,19 @@
1212
aioredis = None
1313
RedisError = None
1414

15-
try: # pragma: no cover
16-
from valkey import asyncio as valkey
15+
try:
16+
from valkey import asyncio as aiovalkey
1717
from valkey.exceptions import ValkeyError
1818
except ImportError: # pragma: no cover
19-
valkey = None
19+
aiovalkey = None
2020
ValkeyError = None
2121

2222
from engineio import json
2323
from .async_pubsub_manager import AsyncPubSubManager
2424
from .redis_manager import parse_redis_sentinel_url
2525

2626

27-
class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
27+
class AsyncRedisManager(AsyncPubSubManager):
2828
"""Redis based client manager for asyncio servers.
2929
3030
This class implements a Redis backend for event sharing across multiple
@@ -55,12 +55,8 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
5555

5656
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
5757
write_only=False, logger=None, redis_options=None):
58-
if aioredis is None and valkey is None:
59-
raise RuntimeError('Redis package is not installed '
60-
'(Run "pip install redis" or '
61-
'"pip install valkey" '
62-
'in your virtualenv).')
63-
if aioredis and not hasattr(aioredis.Redis, 'from_url'):
58+
if aioredis and \
59+
not hasattr(aioredis.Redis, 'from_url'): # pragma: no cover
6460
raise RuntimeError('Version 2 of aioredis package is required.')
6561
super().__init__(channel=channel, write_only=write_only, logger=logger)
6662
self.redis_url = url
@@ -69,20 +65,31 @@ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
6965

7066
def _get_redis_module_and_error(self):
7167
parsed_url = urlparse(self.redis_url)
72-
schema = parsed_url.scheme.split('+', 1)[0].lower()
73-
if schema in ['redis', 'unix']:
68+
scheme = parsed_url.scheme.split('+', 1)[0].lower()
69+
if scheme in ['redis', 'rediss']:
7470
if aioredis is None or RedisError is None:
7571
raise RuntimeError('Redis package is not installed '
7672
'(Run "pip install redis" '
7773
'in your virtualenv).')
7874
return aioredis, RedisError
79-
if schema == 'valkey':
80-
if valkey is None or ValkeyError is None:
75+
if scheme in ['valkey', 'valkeys']:
76+
if aiovalkey is None or ValkeyError is None:
8177
raise RuntimeError('Valkey package is not installed '
8278
'(Run "pip install valkey" '
8379
'in your virtualenv).')
84-
return valkey, ValkeyError
85-
error_msg = f'Unsupported Redis URL schema: {schema}'
80+
return aiovalkey, ValkeyError
81+
if scheme == 'unix':
82+
if aioredis is None or RedisError is None:
83+
if aiovalkey is None or ValkeyError is None:
84+
raise RuntimeError('Redis package is not installed '
85+
'(Run "pip install redis" '
86+
'or "pip install valkey" '
87+
'in your virtualenv).')
88+
else:
89+
return aiovalkey, ValkeyError
90+
else:
91+
return aioredis, RedisError
92+
error_msg = f'Unsupported Redis URL scheme: {scheme}'
8693
raise ValueError(error_msg)
8794

8895
def _redis_connect(self):
@@ -100,7 +107,7 @@ def _redis_connect(self):
100107
**self.redis_options)
101108
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
102109

103-
async def _publish(self, data):
110+
async def _publish(self, data): # pragma: no cover
104111
retry = True
105112
_, error = self._get_redis_module_and_error()
106113
while True:
@@ -124,7 +131,7 @@ async def _publish(self, data):
124131

125132
break
126133

127-
async def _redis_listen_with_retries(self):
134+
async def _redis_listen_with_retries(self): # pragma: no cover
128135
retry_sleep = 1
129136
connect = False
130137
_, error = self._get_redis_module_and_error()
@@ -147,7 +154,7 @@ async def _redis_listen_with_retries(self):
147154
if retry_sleep > 60:
148155
retry_sleep = 60
149156

150-
async def _listen(self):
157+
async def _listen(self): # pragma: no cover
151158
channel = self.channel.encode('utf-8')
152159
await self.pubsub.subscribe(self.channel)
153160
async for message in self._redis_listen_with_retries():

src/socketio/redis_manager.py

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@
22
import time
33
from urllib.parse import urlparse
44

5-
try: # pragma: no cover
5+
try:
66
import redis
77
from redis.exceptions import RedisError
8-
except ImportError:
8+
except ImportError: # pragma: no cover
99
redis = None
1010
RedisError = None
1111

12-
try: # pragma: no cover
12+
try:
1313
import valkey
1414
from valkey.exceptions import ValkeyError
15-
except ImportError:
15+
except ImportError: # pragma: no cover
1616
valkey = None
1717
ValkeyError = None
1818

@@ -48,7 +48,7 @@ def parse_redis_sentinel_url(url):
4848
return sentinels, service_name, kwargs
4949

5050

51-
class RedisManager(PubSubManager): # pragma: no cover
51+
class RedisManager(PubSubManager):
5252
"""Redis based client manager.
5353
5454
This class implements a Redis backend for event sharing across multiple
@@ -80,17 +80,12 @@ class RedisManager(PubSubManager): # pragma: no cover
8080

8181
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
8282
write_only=False, logger=None, redis_options=None):
83-
if redis is None and valkey is None:
84-
raise RuntimeError('Redis package is not installed '
85-
'(Run "pip install redis" '
86-
'or "pip install valkey" '
87-
'in your virtualenv).')
8883
super().__init__(channel=channel, write_only=write_only, logger=logger)
8984
self.redis_url = url
9085
self.redis_options = redis_options or {}
9186
self._redis_connect()
9287

93-
def initialize(self):
88+
def initialize(self): # pragma: no cover
9489
super().initialize()
9590

9691
monkey_patched = True
@@ -107,20 +102,31 @@ def initialize(self):
107102

108103
def _get_redis_module_and_error(self):
109104
parsed_url = urlparse(self.redis_url)
110-
schema = parsed_url.scheme.split('+', 1)[0].lower()
111-
if schema in ['redis', 'unix']:
105+
scheme = parsed_url.scheme.split('+', 1)[0].lower()
106+
if scheme in ['redis', 'rediss']:
112107
if redis is None or RedisError is None:
113108
raise RuntimeError('Redis package is not installed '
114109
'(Run "pip install redis" '
115110
'in your virtualenv).')
116111
return redis, RedisError
117-
if schema == 'valkey':
112+
if scheme in ['valkey', 'valkeys']:
118113
if valkey is None or ValkeyError is None:
119114
raise RuntimeError('Valkey package is not installed '
120115
'(Run "pip install valkey" '
121116
'in your virtualenv).')
122117
return valkey, ValkeyError
123-
error_msg = f'Unsupported Redis URL schema: {schema}'
118+
if scheme == 'unix':
119+
if redis is None or RedisError is None:
120+
if valkey is None or ValkeyError is None:
121+
raise RuntimeError('Redis package is not installed '
122+
'(Run "pip install redis" '
123+
'or "pip install valkey" '
124+
'in your virtualenv).')
125+
else:
126+
return valkey, ValkeyError
127+
else:
128+
return redis, RedisError
129+
error_msg = f'Unsupported Redis URL scheme: {scheme}'
124130
raise ValueError(error_msg)
125131

126132
def _redis_connect(self):
@@ -138,7 +144,7 @@ def _redis_connect(self):
138144
**self.redis_options)
139145
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
140146

141-
def _publish(self, data):
147+
def _publish(self, data): # pragma: no cover
142148
retry = True
143149
_, error = self._get_redis_module_and_error()
144150
while True:
@@ -160,7 +166,7 @@ def _publish(self, data):
160166
)
161167
break
162168

163-
def _redis_listen_with_retries(self):
169+
def _redis_listen_with_retries(self): # pragma: no cover
164170
retry_sleep = 1
165171
connect = False
166172
_, error = self._get_redis_module_and_error()
@@ -181,7 +187,7 @@ def _redis_listen_with_retries(self):
181187
if retry_sleep > 60:
182188
retry_sleep = 60
183189

184-
def _listen(self):
190+
def _listen(self): # pragma: no cover
185191
channel = self.channel.encode('utf-8')
186192
self.pubsub.subscribe(self.channel)
187193
for message in self._redis_listen_with_retries():

tests/async/test_redis_manager.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import pytest
2+
import redis
3+
import valkey
4+
5+
from socketio import async_redis_manager
6+
from socketio.async_redis_manager import AsyncRedisManager
7+
8+
9+
class TestAsyncRedisManager:
10+
def test_redis_not_installed(self):
11+
saved_redis = async_redis_manager.aioredis
12+
async_redis_manager.aioredis = None
13+
14+
with pytest.raises(RuntimeError):
15+
AsyncRedisManager('redis://')
16+
assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None
17+
18+
async_redis_manager.aioredis = saved_redis
19+
20+
def test_valkey_not_installed(self):
21+
saved_valkey = async_redis_manager.aiovalkey
22+
async_redis_manager.aiovalkey = None
23+
24+
with pytest.raises(RuntimeError):
25+
AsyncRedisManager('valkey://')
26+
assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None
27+
28+
async_redis_manager.aiovalkey = saved_valkey
29+
30+
def test_redis_valkey_not_installed(self):
31+
saved_redis = async_redis_manager.aioredis
32+
async_redis_manager.aioredis = None
33+
saved_valkey = async_redis_manager.aiovalkey
34+
async_redis_manager.aiovalkey = None
35+
36+
with pytest.raises(RuntimeError):
37+
AsyncRedisManager('redis://')
38+
with pytest.raises(RuntimeError):
39+
AsyncRedisManager('valkey://')
40+
with pytest.raises(RuntimeError):
41+
AsyncRedisManager('unix:///var/sock/redis.sock')
42+
43+
async_redis_manager.aioredis = saved_redis
44+
async_redis_manager.aiovalkey = saved_valkey
45+
46+
def test_bad_url(self):
47+
with pytest.raises(ValueError):
48+
AsyncRedisManager('http://localhost:6379')
49+
50+
def test_redis_connect(self):
51+
urls = [
52+
'redis://localhost:6379',
53+
'redis://localhost:6379/0',
54+
'redis://:password@localhost:6379',
55+
'redis://:password@localhost:6379/0',
56+
'redis://user:password@localhost:6379',
57+
'redis://user:password@localhost:6379/0',
58+
59+
'rediss://localhost:6379',
60+
'rediss://localhost:6379/0',
61+
'rediss://:password@localhost:6379',
62+
'rediss://:password@localhost:6379/0',
63+
'rediss://user:password@localhost:6379',
64+
'rediss://user:password@localhost:6379/0',
65+
66+
'unix:///var/sock/redis.sock',
67+
'unix:///var/sock/redis.sock?db=0',
68+
'unix://user@/var/sock/redis.sock',
69+
'unix://user@/var/sock/redis.sock?db=0',
70+
71+
'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
72+
]
73+
for url in urls:
74+
c = AsyncRedisManager(url)
75+
assert isinstance(c.redis, redis.asyncio.Redis)
76+
77+
def test_valkey_connect(self):
78+
saved_redis = async_redis_manager.aioredis
79+
async_redis_manager.aioredis = None
80+
81+
urls = [
82+
'valkey://localhost:6379',
83+
'valkey://localhost:6379/0',
84+
'valkey://:password@localhost:6379',
85+
'valkey://:password@localhost:6379/0',
86+
'valkey://user:password@localhost:6379',
87+
'valkey://user:password@localhost:6379/0',
88+
89+
'valkeys://localhost:6379',
90+
'valkeys://localhost:6379/0',
91+
'valkeys://:password@localhost:6379',
92+
'valkeys://:password@localhost:6379/0',
93+
'valkeys://user:password@localhost:6379',
94+
'valkeys://user:password@localhost:6379/0',
95+
96+
'unix:///var/sock/redis.sock',
97+
'unix:///var/sock/redis.sock?db=0',
98+
'unix://user@/var/sock/redis.sock',
99+
'unix://user@/var/sock/redis.sock?db=0',
100+
101+
'valkey+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
102+
]
103+
for url in urls:
104+
c = AsyncRedisManager(url)
105+
assert isinstance(c.redis, valkey.asyncio.Valkey)
106+
107+
async_redis_manager.aioredis = saved_redis

0 commit comments

Comments
 (0)