diff --git a/crates/matrix-sdk-base/src/event_cache/store/mod.rs b/crates/matrix-sdk-base/src/event_cache/store/mod.rs index 85a582b4d1a..e6f6e749726 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/mod.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/mod.rs @@ -92,6 +92,39 @@ impl EventCacheStoreLock { Ok(EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() }) } + + /// Acquire a spin lock and return an owned guard that can be used to access + /// the cache store. + pub async fn lock_owned(&self) -> Result<OwnedEventCacheStoreLockGuard, LockStoreError> { + let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await?; + + Ok(OwnedEventCacheStoreLockGuard { + cross_process_lock_guard: Arc::new(cross_process_lock_guard), + store: self.store.clone(), + }) + } +} + +/// An RAII implementation of an owned “scoped lock” of an +/// [`EventCacheStoreLock`]. +/// +/// When this structure is dropped (falls out of scope), the lock will be +/// unlocked. +#[derive(Clone)] +pub struct OwnedEventCacheStoreLockGuard { + /// The cross process lock guard. + #[allow(unused)] + cross_process_lock_guard: Arc<CrossProcessStoreLockGuard>, + + /// A reference to the store. + pub store: Arc<DynEventCacheStore>, +} + +#[cfg(not(tarpaulin_include))] +impl fmt::Debug for OwnedEventCacheStoreLockGuard { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.debug_struct("OwnedEventCacheStoreLockGuard").finish_non_exhaustive() + } } /// An RAII implementation of a “scoped lock” of an [`EventCacheStoreLock`]. diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index 65ba9a73fe0..574b4629d94 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -181,8 +181,7 @@ impl SqliteEventCacheStore { // Acquire a connection for executing read operations. #[instrument(skip_all)] async fn read(&self) -> Result<SqliteAsyncConn> { - trace!("Taking a `read` connection"); - let _timer = timer!("connection"); + let _timer = timer!("Taking a `read` connection"); let connection = self.pool.get().await?; @@ -198,8 +197,7 @@ impl SqliteEventCacheStore { // Acquire a connection for executing write operations. #[instrument(skip_all)] async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> { - trace!("Taking a `write` connection"); - let _timer = timer!("connection"); + let _timer = timer!("Taking a `write` connection"); let connection = self.write_connection.clone().lock_owned().await; diff --git a/crates/matrix-sdk/src/event_cache/deduplicator.rs b/crates/matrix-sdk/src/event_cache/deduplicator.rs index 779cfb5468e..b3791714d1c 100644 --- a/crates/matrix-sdk/src/event_cache/deduplicator.rs +++ b/crates/matrix-sdk/src/event_cache/deduplicator.rs @@ -18,7 +18,7 @@ use std::collections::BTreeSet; use matrix_sdk_base::{ - event_cache::store::EventCacheStoreLock, + event_cache::store::OwnedEventCacheStoreLockGuard, linked_chunk::{LinkedChunkId, Position}, }; use ruma::OwnedEventId; @@ -32,7 +32,7 @@ use super::{ /// information about the duplicates found in the new events, including the /// events that are not loaded in memory.
pub async fn filter_duplicate_events( - store: &EventCacheStoreLock, + store: OwnedEventCacheStoreLockGuard, linked_chunk_id: LinkedChunkId<'_>, linked_chunk: &EventLinkedChunk, mut new_events: Vec, @@ -50,10 +50,9 @@ pub async fn filter_duplicate_events( }); } - let store = store.lock().await?; - // Let the store do its magic ✨ let duplicated_event_ids = store + .store .filter_duplicated_events( linked_chunk_id, new_events.iter().filter_map(|event| event.event_id()).collect(), @@ -148,7 +147,10 @@ pub(super) struct DeduplicationOutcome { mod tests { use std::ops::Not as _; - use matrix_sdk_base::{deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier}; + use matrix_sdk_base::{ + deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock, + linked_chunk::ChunkIdentifier, + }; use matrix_sdk_test::{async_test, event_factory::EventFactory}; use ruma::{owned_event_id, serde::Raw, user_id, EventId}; @@ -222,6 +224,7 @@ mod tests { .unwrap(); let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned()); + let locked_store = event_cache_store.lock_owned().await.unwrap(); { // When presenting with only duplicate events, some of them in the in-memory @@ -232,7 +235,7 @@ mod tests { linked_chunk.push_events([event_1.clone(), event_2.clone(), event_3.clone()]); let outcome = filter_duplicate_events( - &event_cache_store, + locked_store.clone(), LinkedChunkId::Room(room_id), &linked_chunk, vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()], @@ -247,7 +250,7 @@ mod tests { linked_chunk.push_events([event_2.clone(), event_3.clone()]); let outcome = filter_duplicate_events( - &event_cache_store, + locked_store, LinkedChunkId::Room(room_id), &linked_chunk, vec![event_0, event_1, event_2, event_3, event_4], @@ -351,6 +354,7 @@ mod tests { // Wrap the store into its lock. 
let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned()); + let locked_store = event_cache_store.lock_owned().await.unwrap(); let linked_chunk = EventLinkedChunk::new(); @@ -360,7 +364,7 @@ mod tests { in_store_duplicated_event_ids, non_empty_all_duplicates, } = filter_duplicate_events( - &event_cache_store, + locked_store, LinkedChunkId::Room(room_id), &linked_chunk, vec![ev1, ev2, ev3, ev4], diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 45225d62c16..d1621cb8212 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -39,7 +39,7 @@ use futures_util::future::{join_all, try_join_all}; use matrix_sdk_base::{ deserialized_responses::{AmbiguityChange, TimelineEvent}, event_cache::{ - store::{EventCacheStoreError, EventCacheStoreLock}, + store::{DynEventCacheStore, EventCacheStoreError, EventCacheStoreLock}, Gap, }, executor::AbortOnDrop, @@ -358,18 +358,29 @@ impl EventCache { while let Some(room_id) = rx.recv().await { trace!(for_room = %room_id, "received notification to shrink"); - let room = match inner.for_room(&room_id).await { - Ok(room) => room, - Err(err) => { - warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}"); + let room = { + let Ok(store) = inner + .store + .lock() + .await + .inspect_err(|err| warn!("Failed to lock the store: {err}")) + else { continue; + }; + + match inner.for_room(&*store, &room_id).await { + Ok(room) => room, + Err(err) => { + warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}"); + continue; + } } }; trace!("waiting for state lock…"); let mut state = room.inner.state.write().await; - match state.auto_shrink_if_no_subscribers().await { + match state.auto_shrink_if_no_subscribers(&room.inner.store).await { Ok(diffs) => { if let Some(diffs) = diffs { // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any @@ -408,7 +419,8 @@ impl EventCache { return Err(EventCacheError::NotSubscribedYet); }; - let room = self.inner.for_room(room_id).await?; + let store = self.inner.store.lock().await?; + let room = self.inner.for_room(&*store, room_id).await?; Ok((room, drop_handles)) } @@ -804,25 +816,31 @@ impl EventCacheInner { .await; // Clear the storage for all the rooms, using the storage facility. - self.store.lock().await?.clear_all_linked_chunks().await?; + let locked_store = self.store.lock_owned().await?; + + locked_store.store.clear_all_linked_chunks().await?; // At this point, all the in-memory linked chunks are desynchronized from the // storage. Resynchronize them manually by calling reset(), and // propagate updates to observers. 
- try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move { - let updates_as_vector_diffs = state_guard.reset().await?; + try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| { + let locked_store = locked_store.clone(); - let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { - diffs: updates_as_vector_diffs, - origin: EventsOrigin::Cache, - }); + async move { + let updates_as_vector_diffs = state_guard.reset(locked_store).await?; - let _ = room - .inner - .generic_update_sender - .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() }); + let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { + diffs: updates_as_vector_diffs, + origin: EventsOrigin::Cache, + }); - Ok::<_, EventCacheError>(()) + let _ = room + .inner + .generic_update_sender + .send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() }); + + Ok::<_, EventCacheError>(()) + } })) .await?; @@ -839,6 +857,8 @@ impl EventCacheInner { self.multiple_room_updates_lock.lock().await }; + let locked_store = self.store.lock_owned().await?; + // Note: bnjbvr tried to make this concurrent at some point, but it turned out // to be a performance regression, even for large sync updates. Lacking // time to investigate, this code remains sequential for now. See also @@ -846,9 +866,11 @@ impl EventCacheInner { // Left rooms. for (room_id, left_room_update) in updates.left { - let room = self.for_room(&room_id).await?; + let room = self.for_room(&*locked_store.store, &room_id).await?; - if let Err(err) = room.inner.handle_left_room_update(left_room_update).await { + if let Err(err) = + room.inner.handle_left_room_update(locked_store.clone(), left_room_update).await + { // Non-fatal error, try to continue to the next room. error!("handling left room update: {err}"); } @@ -858,9 +880,11 @@ impl EventCacheInner { for (room_id, joined_room_update) in updates.joined { trace!(?room_id, "Handling a `JoinedRoomUpdate`"); - let room = self.for_room(&room_id).await?; + let room = self.for_room(&*locked_store.store, &room_id).await?; - if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await { + if let Err(err) = + room.inner.handle_joined_room_update(locked_store.clone(), joined_room_update).await + { // Non-fatal error, try to continue to the next room. error!(%room_id, "handling joined room update: {err}"); } @@ -873,7 +897,11 @@ impl EventCacheInner { } /// Return a room-specific view over the [`EventCache`]. - async fn for_room(&self, room_id: &RoomId) -> Result { + async fn for_room( + &self, + store: &DynEventCacheStore, + room_id: &RoomId, + ) -> Result { // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a // write lock. 
let by_room_guard = self.by_room.read().await; @@ -908,7 +936,7 @@ impl EventCacheInner { room_id.to_owned(), room_version_rules, self.linked_chunk_update_sender.clone(), - self.store.clone(), + store, pagination_status.clone(), ) .await?; @@ -925,6 +953,7 @@ impl EventCacheInner { let room_event_cache = RoomEventCache::new( self.client.clone(), + self.store.clone(), room_state, pagination_status, room_id.to_owned(), @@ -1125,11 +1154,17 @@ mod tests { .unwrap(); let account_data = vec![read_marker_event; 100]; - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { account_data, ..Default::default() }, + ) + .await + .unwrap(); + } // … there's only one read marker update. assert_matches!( diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 368959390aa..e928f4b6585 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -180,7 +180,7 @@ impl RoomPagination { loop { let mut state_guard = self.inner.state.write().await; - match state_guard.load_more_events_backwards().await? { + match state_guard.load_more_events_backwards(&self.inner.store).await? { LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => { const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3); @@ -276,12 +276,15 @@ impl RoomPagination { .await .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?; + let locked_store = self.inner.store.lock_owned().await?; + if let Some((outcome, timeline_event_diffs)) = self .inner .state .write() .await .handle_backpagination( + locked_store, response.chunk, response.end, prev_token, diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 4997e1eb825..c4f6e92dfab 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -29,7 +29,10 @@ use eyeball::SharedObservable; use eyeball_im::VectorDiff; use matrix_sdk_base::{ deserialized_responses::AmbiguityChange, - event_cache::Event, + event_cache::{ + store::{EventCacheStoreLock, OwnedEventCacheStoreLockGuard}, + Event, + }, linked_chunk::Position, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, }; @@ -163,6 +166,7 @@ impl RoomEventCache { /// Create a new [`RoomEventCache`] using the given room and store. pub(super) fn new( client: WeakClient, + store: EventCacheStoreLock, state: RoomEventCacheState, pagination_status: SharedObservable, room_id: OwnedRoomId, @@ -172,6 +176,7 @@ impl RoomEventCache { Self { inner: Arc::new(RoomEventCacheInner::new( client, + store, state, pagination_status, room_id, @@ -331,11 +336,13 @@ impl RoomEventCache { /// It starts by looking into loaded events before looking inside the /// storage. 
pub async fn find_event(&self, event_id: &EventId) -> Option { + let locked_store = self.inner.store.lock_owned().await.ok()?; + self.inner .state .read() .await - .find_event(event_id) + .find_event(&locked_store, event_id) .await .ok() .flatten() @@ -363,7 +370,7 @@ impl RoomEventCache { .state .read() .await - .find_event_with_relations(event_id, filter.clone()) + .find_event_with_relations(&self.inner.store, event_id, filter.clone()) .await .ok() .flatten() @@ -375,7 +382,9 @@ impl RoomEventCache { /// storage. pub async fn clear(&self) -> Result<()> { // Clear the linked chunk and persisted storage. - let updates_as_vector_diffs = self.inner.state.write().await.reset().await?; + let locked_store = self.inner.store.lock_owned().await?; + + let updates_as_vector_diffs = self.inner.state.write().await.reset(locked_store).await?; // Notify observers about the update. let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { @@ -395,7 +404,13 @@ impl RoomEventCache { /// Save some events in the event cache, for further retrieval with /// [`Self::event`]. pub(crate) async fn save_events(&self, events: impl IntoIterator) { - if let Err(err) = self.inner.state.write().await.save_event(events).await { + let Ok(locked_store) = self.inner.store.lock_owned().await.inspect_err(|err| { + warn!("couldn't lock the event cache store: {err}"); + }) else { + return; + }; + + if let Err(err) = self.inner.state.write().await.save_event(locked_store, events).await { warn!("couldn't save event in the event cache: {err}"); } } @@ -414,6 +429,8 @@ pub(super) struct RoomEventCacheInner { pub weak_room: WeakRoom, + pub store: EventCacheStoreLock, + /// Sender part for subscribers to this room. pub sender: Sender, @@ -444,6 +461,7 @@ impl RoomEventCacheInner { /// to handle new timeline events. fn new( client: WeakClient, + store: EventCacheStoreLock, state: RoomEventCacheState, pagination_status: SharedObservable, room_id: OwnedRoomId, @@ -457,6 +475,7 @@ impl RoomEventCacheInner { weak_room, state: RwLock::new(state), sender, + store, pagination_batch_token_notifier: Default::default(), auto_shrink_sender, pagination_status, @@ -504,21 +523,32 @@ impl RoomEventCacheInner { } #[instrument(skip_all, fields(room_id = %self.room_id))] - pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { + pub(super) async fn handle_joined_room_update( + &self, + locked_store: OwnedEventCacheStoreLockGuard, + updates: JoinedRoomUpdate, + ) -> Result<()> { self.handle_timeline( + locked_store, updates.timeline, updates.ephemeral.clone(), updates.ambiguity_changes, ) .await?; + self.handle_account_data(updates.account_data); Ok(()) } #[instrument(skip_all, fields(room_id = %self.room_id))] - pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { - self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?; + pub(super) async fn handle_left_room_update( + &self, + locked_store: OwnedEventCacheStoreLockGuard, + updates: LeftRoomUpdate, + ) -> Result<()> { + self.handle_timeline(locked_store, updates.timeline, Vec::new(), updates.ambiguity_changes) + .await?; Ok(()) } @@ -527,6 +557,7 @@ impl RoomEventCacheInner { /// room. 
async fn handle_timeline( &self, + locked_store: OwnedEventCacheStoreLockGuard, timeline: Timeline, ephemeral_events: Vec>, ambiguity_changes: BTreeMap, @@ -553,6 +584,7 @@ impl RoomEventCacheInner { .write() .await .handle_sync( + locked_store, timeline, #[cfg(feature = "experimental-search")] &room, @@ -628,7 +660,7 @@ mod private { apply_redaction, deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind}, event_cache::{ - store::{DynEventCacheStore, EventCacheStoreLock}, + store::{DynEventCacheStore, EventCacheStoreLock, OwnedEventCacheStoreLockGuard}, Event, Gap, }, linked_chunk::{ @@ -676,9 +708,6 @@ mod private { /// The rules for the version of this room. room_version_rules: RoomVersionRules, - /// Reference to the underlying backing store. - store: EventCacheStoreLock, - /// The loaded events for the current room, that is, the in-memory /// linked chunk for this room. room_linked_chunk: EventLinkedChunk, @@ -719,11 +748,9 @@ mod private { room_id: OwnedRoomId, room_version_rules: RoomVersionRules, linked_chunk_update_sender: Sender, - store: EventCacheStoreLock, + store: &DynEventCacheStore, pagination_status: SharedObservable, ) -> Result { - let store_lock = store.lock().await?; - let linked_chunk_id = LinkedChunkId::Room(&room_id); // Load the full linked chunk's metadata, so as to feed the order tracker. @@ -731,7 +758,7 @@ mod private { // If loading the full linked chunk failed, we'll clear the event cache, as it // indicates that at some point, there's some malformed data. let full_linked_chunk_metadata = - match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await { + match Self::load_linked_chunk_metadata(store, linked_chunk_id).await { Ok(metas) => metas, Err(err) => { error!( @@ -739,7 +766,7 @@ mod private { ); // Try to clear storage for this room. - store_lock + store .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) .await?; @@ -748,7 +775,7 @@ mod private { } }; - let linked_chunk = match store_lock + let linked_chunk = match store .load_last_chunk(linked_chunk_id) .await .map_err(EventCacheError::from) @@ -763,9 +790,7 @@ mod private { ); // Try to clear storage for this room. - store_lock - .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) - .await?; + store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?; None } @@ -785,7 +810,6 @@ mod private { Ok(Self { room: room_id, room_version_rules, - store, room_linked_chunk, threads, waited_for_initial_prev_token: false, @@ -946,6 +970,7 @@ mod private { /// Load more events backwards if the last chunk is **not** a gap. pub(in super::super) async fn load_more_events_backwards( &mut self, + store_lock: &EventCacheStoreLock, ) -> Result { // If any in-memory chunk is a gap, don't load more events, and let the caller // resolve the gap. @@ -962,7 +987,7 @@ mod private { .expect("a linked chunk is never empty") .identifier(); - let store = self.store.lock().await?; + let store = store_lock.lock().await?; // The first chunk is not a gap, we can load its previous chunk. let linked_chunk_id = LinkedChunkId::Room(&self.room); @@ -1042,13 +1067,14 @@ mod private { /// pending diff updates with the result of this function. /// /// Otherwise, returns `None`. 
- pub(super) async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> { - let store_lock = self.store.lock().await?; - + pub(super) async fn shrink_to_last_chunk( + &mut self, + locked_store: OwnedEventCacheStoreLockGuard, + ) -> Result<(), EventCacheError> { // Attempt to load the last chunk. let linked_chunk_id = LinkedChunkId::Room(&self.room); let (last_chunk, chunk_identifier_generator) = - match store_lock.load_last_chunk(linked_chunk_id).await { + match locked_store.store.load_last_chunk(linked_chunk_id).await { Ok(pair) => pair, Err(err) => { @@ -1056,7 +1082,8 @@ mod private { error!("error when reloading a linked chunk from memory: {err}"); // Clear storage for this room. - store_lock + locked_store + .store .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) .await?; @@ -1073,7 +1100,7 @@ mod private { self.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator) { error!("error when replacing the linked chunk: {err}"); - return self.reset_internal().await; + return self.reset_internal(locked_store).await; } // Let pagination observers know that we may have not reached the start of the @@ -1093,6 +1120,7 @@ mod private { #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] pub(crate) async fn auto_shrink_if_no_subscribers( &mut self, + store_lock: &EventCacheStoreLock, ) -> Result>>, EventCacheError> { let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst); @@ -1101,7 +1129,8 @@ mod private { if subscriber_count == 0 { // If we are the last strong reference to the auto-shrinker, we can shrink the // events data structure to its last chunk. - self.shrink_to_last_chunk().await?; + let locked_store = store_lock.lock_owned().await?; + self.shrink_to_last_chunk(locked_store).await?; Ok(Some(self.room_linked_chunk.updates_as_vector_diffs())) } else { Ok(None) @@ -1111,8 +1140,10 @@ mod private { #[cfg(test)] pub(crate) async fn force_shrink_to_last_chunk( &mut self, + store_lock: &EventCacheStoreLock, ) -> Result>, EventCacheError> { - self.shrink_to_last_chunk().await?; + let locked_store = store_lock.lock_owned().await?; + self.shrink_to_last_chunk(locked_store).await?; Ok(self.room_linked_chunk.updates_as_vector_diffs()) } @@ -1172,6 +1203,7 @@ mod private { #[instrument(skip_all)] async fn remove_events( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, in_memory_events: Vec<(OwnedEventId, Position)>, in_store_events: Vec<(OwnedEventId, Position)>, ) -> Result<(), EventCacheError> { @@ -1189,7 +1221,7 @@ mod private { .map(|pos| Update::RemoveItem { at: pos }) .collect::>(); - self.apply_store_only_updates(updates).await?; + self.apply_store_only_updates(locked_store.clone(), updates).await?; } // In-memory events. @@ -1205,13 +1237,16 @@ mod private { ) .expect("failed to remove an event"); - self.propagate_changes().await + self.propagate_changes(locked_store).await } /// Propagate changes to the underlying storage. - async fn propagate_changes(&mut self) -> Result<(), EventCacheError> { + async fn propagate_changes( + &mut self, + locked_store: OwnedEventCacheStoreLockGuard, + ) -> Result<(), EventCacheError> { let updates = self.room_linked_chunk.store_updates().take(); - self.send_updates_to_store(updates).await + self.send_updates_to_store(locked_store, updates).await } /// Apply some updates that are effective only on the store itself. @@ -1222,14 +1257,16 @@ mod private { /// storage. 
async fn apply_store_only_updates( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, updates: Vec>, ) -> Result<(), EventCacheError> { self.room_linked_chunk.order_tracker.map_updates(&updates); - self.send_updates_to_store(updates).await + self.send_updates_to_store(locked_store, updates).await } async fn send_updates_to_store( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, mut updates: Vec>, ) -> Result<(), EventCacheError> { if updates.is_empty() { @@ -1259,16 +1296,16 @@ mod private { // The store cross-process locking involves an actual mutex, which ensures that // storing updates happens in the expected order. - let store = self.store.clone(); let room_id = self.room.clone(); let cloned_updates = updates.clone(); spawn(async move { - let store = store.lock().await?; - trace!(updates = ?cloned_updates, "sending linked chunk updates to the store"); let linked_chunk_id = LinkedChunkId::Room(&room_id); - store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?; + locked_store + .store + .handle_linked_chunk_updates(linked_chunk_id, cloned_updates) + .await?; trace!("linked chunk updates applied"); super::Result::Ok(()) @@ -1291,8 +1328,11 @@ mod private { /// result, the caller may override any pending diff updates /// with the result of this function. #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] - pub async fn reset(&mut self) -> Result>, EventCacheError> { - self.reset_internal().await?; + pub async fn reset( + &mut self, + locked_store: OwnedEventCacheStoreLockGuard, + ) -> Result>, EventCacheError> { + self.reset_internal(locked_store).await?; let diff_updates = self.room_linked_chunk.updates_as_vector_diffs(); @@ -1303,7 +1343,10 @@ mod private { Ok(diff_updates) } - async fn reset_internal(&mut self) -> Result<(), EventCacheError> { + async fn reset_internal( + &mut self, + locked_store: OwnedEventCacheStoreLockGuard, + ) -> Result<(), EventCacheError> { self.room_linked_chunk.reset(); // No need to update the thread summaries: the room events are @@ -1315,7 +1358,7 @@ mod private { thread.clear(); } - self.propagate_changes().await?; + self.propagate_changes(locked_store).await?; // Reset the pagination state too: pretend we never waited for the initial // prev-batch token, and indicate that we're not at the start of the @@ -1350,6 +1393,7 @@ mod private { /// looking inside the storage. pub async fn find_event( &self, + locked_store: &OwnedEventCacheStoreLockGuard, event_id: &EventId, ) -> Result, EventCacheError> { // There are supposedly fewer events loaded in memory than in the store. Let's @@ -1360,9 +1404,8 @@ mod private { } } - let store = self.store.lock().await?; - - Ok(store + Ok(locked_store + .store .find_event(&self.room, event_id) .await? .map(|event| (EventLocation::Store, event))) @@ -1383,10 +1426,11 @@ mod private { /// chunk. pub async fn find_event_with_relations( &self, + store_lock: &EventCacheStoreLock, event_id: &EventId, filters: Option>, ) -> Result)>, EventCacheError> { - let store = self.store.lock().await?; + let store = store_lock.lock().await?; // First, hit storage to get the target event and its related events. let found = store.find_event(&self.room, event_id).await?; @@ -1501,17 +1545,18 @@ mod private { /// Flushes updates to disk first. 
async fn post_process_new_events( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, events: Vec, is_sync: bool, #[cfg(feature = "experimental-search")] room: &Room, ) -> Result<(), EventCacheError> { // Update the store before doing the post-processing. - self.propagate_changes().await?; + self.propagate_changes(locked_store.clone()).await?; let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new(); for event in events { - self.maybe_apply_new_redaction(&event).await?; // TODO: Handle redaction for search index + self.maybe_apply_new_redaction(locked_store.clone(), &event).await?; // TODO: Handle redaction for search index // We can also add the event to the index. #[cfg(feature = "experimental-search")] @@ -1530,11 +1575,11 @@ mod private { // Save a bundled thread event, if there was one. if let Some(bundled_thread) = event.bundled_latest_thread_event { - self.save_event([*bundled_thread]).await?; + self.save_event(locked_store.clone(), [*bundled_thread]).await?; } } - self.update_threads(new_events_by_thread, is_sync).await?; + self.update_threads(locked_store, new_events_by_thread, is_sync).await?; Ok(()) } @@ -1554,6 +1599,7 @@ mod private { #[instrument(skip_all)] async fn update_threads( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, new_events_by_thread: BTreeMap>, is_sync: bool, ) -> Result<(), EventCacheError> { @@ -1573,7 +1619,8 @@ mod private { let last_event_id = thread_cache.latest_event_id(); - let Some((location, mut target_event)) = self.find_event(&thread_root).await? + let Some((location, mut target_event)) = + self.find_event(&locked_store, &thread_root).await? else { trace!(%thread_root, "thread root event is missing from the linked chunk"); continue; @@ -1592,8 +1639,8 @@ mod private { // worry about filtering out aggregation events (like // reactions/edits/etc.). Pretty neat, huh? let num_replies = { - let store_guard = &*self.store.lock().await?; - let related_thread_events = store_guard + let related_thread_events = locked_store + .store .find_event_relations( &self.room, &thread_root, @@ -1616,7 +1663,7 @@ mod private { // Trigger an update to observers. target_event.thread_summary = ThreadSummaryStatus::Some(new_summary); - self.replace_event_at(location, target_event).await?; + self.replace_event_at(locked_store.clone(), location, target_event).await?; } Ok(()) @@ -1630,6 +1677,7 @@ mod private { /// unlikely to observe the store updates directly. async fn replace_event_at( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, location: EventLocation, event: Event, ) -> Result<(), EventCacheError> { @@ -1640,10 +1688,10 @@ mod private { .expect("should have been a valid position of an item"); // We just changed the in-memory representation; synchronize this with // the store. - self.propagate_changes().await?; + self.propagate_changes(locked_store).await?; } EventLocation::Store => { - self.save_event([event]).await?; + self.save_event(locked_store, [event]).await?; } } @@ -1656,6 +1704,7 @@ mod private { #[instrument(skip_all)] async fn maybe_apply_new_redaction( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, event: &Event, ) -> Result<(), EventCacheError> { let raw_event = event.raw(); @@ -1683,7 +1732,9 @@ mod private { }; // Replace the redacted event by a redacted form, if we knew about it. - let Some((location, mut target_event)) = self.find_event(event_id).await? else { + let Some((location, mut target_event)) = + self.find_event(&locked_store, event_id).await? 
+ else { trace!("redacted event is missing from the linked chunk"); return Ok(()); }; @@ -1715,7 +1766,7 @@ mod private { // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case. target_event.replace_raw(redacted_event.cast_unchecked()); - self.replace_event_at(location, target_event).await?; + self.replace_event_at(locked_store, location, target_event).await?; } Ok(()) @@ -1729,17 +1780,16 @@ mod private { /// the event. Instead, an update to the linked chunk must be used. pub async fn save_event( &self, + locked_store: OwnedEventCacheStoreLockGuard, events: impl IntoIterator, ) -> Result<(), EventCacheError> { - let store = self.store.clone(); let room_id = self.room.clone(); let events = events.into_iter().collect::>(); // Spawn a task so the save is uninterrupted by task cancellation. spawn(async move { - let store = store.lock().await?; for event in events { - store.save_event(&room_id, event).await?; + locked_store.store.save_event(&room_id, event).await?; } super::Result::Ok(()) }) @@ -1759,6 +1809,7 @@ mod private { #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] pub async fn handle_sync( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, mut timeline: Timeline, #[cfg(feature = "experimental-search")] room: &Room, ) -> Result<(bool, Vec>), EventCacheError> { @@ -1770,7 +1821,7 @@ mod private { in_store_duplicated_event_ids, non_empty_all_duplicates: all_duplicates, } = filter_duplicate_events( - &self.store, + locked_store.clone(), LinkedChunkId::Room(self.room.as_ref()), &self.room_linked_chunk, timeline.events, @@ -1813,7 +1864,8 @@ mod private { // thread event is. The thread count can remain as is, as it might still be // valid, and there's no good value to reset it to, anyways. for thread_root in summaries_to_update { - let Some((location, mut target_event)) = self.find_event(&thread_root).await? + let Some((location, mut target_event)) = + self.find_event(&locked_store, &thread_root).await? else { trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync"); continue; @@ -1824,7 +1876,7 @@ mod private { target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary); - self.replace_event_at(location, target_event).await?; + self.replace_event_at(locked_store.clone(), location, target_event).await?; } } } @@ -1847,13 +1899,18 @@ mod private { // // We don't have to worry the removals can change the position of the existing // events, because we are pushing all _new_ `events` at the back. - self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids) - .await?; + self.remove_events( + locked_store.clone(), + in_memory_duplicated_event_ids, + in_store_duplicated_event_ids, + ) + .await?; self.room_linked_chunk .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events); self.post_process_new_events( + locked_store.clone(), events, true, #[cfg(feature = "experimental-search")] @@ -1868,7 +1925,7 @@ mod private { // // We must do this *after* persisting these events to storage (in // `post_process_new_events`). 
- self.shrink_to_last_chunk().await?; + self.shrink_to_last_chunk(locked_store).await?; } let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs(); @@ -1886,6 +1943,7 @@ mod private { #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] pub async fn handle_backpagination( &mut self, + locked_store: OwnedEventCacheStoreLockGuard, events: Vec, mut new_token: Option, prev_token: Option, @@ -1920,7 +1978,7 @@ mod private { in_store_duplicated_event_ids, non_empty_all_duplicates: all_duplicates, } = filter_duplicate_events( - &self.store, + locked_store.clone(), LinkedChunkId::Room(self.room.as_ref()), &self.room_linked_chunk, events, @@ -1942,8 +2000,12 @@ mod private { if !all_duplicates { // Let's forget all the previous events. - self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids) - .await?; + self.remove_events( + locked_store.clone(), + in_memory_duplicated_event_ids, + in_store_duplicated_event_ids, + ) + .await?; } else { // All new events are duplicated, they can all be ignored. events.clear(); @@ -1965,6 +2027,7 @@ mod private { // Note: this flushes updates to the store. self.post_process_new_events( + locked_store, topo_ordered_events, false, #[cfg(feature = "experimental-search")] @@ -2347,11 +2410,17 @@ mod timed_tests { events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()], }; - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { timeline, ..Default::default() }, + ) + .await + .unwrap(); + } // Just checking the generic update is correct. assert_matches!( @@ -2426,11 +2495,17 @@ mod timed_tests { let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] }; - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { timeline, ..Default::default() }, + ) + .await + .unwrap(); + } // Just checking the generic update is correct. assert_matches!( @@ -2754,11 +2829,17 @@ mod timed_tests { // A new update with one of these events leads to deduplication. let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] }; - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { timeline, ..Default::default() }, + ) + .await + .unwrap(); + } // Just checking the generic update is correct. There is a duplicate event, so // no generic changes whatsoever! @@ -2861,18 +2942,24 @@ mod timed_tests { // Propagate an update including a limited timeline with one message and a // prev-batch token. 
- room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { - timeline: Timeline { - limited: true, - prev_batch: Some("raclette".to_owned()), - events: vec![f.text_msg("hey yo").into_event()], - }, - ..Default::default() - }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { + timeline: Timeline { + limited: true, + prev_batch: Some("raclette".to_owned()), + events: vec![f.text_msg("hey yo").into_event()], + }, + ..Default::default() + }, + ) + .await + .unwrap(); + } // Just checking the generic update is correct. assert_matches!( @@ -2902,7 +2989,7 @@ mod timed_tests { // But if I manually reload more of the chunk, the gap will be present. assert_matches!( - state.load_more_events_backwards().await.unwrap(), + state.load_more_events_backwards(&room_event_cache.inner.store).await.unwrap(), LoadMoreEventsBackwardsOutcome::Gap { .. } ); @@ -2922,18 +3009,24 @@ mod timed_tests { // Now, propagate an update for another message, but the timeline isn't limited // this time. - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { - timeline: Timeline { - limited: false, - prev_batch: Some("fondue".to_owned()), - events: vec![f.text_msg("sup").into_event()], - }, - ..Default::default() - }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { + timeline: Timeline { + limited: false, + prev_batch: Some("fondue".to_owned()), + events: vec![f.text_msg("sup").into_event()], + }, + ..Default::default() + }, + ) + .await + .unwrap(); + } // Just checking the generic update is correct. assert_matches!( @@ -3055,7 +3148,7 @@ mod timed_tests { .state .write() .await - .force_shrink_to_last_chunk() + .force_shrink_to_last_chunk(&room_event_cache.inner.store) .await .expect("shrinking should succeed"); @@ -3180,18 +3273,24 @@ mod timed_tests { // last chunk, and that the linked chunk only contains the last two // events. let evid4 = event_id!("$4"); - room_event_cache - .inner - .handle_joined_room_update(JoinedRoomUpdate { - timeline: Timeline { - limited: true, - prev_batch: Some("fondue".to_owned()), - events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()], - }, - ..Default::default() - }) - .await - .unwrap(); + { + let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap(); + room_event_cache + .inner + .handle_joined_room_update( + locked_store, + JoinedRoomUpdate { + timeline: Timeline { + limited: true, + prev_batch: Some("fondue".to_owned()), + events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()], + }, + ..Default::default() + }, + ) + .await + .unwrap(); + } { let state = room_event_cache.inner.state.read().await;
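The gist of the change: instead of each helper re-acquiring the cross-process lock via `EventCacheStoreLock::lock()`, callers now take the lock once through the new `lock_owned()` and pass the resulting `OwnedEventCacheStoreLockGuard` (which is `Clone` and exposes the store through its public `store` field) down the event-cache code paths. A minimal usage sketch follows, assuming `store_lock` is an `EventCacheStoreLock`; it is illustrative only and not part of the patch:

    // Acquire the cross-process lock once; the guard holds an Arc'd
    // cross-process lock guard plus an Arc to the underlying store.
    let locked_store = store_lock.lock_owned().await?;

    // The guard is cheap to clone and can be handed to helpers that need
    // store access while the lock is held.
    let for_helper = locked_store.clone();
    for_helper.store.clear_all_linked_chunks().await?;

    // Per the RAII doc comment above, the lock is released once the last
    // clone of the guard (and thus the inner cross-process guard) is dropped.
    drop(for_helper);
    drop(locked_store);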