Skip to content

Commit 505e293

Browse files
committed
added async sentinel client
1 parent 4398249 commit 505e293

File tree

2 files changed

+49
-2
lines changed

2 files changed

+49
-2
lines changed
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
1-
from .default import AsyncDefaultClient
2-
from .herd import AsyncHerdClient
1+
from django_valkey.async_cache.client.default import AsyncDefaultClient
2+
from django_valkey.async_cache.client.herd import AsyncHerdClient
3+
from django_valkey.async_cache.client.sentinel import AsyncSentinelClient
4+
5+
__all__ = ["AsyncDefaultClient", "AsyncHerdClient", "AsyncSentinelClient"]
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from urllib.parse import parse_qs, urlparse
2+
3+
from django.core.exceptions import ImproperlyConfigured
4+
from valkey.asyncio.sentinel import SentinelConnectionPool
5+
6+
from django_valkey.async_cache.client.default import AsyncDefaultClient
7+
from django_valkey.client.sentinel import replace_query
8+
9+
10+
class AsyncSentinelClient(AsyncDefaultClient):
11+
CONNECTION_FACTORY_PATH = (
12+
"django_valkey.async_cache.pool.AsyncSentinelConnectionFactory"
13+
)
14+
"""
15+
Sentinel client which uses the single valkey URL specified by the CACHE's
16+
LOCATION to create a LOCATION configuration for two connection pools; One
17+
pool for the primaries and another pool for the replicas, and upon
18+
connecting ensures the connection pool factory is configured correctly.
19+
"""
20+
21+
def __init__(self, server, params, backend):
22+
if isinstance(server, str):
23+
url = urlparse(server)
24+
primary_query = parse_qs(url.query, keep_blank_values=True)
25+
replica_query = primary_query
26+
primary_query["is_master"] = [1] # type: ignore
27+
replica_query["is_master"] = [0] # type: ignore
28+
29+
server = [replace_query(url, i) for i in (primary_query, replica_query)]
30+
31+
super().__init__(server, params, backend)
32+
33+
async def aconnect(self, *args, **kwargs):
34+
connection = await super().aconnect(*args, **kwargs)
35+
if not isinstance(connection.connection_pool, SentinelConnectionPool):
36+
error_message = (
37+
"Settings DJANGO_VALKEY_CONNECTION_FACTORY or "
38+
"CACHE[].OPTIONS.CONNECTION_POOL_CLASS is not configured correctly."
39+
)
40+
raise ImproperlyConfigured(error_message)
41+
42+
return connection
43+
44+
connect = aconnect

0 commit comments

Comments
 (0)