33 changes: 33 additions & 0 deletions crates/matrix-sdk-base/src/event_cache/store/mod.rs
@@ -92,6 +92,39 @@ impl EventCacheStoreLock {

Ok(EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() })
}

/// Acquire a spin lock and return an owned guard that can be used to access
/// the cache store.
pub async fn lock_owned(&self) -> Result<OwnedEventCacheStoreLockGuard, LockStoreError> {
let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await?;

Ok(OwnedEventCacheStoreLockGuard {
cross_process_lock_guard: Arc::new(cross_process_lock_guard),
store: self.store.clone(),
})
}
}

/// An RAII implementation of an owned “scoped lock” of an
/// [`EventCacheStoreLock`].
///
/// When this structure is dropped (falls out of scope), the lock will be
/// unlocked.
#[derive(Clone)]
pub struct OwnedEventCacheStoreLockGuard {
/// The cross process lock guard.
#[allow(unused)]
cross_process_lock_guard: Arc<CrossProcessStoreLockGuard>,

/// A reference to the store.
pub store: Arc<DynEventCacheStore>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for OwnedEventCacheStoreLockGuard {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_struct("OwnedEventCacheStoreLockGuard").finish_non_exhaustive()
}
}

/// An RAII implementation of a “scoped lock” of an [`EventCacheStoreLock`].
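The owned guard added above complements the existing borrowed `lock()` API: because the cross-process guard is wrapped in an `Arc`, the owned guard is cheap to clone and can be moved into spawned tasks or futures. Below is a minimal usage sketch; the caller name, error handling, and exact import paths are illustrative assumptions, not part of this change.

```rust
use matrix_sdk_base::event_cache::store::{EventCacheStoreLock, LockStoreError};

// Sketch only: `clear_in_background` is a made-up caller, the import paths
// are assumptions, and `tokio` is used merely to show the guard crossing a
// task boundary.
async fn clear_in_background(lock: &EventCacheStoreLock) -> Result<(), LockStoreError> {
    // Acquire the cross-process lock once, up front.
    let guard = lock.lock_owned().await?;

    // The guard is `Clone`: every clone shares the same underlying
    // `CrossProcessStoreLockGuard`, so the lock stays held until the last
    // clone is dropped.
    let for_task = guard.clone();
    let handle = tokio::spawn(async move {
        // `store` is a public field, so the store is usable without
        // re-locking.
        for_task.store.clear_all_linked_chunks().await
    });

    // A real caller would propagate the store error; this sketch ignores it.
    let _ = handle.await.expect("the task should not panic");

    // Dropping `guard` (and all of its clones) releases the cross-process
    // lock.
    drop(guard);
    Ok(())
}
```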
6 changes: 2 additions & 4 deletions crates/matrix-sdk-sqlite/src/event_cache_store.rs
@@ -181,8 +181,7 @@ impl SqliteEventCacheStore {
// Acquire a connection for executing read operations.
#[instrument(skip_all)]
async fn read(&self) -> Result<SqliteAsyncConn> {
trace!("Taking a `read` connection");
let _timer = timer!("connection");
let _timer = timer!("Taking a `read` connection");

let connection = self.pool.get().await?;

Expand All @@ -198,8 +197,7 @@ impl SqliteEventCacheStore {
// Acquire a connection for executing write operations.
#[instrument(skip_all)]
async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
trace!("Taking a `write` connection");
let _timer = timer!("connection");
let _timer = timer!("Taking a `write` connection");

let connection = self.write_connection.clone().lock_owned().await;

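The change above folds the separate `trace!` line into the `timer!` label. A plausible reading, and only an assumption since `timer!` is internal to the crate, is that the macro returns an RAII guard which emits its label together with the elapsed time when dropped, making the extra `trace!` redundant. A generic sketch of that pattern, with a hypothetical `timer()` helper standing in for the macro:

```rust
use std::time::Instant;

use tracing::trace;

/// Hypothetical stand-in for the crate's `timer!` macro: an RAII guard that
/// logs its label with the elapsed time when dropped. Binding it to `_timer`
/// (not `_`) matters, because `_` would drop the guard immediately.
struct TimerGuard {
    label: &'static str,
    start: Instant,
}

impl Drop for TimerGuard {
    fn drop(&mut self) {
        trace!("{} took {:?}", self.label, self.start.elapsed());
    }
}

fn timer(label: &'static str) -> TimerGuard {
    TimerGuard { label, start: Instant::now() }
}

fn main() {
    let _timer = timer("Taking a `read` connection");
    // … do the timed work here; the trace line is emitted when `_timer` goes
    // out of scope.
}
```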
20 changes: 12 additions & 8 deletions crates/matrix-sdk/src/event_cache/deduplicator.rs
@@ -18,7 +18,7 @@
use std::collections::BTreeSet;

use matrix_sdk_base::{
event_cache::store::EventCacheStoreLock,
event_cache::store::OwnedEventCacheStoreLockGuard,
linked_chunk::{LinkedChunkId, Position},
};
use ruma::OwnedEventId;
@@ -32,7 +32,7 @@ use super::{
/// information about the duplicates found in the new events, including the
/// events that are not loaded in memory.
pub async fn filter_duplicate_events(
store: &EventCacheStoreLock,
store: OwnedEventCacheStoreLockGuard,
linked_chunk_id: LinkedChunkId<'_>,
linked_chunk: &EventLinkedChunk,
mut new_events: Vec<Event>,
@@ -50,10 +50,9 @@ pub async fn filter_duplicate_events(
});
}

let store = store.lock().await?;

// Let the store do its magic ✨
let duplicated_event_ids = store
.store
.filter_duplicated_events(
linked_chunk_id,
new_events.iter().filter_map(|event| event.event_id()).collect(),
@@ -148,7 +147,10 @@ pub(super) struct DeduplicationOutcome {
mod tests {
use std::ops::Not as _;

use matrix_sdk_base::{deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier};
use matrix_sdk_base::{
deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock,
linked_chunk::ChunkIdentifier,
};
use matrix_sdk_test::{async_test, event_factory::EventFactory};
use ruma::{owned_event_id, serde::Raw, user_id, EventId};

@@ -222,6 +224,7 @@ mod tests {
.unwrap();

let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());
let locked_store = event_cache_store.lock_owned().await.unwrap();

{
// When presenting with only duplicate events, some of them in the in-memory
@@ -232,7 +235,7 @@ mod tests {
linked_chunk.push_events([event_1.clone(), event_2.clone(), event_3.clone()]);

let outcome = filter_duplicate_events(
&event_cache_store,
locked_store.clone(),
LinkedChunkId::Room(room_id),
&linked_chunk,
vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()],
@@ -247,7 +250,7 @@ mod tests {
linked_chunk.push_events([event_2.clone(), event_3.clone()]);

let outcome = filter_duplicate_events(
&event_cache_store,
locked_store,
LinkedChunkId::Room(room_id),
&linked_chunk,
vec![event_0, event_1, event_2, event_3, event_4],
@@ -351,6 +354,7 @@ mod tests {

// Wrap the store into its lock.
let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());
let locked_store = event_cache_store.lock_owned().await.unwrap();

let linked_chunk = EventLinkedChunk::new();

@@ -360,7 +364,7 @@
in_store_duplicated_event_ids,
non_empty_all_duplicates,
} = filter_duplicate_events(
&event_cache_store,
locked_store,
LinkedChunkId::Room(room_id),
&linked_chunk,
vec![ev1, ev2, ev3, ev4],
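As the updated tests show, `filter_duplicate_events` now takes the owned store guard by value, so a caller locks once and passes an `Arc`-backed clone per call. A stripped-down sketch of that calling convention; `filter_two_batches` is hypothetical, and `Event`, `EventLinkedChunk`, `EventCacheError`, and `filter_duplicate_events` are crate-internal items assumed to be in scope:

```rust
use matrix_sdk_base::{event_cache::store::EventCacheStoreLock, linked_chunk::LinkedChunkId};
use ruma::RoomId;

// Hypothetical caller; `Event`, `EventLinkedChunk`, `EventCacheError` and
// `filter_duplicate_events` are crate-internal and assumed to be in scope.
async fn filter_two_batches(
    lock: &EventCacheStoreLock,
    room_id: &RoomId,
    linked_chunk: &EventLinkedChunk,
    first: Vec<Event>,
    second: Vec<Event>,
) -> Result<(), EventCacheError> {
    // Lock once for both batches…
    let locked_store = lock.lock_owned().await?;

    for batch in [first, second] {
        // …and pass a cheap clone per call, since the function consumes the
        // guard.
        let outcome = filter_duplicate_events(
            locked_store.clone(),
            LinkedChunkId::Room(room_id),
            linked_chunk,
            batch,
        )
        .await?;

        // Duplicates that only live in storage (not in the in-memory linked
        // chunk) are reported separately.
        let _ = outcome.in_store_duplicated_event_ids;
    }

    Ok(())
}
```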
95 changes: 65 additions & 30 deletions crates/matrix-sdk/src/event_cache/mod.rs
@@ -39,7 +39,7 @@ use futures_util::future::{join_all, try_join_all};
use matrix_sdk_base::{
deserialized_responses::{AmbiguityChange, TimelineEvent},
event_cache::{
store::{EventCacheStoreError, EventCacheStoreLock},
store::{DynEventCacheStore, EventCacheStoreError, EventCacheStoreLock},
Gap,
},
executor::AbortOnDrop,
@@ -358,18 +358,29 @@ impl EventCache {
while let Some(room_id) = rx.recv().await {
trace!(for_room = %room_id, "received notification to shrink");

let room = match inner.for_room(&room_id).await {
Ok(room) => room,
Err(err) => {
warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
let room = {
let Ok(store) = inner
.store
.lock()
.await
.inspect_err(|err| warn!("Failed to lock the store: {err}"))
else {
continue;
};

match inner.for_room(&*store, &room_id).await {
Ok(room) => room,
Err(err) => {
warn!(for_room = %room_id, "Failed to get the `RoomEventCache`: {err}");
continue;
}
}
};

trace!("waiting for state lock…");
let mut state = room.inner.state.write().await;

match state.auto_shrink_if_no_subscribers().await {
match state.auto_shrink_if_no_subscribers(&room.inner.store).await {
Ok(diffs) => {
if let Some(diffs) = diffs {
// Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
@@ -408,7 +419,8 @@ impl EventCache {
return Err(EventCacheError::NotSubscribedYet);
};

let room = self.inner.for_room(room_id).await?;
let store = self.inner.store.lock().await?;
let room = self.inner.for_room(&*store, room_id).await?;

Ok((room, drop_handles))
}
@@ -804,25 +816,31 @@ impl EventCacheInner {
.await;

// Clear the storage for all the rooms, using the storage facility.
self.store.lock().await?.clear_all_linked_chunks().await?;
let locked_store = self.store.lock_owned().await?;

locked_store.store.clear_all_linked_chunks().await?;

// At this point, all the in-memory linked chunks are desynchronized from the
// storage. Resynchronize them manually by calling reset(), and
// propagate updates to observers.
try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
let updates_as_vector_diffs = state_guard.reset().await?;
try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| {
let locked_store = locked_store.clone();

let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
diffs: updates_as_vector_diffs,
origin: EventsOrigin::Cache,
});
async move {
let updates_as_vector_diffs = state_guard.reset(locked_store).await?;

let _ = room
.inner
.generic_update_sender
.send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });
let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
diffs: updates_as_vector_diffs,
origin: EventsOrigin::Cache,
});

Ok::<_, EventCacheError>(())
let _ = room
.inner
.generic_update_sender
.send(RoomEventCacheGenericUpdate { room_id: room.inner.room_id.clone() });

Ok::<_, EventCacheError>(())
}
}))
.await?;

@@ -839,16 +857,20 @@
self.multiple_room_updates_lock.lock().await
};

let locked_store = self.store.lock_owned().await?;

// Note: bnjbvr tried to make this concurrent at some point, but it turned out
// to be a performance regression, even for large sync updates. Lacking
// time to investigate, this code remains sequential for now. See also
// https://github.com/matrix-org/matrix-rust-sdk/pull/5426.

// Left rooms.
for (room_id, left_room_update) in updates.left {
let room = self.for_room(&room_id).await?;
let room = self.for_room(&*locked_store.store, &room_id).await?;

if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
if let Err(err) =
room.inner.handle_left_room_update(locked_store.clone(), left_room_update).await
{
// Non-fatal error, try to continue to the next room.
error!("handling left room update: {err}");
}
@@ -858,9 +880,11 @@
for (room_id, joined_room_update) in updates.joined {
trace!(?room_id, "Handling a `JoinedRoomUpdate`");

let room = self.for_room(&room_id).await?;
let room = self.for_room(&*locked_store.store, &room_id).await?;

if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
if let Err(err) =
room.inner.handle_joined_room_update(locked_store.clone(), joined_room_update).await
{
// Non-fatal error, try to continue to the next room.
error!(%room_id, "handling joined room update: {err}");
}
@@ -873,7 +897,11 @@
}

/// Return a room-specific view over the [`EventCache`].
async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
async fn for_room(
&self,
store: &DynEventCacheStore,
room_id: &RoomId,
) -> Result<RoomEventCache> {
// Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
// write lock.
let by_room_guard = self.by_room.read().await;
@@ -908,7 +936,7 @@ impl EventCacheInner {
room_id.to_owned(),
room_version_rules,
self.linked_chunk_update_sender.clone(),
self.store.clone(),
store,
pagination_status.clone(),
)
.await?;
@@ -925,6 +953,7 @@

let room_event_cache = RoomEventCache::new(
self.client.clone(),
self.store.clone(),
room_state,
pagination_status,
room_id.to_owned(),
@@ -1125,11 +1154,17 @@ mod tests {
.unwrap();
let account_data = vec![read_marker_event; 100];

room_event_cache
.inner
.handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
.await
.unwrap();
{
let locked_store = room_event_cache.inner.store.lock_owned().await.unwrap();
room_event_cache
.inner
.handle_joined_room_update(
locked_store,
JoinedRoomUpdate { account_data, ..Default::default() },
)
.await
.unwrap();
}

// … there's only one read marker update.
assert_matches!(
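A recurring pattern in the changes above is hoisting a single `lock_owned()` call out of a loop or a batch of futures and handing each unit of work its own clone of the guard. The `clear` path is the clearest case: it locks once, clears storage, and then resets every room concurrently. A stripped-down sketch of that shape, where `reset_room` and `clear_rooms` are hypothetical stand-ins for the real per-room work:

```rust
use futures_util::future::try_join_all;
use matrix_sdk_base::event_cache::store::{
    EventCacheStoreError, EventCacheStoreLock, OwnedEventCacheStoreLockGuard,
};
use ruma::OwnedRoomId;

// Hypothetical per-room work, standing in for `state_guard.reset(...)` and
// the update broadcasting done in the real code.
async fn reset_room(
    _room_id: OwnedRoomId,
    _store: OwnedEventCacheStoreLockGuard,
) -> Result<(), EventCacheStoreError> {
    Ok(())
}

// Hypothetical driver, mirroring the shape of `EventCacheInner::clear`.
async fn clear_rooms(
    lock: &EventCacheStoreLock,
    room_ids: Vec<OwnedRoomId>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Lock the store once for the whole batch instead of once per room.
    let locked_store = lock.lock_owned().await?;

    // Clear the storage while holding the lock.
    locked_store.store.clear_all_linked_chunks().await?;

    // Resynchronize every room concurrently; each future gets its own clone
    // of the guard, and the lock is released only after the last clone is
    // dropped.
    try_join_all(room_ids.into_iter().map(|room_id| {
        let locked_store = locked_store.clone();
        async move { reset_room(room_id, locked_store).await }
    }))
    .await?;

    Ok(())
}
```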
5 changes: 4 additions & 1 deletion crates/matrix-sdk/src/event_cache/pagination.rs
@@ -180,7 +180,7 @@ impl RoomPagination {
loop {
let mut state_guard = self.inner.state.write().await;

match state_guard.load_more_events_backwards().await? {
match state_guard.load_more_events_backwards(&self.inner.store).await? {
LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);

@@ -276,12 +276,15 @@ impl RoomPagination {
.await
.map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

let locked_store = self.inner.store.lock_owned().await?;

if let Some((outcome, timeline_event_diffs)) = self
.inner
.state
.write()
.await
.handle_backpagination(
locked_store,
response.chunk,
response.end,
prev_token,