diff --git a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs index 7c281a5c504..9046dead62e 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs @@ -1304,57 +1304,57 @@ macro_rules! event_cache_store_integration_tests_time { let store = get_event_cache_store().await.unwrap().into_event_cache_store(); let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired0); + assert_eq!(acquired0, Some(1)); // first lock generation // Should extend the lease automatically (same holder). let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired2); + assert_eq!(acquired2, Some(1)); // same lock generation // Should extend the lease automatically (same holder + time is ok). let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired3); + assert_eq!(acquired3, Some(1)); // same lock generation // Another attempt at taking the lock should fail, because it's taken. let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired4); + assert!(acquired4.is_none()); // not acquired // Even if we insist. let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired5); + assert!(acquired5.is_none()); // not acquired // That's a nice test we got here, go take a little nap. sleep(Duration::from_millis(50)).await; // Still too early. let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired55); + assert!(acquired55.is_none()); // not acquired // Ok you can take another nap then. sleep(Duration::from_millis(250)).await; // At some point, we do get the lock. let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap(); - assert!(acquired6); + assert_eq!(acquired6, Some(2)); // new lock generation! sleep(Duration::from_millis(1)).await; // The other gets it almost immediately too. let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired7); + assert_eq!(acquired7, Some(3)); // new lock generation! sleep(Duration::from_millis(1)).await; - // But when we take a longer lease... + // But when we take a longer lease… let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired8); + assert_eq!(acquired8, Some(4)); // new lock generation! // It blocks the other user. let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(!acquired9); + assert!(acquired9.is_none()); // not acquired // We can hold onto our lease. let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired10); + assert_eq!(acquired10, Some(4)); // same lock generation } } }; diff --git a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs index c1ec5b9d4e5..1eb343089a7 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/memory_store.rs @@ -19,13 +19,16 @@ use std::{ use async_trait::async_trait; use matrix_sdk_common::{ - cross_process_lock::memory_store_helper::try_take_leased_lock, + cross_process_lock::{ + CrossProcessLockGeneration, + memory_store_helper::{Lease, try_take_leased_lock}, + }, linked_chunk::{ ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, RawChunk, Update, relational::RelationalLinkedChunk, }, }; -use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType, time::Instant}; +use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType}; use tracing::error; use super::{ @@ -43,7 +46,7 @@ pub struct MemoryStore { #[derive(Debug)] struct MemoryStoreInner { - leases: HashMap, + leases: HashMap, events: RelationalLinkedChunk, } @@ -75,7 +78,7 @@ impl EventCacheStore for MemoryStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { let mut inner = self.inner.write().unwrap(); Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder)) diff --git a/crates/matrix-sdk-base/src/event_cache/store/mod.rs b/crates/matrix-sdk-base/src/event_cache/store/mod.rs index 39330ded1df..5c5b884256c 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/mod.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/mod.rs @@ -28,7 +28,8 @@ mod memory_store; mod traits; use matrix_sdk_common::cross_process_lock::{ - CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, TryLock, + CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard, + TryLock, }; pub use matrix_sdk_store_encryption::Error as StoreEncryptionError; use ruma::{ @@ -192,7 +193,7 @@ impl TryLock for LockableEventCacheStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> std::result::Result { + ) -> std::result::Result, Self::LockError> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await } } diff --git a/crates/matrix-sdk-base/src/event_cache/store/traits.rs b/crates/matrix-sdk-base/src/event_cache/store/traits.rs index c9cdf2b0161..67070389382 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/traits.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/traits.rs @@ -17,6 +17,7 @@ use std::{fmt, sync::Arc}; use async_trait::async_trait; use matrix_sdk_common::{ AsyncTraitDeps, + cross_process_lock::CrossProcessLockGeneration, linked_chunk::{ ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, RawChunk, Update, @@ -46,7 +47,7 @@ pub trait EventCacheStore: AsyncTraitDeps { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result; + ) -> Result, Self::Error>; /// An [`Update`] reflects an operation that has happened inside a linked /// chunk. The linked chunk is used by the event cache to store the events @@ -191,7 +192,7 @@ impl EventCacheStore for EraseEventCacheStoreError { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into) } diff --git a/crates/matrix-sdk-base/src/media/store/integration_tests.rs b/crates/matrix-sdk-base/src/media/store/integration_tests.rs index f7bbe59efb2..5dda5f12781 100644 --- a/crates/matrix-sdk-base/src/media/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/media/store/integration_tests.rs @@ -1337,57 +1337,57 @@ macro_rules! media_store_integration_tests_time { let store = get_media_store().await.unwrap(); let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired0); + assert_eq!(acquired0, Some(1)); // first lock generation // Should extend the lease automatically (same holder). let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired2); + assert_eq!(acquired2, Some(1)); // same lock generation // Should extend the lease automatically (same holder + time is ok). let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired3); + assert_eq!(acquired3, Some(1)); // same lock generation // Another attempt at taking the lock should fail, because it's taken. let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired4); + assert!(acquired4.is_none()); // not acquired // Even if we insist. let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired5); + assert!(acquired5.is_none()); // not acquired // That's a nice test we got here, go take a little nap. sleep(Duration::from_millis(50)).await; // Still too early. let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired55); + assert!(acquired55.is_none()); // not acquired // Ok you can take another nap then. sleep(Duration::from_millis(250)).await; // At some point, we do get the lock. let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap(); - assert!(acquired6); + assert_eq!(acquired6, Some(2)); // new lock generation! sleep(Duration::from_millis(1)).await; // The other gets it almost immediately too. let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired7); + assert_eq!(acquired7, Some(3)); // new lock generation! sleep(Duration::from_millis(1)).await; - // But when we take a longer lease... + // But when we take a longer lease… let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired8); + assert_eq!(acquired8, Some(4)); // new lock generation! // It blocks the other user. let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(!acquired9); + assert!(acquired9.is_none()); // not acquired // We can hold onto our lease. let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired10); + assert_eq!(acquired10, Some(4)); // same lock generation } } }; diff --git a/crates/matrix-sdk-base/src/media/store/memory_store.rs b/crates/matrix-sdk-base/src/media/store/memory_store.rs index 8b0fab3e01d..b97c343adcb 100644 --- a/crates/matrix-sdk-base/src/media/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/media/store/memory_store.rs @@ -20,12 +20,13 @@ use std::{ use async_trait::async_trait; use matrix_sdk_common::{ - cross_process_lock::memory_store_helper::try_take_leased_lock, ring_buffer::RingBuffer, -}; -use ruma::{ - MxcUri, OwnedMxcUri, - time::{Instant, SystemTime}, + cross_process_lock::{ + CrossProcessLockGeneration, + memory_store_helper::{Lease, try_take_leased_lock}, + }, + ring_buffer::RingBuffer, }; +use ruma::{MxcUri, OwnedMxcUri, time::SystemTime}; use super::Result; use crate::media::{ @@ -48,7 +49,7 @@ pub struct MemoryMediaStore { #[derive(Debug)] struct MemoryMediaStoreInner { media: RingBuffer, - leases: HashMap, + leases: HashMap, media_retention_policy: Option, last_media_cleanup_time: SystemTime, } @@ -110,7 +111,7 @@ impl MediaStore for MemoryMediaStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { let mut inner = self.inner.write().unwrap(); Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder)) diff --git a/crates/matrix-sdk-base/src/media/store/mod.rs b/crates/matrix-sdk-base/src/media/store/mod.rs index ce64a972620..5269c550c5f 100644 --- a/crates/matrix-sdk-base/src/media/store/mod.rs +++ b/crates/matrix-sdk-base/src/media/store/mod.rs @@ -32,7 +32,8 @@ use std::fmt; use std::{ops::Deref, sync::Arc}; use matrix_sdk_common::cross_process_lock::{ - CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, TryLock, + CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard, + TryLock, }; use matrix_sdk_store_encryption::Error as StoreEncryptionError; pub use traits::{DynMediaStore, IntoMediaStore, MediaStore, MediaStoreInner}; @@ -172,7 +173,7 @@ impl TryLock for LockableMediaStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> std::result::Result { + ) -> std::result::Result, Self::LockError> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await } } diff --git a/crates/matrix-sdk-base/src/media/store/traits.rs b/crates/matrix-sdk-base/src/media/store/traits.rs index 553ffa3d0ff..73b85e391e2 100644 --- a/crates/matrix-sdk-base/src/media/store/traits.rs +++ b/crates/matrix-sdk-base/src/media/store/traits.rs @@ -17,7 +17,7 @@ use std::{fmt, sync::Arc}; use async_trait::async_trait; -use matrix_sdk_common::AsyncTraitDeps; +use matrix_sdk_common::{AsyncTraitDeps, cross_process_lock::CrossProcessLockGeneration}; use ruma::{MxcUri, time::SystemTime}; #[cfg(doc)] @@ -41,7 +41,7 @@ pub trait MediaStore: AsyncTraitDeps { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result; + ) -> Result, Self::Error>; /// Add a media file's content in the media store. /// @@ -313,7 +313,7 @@ impl MediaStore for EraseMediaStoreError { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into) } diff --git a/crates/matrix-sdk-common/src/cross_process_lock.rs b/crates/matrix-sdk-common/src/cross_process_lock.rs index 977ba62f8f8..80b832a7091 100644 --- a/crates/matrix-sdk-common/src/cross_process_lock.rs +++ b/crates/matrix-sdk-common/src/cross_process_lock.rs @@ -17,7 +17,7 @@ //! This is a per-process lock that may be used only for very specific use //! cases, where multiple processes might concurrently write to the same //! database at the same time; this would invalidate store caches, so -//! that should be done mindfully. Such a lock can be acquired multiple times by +//! that should be done mindfully. Such a lock can be obtained multiple times by //! the same process, and it remains active as long as there's at least one user //! in a given process. //! @@ -26,7 +26,7 @@ //! timestamp on the side; see also `CryptoStore::try_take_leased_lock` for more //! details. //! -//! The lock is initially acquired for a certain period of time (namely, the +//! The lock is initially obtainedd for a certain period of time (namely, the //! duration of a lease, aka `LEASE_DURATION_MS`), and then a “heartbeat” task //! renews the lease to extend its duration, every so often (namely, every //! `EXTEND_LEASE_EVERY_MS`). Since the Tokio scheduler might be busy, the @@ -42,13 +42,13 @@ use std::{ future::Future, sync::{ Arc, - atomic::{self, AtomicU32}, + atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, }, time::Duration, }; use tokio::sync::Mutex; -use tracing::{debug, error, instrument, trace}; +use tracing::{debug, error, instrument, trace, warn}; use crate::{ SendOutsideWasm, @@ -56,6 +56,12 @@ use crate::{ sleep::sleep, }; +/// A lock generation is an integer incremented each time the lock is taken by +/// a different holder. +/// +/// This is used to know if a lock has been dirtied. +pub type CrossProcessLockGeneration = u64; + /// Trait used to try to take a lock. Foundation of [`CrossProcessLock`]. pub trait TryLock { #[cfg(not(target_family = "wasm"))] @@ -69,18 +75,23 @@ pub trait TryLock { /// This attempts to take a lock for the given lease duration. /// /// - If we already had the lease, this will extend the lease. - /// - If we didn't, but the previous lease has expired, we will acquire the + /// - If we didn't, but the previous lease has expired, we will obtain the /// lock. - /// - If there was no previous lease, we will acquire the lock. + /// - If there was no previous lease, we will obtain the lock. /// - Otherwise, we don't get the lock. /// - /// Returns whether taking the lock succeeded. + /// Returns `Some(_)` to indicate the lock succeeded, `None` otherwise. The + /// cross-process lock generation must be compared to the generation before + /// the call to see if the lock has been dirtied: a different generation + /// means the lock has been dirtied, i.e. taken by a different holder in + /// the meantime. fn try_lock( &self, lease_duration_ms: u32, key: &str, holder: &str, - ) -> impl Future> + SendOutsideWasm; + ) -> impl Future, Self::LockError>> + + SendOutsideWasm; } /// Small state machine to handle wait times. @@ -101,13 +112,19 @@ pub struct CrossProcessLockGuard { num_holders: Arc, } +impl CrossProcessLockGuard { + fn new(num_holders: Arc) -> Self { + Self { num_holders } + } +} + impl Drop for CrossProcessLockGuard { fn drop(&mut self) { - self.num_holders.fetch_sub(1, atomic::Ordering::SeqCst); + self.num_holders.fetch_sub(1, Ordering::SeqCst); } } -/// A store-based lock for a `Store`. +/// A cross-process lock implementation. /// /// See the doc-comment of this module for more information. #[derive(Clone, Debug)] @@ -124,7 +141,7 @@ where /// Number of holders of the lock in this process. /// - /// If greater than 0, this means we've already acquired this lock, in this + /// If greater than 0, this means we've already obtained this lock, in this /// process, and the store lock mustn't be touched. /// /// When the number of holders is decreased to 0, then the lock must be @@ -146,6 +163,15 @@ where /// Backoff time, in milliseconds. backoff: Arc>, + + /// This lock generation. + generation: Arc, + + /// Whether the lock has been dirtied. + /// + /// See [`CrossProcessLockResult::Dirty`] to learn more about the semantics + /// of _dirty_. + is_dirty: Arc, } /// Amount of time a lease of the lock should last, in milliseconds. @@ -166,6 +192,19 @@ const INITIAL_BACKOFF_MS: u32 = 10; /// we'll wait for the lock, *between two attempts*. pub const MAX_BACKOFF_MS: u32 = 1000; +/// Sentinel value representing the absence of a lock generation value. +/// +/// When the lock is created, it has no generation. Once locked, it receives its +/// first generation from [`TryLock::try_lock`]. Subsequent lockings may +/// generate new lock generation. The generation is incremented by 1 every time. +/// +/// The first generation is defined by [`FIRST_CROSS_PROCESS_LOCK_GENERATION`]. +pub const NO_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 0; + +/// Describe the first lock generation value (see +/// [`CrossProcessLockGeneration`]). +pub const FIRST_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 1; + impl CrossProcessLock where L: TryLock + Clone + SendOutsideWasm + 'static, @@ -185,45 +224,93 @@ where num_holders: Arc::new(0.into()), locking_attempt: Arc::new(Mutex::new(())), renew_task: Default::default(), + generation: Arc::new(AtomicU64::new(NO_CROSS_PROCESS_LOCK_GENERATION)), + is_dirty: Arc::new(AtomicBool::new(false)), } } + /// Determine whether the cross-process lock is dirty. + /// + /// See [`CrossProcessLockResult::Dirty`] to learn more about the semantics + /// of _dirty_. + pub fn is_dirty(&self) -> bool { + self.is_dirty.load(Ordering::SeqCst) + } + + /// Clear the dirty state from this cross-process lock. + /// + /// If the cross-process lock is dirtied, it will remain dirtied until + /// this method is called. This allows recovering from a dirty state and + /// marking that it has recovered. + pub fn clear_dirty(&self) { + self.is_dirty.store(false, Ordering::SeqCst); + self.generation.store(NO_CROSS_PROCESS_LOCK_GENERATION, Ordering::SeqCst); + } + /// Try to lock once, returns whether the lock was obtained or not. + /// + /// The lock can be obtained but it can be dirty. In all cases, the renew + /// task will run in the background. #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))] - pub async fn try_lock_once( - &self, - ) -> Result, CrossProcessLockError> { + pub async fn try_lock_once(&self) -> Result { // Hold onto the locking attempt mutex for the entire lifetime of this // function, to avoid multiple reentrant calls. let mut _attempt = self.locking_attempt.lock().await; // If another thread obtained the lock, make sure to only superficially increase // the number of holders, and carry on. - if self.num_holders.load(atomic::Ordering::SeqCst) > 0 { + if self.num_holders.load(Ordering::SeqCst) > 0 { // Note: between the above load and the fetch_add below, another thread may // decrement `num_holders`. That's fine because that means the lock // was taken by at least one thread, and after this call it will be // taken by at least one thread. trace!("We already had the lock, incrementing holder count"); - self.num_holders.fetch_add(1, atomic::Ordering::SeqCst); - let guard = CrossProcessLockGuard { num_holders: self.num_holders.clone() }; - return Ok(Some(guard)); + + self.num_holders.fetch_add(1, Ordering::SeqCst); + + return Ok(CrossProcessLockResult::Clean(CrossProcessLockGuard::new( + self.num_holders.clone(), + ))); } - let acquired = self + if let Some(new_generation) = self .locker .try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder) .await - .map_err(|err| CrossProcessLockError::TryLockError(Box::new(err)))?; + .map_err(|err| CrossProcessLockError::TryLockError(Box::new(err)))? + { + match self.generation.swap(new_generation, Ordering::SeqCst) { + // If there was no lock generation, it means this is the first time the lock is + // obtained. It cannot be dirty. + NO_CROSS_PROCESS_LOCK_GENERATION => { + trace!(?new_generation, "Setting the lock generation for the first time"); + } + + // This was NOT the same generation, the lock has been dirtied! + previous_generation if previous_generation != new_generation => { + warn!( + ?previous_generation, + ?new_generation, + "The lock has been obtained, but it's been dirtied!" + ); + self.is_dirty.store(true, Ordering::SeqCst); + } - if !acquired { - trace!("Couldn't acquire the lock immediately."); - return Ok(None); + // This was the same generation, no problem. + _ => { + trace!("Same lock generation; no problem"); + } + } + + trace!("Lock obtained!"); + } else { + trace!("Couldn't obtain the lock immediately."); + return Ok(CrossProcessLockResult::Unobtained); } - trace!("Acquired the lock, spawning the lease extension task."); + trace!("Obtained the lock, spawning the lease extension task."); - // This is the first time we've acquired the lock. We're going to spawn the task + // This is the first time we've obtaind the lock. We're going to spawn the task // that will renew the lease. // Clone data to be owned by the task. @@ -260,7 +347,7 @@ where let _guard = this.locking_attempt.lock().await; // If there are no more users, we can quit. - if this.num_holders.load(atomic::Ordering::SeqCst) == 0 { + if this.num_holders.load(Ordering::SeqCst) == 0 { trace!("exiting the lease extension loop"); // Cancel the lease with another 0ms lease. @@ -275,21 +362,48 @@ where sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await; - let fut = - this.locker.try_lock(LEASE_DURATION_MS, &this.lock_key, &this.lock_holder); + match this + .locker + .try_lock(LEASE_DURATION_MS, &this.lock_key, &this.lock_holder) + .await + { + Ok(Some(_generation)) => { + // It's impossible that the generation can be different + // from the previous generation. + // + // As long as the task runs, the lock is renewed, so the + // generation remains the same. If the lock is not + // taken, it's because the lease has expired, which is + // represented by the `Ok(None)` value, and the task + // must stop. + } + + Ok(None) => { + error!("Failed to renew the lock lease: the lock could not be obtained"); + + // Exit the loop. + break; + } + + Err(err) => { + error!("Error when extending the lock lease: {err:#}"); - if let Err(err) = fut.await { - error!("error when extending lock lease: {err:#}"); - // Exit the loop. - break; + // Exit the loop. + break; + } } } })); - self.num_holders.fetch_add(1, atomic::Ordering::SeqCst); + self.num_holders.fetch_add(1, Ordering::SeqCst); - let guard = CrossProcessLockGuard { num_holders: self.num_holders.clone() }; - Ok(Some(guard)) + let guard = CrossProcessLockGuard::new(self.num_holders.clone()); + + Ok(if self.is_dirty() { + CrossProcessLockResult::Dirty(guard) + } else { + CrossProcessLockResult::Clean(guard) + }) } /// Attempt to take the lock, with exponential backoff if the lock has @@ -311,7 +425,7 @@ where // lock in `try_lock_once` should sequentialize it all. loop { - if let Some(guard) = self.try_lock_once().await? { + if let Some(guard) = self.try_lock_once().await?.ok() { // Reset backoff before returning, for the next attempt to lock. *self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS); return Ok(guard); @@ -348,6 +462,42 @@ where } } +/// Represent the result of a locking attempt, either by +/// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`]. +#[derive(Debug)] +pub enum CrossProcessLockResult { + /// The lock has been obtained successfully, all good. + Clean(CrossProcessLockGuard), + + /// The lock has been obtained successfully, but the lock is dirty! + /// + /// This holder has obtained this cross-process lock once, then another + /// holder has obtained this cross-process lock _before_ this holder + /// obtained it again. The lock is marked as dirty. It means the value + /// protected by the cross-process lock may need to be reloaded if + /// synchronisation is important. + Dirty(CrossProcessLockGuard), + + /// The lock has not been obtained. + Unobtained, +} + +impl CrossProcessLockResult { + /// Convert from [`CrossProcessLockResult`] to + /// [`Option`] where `T` is [`CrossProcessLockGuard`]. + pub fn ok(self) -> Option { + match self { + Self::Clean(guard) | Self::Dirty(guard) => Some(guard), + Self::Unobtained => None, + } + } + + /// Return `true` if the lock has been obtained, `false` otherwise. + pub fn is_ok(&self) -> bool { + matches!(self, Self::Clean(_) | Self::Dirty(_)) + } +} + /// Error related to the locking API of the store. #[derive(Debug, thiserror::Error)] pub enum CrossProcessLockError { @@ -369,8 +519,8 @@ pub enum CrossProcessLockError { mod tests { use std::{ collections::HashMap, + ops::Not, sync::{Arc, RwLock, atomic}, - time::Instant, }; use assert_matches::assert_matches; @@ -381,17 +531,23 @@ mod tests { }; use super::{ - CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, EXTEND_LEASE_EVERY_MS, - TryLock, memory_store_helper::try_take_leased_lock, + CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, + CrossProcessLockResult, EXTEND_LEASE_EVERY_MS, TryLock, + memory_store_helper::{Lease, try_take_leased_lock}, }; #[derive(Clone, Default)] struct TestStore { - leases: Arc>>, + leases: Arc>>, } impl TestStore { - fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool { + fn try_take_leased_lock( + &self, + lease_duration_ms: u32, + key: &str, + holder: &str, + ) -> Option { try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder) } } @@ -408,13 +564,13 @@ mod tests { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::LockError> { Ok(self.try_take_leased_lock(lease_duration_ms, key, holder)) } } - async fn release_lock(guard: Option) { - drop(guard); + async fn release_lock(result: CrossProcessLockResult) { + drop(result); sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await; } @@ -426,19 +582,19 @@ mod tests { let lock = CrossProcessLock::new(store, "key".to_owned(), "first".to_owned()); // The lock plain works when used with a single holder. - let acquired = lock.try_lock_once().await?; - assert!(acquired.is_some()); + let result = lock.try_lock_once().await?; + assert!(result.is_ok()); assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1); // Releasing works. - release_lock(acquired).await; + release_lock(result).await; assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0); // Spin locking on the same lock always works, assuming no concurrent access. - let acquired = lock.spin_lock(None).await.unwrap(); + let guard = lock.spin_lock(None).await.unwrap(); // Releasing still works. - release_lock(Some(acquired)).await; + release_lock(CrossProcessLockResult::Clean(guard)).await; assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0); Ok(()) @@ -449,9 +605,9 @@ mod tests { let store = TestStore::default(); let lock = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned()); - // When a lock is acquired... - let acquired = lock.try_lock_once().await?; - assert!(acquired.is_some()); + // When a lock is obtained... + let result = lock.try_lock_once().await?; + assert!(result.is_ok()); assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1); // But then forgotten... (note: no need to release the guard) @@ -461,8 +617,8 @@ mod tests { let lock = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned()); // We still got it. - let acquired = lock.try_lock_once().await?; - assert!(acquired.is_some()); + let result = lock.try_lock_once().await?; + assert!(result.is_ok()); assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1); Ok(()) @@ -474,19 +630,19 @@ mod tests { let lock = CrossProcessLock::new(store, "key".to_owned(), "first".to_owned()); // Taking the lock twice... - let acquired = lock.try_lock_once().await?; - assert!(acquired.is_some()); + let result1 = lock.try_lock_once().await?; + assert!(result1.is_ok()); - let acquired2 = lock.try_lock_once().await?; - assert!(acquired2.is_some()); + let result2 = lock.try_lock_once().await?; + assert!(result2.is_ok()); assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 2); // ...means we can release it twice. - release_lock(acquired).await; + release_lock(result1).await; assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1); - release_lock(acquired2).await; + release_lock(result2).await; assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0); Ok(()) @@ -499,22 +655,22 @@ mod tests { let lock2 = CrossProcessLock::new(store, "key".to_owned(), "second".to_owned()); // When the first process takes the lock... - let acquired1 = lock1.try_lock_once().await?; - assert!(acquired1.is_some()); + let result1 = lock1.try_lock_once().await?; + assert!(result1.is_ok()); // The second can't take it immediately. - let acquired2 = lock2.try_lock_once().await?; - assert!(acquired2.is_none()); + let result2 = lock2.try_lock_once().await?; + assert!(result2.is_ok().not()); let lock2_clone = lock2.clone(); let handle = spawn(async move { lock2_clone.spin_lock(Some(1000)).await }); sleep(Duration::from_millis(100)).await; - drop(acquired1); + drop(result1); // lock2 in the background manages to get the lock at some point. - let _acquired2 = handle + let _result2 = handle .await .expect("join handle is properly awaited") .expect("lock was obtained after spin-locking"); @@ -533,48 +689,63 @@ pub mod memory_store_helper { use ruma::time::{Duration, Instant}; + use super::{CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION}; + + #[derive(Debug)] + pub struct Lease { + holder: String, + expiration: Instant, + generation: CrossProcessLockGeneration, + } + pub fn try_take_leased_lock( - leases: &mut HashMap, + leases: &mut HashMap, lease_duration_ms: u32, key: &str, holder: &str, - ) -> bool { + ) -> Option { let now = Instant::now(); let expiration = now + Duration::from_millis(lease_duration_ms.into()); match leases.entry(key.to_owned()) { // There is an existing holder. Entry::Occupied(mut entry) => { - let (current_holder, current_expiration) = entry.get_mut(); + let Lease { + holder: current_holder, + expiration: current_expiration, + generation: current_generation, + } = entry.get_mut(); if current_holder == holder { // We had the lease before, extend it. *current_expiration = expiration; - true + Some(*current_generation) } else { // We didn't have it. if *current_expiration < now { // Steal it! *current_holder = holder.to_owned(); *current_expiration = expiration; + *current_generation += 1; - true + Some(*current_generation) } else { // We tried our best. - false + None } } } // There is no holder, easy. Entry::Vacant(entry) => { - entry.insert(( - holder.to_owned(), - Instant::now() + Duration::from_millis(lease_duration_ms.into()), - )); + entry.insert(Lease { + holder: holder.to_owned(), + expiration: Instant::now() + Duration::from_millis(lease_duration_ms.into()), + generation: FIRST_CROSS_PROCESS_LOCK_GENERATION, + }); - true + Some(FIRST_CROSS_PROCESS_LOCK_GENERATION) } } } diff --git a/crates/matrix-sdk-crypto/src/store/integration_tests.rs b/crates/matrix-sdk-crypto/src/store/integration_tests.rs index a5b9063c97d..7acbdc8dc2d 100644 --- a/crates/matrix-sdk-crypto/src/store/integration_tests.rs +++ b/crates/matrix-sdk-crypto/src/store/integration_tests.rs @@ -1446,57 +1446,57 @@ macro_rules! cryptostore_integration_tests_time { let (_account, store) = get_loaded_store("lease_locks").await; let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired0); + assert_eq!(acquired0, Some(1)); // first generation // Should extend the lease automatically (same holder). let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired2); + assert_eq!(acquired2, Some(1)); // same lock generation // Should extend the lease automatically (same holder + time is ok). let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(acquired3); + assert_eq!(acquired3, Some(1)); // same lock generation // Another attempt at taking the lock should fail, because it's taken. let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired4); + assert!(acquired4.is_none()); // not acquired // Even if we insist. let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired5); + assert!(acquired5.is_none()); // That's a nice test we got here, go take a little nap. tokio::time::sleep(Duration::from_millis(50)).await; // Still too early. let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(!acquired55); + assert!(acquired55.is_none()); // not acquired // Ok you can take another nap then. tokio::time::sleep(Duration::from_millis(250)).await; // At some point, we do get the lock. let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap(); - assert!(acquired6); + assert_eq!(acquired6, Some(2)); // new lock generation! tokio::time::sleep(Duration::from_millis(1)).await; // The other gets it almost immediately too. let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap(); - assert!(acquired7); + assert_eq!(acquired7, Some(3)); // new lock generation! tokio::time::sleep(Duration::from_millis(1)).await; - // But when we take a longer lease... + // But when we take a longer lease… let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired8); + assert_eq!(acquired8, Some(4)); // new lock generation! // It blocks the other user. let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap(); - assert!(!acquired9); + assert!(acquired9.is_none()); // not acquired // We can hold onto our lease. let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap(); - assert!(acquired10); + assert_eq!(acquired10, Some(4)); // same lock generation } } }; diff --git a/crates/matrix-sdk-crypto/src/store/memorystore.rs b/crates/matrix-sdk-crypto/src/store/memorystore.rs index de3c333950b..4d4eb1dc707 100644 --- a/crates/matrix-sdk-crypto/src/store/memorystore.rs +++ b/crates/matrix-sdk-crypto/src/store/memorystore.rs @@ -20,11 +20,15 @@ use std::{ use async_trait::async_trait; use matrix_sdk_common::{ - cross_process_lock::memory_store_helper::try_take_leased_lock, locks::RwLock as StdRwLock, + cross_process_lock::{ + memory_store_helper::{try_take_leased_lock, Lease}, + CrossProcessLockGeneration, + }, + locks::RwLock as StdRwLock, }; use ruma::{ - events::secret::request::SecretName, time::Instant, DeviceId, OwnedDeviceId, OwnedRoomId, - OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId, + events::secret::request::SecretName, DeviceId, OwnedDeviceId, OwnedRoomId, OwnedTransactionId, + OwnedUserId, RoomId, TransactionId, UserId, }; use tokio::sync::{Mutex, RwLock}; use tracing::warn; @@ -99,7 +103,7 @@ pub struct MemoryStore { key_requests_by_info: StdRwLock>, direct_withheld_info: StdRwLock>>, custom_values: StdRwLock>>, - leases: StdRwLock>, + leases: StdRwLock>, secret_inbox: StdRwLock>>, backup_keys: RwLock, dehydrated_device_pickle_key: RwLock>, @@ -756,7 +760,7 @@ impl CryptoStore for MemoryStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result> { Ok(try_take_leased_lock(&mut self.leases.write(), lease_duration_ms, key, holder)) } } @@ -1257,6 +1261,7 @@ mod integration_tests { }; use async_trait::async_trait; + use matrix_sdk_common::cross_process_lock::CrossProcessLockGeneration; use ruma::{ events::secret::request::SecretName, DeviceId, OwnedDeviceId, RoomId, TransactionId, UserId, }; @@ -1568,7 +1573,7 @@ mod integration_tests { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await } diff --git a/crates/matrix-sdk-crypto/src/store/mod.rs b/crates/matrix-sdk-crypto/src/store/mod.rs index dea03f5239d..ba1b5dfc1b0 100644 --- a/crates/matrix-sdk-crypto/src/store/mod.rs +++ b/crates/matrix-sdk-crypto/src/store/mod.rs @@ -99,7 +99,9 @@ pub mod integration_tests; pub(crate) use crypto_store_wrapper::CryptoStoreWrapper; pub use error::{CryptoStoreError, Result}; use matrix_sdk_common::{ - cross_process_lock::CrossProcessLock, deserialized_responses::WithheldCode, timeout::timeout, + cross_process_lock::{CrossProcessLock, CrossProcessLockGeneration}, + deserialized_responses::WithheldCode, + timeout::timeout, }; pub use memorystore::MemoryStore; pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore}; @@ -1708,7 +1710,7 @@ impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> std::result::Result { + ) -> std::result::Result, Self::LockError> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await } } diff --git a/crates/matrix-sdk-crypto/src/store/traits.rs b/crates/matrix-sdk-crypto/src/store/traits.rs index 133b8458bf2..5d5b2fad1cf 100644 --- a/crates/matrix-sdk-crypto/src/store/traits.rs +++ b/crates/matrix-sdk-crypto/src/store/traits.rs @@ -15,7 +15,7 @@ use std::{collections::HashMap, fmt, sync::Arc}; use async_trait::async_trait; -use matrix_sdk_common::AsyncTraitDeps; +use matrix_sdk_common::{cross_process_lock::CrossProcessLockGeneration, AsyncTraitDeps}; use ruma::{ events::secret::request::SecretName, DeviceId, OwnedDeviceId, RoomId, TransactionId, UserId, }; @@ -382,7 +382,7 @@ pub trait CryptoStore: AsyncTraitDeps { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result; + ) -> Result, Self::Error>; /// Load the next-batch token for a to-device query, if any. async fn next_batch_token(&self) -> Result, Self::Error>; @@ -621,7 +621,7 @@ impl CryptoStore for EraseCryptoStoreError { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result, Self::Error> { self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into) } diff --git a/crates/matrix-sdk-sqlite/Cargo.toml b/crates/matrix-sdk-sqlite/Cargo.toml index 83b7e945ecf..78c8119b43c 100644 --- a/crates/matrix-sdk-sqlite/Cargo.toml +++ b/crates/matrix-sdk-sqlite/Cargo.toml @@ -15,7 +15,7 @@ default = ["state-store", "event-cache"] testing = ["matrix-sdk-crypto?/testing"] bundled = ["rusqlite/bundled"] -crypto-store = ["dep:matrix-sdk-crypto"] +crypto-store = ["dep:matrix-sdk-base", "dep:matrix-sdk-crypto"] event-cache = ["dep:matrix-sdk-base"] state-store = ["dep:matrix-sdk-base"] diff --git a/crates/matrix-sdk-sqlite/migrations/crypto_store/012_lease_locks_with_generation.sql b/crates/matrix-sdk-sqlite/migrations/crypto_store/012_lease_locks_with_generation.sql new file mode 100644 index 00000000000..bdd7cad7594 --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/crypto_store/012_lease_locks_with_generation.sql @@ -0,0 +1,7 @@ +-- Rename the `expiration_ts` column to `expiration` to be consistent with other +-- `lease_locks` tables in other stores. +ALTER TABLE "lease_locks" RENAME COLUMN "expiration_ts" TO "expiration"; + +-- Add the `generation` column to handle _poison_. +-- Default value is `FIRST_CROSS_PROCESS_LOCK_GENERATION`. +ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 1; diff --git a/crates/matrix-sdk-sqlite/migrations/event_cache_store/012_lease_locks_with_generation.sql b/crates/matrix-sdk-sqlite/migrations/event_cache_store/012_lease_locks_with_generation.sql new file mode 100644 index 00000000000..a83fda482a6 --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/event_cache_store/012_lease_locks_with_generation.sql @@ -0,0 +1,3 @@ +-- Add the `generation` column to handle _poison_. +-- Default value is `FIRST_CROSS_PROCESS_LOCK_GENERATION`. +ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 1; diff --git a/crates/matrix-sdk-sqlite/migrations/media_store/002_lease_locks_with_generation.sql b/crates/matrix-sdk-sqlite/migrations/media_store/002_lease_locks_with_generation.sql new file mode 100644 index 00000000000..a83fda482a6 --- /dev/null +++ b/crates/matrix-sdk-sqlite/migrations/media_store/002_lease_locks_with_generation.sql @@ -0,0 +1,3 @@ +-- Add the `generation` column to handle _poison_. +-- Default value is `FIRST_CROSS_PROCESS_LOCK_GENERATION`. +ALTER TABLE "lease_locks" ADD COLUMN "generation" INTEGER NOT NULL DEFAULT 1; diff --git a/crates/matrix-sdk-sqlite/src/crypto_store.rs b/crates/matrix-sdk-sqlite/src/crypto_store.rs index a2f2ec0ed26..ca9c23d4a3e 100644 --- a/crates/matrix-sdk-sqlite/src/crypto_store.rs +++ b/crates/matrix-sdk-sqlite/src/crypto_store.rs @@ -21,6 +21,7 @@ use std::{ use async_trait::async_trait; use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; +use matrix_sdk_base::{cross_process_lock::CrossProcessLockGeneration, timer}; use matrix_sdk_crypto::{ olm::{ InboundGroupSession, OutboundGroupSession, PickledInboundGroupSession, @@ -176,7 +177,7 @@ impl SqliteCryptoStore { } } -const DATABASE_VERSION: u8 = 11; +const DATABASE_VERSION: u8 = 12; /// key for the dehydrated device pickle key in the key/value table. const DEHYDRATED_DEVICE_PICKLE_KEY: &str = "dehydrated_device_pickle_key"; @@ -292,6 +293,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { .await?; } + if version < 12 { + conn.with_transaction(|txn| { + txn.execute_batch(include_str!( + "../migrations/crypto_store/012_lease_locks_with_generation.sql" + ))?; + txn.set_db_version(12) + }) + .await?; + } + Ok(()) } @@ -1456,37 +1467,52 @@ impl CryptoStore for SqliteCryptoStore { Ok(()) } + #[instrument(skip(self))] async fn try_take_leased_lock( &self, lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result> { + let _timer = timer!("method"); + let key = key.to_owned(); let holder = holder.to_owned(); - let now_ts: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); - let expiration_ts = now_ts + lease_duration_ms as u64; + let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); + let expiration = now + lease_duration_ms as u64; - let num_touched = self + // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html. + let generation = self .acquire() .await? .with_transaction(move |txn| { - txn.execute( - "INSERT INTO lease_locks (key, holder, expiration_ts) + txn.query_row( + "INSERT INTO lease_locks (key, holder, expiration) VALUES (?1, ?2, ?3) ON CONFLICT (key) DO - UPDATE SET holder = ?2, expiration_ts = ?3 - WHERE holder = ?2 - OR expiration_ts < ?4 - ", - (key, holder, expiration_ts, now_ts), + UPDATE SET + holder = excluded.holder, + expiration = excluded.expiration, + generation = + CASE holder + WHEN excluded.holder THEN generation + ELSE generation + 1 + END + WHERE + holder = excluded.holder + OR expiration < ?4 + RETURNING generation + ", + (key, holder, expiration, now), + |row| row.get(0), ) + .optional() }) .await?; - Ok(num_touched == 1) + Ok(generation) } async fn next_batch_token(&self) -> Result, Self::Error> { diff --git a/crates/matrix-sdk-sqlite/src/event_cache_store.rs b/crates/matrix-sdk-sqlite/src/event_cache_store.rs index ab2654e9e59..c5ac15f6ea3 100644 --- a/crates/matrix-sdk-sqlite/src/event_cache_store.rs +++ b/crates/matrix-sdk-sqlite/src/event_cache_store.rs @@ -19,6 +19,7 @@ use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc}; use async_trait::async_trait; use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; use matrix_sdk_base::{ + cross_process_lock::CrossProcessLockGeneration, deserialized_responses::TimelineEvent, event_cache::{ store::{compute_filters_string, extract_event_relation, EventCacheStore}, @@ -63,7 +64,7 @@ const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3"; /// This is used to figure whether the SQLite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`run_migrations`] function. -const DATABASE_VERSION: u8 = 11; +const DATABASE_VERSION: u8 = 12; /// The string used to identify a chunk of type events, in the `type` field in /// the database. @@ -472,6 +473,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { .await?; } + if version < 12 { + conn.with_transaction(|txn| { + txn.execute_batch(include_str!( + "../migrations/event_cache_store/012_lease_locks_with_generation.sql" + ))?; + txn.set_db_version(12) + }) + .await?; + } + Ok(()) } @@ -485,7 +496,7 @@ impl EventCacheStore for SqliteEventCacheStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result> { let _timer = timer!("method"); let key = key.to_owned(); @@ -494,25 +505,37 @@ impl EventCacheStore for SqliteEventCacheStore { let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); let expiration = now + lease_duration_ms as u64; - let num_touched = self + // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html. + let generation = self .write() .await? .with_transaction(move |txn| { - txn.execute( + txn.query_row( "INSERT INTO lease_locks (key, holder, expiration) VALUES (?1, ?2, ?3) ON CONFLICT (key) DO - UPDATE SET holder = ?2, expiration = ?3 - WHERE holder = ?2 - OR expiration < ?4 - ", + UPDATE SET + holder = excluded.holder, + expiration = excluded.expiration, + generation = + CASE holder + WHEN excluded.holder THEN generation + ELSE generation + 1 + END + WHERE + holder = excluded.holder + OR expiration < ?4 + RETURNING generation + ", (key, holder, expiration, now), + |row| row.get(0), ) + .optional() }) .await?; - Ok(num_touched == 1) + Ok(generation) } #[instrument(skip(self, updates))] diff --git a/crates/matrix-sdk-sqlite/src/media_store.rs b/crates/matrix-sdk-sqlite/src/media_store.rs index 1bcc4d860ea..a7275f55055 100644 --- a/crates/matrix-sdk-sqlite/src/media_store.rs +++ b/crates/matrix-sdk-sqlite/src/media_store.rs @@ -19,6 +19,7 @@ use std::{fmt, path::Path, sync::Arc}; use async_trait::async_trait; use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime}; use matrix_sdk_base::{ + cross_process_lock::CrossProcessLockGeneration, media::{ store::{ IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore, @@ -63,7 +64,7 @@ const DATABASE_NAME: &str = "matrix-sdk-media.sqlite3"; /// This is used to figure whether the SQLite database requires a migration. /// Every new SQL migration should imply a bump of this number, and changes in /// the [`run_migrations`] function. -const DATABASE_VERSION: u8 = 1; +const DATABASE_VERSION: u8 = 2; /// An SQLite-based media store. #[derive(Clone)] @@ -225,6 +226,16 @@ async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> { .await?; } + if version < 2 { + conn.with_transaction(|txn| { + txn.execute_batch(include_str!( + "../migrations/media_store/002_lease_locks_with_generation.sql" + ))?; + txn.set_db_version(2) + }) + .await?; + } + Ok(()) } @@ -238,7 +249,7 @@ impl MediaStore for SqliteMediaStore { lease_duration_ms: u32, key: &str, holder: &str, - ) -> Result { + ) -> Result> { let _timer = timer!("method"); let key = key.to_owned(); @@ -247,25 +258,37 @@ impl MediaStore for SqliteMediaStore { let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into(); let expiration = now + lease_duration_ms as u64; - let num_touched = self + // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html. + let generation = self .write() .await? .with_transaction(move |txn| { - txn.execute( + txn.query_row( "INSERT INTO lease_locks (key, holder, expiration) VALUES (?1, ?2, ?3) ON CONFLICT (key) DO - UPDATE SET holder = ?2, expiration = ?3 - WHERE holder = ?2 - OR expiration < ?4 - ", + UPDATE SET + holder = excluded.holder, + expiration = excluded.expiration, + generation = + CASE holder + WHEN excluded.holder THEN generation + ELSE generation + 1 + END + WHERE + holder = excluded.holder + OR expiration < ?4 + RETURNING generation + ", (key, holder, expiration, now), + |row| row.get(0), ) + .optional() }) .await?; - Ok(num_touched == 1) + Ok(generation) } async fn add_media_content( diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 5c4fc7205f0..c998f1ac6b2 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -1714,8 +1714,9 @@ impl Encryption { // If we don't get the lock immediately, then it is already acquired by another // process, and we'll get to reload next time we acquire the lock. { - let guard = lock.try_lock_once().await?; - if guard.is_some() { + let lock_result = lock.try_lock_once().await?; + + if lock_result.is_ok() { olm_machine .initialize_crypto_store_generation( &self.client.locks().crypto_store_generation, @@ -1789,9 +1790,9 @@ impl Encryption { &self, ) -> Result, Error> { if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() { - let maybe_guard = lock.try_lock_once().await?; + let lock_result = lock.try_lock_once().await?; - let Some(guard) = maybe_guard else { + let Some(guard) = lock_result.ok() else { return Ok(None); };