Skip to content

Commit 68b902e

Browse files
Johennespoljar
authored andcommitted
feat(ffi): add bindings for listening to global send queue updates
Signed-off-by: Johannes Marbach <[email protected]>
1 parent 8bb8bba commit 68b902e

File tree

3 files changed

+78
-1
lines changed

3 files changed

+78
-1
lines changed

bindings/matrix-sdk-ffi/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ All notable changes to this project will be documented in this file.
2929
([#5786](https://github.com/matrix-org/matrix-rust-sdk/pull/5786))
3030
- `ComposerDraft` now includes attachments alongside the text message.
3131
([#5794](https://github.com/matrix-org/matrix-rust-sdk/pull/5794))
32+
- Add `Client::subscribe_to_send_queue_updates` to observe global send queue updates.
33+
([#5784](https://github.com/matrix-org/matrix-rust-sdk/pull/5784))
3234

3335
### Features:
3436

bindings/matrix-sdk-ffi/src/client.rs

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ use crate::{
103103
notification::NotificationClient,
104104
notification_settings::NotificationSettings,
105105
qr_code::LoginWithQrCodeHandler,
106-
room::{RoomHistoryVisibility, RoomInfoListener},
106+
room::{RoomHistoryVisibility, RoomInfoListener, RoomSendQueueUpdate},
107107
room_directory_search::RoomDirectorySearch,
108108
room_preview::RoomPreview,
109109
ruma::{
@@ -195,6 +195,13 @@ pub trait ProgressWatcher: SyncOutsideWasm + SendOutsideWasm {
195195
fn transmission_progress(&self, progress: TransmissionProgress);
196196
}
197197

198+
/// A listener to the global (client-wide) update reporter of the send queue.
199+
#[matrix_sdk_ffi_macros::export(callback_interface)]
200+
pub trait SendQueueRoomUpdateListener: SyncOutsideWasm + SendOutsideWasm {
201+
/// Called every time the send queue emits an update for a given room.
202+
fn on_update(&self, room_id: String, update: RoomSendQueueUpdate);
203+
}
204+
198205
/// A listener to the global (client-wide) error reporter of the send queue.
199206
#[matrix_sdk_ffi_macros::export(callback_interface)]
200207
pub trait SendQueueRoomErrorListener: SyncOutsideWasm + SendOutsideWasm {
@@ -607,6 +614,49 @@ impl Client {
607614
self.inner.send_queue().enable_upload_progress(enable);
608615
}
609616

617+
/// Subscribe to the global send queue update reporter, at the
618+
/// client-wide level.
619+
///
620+
/// The given listener will be immediately called with
621+
/// `RoomSendQueueUpdate::NewLocalEvent` for each local echo existing in
622+
/// the queue.
623+
pub async fn subscribe_to_send_queue_updates(
624+
&self,
625+
listener: Box<dyn SendQueueRoomUpdateListener>,
626+
) -> Result<Arc<TaskHandle>, ClientError> {
627+
let q = self.inner.send_queue();
628+
let local_echoes = q.local_echoes().await?;
629+
let mut subscriber = q.subscribe();
630+
631+
for (room_id, local_echoes) in local_echoes {
632+
for local_echo in local_echoes {
633+
listener.on_update(
634+
room_id.clone().into(),
635+
RoomSendQueueUpdate::NewLocalEvent {
636+
transaction_id: local_echo.transaction_id.into(),
637+
},
638+
);
639+
}
640+
}
641+
642+
Ok(Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
643+
loop {
644+
match subscriber.recv().await {
645+
Ok(update) => {
646+
let room_id = update.room_id.to_string();
647+
match update.update.try_into() {
648+
Ok(update) => listener.on_update(room_id, update),
649+
Err(err) => error!("error when converting send queue update: {err}"),
650+
}
651+
}
652+
Err(err) => {
653+
error!("error when listening to the send queue update reporter: {err}");
654+
}
655+
}
656+
}
657+
}))))
658+
}
659+
610660
/// Subscribe to the global enablement status of the send queue, at the
611661
/// client-wide level.
612662
///

crates/matrix-sdk/src/send_queue/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,31 @@ impl SendQueue {
305305
self.data().global_update_sender.subscribe()
306306
}
307307

308+
/// Get local echoes from all room send queues.
309+
pub async fn local_echoes(
310+
&self,
311+
) -> Result<BTreeMap<OwnedRoomId, Vec<LocalEcho>>, RoomSendQueueError> {
312+
let room_ids =
313+
self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
314+
|err| {
315+
warn!("error when loading rooms with unsent requests: {err}");
316+
Vec::new()
317+
},
318+
);
319+
320+
let mut local_echoes: BTreeMap<OwnedRoomId, Vec<LocalEcho>> = BTreeMap::new();
321+
322+
for room_id in room_ids {
323+
if let Some(room) = self.client.get_room(&room_id) {
324+
let queue = self.for_room(room);
325+
local_echoes
326+
.insert(room_id.to_owned(), queue.inner.queue.local_echoes(&queue).await?);
327+
}
328+
}
329+
330+
Ok(local_echoes)
331+
}
332+
308333
/// A subscriber to the enablement status (enabled or disabled) of the
309334
/// send queue, along with useful errors.
310335
pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {

0 commit comments

Comments
 (0)