From 1519de2a175cdc66ba07b67289bb27b2df3363bc Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Fri, 26 Sep 2025 12:47:34 +0100 Subject: [PATCH 01/10] Add tables for personal access tokens --- .../20250924132713_personal_access_tokens.sql | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 crates/storage-pg/migrations/20250924132713_personal_access_tokens.sql diff --git a/crates/storage-pg/migrations/20250924132713_personal_access_tokens.sql b/crates/storage-pg/migrations/20250924132713_personal_access_tokens.sql new file mode 100644 index 000000000..62df7bc3e --- /dev/null +++ b/crates/storage-pg/migrations/20250924132713_personal_access_tokens.sql @@ -0,0 +1,55 @@ +-- Copyright 2025 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +-- Please see LICENSE in the repository root for full details. + +-- A family of personal access tokens. This is a long-lived wrapper around the personal access tokens +-- themselves, allowing tokens to be regenerated whilst still retaining a persistent identifier for them. +CREATE TABLE personal_sessions ( + personal_session_id UUID NOT NULL PRIMARY KEY, + owner_user_id UUID NOT NULL REFERENCES users(user_id), + actor_user_id UUID NOT NULL REFERENCES users(user_id), + -- A human-readable label, intended to describe what the session is for. + human_name TEXT NOT NULL, + -- The OAuth2 scopes for the session, identical to OAuth2 sessions. + -- May include a device ID, but this is optional (sessions can be deviceless). + scope_list TEXT[] NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + -- If set, none of the tokens will be valid anymore. + revoked_at TIMESTAMP WITH TIME ZONE, + last_active_at TIMESTAMP WITH TIME ZONE, + last_active_ip INET +); + +-- Individual tokens. +CREATE TABLE personal_access_tokens ( + -- The family this access token belongs to. + personal_access_token_id UUID NOT NULL PRIMARY KEY, + personal_session_id UUID NOT NULL REFERENCES personal_sessions(personal_session_id), + -- SHA256 of the access token. + -- This is a lightweight measure to stop a database backup (or other + -- unauthorised read-only database access) escalating into real permissions + -- on a live system. + -- We could have used a hash with secret key, but this would no longer be + -- 'free' protection because it would need configuration (and introduce + -- potential issues with configuring it wrong). + -- This is currently inconsistent with other access token tables but it would + -- make sense to migrate those to match in the future. + access_token_sha256 BYTEA NOT NULL UNIQUE + -- A SHA256 hash is 32 bytes long + CHECK (octet_length(access_token_sha256) = 32), + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + -- If set, the token won't be valid after this time. + -- If not set, the token never automatically expires. + expires_at TIMESTAMP WITH TIME ZONE, + -- If set, this token is not valid anymore. + revoked_at TIMESTAMP WITH TIME ZONE +); + +-- Ensure we can only have one active personal access token in each family. +CREATE UNIQUE INDEX ON personal_access_tokens (personal_session_id) WHERE revoked_at IS NOT NULL; + +-- Add indices to satisfy foreign key backward checks +-- (and likely filter queries) +CREATE INDEX ON personal_sessions (owner_user_id); +CREATE INDEX ON personal_sessions (actor_user_id); From 87950bb18a030b9d04b8891fc9f195e808fb1980 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 2 Oct 2025 13:51:28 +0100 Subject: [PATCH 02/10] activity tracker: Add SessionKind::Personal --- crates/handlers/src/activity_tracker/mod.rs | 25 +++++++++++++++++++ .../handlers/src/activity_tracker/worker.rs | 5 ++++ 2 files changed, 30 insertions(+) diff --git a/crates/handlers/src/activity_tracker/mod.rs b/crates/handlers/src/activity_tracker/mod.rs index 738da3856..71d6a8fc2 100644 --- a/crates/handlers/src/activity_tracker/mod.rs +++ b/crates/handlers/src/activity_tracker/mod.rs @@ -24,6 +24,8 @@ static MESSAGE_QUEUE_SIZE: usize = 1000; enum SessionKind { OAuth2, Compat, + /// Session associated with personal access tokens + Personal, Browser, } @@ -32,6 +34,7 @@ impl SessionKind { match self { SessionKind::OAuth2 => "oauth2", SessionKind::Compat => "compat", + SessionKind::Personal => "personal", SessionKind::Browser => "browser", } } @@ -108,6 +111,28 @@ impl ActivityTracker { } } + /// Record activity in a personal access token session. + pub async fn record_personal_access_token_session( + &self, + clock: &dyn Clock, + session: &Session, + ip: Option, + ) { + let res = self + .channel + .send(Message::Record { + kind: SessionKind::Personal, + id: session.id, + date_time: clock.now(), + ip, + }) + .await; + + if let Err(e) = res { + tracing::error!("Failed to record Personal session: {}", e); + } + } + /// Record activity in a compat session. pub async fn record_compat_session( &self, diff --git a/crates/handlers/src/activity_tracker/worker.rs b/crates/handlers/src/activity_tracker/worker.rs index 46cc84ccd..6fa51fce3 100644 --- a/crates/handlers/src/activity_tracker/worker.rs +++ b/crates/handlers/src/activity_tracker/worker.rs @@ -224,6 +224,7 @@ impl Worker { let mut browser_sessions = Vec::new(); let mut oauth2_sessions = Vec::new(); let mut compat_sessions = Vec::new(); + let mut personal_sessions = Vec::new(); for ((kind, id), record) in pending_records { match kind { @@ -236,6 +237,9 @@ impl Worker { SessionKind::Compat => { compat_sessions.push((*id, record.end_time, record.ip)); } + SessionKind::Personal => { + personal_sessions.push((*id, record.end_time, record.ip)); + } } } @@ -253,6 +257,7 @@ impl Worker { repo.compat_session() .record_batch_activity(compat_sessions) .await?; + // TODO: personal sessions: record repo.save().await?; self.pending_records.clear(); From b54a657c32c715fda1daf00a5610f408bfaf5b83 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 2 Oct 2025 13:53:43 +0100 Subject: [PATCH 03/10] data model: Add personal sessions with `mpt_` prefix --- crates/data-model/src/lib.rs | 1 + crates/data-model/src/personal/mod.rs | 32 ++++++ crates/data-model/src/personal/session.rs | 111 ++++++++++++++++++++ crates/data-model/src/tokens.rs | 10 +- crates/handlers/src/oauth2/introspection.rs | 5 + 5 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 crates/data-model/src/personal/mod.rs create mode 100644 crates/data-model/src/personal/session.rs diff --git a/crates/data-model/src/lib.rs b/crates/data-model/src/lib.rs index 337c05d89..962c8be00 100644 --- a/crates/data-model/src/lib.rs +++ b/crates/data-model/src/lib.rs @@ -11,6 +11,7 @@ use thiserror::Error; pub mod clock; pub(crate) mod compat; pub mod oauth2; +pub mod personal; pub(crate) mod policy_data; mod site_config; pub(crate) mod tokens; diff --git a/crates/data-model/src/personal/mod.rs b/crates/data-model/src/personal/mod.rs new file mode 100644 index 000000000..1142fea76 --- /dev/null +++ b/crates/data-model/src/personal/mod.rs @@ -0,0 +1,32 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +pub mod session; + +use chrono::{DateTime, Utc}; +use ulid::Ulid; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PersonalAccessToken { + pub id: Ulid, + pub session_id: Ulid, + pub created_at: DateTime, + pub expires_at: Option>, + pub revoked_at: Option>, +} + +impl PersonalAccessToken { + #[must_use] + pub fn is_valid(&self, now: DateTime) -> bool { + if self.revoked_at.is_some() { + return false; + } + if let Some(expires_at) = self.expires_at { + expires_at > now + } else { + true + } + } +} diff --git a/crates/data-model/src/personal/session.rs b/crates/data-model/src/personal/session.rs new file mode 100644 index 000000000..f8904f810 --- /dev/null +++ b/crates/data-model/src/personal/session.rs @@ -0,0 +1,111 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +use std::net::IpAddr; + +use chrono::{DateTime, Utc}; +use oauth2_types::scope::Scope; +use serde::Serialize; +use ulid::Ulid; + +use crate::InvalidTransitionError; + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] +pub enum SessionState { + #[default] + Valid, + Revoked { + revoked_at: DateTime, + }, +} + +impl SessionState { + /// Returns `true` if the session state is [`Valid`]. + /// + /// [`Valid`]: SessionState::Valid + #[must_use] + pub fn is_valid(&self) -> bool { + matches!(self, Self::Valid) + } + + /// Returns `true` if the session state is [`Revoked`]. + /// + /// [`Revoked`]: SessionState::Revoked + #[must_use] + pub fn is_revoked(&self) -> bool { + matches!(self, Self::Revoked { .. }) + } + + /// Transitions the session state to [`Revoked`]. + /// + /// # Parameters + /// + /// * `revoked_at` - The time at which the session was revoked. + /// + /// # Errors + /// + /// Returns an error if the session state is already [`Revoked`]. + /// + /// [`Revoked`]: SessionState::Revoked + pub fn revoke(self, revoked_at: DateTime) -> Result { + match self { + Self::Valid => Ok(Self::Revoked { revoked_at }), + Self::Revoked { .. } => Err(InvalidTransitionError), + } + } + + /// Returns the time the session was revoked, if any + /// + /// Returns `None` if the session is still [`Valid`]. + /// + /// [`Valid`]: SessionState::Valid + #[must_use] + pub fn revoked_at(&self) -> Option> { + match self { + Self::Valid => None, + Self::Revoked { revoked_at } => Some(*revoked_at), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct PersonalSession { + pub id: Ulid, + pub state: SessionState, + pub owner_user_id: Ulid, + pub actor_user_id: Ulid, + pub human_name: String, + /// The scope for the session, identical to OAuth2 sessions. + /// May or may not include a device scope + /// (personal sessions can be deviceless). + pub scope: Scope, + pub created_at: DateTime, + pub last_active_at: Option>, + pub last_active_ip: Option, +} + +impl std::ops::Deref for PersonalSession { + type Target = SessionState; + + fn deref(&self) -> &Self::Target { + &self.state + } +} + +impl PersonalSession { + /// Marks the session as revoked. + /// + /// # Parameters + /// + /// * `revoked_at` - The time at which the session was finished. + /// + /// # Errors + /// + /// Returns an error if the session is already finished. + pub fn finish(mut self, revoked_at: DateTime) -> Result { + self.state = self.state.revoke(revoked_at)?; + Ok(self) + } +} diff --git a/crates/data-model/src/tokens.rs b/crates/data-model/src/tokens.rs index 1ea5be6be..bd34c5000 100644 --- a/crates/data-model/src/tokens.rs +++ b/crates/data-model/src/tokens.rs @@ -240,6 +240,9 @@ pub enum TokenType { /// A legacy refresh token CompatRefreshToken, + + /// A personal access token. + PersonalAccessToken, } impl std::fmt::Display for TokenType { @@ -249,6 +252,7 @@ impl std::fmt::Display for TokenType { TokenType::RefreshToken => write!(f, "refresh token"), TokenType::CompatAccessToken => write!(f, "compat access token"), TokenType::CompatRefreshToken => write!(f, "compat refresh token"), + TokenType::PersonalAccessToken => write!(f, "personal access token"), } } } @@ -260,6 +264,7 @@ impl TokenType { TokenType::RefreshToken => "mar", TokenType::CompatAccessToken => "mct", TokenType::CompatRefreshToken => "mcr", + TokenType::PersonalAccessToken => "mpt", } } @@ -269,6 +274,7 @@ impl TokenType { "mar" => Some(TokenType::RefreshToken), "mct" | "syt" => Some(TokenType::CompatAccessToken), "mcr" | "syr" => Some(TokenType::CompatRefreshToken), + "mpt" => Some(TokenType::PersonalAccessToken), _ => None, } } @@ -335,7 +341,9 @@ impl PartialEq for TokenType { matches!( (self, other), ( - TokenType::AccessToken | TokenType::CompatAccessToken, + TokenType::AccessToken + | TokenType::CompatAccessToken + | TokenType::PersonalAccessToken, OAuthTokenTypeHint::AccessToken ) | ( TokenType::RefreshToken | TokenType::CompatRefreshToken, diff --git a/crates/handlers/src/oauth2/introspection.rs b/crates/handlers/src/oauth2/introspection.rs index 6bb61bf72..27ee3fdbc 100644 --- a/crates/handlers/src/oauth2/introspection.rs +++ b/crates/handlers/src/oauth2/introspection.rs @@ -625,6 +625,11 @@ pub(crate) async fn post( device_id: session.device.map(Device::into), } } + + TokenType::PersonalAccessToken => { + // TODO + return Err(RouteError::UnknownToken(TokenType::PersonalAccessToken)); + } }; repo.save().await?; From 6dfa0e34dacfeb04080631889579f1434440c10a Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 2 Oct 2025 14:34:34 +0100 Subject: [PATCH 04/10] Add personal access token and session storage --- Cargo.lock | 1 + ...56577d98074e244a35c0d3be24bc18d9d0daa.json | 15 ++ ...63aa46225245a04d1c7bc24b5275c44a6d58d.json | 15 ++ ...7a18ff07e0b5cd08cc525ac9d5dcceece7311.json | 70 ++++++ ...6f5c701411387c939f6b8a3478b41b3de4f20.json | 46 ++++ ...7e260ba8911123744980e24a52bc9b95bd056.json | 18 ++ ...f063537d5a7f13c48d031367c1d8dba2f8af5.json | 19 ++ ...e1ef2f192ca66f8000d1385626154e5ce4f7e.json | 46 ++++ crates/storage-pg/Cargo.toml | 1 + crates/storage-pg/src/lib.rs | 1 + .../storage-pg/src/personal/access_token.rs | 216 +++++++++++++++++ crates/storage-pg/src/personal/mod.rs | 13 ++ crates/storage-pg/src/personal/session.rs | 218 ++++++++++++++++++ crates/storage-pg/src/repository.rs | 15 ++ crates/storage/src/lib.rs | 1 + crates/storage/src/personal/access_token.rs | 119 ++++++++++ crates/storage/src/personal/mod.rs | 13 ++ crates/storage/src/personal/session.rs | 101 ++++++++ crates/storage/src/repository.rs | 39 ++++ 19 files changed, 967 insertions(+) create mode 100644 crates/storage-pg/.sqlx/query-06d67595eeef23d5f2773632e0956577d98074e244a35c0d3be24bc18d9d0daa.json create mode 100644 crates/storage-pg/.sqlx/query-0e45995714e60b71e0f0158500a63aa46225245a04d1c7bc24b5275c44a6d58d.json create mode 100644 crates/storage-pg/.sqlx/query-8816802493ba098c0705b8a8fa87a18ff07e0b5cd08cc525ac9d5dcceece7311.json create mode 100644 crates/storage-pg/.sqlx/query-90875bdd2f75cdf0dc3f48dc2516f5c701411387c939f6b8a3478b41b3de4f20.json create mode 100644 crates/storage-pg/.sqlx/query-a0be6c56e470382b9470df414497e260ba8911123744980e24a52bc9b95bd056.json create mode 100644 crates/storage-pg/.sqlx/query-c55d8dc9c1d1120ebc2c82e3779f063537d5a7f13c48d031367c1d8dba2f8af5.json create mode 100644 crates/storage-pg/.sqlx/query-e1746b33c2f0d10f26332195f78e1ef2f192ca66f8000d1385626154e5ce4f7e.json create mode 100644 crates/storage-pg/src/personal/access_token.rs create mode 100644 crates/storage-pg/src/personal/mod.rs create mode 100644 crates/storage-pg/src/personal/session.rs create mode 100644 crates/storage/src/personal/access_token.rs create mode 100644 crates/storage/src/personal/mod.rs create mode 100644 crates/storage/src/personal/session.rs diff --git a/Cargo.lock b/Cargo.lock index 253308775..67e00137b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3676,6 +3676,7 @@ dependencies = [ "sea-query", "sea-query-binder", "serde_json", + "sha2", "sqlx", "thiserror 2.0.17", "tracing", diff --git a/crates/storage-pg/.sqlx/query-06d67595eeef23d5f2773632e0956577d98074e244a35c0d3be24bc18d9d0daa.json b/crates/storage-pg/.sqlx/query-06d67595eeef23d5f2773632e0956577d98074e244a35c0d3be24bc18d9d0daa.json new file mode 100644 index 000000000..55509569c --- /dev/null +++ b/crates/storage-pg/.sqlx/query-06d67595eeef23d5f2773632e0956577d98074e244a35c0d3be24bc18d9d0daa.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE personal_sessions\n SET revoked_at = $2\n WHERE personal_session_id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "06d67595eeef23d5f2773632e0956577d98074e244a35c0d3be24bc18d9d0daa" +} diff --git a/crates/storage-pg/.sqlx/query-0e45995714e60b71e0f0158500a63aa46225245a04d1c7bc24b5275c44a6d58d.json b/crates/storage-pg/.sqlx/query-0e45995714e60b71e0f0158500a63aa46225245a04d1c7bc24b5275c44a6d58d.json new file mode 100644 index 000000000..5bba6548d --- /dev/null +++ b/crates/storage-pg/.sqlx/query-0e45995714e60b71e0f0158500a63aa46225245a04d1c7bc24b5275c44a6d58d.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE personal_access_tokens\n SET revoked_at = $2\n WHERE personal_access_token_id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "0e45995714e60b71e0f0158500a63aa46225245a04d1c7bc24b5275c44a6d58d" +} diff --git a/crates/storage-pg/.sqlx/query-8816802493ba098c0705b8a8fa87a18ff07e0b5cd08cc525ac9d5dcceece7311.json b/crates/storage-pg/.sqlx/query-8816802493ba098c0705b8a8fa87a18ff07e0b5cd08cc525ac9d5dcceece7311.json new file mode 100644 index 000000000..d7eb2c798 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-8816802493ba098c0705b8a8fa87a18ff07e0b5cd08cc525ac9d5dcceece7311.json @@ -0,0 +1,70 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT personal_session_id\n , owner_user_id\n , actor_user_id\n , scope_list\n , created_at\n , revoked_at\n , human_name\n , last_active_at\n , last_active_ip as \"last_active_ip: IpAddr\"\n FROM personal_sessions\n\n WHERE personal_session_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "personal_session_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "owner_user_id", + "type_info": "Uuid" + }, + { + "ordinal": 2, + "name": "actor_user_id", + "type_info": "Uuid" + }, + { + "ordinal": 3, + "name": "scope_list", + "type_info": "TextArray" + }, + { + "ordinal": 4, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "revoked_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "human_name", + "type_info": "Text" + }, + { + "ordinal": 7, + "name": "last_active_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 8, + "name": "last_active_ip: IpAddr", + "type_info": "Inet" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + false, + true, + true + ] + }, + "hash": "8816802493ba098c0705b8a8fa87a18ff07e0b5cd08cc525ac9d5dcceece7311" +} diff --git a/crates/storage-pg/.sqlx/query-90875bdd2f75cdf0dc3f48dc2516f5c701411387c939f6b8a3478b41b3de4f20.json b/crates/storage-pg/.sqlx/query-90875bdd2f75cdf0dc3f48dc2516f5c701411387c939f6b8a3478b41b3de4f20.json new file mode 100644 index 000000000..66aab4ee6 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-90875bdd2f75cdf0dc3f48dc2516f5c701411387c939f6b8a3478b41b3de4f20.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT personal_access_token_id\n , personal_session_id\n , created_at\n , expires_at\n , revoked_at\n\n FROM personal_access_tokens\n\n WHERE access_token_sha256 = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "personal_access_token_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "personal_session_id", + "type_info": "Uuid" + }, + { + "ordinal": 2, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "expires_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "revoked_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Bytea" + ] + }, + "nullable": [ + false, + false, + false, + true, + true + ] + }, + "hash": "90875bdd2f75cdf0dc3f48dc2516f5c701411387c939f6b8a3478b41b3de4f20" +} diff --git a/crates/storage-pg/.sqlx/query-a0be6c56e470382b9470df414497e260ba8911123744980e24a52bc9b95bd056.json b/crates/storage-pg/.sqlx/query-a0be6c56e470382b9470df414497e260ba8911123744980e24a52bc9b95bd056.json new file mode 100644 index 000000000..3542f8481 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-a0be6c56e470382b9470df414497e260ba8911123744980e24a52bc9b95bd056.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO personal_access_tokens\n (personal_access_token_id, personal_session_id, access_token_sha256, created_at, expires_at)\n VALUES ($1, $2, $3, $4, $5)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Bytea", + "Timestamptz", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "a0be6c56e470382b9470df414497e260ba8911123744980e24a52bc9b95bd056" +} diff --git a/crates/storage-pg/.sqlx/query-c55d8dc9c1d1120ebc2c82e3779f063537d5a7f13c48d031367c1d8dba2f8af5.json b/crates/storage-pg/.sqlx/query-c55d8dc9c1d1120ebc2c82e3779f063537d5a7f13c48d031367c1d8dba2f8af5.json new file mode 100644 index 000000000..9dec975ca --- /dev/null +++ b/crates/storage-pg/.sqlx/query-c55d8dc9c1d1120ebc2c82e3779f063537d5a7f13c48d031367c1d8dba2f8af5.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO personal_sessions\n ( personal_session_id\n , owner_user_id\n , actor_user_id\n , human_name\n , scope_list\n , created_at\n )\n VALUES ($1, $2, $3, $4, $5, $6)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Uuid", + "Text", + "TextArray", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "c55d8dc9c1d1120ebc2c82e3779f063537d5a7f13c48d031367c1d8dba2f8af5" +} diff --git a/crates/storage-pg/.sqlx/query-e1746b33c2f0d10f26332195f78e1ef2f192ca66f8000d1385626154e5ce4f7e.json b/crates/storage-pg/.sqlx/query-e1746b33c2f0d10f26332195f78e1ef2f192ca66f8000d1385626154e5ce4f7e.json new file mode 100644 index 000000000..2112e7603 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-e1746b33c2f0d10f26332195f78e1ef2f192ca66f8000d1385626154e5ce4f7e.json @@ -0,0 +1,46 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT personal_access_token_id\n , personal_session_id\n , created_at\n , expires_at\n , revoked_at\n\n FROM personal_access_tokens\n\n WHERE personal_access_token_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "personal_access_token_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "personal_session_id", + "type_info": "Uuid" + }, + { + "ordinal": 2, + "name": "created_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "expires_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "revoked_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false, + true, + true + ] + }, + "hash": "e1746b33c2f0d10f26332195f78e1ef2f192ca66f8000d1385626154e5ce4f7e" +} diff --git a/crates/storage-pg/Cargo.toml b/crates/storage-pg/Cargo.toml index 149e92fc6..8710ead70 100644 --- a/crates/storage-pg/Cargo.toml +++ b/crates/storage-pg/Cargo.toml @@ -27,6 +27,7 @@ rand.workspace = true sea-query-binder.workspace = true sea-query.workspace = true serde_json.workspace = true +sha2.workspace = true sqlx.workspace = true thiserror.workspace = true tracing.workspace = true diff --git a/crates/storage-pg/src/lib.rs b/crates/storage-pg/src/lib.rs index 908058df6..207235667 100644 --- a/crates/storage-pg/src/lib.rs +++ b/crates/storage-pg/src/lib.rs @@ -165,6 +165,7 @@ use sqlx::migrate::Migrator; pub mod app_session; pub mod compat; pub mod oauth2; +pub mod personal; pub mod queue; pub mod upstream_oauth2; pub mod user; diff --git a/crates/storage-pg/src/personal/access_token.rs b/crates/storage-pg/src/personal/access_token.rs new file mode 100644 index 000000000..832e867f5 --- /dev/null +++ b/crates/storage-pg/src/personal/access_token.rs @@ -0,0 +1,216 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use mas_data_model::{ + Clock, + personal::{PersonalAccessToken, session::PersonalSession}, +}; +use mas_storage::personal::PersonalAccessTokenRepository; +use rand::RngCore; +use sha2::{Digest, Sha256}; +use sqlx::PgConnection; +use ulid::Ulid; +use uuid::Uuid; + +use crate::{DatabaseError, tracing::ExecuteExt as _}; + +/// An implementation of [`PersonalAccessTokenRepository`] for a PostgreSQL +/// connection +pub struct PgPersonalAccessTokenRepository<'c> { + conn: &'c mut PgConnection, +} + +impl<'c> PgPersonalAccessTokenRepository<'c> { + /// Create a new [`PgPersonalAccessTokenRepository`] from an active + /// PostgreSQL connection + pub fn new(conn: &'c mut PgConnection) -> Self { + Self { conn } + } +} + +struct PersonalAccessTokenLookup { + personal_access_token_id: Uuid, + personal_session_id: Uuid, + created_at: DateTime, + expires_at: Option>, + revoked_at: Option>, +} + +impl From for PersonalAccessToken { + fn from(value: PersonalAccessTokenLookup) -> Self { + Self { + id: Ulid::from(value.personal_access_token_id), + session_id: Ulid::from(value.personal_session_id), + created_at: value.created_at, + expires_at: value.expires_at, + revoked_at: value.revoked_at, + } + } +} + +#[async_trait] +impl PersonalAccessTokenRepository for PgPersonalAccessTokenRepository<'_> { + type Error = DatabaseError; + + #[tracing::instrument( + name = "db.personal_access_token.lookup", + skip_all, + fields( + db.query.text, + personal_access_token.id = %id, + ), + err, + )] + async fn lookup(&mut self, id: Ulid) -> Result, Self::Error> { + let res = sqlx::query_as!( + PersonalAccessTokenLookup, + r#" + SELECT personal_access_token_id + , personal_session_id + , created_at + , expires_at + , revoked_at + + FROM personal_access_tokens + + WHERE personal_access_token_id = $1 + "#, + Uuid::from(id), + ) + .traced() + .fetch_optional(&mut *self.conn) + .await?; + + let Some(res) = res else { return Ok(None) }; + + Ok(Some(res.into())) + } + + #[tracing::instrument( + name = "db.personal_access_token.find_by_token", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn find_by_token( + &mut self, + access_token: &str, + ) -> Result, Self::Error> { + let token_sha256 = Sha256::digest(access_token.as_bytes()).to_vec(); + + let res = sqlx::query_as!( + PersonalAccessTokenLookup, + r#" + SELECT personal_access_token_id + , personal_session_id + , created_at + , expires_at + , revoked_at + + FROM personal_access_tokens + + WHERE access_token_sha256 = $1 + "#, + &token_sha256, + ) + .traced() + .fetch_optional(&mut *self.conn) + .await?; + + let Some(res) = res else { return Ok(None) }; + + Ok(Some(res.into())) + } + + #[tracing::instrument( + name = "db.personal_access_token.add", + skip_all, + fields( + db.query.text, + personal_access_token.id, + %session.id, + ), + err, + )] + async fn add( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + session: &PersonalSession, + access_token: String, + expires_after: Option, + ) -> Result { + let created_at = clock.now(); + let id = Ulid::from_datetime_with_source(created_at.into(), rng); + tracing::Span::current().record("personal_access_token.id", tracing::field::display(id)); + + let token_sha256 = Sha256::digest(access_token.as_bytes()).to_vec(); + + let expires_at = expires_after.map(|expires_after| created_at + expires_after); + + sqlx::query!( + r#" + INSERT INTO personal_access_tokens + (personal_access_token_id, personal_session_id, access_token_sha256, created_at, expires_at) + VALUES ($1, $2, $3, $4, $5) + "#, + Uuid::from(id), + Uuid::from(session.id), + &token_sha256, + created_at, + expires_at, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(PersonalAccessToken { + id, + session_id: session.id, + created_at, + expires_at, + revoked_at: None, + }) + } + + #[tracing::instrument( + name = "db.personal_access_token.revoke", + skip_all, + fields( + db.query.text, + %access_token.id, + personal_session.id = %access_token.session_id, + ), + err, + )] + async fn revoke( + &mut self, + clock: &dyn Clock, + mut access_token: PersonalAccessToken, + ) -> Result { + let revoked_at = clock.now(); + let res = sqlx::query!( + r#" + UPDATE personal_access_tokens + SET revoked_at = $2 + WHERE personal_access_token_id = $1 + "#, + Uuid::from(access_token.id), + revoked_at, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + DatabaseError::ensure_affected_rows(&res, 1)?; + + access_token.revoked_at = Some(revoked_at); + Ok(access_token) + } +} diff --git a/crates/storage-pg/src/personal/mod.rs b/crates/storage-pg/src/personal/mod.rs new file mode 100644 index 000000000..e60daccc4 --- /dev/null +++ b/crates/storage-pg/src/personal/mod.rs @@ -0,0 +1,13 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +//! A module containing the PostgreSQL implementations of the +//! Personal Access Token / Personal Session repositories + +mod access_token; +mod session; + +pub use access_token::PgPersonalAccessTokenRepository; +pub use session::PgPersonalSessionRepository; diff --git a/crates/storage-pg/src/personal/session.rs b/crates/storage-pg/src/personal/session.rs new file mode 100644 index 000000000..514293ba9 --- /dev/null +++ b/crates/storage-pg/src/personal/session.rs @@ -0,0 +1,218 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +use std::net::IpAddr; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use mas_data_model::{ + Clock, User, + personal::session::{PersonalSession, SessionState}, +}; +use mas_storage::personal::PersonalSessionRepository; +use oauth2_types::scope::Scope; +use rand::RngCore; +use sqlx::PgConnection; +use ulid::Ulid; +use uuid::Uuid; + +use crate::{DatabaseError, errors::DatabaseInconsistencyError, tracing::ExecuteExt as _}; + +/// An implementation of [`PersonalSessionRepository`] for a PostgreSQL +/// connection +pub struct PgPersonalSessionRepository<'c> { + conn: &'c mut PgConnection, +} + +impl<'c> PgPersonalSessionRepository<'c> { + /// Create a new [`PgOAuth2SessionRepository`] from an active PostgreSQL + /// connection + pub fn new(conn: &'c mut PgConnection) -> Self { + Self { conn } + } +} + +struct PersonalSessionLookup { + personal_session_id: Uuid, + owner_user_id: Uuid, + actor_user_id: Uuid, + human_name: String, + scope_list: Vec, + created_at: DateTime, + revoked_at: Option>, + last_active_at: Option>, + last_active_ip: Option, +} + +impl TryFrom for PersonalSession { + type Error = DatabaseInconsistencyError; + + fn try_from(value: PersonalSessionLookup) -> Result { + let id = Ulid::from(value.personal_session_id); + let scope: Result = value.scope_list.iter().map(|s| s.parse()).collect(); + let scope = scope.map_err(|e| { + DatabaseInconsistencyError::on("personal_sessions") + .column("scope") + .row(id) + .source(e) + })?; + + let state = match value.revoked_at { + None => SessionState::Valid, + Some(revoked_at) => SessionState::Revoked { revoked_at }, + }; + + Ok(PersonalSession { + id, + state, + owner_user_id: Ulid::from(value.owner_user_id), + actor_user_id: Ulid::from(value.actor_user_id), + human_name: value.human_name, + scope, + created_at: value.created_at, + last_active_at: value.last_active_at, + last_active_ip: value.last_active_ip, + }) + } +} + +#[async_trait] +impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { + type Error = DatabaseError; + + #[tracing::instrument( + name = "db.personal_session.lookup", + skip_all, + fields( + db.query.text, + session.id = %id, + ), + err, + )] + async fn lookup(&mut self, id: Ulid) -> Result, Self::Error> { + let res = sqlx::query_as!( + PersonalSessionLookup, + r#" + SELECT personal_session_id + , owner_user_id + , actor_user_id + , scope_list + , created_at + , revoked_at + , human_name + , last_active_at + , last_active_ip as "last_active_ip: IpAddr" + FROM personal_sessions + + WHERE personal_session_id = $1 + "#, + Uuid::from(id), + ) + .traced() + .fetch_optional(&mut *self.conn) + .await?; + + let Some(session) = res else { return Ok(None) }; + + Ok(Some(session.try_into()?)) + } + + #[tracing::instrument( + name = "db.personal_session.add", + skip_all, + fields( + db.query.text, + session.id, + session.scope = %scope, + ), + err, + )] + async fn add( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + owner_user: &User, + actor_user: &User, + human_name: String, + scope: Scope, + ) -> Result { + let created_at = clock.now(); + let id = Ulid::from_datetime_with_source(created_at.into(), rng); + tracing::Span::current().record("session.id", tracing::field::display(id)); + + let scope_list: Vec = scope.iter().map(|s| s.as_str().to_owned()).collect(); + + sqlx::query!( + r#" + INSERT INTO personal_sessions + ( personal_session_id + , owner_user_id + , actor_user_id + , human_name + , scope_list + , created_at + ) + VALUES ($1, $2, $3, $4, $5, $6) + "#, + Uuid::from(id), + Uuid::from(owner_user.id), + Uuid::from(actor_user.id), + &human_name, + &scope_list, + created_at, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(PersonalSession { + id, + state: SessionState::Valid, + owner_user_id: owner_user.id, + actor_user_id: actor_user.id, + human_name, + scope, + created_at, + last_active_at: None, + last_active_ip: None, + }) + } + + #[tracing::instrument( + name = "db.personal_session.revoke", + skip_all, + fields( + db.query.text, + %session.id, + %session.scope, + ), + err, + )] + async fn revoke( + &mut self, + clock: &dyn Clock, + session: PersonalSession, + ) -> Result { + let finished_at = clock.now(); + let res = sqlx::query!( + r#" + UPDATE personal_sessions + SET revoked_at = $2 + WHERE personal_session_id = $1 + "#, + Uuid::from(session.id), + finished_at, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + DatabaseError::ensure_affected_rows(&res, 1)?; + + session + .finish(finished_at) + .map_err(DatabaseError::to_invalid_operation) + } +} diff --git a/crates/storage-pg/src/repository.rs b/crates/storage-pg/src/repository.rs index 7911cd2b6..210d66a02 100644 --- a/crates/storage-pg/src/repository.rs +++ b/crates/storage-pg/src/repository.rs @@ -20,6 +20,7 @@ use mas_storage::{ OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository, OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository, }, + personal::PersonalSessionRepository, policy_data::PolicyDataRepository, queue::{QueueJobRepository, QueueScheduleRepository, QueueWorkerRepository}, upstream_oauth2::{ @@ -47,6 +48,7 @@ use crate::{ PgOAuth2ClientRepository, PgOAuth2DeviceCodeGrantRepository, PgOAuth2RefreshTokenRepository, PgOAuth2SessionRepository, }, + personal::{PgPersonalAccessTokenRepository, PgPersonalSessionRepository}, policy_data::PgPolicyDataRepository, queue::{ job::PgQueueJobRepository, schedule::PgQueueScheduleRepository, @@ -328,6 +330,19 @@ where Box::new(PgCompatRefreshTokenRepository::new(self.conn.as_mut())) } + fn personal_access_token<'c>( + &'c mut self, + ) -> Box + 'c> + { + Box::new(PgPersonalAccessTokenRepository::new(self.conn.as_mut())) + } + + fn personal_session<'c>( + &'c mut self, + ) -> Box + 'c> { + Box::new(PgPersonalSessionRepository::new(self.conn.as_mut())) + } + fn queue_worker<'c>(&'c mut self) -> Box + 'c> { Box::new(PgQueueWorkerRepository::new(self.conn.as_mut())) } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 605dea279..7a19f05ac 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -111,6 +111,7 @@ mod utils; pub mod app_session; pub mod compat; pub mod oauth2; +pub mod personal; pub mod policy_data; pub mod queue; pub mod upstream_oauth2; diff --git a/crates/storage/src/personal/access_token.rs b/crates/storage/src/personal/access_token.rs new file mode 100644 index 000000000..4f06bbf34 --- /dev/null +++ b/crates/storage/src/personal/access_token.rs @@ -0,0 +1,119 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +use async_trait::async_trait; +use chrono::Duration; +use mas_data_model::{ + Clock, + personal::{PersonalAccessToken, session::PersonalSession}, +}; +use rand_core::RngCore; +use ulid::Ulid; + +use crate::repository_impl; + +/// An [`PersonalAccessTokenRepository`] helps interacting with +/// [`PersonalAccessToken`] saved in the storage backend +#[async_trait] +pub trait PersonalAccessTokenRepository: Send + Sync { + /// The error type returned by the repository + type Error; + + /// Lookup an access token by its ID + /// + /// Returns the access token if it exists, `None` otherwise + /// + /// # Parameters + /// + /// * `id`: The ID of the access token to lookup + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn lookup(&mut self, id: Ulid) -> Result, Self::Error>; + + /// Find an access token by its token + /// + /// Returns the access token if it exists, `None` otherwise + /// + /// # Parameters + /// + /// * `access_token`: The token of the access token to lookup + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn find_by_token( + &mut self, + access_token: &str, + ) -> Result, Self::Error>; + + /// Add a new access token to the database + /// + /// Returns the newly created access token + /// + /// # Parameters + /// + /// * `rng`: A random number generator + /// * `clock`: The clock used to generate timestamps + /// * `session`: The session the access token is associated with + /// * `access_token`: The access token to add + /// * `expires_after`: The duration after which the access token expires. If + /// [`None`] the access token never expires + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn add( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + session: &PersonalSession, + access_token: String, + expires_after: Option, + ) -> Result; + + /// Revoke an access token + /// + /// Returns the revoked access token + /// + /// # Parameters + /// + /// * `clock`: The clock used to generate timestamps + /// * `access_token`: The access token to revoke + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn revoke( + &mut self, + clock: &dyn Clock, + access_token: PersonalAccessToken, + ) -> Result; +} + +repository_impl!(PersonalAccessTokenRepository: + async fn lookup(&mut self, id: Ulid) -> Result, Self::Error>; + + async fn find_by_token( + &mut self, + access_token: &str, + ) -> Result, Self::Error>; + + async fn add( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + session: &PersonalSession, + access_token: String, + expires_after: Option, + ) -> Result; + + async fn revoke( + &mut self, + clock: &dyn Clock, + access_token: PersonalAccessToken, + ) -> Result; +); diff --git a/crates/storage/src/personal/mod.rs b/crates/storage/src/personal/mod.rs new file mode 100644 index 000000000..28a33e1a0 --- /dev/null +++ b/crates/storage/src/personal/mod.rs @@ -0,0 +1,13 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +//! Repositories to deal with Personal Sessions and Personal Access Tokens +//! (PATs), which are sessions/access tokens created manually by users for use +//! in scripts, bots and similar applications. + +mod access_token; +mod session; + +pub use self::{access_token::PersonalAccessTokenRepository, session::PersonalSessionRepository}; diff --git a/crates/storage/src/personal/session.rs b/crates/storage/src/personal/session.rs new file mode 100644 index 000000000..7d0a76a37 --- /dev/null +++ b/crates/storage/src/personal/session.rs @@ -0,0 +1,101 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial +// Please see LICENSE files in the repository root for full details. + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use mas_data_model::{Clock, Device, User, personal::session::PersonalSession}; +use oauth2_types::scope::Scope; +use rand_core::RngCore; +use ulid::Ulid; + +use crate::repository_impl; + +/// A [`PersonalSessionRepository`] helps interacting with +/// [`PersonalSession`] saved in the storage backend +#[async_trait] +pub trait PersonalSessionRepository: Send + Sync { + /// The error type returned by the repository + type Error; + + /// Lookup a Personal session by its ID + /// + /// Returns the Personal session if it exists, `None` otherwise + /// + /// # Parameters + /// + /// * `id`: The ID of the Personal session to lookup + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn lookup(&mut self, id: Ulid) -> Result, Self::Error>; + + /// Start a new Personal session + /// + /// Returns the newly created Personal session + /// + /// # Parameters + /// + /// * `rng`: The random number generator to use + /// * `clock`: The clock used to generate timestamps + /// * `owner_user`: The user that will own the personal session + /// * `actor_user`: The user that will be represented by the personal + /// session + /// * `device`: The device ID of this session + /// * `human_name`: The human-readable name of the session provided by the + /// client or the user + /// * `scope`: The [`Scope`] of the [`PersonalSession`] + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn add( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + owner_user: &User, + actor_user: &User, + human_name: String, + scope: Scope, + ) -> Result; + + /// End a Personal session + /// + /// Returns the ended Personal session + /// + /// # Parameters + /// + /// * `clock`: The clock used to generate timestamps + /// * `Personal_session`: The Personal session to end + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn revoke( + &mut self, + clock: &dyn Clock, + personal_session: PersonalSession, + ) -> Result; +} + +repository_impl!(PersonalSessionRepository: + async fn lookup(&mut self, id: Ulid) -> Result, Self::Error>; + + async fn add( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + owner_user: &User, + actor_user: &User, + human_name: String, + scope: Scope, + ) -> Result; + + async fn revoke( + &mut self, + clock: &dyn Clock, + personal_session: PersonalSession, + ) -> Result; +); diff --git a/crates/storage/src/repository.rs b/crates/storage/src/repository.rs index 518769eb1..f6eb191e6 100644 --- a/crates/storage/src/repository.rs +++ b/crates/storage/src/repository.rs @@ -18,6 +18,7 @@ use crate::{ OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository, OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository, }, + personal::{PersonalAccessTokenRepository, PersonalSessionRepository}, policy_data::PolicyDataRepository, queue::{QueueJobRepository, QueueScheduleRepository, QueueWorkerRepository}, upstream_oauth2::{ @@ -214,6 +215,16 @@ pub trait RepositoryAccess: Send { &'c mut self, ) -> Box + 'c>; + /// Get a [`PersonalAccessTokenRepository`] + fn personal_access_token<'c>( + &'c mut self, + ) -> Box + 'c>; + + /// Get a [`PersonalSessionRepository`] + fn personal_session<'c>( + &'c mut self, + ) -> Box + 'c>; + /// Get a [`QueueWorkerRepository`] fn queue_worker<'c>(&'c mut self) -> Box + 'c>; @@ -247,6 +258,7 @@ mod impls { OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository, }, + personal::{PersonalAccessTokenRepository, PersonalSessionRepository}, policy_data::PolicyDataRepository, queue::{QueueJobRepository, QueueScheduleRepository, QueueWorkerRepository}, upstream_oauth2::{ @@ -458,6 +470,21 @@ mod impls { )) } + fn personal_access_token<'c>( + &'c mut self, + ) -> Box + 'c> { + Box::new(MapErr::new( + self.inner.personal_access_token(), + &mut self.mapper, + )) + } + + fn personal_session<'c>( + &'c mut self, + ) -> Box + 'c> { + Box::new(MapErr::new(self.inner.personal_session(), &mut self.mapper)) + } + fn queue_worker<'c>( &'c mut self, ) -> Box + 'c> { @@ -610,6 +637,18 @@ mod impls { (**self).compat_refresh_token() } + fn personal_access_token<'c>( + &'c mut self, + ) -> Box + 'c> { + (**self).personal_access_token() + } + + fn personal_session<'c>( + &'c mut self, + ) -> Box + 'c> { + (**self).personal_session() + } + fn queue_worker<'c>( &'c mut self, ) -> Box + 'c> { From 0619f83cc872f9b8a49a35912c11ad3a57c02273 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Fri, 3 Oct 2025 13:39:54 +0100 Subject: [PATCH 05/10] Add storage tests (with TODOs for unsupported functionality) --- crates/storage-pg/src/personal/mod.rs | 309 ++++++++++++++++++++++++++ 1 file changed, 309 insertions(+) diff --git a/crates/storage-pg/src/personal/mod.rs b/crates/storage-pg/src/personal/mod.rs index e60daccc4..10956de3d 100644 --- a/crates/storage-pg/src/personal/mod.rs +++ b/crates/storage-pg/src/personal/mod.rs @@ -11,3 +11,312 @@ mod session; pub use access_token::PgPersonalAccessTokenRepository; pub use session::PgPersonalSessionRepository; + +#[cfg(test)] +mod tests { + use chrono::Duration; + use mas_data_model::{Clock, Device, clock::MockClock}; + use mas_storage::{ + RepositoryAccess, + personal::{PersonalAccessTokenRepository, PersonalSessionRepository}, + user::UserRepository, + }; + use oauth2_types::scope::{OPENID, PROFILE, Scope}; + use rand::SeedableRng; + use rand_chacha::ChaChaRng; + use sqlx::PgPool; + + use crate::PgRepository; + + #[sqlx::test(migrator = "crate::MIGRATOR")] + async fn test_session_repository(pool: PgPool) { + let mut rng = ChaChaRng::seed_from_u64(42); + let clock = MockClock::default(); + let mut repo = PgRepository::from_pool(&pool).await.unwrap(); + + // Create a user + let admin_user = repo + .user() + .add(&mut rng, &clock, "john".to_owned()) + .await + .unwrap(); + let bot_user = repo + .user() + .add(&mut rng, &clock, "marvin".to_owned()) + .await + .unwrap(); + + // TODO: Session filters are not implemented for personal sessions yet + // let all = PersonalSessionFilter::new().for_user(&user); + // let active = all.active_only(); + // let finished = all.finished_only(); + // let pagination = Pagination::first(10); + + // assert_eq!(repo.personal_session().count(all).await.unwrap(), 0); + // assert_eq!(repo.personal_session().count(active).await.unwrap(), 0); + // assert_eq!(repo.personal_session().count(finished).await.unwrap(), 0); + + // let full_list = repo.compat_session().list(all, pagination).await.unwrap(); + // assert!(full_list.edges.is_empty()); + // let active_list = repo + // .compat_session() + // .list(active, pagination) + // .await + // .unwrap(); + // assert!(active_list.edges.is_empty()); + // let finished_list = repo + // .compat_session() + // .list(finished, pagination) + // .await + // .unwrap(); + // assert!(finished_list.edges.is_empty()); + + // Start a personal session for that user + let device = Device::generate(&mut rng); + let scope: Scope = [OPENID, PROFILE] + .into_iter() + .chain(device.to_scope_token().unwrap()) + .collect(); + let session = repo + .personal_session() + .add( + &mut rng, + &clock, + &admin_user, + &bot_user, + "Test Personal Session".to_owned(), + scope.clone(), + ) + .await + .unwrap(); + assert_eq!(session.owner_user_id, admin_user.id); + assert_eq!(session.actor_user_id, bot_user.id); + assert!(session.is_valid()); + assert!(!session.is_revoked()); + assert_eq!(session.scope, scope); + + // TODO + // assert_eq!(repo.compat_session().count(all).await.unwrap(), 1); + // assert_eq!(repo.compat_session().count(active).await.unwrap(), 1); + // assert_eq!(repo.compat_session().count(finished).await.unwrap(), 0); + + // let full_list = repo.compat_session().list(all, pagination).await.unwrap(); + // assert_eq!(full_list.edges.len(), 1); + // assert_eq!(full_list.edges[0].0.id, session.id); + // let active_list = repo + // .compat_session() + // .list(active, pagination) + // .await + // .unwrap(); + // assert_eq!(active_list.edges.len(), 1); + // assert_eq!(active_list.edges[0].0.id, session.id); + // let finished_list = repo + // .compat_session() + // .list(finished, pagination) + // .await + // .unwrap(); + // assert!(finished_list.edges.is_empty()); + + // Lookup the session and check it didn't change + let session_lookup = repo + .personal_session() + .lookup(session.id) + .await + .unwrap() + .expect("personal session not found"); + assert_eq!(session_lookup.id, session.id); + assert_eq!(session_lookup.owner_user_id, admin_user.id); + assert_eq!(session_lookup.actor_user_id, bot_user.id); + assert!(session_lookup.is_valid()); + assert!(!session_lookup.is_revoked()); + assert_eq!(session_lookup.scope, scope); + + // TODO + // assert_eq!(list.edges.len(), 1); + // let session_lookup = &list.edges[0].0; + // assert_eq!(session_lookup.id, session.id); + // assert_eq!(session_lookup.user_id, user.id); + // assert_eq!(session.device.as_ref().unwrap().as_str(), device_str); + // assert!(session_lookup.is_valid()); + // assert!(!session_lookup.is_finished()); + + // Revoke the session + let session = repo + .personal_session() + .revoke(&clock, session) + .await + .unwrap(); + assert!(!session.is_valid()); + assert!(session.is_revoked()); + + // TODO + // assert_eq!(repo.compat_session().count(all).await.unwrap(), 1); + // assert_eq!(repo.compat_session().count(active).await.unwrap(), 0); + // assert_eq!(repo.compat_session().count(finished).await.unwrap(), 1); + + // TODO + // let full_list = repo.compat_session().list(all, pagination).await.unwrap(); + // assert_eq!(full_list.edges.len(), 1); + // assert_eq!(full_list.edges[0].0.id, session.id); + // let active_list = repo + // .compat_session() + // .list(active, pagination) + // .await + // .unwrap(); + // assert!(active_list.edges.is_empty()); + // let finished_list = repo + // .compat_session() + // .list(finished, pagination) + // .await + // .unwrap(); + // assert_eq!(finished_list.edges.len(), 1); + // assert_eq!(finished_list.edges[0].0.id, session.id); + // assert!(session.is_revoked()); + + // Reload the session and check again + let session_lookup = repo + .personal_session() + .lookup(session.id) + .await + .unwrap() + .expect("personal session not found"); + assert!(!session_lookup.is_valid()); + assert!(session_lookup.is_revoked()); + } + + #[sqlx::test(migrator = "crate::MIGRATOR")] + async fn test_access_token_repository(pool: PgPool) { + const FIRST_TOKEN: &str = "first_access_token"; + const SECOND_TOKEN: &str = "second_access_token"; + let mut rng = ChaChaRng::seed_from_u64(42); + let clock = MockClock::default(); + let mut repo = PgRepository::from_pool(&pool).await.unwrap().boxed(); + + // Create a user + let admin_user = repo + .user() + .add(&mut rng, &clock, "john".to_owned()) + .await + .unwrap(); + let bot_user = repo + .user() + .add(&mut rng, &clock, "marvin".to_owned()) + .await + .unwrap(); + + // Start a personal session for that user + let device = Device::generate(&mut rng); + let scope: Scope = [OPENID, PROFILE] + .into_iter() + .chain(device.to_scope_token().unwrap()) + .collect(); + let session = repo + .personal_session() + .add( + &mut rng, + &clock, + &admin_user, + &bot_user, + "Test Personal Session".to_owned(), + scope, + ) + .await + .unwrap(); + + // Add an access token to that session + let token = repo + .personal_access_token() + .add( + &mut rng, + &clock, + &session, + FIRST_TOKEN.to_owned(), + Some(Duration::try_minutes(1).unwrap()), + ) + .await + .unwrap(); + assert_eq!(token.session_id, session.id); + + // Commit the txn and grab a new transaction, to test a conflict + repo.save().await.unwrap(); + + { + let mut repo = PgRepository::from_pool(&pool).await.unwrap().boxed(); + // Adding the same token a second time should conflict + assert!( + repo.personal_access_token() + .add( + &mut rng, + &clock, + &session, + FIRST_TOKEN.to_owned(), + Some(Duration::try_minutes(1).unwrap()), + ) + .await + .is_err() + ); + repo.cancel().await.unwrap(); + } + + // Grab a new repo + let mut repo = PgRepository::from_pool(&pool).await.unwrap().boxed(); + + // Looking up via ID works + let token_lookup = repo + .personal_access_token() + .lookup(token.id) + .await + .unwrap() + .expect("personal access token not found"); + assert_eq!(token.id, token_lookup.id); + assert_eq!(token_lookup.session_id, session.id); + + // Looking up via the token value works + let token_lookup = repo + .personal_access_token() + .find_by_token(FIRST_TOKEN) + .await + .unwrap() + .expect("personal access token not found"); + assert_eq!(token.id, token_lookup.id); + assert_eq!(token_lookup.session_id, session.id); + + // Token is currently valid + assert!(token.is_valid(clock.now())); + + clock.advance(Duration::try_minutes(1).unwrap()); + // Token should have expired + assert!(!token.is_valid(clock.now())); + + // Add a second access token, this time without expiration + let token = repo + .personal_access_token() + .add(&mut rng, &clock, &session, SECOND_TOKEN.to_owned(), None) + .await + .unwrap(); + assert_eq!(token.session_id, session.id); + + // Token is currently valid + assert!(token.is_valid(clock.now())); + + // Revoke it + let _token = repo + .personal_access_token() + .revoke(&clock, token) + .await + .unwrap(); + + // Reload it + let token = repo + .personal_access_token() + .find_by_token(SECOND_TOKEN) + .await + .unwrap() + .expect("personal access token not found"); + + // Token is not valid anymore + assert!(!token.is_valid(clock.now())); + + repo.save().await.unwrap(); + } +} From b6d8cdbfee053ec431d2e3fc87fdd2866a070373 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 6 Oct 2025 13:31:43 +0100 Subject: [PATCH 06/10] Add filters for personal sessions --- crates/storage-pg/src/iden.rs | 15 ++ crates/storage-pg/src/personal/session.rs | 170 +++++++++++++++++++- crates/storage/src/personal/mod.rs | 5 +- crates/storage/src/personal/session.rs | 181 +++++++++++++++++++++- 4 files changed, 366 insertions(+), 5 deletions(-) diff --git a/crates/storage-pg/src/iden.rs b/crates/storage-pg/src/iden.rs index a861f59c7..c2198434e 100644 --- a/crates/storage-pg/src/iden.rs +++ b/crates/storage-pg/src/iden.rs @@ -108,6 +108,21 @@ pub enum OAuth2Clients { IsStatic, } +#[derive(sea_query::Iden)] +#[iden = "personal_sessions"] +pub enum PersonalSessions { + Table, + PersonalSessionId, + OwnerUserId, + ActorUserId, + HumanName, + ScopeList, + CreatedAt, + RevokedAt, + LastActiveAt, + LastActiveIp, +} + #[derive(sea_query::Iden)] #[iden = "upstream_oauth_providers"] pub enum UpstreamOAuthProviders { diff --git a/crates/storage-pg/src/personal/session.rs b/crates/storage-pg/src/personal/session.rs index 514293ba9..40ed4f312 100644 --- a/crates/storage-pg/src/personal/session.rs +++ b/crates/storage-pg/src/personal/session.rs @@ -11,14 +11,29 @@ use mas_data_model::{ Clock, User, personal::session::{PersonalSession, SessionState}, }; -use mas_storage::personal::PersonalSessionRepository; +use mas_storage::{ + Page, Pagination, + personal::{PersonalSessionFilter, PersonalSessionRepository, PersonalSessionState}, +}; use oauth2_types::scope::Scope; use rand::RngCore; +use sea_query::{ + Condition, Expr, PgFunc, PostgresQueryBuilder, Query, SimpleExpr, enum_def, + extension::postgres::PgExpr as _, +}; +use sea_query_binder::SqlxBinder as _; use sqlx::PgConnection; use ulid::Ulid; use uuid::Uuid; -use crate::{DatabaseError, errors::DatabaseInconsistencyError, tracing::ExecuteExt as _}; +use crate::{ + DatabaseError, + errors::DatabaseInconsistencyError, + filter::{Filter, StatementExt as _}, + iden::PersonalSessions, + pagination::QueryBuilderExt as _, + tracing::ExecuteExt as _, +}; /// An implementation of [`PersonalSessionRepository`] for a PostgreSQL /// connection @@ -27,13 +42,15 @@ pub struct PgPersonalSessionRepository<'c> { } impl<'c> PgPersonalSessionRepository<'c> { - /// Create a new [`PgOAuth2SessionRepository`] from an active PostgreSQL + /// Create a new [`PgPersonalSessionRepository`] from an active PostgreSQL /// connection pub fn new(conn: &'c mut PgConnection) -> Self { Self { conn } } } +#[derive(sqlx::FromRow)] +#[enum_def] struct PersonalSessionLookup { personal_session_id: Uuid, owner_user_id: Uuid, @@ -215,4 +232,151 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .finish(finished_at) .map_err(DatabaseError::to_invalid_operation) } + + #[tracing::instrument( + name = "db.personal_session.list", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn list( + &mut self, + filter: PersonalSessionFilter<'_>, + pagination: Pagination, + ) -> Result, Self::Error> { + let (sql, arguments) = Query::select() + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)), + PersonalSessionLookupIden::PersonalSessionId, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::OwnerUserId)), + PersonalSessionLookupIden::OwnerUserId, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::ActorUserId)), + PersonalSessionLookupIden::ActorUserId, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::HumanName)), + PersonalSessionLookupIden::HumanName, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::ScopeList)), + PersonalSessionLookupIden::ScopeList, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::CreatedAt)), + PersonalSessionLookupIden::CreatedAt, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::RevokedAt)), + PersonalSessionLookupIden::RevokedAt, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::LastActiveAt)), + PersonalSessionLookupIden::LastActiveAt, + ) + .expr_as( + Expr::col((PersonalSessions::Table, PersonalSessions::LastActiveIp)), + PersonalSessionLookupIden::LastActiveIp, + ) + .from(PersonalSessions::Table) + .apply_filter(filter) + .generate_pagination( + (PersonalSessions::Table, PersonalSessions::PersonalSessionId), + pagination, + ) + .build_sqlx(PostgresQueryBuilder); + + let edges: Vec = sqlx::query_as_with(&sql, arguments) + .traced() + .fetch_all(&mut *self.conn) + .await?; + + let page = pagination + .process(edges) + .try_map(PersonalSession::try_from)?; + + Ok(page) + } + + #[tracing::instrument( + name = "db.personal_session.count", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn count(&mut self, filter: PersonalSessionFilter<'_>) -> Result { + let (sql, arguments) = Query::select() + .expr(Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)).count()) + .from(PersonalSessions::Table) + .apply_filter(filter) + .build_sqlx(PostgresQueryBuilder); + + let count: i64 = sqlx::query_scalar_with(&sql, arguments) + .traced() + .fetch_one(&mut *self.conn) + .await?; + + count + .try_into() + .map_err(DatabaseError::to_invalid_operation) + } +} + +impl Filter for PersonalSessionFilter<'_> { + fn generate_condition(&self, _has_joins: bool) -> impl sea_query::IntoCondition { + sea_query::Condition::all() + .add_option(self.owner_user().map(|user| { + Expr::col((PersonalSessions::Table, PersonalSessions::OwnerUserId)) + .eq(Uuid::from(user.id)) + })) + .add_option(self.actor_user().map(|user| { + Expr::col((PersonalSessions::Table, PersonalSessions::ActorUserId)) + .eq(Uuid::from(user.id)) + })) + .add_option(self.device().map(|device| -> SimpleExpr { + if let Ok([stable_scope_token, unstable_scope_token]) = device.to_scope_token() { + Condition::any() + .add( + Expr::val(stable_scope_token.to_string()).eq(PgFunc::any(Expr::col(( + PersonalSessions::Table, + PersonalSessions::ScopeList, + )))), + ) + .add(Expr::val(unstable_scope_token.to_string()).eq(PgFunc::any( + Expr::col((PersonalSessions::Table, PersonalSessions::ScopeList)), + ))) + .into() + } else { + // If the device ID can't be encoded as a scope token, match no rows + Expr::val(false).into() + } + })) + .add_option(self.state().map(|state| match state { + PersonalSessionState::Active => { + Expr::col((PersonalSessions::Table, PersonalSessions::RevokedAt)).is_null() + } + PersonalSessionState::Revoked => { + Expr::col((PersonalSessions::Table, PersonalSessions::RevokedAt)).is_not_null() + } + })) + .add_option(self.scope().map(|scope| { + let scope: Vec = scope.iter().map(|s| s.as_str().to_owned()).collect(); + Expr::col((PersonalSessions::Table, PersonalSessions::ScopeList)).contains(scope) + })) + .add_option(self.last_active_before().map(|last_active_before| { + Expr::col((PersonalSessions::Table, PersonalSessions::LastActiveAt)) + .lt(last_active_before) + })) + .add_option(self.last_active_after().map(|last_active_after| { + Expr::col((PersonalSessions::Table, PersonalSessions::LastActiveAt)) + .gt(last_active_after) + })) + } } diff --git a/crates/storage/src/personal/mod.rs b/crates/storage/src/personal/mod.rs index 28a33e1a0..3a9dfcd65 100644 --- a/crates/storage/src/personal/mod.rs +++ b/crates/storage/src/personal/mod.rs @@ -10,4 +10,7 @@ mod access_token; mod session; -pub use self::{access_token::PersonalAccessTokenRepository, session::PersonalSessionRepository}; +pub use self::{ + access_token::PersonalAccessTokenRepository, + session::{PersonalSessionFilter, PersonalSessionRepository, PersonalSessionState}, +}; diff --git a/crates/storage/src/personal/session.rs b/crates/storage/src/personal/session.rs index 7d0a76a37..aedb939e0 100644 --- a/crates/storage/src/personal/session.rs +++ b/crates/storage/src/personal/session.rs @@ -10,7 +10,7 @@ use oauth2_types::scope::Scope; use rand_core::RngCore; use ulid::Ulid; -use crate::repository_impl; +use crate::{Page, Pagination, repository_impl}; /// A [`PersonalSessionRepository`] helps interacting with /// [`PersonalSession`] saved in the storage backend @@ -78,6 +78,34 @@ pub trait PersonalSessionRepository: Send + Sync { clock: &dyn Clock, personal_session: PersonalSession, ) -> Result; + + /// List [`PersonalSession`]s matching the given filter and pagination + /// parameters + /// + /// # Parameters + /// + /// * `filter`: The filter parameters + /// * `pagination`: The pagination parameters + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn list( + &mut self, + filter: PersonalSessionFilter<'_>, + pagination: Pagination, + ) -> Result, Self::Error>; + + /// Count [`PersonalSession`]s matching the given filter + /// + /// # Parameters + /// + /// * `filter`: The filter parameters + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn count(&mut self, filter: PersonalSessionFilter<'_>) -> Result; } repository_impl!(PersonalSessionRepository: @@ -98,4 +126,155 @@ repository_impl!(PersonalSessionRepository: clock: &dyn Clock, personal_session: PersonalSession, ) -> Result; + + async fn list( + &mut self, + filter: PersonalSessionFilter<'_>, + pagination: Pagination, + ) -> Result, Self::Error>; + + async fn count(&mut self, filter: PersonalSessionFilter<'_>) -> Result; ); + +/// Filter parameters for listing personal sessions +#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] +pub struct PersonalSessionFilter<'a> { + owner_user: Option<&'a User>, + actor_user: Option<&'a User>, + device: Option<&'a Device>, + state: Option, + scope: Option<&'a Scope>, + last_active_before: Option>, + last_active_after: Option>, +} + +/// Filter for what state a personal session is in. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum PersonalSessionState { + /// The personal session is active, which means it either + /// has active access tokens or can have new access tokens generated. + Active, + /// The personal session is revoked, which means no more access tokens + /// can be generated and none are active. + Revoked, +} + +impl<'a> PersonalSessionFilter<'a> { + /// Create a new [`PersonalSessionFilter`] with default values + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// List sessions owned by a specific user + #[must_use] + pub fn for_owner_user(mut self, user: &'a User) -> Self { + self.owner_user = Some(user); + self + } + + /// Get the owner user filter + /// + /// Returns [`None`] if no user filter was set + #[must_use] + pub fn owner_user(&self) -> Option<&'a User> { + self.owner_user + } + + /// List sessions acting as a specific user + #[must_use] + pub fn for_actor_user(mut self, user: &'a User) -> Self { + self.actor_user = Some(user); + self + } + + /// Get the actor user filter + /// + /// Returns [`None`] if no user filter was set + #[must_use] + pub fn actor_user(&self) -> Option<&'a User> { + self.actor_user + } + + /// Only return sessions with a last active time before the given time + #[must_use] + pub fn with_last_active_before(mut self, last_active_before: DateTime) -> Self { + self.last_active_before = Some(last_active_before); + self + } + + /// Only return sessions with a last active time after the given time + #[must_use] + pub fn with_last_active_after(mut self, last_active_after: DateTime) -> Self { + self.last_active_after = Some(last_active_after); + self + } + + /// Get the last active before filter + /// + /// Returns [`None`] if no client filter was set + #[must_use] + pub fn last_active_before(&self) -> Option> { + self.last_active_before + } + + /// Get the last active after filter + /// + /// Returns [`None`] if no client filter was set + #[must_use] + pub fn last_active_after(&self) -> Option> { + self.last_active_after + } + + /// Only return active sessions + #[must_use] + pub fn active_only(mut self) -> Self { + self.state = Some(PersonalSessionState::Active); + self + } + + /// Only return finished sessions + #[must_use] + pub fn finished_only(mut self) -> Self { + self.state = Some(PersonalSessionState::Revoked); + self + } + + /// Get the state filter + /// + /// Returns [`None`] if no state filter was set + #[must_use] + pub fn state(&self) -> Option { + self.state + } + + /// Only return sessions with the given scope + #[must_use] + pub fn with_scope(mut self, scope: &'a Scope) -> Self { + self.scope = Some(scope); + self + } + + /// Get the scope filter + /// + /// Returns [`None`] if no scope filter was set + #[must_use] + pub fn scope(&self) -> Option<&'a Scope> { + self.scope + } + + /// Only return sessions that have the given device in their scope + #[must_use] + pub fn for_device(mut self, device: &'a Device) -> Self { + self.device = Some(device); + self + } + + /// Get the device filter + /// + /// Returns [`None`] if no device filter was set + #[must_use] + pub fn device(&self) -> Option<&'a Device> { + self.device + } +} From 6aa483a1f8dcbc197519c0dfef8c11b04fc333e6 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 6 Oct 2025 13:31:49 +0100 Subject: [PATCH 07/10] Sync devices from personal sessions --- crates/tasks/src/matrix.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/crates/tasks/src/matrix.rs b/crates/tasks/src/matrix.rs index 2acd9372e..8a8b90c4e 100644 --- a/crates/tasks/src/matrix.rs +++ b/crates/tasks/src/matrix.rs @@ -14,6 +14,7 @@ use mas_storage::{ Pagination, RepositoryAccess, compat::CompatSessionFilter, oauth2::OAuth2SessionFilter, + personal::PersonalSessionFilter, queue::{ DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, QueueJobRepositoryExt as _, SyncDevicesJob, @@ -243,6 +244,35 @@ impl RunnableJob for SyncDevicesJob { } } + // Cycle through all the personal sessions of the user and get the devices + let mut cursor = Pagination::first(5000); + loop { + let page = repo + .personal_session() + .list( + PersonalSessionFilter::new() + .for_actor_user(&user) + .active_only(), + cursor, + ) + .await + .map_err(JobError::retry)?; + + for edge in page.edges { + for scope in &*edge.node.scope { + if let Some(device) = Device::from_scope_token(scope) { + devices.insert(device.as_str().to_owned()); + } + } + + cursor = cursor.after(edge.cursor); + } + + if !page.has_next_page { + break; + } + } + matrix .sync_devices(&user.username, devices) .await From 9f7806163b8cff8b65a7b2641dc75212695e67ba Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 6 Oct 2025 17:19:57 +0100 Subject: [PATCH 08/10] Enable session filter tests --- crates/storage-pg/src/personal/mod.rs | 152 ++++++++++------------ crates/storage-pg/src/personal/session.rs | 11 +- 2 files changed, 80 insertions(+), 83 deletions(-) diff --git a/crates/storage-pg/src/personal/mod.rs b/crates/storage-pg/src/personal/mod.rs index 10956de3d..3b19bcfe2 100644 --- a/crates/storage-pg/src/personal/mod.rs +++ b/crates/storage-pg/src/personal/mod.rs @@ -17,8 +17,10 @@ mod tests { use chrono::Duration; use mas_data_model::{Clock, Device, clock::MockClock}; use mas_storage::{ - RepositoryAccess, - personal::{PersonalAccessTokenRepository, PersonalSessionRepository}, + Pagination, RepositoryAccess, + personal::{ + PersonalAccessTokenRepository, PersonalSessionFilter, PersonalSessionRepository, + }, user::UserRepository, }; use oauth2_types::scope::{OPENID, PROFILE, Scope}; @@ -46,30 +48,30 @@ mod tests { .await .unwrap(); - // TODO: Session filters are not implemented for personal sessions yet - // let all = PersonalSessionFilter::new().for_user(&user); - // let active = all.active_only(); - // let finished = all.finished_only(); - // let pagination = Pagination::first(10); - - // assert_eq!(repo.personal_session().count(all).await.unwrap(), 0); - // assert_eq!(repo.personal_session().count(active).await.unwrap(), 0); - // assert_eq!(repo.personal_session().count(finished).await.unwrap(), 0); - - // let full_list = repo.compat_session().list(all, pagination).await.unwrap(); - // assert!(full_list.edges.is_empty()); - // let active_list = repo - // .compat_session() - // .list(active, pagination) - // .await - // .unwrap(); - // assert!(active_list.edges.is_empty()); - // let finished_list = repo - // .compat_session() - // .list(finished, pagination) - // .await - // .unwrap(); - // assert!(finished_list.edges.is_empty()); + let all = PersonalSessionFilter::new().for_actor_user(&bot_user); + let active = all.active_only(); + let finished = all.finished_only(); + let pagination = Pagination::first(10); + + assert_eq!(repo.personal_session().count(all).await.unwrap(), 0); + assert_eq!(repo.personal_session().count(active).await.unwrap(), 0); + assert_eq!(repo.personal_session().count(finished).await.unwrap(), 0); + + // We start off with no sessions + let full_list = repo.personal_session().list(all, pagination).await.unwrap(); + assert!(full_list.edges.is_empty()); + let active_list = repo + .personal_session() + .list(active, pagination) + .await + .unwrap(); + assert!(active_list.edges.is_empty()); + let finished_list = repo + .personal_session() + .list(finished, pagination) + .await + .unwrap(); + assert!(finished_list.edges.is_empty()); // Start a personal session for that user let device = Device::generate(&mut rng); @@ -95,27 +97,28 @@ mod tests { assert!(!session.is_revoked()); assert_eq!(session.scope, scope); - // TODO - // assert_eq!(repo.compat_session().count(all).await.unwrap(), 1); - // assert_eq!(repo.compat_session().count(active).await.unwrap(), 1); - // assert_eq!(repo.compat_session().count(finished).await.unwrap(), 0); - - // let full_list = repo.compat_session().list(all, pagination).await.unwrap(); - // assert_eq!(full_list.edges.len(), 1); - // assert_eq!(full_list.edges[0].0.id, session.id); - // let active_list = repo - // .compat_session() - // .list(active, pagination) - // .await - // .unwrap(); - // assert_eq!(active_list.edges.len(), 1); - // assert_eq!(active_list.edges[0].0.id, session.id); - // let finished_list = repo - // .compat_session() - // .list(finished, pagination) - // .await - // .unwrap(); - // assert!(finished_list.edges.is_empty()); + assert_eq!(repo.personal_session().count(all).await.unwrap(), 1); + assert_eq!(repo.personal_session().count(active).await.unwrap(), 1); + assert_eq!(repo.personal_session().count(finished).await.unwrap(), 0); + + let full_list = repo.personal_session().list(all, pagination).await.unwrap(); + assert_eq!(full_list.edges.len(), 1); + assert_eq!(full_list.edges[0].node.id, session.id); + assert!(full_list.edges[0].node.is_valid()); + let active_list = repo + .personal_session() + .list(active, pagination) + .await + .unwrap(); + assert_eq!(active_list.edges.len(), 1); + assert_eq!(active_list.edges[0].node.id, session.id); + assert!(active_list.edges[0].node.is_valid()); + let finished_list = repo + .personal_session() + .list(finished, pagination) + .await + .unwrap(); + assert!(finished_list.edges.is_empty()); // Lookup the session and check it didn't change let session_lookup = repo @@ -127,18 +130,9 @@ mod tests { assert_eq!(session_lookup.id, session.id); assert_eq!(session_lookup.owner_user_id, admin_user.id); assert_eq!(session_lookup.actor_user_id, bot_user.id); + assert_eq!(session_lookup.scope, scope); assert!(session_lookup.is_valid()); assert!(!session_lookup.is_revoked()); - assert_eq!(session_lookup.scope, scope); - - // TODO - // assert_eq!(list.edges.len(), 1); - // let session_lookup = &list.edges[0].0; - // assert_eq!(session_lookup.id, session.id); - // assert_eq!(session_lookup.user_id, user.id); - // assert_eq!(session.device.as_ref().unwrap().as_str(), device_str); - // assert!(session_lookup.is_valid()); - // assert!(!session_lookup.is_finished()); // Revoke the session let session = repo @@ -149,29 +143,27 @@ mod tests { assert!(!session.is_valid()); assert!(session.is_revoked()); - // TODO - // assert_eq!(repo.compat_session().count(all).await.unwrap(), 1); - // assert_eq!(repo.compat_session().count(active).await.unwrap(), 0); - // assert_eq!(repo.compat_session().count(finished).await.unwrap(), 1); - - // TODO - // let full_list = repo.compat_session().list(all, pagination).await.unwrap(); - // assert_eq!(full_list.edges.len(), 1); - // assert_eq!(full_list.edges[0].0.id, session.id); - // let active_list = repo - // .compat_session() - // .list(active, pagination) - // .await - // .unwrap(); - // assert!(active_list.edges.is_empty()); - // let finished_list = repo - // .compat_session() - // .list(finished, pagination) - // .await - // .unwrap(); - // assert_eq!(finished_list.edges.len(), 1); - // assert_eq!(finished_list.edges[0].0.id, session.id); - // assert!(session.is_revoked()); + assert_eq!(repo.personal_session().count(all).await.unwrap(), 1); + assert_eq!(repo.personal_session().count(active).await.unwrap(), 0); + assert_eq!(repo.personal_session().count(finished).await.unwrap(), 1); + + let full_list = repo.personal_session().list(all, pagination).await.unwrap(); + assert_eq!(full_list.edges.len(), 1); + assert_eq!(full_list.edges[0].node.id, session.id); + let active_list = repo + .personal_session() + .list(active, pagination) + .await + .unwrap(); + assert!(active_list.edges.is_empty()); + let finished_list = repo + .personal_session() + .list(finished, pagination) + .await + .unwrap(); + assert_eq!(finished_list.edges.len(), 1); + assert_eq!(finished_list.edges[0].node.id, session.id); + assert!(finished_list.edges[0].node.is_revoked()); // Reload the session and check again let session_lookup = repo diff --git a/crates/storage-pg/src/personal/session.rs b/crates/storage-pg/src/personal/session.rs index 40ed4f312..3b5b0a601 100644 --- a/crates/storage-pg/src/personal/session.rs +++ b/crates/storage-pg/src/personal/session.rs @@ -13,6 +13,7 @@ use mas_data_model::{ }; use mas_storage::{ Page, Pagination, + pagination::Node, personal::{PersonalSessionFilter, PersonalSessionRepository, PersonalSessionState}, }; use oauth2_types::scope::Scope; @@ -63,6 +64,12 @@ struct PersonalSessionLookup { last_active_ip: Option, } +impl Node for PersonalSessionLookup { + fn cursor(&self) -> Ulid { + self.personal_session_id.into() + } +} + impl TryFrom for PersonalSession { type Error = DatabaseInconsistencyError; @@ -296,9 +303,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .fetch_all(&mut *self.conn) .await?; - let page = pagination - .process(edges) - .try_map(PersonalSession::try_from)?; + let page = pagination.process(edges).try_map(TryFrom::try_from)?; Ok(page) } From 72d3ea851bd960ccce10f9e6de95ba11cca95583 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 7 Oct 2025 16:02:55 +0100 Subject: [PATCH 09/10] Support OAuth2 clients as owners of personal sessions --- crates/data-model/src/personal/session.rs | 27 ++++++++-- ...550e4e12d1778474aba72762d9aa093d21ee2.json | 20 ++++++++ ...f063537d5a7f13c48d031367c1d8dba2f8af5.json | 19 ------- ...29681d450669563dd1178c492ffce51e5ff2.json} | 24 +++++---- .../20250924132713_personal_access_tokens.sql | 21 ++++++-- crates/storage-pg/src/iden.rs | 2 + crates/storage-pg/src/personal/mod.rs | 15 ++++-- crates/storage-pg/src/personal/session.rs | 50 ++++++++++++++++--- crates/storage/src/personal/session.rs | 25 ++++++++-- 9 files changed, 153 insertions(+), 50 deletions(-) create mode 100644 crates/storage-pg/.sqlx/query-109f0c859e123966462f1001aef550e4e12d1778474aba72762d9aa093d21ee2.json delete mode 100644 crates/storage-pg/.sqlx/query-c55d8dc9c1d1120ebc2c82e3779f063537d5a7f13c48d031367c1d8dba2f8af5.json rename crates/storage-pg/.sqlx/{query-8816802493ba098c0705b8a8fa87a18ff07e0b5cd08cc525ac9d5dcceece7311.json => query-fd32368fa6cd16a9704cdea54f7729681d450669563dd1178c492ffce51e5ff2.json} (64%) diff --git a/crates/data-model/src/personal/session.rs b/crates/data-model/src/personal/session.rs index f8904f810..ddca07590 100644 --- a/crates/data-model/src/personal/session.rs +++ b/crates/data-model/src/personal/session.rs @@ -10,7 +10,7 @@ use oauth2_types::scope::Scope; use serde::Serialize; use ulid::Ulid; -use crate::InvalidTransitionError; +use crate::{Client, InvalidTransitionError, User}; #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] pub enum SessionState { @@ -74,10 +74,10 @@ impl SessionState { pub struct PersonalSession { pub id: Ulid, pub state: SessionState, - pub owner_user_id: Ulid, + pub owner: PersonalSessionOwner, pub actor_user_id: Ulid, pub human_name: String, - /// The scope for the session, identical to OAuth2 sessions. + /// The scope for the session, identical to OAuth 2 sessions. /// May or may not include a device scope /// (personal sessions can be deviceless). pub scope: Scope, @@ -86,6 +86,27 @@ pub struct PersonalSession { pub last_active_ip: Option, } +#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize)] +pub enum PersonalSessionOwner { + /// The personal session is owned by the user with the given `user_id`. + User(Ulid), + /// The personal session is owned by the OAuth 2 Client with the given + /// `oauth2_client_id`. + OAuth2Client(Ulid), +} + +impl<'a> From<&'a User> for PersonalSessionOwner { + fn from(value: &'a User) -> Self { + PersonalSessionOwner::User(value.id) + } +} + +impl<'a> From<&'a Client> for PersonalSessionOwner { + fn from(value: &'a Client) -> Self { + PersonalSessionOwner::OAuth2Client(value.id) + } +} + impl std::ops::Deref for PersonalSession { type Target = SessionState; diff --git a/crates/storage-pg/.sqlx/query-109f0c859e123966462f1001aef550e4e12d1778474aba72762d9aa093d21ee2.json b/crates/storage-pg/.sqlx/query-109f0c859e123966462f1001aef550e4e12d1778474aba72762d9aa093d21ee2.json new file mode 100644 index 000000000..83400921a --- /dev/null +++ b/crates/storage-pg/.sqlx/query-109f0c859e123966462f1001aef550e4e12d1778474aba72762d9aa093d21ee2.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO personal_sessions\n ( personal_session_id\n , owner_user_id\n , owner_oauth2_client_id\n , actor_user_id\n , human_name\n , scope_list\n , created_at\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Uuid", + "Uuid", + "Text", + "TextArray", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "109f0c859e123966462f1001aef550e4e12d1778474aba72762d9aa093d21ee2" +} diff --git a/crates/storage-pg/.sqlx/query-c55d8dc9c1d1120ebc2c82e3779f063537d5a7f13c48d031367c1d8dba2f8af5.json b/crates/storage-pg/.sqlx/query-c55d8dc9c1d1120ebc2c82e3779f063537d5a7f13c48d031367c1d8dba2f8af5.json deleted file mode 100644 index 9dec975ca..000000000 --- a/crates/storage-pg/.sqlx/query-c55d8dc9c1d1120ebc2c82e3779f063537d5a7f13c48d031367c1d8dba2f8af5.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO personal_sessions\n ( personal_session_id\n , owner_user_id\n , actor_user_id\n , human_name\n , scope_list\n , created_at\n )\n VALUES ($1, $2, $3, $4, $5, $6)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Uuid", - "Uuid", - "Text", - "TextArray", - "Timestamptz" - ] - }, - "nullable": [] - }, - "hash": "c55d8dc9c1d1120ebc2c82e3779f063537d5a7f13c48d031367c1d8dba2f8af5" -} diff --git a/crates/storage-pg/.sqlx/query-8816802493ba098c0705b8a8fa87a18ff07e0b5cd08cc525ac9d5dcceece7311.json b/crates/storage-pg/.sqlx/query-fd32368fa6cd16a9704cdea54f7729681d450669563dd1178c492ffce51e5ff2.json similarity index 64% rename from crates/storage-pg/.sqlx/query-8816802493ba098c0705b8a8fa87a18ff07e0b5cd08cc525ac9d5dcceece7311.json rename to crates/storage-pg/.sqlx/query-fd32368fa6cd16a9704cdea54f7729681d450669563dd1178c492ffce51e5ff2.json index d7eb2c798..b46904ccb 100644 --- a/crates/storage-pg/.sqlx/query-8816802493ba098c0705b8a8fa87a18ff07e0b5cd08cc525ac9d5dcceece7311.json +++ b/crates/storage-pg/.sqlx/query-fd32368fa6cd16a9704cdea54f7729681d450669563dd1178c492ffce51e5ff2.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT personal_session_id\n , owner_user_id\n , actor_user_id\n , scope_list\n , created_at\n , revoked_at\n , human_name\n , last_active_at\n , last_active_ip as \"last_active_ip: IpAddr\"\n FROM personal_sessions\n\n WHERE personal_session_id = $1\n ", + "query": "\n SELECT personal_session_id\n , owner_user_id\n , owner_oauth2_client_id\n , actor_user_id\n , scope_list\n , created_at\n , revoked_at\n , human_name\n , last_active_at\n , last_active_ip as \"last_active_ip: IpAddr\"\n FROM personal_sessions\n\n WHERE personal_session_id = $1\n ", "describe": { "columns": [ { @@ -15,36 +15,41 @@ }, { "ordinal": 2, - "name": "actor_user_id", + "name": "owner_oauth2_client_id", "type_info": "Uuid" }, { "ordinal": 3, + "name": "actor_user_id", + "type_info": "Uuid" + }, + { + "ordinal": 4, "name": "scope_list", "type_info": "TextArray" }, { - "ordinal": 4, + "ordinal": 5, "name": "created_at", "type_info": "Timestamptz" }, { - "ordinal": 5, + "ordinal": 6, "name": "revoked_at", "type_info": "Timestamptz" }, { - "ordinal": 6, + "ordinal": 7, "name": "human_name", "type_info": "Text" }, { - "ordinal": 7, + "ordinal": 8, "name": "last_active_at", "type_info": "Timestamptz" }, { - "ordinal": 8, + "ordinal": 9, "name": "last_active_ip: IpAddr", "type_info": "Inet" } @@ -56,7 +61,8 @@ }, "nullable": [ false, - false, + true, + true, false, false, false, @@ -66,5 +72,5 @@ true ] }, - "hash": "8816802493ba098c0705b8a8fa87a18ff07e0b5cd08cc525ac9d5dcceece7311" + "hash": "fd32368fa6cd16a9704cdea54f7729681d450669563dd1178c492ffce51e5ff2" } diff --git a/crates/storage-pg/migrations/20250924132713_personal_access_tokens.sql b/crates/storage-pg/migrations/20250924132713_personal_access_tokens.sql index 62df7bc3e..0e113b156 100644 --- a/crates/storage-pg/migrations/20250924132713_personal_access_tokens.sql +++ b/crates/storage-pg/migrations/20250924132713_personal_access_tokens.sql @@ -7,7 +7,16 @@ -- themselves, allowing tokens to be regenerated whilst still retaining a persistent identifier for them. CREATE TABLE personal_sessions ( personal_session_id UUID NOT NULL PRIMARY KEY, - owner_user_id UUID NOT NULL REFERENCES users(user_id), + + -- If this session is owned by a user, the ID of the user. + -- Null otherwise. + owner_user_id UUID REFERENCES users(user_id), + + -- If this session is owned by an OAuth 2 Client (via Client Credentials grant), + -- the ID of the owning client. + -- Null otherwise. + owner_oauth2_client_id UUID REFERENCES oauth2_clients(oauth2_client_id), + actor_user_id UUID NOT NULL REFERENCES users(user_id), -- A human-readable label, intended to describe what the session is for. human_name TEXT NOT NULL, @@ -18,13 +27,16 @@ CREATE TABLE personal_sessions ( -- If set, none of the tokens will be valid anymore. revoked_at TIMESTAMP WITH TIME ZONE, last_active_at TIMESTAMP WITH TIME ZONE, - last_active_ip INET + last_active_ip INET, + + -- There must be exactly one owner. + CONSTRAINT personal_sessions_exactly_one_owner CHECK ((owner_user_id IS NULL) <> (owner_oauth2_client_id IS NULL)) ); -- Individual tokens. CREATE TABLE personal_access_tokens ( - -- The family this access token belongs to. personal_access_token_id UUID NOT NULL PRIMARY KEY, + -- The session this access token belongs to. personal_session_id UUID NOT NULL REFERENCES personal_sessions(personal_session_id), -- SHA256 of the access token. -- This is a lightweight measure to stop a database backup (or other @@ -51,5 +63,6 @@ CREATE UNIQUE INDEX ON personal_access_tokens (personal_session_id) WHERE revoke -- Add indices to satisfy foreign key backward checks -- (and likely filter queries) -CREATE INDEX ON personal_sessions (owner_user_id); +CREATE INDEX ON personal_sessions (owner_user_id) WHERE owner_user_id IS NOT NULL; +CREATE INDEX ON personal_sessions (owner_oauth2_client_id) WHERE owner_oauth2_client_id IS NOT NULL; CREATE INDEX ON personal_sessions (actor_user_id); diff --git a/crates/storage-pg/src/iden.rs b/crates/storage-pg/src/iden.rs index c2198434e..947d1a86f 100644 --- a/crates/storage-pg/src/iden.rs +++ b/crates/storage-pg/src/iden.rs @@ -114,6 +114,8 @@ pub enum PersonalSessions { Table, PersonalSessionId, OwnerUserId, + #[iden = "owner_oauth2_client_id"] + OwnerOAuth2ClientId, ActorUserId, HumanName, ScopeList, diff --git a/crates/storage-pg/src/personal/mod.rs b/crates/storage-pg/src/personal/mod.rs index 3b19bcfe2..cc2e2413f 100644 --- a/crates/storage-pg/src/personal/mod.rs +++ b/crates/storage-pg/src/personal/mod.rs @@ -15,7 +15,9 @@ pub use session::PgPersonalSessionRepository; #[cfg(test)] mod tests { use chrono::Duration; - use mas_data_model::{Clock, Device, clock::MockClock}; + use mas_data_model::{ + Clock, Device, clock::MockClock, personal::session::PersonalSessionOwner, + }; use mas_storage::{ Pagination, RepositoryAccess, personal::{ @@ -84,14 +86,14 @@ mod tests { .add( &mut rng, &clock, - &admin_user, + (&admin_user).into(), &bot_user, "Test Personal Session".to_owned(), scope.clone(), ) .await .unwrap(); - assert_eq!(session.owner_user_id, admin_user.id); + assert_eq!(session.owner, PersonalSessionOwner::User(admin_user.id)); assert_eq!(session.actor_user_id, bot_user.id); assert!(session.is_valid()); assert!(!session.is_revoked()); @@ -128,7 +130,10 @@ mod tests { .unwrap() .expect("personal session not found"); assert_eq!(session_lookup.id, session.id); - assert_eq!(session_lookup.owner_user_id, admin_user.id); + assert_eq!( + session_lookup.owner, + PersonalSessionOwner::User(admin_user.id) + ); assert_eq!(session_lookup.actor_user_id, bot_user.id); assert_eq!(session_lookup.scope, scope); assert!(session_lookup.is_valid()); @@ -207,7 +212,7 @@ mod tests { .add( &mut rng, &clock, - &admin_user, + (&admin_user).into(), &bot_user, "Test Personal Session".to_owned(), scope, diff --git a/crates/storage-pg/src/personal/session.rs b/crates/storage-pg/src/personal/session.rs index 3b5b0a601..28c725a24 100644 --- a/crates/storage-pg/src/personal/session.rs +++ b/crates/storage-pg/src/personal/session.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use mas_data_model::{ Clock, User, - personal::session::{PersonalSession, SessionState}, + personal::session::{PersonalSession, PersonalSessionOwner, SessionState}, }; use mas_storage::{ Page, Pagination, @@ -54,7 +54,8 @@ impl<'c> PgPersonalSessionRepository<'c> { #[enum_def] struct PersonalSessionLookup { personal_session_id: Uuid, - owner_user_id: Uuid, + owner_user_id: Option, + owner_oauth2_client_id: Option, actor_user_id: Uuid, human_name: String, scope_list: Vec, @@ -88,10 +89,23 @@ impl TryFrom for PersonalSession { Some(revoked_at) => SessionState::Revoked { revoked_at }, }; + let owner = match (value.owner_user_id, value.owner_oauth2_client_id) { + (Some(owner_user_id), None) => PersonalSessionOwner::User(Ulid::from(owner_user_id)), + (None, Some(owner_oauth2_client_id)) => { + PersonalSessionOwner::OAuth2Client(Ulid::from(owner_oauth2_client_id)) + } + _ => { + // should be impossible (CHECK constraint in Postgres prevents it) + return Err(DatabaseInconsistencyError::on("personal_sessions") + .column("owner_user_id, owner_oauth2_client_id") + .row(id)); + } + }; + Ok(PersonalSession { id, state, - owner_user_id: Ulid::from(value.owner_user_id), + owner, actor_user_id: Ulid::from(value.actor_user_id), human_name: value.human_name, scope, @@ -121,6 +135,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { r#" SELECT personal_session_id , owner_user_id + , owner_oauth2_client_id , actor_user_id , scope_list , created_at @@ -157,7 +172,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { &mut self, rng: &mut (dyn RngCore + Send), clock: &dyn Clock, - owner_user: &User, + owner: PersonalSessionOwner, actor_user: &User, human_name: String, scope: Scope, @@ -168,20 +183,27 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { let scope_list: Vec = scope.iter().map(|s| s.as_str().to_owned()).collect(); + let (owner_user_id, owner_oauth2_client_id) = match owner { + PersonalSessionOwner::User(ulid) => (Some(Uuid::from(ulid)), None), + PersonalSessionOwner::OAuth2Client(ulid) => (None, Some(Uuid::from(ulid))), + }; + sqlx::query!( r#" INSERT INTO personal_sessions ( personal_session_id , owner_user_id + , owner_oauth2_client_id , actor_user_id , human_name , scope_list , created_at ) - VALUES ($1, $2, $3, $4, $5, $6) + VALUES ($1, $2, $3, $4, $5, $6, $7) "#, Uuid::from(id), - Uuid::from(owner_user.id), + owner_user_id, + owner_oauth2_client_id, Uuid::from(actor_user.id), &human_name, &scope_list, @@ -194,7 +216,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { Ok(PersonalSession { id, state: SessionState::Valid, - owner_user_id: owner_user.id, + owner, actor_user_id: actor_user.id, human_name, scope, @@ -262,6 +284,13 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { Expr::col((PersonalSessions::Table, PersonalSessions::OwnerUserId)), PersonalSessionLookupIden::OwnerUserId, ) + .expr_as( + Expr::col(( + PersonalSessions::Table, + PersonalSessions::OwnerOAuth2ClientId, + )), + PersonalSessionLookupIden::OwnerOauth2ClientId, + ) .expr_as( Expr::col((PersonalSessions::Table, PersonalSessions::ActorUserId)), PersonalSessionLookupIden::ActorUserId, @@ -341,6 +370,13 @@ impl Filter for PersonalSessionFilter<'_> { Expr::col((PersonalSessions::Table, PersonalSessions::OwnerUserId)) .eq(Uuid::from(user.id)) })) + .add_option(self.owner_oauth2_client().map(|client| { + Expr::col(( + PersonalSessions::Table, + PersonalSessions::OwnerOAuth2ClientId, + )) + .eq(Uuid::from(client.id)) + })) .add_option(self.actor_user().map(|user| { Expr::col((PersonalSessions::Table, PersonalSessions::ActorUserId)) .eq(Uuid::from(user.id)) diff --git a/crates/storage/src/personal/session.rs b/crates/storage/src/personal/session.rs index aedb939e0..c090efa30 100644 --- a/crates/storage/src/personal/session.rs +++ b/crates/storage/src/personal/session.rs @@ -5,7 +5,10 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use mas_data_model::{Clock, Device, User, personal::session::PersonalSession}; +use mas_data_model::{ + Client, Clock, Device, User, + personal::session::{PersonalSession, PersonalSessionOwner}, +}; use oauth2_types::scope::Scope; use rand_core::RngCore; use ulid::Ulid; @@ -55,7 +58,7 @@ pub trait PersonalSessionRepository: Send + Sync { &mut self, rng: &mut (dyn RngCore + Send), clock: &dyn Clock, - owner_user: &User, + owner: PersonalSessionOwner, actor_user: &User, human_name: String, scope: Scope, @@ -115,7 +118,7 @@ repository_impl!(PersonalSessionRepository: &mut self, rng: &mut (dyn RngCore + Send), clock: &dyn Clock, - owner_user: &User, + owner: PersonalSessionOwner, actor_user: &User, human_name: String, scope: Scope, @@ -140,6 +143,7 @@ repository_impl!(PersonalSessionRepository: #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] pub struct PersonalSessionFilter<'a> { owner_user: Option<&'a User>, + owner_oauth2_client: Option<&'a Client>, actor_user: Option<&'a User>, device: Option<&'a Device>, state: Option, @@ -173,6 +177,21 @@ impl<'a> PersonalSessionFilter<'a> { self } + /// Get the owner user filter + /// + /// Returns [`None`] if no user filter was set + #[must_use] + pub fn owner_oauth2_client(&self) -> Option<&'a Client> { + self.owner_oauth2_client + } + + /// List sessions owned by a specific user + #[must_use] + pub fn for_owner_oauth2_client(mut self, client: &'a Client) -> Self { + self.owner_oauth2_client = Some(client); + self + } + /// Get the owner user filter /// /// Returns [`None`] if no user filter was set From 277e8e84b0194feb32047dec56d345b47a537075 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 9 Oct 2025 13:00:19 +0100 Subject: [PATCH 10/10] Take access_token by ref in `add` --- crates/storage-pg/src/personal/access_token.rs | 2 +- crates/storage-pg/src/personal/mod.rs | 6 +++--- crates/storage/src/personal/access_token.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/storage-pg/src/personal/access_token.rs b/crates/storage-pg/src/personal/access_token.rs index 832e867f5..8c984385a 100644 --- a/crates/storage-pg/src/personal/access_token.rs +++ b/crates/storage-pg/src/personal/access_token.rs @@ -143,7 +143,7 @@ impl PersonalAccessTokenRepository for PgPersonalAccessTokenRepository<'_> { rng: &mut (dyn RngCore + Send), clock: &dyn Clock, session: &PersonalSession, - access_token: String, + access_token: &str, expires_after: Option, ) -> Result { let created_at = clock.now(); diff --git a/crates/storage-pg/src/personal/mod.rs b/crates/storage-pg/src/personal/mod.rs index cc2e2413f..aaf8fcd61 100644 --- a/crates/storage-pg/src/personal/mod.rs +++ b/crates/storage-pg/src/personal/mod.rs @@ -227,7 +227,7 @@ mod tests { &mut rng, &clock, &session, - FIRST_TOKEN.to_owned(), + FIRST_TOKEN, Some(Duration::try_minutes(1).unwrap()), ) .await @@ -246,7 +246,7 @@ mod tests { &mut rng, &clock, &session, - FIRST_TOKEN.to_owned(), + FIRST_TOKEN, Some(Duration::try_minutes(1).unwrap()), ) .await @@ -288,7 +288,7 @@ mod tests { // Add a second access token, this time without expiration let token = repo .personal_access_token() - .add(&mut rng, &clock, &session, SECOND_TOKEN.to_owned(), None) + .add(&mut rng, &clock, &session, SECOND_TOKEN, None) .await .unwrap(); assert_eq!(token.session_id, session.id); diff --git a/crates/storage/src/personal/access_token.rs b/crates/storage/src/personal/access_token.rs index 4f06bbf34..8fdb52ec1 100644 --- a/crates/storage/src/personal/access_token.rs +++ b/crates/storage/src/personal/access_token.rs @@ -71,7 +71,7 @@ pub trait PersonalAccessTokenRepository: Send + Sync { rng: &mut (dyn RngCore + Send), clock: &dyn Clock, session: &PersonalSession, - access_token: String, + access_token: &str, expires_after: Option, ) -> Result; @@ -107,7 +107,7 @@ repository_impl!(PersonalAccessTokenRepository: rng: &mut (dyn RngCore + Send), clock: &dyn Clock, session: &PersonalSession, - access_token: String, + access_token: &str, expires_after: Option, ) -> Result;