Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions crates/matrix-sdk-base/src/event_cache/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1304,57 +1304,57 @@ macro_rules! event_cache_store_integration_tests_time {
let store = get_event_cache_store().await.unwrap().into_event_cache_store();

let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired0);
assert_eq!(acquired0, Some(1)); // first lock generation

// Should extend the lease automatically (same holder).
let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired2);
assert_eq!(acquired2, Some(1)); // same lock generation

// Should extend the lease automatically (same holder + time is ok).
let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired3);
assert_eq!(acquired3, Some(1)); // same lock generation

// Another attempt at taking the lock should fail, because it's taken.
let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired4);
assert!(acquired4.is_none()); // not acquired

// Even if we insist.
let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired5);
assert!(acquired5.is_none()); // not acquired

// That's a nice test we got here, go take a little nap.
sleep(Duration::from_millis(50)).await;

// Still too early.
let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired55);
assert!(acquired55.is_none()); // not acquired

// Ok you can take another nap then.
sleep(Duration::from_millis(250)).await;

// At some point, we do get the lock.
let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap();
assert!(acquired6);
assert_eq!(acquired6, Some(2)); // new lock generation!

sleep(Duration::from_millis(1)).await;

// The other gets it almost immediately too.
let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired7);
assert_eq!(acquired7, Some(3)); // new lock generation!

sleep(Duration::from_millis(1)).await;

// But when we take a longer lease...
// But when we take a longer lease
let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired8);
assert_eq!(acquired8, Some(4)); // new lock generation!

// It blocks the other user.
let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(!acquired9);
assert!(acquired9.is_none()); // not acquired

// We can hold onto our lease.
let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired10);
assert_eq!(acquired10, Some(4)); // same lock generation
}
}
};
Expand Down
11 changes: 7 additions & 4 deletions crates/matrix-sdk-base/src/event_cache/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ use std::{

use async_trait::async_trait;
use matrix_sdk_common::{
cross_process_lock::memory_store_helper::try_take_leased_lock,
cross_process_lock::{
CrossProcessLockGeneration,
memory_store_helper::{Lease, try_take_leased_lock},
},
linked_chunk::{
ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
RawChunk, Update, relational::RelationalLinkedChunk,
},
};
use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType, time::Instant};
use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
use tracing::error;

use super::{
Expand All @@ -43,7 +46,7 @@ pub struct MemoryStore {

#[derive(Debug)]
struct MemoryStoreInner {
leases: HashMap<String, (String, Instant)>,
leases: HashMap<String, Lease>,
events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
}

Expand Down Expand Up @@ -75,7 +78,7 @@ impl EventCacheStore for MemoryStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
let mut inner = self.inner.write().unwrap();

Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
Expand Down
5 changes: 3 additions & 2 deletions crates/matrix-sdk-base/src/event_cache/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ mod memory_store;
mod traits;

use matrix_sdk_common::cross_process_lock::{
CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, TryLock,
CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard,
TryLock,
};
pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
use ruma::{
Expand Down Expand Up @@ -192,7 +193,7 @@ impl TryLock for LockableEventCacheStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> std::result::Result<bool, Self::LockError> {
) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
}
}
Expand Down
5 changes: 3 additions & 2 deletions crates/matrix-sdk-base/src/event_cache/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{fmt, sync::Arc};
use async_trait::async_trait;
use matrix_sdk_common::{
AsyncTraitDeps,
cross_process_lock::CrossProcessLockGeneration,
linked_chunk::{
ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
RawChunk, Update,
Expand Down Expand Up @@ -46,7 +47,7 @@ pub trait EventCacheStore: AsyncTraitDeps {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error>;
) -> Result<Option<CrossProcessLockGeneration>, Self::Error>;

/// An [`Update`] reflects an operation that has happened inside a linked
/// chunk. The linked chunk is used by the event cache to store the events
Expand Down Expand Up @@ -191,7 +192,7 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
}

Expand Down
24 changes: 12 additions & 12 deletions crates/matrix-sdk-base/src/media/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1337,57 +1337,57 @@ macro_rules! media_store_integration_tests_time {
let store = get_media_store().await.unwrap();

let acquired0 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired0);
assert_eq!(acquired0, Some(1)); // first lock generation

// Should extend the lease automatically (same holder).
let acquired2 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired2);
assert_eq!(acquired2, Some(1)); // same lock generation

// Should extend the lease automatically (same holder + time is ok).
let acquired3 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(acquired3);
assert_eq!(acquired3, Some(1)); // same lock generation

// Another attempt at taking the lock should fail, because it's taken.
let acquired4 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired4);
assert!(acquired4.is_none()); // not acquired

// Even if we insist.
let acquired5 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired5);
assert!(acquired5.is_none()); // not acquired

// That's a nice test we got here, go take a little nap.
sleep(Duration::from_millis(50)).await;

// Still too early.
let acquired55 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(!acquired55);
assert!(acquired55.is_none()); // not acquired

// Ok you can take another nap then.
sleep(Duration::from_millis(250)).await;

// At some point, we do get the lock.
let acquired6 = store.try_take_leased_lock(0, "key", "bob").await.unwrap();
assert!(acquired6);
assert_eq!(acquired6, Some(2)); // new lock generation!

sleep(Duration::from_millis(1)).await;

// The other gets it almost immediately too.
let acquired7 = store.try_take_leased_lock(0, "key", "alice").await.unwrap();
assert!(acquired7);
assert_eq!(acquired7, Some(3)); // new lock generation!

sleep(Duration::from_millis(1)).await;

// But when we take a longer lease...
// But when we take a longer lease
let acquired8 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired8);
assert_eq!(acquired8, Some(4)); // new lock generation!

// It blocks the other user.
let acquired9 = store.try_take_leased_lock(300, "key", "alice").await.unwrap();
assert!(!acquired9);
assert!(acquired9.is_none()); // not acquired

// We can hold onto our lease.
let acquired10 = store.try_take_leased_lock(300, "key", "bob").await.unwrap();
assert!(acquired10);
assert_eq!(acquired10, Some(4)); // same lock generation
}
}
};
Expand Down
15 changes: 8 additions & 7 deletions crates/matrix-sdk-base/src/media/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ use std::{

use async_trait::async_trait;
use matrix_sdk_common::{
cross_process_lock::memory_store_helper::try_take_leased_lock, ring_buffer::RingBuffer,
};
use ruma::{
MxcUri, OwnedMxcUri,
time::{Instant, SystemTime},
cross_process_lock::{
CrossProcessLockGeneration,
memory_store_helper::{Lease, try_take_leased_lock},
},
ring_buffer::RingBuffer,
};
use ruma::{MxcUri, OwnedMxcUri, time::SystemTime};

use super::Result;
use crate::media::{
Expand All @@ -48,7 +49,7 @@ pub struct MemoryMediaStore {
#[derive(Debug)]
struct MemoryMediaStoreInner {
media: RingBuffer<MediaContent>,
leases: HashMap<String, (String, Instant)>,
leases: HashMap<String, Lease>,
media_retention_policy: Option<MediaRetentionPolicy>,
last_media_cleanup_time: SystemTime,
}
Expand Down Expand Up @@ -110,7 +111,7 @@ impl MediaStore for MemoryMediaStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
let mut inner = self.inner.write().unwrap();

Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
Expand Down
5 changes: 3 additions & 2 deletions crates/matrix-sdk-base/src/media/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use std::fmt;
use std::{ops::Deref, sync::Arc};

use matrix_sdk_common::cross_process_lock::{
CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, TryLock,
CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard,
TryLock,
};
use matrix_sdk_store_encryption::Error as StoreEncryptionError;
pub use traits::{DynMediaStore, IntoMediaStore, MediaStore, MediaStoreInner};
Expand Down Expand Up @@ -172,7 +173,7 @@ impl TryLock for LockableMediaStore {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> std::result::Result<bool, Self::LockError> {
) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
}
}
6 changes: 3 additions & 3 deletions crates/matrix-sdk-base/src/media/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::{fmt, sync::Arc};

use async_trait::async_trait;
use matrix_sdk_common::AsyncTraitDeps;
use matrix_sdk_common::{AsyncTraitDeps, cross_process_lock::CrossProcessLockGeneration};
use ruma::{MxcUri, time::SystemTime};

#[cfg(doc)]
Expand All @@ -41,7 +41,7 @@ pub trait MediaStore: AsyncTraitDeps {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error>;
) -> Result<Option<CrossProcessLockGeneration>, Self::Error>;

/// Add a media file's content in the media store.
///
Expand Down Expand Up @@ -313,7 +313,7 @@ impl<T: MediaStore> MediaStore for EraseMediaStoreError<T> {
lease_duration_ms: u32,
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
}

Expand Down
Loading
Loading