|
3 | 3 | // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial |
4 | 4 | // Please see LICENSE files in the repository root for full details. |
5 | 5 |
|
| 6 | +use std::sync::Arc; |
| 7 | + |
6 | 8 | use aide::{NoApi, OperationIo, transform::TransformOperation}; |
7 | | -use axum::{Json, response::IntoResponse}; |
| 9 | +use anyhow::Context; |
| 10 | +use axum::{Json, extract::State, response::IntoResponse}; |
8 | 11 | use chrono::Duration; |
9 | 12 | use hyper::StatusCode; |
10 | 13 | use mas_axum_utils::record_error; |
11 | | -use mas_data_model::{BoxRng, TokenType, personal::session::PersonalSessionOwner}; |
12 | | -use mas_storage::queue::{QueueJobRepositoryExt as _, SyncDevicesJob}; |
| 14 | +use mas_data_model::{BoxRng, Device, TokenType, personal::session::PersonalSessionOwner}; |
| 15 | +use mas_matrix::HomeserverConnection; |
13 | 16 | use oauth2_types::scope::Scope; |
14 | 17 | use schemars::JsonSchema; |
15 | 18 | use serde::Deserialize; |
@@ -99,6 +102,7 @@ pub async fn handler( |
99 | 102 | .. |
100 | 103 | }: CallContext, |
101 | 104 | NoApi(mut rng): NoApi<BoxRng>, |
| 105 | + NoApi(State(homeserver)): NoApi<State<Arc<dyn HomeserverConnection>>>, |
102 | 106 | Json(params): Json<Request>, |
103 | 107 | ) -> Result<(StatusCode, Json<SingleResponse<PersonalSession>>), RouteError> { |
104 | 108 | let owner = if let Some(user_id) = session.user_id { |
@@ -145,12 +149,26 @@ pub async fn handler( |
145 | 149 | ) |
146 | 150 | .await?; |
147 | 151 |
|
| 152 | + // If the session has a device, we should add those to the homeserver now |
148 | 153 | if session.has_device() { |
149 | | - // If the session has a device, then we are now |
150 | | - // creating a device and should schedule a device sync. |
151 | | - repo.queue_job() |
152 | | - .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&actor_user)) |
153 | | - .await?; |
| 154 | + // Lock the user sync to make sure we don't get into a race condition |
| 155 | + repo.user().acquire_lock_for_sync(&actor_user).await?; |
| 156 | + |
| 157 | + for scope in &*session.scope { |
| 158 | + if let Some(device) = Device::from_scope_token(scope) { |
| 159 | + // NOTE: We haven't relinquished the repo at this point, |
| 160 | + // so we are holding a transaction across the homeserver |
| 161 | + // operation. |
| 162 | + // This is suboptimal, but simpler. |
| 163 | + // Given this is an administrative endpoint, this is a tolerable |
| 164 | + // compromise for now. |
| 165 | + homeserver |
| 166 | + .upsert_device(&actor_user.username, device.as_str(), None) |
| 167 | + .await |
| 168 | + .context("Failed to provision device") |
| 169 | + .map_err(|e| RouteError::Internal(e.into()))?; |
| 170 | + } |
| 171 | + } |
154 | 172 | } |
155 | 173 |
|
156 | 174 | repo.save().await?; |
|
0 commit comments