From 42f6664396a01606c4114ce89aea47c58646670f Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 21 Oct 2025 14:09:52 +0100 Subject: [PATCH 1/2] When adding or revoking personal sessions, schedule needed device syncs --- crates/data-model/src/personal/session.rs | 11 ++++++++++- .../src/admin/v1/personal_sessions/add.rs | 9 +++++++++ .../src/admin/v1/personal_sessions/revoke.rs | 17 ++++++++++++++++- 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/crates/data-model/src/personal/session.rs b/crates/data-model/src/personal/session.rs index ddca07590..f3c8d34f9 100644 --- a/crates/data-model/src/personal/session.rs +++ b/crates/data-model/src/personal/session.rs @@ -10,7 +10,7 @@ use oauth2_types::scope::Scope; use serde::Serialize; use ulid::Ulid; -use crate::{Client, InvalidTransitionError, User}; +use crate::{Client, Device, InvalidTransitionError, User}; #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] pub enum SessionState { @@ -129,4 +129,13 @@ impl PersonalSession { self.state = self.state.revoke(revoked_at)?; Ok(self) } + + /// Returns whether the scope of this session contains a device scope; + /// in other words: whether this session has a device. + #[must_use] + pub fn has_device(&self) -> bool { + self.scope + .iter() + .any(|scope_token| Device::from_scope_token(scope_token).is_some()) + } } diff --git a/crates/handlers/src/admin/v1/personal_sessions/add.rs b/crates/handlers/src/admin/v1/personal_sessions/add.rs index 5a7bb5a0e..fd5e01203 100644 --- a/crates/handlers/src/admin/v1/personal_sessions/add.rs +++ b/crates/handlers/src/admin/v1/personal_sessions/add.rs @@ -9,6 +9,7 @@ use chrono::Duration; use hyper::StatusCode; use mas_axum_utils::record_error; use mas_data_model::{BoxRng, TokenType, personal::session::PersonalSessionOwner}; +use mas_storage::queue::{QueueJobRepositoryExt as _, SyncDevicesJob}; use oauth2_types::scope::Scope; use schemars::JsonSchema; use serde::Deserialize; @@ -144,6 +145,14 @@ pub async fn handler( ) .await?; + if session.has_device() { + // If the session has a device, then we are now + // creating a device and should schedule a device sync. + repo.queue_job() + .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&actor_user)) + .await?; + } + repo.save().await?; Ok(( diff --git a/crates/handlers/src/admin/v1/personal_sessions/revoke.rs b/crates/handlers/src/admin/v1/personal_sessions/revoke.rs index 6999539a9..10fd6650f 100644 --- a/crates/handlers/src/admin/v1/personal_sessions/revoke.rs +++ b/crates/handlers/src/admin/v1/personal_sessions/revoke.rs @@ -3,10 +3,12 @@ // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial // Please see LICENSE files in the repository root for full details. -use aide::{OperationIo, transform::TransformOperation}; +use aide::{NoApi, OperationIo, transform::TransformOperation}; use axum::{Json, response::IntoResponse}; use hyper::StatusCode; use mas_axum_utils::record_error; +use mas_data_model::BoxRng; +use mas_storage::queue::{QueueJobRepositoryExt as _, SyncDevicesJob}; use ulid::Ulid; use crate::{ @@ -80,6 +82,7 @@ pub async fn handler( CallContext { mut repo, clock, .. }: CallContext, + NoApi(mut rng): NoApi, session_id: UlidPathParam, ) -> Result>, RouteError> { let session_id = *session_id; @@ -95,6 +98,18 @@ pub async fn handler( let session = repo.personal_session().revoke(&clock, session).await?; + if session.has_device() { + // If the session has a device, then we are now + // deleting a device and should schedule a device sync to clean up. + repo.queue_job() + .schedule_job( + &mut rng, + &clock, + SyncDevicesJob::new_for_id(session.actor_user_id), + ) + .await?; + } + repo.save().await?; Ok(Json(SingleResponse::new_canonical( From 95bc20e4408ad67b965d752f42246e9c1da4b985 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 22 Oct 2025 14:03:21 +0100 Subject: [PATCH 2/2] When adding personal session, upsert devices synchronously --- .../src/admin/v1/personal_sessions/add.rs | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/crates/handlers/src/admin/v1/personal_sessions/add.rs b/crates/handlers/src/admin/v1/personal_sessions/add.rs index fd5e01203..1721c1355 100644 --- a/crates/handlers/src/admin/v1/personal_sessions/add.rs +++ b/crates/handlers/src/admin/v1/personal_sessions/add.rs @@ -3,13 +3,16 @@ // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial // Please see LICENSE files in the repository root for full details. +use std::sync::Arc; + use aide::{NoApi, OperationIo, transform::TransformOperation}; -use axum::{Json, response::IntoResponse}; +use anyhow::Context; +use axum::{Json, extract::State, response::IntoResponse}; use chrono::Duration; use hyper::StatusCode; use mas_axum_utils::record_error; -use mas_data_model::{BoxRng, TokenType, personal::session::PersonalSessionOwner}; -use mas_storage::queue::{QueueJobRepositoryExt as _, SyncDevicesJob}; +use mas_data_model::{BoxRng, Device, TokenType, personal::session::PersonalSessionOwner}; +use mas_matrix::HomeserverConnection; use oauth2_types::scope::Scope; use schemars::JsonSchema; use serde::Deserialize; @@ -99,6 +102,7 @@ pub async fn handler( .. }: CallContext, NoApi(mut rng): NoApi, + NoApi(State(homeserver)): NoApi>>, Json(params): Json, ) -> Result<(StatusCode, Json>), RouteError> { let owner = if let Some(user_id) = session.user_id { @@ -145,12 +149,26 @@ pub async fn handler( ) .await?; + // If the session has a device, we should add those to the homeserver now if session.has_device() { - // If the session has a device, then we are now - // creating a device and should schedule a device sync. - repo.queue_job() - .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&actor_user)) - .await?; + // Lock the user sync to make sure we don't get into a race condition + repo.user().acquire_lock_for_sync(&actor_user).await?; + + for scope in &*session.scope { + if let Some(device) = Device::from_scope_token(scope) { + // NOTE: We haven't relinquished the repo at this point, + // so we are holding a transaction across the homeserver + // operation. + // This is suboptimal, but simpler. + // Given this is an administrative endpoint, this is a tolerable + // compromise for now. + homeserver + .upsert_device(&actor_user.username, device.as_str(), None) + .await + .context("Failed to provision device") + .map_err(|e| RouteError::Internal(e.into()))?; + } + } } repo.save().await?;