Skip to content

Commit 2f125e9

Browse files
committed
send queue: make SendHandle::abort/update more precise
Using `SendHandle::abort()` after the event has been sent would look like a successful abort of the event, while it's not the case; this fixes this by having the state store backends return whether they've touched an entry in the database.
1 parent 16aa6df commit 2f125e9

File tree

6 files changed

+105
-35
lines changed

6 files changed

+105
-35
lines changed

crates/matrix-sdk-base/src/store/memory_store.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -890,7 +890,7 @@ impl StateStore for MemoryStore {
890890
room_id: &RoomId,
891891
transaction_id: &TransactionId,
892892
content: SerializableEventContent,
893-
) -> Result<(), Self::Error> {
893+
) -> Result<bool, Self::Error> {
894894
if let Some(entry) = self
895895
.send_queue_events
896896
.write()
@@ -902,15 +902,17 @@ impl StateStore for MemoryStore {
902902
{
903903
entry.event = content;
904904
entry.is_wedged = false;
905+
Ok(true)
906+
} else {
907+
Ok(false)
905908
}
906-
Ok(())
907909
}
908910

909911
async fn remove_send_queue_event(
910912
&self,
911913
room_id: &RoomId,
912914
transaction_id: &TransactionId,
913-
) -> Result<(), Self::Error> {
915+
) -> Result<bool, Self::Error> {
914916
let mut q = self.send_queue_events.write().unwrap();
915917

916918
let entry = q.get_mut(room_id);
@@ -922,10 +924,11 @@ impl StateStore for MemoryStore {
922924
if entry.is_empty() {
923925
q.remove(room_id);
924926
}
927+
return Ok(true);
925928
}
926929
}
927930

928-
Ok(())
931+
Ok(false)
929932
}
930933

931934
async fn load_send_queue_events(

crates/matrix-sdk-base/src/store/traits.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -418,20 +418,24 @@ pub trait StateStore: AsyncTraitDeps {
418418
/// * `transaction_id` - The unique key identifying the event to be sent
419419
/// (and its transaction).
420420
/// * `content` - Serializable event content to replace the original one.
421+
///
422+
/// Returns true if an event has been updated, or false otherwise.
421423
async fn update_send_queue_event(
422424
&self,
423425
room_id: &RoomId,
424426
transaction_id: &TransactionId,
425427
content: SerializableEventContent,
426-
) -> Result<(), Self::Error>;
428+
) -> Result<bool, Self::Error>;
427429

428430
/// Remove an event previously inserted with [`Self::save_send_queue_event`]
429431
/// from the database, based on its transaction id.
432+
///
433+
/// Returns true if an event has been removed, or false otherwise.
430434
async fn remove_send_queue_event(
431435
&self,
432436
room_id: &RoomId,
433437
transaction_id: &TransactionId,
434-
) -> Result<(), Self::Error>;
438+
) -> Result<bool, Self::Error>;
435439

436440
/// Loads all the send queue events for the given room.
437441
async fn load_send_queue_events(
@@ -687,15 +691,15 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
687691
room_id: &RoomId,
688692
transaction_id: &TransactionId,
689693
content: SerializableEventContent,
690-
) -> Result<(), Self::Error> {
694+
) -> Result<bool, Self::Error> {
691695
self.0.update_send_queue_event(room_id, transaction_id, content).await.map_err(Into::into)
692696
}
693697

694698
async fn remove_send_queue_event(
695699
&self,
696700
room_id: &RoomId,
697701
transaction_id: &TransactionId,
698-
) -> Result<(), Self::Error> {
702+
) -> Result<bool, Self::Error> {
699703
self.0.remove_send_queue_event(room_id, transaction_id).await.map_err(Into::into)
700704
}
701705

crates/matrix-sdk-indexeddb/src/state_store/mod.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,7 +1400,7 @@ impl_state_store!({
14001400
room_id: &RoomId,
14011401
transaction_id: &TransactionId,
14021402
content: SerializableEventContent,
1403-
) -> Result<()> {
1403+
) -> Result<bool> {
14041404
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
14051405

14061406
let tx = self
@@ -1423,21 +1423,22 @@ impl_state_store!({
14231423
if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
14241424
entry.event = content;
14251425
entry.is_wedged = false;
1426-
}
14271426

1428-
// Save the new vector into db.
1429-
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1427+
// Save the new vector into db.
1428+
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1429+
tx.await.into_result()?;
14301430

1431-
tx.await.into_result()?;
1432-
1433-
Ok(())
1431+
Ok(true)
1432+
} else {
1433+
Ok(false)
1434+
}
14341435
}
14351436

14361437
async fn remove_send_queue_event(
14371438
&self,
14381439
room_id: &RoomId,
14391440
transaction_id: &TransactionId,
1440-
) -> Result<()> {
1441+
) -> Result<bool> {
14411442
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
14421443

14431444
let tx = self
@@ -1459,12 +1460,13 @@ impl_state_store!({
14591460
} else {
14601461
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
14611462
}
1463+
1464+
tx.await.into_result()?;
1465+
return Ok(true);
14621466
}
14631467
}
14641468

1465-
tx.await.into_result()?;
1466-
1467-
Ok(())
1469+
Ok(false)
14681470
}
14691471

14701472
async fn load_send_queue_events(&self, room_id: &RoomId) -> Result<Vec<QueuedEvent>> {

crates/matrix-sdk-sqlite/src/state_store.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1709,43 +1709,46 @@ impl StateStore for SqliteStateStore {
17091709
room_id: &RoomId,
17101710
transaction_id: &TransactionId,
17111711
content: SerializableEventContent,
1712-
) -> Result<(), Self::Error> {
1712+
) -> Result<bool, Self::Error> {
17131713
let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
17141714

17151715
let content = self.serialize_json(&content)?;
17161716
// See comment in [`Self::save_send_queue_event`] to understand why the
17171717
// transaction id is neither encrypted or hashed.
17181718
let transaction_id = transaction_id.to_string();
17191719

1720-
self.acquire()
1720+
let num_updated = self.acquire()
17211721
.await?
17221722
.with_transaction(move |txn| {
1723-
txn.prepare_cached("UPDATE send_queue_events SET wedged = false, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))?;
1724-
Ok(())
1723+
txn.prepare_cached("UPDATE send_queue_events SET wedged = false, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
17251724
})
1726-
.await
1725+
.await?;
1726+
1727+
Ok(num_updated > 0)
17271728
}
17281729

17291730
async fn remove_send_queue_event(
17301731
&self,
17311732
room_id: &RoomId,
17321733
transaction_id: &TransactionId,
1733-
) -> Result<(), Self::Error> {
1734+
) -> Result<bool, Self::Error> {
17341735
let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
17351736

17361737
// See comment in `save_send_queue_event`.
17371738
let transaction_id = transaction_id.to_string();
17381739

1739-
self.acquire()
1740+
let num_deleted = self
1741+
.acquire()
17401742
.await?
17411743
.with_transaction(move |txn| {
17421744
txn.prepare_cached(
17431745
"DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
17441746
)?
1745-
.execute((room_id, transaction_id))?;
1746-
Ok(())
1747+
.execute((room_id, transaction_id))
17471748
})
1748-
.await
1749+
.await?;
1750+
1751+
Ok(num_deleted > 0)
17491752
}
17501753

17511754
async fn load_send_queue_events(

crates/matrix-sdk/src/send_queue.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -674,13 +674,19 @@ impl QueueStorage {
674674
) -> Result<(), RoomSendQueueStorageError> {
675675
self.mark_as_not_being_sent(transaction_id).await;
676676

677-
Ok(self
677+
let removed = self
678678
.client
679679
.get()
680680
.ok_or(RoomSendQueueStorageError::ClientShuttingDown)?
681681
.store()
682682
.remove_send_queue_event(&self.room_id, transaction_id)
683-
.await?)
683+
.await?;
684+
685+
if !removed {
686+
warn!(txn_id = %transaction_id, "event marked as sent was missing from storage");
687+
}
688+
689+
Ok(())
684690
}
685691

686692
/// Cancel a sending command for an event that has been sent with
@@ -699,14 +705,15 @@ impl QueueStorage {
699705
return Ok(false);
700706
}
701707

702-
self.client
708+
let removed = self
709+
.client
703710
.get()
704711
.ok_or(RoomSendQueueStorageError::ClientShuttingDown)?
705712
.store()
706713
.remove_send_queue_event(&self.room_id, transaction_id)
707714
.await?;
708715

709-
Ok(true)
716+
Ok(removed)
710717
}
711718

712719
/// Replace an event that has been sent with
@@ -727,14 +734,15 @@ impl QueueStorage {
727734
return Ok(false);
728735
}
729736

730-
self.client
737+
let edited = self
738+
.client
731739
.get()
732740
.ok_or(RoomSendQueueStorageError::ClientShuttingDown)?
733741
.store()
734742
.update_send_queue_event(&self.room_id, transaction_id, serializable)
735743
.await?;
736744

737-
Ok(true)
745+
Ok(edited)
738746
}
739747

740748
/// Returns a list of the local echoes, that is, all the events that we're

crates/matrix-sdk/tests/integration/send_queue.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
ops::Not as _,
23
sync::{
34
atomic::{AtomicBool, Ordering},
45
Arc, Mutex as StdMutex,
@@ -1018,6 +1019,55 @@ async fn test_abort_after_disable() {
10181019
assert!(errors.is_empty());
10191020
}
10201021

1022+
#[async_test]
1023+
async fn test_abort_or_edit_after_send() {
1024+
let (client, server) = logged_in_client_with_server().await;
1025+
1026+
// Mark the room as joined.
1027+
let room_id = room_id!("!a:b.c");
1028+
1029+
let room = mock_sync_with_new_room(
1030+
|builder| {
1031+
builder.add_joined_room(JoinedRoomBuilder::new(room_id));
1032+
},
1033+
&client,
1034+
&server,
1035+
room_id,
1036+
)
1037+
.await;
1038+
1039+
// Start with an enabled sending queue.
1040+
client.send_queue().set_enabled(true).await;
1041+
1042+
let q = room.send_queue();
1043+
1044+
let (local_echoes, mut watch) = q.subscribe().await.unwrap();
1045+
assert!(local_echoes.is_empty());
1046+
assert!(watch.is_empty());
1047+
1048+
server.reset().await;
1049+
mock_encryption_state(&server, false).await;
1050+
mock_send_event(event_id!("$1")).mount(&server).await;
1051+
1052+
let handle = q.send(RoomMessageEventContent::text_plain("hey there").into()).await.unwrap();
1053+
1054+
// It is first seen as a local echo,
1055+
let (txn, _) = assert_update!(watch => local echo { body = "hey there" });
1056+
// Then sent.
1057+
assert_update!(watch => sent { txn = txn, });
1058+
1059+
// Editing shouldn't work anymore.
1060+
assert!(handle
1061+
.edit(RoomMessageEventContent::text_plain("i meant something completely different").into())
1062+
.await
1063+
.unwrap()
1064+
.not());
1065+
// Neither will aborting.
1066+
assert!(handle.abort().await.unwrap().not());
1067+
1068+
assert!(watch.is_empty());
1069+
}
1070+
10211071
#[async_test]
10221072
async fn test_unrecoverable_errors() {
10231073
let (client, server) = logged_in_client_with_server().await;

0 commit comments

Comments
 (0)