diff --git a/crates/matrix-sdk/src/widget/filter.rs b/crates/matrix-sdk/src/widget/filter.rs index d4c03abb9aa..a674fdbb5ce 100644 --- a/crates/matrix-sdk/src/widget/filter.rs +++ b/crates/matrix-sdk/src/widget/filter.rs @@ -14,7 +14,8 @@ use ruma::{ events::{ - AnyTimelineEvent, AnyToDeviceEvent, MessageLikeEventType, StateEventType, ToDeviceEventType, + AnyStateEvent, AnyTimelineEvent, AnyToDeviceEvent, MessageLikeEventType, StateEventType, + ToDeviceEventType, }, serde::Raw, }; @@ -224,6 +225,16 @@ impl<'a> TryFrom<&'a Raw> for FilterInput<'a> { } } +/// Create a filter input based on [`AnyStateEvent`]. +/// This will create a [`FilterInput::State`]. +impl<'a> TryFrom<&'a Raw> for FilterInput<'a> { + type Error = serde_json::Error; + + fn try_from(raw_event: &'a Raw) -> Result { + raw_event.deserialize_as() + } +} + #[derive(Debug, Deserialize)] pub struct FilterInputToDevice<'a> { #[serde(rename = "type")] diff --git a/crates/matrix-sdk/src/widget/machine/driver_req.rs b/crates/matrix-sdk/src/widget/machine/driver_req.rs index 9188077f409..b52de600846 100644 --- a/crates/matrix-sdk/src/widget/machine/driver_req.rs +++ b/crates/matrix-sdk/src/widget/machine/driver_req.rs @@ -21,7 +21,7 @@ use ruma::{ account::request_openid_token, delayed_events::update_delayed_event, to_device::send_event_to_device, }, - events::{AnyTimelineEvent, AnyToDeviceEventContent}, + events::{AnyStateEvent, AnyTimelineEvent, AnyToDeviceEventContent}, serde::Raw, to_device::DeviceIdOrAllDevices, OwnedUserId, @@ -48,14 +48,14 @@ pub(crate) enum MatrixDriverRequestData { /// Get OpenId token for a given request ID. GetOpenId, - /// Read message event(s). - ReadMessageLikeEvent(ReadMessageLikeEventRequest), + /// Read events from the timeline. + ReadEvents(ReadEventsRequest), - /// Read state event(s). - ReadStateEvent(ReadStateEventRequest), + /// Read room state entries. + ReadState(ReadStateRequest), /// Send Matrix event that corresponds to the given description. - SendMatrixEvent(SendEventRequest), + SendEvent(SendEventRequest), /// Send a to-device message over the Matrix homeserver. SendToDeviceEvent(SendToDeviceRequest), @@ -170,33 +170,39 @@ impl FromMatrixDriverResponse for request_openid_token::v3::Response { } } -/// Ask the client to read Matrix event(s) that corresponds to the given +/// Ask the client to read Matrix events that correspond to the given /// description and return a list of events as a response. #[derive(Clone, Debug)] -pub(crate) struct ReadMessageLikeEventRequest { +pub(crate) struct ReadEventsRequest { /// The event type to read. // TODO: This wants to be `MessageLikeEventType`` but we need a type which supports `as_str()` // as soon as ruma supports `as_str()` on `MessageLikeEventType` we can use it here. pub(crate) event_type: String, + /// The `state_key` to read. If None, this will read events regardless of + /// whether they are state events. If `Some(Any)`, this will only read state + /// events of the given type. If set to a specific state key, this will only + /// read state events of the given type matching that state key. + pub(crate) state_key: Option, + /// The maximum number of events to return. pub(crate) limit: u32, } -impl From for MatrixDriverRequestData { - fn from(value: ReadMessageLikeEventRequest) -> Self { - MatrixDriverRequestData::ReadMessageLikeEvent(value) +impl From for MatrixDriverRequestData { + fn from(value: ReadEventsRequest) -> Self { + MatrixDriverRequestData::ReadEvents(value) } } -impl MatrixDriverRequest for ReadMessageLikeEventRequest { +impl MatrixDriverRequest for ReadEventsRequest { type Response = Vec>; } impl FromMatrixDriverResponse for Vec> { fn from_response(ev: MatrixDriverResponse) -> Option { match ev { - MatrixDriverResponse::MatrixEventRead(response) => Some(response), + MatrixDriverResponse::EventsRead(response) => Some(response), _ => { error!("bug in MatrixDriver, received wrong event response"); None @@ -205,28 +211,40 @@ impl FromMatrixDriverResponse for Vec> { } } -/// Ask the client to read Matrix event(s) that corresponds to the given -/// description and return a list of events as a response. +/// Ask the client to read Matrix room state entries corresponding to the given +/// description and return a list of state events as a response. #[derive(Clone, Debug)] -pub(crate) struct ReadStateEventRequest { +pub(crate) struct ReadStateRequest { /// The event type to read. // TODO: This wants to be `TimelineEventType` but we need a type which supports `as_str()` // as soon as ruma supports `as_str()` on `TimelineEventType` we can use it here. pub(crate) event_type: String, - /// The `state_key` to read, or `Any` to receive any/all events of the given - /// type, regardless of their `state_key`. + /// The `state_key` to read, or `Any` to receive any/all room state entries + /// of the given type, regardless of their `state_key`. pub(crate) state_key: StateKeySelector, } -impl From for MatrixDriverRequestData { - fn from(value: ReadStateEventRequest) -> Self { - MatrixDriverRequestData::ReadStateEvent(value) +impl From for MatrixDriverRequestData { + fn from(value: ReadStateRequest) -> Self { + MatrixDriverRequestData::ReadState(value) } } -impl MatrixDriverRequest for ReadStateEventRequest { - type Response = Vec>; +impl MatrixDriverRequest for ReadStateRequest { + type Response = Vec>; +} + +impl FromMatrixDriverResponse for Vec> { + fn from_response(ev: MatrixDriverResponse) -> Option { + match ev { + MatrixDriverResponse::StateRead(response) => Some(response), + _ => { + error!("bug in MatrixDriver, received wrong event response"); + None + } + } + } } /// Ask the client to send Matrix event that corresponds to the given @@ -251,7 +269,7 @@ pub(crate) struct SendEventRequest { impl From for MatrixDriverRequestData { fn from(value: SendEventRequest) -> Self { - MatrixDriverRequestData::SendMatrixEvent(value) + MatrixDriverRequestData::SendEvent(value) } } @@ -262,7 +280,7 @@ impl MatrixDriverRequest for SendEventRequest { impl FromMatrixDriverResponse for SendEventResponse { fn from_response(ev: MatrixDriverResponse) -> Option { match ev { - MatrixDriverResponse::MatrixEventSent(response) => Some(response), + MatrixDriverResponse::EventSent(response) => Some(response), _ => { error!("bug in MatrixDriver, received wrong event response"); None @@ -303,7 +321,7 @@ impl TryInto for MatrixDriverResponse { fn try_into(self) -> Result { match self { - MatrixDriverResponse::MatrixToDeviceSent(response) => Ok(response), + MatrixDriverResponse::ToDeviceSent(response) => Ok(response), _ => Err(de::Error::custom("bug in MatrixDriver, received wrong event response")), } } @@ -330,7 +348,7 @@ impl MatrixDriverRequest for UpdateDelayedEventRequest { impl FromMatrixDriverResponse for update_delayed_event::unstable::Response { fn from_response(ev: MatrixDriverResponse) -> Option { match ev { - MatrixDriverResponse::MatrixDelayedEventUpdate(response) => Some(response), + MatrixDriverResponse::DelayedEventUpdated(response) => Some(response), _ => { error!("bug in MatrixDriver, received wrong event response"); None diff --git a/crates/matrix-sdk/src/widget/machine/from_widget.rs b/crates/matrix-sdk/src/widget/machine/from_widget.rs index b553a6a75a2..b62794d395b 100644 --- a/crates/matrix-sdk/src/widget/machine/from_widget.rs +++ b/crates/matrix-sdk/src/widget/machine/from_widget.rs @@ -35,7 +35,7 @@ pub(super) enum FromWidgetRequest { #[serde(rename = "get_openid")] GetOpenId {}, #[serde(rename = "org.matrix.msc2876.read_events")] - ReadEvent(ReadEventRequest), + ReadEvent(ReadEventsRequest), SendEvent(SendEventRequest), SendToDevice(SendToDeviceRequest), #[serde(rename = "org.matrix.msc4157.update_delayed_event")] @@ -133,6 +133,7 @@ impl SupportedApiVersionsResponse { ApiVersion::V0_0_1, ApiVersion::V0_0_2, ApiVersion::MSC2762, + ApiVersion::MSC2762UpdateState, ApiVersion::MSC2871, ApiVersion::MSC3819, ], @@ -151,11 +152,15 @@ pub(super) enum ApiVersion { #[serde(rename = "0.0.2")] V0_0_2, - /// Supports sending and receiving of events. + /// Supports sending and receiving events. #[serde(rename = "org.matrix.msc2762")] MSC2762, - /// Supports sending of approved capabilities back to the widget. + /// Supports receiving room state with the `update_state` action. + #[serde(rename = "org.matrix.msc2762_update_state")] + MSC2762UpdateState, + + /// Supports sending approved capabilities back to the widget. #[serde(rename = "org.matrix.msc2871")] MSC2871, @@ -171,7 +176,7 @@ pub(super) enum ApiVersion { #[serde(rename = "org.matrix.msc2876")] MSC2876, - /// Supports sending and receiving of to-device events. + /// Supports sending and receiving to-device events. #[serde(rename = "org.matrix.msc3819")] MSC3819, @@ -181,22 +186,15 @@ pub(super) enum ApiVersion { } #[derive(Deserialize, Debug)] -#[serde(untagged)] -pub(super) enum ReadEventRequest { - ReadStateEvent { - #[serde(rename = "type")] - event_type: String, - state_key: StateKeySelector, - }, - ReadMessageLikeEvent { - #[serde(rename = "type")] - event_type: String, - limit: Option, - }, +pub(super) struct ReadEventsRequest { + #[serde(rename = "type")] + pub(super) event_type: String, + pub(super) state_key: Option, + pub(super) limit: Option, } #[derive(Debug, Serialize)] -pub(super) struct ReadEventResponse { +pub(super) struct ReadEventsResponse { pub(super) events: Vec>, } diff --git a/crates/matrix-sdk/src/widget/machine/incoming.rs b/crates/matrix-sdk/src/widget/machine/incoming.rs index b854d750d42..2d4ebe3829e 100644 --- a/crates/matrix-sdk/src/widget/machine/incoming.rs +++ b/crates/matrix-sdk/src/widget/machine/incoming.rs @@ -14,13 +14,15 @@ use ruma::{ api::client::{account::request_openid_token, delayed_events, to_device::send_event_to_device}, - events::{AnyTimelineEvent, AnyToDeviceEvent}, + events::{AnyStateEvent, AnyTimelineEvent, AnyToDeviceEvent}, serde::Raw, }; use serde::{de, Deserialize, Deserializer}; use serde_json::value::RawValue as RawJsonValue; use uuid::Uuid; +#[cfg(doc)] +use super::MatrixDriverRequestData; use super::{ from_widget::{FromWidgetRequest, SendEventResponse}, to_widget::ToWidgetResponse, @@ -50,6 +52,13 @@ pub(crate) enum IncomingMessage { /// ([`crate::widget::Action::SubscribeTimeline`] request). MatrixEventReceived(Raw), + /// The `MatrixDriver` notified the `WidgetMachine` of a change in room + /// state. + /// + /// This means that the machine previously subscribed to some events + /// ([`crate::widget::Action::Subscribe`] request). + StateUpdateReceived(Vec>), + /// The `MatrixDriver` notified the `WidgetMachine` of a new to-device /// event. ToDeviceReceived(Raw), @@ -57,20 +66,26 @@ pub(crate) enum IncomingMessage { pub(crate) enum MatrixDriverResponse { /// Client acquired capabilities from the user. - /// - /// A response to an `Action::AcquireCapabilities` command. + /// A response to a [`MatrixDriverRequestData::AcquireCapabilities`] + /// command. CapabilitiesAcquired(Capabilities), /// Client got OpenId token for a given request ID. - /// A response to an `Action::GetOpenId` command. + /// A response to a [`MatrixDriverRequestData::GetOpenId`] command. OpenIdReceived(request_openid_token::v3::Response), /// Client read some Matrix event(s). - /// A response to an `Action::ReadMatrixEvent` commands. - MatrixEventRead(Vec>), + /// A response to a [`MatrixDriverRequestData::ReadEvents`] command. + EventsRead(Vec>), + /// Client read some Matrix room state entries. + /// A response to a [`MatrixDriverRequestData::ReadState`] command. + StateRead(Vec>), /// Client sent some Matrix event. The response contains the event ID. - /// A response to an `Action::SendMatrixEvent` command. - MatrixEventSent(SendEventResponse), - MatrixToDeviceSent(send_event_to_device::v3::Response), - MatrixDelayedEventUpdate(delayed_events::update_delayed_event::unstable::Response), + /// A response to a [`MatrixDriverRequestData::SendEvent`] command. + EventSent(SendEventResponse), + /// A response to a `Action::SendToDevice` command. + ToDeviceSent(send_event_to_device::v3::Response), + /// Client updated a delayed event. + /// A response to a [`MatrixDriverRequestData::UpdateDelayedEvent`] command. + DelayedEventUpdated(delayed_events::update_delayed_event::unstable::Response), } pub(super) struct IncomingWidgetMessage { diff --git a/crates/matrix-sdk/src/widget/machine/mod.rs b/crates/matrix-sdk/src/widget/machine/mod.rs index 266eed35826..0fd4d05d654 100644 --- a/crates/matrix-sdk/src/widget/machine/mod.rs +++ b/crates/matrix-sdk/src/widget/machine/mod.rs @@ -16,34 +16,34 @@ use std::time::Duration; -use driver_req::UpdateDelayedEventRequest; +use driver_req::{ReadStateRequest, UpdateDelayedEventRequest}; use from_widget::{SendToDeviceEventResponse, UpdateDelayedEventResponse}; use indexmap::IndexMap; use ruma::{ + events::{AnyStateEvent, AnyTimelineEvent}, serde::{JsonObject, Raw}, OwnedRoomId, }; use serde::Serialize; use serde_json::value::RawValue as RawJsonValue; +use to_widget::NotifyNewToDeviceMessage; use tracing::{error, info, instrument, warn}; use uuid::Uuid; use self::{ driver_req::{ - AcquireCapabilities, MatrixDriverRequest, MatrixDriverRequestHandle, - ReadMessageLikeEventRequest, RequestOpenId, + AcquireCapabilities, MatrixDriverRequest, MatrixDriverRequestHandle, RequestOpenId, }, from_widget::{ - FromWidgetErrorResponse, FromWidgetRequest, ReadEventRequest, ReadEventResponse, + FromWidgetErrorResponse, FromWidgetRequest, ReadEventsResponse, SupportedApiVersionsResponse, }, incoming::{IncomingWidgetMessage, IncomingWidgetMessageKind}, openid::{OpenIdResponse, OpenIdState}, pending::{PendingRequests, RequestLimits}, to_widget::{ - NotifyCapabilitiesChanged, NotifyNewMatrixEvent, NotifyNewToDeviceMessage, - NotifyOpenIdChanged, RequestCapabilities, ToWidgetRequest, ToWidgetRequestHandle, - ToWidgetResponse, + NotifyCapabilitiesChanged, NotifyNewMatrixEvent, NotifyOpenIdChanged, NotifyStateUpdate, + RequestCapabilities, ToWidgetRequest, ToWidgetRequestHandle, ToWidgetResponse, }, }; #[cfg(doc)] @@ -51,9 +51,9 @@ use super::WidgetDriver; use super::{ capabilities::{SEND_DELAYED_EVENT, UPDATE_DELAYED_EVENT}, filter::FilterInput, - Capabilities, StateKeySelector, + Capabilities, StateEventFilter, StateKeySelector, }; -use crate::Result; +use crate::{widget::Filter, Error, Result}; mod driver_req; mod from_widget; @@ -65,9 +65,7 @@ mod tests; mod to_widget; pub(crate) use self::{ - driver_req::{ - MatrixDriverRequestData, ReadStateEventRequest, SendEventRequest, SendToDeviceRequest, - }, + driver_req::{MatrixDriverRequestData, SendEventRequest, SendToDeviceRequest}, from_widget::SendEventResponse, incoming::{IncomingMessage, MatrixDriverResponse}, }; @@ -106,6 +104,22 @@ pub(crate) enum Action { Unsubscribe, } +/// An initial state update which is in the process of being computed. +#[derive(Debug)] +struct InitialStateUpdate { + /// The results of the read state requests which establish the initial state + /// to be pushed to the widget. + initial_state: Vec>>, + /// The total number of read state requests that `initial_state` must hold + /// for this update to be considered complete. + request_count: usize, + /// The data carried by any state updates which raced with the requests to + /// read the initial state. These should be pushed to the widget + /// immediately after pushing the initial state to ensure no data is + /// lost. + postponed_updates: Vec>>, +} + /// No I/O state machine. /// /// Handles interactions with the widget as well as the @@ -125,6 +139,18 @@ pub(crate) struct WidgetMachine { /// Outstanding requests sent to the Matrix driver (mapped by uuid). pending_matrix_driver_requests: PendingRequests, + /// Outstanding state updates waiting to be sent to the widget. + /// + /// Whenever the widget is approved to read a set of room state entries, we + /// want to push an initial state to the widget in a single + /// [`NotifyStateUpdate`] action. However, multiple asynchronous + /// requests must be sent to the driver to gather this data. Therefore + /// we use this field to hold the responses to the driver requests while + /// some of them are still in flight. It is set to `Some` whenever the + /// widget is approved to read some room state, and reset to `None` as + /// soon as the [`NotifyStateUpdate`] action is emitted. + pending_state_updates: Option, + /// Current negotiation state for capabilities. capabilities: CapabilitiesState, } @@ -146,6 +172,7 @@ impl WidgetMachine { room_id, pending_to_widget_requests: PendingRequests::new(limits.clone()), pending_matrix_driver_requests: PendingRequests::new(limits), + pending_state_updates: None, capabilities: CapabilitiesState::Unset, }; @@ -196,6 +223,24 @@ impl WidgetMachine { vec![] } } + IncomingMessage::StateUpdateReceived(mut state) => { + let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else { + error!("Received state update before capabilities negotiation"); + return Vec::new(); + }; + + state.retain(|event| capabilities.allow_reading(event)); + + match &mut self.pending_state_updates { + Some(InitialStateUpdate { postponed_updates, .. }) => { + // This state update is racing with the read requests used to calculate the + // initial state; postpone it + postponed_updates.push(state); + Vec::new() + } + None => self.send_state_update(state).into_iter().collect(), + } + } } } @@ -234,7 +279,7 @@ impl WidgetMachine { Err(e) => { return vec![Self::send_from_widget_err_response( raw_request, - FromWidgetErrorResponse::from_error(crate::Error::SerdeJson(e)), + FromWidgetErrorResponse::from_error(Error::SerdeJson(e)), )] } }; @@ -338,9 +383,36 @@ impl WidgetMachine { } } + /// Send a response to a request to read events. + /// + /// `events` represents the message-like events provided by the + /// [`crate::widget::MatrixDriver`]. + fn send_read_events_response( + &self, + request: Raw, + events: Result>, Error>, + ) -> Vec { + let response = match &self.capabilities { + CapabilitiesState::Unset => Err(FromWidgetErrorResponse::from_string( + "Received read events request before capabilities negotiation", + )), + CapabilitiesState::Negotiating => Err(FromWidgetErrorResponse::from_string( + "Received read events request while capabilities were negotiating", + )), + CapabilitiesState::Negotiated(capabilities) => events + .map(|mut events| { + events.retain(|e| capabilities.allow_reading(e)); + ReadEventsResponse { events } + }) + .map_err(FromWidgetErrorResponse::from_error), + }; + + vec![WidgetMachine::send_from_widget_response(request, response)] + } + fn process_read_event_request( &mut self, - request: ReadEventRequest, + request: from_widget::ReadEventsRequest, raw_request: Raw, ) -> Option { let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else { @@ -350,77 +422,54 @@ impl WidgetMachine { )); }; - match request { - ReadEventRequest::ReadMessageLikeEvent { event_type, limit } => { - if !capabilities.has_read_filter_for_type(&event_type) { + // Check the event type and state key filter against the capabilities + match &request.state_key { + None => { + if !capabilities.has_read_filter_for_type(&request.event_type) { return Some(Self::send_from_widget_error_string_response( raw_request, - "Not allowed to read message like event", + "Not allowed to read message-like event", )); } - - const DEFAULT_EVENT_LIMIT: u32 = 50; - let limit = limit.unwrap_or(DEFAULT_EVENT_LIMIT); - let request = ReadMessageLikeEventRequest { event_type, limit }; - - self.send_matrix_driver_request(request).map(|(request, action)| { - request.add_response_handler(|result, machine| { - let response = match &machine.capabilities { - CapabilitiesState::Unset => Err(FromWidgetErrorResponse::from_string( - "Received read event request before capabilities negotiation", - )), - CapabilitiesState::Negotiating => { - Err(FromWidgetErrorResponse::from_string( - "Received read event request while capabilities were negotiating", - )) - } - CapabilitiesState::Negotiated(capabilities) => result - .map(|mut events| { - events.retain(|e| capabilities.allow_reading(e)); - ReadEventResponse { events } - }) - .map_err(FromWidgetErrorResponse::from_error), - }; - - vec![Self::send_from_widget_response(raw_request, response)] - }); - - action - }) } - - ReadEventRequest::ReadStateEvent { event_type, state_key } => { + Some(state_key) => { let allowed = match state_key.clone() { // If the widget tries to read any state event we can only skip sending the // request, if the widget does not have any capability for // the requested event type. - StateKeySelector::Any => capabilities.has_read_filter_for_type(&event_type), + StateKeySelector::Any => { + capabilities.has_read_filter_for_type(&request.event_type) + } // If we have a specific state key we will check if the widget has // the capability to read this specific state key and otherwise // skip sending the request. - StateKeySelector::Key(state_key) => { - capabilities.allow_reading(FilterInput::state(&event_type, &state_key)) - } + StateKeySelector::Key(state_key) => capabilities + .allow_reading(FilterInput::state(&request.event_type, &state_key)), }; - if allowed { - self.send_matrix_driver_request(ReadStateEventRequest { event_type, state_key }) - .map(|(request, action)| { - request.add_response_handler(|result, _machine| { - let response = result - .map(|events| ReadEventResponse { events }) - .map_err(FromWidgetErrorResponse::from_error); - vec![Self::send_from_widget_response(raw_request, response)] - }); - action - }) - } else { - Some(Self::send_from_widget_error_string_response( + + if !allowed { + return Some(Self::send_from_widget_error_string_response( raw_request, "Not allowed to read state event", - )) + )); } } } + + const DEFAULT_EVENT_LIMIT: u32 = 50; + let limit = request.limit.unwrap_or(DEFAULT_EVENT_LIMIT); + let request = driver_req::ReadEventsRequest { + event_type: request.event_type, + state_key: request.state_key, + limit, + }; + + self.send_matrix_driver_request(request).map(|(request, action)| { + request.add_response_handler(|result, machine| { + machine.send_read_events_response(raw_request, result) + }); + action + }) } fn process_send_event_request( @@ -645,62 +694,192 @@ impl WidgetMachine { )) } - fn negotiate_capabilities(&mut self) -> Vec { + /// Sends a [`NotifyStateUpdate`] action to the widget. The `events` + /// indicate which room state entries (may) have changed. + fn send_state_update(&mut self, events: Vec>) -> Option { + self.send_to_widget_request(NotifyStateUpdate { state: events }) + .map(|(_request, action)| action) + } + + /// Processes a response to one of the read state requests sent to the + /// [`crate::widget::MatrixDriver`] to compute an initial state update. If + /// the update is complete, this will send it off to the widget in a + /// [`NotifyStateUpdate`] action. + fn process_read_initial_state_response( + &mut self, + events: Vec>, + ) -> Option> { + // Pull the updates struct out of the machine temporarily so that we can match + // on it in one place, mutate it, and still be able to call + // `send_to_widget_request` later in this block (which borrows the machine + // mutably) + match self.pending_state_updates.take() { + None => { + error!("Initial state updates must only be set to `None` once all requests are complete; dropping state response"); + None + } + + Some(mut updates) => { + updates.initial_state.push(events); + + if updates.initial_state.len() != updates.request_count { + // Not all of the initial state requests have completed yet; put the updates + // struct back so we can continue accumulating the initial state. + self.pending_state_updates = Some(updates); + return None; + } + + // The initial state is complete; combine the data and push it to the widget in + // a single action. + let initial = + self.send_state_update(updates.initial_state.into_iter().flatten().collect()); + // Also flush any state updates that had been postponed until after the initial + // state push. We deliberately do not bundle these updates into a single action, + // since they might contain some repeated updates to the same room state entry + // which could confuse the widget if included in the same `events` array. It's + // easiest to let the widget process each update sequentially rather than put + // effort into coalescing them - this is for an edge case after all. + let postponed = updates + .postponed_updates + .into_iter() + .map(|state| self.send_state_update(state)); + + // The postponed updates should come after the initial update + Some(initial.into_iter().chain(postponed.flatten()).collect()) + } + } + } + + /// Processes a response from the [`crate::widget::MatrixDriver`] saying + /// that the widget is approved to acquire some capabilities. This will + /// store those capabilities in the state machine, notify the widget, + /// and then begin computing an initial state update if the widget + /// was approved to read room state. + fn process_acquired_capabilities( + &mut self, + approved: Result, + requested: Capabilities, + ) -> Vec { + let approved = approved.unwrap_or_else(|e| { + error!("Acquiring capabilities failed: {e}"); + Capabilities::default() + }); + let mut actions = Vec::new(); + if !approved.read.is_empty() { + actions.push(Action::Subscribe); + } - if matches!(&self.capabilities, CapabilitiesState::Negotiated(c) if !c.read.is_empty()) { - actions.push(Action::Unsubscribe); + self.capabilities = CapabilitiesState::Negotiated(approved.clone()); + + let state_filters: Vec<_> = approved + .read + .iter() + .filter_map(|f| match f { + Filter::State(f) => Some(f), + _ => None, + }) + .cloned() + .collect(); + + if !state_filters.is_empty() { + // Begin accumulating the initial state to be pushed to the widget. Since this + // widget driver currently doesn't implement capability + // renegotiation, we can be sure that we aren't overwriting another + // in-progress update. + if self.pending_state_updates.is_some() { + // Or so we should be. Let's at least log something if we ever break that + // invariant. + error!("Another initial state update is in progress; overwriting it"); + } + self.pending_state_updates = Some(InitialStateUpdate { + initial_state: Vec::with_capacity(state_filters.len()), + request_count: state_filters.len(), + postponed_updates: Vec::new(), + }) } - self.capabilities = CapabilitiesState::Negotiating; + // For each room state filter that the widget has been approved to read, fire + // off a request to the driver to determine the initial values of the + // matching room state entries + let initial_state_actions = state_filters.iter().flat_map(|filter| { + self.send_matrix_driver_request(match filter { + StateEventFilter::WithType(event_type) => ReadStateRequest { + event_type: event_type.to_string(), + state_key: StateKeySelector::Any, + }, + StateEventFilter::WithTypeAndStateKey(event_type, state_key) => ReadStateRequest { + event_type: event_type.to_string(), + state_key: StateKeySelector::Key(state_key.clone()), + }, + }) + .map(|(request, action)| { + request.add_response_handler(move |result, machine| { + machine + .process_read_initial_state_response(result.unwrap_or_else(|e| { + error!("Reading initial room state failed: {e}"); + // Pretend that we just got an empty response so the initial state + // update won't be completely blocked on this one bit of missing data + Vec::new() + })) + .unwrap_or_default() + }); + action + }) + }); - let Some((request, action)) = self.send_to_widget_request(RequestCapabilities {}) else { - // We're done, return early. - return actions; - }; + actions.extend(initial_state_actions); - request.add_response_handler(|response, machine| { - let requested_capabilities = response.capabilities; + let notify_caps_changed = NotifyCapabilitiesChanged { approved, requested }; + if let Some((_request, action)) = self.send_to_widget_request(notify_caps_changed) { + actions.push(action); + } - let Some((request, action)) = machine.send_matrix_driver_request(AcquireCapabilities { - desired_capabilities: requested_capabilities.clone(), - }) else { - // We're done, return early. - return Vec::new(); - }; + actions + } - request.add_response_handler(|result, machine| { - let approved_capabilities = result.unwrap_or_else(|e| { - error!("Acquiring capabilities failed: {e}"); - Capabilities::default() + /// Attempts to acquire capabilities that have been requested by the widget + /// during the initial capability negotiation handshake. + fn process_requested_capabilities(&mut self, requested: Capabilities) -> Vec { + match self.send_matrix_driver_request(AcquireCapabilities { + desired_capabilities: requested.clone(), + }) { + None => Vec::new(), + Some((request, action)) => { + request.add_response_handler(|result, machine| { + machine.process_acquired_capabilities(result, requested) }); + vec![action] + } + } + } - let mut actions = Vec::new(); - if !approved_capabilities.read.is_empty() { - actions.push(Action::Subscribe); - } - - machine.capabilities = CapabilitiesState::Negotiated(approved_capabilities.clone()); + /// Performs an initial capability negotiation handshake. + /// + /// The sequence is as follows: the machine sends a [`RequestCapabilities`] + /// `toWidget` action, the widget responds with its requested + /// capabilities, the machine attempts to acquire the + /// requested capabilities from the driver, then it sends a + /// [`NotifyCapabilitiesChanged`] `toWidget` action to tell the widget + /// which capabilities were approved. + fn negotiate_capabilities(&mut self) -> Vec { + let mut actions = Vec::new(); - let notify_caps_changed = NotifyCapabilitiesChanged { - approved: approved_capabilities, - requested: requested_capabilities, - }; + // XXX: This branch appears to be accounting for capability **re**negotiation + // (MSC2974), which isn't implemented yet + if matches!(&self.capabilities, CapabilitiesState::Negotiated(c) if !c.read.is_empty()) { + actions.push(Action::Unsubscribe); + } - if let Some(action) = machine - .send_to_widget_request(notify_caps_changed) - .map(|(_request, action)| action) - { - actions.push(action); - } + self.capabilities = CapabilitiesState::Negotiating; - actions + if let Some((request, action)) = self.send_to_widget_request(RequestCapabilities {}) { + request.add_response_handler(|result, machine| { + machine.process_requested_capabilities(result.capabilities) }); + actions.push(action); + } - vec![action] - }); - - actions.push(action); actions } } diff --git a/crates/matrix-sdk/src/widget/machine/tests/api_versions.rs b/crates/matrix-sdk/src/widget/machine/tests/api_versions.rs index c85d9743d0d..7cf9516ed25 100644 --- a/crates/matrix-sdk/src/widget/machine/tests/api_versions.rs +++ b/crates/matrix-sdk/src/widget/machine/tests/api_versions.rs @@ -48,6 +48,7 @@ fn test_get_supported_api_versions() { "0.0.1", "0.0.2", "org.matrix.msc2762", + "org.matrix.msc2762_update_state", "org.matrix.msc2871", "org.matrix.msc3819", ] diff --git a/crates/matrix-sdk/src/widget/machine/tests/capabilities.rs b/crates/matrix-sdk/src/widget/machine/tests/capabilities.rs index 7443cc9b8c5..8689564a9ab 100644 --- a/crates/matrix-sdk/src/widget/machine/tests/capabilities.rs +++ b/crates/matrix-sdk/src/widget/machine/tests/capabilities.rs @@ -200,6 +200,19 @@ pub(super) fn assert_capabilities_dance( assert_matches!(action, Action::Subscribe); } + // We get the `ReadState` command if we requested some state reading + // capabilities. + if capability.starts_with("org.matrix.msc2762.receive.state_event") { + let action = actions.remove(0); + assert_matches!( + action, + Action::MatrixDriverRequest { + request_id: _, + data: MatrixDriverRequestData::ReadState(_) + } + ); + } + // Inform the widget about the acquired capabilities. { let [action]: [Action; 1] = actions.try_into().unwrap(); diff --git a/crates/matrix-sdk/src/widget/machine/tests/error.rs b/crates/matrix-sdk/src/widget/machine/tests/error.rs index a9508b0823b..811022b2c2f 100644 --- a/crates/matrix-sdk/src/widget/machine/tests/error.rs +++ b/crates/matrix-sdk/src/widget/machine/tests/error.rs @@ -96,7 +96,7 @@ fn test_read_request_for_non_allowed_message_like_events() { assert_eq!(msg["action"], "org.matrix.msc2876.read_events"); assert_eq!( msg["response"]["error"]["message"].as_str().unwrap(), - "Not allowed to read message like event" + "Not allowed to read message-like event" ); } @@ -222,6 +222,6 @@ fn test_read_request_for_message_like_with_disallowed_msg_type_fails() { assert_eq!(msg["action"], "org.matrix.msc2876.read_events"); assert_eq!( msg["response"]["error"]["message"].as_str().unwrap(), - "Not allowed to read message like event" + "Not allowed to read message-like event" ); } diff --git a/crates/matrix-sdk/src/widget/machine/to_widget.rs b/crates/matrix-sdk/src/widget/machine/to_widget.rs index 99d7accfcf8..ace810eac07 100644 --- a/crates/matrix-sdk/src/widget/machine/to_widget.rs +++ b/crates/matrix-sdk/src/widget/machine/to_widget.rs @@ -15,7 +15,7 @@ use std::marker::PhantomData; use ruma::{ - events::{AnyTimelineEvent, AnyToDeviceEvent}, + events::{AnyStateEvent, AnyTimelineEvent, AnyToDeviceEvent}, serde::Raw, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -125,6 +125,18 @@ impl ToWidgetRequest for NotifyNewMatrixEvent { type ResponseData = Empty; } +/// Notify the widget that room state has changed. +/// This is a "response" to the widget subscribing to the events in the room. +#[derive(Serialize)] +pub(crate) struct NotifyStateUpdate { + pub(super) state: Vec>, +} + +impl ToWidgetRequest for NotifyStateUpdate { + const ACTION: &'static str = "update_state"; + type ResponseData = Empty; +} + #[derive(Deserialize)] pub(crate) struct Empty {} diff --git a/crates/matrix-sdk/src/widget/matrix.rs b/crates/matrix-sdk/src/widget/matrix.rs index 86744c0406b..f57add34249 100644 --- a/crates/matrix-sdk/src/widget/matrix.rs +++ b/crates/matrix-sdk/src/widget/matrix.rs @@ -27,21 +27,26 @@ use ruma::{ }, assign, events::{ - AnyMessageLikeEventContent, AnyStateEventContent, AnySyncMessageLikeEvent, - AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, AnyToDeviceEvent, - AnyToDeviceEventContent, MessageLikeEventType, StateEventType, TimelineEventType, - ToDeviceEventType, + AnyMessageLikeEventContent, AnyStateEvent, AnyStateEventContent, AnySyncStateEvent, + AnySyncTimelineEvent, AnyTimelineEvent, AnyToDeviceEvent, AnyToDeviceEventContent, + MessageLikeEventType, StateEventType, TimelineEventType, ToDeviceEventType, }, serde::{from_raw_json_value, Raw}, to_device::DeviceIdOrAllDevices, EventId, OwnedUserId, RoomId, TransactionId, }; use serde_json::{value::RawValue as RawJsonValue, Value}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; +use tokio::sync::{ + broadcast::{error::RecvError, Receiver}, + mpsc::{unbounded_channel, UnboundedReceiver}, +}; use tracing::error; use super::{machine::SendEventResponse, StateKeySelector}; -use crate::{event_handler::EventHandlerDropGuard, room::MessagesOptions, Error, Result, Room}; +use crate::{ + event_handler::EventHandlerDropGuard, room::MessagesOptions, sync::RoomUpdate, Error, Result, + Room, +}; /// Thin wrapper around a [`Room`] that provides functionality relevant for /// widgets. @@ -65,10 +70,12 @@ impl MatrixDriver { .map_err(|error| Error::Http(Box::new(error))) } - /// Reads the latest `limit` events of a given `event_type` from the room. - pub(crate) async fn read_message_like_events( + /// Reads the latest `limit` events of a given `event_type` from the room's + /// timeline. + pub(crate) async fn read_events( &self, - event_type: MessageLikeEventType, + event_type: TimelineEventType, + state_key: Option, limit: u32, ) -> Result>> { let options = assign!(MessagesOptions::backward(), { @@ -79,17 +86,35 @@ impl MatrixDriver { }); let messages = self.room.messages(options).await?; - Ok(messages.chunk.into_iter().map(|ev| ev.into_raw().cast()).collect()) + + Ok(messages + .chunk + .into_iter() + .map(|ev| ev.into_raw().cast()) + .filter(|ev| match &state_key { + Some(state_key) => { + ev.get_field::("state_key").is_ok_and(|key| match state_key { + StateKeySelector::Key(state_key) => { + key.is_some_and(|key| &key == state_key) + } + StateKeySelector::Any => key.is_some(), + }) + } + None => true, + }) + .collect()) } - pub(crate) async fn read_state_events( + /// Reads the current values of the room state entries matching the given + /// `event_type` and `state_key` selections. + pub(crate) async fn read_state( &self, event_type: StateEventType, state_key: &StateKeySelector, - ) -> Result>> { + ) -> Result>> { let room_id = self.room.room_id(); let convert = |sync_or_stripped_state| match sync_or_stripped_state { - RawAnySyncOrStrippedState::Sync(ev) => Some(attach_room_id(ev.cast_ref(), room_id)), + RawAnySyncOrStrippedState::Sync(ev) => Some(attach_room_id_state(&ev, room_id)), RawAnySyncOrStrippedState::Stripped(_) => { error!("MatrixDriver can't operate in invited rooms"); None @@ -188,30 +213,21 @@ impl MatrixDriver { let (tx, rx) = unbounded_channel(); let room_id = self.room.room_id().to_owned(); - // Get only message like events from the timeline section of the sync. - let _tx = tx.clone(); - let _room_id = room_id.clone(); - let handle_msg_like = - self.room.add_event_handler(move |raw: Raw| { - let _ = _tx.send(attach_room_id(raw.cast_ref(), &_room_id)); - async {} - }); - let drop_guard_msg_like = self.room.client().event_handler_drop_guard(handle_msg_like); - - // Get only all state events from the state section of the sync. - let handle_state = self.room.add_event_handler(move |raw: Raw| { + let handle = self.room.add_event_handler(move |raw: Raw| { let _ = tx.send(attach_room_id(raw.cast_ref(), &room_id)); async {} }); - let drop_guard_state = self.room.client().event_handler_drop_guard(handle_state); + let drop_guard = self.room.client().event_handler_drop_guard(handle); // The receiver will get a combination of state and message like events. - // The state events will come from the state section of the sync (to always - // represent current resolved state). All state events in the timeline - // section of the sync will not be forwarded to the widget. - // TODO annotate the events and send both timeline and state section state - // events. - EventReceiver { rx, _drop_guards: vec![drop_guard_msg_like, drop_guard_state] } + // These always come from the timeline (rather than the state section of the + // sync). + EventReceiver { rx, _drop_guard: drop_guard } + } + + /// Starts forwarding new updates to room state. + pub(crate) fn state_updates(&self) -> StateUpdateReceiver { + StateUpdateReceiver { room_updates: self.room.subscribe_to_updates() } } /// Starts forwarding new room events. Once the returned `EventReceiver` @@ -231,7 +247,7 @@ impl MatrixDriver { ); let drop_guard = self.room.client().event_handler_drop_guard(to_device_handle); - EventReceiver { rx, _drop_guards: vec![drop_guard] } + EventReceiver { rx, _drop_guard: drop_guard } } /// It will ignore all devices where errors occurred or where the device is @@ -265,7 +281,7 @@ impl MatrixDriver { /// along with the drop guard for the room event handler. pub(crate) struct EventReceiver { rx: UnboundedReceiver, - _drop_guards: Vec, + _drop_guard: EventHandlerDropGuard, } impl EventReceiver { @@ -274,12 +290,44 @@ impl EventReceiver { } } +/// A simple entity that wraps an `UnboundedReceiver` for the room state update +/// handler. +pub(crate) struct StateUpdateReceiver { + room_updates: Receiver, +} + +impl StateUpdateReceiver { + pub(crate) async fn recv(&mut self) -> Result>, RecvError> { + loop { + match self.room_updates.recv().await? { + RoomUpdate::Joined { room, updates } => { + if !updates.state.is_empty() { + return Ok(updates + .state + .into_iter() + .map(|ev| attach_room_id_state(&ev, room.room_id())) + .collect()); + } + } + _ => { + error!("MatrixDriver can only operate in joined rooms"); + return Err(RecvError::Closed); + } + } + } + } +} + fn attach_room_id(raw_ev: &Raw, room_id: &RoomId) -> Raw { let mut ev_obj = raw_ev.deserialize_as::>>().unwrap(); ev_obj.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id).unwrap()); Raw::new(&ev_obj).unwrap().cast() } +fn attach_room_id_state(raw_ev: &Raw, room_id: &RoomId) -> Raw { + attach_room_id(raw_ev.cast_ref(), room_id).cast() +} + #[cfg(test)] mod tests { use insta; diff --git a/crates/matrix-sdk/src/widget/mod.rs b/crates/matrix-sdk/src/widget/mod.rs index 106f171efa0..367697a1c97 100644 --- a/crates/matrix-sdk/src/widget/mod.rs +++ b/crates/matrix-sdk/src/widget/mod.rs @@ -212,17 +212,17 @@ impl WidgetDriver { matrix_driver.get_open_id().await.map(MatrixDriverResponse::OpenIdReceived) } - MatrixDriverRequestData::ReadMessageLikeEvent(cmd) => matrix_driver - .read_message_like_events(cmd.event_type.into(), cmd.limit) + MatrixDriverRequestData::ReadEvents(cmd) => matrix_driver + .read_events(cmd.event_type.into(), cmd.state_key, cmd.limit) .await - .map(MatrixDriverResponse::MatrixEventRead), + .map(MatrixDriverResponse::EventsRead), - MatrixDriverRequestData::ReadStateEvent(cmd) => matrix_driver - .read_state_events(cmd.event_type.into(), &cmd.state_key) + MatrixDriverRequestData::ReadState(cmd) => matrix_driver + .read_state(cmd.event_type.into(), &cmd.state_key) .await - .map(MatrixDriverResponse::MatrixEventRead), + .map(MatrixDriverResponse::StateRead), - MatrixDriverRequestData::SendMatrixEvent(req) => { + MatrixDriverRequestData::SendEvent(req) => { let SendEventRequest { event_type, state_key, content, delay } = req; // The widget api action does not use the unstable prefix: // `org.matrix.msc4140.delay` so we @@ -234,13 +234,13 @@ impl WidgetDriver { matrix_driver .send(event_type.into(), state_key, content, delay_event_parameter) .await - .map(MatrixDriverResponse::MatrixEventSent) + .map(MatrixDriverResponse::EventSent) } MatrixDriverRequestData::UpdateDelayedEvent(req) => matrix_driver .update_delayed_event(req.delay_id, req.action) .await - .map(MatrixDriverResponse::MatrixDelayedEventUpdate), + .map(MatrixDriverResponse::DelayedEventUpdated), MatrixDriverRequestData::SendToDeviceEvent(send_to_device_request) => { matrix_driver @@ -250,7 +250,7 @@ impl WidgetDriver { send_to_device_request.messages, ) .await - .map(MatrixDriverResponse::MatrixToDeviceSent) + .map(MatrixDriverResponse::ToDeviceSent) } }; @@ -273,8 +273,9 @@ impl WidgetDriver { self.event_forwarding_guard = Some(guard); - let mut events_receiver = matrix_driver.events(); - let mut to_device_receiver = matrix_driver.to_device_events(); + let mut events = matrix_driver.events(); + let mut state_updates = matrix_driver.state_updates(); + let mut to_device_events = matrix_driver.to_device_events(); let incoming_msg_tx = incoming_msg_tx.clone(); spawn(async move { @@ -285,12 +286,17 @@ impl WidgetDriver { return; } - Some(event) = events_receiver.recv() => { + Some(event) = events.recv() => { // Forward all events to the incoming messages stream. let _ = incoming_msg_tx.send(IncomingMessage::MatrixEventReceived(event)); } - Some(event) = to_device_receiver.recv() => { + Ok(state) = state_updates.recv() => { + // Forward all state updates to the incoming messages stream. + let _ = incoming_msg_tx.send(IncomingMessage::StateUpdateReceived(state)); + } + + Some(event) = to_device_events.recv() => { // Forward all events to the incoming messages stream. let _ = incoming_msg_tx.send(IncomingMessage::ToDeviceReceived(event)); } diff --git a/crates/matrix-sdk/tests/integration/room/joined.rs b/crates/matrix-sdk/tests/integration/room/joined.rs index 7cd783726d1..d0235f88632 100644 --- a/crates/matrix-sdk/tests/integration/room/joined.rs +++ b/crates/matrix-sdk/tests/integration/room/joined.rs @@ -878,7 +878,7 @@ async fn test_call_notifications_dont_notify_room_without_mention_powerlevel() { let (client, server) = logged_in_client_with_server().await; let mut sync_builder = SyncResponseBuilder::new(); - let mut power_level_event = StateTestEvent::PowerLevels.into_json_value(); + let mut power_level_event: Value = StateTestEvent::PowerLevels.into(); // Allow noone to send room notify events. *power_level_event.get_mut("content").unwrap().get_mut("notifications").unwrap() = json!({"room": 101}); diff --git a/crates/matrix-sdk/tests/integration/widget.rs b/crates/matrix-sdk/tests/integration/widget.rs index 14283f1a7a0..e640f4efe4e 100644 --- a/crates/matrix-sdk/tests/integration/widget.rs +++ b/crates/matrix-sdk/tests/integration/widget.rs @@ -29,9 +29,11 @@ use matrix_sdk_test::{async_test, event_factory::EventFactory, JoinedRoomBuilder use once_cell::sync::Lazy; use ruma::{ event_id, - events::{room::member::MembershipState, MessageLikeEventType, StateEventType}, + events::{ + room::member::MembershipState, AnySyncStateEvent, MessageLikeEventType, StateEventType, + }, owned_room_id, - serde::JsonObject, + serde::{JsonObject, Raw}, user_id, OwnedRoomId, }; use serde::Serialize; @@ -181,6 +183,35 @@ async fn test_negotiate_capabilities_immediately() { assert_matches!(driver_handle.recv().now_or_never(), None); } +static HELLO_EVENT: Lazy = Lazy::new(|| { + json!({ + "content": { + "body": "hello", + "msgtype": "m.text", + }, + "event_id": "$msda7m0df9E9op3", + "origin_server_ts": 152037280, + "sender": "@example:localhost", + "type": "m.room.message", + "room_id": &*ROOM_ID, + }) +}); + +static TOMBSTONE_EVENT: Lazy = Lazy::new(|| { + json!({ + "content": { + "body": "This room has been replaced", + "replacement_room": "!newroom:localhost", + }, + "event_id": "$foun39djjod0f", + "origin_server_ts": 152039280, + "sender": "@bob:localhost", + "state_key": "", + "type": "m.room.tombstone", + "room_id": &*ROOM_ID, + }) +}); + #[async_test] async fn test_read_messages() { let (_, mock_server, driver_handle) = run_test_driver(true).await; @@ -205,66 +236,40 @@ async fn test_read_messages() { // No messages from the driver assert_matches!(recv_message(&driver_handle).now_or_never(), None); - { - let response_json = json!({ - "chunk": [ - { - "content": { - "body": "hello", - "msgtype": "m.text", - }, - "event_id": "$msda7m0df9E9op3", - "origin_server_ts": 152037280, - "sender": "@example:localhost", - "type": "m.room.message", - "room_id": &*ROOM_ID, - }, - { - "content": { - "body": "This room has been replaced", - "replacement_room": "!newroom:localhost", - }, - "event_id": "$foun39djjod0f", - "origin_server_ts": 152039280, - "sender": "@bob:localhost", - "state_key": "", - "type": "m.room.tombstone", - "room_id": &*ROOM_ID, - }, - ], - "end": "t47409-4357353_219380_26003_2269", - "start": "t392-516_47314_0_7_1_1_1_11444_1" - }); - mock_server - .mock_room_messages() - .match_limit(2) - .respond_with(ResponseTemplate::new(200).set_body_json(response_json)) - .mock_once() - .mount() - .await; - - // Ask the driver to read messages - send_request( - &driver_handle, - "2-read-messages", - "org.matrix.msc2876.read_events", - json!({ - "type": "m.room.message", - "limit": 2, - }), - ) + let response_json = json!({ + "chunk": [*HELLO_EVENT, *TOMBSTONE_EVENT], + "end": "t47409-4357353_219380_26003_2269", + "start": "t392-516_47314_0_7_1_1_1_11444_1" + }); + mock_server + .mock_room_messages() + .match_limit(2) + .respond_with(ResponseTemplate::new(200).set_body_json(response_json)) + .mock_once() + .mount() .await; - // Receive the response - let msg = recv_message(&driver_handle).await; - assert_eq!(msg["api"], "fromWidget"); - assert_eq!(msg["action"], "org.matrix.msc2876.read_events"); - let events = msg["response"]["events"].as_array().unwrap(); + // Ask the driver to read messages + send_request( + &driver_handle, + "2-read-messages", + "org.matrix.msc2876.read_events", + json!({ + "type": "m.room.message", + "limit": 2, + }), + ) + .await; - assert_eq!(events.len(), 1); - let first_event = &events[0]; - assert_eq!(first_event["content"]["body"], "hello"); - } + // Receive the response + let msg = recv_message(&driver_handle).await; + assert_eq!(msg["api"], "fromWidget"); + assert_eq!(msg["action"], "org.matrix.msc2876.read_events"); + let events = msg["response"]["events"].as_array().unwrap(); + + assert_eq!(events.len(), 1); + let first_event = &events[0]; + assert_eq!(first_event["content"]["body"], "hello"); } #[async_test] @@ -293,43 +298,48 @@ async fn test_read_messages_with_msgtype_capabilities() { let f = EventFactory::new().room(&ROOM_ID).sender(user_id!("@example:localhost")); - { - let end = "t47409-4357353_219380_26003_2269"; - let chunk2 = vec![ - f.notice("custom content").event_id(event_id!("$msda7m0df9E9op3")).into_raw_timeline(), - f.text_msg("hello").event_id(event_id!("$msda7m0df9E9op5")).into_raw_timeline(), - f.reaction(event_id!("$event_id"), "annotation").into_raw_timeline(), - ]; - mock_server - .mock_room_messages() - .match_limit(3) - .ok(RoomMessagesResponseTemplate::default().end_token(end).events(chunk2)) - .mock_once() - .mount() - .await; - - // Ask the driver to read messages - send_request( - &driver_handle, - "2-read-messages", - "org.matrix.msc2876.read_events", - json!({ - "type": "m.room.message", - "limit": 3, - }), - ) + let end = "t47409-4357353_219380_26003_2269"; + let chunk2 = vec![ + f.notice("custom content").event_id(event_id!("$msda7m0df9E9op3")).into_raw_timeline(), + f.text_msg("hello").event_id(event_id!("$msda7m0df9E9op5")).into_raw_timeline(), + f.reaction(event_id!("$event_id"), "annotation").into_raw_timeline(), + ]; + mock_server + .mock_room_messages() + .match_limit(3) + .ok(RoomMessagesResponseTemplate::default().end_token(end).events(chunk2)) + .mock_once() + .mount() .await; - // Receive the response - let msg = recv_message(&driver_handle).await; - assert_eq!(msg["api"], "fromWidget"); - assert_eq!(msg["action"], "org.matrix.msc2876.read_events"); - let events = msg["response"]["events"].as_array().unwrap(); + // Ask the driver to read messages + send_request( + &driver_handle, + "2-read-messages", + "org.matrix.msc2876.read_events", + json!({ + "type": "m.room.message", + "limit": 3, + }), + ) + .await; - assert_eq!(events.len(), 1); - let first_event = &events[0]; - assert_eq!(first_event["content"]["body"], "hello"); - } + // Receive the response + let msg = recv_message(&driver_handle).await; + assert_eq!(msg["api"], "fromWidget"); + assert_eq!(msg["action"], "org.matrix.msc2876.read_events"); + let events = msg["response"]["events"].as_array().unwrap(); + + assert_eq!(events.len(), 1); + let first_event = &events[0]; + assert_eq!(first_event["content"]["body"], "hello"); +} + +async fn assert_state_synced(driver_handle: &WidgetDriverHandle, state: JsonValue) { + let msg = recv_message(driver_handle).await; + assert_eq!(msg["api"], "toWidget"); + assert_eq!(msg["action"], "update_state"); + assert_eq!(msg["data"]["state"], state); } #[async_test] @@ -342,32 +352,62 @@ async fn test_read_room_members() { ) .await; - // No messages from the driver + // Wait for the state to be synced + assert_state_synced(&driver_handle, json!([])).await; + // No further messages from the driver yet assert_matches!(recv_message(&driver_handle).now_or_never(), None); - { - // The read-events request is fulfilled from the state store - drop(mock_server); - - // Ask the driver to read state events - send_request( - &driver_handle, - "2-read-messages", - "org.matrix.msc2876.read_events", - json!({ "type": "m.room.member", "state_key": true }), - ) + let f = EventFactory::new().room(&ROOM_ID); + + let leave_event = f + .member(user_id!("@example:localhost")) + .membership(MembershipState::Leave) + .previous(MembershipState::Join) + .into_raw_timeline(); + let join_event = f + .member(user_id!("@example:localhost")) + .membership(MembershipState::Join) + .previous(MembershipState::Leave) + .into_raw_timeline(); + + let response_json = json!({ + "chunk": [*HELLO_EVENT, *TOMBSTONE_EVENT, leave_event, join_event], + "end": "t47409-4357353_219380_26003_2269", + "start": "t392-516_47314_0_7_1_1_1_11444_1" + }); + mock_server + .mock_room_messages() + .match_limit(3) + .respond_with(ResponseTemplate::new(200).set_body_json(response_json)) + .mock_once() + .mount() .await; - // Receive the response - let msg = recv_message(&driver_handle).await; - assert_eq!(msg["api"], "fromWidget"); - assert_eq!(msg["action"], "org.matrix.msc2876.read_events"); - let events = msg["response"]["events"].as_array().unwrap(); + // Ask the driver to read messages + send_request( + &driver_handle, + "2-read-messages", + "org.matrix.msc2876.read_events", + json!({ + "type": "m.room.member", + "state_key": true, + "limit": 3, + }), + ) + .await; - // No useful data in the state store, that's fine for this test - // (we just want to know that a successful response is generated) - assert_eq!(events.len(), 0); - } + // Receive the response + let msg = recv_message(&driver_handle).await; + assert_eq!(msg["api"], "fromWidget"); + assert_eq!(msg["action"], "org.matrix.msc2876.read_events"); + println!("{:?}", msg["response"]); + let events = msg["response"]["events"].as_array().unwrap(); + + // We should get both the leave event and the join event, because the + // `read_events` action reads from the timeline, not the room state + let [first_event, second_event]: &[_; 2] = events.as_slice().try_into().unwrap(); + assert_eq!(first_event, &leave_event.deserialize_as::().unwrap()); + assert_eq!(second_event, &join_event.deserialize_as::().unwrap()); } #[async_test] @@ -387,7 +427,9 @@ async fn test_receive_live_events() { ) .await; - // No messages from the driver yet + // Wait for the state to be synced + assert_state_synced(&driver_handle, json!([])).await; + // No further messages from the driver yet assert_matches!(recv_message(&driver_handle).now_or_never(), None); let f = EventFactory::new(); @@ -481,6 +523,105 @@ async fn test_receive_live_events() { assert_matches!(recv_message(&driver_handle).now_or_never(), None); } +#[async_test] +async fn test_receive_state() { + let (client, mock_server, driver_handle) = run_test_driver(false).await; + + let f = EventFactory::new().room(&ROOM_ID); + let name_event_1: Raw = f.room_name("room name").sender(&BOB).into(); + + mock_server + .mock_sync() + .ok_and_run(&client, |sync_builder| { + sync_builder.add_joined_room( + // set room name - matches filter + JoinedRoomBuilder::new(&ROOM_ID).add_state_event(name_event_1), + ); + }) + .await; + + negotiate_capabilities( + &driver_handle, + json!(["org.matrix.msc2762.receive.state_event:m.room.name#"]), + ) + .await; + + // Wait for the state to be synced + let msg = recv_message(&driver_handle).await; + assert_eq!(msg["api"], "toWidget"); + assert_eq!(msg["action"], "update_state"); + assert_eq!(msg["data"]["state"].as_array().unwrap().len(), 1); + assert_eq!(msg["data"]["state"][0]["type"], "m.room.name"); + assert_eq!(msg["data"]["state"][0]["room_id"], ROOM_ID.as_str()); + assert_eq!(msg["data"]["state"][0]["sender"], BOB.as_str()); + assert_eq!(msg["data"]["state"][0]["state_key"], ""); + assert_eq!(msg["data"]["state"][0]["content"]["name"], "room name"); + // No further messages from the driver yet + assert_matches!(recv_message(&driver_handle).now_or_never(), None); + + let topic_event: Raw = f.room_topic("new room topic").sender(&BOB).into(); + let name_event_2 = f.room_name("new room name").sender(&BOB); + let name_event_3: Raw = + f.room_name("even newer room name").sender(&BOB).into(); + + mock_server + .mock_sync() + .ok_and_run(&client, |sync_builder| { + sync_builder.add_joined_room( + JoinedRoomBuilder::new(&ROOM_ID) + // text message from alice - doesn't match + .add_timeline_event(f.text_msg("simple text message").sender(&ALICE)) + // set room topic - doesn't match + .add_timeline_event(topic_event.clone().cast()) + .add_state_event(topic_event) + // set room name - matches filter but not reported in the state block + .add_timeline_event(name_event_2) + // set room name - matches filter + .add_timeline_event(name_event_3.clone().cast()) + .add_state_event(name_event_3), + ); + }) + .await; + + // Driver should have exactly 3 messages for us + let msg1 = recv_message(&driver_handle).await; + let msg2 = recv_message(&driver_handle).await; + let msg3 = recv_message(&driver_handle).await; + assert_matches!(recv_message(&driver_handle).now_or_never(), None); + + let (update_state, send_events): (Vec<_>, _) = + [msg1, msg2, msg3].into_iter().partition(|msg| msg["action"] == "update_state"); + assert_eq!(update_state.len(), 1); + assert_eq!(send_events.len(), 2); + + let msg = &update_state[0]; + assert_eq!(msg["api"], "toWidget"); + assert_eq!(msg["data"]["state"].as_array().unwrap().len(), 1); + assert_eq!(msg["data"]["state"][0]["type"], "m.room.name"); + assert_eq!(msg["data"]["state"][0]["room_id"], ROOM_ID.as_str()); + assert_eq!(msg["data"]["state"][0]["sender"], BOB.as_str()); + assert_eq!(msg["data"]["state"][0]["state_key"], ""); + assert_eq!(msg["data"]["state"][0]["content"]["name"], "even newer room name"); + + let msg = &send_events[0]; + assert_eq!(msg["api"], "toWidget"); + assert_eq!(msg["action"], "send_event"); + assert_eq!(msg["data"]["type"], "m.room.name"); + assert_eq!(msg["data"]["room_id"], ROOM_ID.as_str()); + assert_eq!(msg["data"]["sender"], BOB.as_str()); + assert_eq!(msg["data"]["state_key"], ""); + assert_eq!(msg["data"]["content"]["name"], "new room name"); + + let msg = &send_events[1]; + assert_eq!(msg["api"], "toWidget"); + assert_eq!(msg["action"], "send_event"); + assert_eq!(msg["data"]["type"], "m.room.name"); + assert_eq!(msg["data"]["room_id"], ROOM_ID.as_str()); + assert_eq!(msg["data"]["sender"], BOB.as_str()); + assert_eq!(msg["data"]["state_key"], ""); + assert_eq!(msg["data"]["content"]["name"], "even newer room name"); +} + #[async_test] async fn test_send_room_message() { let (_, mock_server, driver_handle) = run_test_driver(false).await; diff --git a/testing/matrix-sdk-test/src/event_factory.rs b/testing/matrix-sdk-test/src/event_factory.rs index d5cec598a77..cb1e1e2fa9a 100644 --- a/testing/matrix-sdk-test/src/event_factory.rs +++ b/testing/matrix-sdk-test/src/event_factory.rs @@ -57,8 +57,9 @@ use ruma::{ topic::RoomTopicEventContent, }, typing::TypingEventContent, - AnySyncTimelineEvent, AnyTimelineEvent, BundledMessageLikeRelations, EventContent, - RedactedMessageLikeEventContent, RedactedStateEventContent, + AnyStateEvent, AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, + BundledMessageLikeRelations, EventContent, RedactedMessageLikeEventContent, + RedactedStateEventContent, StateEventContent, }, serde::Raw, server_name, EventId, Int, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, OwnedMxcUri, @@ -441,6 +442,24 @@ where } } +impl From> for Raw +where + E::EventType: Serialize, +{ + fn from(val: EventBuilder) -> Self { + Raw::new(&val.construct_json(false)).unwrap().cast() + } +} + +impl From> for Raw +where + E::EventType: Serialize, +{ + fn from(val: EventBuilder) -> Self { + Raw::new(&val.construct_json(true)).unwrap().cast() + } +} + #[derive(Debug, Default)] pub struct EventFactory { next_ts: AtomicU64, diff --git a/testing/matrix-sdk-test/src/sync_builder/invited_room.rs b/testing/matrix-sdk-test/src/sync_builder/invited_room.rs index 7af107446ae..16a821ebad2 100644 --- a/testing/matrix-sdk-test/src/sync_builder/invited_room.rs +++ b/testing/matrix-sdk-test/src/sync_builder/invited_room.rs @@ -27,7 +27,7 @@ impl InvitedRoomBuilder { /// Add an event to the state. pub fn add_state_event(mut self, event: StrippedStateTestEvent) -> Self { - self.inner.invite_state.events.push(event.into_raw_event()); + self.inner.invite_state.events.push(event.into()); self } diff --git a/testing/matrix-sdk-test/src/sync_builder/joined_room.rs b/testing/matrix-sdk-test/src/sync_builder/joined_room.rs index 74b223edb95..ca98fcff339 100644 --- a/testing/matrix-sdk-test/src/sync_builder/joined_room.rs +++ b/testing/matrix-sdk-test/src/sync_builder/joined_room.rs @@ -9,7 +9,7 @@ use ruma::{ }; use serde_json::{from_value as from_json_value, Value as JsonValue}; -use super::{RoomAccountDataTestEvent, StateTestEvent}; +use super::RoomAccountDataTestEvent; use crate::{event_factory::EventBuilder, DEFAULT_TEST_ROOM_ID}; pub struct JoinedRoomBuilder { @@ -74,8 +74,8 @@ impl JoinedRoomBuilder { } /// Add an event to the state. - pub fn add_state_event(mut self, event: StateTestEvent) -> Self { - self.inner.state.events.push(event.into_raw_event()); + pub fn add_state_event(mut self, event: impl Into>) -> Self { + self.inner.state.events.push(event.into()); self } @@ -102,7 +102,7 @@ impl JoinedRoomBuilder { /// Add room account data. pub fn add_account_data(mut self, event: RoomAccountDataTestEvent) -> Self { - self.inner.account_data.events.push(event.into_raw_event()); + self.inner.account_data.events.push(event.into()); self } diff --git a/testing/matrix-sdk-test/src/sync_builder/knocked_room.rs b/testing/matrix-sdk-test/src/sync_builder/knocked_room.rs index 5838298efea..2d488eb133f 100644 --- a/testing/matrix-sdk-test/src/sync_builder/knocked_room.rs +++ b/testing/matrix-sdk-test/src/sync_builder/knocked_room.rs @@ -27,7 +27,7 @@ impl KnockedRoomBuilder { /// Add an event to the state. pub fn add_state_event(mut self, event: StrippedStateTestEvent) -> Self { - self.inner.knock_state.events.push(event.into_raw_event()); + self.inner.knock_state.events.push(event.into()); self } diff --git a/testing/matrix-sdk-test/src/sync_builder/left_room.rs b/testing/matrix-sdk-test/src/sync_builder/left_room.rs index acf271d6682..b4d8f0e41f5 100644 --- a/testing/matrix-sdk-test/src/sync_builder/left_room.rs +++ b/testing/matrix-sdk-test/src/sync_builder/left_room.rs @@ -71,7 +71,7 @@ impl LeftRoomBuilder { /// Add an event to the state. pub fn add_state_event(mut self, event: StateTestEvent) -> Self { - self.inner.state.events.push(event.into_raw_event()); + self.inner.state.events.push(event.into()); self } @@ -86,7 +86,7 @@ impl LeftRoomBuilder { /// Add room account data. pub fn add_account_data(mut self, event: RoomAccountDataTestEvent) -> Self { - self.inner.account_data.events.push(event.into_raw_event()); + self.inner.account_data.events.push(event.into()); self } diff --git a/testing/matrix-sdk-test/src/sync_builder/test_event.rs b/testing/matrix-sdk-test/src/sync_builder/test_event.rs index 41beec05ecd..ae52e996870 100644 --- a/testing/matrix-sdk-test/src/sync_builder/test_event.rs +++ b/testing/matrix-sdk-test/src/sync_builder/test_event.rs @@ -33,36 +33,42 @@ pub enum StateTestEvent { Custom(JsonValue), } -impl StateTestEvent { - /// Get the JSON representation of this test event. - pub fn into_json_value(self) -> JsonValue { - match self { - Self::Alias => test_json::sync_events::ALIAS.to_owned(), - Self::Aliases => test_json::sync_events::ALIASES.to_owned(), - Self::Create => test_json::sync_events::CREATE.to_owned(), - Self::Encryption => test_json::sync_events::ENCRYPTION.to_owned(), - Self::HistoryVisibility => test_json::sync_events::HISTORY_VISIBILITY.to_owned(), - Self::JoinRules => test_json::sync_events::JOIN_RULES.to_owned(), - Self::Member => test_json::sync_events::MEMBER.to_owned(), - Self::MemberAdditional => test_json::sync_events::MEMBER_ADDITIONAL.to_owned(), - Self::MemberBan => test_json::sync_events::MEMBER_BAN.to_owned(), - Self::MemberInvite => test_json::sync_events::MEMBER_INVITE.to_owned(), - Self::MemberLeave => test_json::sync_events::MEMBER_LEAVE.to_owned(), - Self::MemberNameChange => test_json::sync_events::MEMBER_NAME_CHANGE.to_owned(), - Self::PowerLevels => test_json::sync_events::POWER_LEVELS.to_owned(), - Self::RedactedInvalid => test_json::sync_events::REDACTED_INVALID.to_owned(), - Self::RedactedState => test_json::sync_events::REDACTED_STATE.to_owned(), - Self::RoomAvatar => test_json::sync_events::ROOM_AVATAR.to_owned(), - Self::RoomName => test_json::sync_events::NAME.to_owned(), - Self::RoomPinnedEvents => test_json::sync_events::PINNED_EVENTS.to_owned(), - Self::RoomTopic => test_json::sync_events::TOPIC.to_owned(), - Self::Custom(json) => json, +impl From for JsonValue { + fn from(val: StateTestEvent) -> Self { + match val { + StateTestEvent::Alias => test_json::sync_events::ALIAS.to_owned(), + StateTestEvent::Aliases => test_json::sync_events::ALIASES.to_owned(), + StateTestEvent::Create => test_json::sync_events::CREATE.to_owned(), + StateTestEvent::Encryption => test_json::sync_events::ENCRYPTION.to_owned(), + StateTestEvent::HistoryVisibility => { + test_json::sync_events::HISTORY_VISIBILITY.to_owned() + } + StateTestEvent::JoinRules => test_json::sync_events::JOIN_RULES.to_owned(), + StateTestEvent::Member => test_json::sync_events::MEMBER.to_owned(), + StateTestEvent::MemberAdditional => { + test_json::sync_events::MEMBER_ADDITIONAL.to_owned() + } + StateTestEvent::MemberBan => test_json::sync_events::MEMBER_BAN.to_owned(), + StateTestEvent::MemberInvite => test_json::sync_events::MEMBER_INVITE.to_owned(), + StateTestEvent::MemberLeave => test_json::sync_events::MEMBER_LEAVE.to_owned(), + StateTestEvent::MemberNameChange => { + test_json::sync_events::MEMBER_NAME_CHANGE.to_owned() + } + StateTestEvent::PowerLevels => test_json::sync_events::POWER_LEVELS.to_owned(), + StateTestEvent::RedactedInvalid => test_json::sync_events::REDACTED_INVALID.to_owned(), + StateTestEvent::RedactedState => test_json::sync_events::REDACTED_STATE.to_owned(), + StateTestEvent::RoomAvatar => test_json::sync_events::ROOM_AVATAR.to_owned(), + StateTestEvent::RoomName => test_json::sync_events::NAME.to_owned(), + StateTestEvent::RoomPinnedEvents => test_json::sync_events::PINNED_EVENTS.to_owned(), + StateTestEvent::RoomTopic => test_json::sync_events::TOPIC.to_owned(), + StateTestEvent::Custom(json) => json, } } +} - /// Get the typed JSON representation of this test event. - pub fn into_raw_event(self) -> Raw { - from_json_value(self.into_json_value()).unwrap() +impl From for Raw { + fn from(val: StateTestEvent) -> Self { + from_json_value(val.into()).unwrap() } } @@ -73,19 +79,19 @@ pub enum StrippedStateTestEvent { Custom(JsonValue), } -impl StrippedStateTestEvent { - /// Get the JSON representation of this test event. - pub fn into_json_value(self) -> JsonValue { - match self { - Self::Member => test_json::sync_events::MEMBER_STRIPPED.to_owned(), - Self::RoomName => test_json::sync_events::NAME_STRIPPED.to_owned(), - Self::Custom(json) => json, +impl From for JsonValue { + fn from(val: StrippedStateTestEvent) -> Self { + match val { + StrippedStateTestEvent::Member => test_json::sync_events::MEMBER_STRIPPED.to_owned(), + StrippedStateTestEvent::RoomName => test_json::sync_events::NAME_STRIPPED.to_owned(), + StrippedStateTestEvent::Custom(json) => json, } } +} - /// Get the typed JSON representation of this test event. - pub fn into_raw_event(self) -> Raw { - from_json_value(self.into_json_value()).unwrap() +impl From for Raw { + fn from(val: StrippedStateTestEvent) -> Self { + from_json_value(val.into()).unwrap() } } @@ -97,20 +103,22 @@ pub enum RoomAccountDataTestEvent { Custom(JsonValue), } -impl RoomAccountDataTestEvent { - /// Get the JSON representation of this test event. - pub fn into_json_value(self) -> JsonValue { - match self { - Self::FullyRead => test_json::sync_events::FULLY_READ.to_owned(), - Self::Tags => test_json::sync_events::TAG.to_owned(), - Self::MarkedUnread => test_json::sync_events::MARKED_UNREAD.to_owned(), - Self::Custom(json) => json, +impl From for JsonValue { + fn from(val: RoomAccountDataTestEvent) -> Self { + match val { + RoomAccountDataTestEvent::FullyRead => test_json::sync_events::FULLY_READ.to_owned(), + RoomAccountDataTestEvent::Tags => test_json::sync_events::TAG.to_owned(), + RoomAccountDataTestEvent::MarkedUnread => { + test_json::sync_events::MARKED_UNREAD.to_owned() + } + RoomAccountDataTestEvent::Custom(json) => json, } } +} - /// Get the typed JSON representation of this test event. - pub fn into_raw_event(self) -> Raw { - from_json_value(self.into_json_value()).unwrap() +impl From for Raw { + fn from(val: RoomAccountDataTestEvent) -> Self { + from_json_value(val.into()).unwrap() } } @@ -120,18 +128,18 @@ pub enum PresenceTestEvent { Custom(JsonValue), } -impl PresenceTestEvent { - /// Get the JSON representation of this test event. - pub fn into_json_value(self) -> JsonValue { - match self { - Self::Presence => test_json::sync_events::PRESENCE.to_owned(), - Self::Custom(json) => json, +impl From for JsonValue { + fn from(val: PresenceTestEvent) -> Self { + match val { + PresenceTestEvent::Presence => test_json::sync_events::PRESENCE.to_owned(), + PresenceTestEvent::Custom(json) => json, } } +} - /// Get the typed JSON representation of this test event. - pub fn into_raw_event(self) -> Raw { - from_json_value(self.into_json_value()).unwrap() +impl From for Raw { + fn from(val: PresenceTestEvent) -> Self { + from_json_value(val.into()).unwrap() } } @@ -142,18 +150,18 @@ pub enum GlobalAccountDataTestEvent { Custom(JsonValue), } -impl GlobalAccountDataTestEvent { - /// Get the JSON representation of this test event. - pub fn into_json_value(self) -> JsonValue { - match self { - Self::Direct => test_json::sync_events::DIRECT.to_owned(), - Self::PushRules => test_json::sync_events::PUSH_RULES.to_owned(), - Self::Custom(json) => json, +impl From for JsonValue { + fn from(val: GlobalAccountDataTestEvent) -> Self { + match val { + GlobalAccountDataTestEvent::Direct => test_json::sync_events::DIRECT.to_owned(), + GlobalAccountDataTestEvent::PushRules => test_json::sync_events::PUSH_RULES.to_owned(), + GlobalAccountDataTestEvent::Custom(json) => json, } } +} - /// Get the typed JSON representation of this test event. - pub fn into_raw_event(self) -> Raw { - from_json_value(self.into_json_value()).unwrap() +impl From for Raw { + fn from(val: GlobalAccountDataTestEvent) -> Self { + from_json_value(val.into()).unwrap() } }