Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions crates/matrix-sdk/src/widget/machine/driver_req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{collections::BTreeMap, marker::PhantomData};
use ruma::{
OwnedUserId,
api::client::{account::request_openid_token, delayed_events::update_delayed_event},
events::{AnyStateEvent, AnyTimelineEvent, AnyToDeviceEventContent},
events::{AnyStateEvent, AnyToDeviceEventContent},
serde::Raw,
to_device::DeviceIdOrAllDevices,
};
Expand All @@ -31,7 +31,7 @@ use super::{
Action, MatrixDriverRequestMeta, SendToDeviceEventResponse, WidgetMachine,
from_widget::SendEventResponse, incoming::MatrixDriverResponse,
};
use crate::widget::{Capabilities, StateKeySelector};
use crate::widget::{Capabilities, StateKeySelector, machine::from_widget::ReadEventsResponse};

#[derive(Clone, Debug)]
pub(crate) enum MatrixDriverRequestData {
Expand Down Expand Up @@ -184,6 +184,8 @@ pub(crate) struct ReadEventsRequest {

/// The maximum number of events to return.
pub(crate) limit: u32,

pub(crate) from: Option<String>,
}

impl From<ReadEventsRequest> for MatrixDriverRequestData {
Expand All @@ -193,10 +195,10 @@ impl From<ReadEventsRequest> for MatrixDriverRequestData {
}

impl MatrixDriverRequest for ReadEventsRequest {
type Response = Vec<Raw<AnyTimelineEvent>>;
type Response = ReadEventsResponse;
}

impl FromMatrixDriverResponse for Vec<Raw<AnyTimelineEvent>> {
impl FromMatrixDriverResponse for ReadEventsResponse {
fn from_response(ev: MatrixDriverResponse) -> Option<Self> {
match ev {
MatrixDriverResponse::EventsRead(response) => Some(response),
Expand Down
7 changes: 5 additions & 2 deletions crates/matrix-sdk/src/widget/machine/from_widget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,14 @@ pub(super) struct ReadEventsRequest {
pub(super) event_type: String,
pub(super) state_key: Option<StateKeySelector>,
pub(super) limit: Option<u32>,
pub(super) from: Option<String>,
}

#[derive(Debug, Serialize)]
pub(super) struct ReadEventsResponse {
pub(super) events: Vec<Raw<AnyTimelineEvent>>,
pub(crate) struct ReadEventsResponse {
pub(crate) events: Vec<Raw<AnyTimelineEvent>>,
pub(crate) pagination_token: Option<String>,
pub(crate) reached_start: bool,
}

#[derive(Serialize, Debug)]
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk/src/widget/machine/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::{
from_widget::{FromWidgetRequest, SendEventResponse},
to_widget::ToWidgetResponse,
};
use crate::widget::Capabilities;
use crate::widget::{Capabilities, machine::ReadEventsResponse};

/// Incoming message for the widget client side module that it must process.
pub(crate) enum IncomingMessage {
Expand Down Expand Up @@ -75,7 +75,7 @@ pub(crate) enum MatrixDriverResponse {
OpenIdReceived(request_openid_token::v3::Response),
/// Client read some Matrix event(s).
/// A response to a [`MatrixDriverRequestData::ReadEvents`] command.
EventsRead(Vec<Raw<AnyTimelineEvent>>),
EventsRead(ReadEventsResponse),
/// Client read some Matrix room state entries.
/// A response to a [`MatrixDriverRequestData::ReadState`] command.
StateRead(Vec<Raw<AnyStateEvent>>),
Expand Down
57 changes: 36 additions & 21 deletions crates/matrix-sdk/src/widget/machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use from_widget::UpdateDelayedEventResponse;
use indexmap::IndexMap;
use ruma::{
OwnedRoomId,
events::{AnyStateEvent, AnyTimelineEvent},
events::AnyStateEvent,
serde::{JsonObject, Raw},
};
use serde::Serialize;
Expand All @@ -34,10 +34,7 @@ use self::{
driver_req::{
AcquireCapabilities, MatrixDriverRequest, MatrixDriverRequestHandle, RequestOpenId,
},
from_widget::{
FromWidgetErrorResponse, FromWidgetRequest, ReadEventsResponse,
SupportedApiVersionsResponse,
},
from_widget::{FromWidgetErrorResponse, FromWidgetRequest, SupportedApiVersionsResponse},
incoming::{IncomingWidgetMessage, IncomingWidgetMessageKind},
openid::{OpenIdResponse, OpenIdState},
pending::{PendingRequests, RequestLimits},
Expand All @@ -53,7 +50,10 @@ use super::{
capabilities::{SEND_DELAYED_EVENT, UPDATE_DELAYED_EVENT},
filter::FilterInput,
};
use crate::{Error, Result, widget::Filter};
use crate::{
Error, Result,
widget::{Filter, machine},
};

mod driver_req;
mod from_widget;
Expand All @@ -66,7 +66,7 @@ mod to_widget;

pub(crate) use self::{
driver_req::{MatrixDriverRequestData, SendEventRequest, SendToDeviceRequest},
from_widget::{SendEventResponse, SendToDeviceEventResponse},
from_widget::{ReadEventsResponse, SendEventResponse, SendToDeviceEventResponse},
incoming::{IncomingMessage, MatrixDriverResponse},
};

Expand Down Expand Up @@ -133,6 +133,10 @@ pub(crate) struct WidgetMachine {
/// The room to which this widget machine is attached.
room_id: OwnedRoomId,

/// Whether to wait with the initialization (capability request) until we
/// receive the content_load action.
init_on_content_load: bool,

/// Outstanding requests sent to the widget (mapped by uuid).
pending_to_widget_requests: PendingRequests<ToWidgetRequestMeta>,

Expand Down Expand Up @@ -167,17 +171,19 @@ impl WidgetMachine {
let limits =
RequestLimits { max_pending_requests: 15, response_timeout: Duration::from_secs(10) };

let mut machine = Self {
let machine = Self {
widget_id,
room_id,
init_on_content_load,
pending_to_widget_requests: PendingRequests::new(limits.clone()),
pending_matrix_driver_requests: PendingRequests::new(limits),
pending_state_updates: None,
capabilities: CapabilitiesState::Unset,
};

let initial_actions =
if init_on_content_load { Vec::new() } else { machine.negotiate_capabilities() };
let initial_actions = Vec::new();
// let initial_actions =
// if init_on_content_load { Vec::new() } else {
// machine.negotiate_capabilities() };

(machine, initial_actions)
}
Expand Down Expand Up @@ -286,17 +292,25 @@ impl WidgetMachine {

match request {
FromWidgetRequest::SupportedApiVersions {} => {
let response = SupportedApiVersionsResponse::new();
vec![Self::send_from_widget_response(raw_request, Ok(response))]
let mut response_array = vec![Self::send_from_widget_response(
raw_request,
Ok(SupportedApiVersionsResponse::new()),
)];
if !self.init_on_content_load
&& matches!(self.capabilities, CapabilitiesState::Unset)
{
response_array.append(&mut self.negotiate_capabilities());
}
response_array
}

FromWidgetRequest::ContentLoaded {} => {
let mut response =
let mut response_array =
vec![Self::send_from_widget_response(raw_request, Ok(JsonObject::new()))];
if matches!(self.capabilities, CapabilitiesState::Unset) {
response.append(&mut self.negotiate_capabilities());
response_array.append(&mut self.negotiate_capabilities());
}
response
response_array
}

FromWidgetRequest::ReadEvent(req) => self
Expand Down Expand Up @@ -391,7 +405,7 @@ impl WidgetMachine {
fn send_read_events_response(
&self,
request: Raw<FromWidgetRequest>,
events: Result<Vec<Raw<AnyTimelineEvent>>, Error>,
response: Result<ReadEventsResponse, Error>,
) -> Vec<Action> {
let response = match &self.capabilities {
CapabilitiesState::Unset => Err(FromWidgetErrorResponse::from_string(
Expand All @@ -400,10 +414,10 @@ impl WidgetMachine {
CapabilitiesState::Negotiating => Err(FromWidgetErrorResponse::from_string(
"Received read events request while capabilities were negotiating",
)),
CapabilitiesState::Negotiated(capabilities) => events
.map(|mut events| {
events.retain(|e| capabilities.allow_reading(e));
ReadEventsResponse { events }
CapabilitiesState::Negotiated(capabilities) => response
.map(|mut res| {
res.events.retain(|e| capabilities.allow_reading(e));
res
})
.map_err(FromWidgetErrorResponse::from_error),
};
Expand Down Expand Up @@ -463,6 +477,7 @@ impl WidgetMachine {
event_type: request.event_type,
state_key: request.state_key,
limit,
from: request.from,
};

self.send_matrix_driver_request(request).map(|(request, action)| {
Expand Down
128 changes: 96 additions & 32 deletions crates/matrix-sdk/src/widget/matrix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@ use std::collections::{BTreeMap, BTreeSet};
use as_variant::as_variant;
use matrix_sdk_base::{
crypto::CollectStrategy,
deserialized_responses::{EncryptionInfo, RawAnySyncOrStrippedState},
deserialized_responses::{EncryptionInfo, RawAnySyncOrStrippedState, TimelineEvent},
sync::State,
};
use ruma::{
EventId, OwnedDeviceId, OwnedUserId, RoomId, TransactionId,
api::client::{
account::request_openid_token::v3::{Request as OpenIdRequest, Response as OpenIdResponse},
delayed_events::{self, update_delayed_event::unstable::UpdateAction},
filter::RoomEventFilter,
to_device::send_event_to_device::v3::Request as RumaToDeviceRequest,
},
assign,
events::{
AnyMessageLikeEventContent, AnyStateEvent, AnyStateEventContent, AnySyncStateEvent,
AnySyncTimelineEvent, AnyTimelineEvent, AnyToDeviceEvent, AnyToDeviceEventContent,
Expand All @@ -48,18 +46,30 @@ use tokio::sync::{
};
use tracing::{error, trace, warn};

use super::{StateKeySelector, machine::SendEventResponse};
use super::{
StateKeySelector,
machine::{ReadEventsResponse, SendEventResponse},
};
use crate::{
Client, Error, Result, Room, event_handler::EventHandlerDropGuard, room::MessagesOptions,
sync::RoomUpdate, widget::machine::SendToDeviceEventResponse,
Client, Error, Result, Room, event_handler::EventHandlerDropGuard, sync::RoomUpdate,
widget::machine::SendToDeviceEventResponse,
};

/// Thin wrapper around a [`Room`] that provides functionality relevant for
/// widgets.
pub(crate) struct MatrixDriver {
room: Room,
}

/// Internal representation of errors.
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
enum ReadEventsError {
#[error(
"There provided `from` (EventId) value was not found in the current event_cache.
Make sure you only use from values you have received from a previous call to read_events."
)]
InvalidFromEventId,
}
impl MatrixDriver {
/// Creates a new `MatrixDriver` for a given `room`.
pub(crate) fn new(room: Room) -> Self {
Expand All @@ -78,37 +88,91 @@ impl MatrixDriver {

/// Reads the latest `limit` events of a given `event_type` from the room's
/// timeline.
///
/// # Arguments
///
/// * `from` - The token to start reading from. This is just the ev id of
/// the last event in the cache that was sent to the widget.
pub(crate) async fn read_events(
&self,
event_type: TimelineEventType,
state_key: Option<StateKeySelector>,
limit: u32,
) -> Result<Vec<Raw<AnyTimelineEvent>>> {
let options = assign!(MessagesOptions::backward(), {
limit: limit.into(),
filter: assign!(RoomEventFilter::default(), {
types: Some(vec![event_type.to_string()])
limit: usize,
from: Option<String>,
) -> Result<ReadEventsResponse> {
let ev_cache = self.room.event_cache().await?.0;
let mut events = ev_cache.events().await?;

let mut reached_start = false;
let mut allow_to_look_for_more_events = true;
const LIMIT_ITERATIONS: usize = 5;
let mut iterations = 0;

let compute_index_of_token = |from: &Option<String>, events: &Vec<TimelineEvent>| match from
{
Some(f) => match events
.iter()
.position(|e| e.event_id().is_some_and(|id| &id.to_string() == f))
{
Some(index) => Ok(index),
None => {
return Err(Error::UnknownError(Box::new(ReadEventsError::InvalidFromEventId)));
}
},
None => Ok(if events.len() > 0 { events.len() - 1 } else { 0 }),
};
let allow_to_look_for_more_events = |reached_start: bool, iterations: usize| {
reached_start == false && iterations <= LIMIT_ITERATIONS
};

let mut index_of_token = compute_index_of_token(&from, &events)?;
while index_of_token <= limit && allow_to_look_for_more_events(reached_start, iterations) {
// Fetch more events from the server
// And update local event array
reached_start =
ev_cache.pagination().run_backwards_until((limit) as u16).await?.reached_start;
events = ev_cache.events().await?;

// update the index where we can find our pagination token
index_of_token = compute_index_of_token(&from, &events)?;
iterations += 1;
}

// TODO use checked_sign_diff
let lower_bound_index = std::cmp::max((index_of_token as i32) - (limit as i32), 0) as usize;
let token = events[lower_bound_index].event_id().map(|id| id.to_string());

let filter_event_type = |e: &Raw<AnyTimelineEvent>| {
e.get_field::<String>("type")
.is_ok_and(|ev_ty| ev_ty.is_some_and(|ty| ty == event_type.to_string()))
};

let filter_state_key = |e: &Raw<AnyTimelineEvent>| match &state_key {
None => true,
Some(state_key) => e.get_field::<String>("state_key").is_ok_and(|key| {
key.is_some_and(|k| match state_key {
StateKeySelector::Any => true,
StateKeySelector::Key(request_key) => request_key == &k,
})
}),
});
};

let messages = self.room.messages(options).await?;

Ok(messages
.chunk
.into_iter()
.map(|ev| ev.into_raw().cast_unchecked())
.filter(|ev| match &state_key {
Some(state_key) => {
ev.get_field::<String>("state_key").is_ok_and(|key| match state_key {
StateKeySelector::Key(state_key) => {
key.is_some_and(|key| &key == state_key)
}
StateKeySelector::Any => key.is_some(),
})
}
None => true,
})
.collect())
let filtered_events = if index_of_token as i32 - lower_bound_index as i32 > 0 {
events[lower_bound_index..index_of_token]
.into_iter()
.map(|e| attach_room_id(e.raw(), self.room.room_id()))
.filter(filter_event_type)
.filter(filter_state_key)
.collect()
} else {
vec![]
};

return Ok(ReadEventsResponse {
events: filtered_events,
pagination_token: token,
reached_start,
});
}

/// Reads the current values of the room state entries matching the given
Expand Down
Loading
Loading