diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2 index 9e554a865ee5..3f5bbf12f83b 100644 --- a/docker/complement/conf/workers-shared-extra.yaml.j2 +++ b/docker/complement/conf/workers-shared-extra.yaml.j2 @@ -31,7 +31,9 @@ federation_ip_range_blacklist: [] # Disable server rate-limiting rc_federation: window_size: 1000 - sleep_limit: 10 + # foo: We run into the rate limiter hard with the MSC2716 tests. + # We go from 35s /messages requests to 20s just by making `/state_ids` and `/state` go faster + sleep_limit: 99999 sleep_delay: 500 reject_limit: 99999 concurrent: 3 diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index bde7a6648ae7..ffe14283a315 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -43,7 +43,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.events import EventBase +from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.tracing import ( @@ -435,6 +435,21 @@ async def enqueue( else: events.append(event) + # We expect events to be persisted by this point + assert event.internal_metadata.stream_ordering + # Invalidate related caches after we persist a new event + relation = relation_from_event(event) + self.main_store._invalidate_caches_for_event( + stream_ordering=event.internal_metadata.stream_ordering, + event_id=event.event_id, + room_id=event.room_id, + etype=event.type, + state_key=event.state_key if hasattr(event, "state_key") else None, + redacts=event.redacts, + relates_to=relation.parent_id if relation else None, + backfilled=backfilled, + ) + return ( events, self.main_store.get_room_max_token(), @@ -467,6 +482,21 @@ async def persist_event( replaced_event = replaced_events.get(event.event_id) if replaced_event: event = await self.main_store.get_event(replaced_event) + else: + # We expect events to be persisted by this point + assert event.internal_metadata.stream_ordering + # Invalidate related caches after we persist a new event + relation = relation_from_event(event) + self.main_store._invalidate_caches_for_event( + stream_ordering=event.internal_metadata.stream_ordering, + event_id=event.event_id, + room_id=event.room_id, + etype=event.type, + state_key=event.state_key if hasattr(event, "state_key") else None, + redacts=event.redacts, + relates_to=relation.parent_id if relation else None, + backfilled=backfilled, + ) event_stream_id = event.internal_metadata.stream_ordering # stream ordering should have been assigned by now diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 12e9a423826a..b28a91e8f638 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -54,6 +54,7 @@ def __init__( db_conn: LoggingDatabaseConnection, hs: "HomeServer", ): + logger.info("CacheInvalidationWorkerStore constructor") super().__init__(database, db_conn, hs) self._instance_name = hs.get_instance_name() @@ -222,8 +223,13 @@ def _invalidate_caches_for_event( # This invalidates any local in-memory cached event objects, the original # process triggering the invalidation is responsible for clearing any external # cached objects. + logger.info( + "CacheInvalidationWorkerStore _invalidate_caches_for_event room_id=%s event_id=%s", + room_id, + event_id, + ) self._invalidate_local_get_event_cache(event_id) - self.have_seen_event.invalidate((room_id, event_id)) + self.have_seen_event.invalidate(((room_id, event_id),)) self.get_latest_event_ids_in_room.invalidate((room_id,)) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 9f6b1fcef1ce..debe8e5f3f6e 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1453,7 +1453,7 @@ async def have_events_in_timeline(self, event_ids: Iterable[str]) -> Set[str]: @trace @tag_args async def have_seen_events( - self, room_id: str, event_ids: Iterable[str] + self, room_id: str, event_ids: Collection[str] ) -> Set[str]: """Given a list of event ids, check if we have already processed them. @@ -1468,6 +1468,7 @@ async def have_seen_events( Returns: The set of events we have already seen. """ + logger.info("have_seen_events room_id=%s event_ids=%s", room_id, event_ids) # @cachedList chomps lots of memory if you call it with a big list, so # we break it down. However, each batch requires its own index scan, so we make @@ -1491,6 +1492,7 @@ async def _have_seen_events_dict( Returns: a dict {(room_id, event_id)-> bool} """ + logger.info("_have_seen_events_dict keys=%s", keys) # if the event cache contains the event, obviously we've seen it. cache_results = { diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 6425f851eaa4..36b05fc344dc 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -383,8 +383,14 @@ def invalidate(self, key: KT) -> None: may be of lower cardinality than the TreeCache - in which case the whole subtree is deleted. """ + import logging + + logger = logging.getLogger(__name__) + logger.info("DeferredCache before=%s", self.cache.len()) + logger.info("DeferredCache invalidate key=%s", key) self.check_thread() self.cache.del_multi(key) + logger.info("DeferredCache after=%s", self.cache.len()) # if we have a pending lookup for this key, remove it from the # _pending_deferred_cache, which will (a) stop it being returned for diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index aa93109d1380..5a745eb8c5b6 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -511,6 +511,7 @@ def add_node( callbacks, prune_unread_entries, ) + logger.info("LruCache add_node key=%s value=%s", key, value) cache[key] = node if size_callback: @@ -722,7 +723,12 @@ def cache_del_multi(key: KT) -> None: may be of lower cardinality than the TreeCache - in which case the whole subtree is deleted. """ + logger.info( + "LruCache cache values before pop %s", + {node.key: node.value for node in cache.values()}, + ) popped = cache.pop(key, None) + logger.info("LruCache cache_del_multi key=%s popped=%s", key, popped) if popped is None: return # for each deleted node, we now need to remove it from the linked list diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 0154f92107e7..ac9b7cb6c038 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -216,6 +216,9 @@ def __init__( self.reject_limit = config.reject_limit self.concurrent_requests = config.concurrent + logger.info("self.sleep_limit=%s", self.sleep_limit) + logger.info("self.reject_limit=%s", self.reject_limit) + # request_id objects for requests which have been slept self.sleeping_requests: Set[object] = set() diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 67401272ac37..158ad1f4393e 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -35,66 +35,45 @@ from synapse.util.async_helpers import yieldable_gather_results from tests import unittest +from tests.test_utils.event_injection import create_event, inject_event class HaveSeenEventsTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + def prepare(self, reactor, clock, hs): + self.hs = hs self.store: EventsWorkerStore = hs.get_datastores().main - # insert some test data - for rid in ("room1", "room2"): - self.get_success( - self.store.db_pool.simple_insert( - "rooms", - {"room_id": rid, "room_version": 4}, - ) - ) + self.user = self.register_user("user", "pass") + self.token = self.login(self.user, "pass") + self.room_id = self.helper.create_room_as(self.user, tok=self.token) self.event_ids: List[str] = [] - for idx, rid in enumerate( - ( - "room1", - "room1", - "room1", - "room2", - ) - ): - event_json = {"type": f"test {idx}", "room_id": rid} - event = make_event_from_dict(event_json, room_version=RoomVersions.V4) - event_id = event.event_id - - self.get_success( - self.store.db_pool.simple_insert( - "events", - { - "event_id": event_id, - "room_id": rid, - "topological_ordering": idx, - "stream_ordering": idx, - "type": event.type, - "processed": True, - "outlier": False, - }, - ) - ) - self.get_success( - self.store.db_pool.simple_insert( - "event_json", - { - "event_id": event_id, - "room_id": rid, - "json": json.dumps(event_json), - "internal_metadata": "{}", - "format_version": 3, - }, + for i in range(3): + event = self.get_success( + inject_event( + hs, + room_version=RoomVersions.V7.identifier, + room_id=self.room_id, + sender=self.user, + type="test_event_type", + content={"body": f"foobarbaz{i}"}, ) ) - self.event_ids.append(event_id) + + self.event_ids.append(event.event_id) def test_simple(self): with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) + self.store.have_seen_events( + self.room_id, [self.event_ids[0], "eventdoesnotexist"] + ) ) self.assertEqual(res, {self.event_ids[0]}) @@ -104,7 +83,9 @@ def test_simple(self): # a second lookup of the same events should cause no queries with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) + self.store.have_seen_events( + self.room_id, [self.event_ids[0], "eventdoesnotexist"] + ) ) self.assertEqual(res, {self.event_ids[0]}) self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) @@ -116,11 +97,54 @@ def test_query_via_event_cache(self): # looking it up should now cause no db hits with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0]]) + self.store.have_seen_events(self.room_id, [self.event_ids[0]]) ) self.assertEqual(res, {self.event_ids[0]}) self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) + def test_persisting_event_invalidates_cache(self): + event, event_context = self.get_success( + create_event( + self.hs, + room_id=self.room_id, + sender=self.user, + type="test_event_type", + content={"body": "garply"}, + ) + ) + + with LoggingContext(name="test") as ctx: + # First, check `have_seen_event` for an event we have not seen yet + # to prime the cache with a `false` value. + res = self.get_success( + self.store.have_seen_events(event.room_id, [event.event_id]) + ) + self.assertEqual(res, set()) + + # That should result in a single db query to lookup + self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) + + # Persist the event which should invalidate or prefill the + # `have_seen_event` cache so we don't return stale values. + persistence = self.hs.get_storage_controllers().persistence + self.get_success( + persistence.persist_event( + event, + event_context, + ) + ) + + with LoggingContext(name="test") as ctx: + # Check `have_seen_event` again and we should see the updated fact + # that we have now seen the event after persisting it. + res = self.get_success( + self.store.have_seen_events(event.room_id, [event.event_id]) + ) + self.assertEqual(res, {event.event_id}) + + # That should result in a single db query to lookup + self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) + class EventCacheTestCase(unittest.HomeserverTestCase): """Test that the various layers of event cache works.""" diff --git a/tests/test_utils/event_injection.py b/tests/test_utils/event_injection.py index 8027c7a856e2..978dffcda2ff 100644 --- a/tests/test_utils/event_injection.py +++ b/tests/test_utils/event_injection.py @@ -93,7 +93,8 @@ async def create_event( KNOWN_ROOM_VERSIONS[room_version], kwargs ) event, context = await hs.get_event_creation_handler().create_new_client_event( - builder, prev_event_ids=prev_event_ids + builder, + prev_event_ids=prev_event_ids, ) return event, context