Skip to content

Commit 79ad365

Browse files
committed
WidgetDriver: add matrix driver toDevice support (reading and sending events via cs api)
This also hooks up the widget via the machine actions. And adds toDevice events to the subscription.
1 parent 9f84d9a commit 79ad365

File tree

5 files changed

+186
-32
lines changed

5 files changed

+186
-32
lines changed

crates/matrix-sdk/src/widget/capabilities.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,9 +288,8 @@ impl<'de> Deserialize<'de> for Capabilities {
288288
mod tests {
289289
use ruma::events::StateEventType;
290290

291-
use crate::widget::filter::ToDeviceEventFilter;
292-
293291
use super::*;
292+
use crate::widget::filter::ToDeviceEventFilter;
294293

295294
#[test]
296295
fn deserialization_of_no_capabilities() {

crates/matrix-sdk/src/widget/filter.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,8 @@ impl<'a> TryFrom<&'a Raw<AnyTimelineEvent>> for FilterInput<'a> {
228228
type Error = serde_json::Error;
229229

230230
fn try_from(raw_event: &'a Raw<AnyTimelineEvent>) -> Result<Self, Self::Error> {
231-
// FilterInput first checks if it can deserialize as a state event (state_key exists)
232-
// and then as a message like event.
231+
// FilterInput first checks if it can deserialize as a state event (state_key
232+
// exists) and then as a message like event.
233233
raw_event.deserialize_as()
234234
}
235235
}
@@ -244,9 +244,9 @@ pub struct FilterInputToDevice<'a> {
244244
impl<'a> TryFrom<&'a Raw<AnyToDeviceEvent>> for FilterInput<'a> {
245245
type Error = serde_json::Error;
246246
fn try_from(raw_event: &'a Raw<AnyToDeviceEvent>) -> Result<Self, Self::Error> {
247-
// deserialize_as::<FilterInput> will first try state, message like and then to-device.
248-
// The `AnyToDeviceEvent` would match message like first, so we need to explicitly
249-
// deserialize as `FilterInputToDevice`.
247+
// deserialize_as::<FilterInput> will first try state, message like and then
248+
// to-device. The `AnyToDeviceEvent` would match message like first, so
249+
// we need to explicitly deserialize as `FilterInputToDevice`.
250250
raw_event.deserialize_as::<FilterInputToDevice<'a>>().map(FilterInput::ToDevice)
251251
}
252252
}
@@ -515,7 +515,7 @@ mod tests {
515515
fn test_to_device_filter_does_match() {
516516
let f = Filter::ToDevice(ToDeviceEventFilter::new("my.custom.to.device".into()));
517517
assert!(f.matches(&FilterInput::ToDevice(FilterInputToDevice {
518-
event_type: "my.custom.to.device".into(),
518+
event_type: "my.custom.to.device",
519519
})));
520520
}
521521
}

crates/matrix-sdk/src/widget/machine/to_widget.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl ToWidgetRequest for NotifyNewMatrixEvent {
128128
#[derive(Deserialize)]
129129
pub(crate) struct Empty {}
130130

131-
/// Notify the widget that we received a new matrix event.
131+
/// Notify the widget that we received a new matrix to device event.
132132
/// This is a "response" to the widget subscribing to the events in the room.
133133
#[derive(Serialize)]
134134
#[serde(transparent)]

crates/matrix-sdk/src/widget/matrix.rs

Lines changed: 160 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,28 @@
1717
1818
use std::collections::BTreeMap;
1919

20-
use matrix_sdk_base::deserialized_responses::RawAnySyncOrStrippedState;
20+
use matrix_sdk_base::deserialized_responses::{EncryptionInfo, RawAnySyncOrStrippedState};
2121
use ruma::{
2222
api::client::{
2323
account::request_openid_token::v3::{Request as OpenIdRequest, Response as OpenIdResponse},
2424
delayed_events::{self, update_delayed_event::unstable::UpdateAction},
2525
filter::RoomEventFilter,
26+
to_device::send_event_to_device::{self, v3::Request as RumaToDeviceRequest},
2627
},
2728
assign,
2829
events::{
2930
AnyMessageLikeEventContent, AnyStateEventContent, AnySyncMessageLikeEvent,
30-
AnySyncStateEvent, AnySyncTimelineEvent, AnyTimelineEvent, MessageLikeEventType,
31-
StateEventType, TimelineEventType,
31+
AnySyncStateEvent, AnyTimelineEvent, AnyToDeviceEvent, AnyToDeviceEventContent,
32+
MessageLikeEventType, StateEventType, TimelineEventType, ToDeviceEventType,
3233
},
3334
serde::{from_raw_json_value, Raw},
34-
EventId, RoomId, TransactionId,
35+
to_device::DeviceIdOrAllDevices,
36+
EventId, OwnedRoomId, OwnedUserId, TransactionId,
3537
};
38+
use serde::{Deserialize, Serialize};
3639
use serde_json::{value::RawValue as RawJsonValue, Value};
3740
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
38-
use tracing::error;
41+
use tracing::{error, info};
3942

4043
use super::{machine::SendEventResponse, StateKeySelector};
4144
use crate::{event_handler::EventHandlerDropGuard, room::MessagesOptions, Error, Result, Room};
@@ -86,7 +89,14 @@ impl MatrixDriver {
8689
) -> Result<Vec<Raw<AnyTimelineEvent>>> {
8790
let room_id = self.room.room_id();
8891
let convert = |sync_or_stripped_state| match sync_or_stripped_state {
89-
RawAnySyncOrStrippedState::Sync(ev) => Some(attach_room_id(ev.cast_ref(), room_id)),
92+
RawAnySyncOrStrippedState::Sync(ev) => {
93+
add_props_to_raw(&ev, Some(room_id.to_owned()), None)
94+
.map(Raw::cast)
95+
.map_err(|e| {
96+
error!("failed to convert event from `get_state_event` response:{}", e)
97+
})
98+
.ok()
99+
}
90100
RawAnySyncOrStrippedState::Stripped(_) => {
91101
error!("MatrixDriver can't operate in invited rooms");
92102
None
@@ -181,7 +191,7 @@ impl MatrixDriver {
181191

182192
/// Starts forwarding new room events. Once the returned `EventReceiver`
183193
/// is dropped, forwarding will be stopped.
184-
pub(crate) fn events(&self) -> EventReceiver {
194+
pub(crate) fn events(&self) -> EventReceiver<Raw<AnyTimelineEvent>> {
185195
let (tx, rx) = unbounded_channel();
186196
let room_id = self.room.room_id().to_owned();
187197

@@ -190,14 +200,29 @@ impl MatrixDriver {
190200
let _room_id = room_id.clone();
191201
let handle_msg_like =
192202
self.room.add_event_handler(move |raw: Raw<AnySyncMessageLikeEvent>| {
193-
let _ = _tx.send(attach_room_id(raw.cast_ref(), &_room_id));
203+
match add_props_to_raw(&raw, Some(_room_id), None) {
204+
Ok(event_with_room_id) => {
205+
let _ = _tx.send(event_with_room_id.cast());
206+
}
207+
Err(e) => {
208+
error!("Failed to attach room id to message like event: {}", e);
209+
}
210+
}
194211
async {}
195212
});
196213
let drop_guard_msg_like = self.room.client().event_handler_drop_guard(handle_msg_like);
197-
214+
let _room_id = room_id;
215+
let _tx = tx;
198216
// Get only all state events from the state section of the sync.
199217
let handle_state = self.room.add_event_handler(move |raw: Raw<AnySyncStateEvent>| {
200-
let _ = tx.send(attach_room_id(raw.cast_ref(), &room_id));
218+
match add_props_to_raw(&raw, Some(_room_id.to_owned()), None) {
219+
Ok(event_with_room_id) => {
220+
let _ = _tx.send(event_with_room_id.cast());
221+
}
222+
Err(e) => {
223+
error!("Failed to attach room id to state event: {}", e);
224+
}
225+
}
201226
async {}
202227
});
203228
let drop_guard_state = self.room.client().event_handler_drop_guard(handle_state);
@@ -208,25 +233,140 @@ impl MatrixDriver {
208233
// section of the sync will not be forwarded to the widget.
209234
// TODO annotate the events and send both timeline and state section state
210235
// events.
211-
EventReceiver { rx, _drop_guards: [drop_guard_msg_like, drop_guard_state] }
236+
EventReceiver { rx, _drop_guards: vec![drop_guard_msg_like, drop_guard_state] }
237+
}
238+
239+
/// Starts forwarding new room events. Once the returned `EventReceiver`
240+
/// is dropped, forwarding will be stopped.
241+
pub(crate) fn to_device_events(&self) -> EventReceiver<Raw<AnyToDeviceEvent>> {
242+
let (tx, rx) = unbounded_channel();
243+
244+
let to_device_handle = self.room.client().add_event_handler(
245+
move |raw: Raw<AnyToDeviceEvent>, encryption_info: Option<EncryptionInfo>| {
246+
match add_props_to_raw(&raw, None, encryption_info.as_ref()) {
247+
Ok(ev) => {
248+
let _ = tx.send(ev);
249+
}
250+
Err(e) => {
251+
error!("Failed to attach encryption flag to to_device event: {}", e);
252+
}
253+
}
254+
async {}
255+
},
256+
);
257+
258+
let drop_guard = self.room.client().event_handler_drop_guard(to_device_handle);
259+
EventReceiver { rx, _drop_guards: vec![drop_guard] }
260+
}
261+
262+
/// It will ignore all devices where errors occurred or where the device is
263+
/// not verified or where th user has a has_verification_violation.
264+
pub(crate) async fn send_to_device(
265+
&self,
266+
event_type: ToDeviceEventType,
267+
encrypted: bool,
268+
messages: BTreeMap<
269+
OwnedUserId,
270+
BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>>,
271+
>,
272+
) -> Result<send_event_to_device::v3::Response> {
273+
let client = self.room.client();
274+
275+
let request = if encrypted {
276+
return Err(Error::UnknownError(
277+
"Sending encrypted to_device events is not supported by the widget driver.".into(),
278+
));
279+
} else {
280+
RumaToDeviceRequest::new_raw(event_type, TransactionId::new(), messages)
281+
};
282+
283+
let response = client.send(request).await;
284+
285+
response.map_err(Into::into)
212286
}
213287
}
214288

215289
/// A simple entity that wraps an `UnboundedReceiver`
216290
/// along with the drop guard for the room event handler.
217-
pub(crate) struct EventReceiver {
218-
rx: UnboundedReceiver<Raw<AnyTimelineEvent>>,
219-
_drop_guards: [EventHandlerDropGuard; 2],
291+
pub(crate) struct EventReceiver<E> {
292+
rx: UnboundedReceiver<E>,
293+
_drop_guards: Vec<EventHandlerDropGuard>,
220294
}
221295

222-
impl EventReceiver {
223-
pub(crate) async fn recv(&mut self) -> Option<Raw<AnyTimelineEvent>> {
296+
impl<T> EventReceiver<T> {
297+
pub(crate) async fn recv(&mut self) -> Option<T> {
224298
self.rx.recv().await
225299
}
226300
}
227301

228-
fn attach_room_id(raw_ev: &Raw<AnySyncTimelineEvent>, room_id: &RoomId) -> Raw<AnyTimelineEvent> {
229-
let mut ev_obj = raw_ev.deserialize_as::<BTreeMap<String, Box<RawJsonValue>>>().unwrap();
230-
ev_obj.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id).unwrap());
231-
Raw::new(&ev_obj).unwrap().cast()
302+
// `room_id` and `encryption` is the only modification we need to do to the
303+
// events otherwise they are just forwarded raw to the widget.
304+
// This is why we do not serialization the whole event but pass it as a raw
305+
// value through the widget driver and only serialize here to allow potimizing
306+
// with `serde(borrow)`.
307+
#[derive(Deserialize, Serialize)]
308+
struct RoomIdEncryptionSerializer {
309+
#[serde(skip_serializing_if = "Option::is_none")]
310+
room_id: Option<OwnedRoomId>,
311+
#[serde(skip_serializing_if = "Option::is_none")]
312+
encrypted: Option<bool>,
313+
#[serde(flatten)]
314+
rest: Value,
315+
}
316+
317+
/// Attach additional properties to the event.
318+
///
319+
/// Attach a room id to the event. This is needed because the widget API
320+
/// requires the room id to be present in the event.
321+
///
322+
/// Attach the `ecryption` flag to the event. This is needed so the widget gets
323+
/// informed if an event is encrypted or not. Since the client is responsible
324+
/// for decrypting the event, there otherwise is no way for the widget to know
325+
/// if its an encrypted (signed/trusted) event or not.
326+
fn add_props_to_raw<T>(
327+
raw: &Raw<T>,
328+
room_id: Option<OwnedRoomId>,
329+
encryption_info: Option<&EncryptionInfo>,
330+
) -> Result<Raw<T>> {
331+
match raw.deserialize_as::<RoomIdEncryptionSerializer>() {
332+
Ok(mut event) => {
333+
event.room_id = room_id.or(event.room_id);
334+
event.encrypted = encryption_info.map(|_| true).or(event.encrypted);
335+
info!("rest is: {:?}", event.rest);
336+
Ok(Raw::new(&event)?.cast())
337+
}
338+
Err(e) => Err(Error::from(e)),
339+
}
340+
}
341+
#[cfg(test)]
342+
mod tests {
343+
use ruma::{room_id, serde::Raw};
344+
use serde_json::json;
345+
346+
use super::add_props_to_raw;
347+
348+
#[test]
349+
fn test_app_props_to_raw() {
350+
let raw = Raw::new(&json!({
351+
"encrypted": true,
352+
"type": "m.room.message",
353+
"content": {
354+
"body": "Hello world"
355+
}
356+
}))
357+
.unwrap();
358+
let room_id = room_id!("!my_id:example.org");
359+
let new = add_props_to_raw(&raw, Some(room_id.to_owned()), None).unwrap();
360+
assert_eq!(
361+
serde_json::to_value(new).unwrap(),
362+
json!({
363+
"encrypted": true,
364+
"room_id": "!my_id:example.org",
365+
"type": "m.room.message",
366+
"content": {
367+
"body": "Hello world"
368+
}
369+
})
370+
);
371+
}
232372
}

crates/matrix-sdk/src/widget/mod.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,16 @@ impl WidgetDriver {
237237
.await
238238
.map(MatrixDriverResponse::MatrixDelayedEventUpdate),
239239

240-
MatrixDriverRequestData::SendToDeviceEvent(req) => todo!(),
240+
MatrixDriverRequestData::SendToDeviceEvent(send_to_device_request) => {
241+
matrix_driver
242+
.send_to_device(
243+
send_to_device_request.event_type.into(),
244+
send_to_device_request.encrypted,
245+
send_to_device_request.messages,
246+
)
247+
.await
248+
.map(MatrixDriverResponse::MatrixToDeviceSent)
249+
}
241250
};
242251

243252
// Forward the matrix driver response to the incoming message stream.
@@ -259,7 +268,8 @@ impl WidgetDriver {
259268

260269
self.event_forwarding_guard = Some(guard);
261270

262-
let mut matrix = matrix_driver.events();
271+
let mut events_receiver = matrix_driver.events();
272+
let mut to_device_receiver = matrix_driver.to_device_events();
263273
let incoming_msg_tx = incoming_msg_tx.clone();
264274

265275
spawn(async move {
@@ -270,10 +280,15 @@ impl WidgetDriver {
270280
return;
271281
}
272282

273-
Some(event) = matrix.recv() => {
283+
Some(event) = events_receiver.recv() => {
274284
// Forward all events to the incoming messages stream.
275285
let _ = incoming_msg_tx.send(IncomingMessage::MatrixEventReceived(event));
276286
}
287+
288+
Some(event) = to_device_receiver.recv() => {
289+
// Forward all events to the incoming messages stream.
290+
let _ = incoming_msg_tx.send(IncomingMessage::ToDeviceReceived(event));
291+
}
277292
}
278293
}
279294
});

0 commit comments

Comments
 (0)