Skip to content
This repository was archived by the owner on Mar 26, 2024. It is now read-only.

Commit 4d5d02d

Browse files
committed
Async get event cache prep (matrix-org#13242)
Some experimental prep work to enable external event caching based on matrix-org#9379 & matrix-org#12955. Doesn't actually move the cache at all, just lays the groundwork for async implemented caches. Signed off by Nick @ Beeper (@Fizzadar)
1 parent f891932 commit 4d5d02d

File tree

7 files changed

+43
-20
lines changed

7 files changed

+43
-20
lines changed

synapse/storage/database.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -822,8 +822,10 @@ async def _runInteraction() -> R:
822822

823823
return cast(R, result)
824824
except Exception:
825-
for after_callback, after_args, after_kwargs in exception_callbacks:
826-
after_callback(*after_args, **after_kwargs)
825+
for exception_callback, after_args, after_kwargs in exception_callbacks:
826+
await maybe_awaitable(
827+
exception_callback(*after_args, **after_kwargs)
828+
)
827829
raise
828830

829831
# To handle cancellation, we ensure that `after_callback`s and

synapse/storage/databases/main/cache.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,9 @@ def _invalidate_caches_for_event(
193193
relates_to: Optional[str],
194194
backfilled: bool,
195195
) -> None:
196+
# This invalidates any local in-memory cached event objects, the original
197+
# process triggering the invalidation is responsible for clearing any external
198+
# cached objects.
196199
self._invalidate_local_get_event_cache(event_id)
197200
self.have_seen_event.invalidate((room_id, event_id))
198201

synapse/storage/databases/main/events_worker.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -745,11 +745,17 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
745745
return event_entry_map
746746

747747
async def _invalidate_get_event_cache(self, event_id: str) -> None:
748+
# First we invalidate the asynchronous cache instance. This may include
749+
# out-of-process caches such as Redis/memcache. Once complete we can
750+
# invalidate any in memory cache. The ordering is important here to
751+
# ensure we don't pull in any remote invalid value after we invalidate
752+
# the in-memory cache.
748753
await self._get_event_cache.invalidate((event_id,))
749-
self._invalidate_local_get_event_cache(event_id)
754+
self._event_ref.pop(event_id, None)
755+
self._current_event_fetches.pop(event_id, None)
750756

751757
def _invalidate_local_get_event_cache(self, event_id: str) -> None:
752-
self._get_event_cache.lru_cache.invalidate((event_id,))
758+
self._get_event_cache.invalidate_local((event_id,))
753759
self._event_ref.pop(event_id, None)
754760
self._current_event_fetches.pop(event_id, None)
755761

synapse/util/caches/lrucache.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -734,27 +734,37 @@ def __del__(self) -> None:
734734

735735
class AsyncLruCache(Generic[KT, VT]):
736736
"""
737-
An asynchronous wrapper around a subset of the LruCache API. On it's own
738-
this doesn't change the behaviour but allows subclasses that utilize
739-
external cache systems that require await behaviour to be created.
737+
An asynchronous wrapper around a subset of the LruCache API.
738+
739+
On its own this doesn't change the behaviour but allows subclasses that
740+
utilize external cache systems that require await behaviour to be created.
740741
"""
741742

742743
def __init__(self, *args, **kwargs): # type: ignore
743-
self.lru_cache: LruCache[KT, VT] = LruCache(*args, **kwargs)
744+
self._lru_cache: LruCache[KT, VT] = LruCache(*args, **kwargs)
744745

745746
async def get(
746747
self, key: KT, default: Optional[T] = None, update_metrics: bool = True
747748
) -> Optional[VT]:
748-
return self.lru_cache.get(key, update_metrics=update_metrics)
749+
return self._lru_cache.get(key, update_metrics=update_metrics)
749750

750751
async def set(self, key: KT, value: VT) -> None:
751-
self.lru_cache.set(key, value)
752+
self._lru_cache.set(key, value)
752753

753754
async def invalidate(self, key: KT) -> None:
754-
return self.lru_cache.invalidate(key)
755+
# This method should invalidate any external cache and then invalidate the LruCache.
756+
return self._lru_cache.invalidate(key)
757+
758+
def invalidate_local(self, key: KT) -> None:
759+
"""Remove an entry from the local cache
760+
761+
This variant of `invalidate` is useful if we know that the external
762+
cache has already been invalidated.
763+
"""
764+
return self._lru_cache.invalidate(key)
755765

756766
async def contains(self, key: KT) -> bool:
757-
return self.lru_cache.contains(key)
767+
return self._lru_cache.contains(key)
758768

759-
def clear(self) -> None:
760-
self.lru_cache.clear()
769+
async def clear(self) -> None:
770+
self._lru_cache.clear()

synapse/util/caches/redis_caches.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
from typing import Any, Generic, Optional, Union, TYPE_CHECKING
21
from functools import wraps
2+
from typing import TYPE_CHECKING, Any, Generic, Optional, Union
33

44
from synapse.util.caches.lrucache import KT, VT, AsyncLruCache, T
55

@@ -21,7 +21,9 @@ async def _wrapped(**kwargs):
2121

2222
values.update(missing_values)
2323
return values
24+
2425
return _wrapped
26+
2527
return decorator
2628

2729

tests/handlers/test_sync.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def test_unknown_room_version(self):
159159

160160
# Blow away caches (supported room versions can only change due to a restart).
161161
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
162-
self.store._get_event_cache.clear()
162+
self.get_success(self.store._get_event_cache.clear())
163163
self.store._event_ref.clear()
164164

165165
# The rooms should be excluded from the sync response.

tests/storage/databases/main/test_events_worker.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ def prepare(self, reactor, clock, hs):
143143
self.event_id = res["event_id"]
144144

145145
# Reset the event cache so the tests start with it empty
146-
self.store._get_event_cache.clear()
146+
self.get_success(self.store._get_event_cache.clear())
147147

148148
def test_simple(self):
149149
"""Test that we cache events that we pull from the DB."""
@@ -160,7 +160,7 @@ def test_event_ref(self):
160160
"""
161161

162162
# Reset the event cache
163-
self.store._get_event_cache.clear()
163+
self.get_success(self.store._get_event_cache.clear())
164164

165165
with LoggingContext("test") as ctx:
166166
# We keep hold of the event event though we never use it.
@@ -170,7 +170,7 @@ def test_event_ref(self):
170170
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
171171

172172
# Reset the event cache
173-
self.store._get_event_cache.clear()
173+
self.get_success(self.store._get_event_cache.clear())
174174

175175
with LoggingContext("test") as ctx:
176176
self.get_success(self.store.get_event(self.event_id))
@@ -345,7 +345,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer):
345345
self.event_id = res["event_id"]
346346

347347
# Reset the event cache so the tests start with it empty
348-
self.store._get_event_cache.clear()
348+
self.get_success(self.store._get_event_cache.clear())
349349

350350
@contextmanager
351351
def blocking_get_event_calls(

0 commit comments

Comments
 (0)