Skip to content

Commit 1b05d65

Browse files
committed
Added async logic
1 parent d2b8ca1 commit 1b05d65

File tree

11 files changed

+476
-6
lines changed

11 files changed

+476
-6
lines changed

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Merijn Bertels <merijn.bertels@gmail.com>
3232
Omer Katz <omer.drow@gmail.com>
3333
Petr Knap <dev@petrknap.cz>
3434
Philip Neustrom <philipn@gmail.com>
35+
Philipp Thumfart <philipp@thumfart.eu>
3536
Pierre-Olivier Marec <pomarec@free.fr>
3637
Roman Krejcik <farin@farin.cz>
3738
Silvan Spross <silvan.spross@gmail.com>

constance/backends/__init__.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,31 @@ def get(self, key):
99
"""
1010
raise NotImplementedError
1111

12+
async def aget(self, key):
13+
"""
14+
Get the key from the backend store and return the value.
15+
Return None if not found.
16+
"""
17+
raise NotImplementedError
18+
1219
def mget(self, keys):
1320
"""
1421
Get the keys from the backend store and return a list of the values.
1522
Return an empty list if not found.
1623
"""
1724
raise NotImplementedError
1825

26+
async def amget(self, keys):
27+
"""
28+
Get the keys from the backend store and return a list of the values.
29+
Return an empty list if not found.
30+
"""
31+
raise NotImplementedError
32+
1933
def set(self, key, value):
2034
"""Add the value to the backend store given the key."""
2135
raise NotImplementedError
36+
37+
async def aset(self, key, value):
38+
"""Add the value to the backend store given the key."""
39+
raise NotImplementedError

constance/backends/database.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,46 @@ def get(self, key):
8585
self._cache.add(key, value)
8686
return value
8787

88+
async def aget(self, key):
89+
from asgiref.sync import sync_to_async
90+
91+
prefixed_key = self.add_prefix(key)
92+
value = None
93+
if self._cache:
94+
value = await self._cache.aget(prefixed_key)
95+
if value is None:
96+
await sync_to_async(self.autofill)()
97+
value = await self._cache.aget(prefixed_key)
98+
if value is None:
99+
match = await self._model._default_manager.filter(key=prefixed_key).only("value").afirst()
100+
if match:
101+
value = loads(match.value)
102+
if self._cache:
103+
await self._cache.aadd(prefixed_key, value)
104+
return value
105+
106+
async def amget(self, keys):
107+
if not keys:
108+
return {}
109+
110+
prefixed_keys_map = {self.add_prefix(key): key for key in keys}
111+
results = {}
112+
113+
if self._cache:
114+
cache_results = await self._cache.aget_many(prefixed_keys_map.keys())
115+
for prefixed_key, value in cache_results.items():
116+
results[prefixed_keys_map[prefixed_key]] = value
117+
118+
missing_prefixed_keys = [k for k in prefixed_keys_map if prefixed_keys_map[k] not in results]
119+
if missing_prefixed_keys:
120+
try:
121+
async for const in self._model._default_manager.filter(key__in=missing_prefixed_keys):
122+
results[prefixed_keys_map[const.key]] = loads(const.value)
123+
except (OperationalError, ProgrammingError):
124+
pass
125+
126+
return results
127+
88128
def set(self, key, value):
89129
key = self.add_prefix(key)
90130
created = False
@@ -119,6 +159,13 @@ def set(self, key, value):
119159

120160
signals.config_updated.send(sender=config, key=key, old_value=old_value, new_value=value)
121161

162+
async def aset(self, key, value):
163+
from asgiref.sync import sync_to_async
164+
165+
# We use sync_to_async because Django's transaction.atomic() and database connections are thread-local.
166+
# This ensures the operation runs in the correct database thread until native async transactions are supported.
167+
return await sync_to_async(self.set)(key, value)
168+
122169
def clear(self, sender, instance, created, **kwargs):
123170
if self._cache and not created:
124171
keys = [self.add_prefix(k) for k in settings.CONFIG]

constance/backends/memory.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ def get(self, key):
1919
with self._lock:
2020
return self._storage.get(key)
2121

22+
async def aget(self, key):
23+
# Memory operations are fast enough that we don't need true async here
24+
return self.get(key)
25+
2226
def mget(self, keys):
2327
if not keys:
2428
return None
@@ -30,8 +34,18 @@ def mget(self, keys):
3034
result.append((key, value))
3135
return result
3236

37+
async def amget(self, keys):
38+
if not keys:
39+
return {}
40+
with self._lock:
41+
return {key: self._storage[key] for key in keys if key in self._storage}
42+
3343
def set(self, key, value):
3444
with self._lock:
3545
old_value = self._storage.get(key)
3646
self._storage[key] = value
3747
signals.config_updated.send(sender=config, key=key, old_value=old_value, new_value=value)
48+
49+
async def aset(self, key, value):
50+
# Memory operations are fast enough that we don't need true async here
51+
self.set(key, value)

constance/backends/redisd.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from threading import RLock
23
from time import monotonic
34

@@ -19,16 +20,28 @@ def __init__(self):
1920
connection_cls = settings.REDIS_CONNECTION_CLASS
2021
if connection_cls is not None:
2122
self._rd = utils.import_module_attr(connection_cls)()
23+
self._ard = self._rd
2224
else:
2325
try:
2426
import redis
2527
except ImportError:
2628
raise ImproperlyConfigured("The Redis backend requires redis-py to be installed.") from None
29+
2730
if isinstance(settings.REDIS_CONNECTION, str):
2831
self._rd = redis.from_url(settings.REDIS_CONNECTION)
2932
else:
3033
self._rd = redis.Redis(**settings.REDIS_CONNECTION)
3134

35+
try:
36+
import redis.asyncio as aredis
37+
38+
if isinstance(settings.REDIS_CONNECTION, str):
39+
self._ard = aredis.from_url(settings.REDIS_CONNECTION)
40+
else:
41+
self._ard = aredis.Redis(**settings.REDIS_CONNECTION)
42+
except ImportError:
43+
self._ard = self._rd
44+
3245
def add_prefix(self, key):
3346
return f"{self._prefix}{key}"
3447

@@ -38,6 +51,15 @@ def get(self, key):
3851
return loads(value)
3952
return None
4053

54+
async def aget(self, key):
55+
if hasattr(self._ard, "aget"):
56+
value = await self._ard.aget(self.add_prefix(key))
57+
else:
58+
value = await asyncio.to_thread(self._rd.get, self.add_prefix(key))
59+
if value:
60+
return loads(value)
61+
return None
62+
4163
def mget(self, keys):
4264
if not keys:
4365
return
@@ -46,22 +68,49 @@ def mget(self, keys):
4668
if value:
4769
yield key, loads(value)
4870

71+
async def amget(self, keys):
72+
if not keys:
73+
return {}
74+
prefixed_keys = [self.add_prefix(key) for key in keys]
75+
if hasattr(self._ard, "amget"):
76+
values = await self._ard.amget(prefixed_keys)
77+
else:
78+
values = await asyncio.to_thread(self._rd.mget, prefixed_keys)
79+
return {key: loads(value) for key, value in zip(keys, values) if value}
80+
4981
def set(self, key, value):
5082
old_value = self.get(key)
5183
self._rd.set(self.add_prefix(key), dumps(value))
5284
signals.config_updated.send(sender=config, key=key, old_value=old_value, new_value=value)
5385

86+
async def aset(self, key, value):
87+
# We need the old value for the signal.
88+
# Signals are synchronous in Django, but we can't easily change that here.
89+
old_value = await self.aget(key)
90+
if hasattr(self._ard, "aset"):
91+
await self._ard.aset(self.add_prefix(key), dumps(value))
92+
else:
93+
await asyncio.to_thread(self._rd.set, self.add_prefix(key), dumps(value))
94+
signals.config_updated.send(sender=config, key=key, old_value=old_value, new_value=value)
95+
5496

5597
class CachingRedisBackend(RedisBackend):
5698
_sentinel = object()
5799
_lock = RLock()
100+
_async_lock = None # Lazy-initialized asyncio.Lock
58101

59102
def __init__(self):
60103
super().__init__()
61104
self._timeout = settings.REDIS_CACHE_TIMEOUT
62105
self._cache = {}
63106
self._sentinel = object()
64107

108+
def _get_async_lock(self):
109+
# Lazily create the asyncio lock to avoid issues with event loops
110+
if self._async_lock is None:
111+
self._async_lock = asyncio.Lock()
112+
return self._async_lock
113+
65114
def _has_expired(self, value):
66115
return value[0] <= monotonic()
67116

@@ -79,15 +128,70 @@ def get(self, key):
79128

80129
return value[1]
81130

131+
async def aget(self, key):
132+
value = self._cache.get(key, self._sentinel)
133+
134+
if value is self._sentinel or self._has_expired(value):
135+
async with self._get_async_lock():
136+
# Double-check after acquiring lock
137+
value = self._cache.get(key, self._sentinel)
138+
if value is self._sentinel or self._has_expired(value):
139+
new_value = await super().aget(key)
140+
self._cache_value(key, new_value)
141+
return new_value
142+
return value[1]
143+
144+
return value[1]
145+
82146
def set(self, key, value):
83147
with self._lock:
84148
super().set(key, value)
85149
self._cache_value(key, value)
86150

151+
async def aset(self, key, value):
152+
async with self._get_async_lock():
153+
await super().aset(key, value)
154+
self._cache_value(key, value)
155+
87156
def mget(self, keys):
88157
if not keys:
89158
return
90159
for key in keys:
91160
value = self.get(key)
92161
if value is not None:
93162
yield key, value
163+
164+
async def amget(self, keys):
165+
if not keys:
166+
return {}
167+
168+
results = {}
169+
missing_keys = []
170+
171+
# First, check the local cache for all keys
172+
for key in keys:
173+
value = self._cache.get(key, self._sentinel)
174+
if value is not self._sentinel and not self._has_expired(value):
175+
results[key] = value[1]
176+
else:
177+
missing_keys.append(key)
178+
179+
# Fetch missing keys from Redis
180+
if missing_keys:
181+
async with self._get_async_lock():
182+
# Re-check cache for keys that might have been fetched while waiting for lock
183+
still_missing = []
184+
for key in missing_keys:
185+
value = self._cache.get(key, self._sentinel)
186+
if value is not self._sentinel and not self._has_expired(value):
187+
results[key] = value[1]
188+
else:
189+
still_missing.append(key)
190+
191+
if still_missing:
192+
fetched = await super().amget(still_missing)
193+
for key, value in fetched.items():
194+
self._cache_value(key, value)
195+
results[key] = value
196+
197+
return results

0 commit comments

Comments
 (0)