Skip to content

Commit d44ec81

Browse files
committed
feat(sdk): Implement LatestEvent::update_with_send_queue.
This patch implements `LatestEvent::update_with_send_queue`. It introduces an intermediate type, for the sake of clarity, `LatestEventValuesForLocalEvents`. The difficulty here is to keep a buffer of `LatestEventValue`s requested by the `SendQueue`. Why? Because we want the latest event value, but we only receive `RoomSendQueueUpdate`s, we can't iterate over local events in the `SendQueue` like we do for the `EventCache` to re-compute the latest event if a local event has been cancelled or updated. A particular care must also be applied when a local event is wedged: this local event and all its followings must be marked as wedged too, so that the `LatestEventValue` is `LocalIsWedged`. Same when the local event is unwedged.
1 parent 08e6143 commit d44ec81

File tree

2 files changed

+369
-4
lines changed

2 files changed

+369
-4
lines changed

crates/matrix-sdk/src/latest_events/latest_event.rs

Lines changed: 347 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ pub(super) struct LatestEvent {
4646
/// The thread (if any) owning this latest event.
4747
_thread_id: Option<OwnedEventId>,
4848

49+
/// A buffer of the current [`LatestEventValue`] computed for local events
50+
/// seen by the send queue. See [`LatestEventValuesForLocalEvents`] to learn
51+
/// more.
52+
buffer_of_values_for_local_events: LatestEventValuesForLocalEvents,
53+
4954
/// The latest event value.
5055
current_value: SharedObservable<LatestEventValue, AsyncLock>,
5156
}
@@ -60,6 +65,7 @@ impl LatestEvent {
6065
Self {
6166
_room_id: room_id.to_owned(),
6267
_thread_id: thread_id.map(ToOwned::to_owned),
68+
buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(),
6369
current_value: SharedObservable::new_async(
6470
LatestEventValue::new_remote(room_event_cache, weak_room).await,
6571
),
@@ -86,8 +92,21 @@ impl LatestEvent {
8692

8793
/// Update the inner latest event value, based on the send queue
8894
/// (specifically with a [`RoomSendQueueUpdate`]).
89-
pub async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) {
90-
todo!()
95+
pub async fn update_with_send_queue(
96+
&mut self,
97+
send_queue_update: &RoomSendQueueUpdate,
98+
room_event_cache: &RoomEventCache,
99+
power_levels: &Option<(&UserId, RoomPowerLevels)>,
100+
) {
101+
let new_value = LatestEventValue::new_local(
102+
send_queue_update,
103+
&mut self.buffer_of_values_for_local_events,
104+
room_event_cache,
105+
power_levels,
106+
)
107+
.await;
108+
109+
self.update(new_value).await;
91110
}
92111

93112
/// Update [`Self::current_value`] if and only if the `new_value` is not
@@ -150,6 +169,332 @@ impl LatestEventValue {
150169
.map(Self::Remote)
151170
.unwrap_or_default()
152171
}
172+
173+
/// Create a new [`LatestEventValue::LocalIsSending`] or
174+
/// [`LatestEventValue::LocalIsWedged`].
175+
async fn new_local(
176+
send_queue_update: &RoomSendQueueUpdate,
177+
buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
178+
room_event_cache: &RoomEventCache,
179+
power_levels: &Option<(&UserId, RoomPowerLevels)>,
180+
) -> Self {
181+
use crate::send_queue::{LocalEcho, LocalEchoContent};
182+
183+
match send_queue_update {
184+
// A new local event is being sent.
185+
//
186+
// Let's create the `LatestEventValue` and push it in the buffer of values.
187+
RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
188+
transaction_id,
189+
content: local_echo_content,
190+
}) => match local_echo_content {
191+
LocalEchoContent::Event { serialized_event: content, .. } => {
192+
if let Ok(content) = content.deserialize() {
193+
if let Some(kind) = find_and_map_any_message_like_event_content(content) {
194+
let value = Self::LocalIsSending(kind);
195+
196+
buffer_of_values_for_local_events
197+
.push(transaction_id.to_owned(), value.clone());
198+
199+
value
200+
} else {
201+
Self::None
202+
}
203+
} else {
204+
Self::None
205+
}
206+
}
207+
208+
LocalEchoContent::React { .. } => Self::None,
209+
},
210+
211+
// A local event has been cancelled before being sent.
212+
//
213+
// Remove the calculated `LatestEventValue` from the buffer of values, and return the
214+
// last `LatestEventValue` or calculate a new one.
215+
RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
216+
if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
217+
buffer_of_values_for_local_events.remove(position);
218+
}
219+
220+
Self::new_local_or_remote(
221+
buffer_of_values_for_local_events,
222+
room_event_cache,
223+
power_levels,
224+
)
225+
.await
226+
}
227+
228+
// A local event has successfully been sent!
229+
//
230+
// Unwedge all wedged values after the one matching `transaction_id`. Indeed, if
231+
// an event has been sent, it means the send queue is working, so if any value has been
232+
// marked as wedged, it must be marked as unwedged. Then, remove the calculated
233+
// `LatestEventValue` from the buffer of values. Finally, return the last
234+
// `LatestEventValue` or calculate a new one.
235+
RoomSendQueueUpdate::SentEvent { transaction_id, .. } => {
236+
let position = buffer_of_values_for_local_events.unwedged_after(transaction_id);
237+
238+
if let Some(position) = position {
239+
buffer_of_values_for_local_events.remove(position);
240+
}
241+
242+
Self::new_local_or_remote(
243+
buffer_of_values_for_local_events,
244+
room_event_cache,
245+
power_levels,
246+
)
247+
.await
248+
}
249+
250+
// A local event has been replaced by another one.
251+
//
252+
// Replace the latest event value matching `transaction_id` in the buffer if it exists
253+
// (note: it should!), and return the last `LatestEventValue` or calculate a new one.
254+
RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content: content } => {
255+
if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
256+
if let Ok(content) = content.deserialize() {
257+
if let Some(kind) = find_and_map_any_message_like_event_content(content) {
258+
buffer_of_values_for_local_events.replace_kind(position, kind);
259+
}
260+
} else {
261+
return Self::None;
262+
}
263+
}
264+
265+
Self::new_local_or_remote(
266+
buffer_of_values_for_local_events,
267+
room_event_cache,
268+
power_levels,
269+
)
270+
.await
271+
}
272+
273+
// An error has occurred.
274+
//
275+
// Mark the latest event value matching `transaction_id`, and all its following values,
276+
// as wedged.
277+
RoomSendQueueUpdate::SendError { transaction_id, .. } => {
278+
buffer_of_values_for_local_events.wedged_from(transaction_id);
279+
280+
Self::new_local_or_remote(
281+
buffer_of_values_for_local_events,
282+
room_event_cache,
283+
power_levels,
284+
)
285+
.await
286+
}
287+
288+
// A local event has been unwedged and sending is being retried.
289+
//
290+
// Mark the latest event value matching `transaction_id`, and all its following values,
291+
// as unwedged.
292+
RoomSendQueueUpdate::RetryEvent { transaction_id } => {
293+
buffer_of_values_for_local_events.unwedged_from(transaction_id);
294+
295+
Self::new_local_or_remote(
296+
buffer_of_values_for_local_events,
297+
room_event_cache,
298+
power_levels,
299+
)
300+
.await
301+
}
302+
303+
// A media upload has made progress.
304+
//
305+
// Nothing to do here.
306+
RoomSendQueueUpdate::MediaUpload { .. } => Self::None,
307+
}
308+
}
309+
310+
/// Get the last [`LatestEventValue`] from the local latest event values if
311+
/// any, or create a new [`LatestEventValue`] from the remote events.
312+
///
313+
/// If the buffer of latest event values is not empty, let's return the last
314+
/// one. Otherwise, it means we no longer have any local event: let's
315+
/// fallback on remote event!
316+
async fn new_local_or_remote(
317+
buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
318+
room_event_cache: &RoomEventCache,
319+
power_levels: &Option<(&UserId, RoomPowerLevels)>,
320+
) -> Self {
321+
if let Some(value) = buffer_of_values_for_local_events.last() {
322+
value.clone()
323+
} else {
324+
Self::new_remote_with_power_levels(room_event_cache, power_levels).await
325+
}
326+
}
327+
}
328+
329+
/// A buffer of the current [`LatestEventValue`] computed for local events
330+
/// seen by the send queue. It is used by
331+
/// [`LatestEvent::buffer_of_values_for_local_events`].
332+
///
333+
/// The system does only receive [`RoomSendQueueUpdate`]s. It's not designed to
334+
/// iterate over local events in the send queue when a local event is changed
335+
/// (cancelled, or updated for example). That's why we keep our own buffer here.
336+
/// Imagine the system receives 4 [`RoomSendQueueUpdate`]:
337+
///
338+
/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
339+
/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
340+
/// 3. [`RoomSendQueueUpdate::ReplacedLocalEvent`]: replaced the first local
341+
/// event,
342+
/// 4. [`RoomSendQueueUpdate::CancelledLocalEvent`]: cancelled the second local
343+
/// event.
344+
///
345+
/// `NewLocalEvent`s will trigger the computation of new
346+
/// `LatestEventValue`s, but `CancelledLocalEvent` for example doesn't hold
347+
/// any information to compute a new `LatestEventValue`, so we need to
348+
/// remember the previous values, until the local events are sent and
349+
/// removed from this buffer.
350+
///
351+
/// Another reason why we need a buffer is to handle wedged local event. Imagine
352+
/// the system receives 3 [`RoomSendQueueUpdate`]:
353+
///
354+
/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
355+
/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
356+
/// 3. [`RoomSendQueueUpdate::SendError`]: the first local event has failed to
357+
/// be sent.
358+
///
359+
/// Because a `SendError` is received (targeting the first `NewLocalEvent`), the
360+
/// send queue is stopped. However, the `LatestEventValue` targets the second
361+
/// `NewLocalEvent`. The system must consider that when a local event is wedged,
362+
/// all the following local events must also be marked as wedged. And vice
363+
/// versa, when the send queue is able to send an event again, all the following
364+
/// local events must be marked as unwedged.
365+
///
366+
/// This type isolates a couple of methods designed to manage these specific
367+
/// behaviours.
368+
#[derive(Debug)]
369+
struct LatestEventValuesForLocalEvents {
370+
buffer: Vec<(OwnedTransactionId, LatestEventValue)>,
371+
}
372+
373+
impl LatestEventValuesForLocalEvents {
374+
/// Create a new [`LatestEventValuesForLocalEvents`].
375+
fn new() -> Self {
376+
Self { buffer: Vec::with_capacity(2) }
377+
}
378+
379+
/// Get the last [`LatestEventValue`].
380+
fn last(&self) -> Option<&LatestEventValue> {
381+
self.buffer.last().map(|(_, value)| value)
382+
}
383+
384+
/// Find the position of the [`LatestEventValue`] matching `transaction_id`.
385+
fn position(&self, transaction_id: &TransactionId) -> Option<usize> {
386+
self.buffer
387+
.iter()
388+
.position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
389+
}
390+
391+
/// Push a new [`LatestEventValue`].
392+
///
393+
/// # Panics
394+
///
395+
/// Panics if `value` is not of kind [`LatestEventValue::LocalIsSending`] or
396+
/// [`LatestEventValue::LocalIsWedged`].
397+
fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) {
398+
assert!(
399+
matches!(
400+
value,
401+
LatestEventValue::LocalIsSending(_) | LatestEventValue::LocalIsWedged(_)
402+
),
403+
"`value` must be either `LocalIsSending` or `LocalIsWedged`"
404+
);
405+
406+
self.buffer.push((transaction_id, value));
407+
}
408+
409+
/// Replace the [`LatestEventKind`] of the [`LatestEventValue`] at position
410+
/// `position`.
411+
///
412+
/// # Panics
413+
///
414+
/// Panics if:
415+
/// - `position` is strictly greater than buffer's length,
416+
/// - the [`LatestEventValue`] is not of kind
417+
/// [`LatestEventValue::LocalIsSending`] or
418+
/// [`LatestEventValue::LocalIsWedged`].
419+
fn replace_kind(&mut self, position: usize, new_kind: LatestEventKind) {
420+
let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid");
421+
422+
match value {
423+
LatestEventValue::LocalIsSending(kind) => *kind = new_kind,
424+
LatestEventValue::LocalIsWedged(kind) => *kind = new_kind,
425+
_ => panic!("`value` must be either `LocalIsSending` or `LocalIsWedged`"),
426+
}
427+
}
428+
429+
/// Remove the [`LatestEventValue`] at position `position`.
430+
///
431+
/// # Panics
432+
///
433+
/// Panics if `position` is strictly greater than buffer's length.
434+
fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) {
435+
self.buffer.remove(position)
436+
}
437+
438+
/// Mark the `LatestEventValue` matching `transaction_id`, and all the
439+
/// following values, as wedged.
440+
fn wedged_from(&mut self, transaction_id: &TransactionId) {
441+
let mut values = self.buffer.iter_mut();
442+
443+
if let Some(first_value_to_wedge) = values
444+
.by_ref()
445+
.find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
446+
{
447+
// Iterate over the found value and the following ones.
448+
for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) {
449+
if let LatestEventValue::LocalIsSending(kind) = value_to_wedge {
450+
*value_to_wedge = LatestEventValue::LocalIsWedged(kind.clone());
451+
}
452+
}
453+
}
454+
}
455+
456+
/// Mark the `LatestEventValue` matching `transaction_id`, and all the
457+
/// following values, as unwedged.
458+
fn unwedged_from(&mut self, transaction_id: &TransactionId) {
459+
let mut values = self.buffer.iter_mut();
460+
461+
if let Some(first_value_to_unwedge) = values
462+
.by_ref()
463+
.find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
464+
{
465+
// Iterate over the found value and the following ones.
466+
for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) {
467+
if let LatestEventValue::LocalIsWedged(kind) = value_to_unwedge {
468+
*value_to_unwedge = LatestEventValue::LocalIsSending(kind.clone());
469+
}
470+
}
471+
}
472+
}
473+
474+
/// Mark all the following values after the `LatestEventValue` matching
475+
/// `transaction_id` as unwedged.
476+
///
477+
/// Note that contrary to [`Self::unwedged_from`], the `LatestEventValue` is
478+
/// untouched. However, its position is returned (if any).
479+
fn unwedged_after(&mut self, transaction_id: &TransactionId) -> Option<usize> {
480+
let mut values = self.buffer.iter_mut();
481+
482+
if let Some(position) = values
483+
.by_ref()
484+
.position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
485+
{
486+
// Iterate over all values after the found one.
487+
for (_, value_to_unwedge) in values {
488+
if let LatestEventValue::LocalIsWedged(kind) = value_to_unwedge {
489+
*value_to_unwedge = LatestEventValue::LocalIsSending(kind.clone());
490+
}
491+
}
492+
493+
Some(position)
494+
} else {
495+
None
496+
}
497+
}
153498
}
154499

155500
/// A latest event value!

0 commit comments

Comments
 (0)