diff --git a/crates/cli/src/commands/server.rs b/crates/cli/src/commands/server.rs index e56fd087e..c0f4f5108 100644 --- a/crates/cli/src/commands/server.rs +++ b/crates/cli/src/commands/server.rs @@ -166,6 +166,7 @@ impl Options { &mailer, homeserver_connection.clone(), url_builder.clone(), + &site_config, shutdown.soft_shutdown_token(), shutdown.task_tracker(), ) diff --git a/crates/cli/src/commands/worker.rs b/crates/cli/src/commands/worker.rs index 8d2b4cd33..187d6b7cb 100644 --- a/crates/cli/src/commands/worker.rs +++ b/crates/cli/src/commands/worker.rs @@ -73,6 +73,7 @@ impl Options { &mailer, conn, url_builder, + &site_config, shutdown.soft_shutdown_token(), shutdown.task_tracker(), ) diff --git a/crates/cli/src/util.rs b/crates/cli/src/util.rs index 02a03b0dc..b5030eb2f 100644 --- a/crates/cli/src/util.rs +++ b/crates/cli/src/util.rs @@ -12,7 +12,7 @@ use mas_config::{ EmailTransportKind, ExperimentalConfig, MatrixConfig, PasswordsConfig, PolicyConfig, TemplatesConfig, }; -use mas_data_model::SiteConfig; +use mas_data_model::{SessionExpirationConfig, SiteConfig}; use mas_email::{MailTransport, Mailer}; use mas_handlers::passwords::PasswordManager; use mas_policy::PolicyFactory; @@ -180,6 +180,15 @@ pub fn site_config_from_config( captcha_config: &CaptchaConfig, ) -> Result { let captcha = captcha_config_from_config(captcha_config)?; + let session_expiration = experimental_config + .inactive_session_expiration + .as_ref() + .map(|c| SessionExpirationConfig { + oauth_session_inactivity_ttl: c.expire_oauth_sessions.then_some(c.ttl), + compat_session_inactivity_ttl: c.expire_compat_sessions.then_some(c.ttl), + user_session_inactivity_ttl: c.expire_user_sessions.then_some(c.ttl), + }); + Ok(SiteConfig { access_token_ttl: experimental_config.access_token_ttl, compat_token_ttl: experimental_config.compat_token_ttl, @@ -198,6 +207,7 @@ pub fn site_config_from_config( && account_config.password_recovery_enabled, captcha, minimum_password_complexity: password_config.minimum_complexity(), + session_expiration, }) } diff --git a/crates/config/src/sections/experimental.rs b/crates/config/src/sections/experimental.rs index 2c86772a3..17ffa6c4d 100644 --- a/crates/config/src/sections/experimental.rs +++ b/crates/config/src/sections/experimental.rs @@ -11,6 +11,10 @@ use serde_with::serde_as; use crate::ConfigurationSection; +fn default_true() -> bool { + true +} + fn default_token_ttl() -> Duration { Duration::microseconds(5 * 60 * 1000 * 1000) } @@ -19,11 +23,32 @@ fn is_default_token_ttl(value: &Duration) -> bool { *value == default_token_ttl() } +/// Configuration options for the inactive session expiration feature +#[serde_as] +#[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] +pub struct InactiveSessionExpirationConfig { + /// Time after which an inactive session is automatically finished + #[schemars(with = "u64", range(min = 600, max = 7_776_000))] + #[serde_as(as = "serde_with::DurationSeconds")] + pub ttl: Duration, + + /// Should compatibility sessions expire after inactivity + #[serde(default = "default_true")] + pub expire_compat_sessions: bool, + + /// Should OAuth 2.0 sessions expire after inactivity + #[serde(default = "default_true")] + pub expire_oauth_sessions: bool, + + /// Should user sessions expire after inactivity + #[serde(default = "default_true")] + pub expire_user_sessions: bool, +} + /// Configuration sections for experimental options /// /// Do not change these options unless you know what you are doing. #[serde_as] -#[allow(clippy::struct_excessive_bools)] #[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] pub struct ExperimentalConfig { /// Time-to-live of access tokens in seconds. Defaults to 5 minutes. @@ -44,6 +69,12 @@ pub struct ExperimentalConfig { )] #[serde_as(as = "serde_with::DurationSeconds")] pub compat_token_ttl: Duration, + + /// Experimetal feature to automatically expire inactive sessions + /// + /// Disabled by default + #[serde(skip_serializing_if = "Option::is_none")] + pub inactive_session_expiration: Option, } impl Default for ExperimentalConfig { @@ -51,13 +82,16 @@ impl Default for ExperimentalConfig { Self { access_token_ttl: default_token_ttl(), compat_token_ttl: default_token_ttl(), + inactive_session_expiration: None, } } } impl ExperimentalConfig { pub(crate) fn is_default(&self) -> bool { - is_default_token_ttl(&self.access_token_ttl) && is_default_token_ttl(&self.compat_token_ttl) + is_default_token_ttl(&self.access_token_ttl) + && is_default_token_ttl(&self.compat_token_ttl) + && self.inactive_session_expiration.is_none() } } diff --git a/crates/data-model/src/lib.rs b/crates/data-model/src/lib.rs index 19a81f098..b26f74f1b 100644 --- a/crates/data-model/src/lib.rs +++ b/crates/data-model/src/lib.rs @@ -32,7 +32,7 @@ pub use self::{ AuthorizationCode, AuthorizationGrant, AuthorizationGrantStage, Client, DeviceCodeGrant, DeviceCodeGrantState, InvalidRedirectUriError, JwksOrJwksUri, Pkce, Session, SessionState, }, - site_config::{CaptchaConfig, CaptchaService, SiteConfig}, + site_config::{CaptchaConfig, CaptchaService, SessionExpirationConfig, SiteConfig}, tokens::{ AccessToken, AccessTokenState, RefreshToken, RefreshTokenState, TokenFormatError, TokenType, }, diff --git a/crates/data-model/src/site_config.rs b/crates/data-model/src/site_config.rs index 5aba98a9c..0e09f8a31 100644 --- a/crates/data-model/src/site_config.rs +++ b/crates/data-model/src/site_config.rs @@ -28,6 +28,14 @@ pub struct CaptchaConfig { pub secret_key: String, } +/// Automatic session expiration configuration +#[derive(Debug, Clone)] +pub struct SessionExpirationConfig { + pub user_session_inactivity_ttl: Option, + pub oauth_session_inactivity_ttl: Option, + pub compat_session_inactivity_ttl: Option, +} + /// Random site configuration we want accessible in various places. #[allow(clippy::struct_excessive_bools)] #[derive(Debug, Clone)] @@ -74,4 +82,6 @@ pub struct SiteConfig { /// Minimum password complexity, between 0 and 4. /// This is a score from zxcvbn. pub minimum_password_complexity: u8, + + pub session_expiration: Option, } diff --git a/crates/handlers/src/admin/v1/oauth2_sessions/list.rs b/crates/handlers/src/admin/v1/oauth2_sessions/list.rs index 596361e0d..999584f0d 100644 --- a/crates/handlers/src/admin/v1/oauth2_sessions/list.rs +++ b/crates/handlers/src/admin/v1/oauth2_sessions/list.rs @@ -46,6 +46,22 @@ impl std::fmt::Display for OAuth2SessionStatus { } } +#[derive(Deserialize, JsonSchema, Clone, Copy)] +#[serde(rename_all = "snake_case")] +enum OAuth2ClientKind { + Dynamic, + Static, +} + +impl std::fmt::Display for OAuth2ClientKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Dynamic => write!(f, "dynamic"), + Self::Static => write!(f, "static"), + } + } +} + #[derive(FromRequestParts, Deserialize, JsonSchema, OperationIo)] #[serde(rename = "OAuth2SessionFilter")] #[aide(input_with = "Query")] @@ -61,6 +77,10 @@ pub struct FilterParams { #[schemars(with = "Option")] client: Option, + /// Retrieve the items only for a specific client kind + #[serde(rename = "filter[client-kind]")] + client_kind: Option, + /// Retrieve the items started from the given browser session #[serde(rename = "filter[user-session]")] #[schemars(with = "Option")] @@ -95,6 +115,11 @@ impl std::fmt::Display for FilterParams { sep = '&'; } + if let Some(client_kind) = self.client_kind { + write!(f, "{sep}filter[client-kind]={client_kind}")?; + sep = '&'; + } + if let Some(user_session) = self.user_session { write!(f, "{sep}filter[user-session]={user_session}")?; sep = '&'; @@ -232,6 +257,12 @@ pub async fn handler( None => filter, }; + let filter = match params.client_kind { + Some(OAuth2ClientKind::Dynamic) => filter.only_dynamic_clients(), + Some(OAuth2ClientKind::Static) => filter.only_static_clients(), + None => filter, + }; + let user_session = if let Some(user_session_id) = params.user_session { let user_session = repo .browser_session() diff --git a/crates/handlers/src/test_utils.rs b/crates/handlers/src/test_utils.rs index 4e69ab5df..b295fcaa4 100644 --- a/crates/handlers/src/test_utils.rs +++ b/crates/handlers/src/test_utils.rs @@ -139,6 +139,7 @@ pub fn test_site_config() -> SiteConfig { account_recovery_allowed: true, captcha: None, minimum_password_complexity: 1, + session_expiration: None, } } diff --git a/crates/storage-pg/src/iden.rs b/crates/storage-pg/src/iden.rs index 951764806..841a4648e 100644 --- a/crates/storage-pg/src/iden.rs +++ b/crates/storage-pg/src/iden.rs @@ -83,6 +83,15 @@ pub enum OAuth2Sessions { LastActiveIp, } +#[derive(sea_query::Iden)] +#[iden = "oauth2_clients"] +pub enum OAuth2Clients { + Table, + #[iden = "oauth2_client_id"] + OAuth2ClientId, + IsStatic, +} + #[derive(sea_query::Iden)] #[iden = "upstream_oauth_providers"] pub enum UpstreamOAuthProviders { diff --git a/crates/storage-pg/src/oauth2/mod.rs b/crates/storage-pg/src/oauth2/mod.rs index 54ce32fb7..556d4314d 100644 --- a/crates/storage-pg/src/oauth2/mod.rs +++ b/crates/storage-pg/src/oauth2/mod.rs @@ -525,7 +525,7 @@ mod tests { let pagination = Pagination::first(10); // First, list all the sessions - let filter = OAuth2SessionFilter::new(); + let filter = OAuth2SessionFilter::new().for_any_user(); let list = repo .oauth2_session() .list(filter, pagination) diff --git a/crates/storage-pg/src/oauth2/session.rs b/crates/storage-pg/src/oauth2/session.rs index 961094821..a81771e90 100644 --- a/crates/storage-pg/src/oauth2/session.rs +++ b/crates/storage-pg/src/oauth2/session.rs @@ -23,7 +23,7 @@ use uuid::Uuid; use crate::{ filter::{Filter, StatementExt}, - iden::OAuth2Sessions, + iden::{OAuth2Clients, OAuth2Sessions}, pagination::QueryBuilderExt, tracing::ExecuteExt, DatabaseError, DatabaseInconsistencyError, @@ -104,6 +104,26 @@ impl Filter for OAuth2SessionFilter<'_> { Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)) .eq(Uuid::from(client.id)) })) + .add_option(self.client_kind().map(|client_kind| { + // This builds either a: + // `WHERE oauth2_client_id = ANY(...)` + // or a `WHERE oauth2_client_id <> ALL(...)` + let static_clients = Query::select() + .expr(Expr::col(( + OAuth2Clients::Table, + OAuth2Clients::OAuth2ClientId, + ))) + .and_where(Expr::col((OAuth2Clients::Table, OAuth2Clients::IsStatic)).into()) + .from(OAuth2Clients::Table) + .take(); + if client_kind.is_static() { + Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)) + .eq(Expr::any(static_clients)) + } else { + Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)) + .ne(Expr::all(static_clients)) + } + })) .add_option(self.device().map(|device| { Expr::val(device.to_scope_token().to_string()).eq(PgFunc::any(Expr::col(( OAuth2Sessions::Table, @@ -125,6 +145,13 @@ impl Filter for OAuth2SessionFilter<'_> { let scope: Vec = scope.iter().map(|s| s.as_str().to_owned()).collect(); Expr::col((OAuth2Sessions::Table, OAuth2Sessions::ScopeList)).contains(scope) })) + .add_option(self.any_user().map(|any_user| { + if any_user { + Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)).is_not_null() + } else { + Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)).is_null() + } + })) .add_option(self.last_active_after().map(|last_active_after| { Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveAt)) .gt(last_active_after) diff --git a/crates/storage/src/oauth2/session.rs b/crates/storage/src/oauth2/session.rs index 231ca3138..71cca8ef3 100644 --- a/crates/storage/src/oauth2/session.rs +++ b/crates/storage/src/oauth2/session.rs @@ -31,13 +31,27 @@ impl OAuth2SessionState { } } +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum ClientKind { + Static, + Dynamic, +} + +impl ClientKind { + pub fn is_static(self) -> bool { + matches!(self, Self::Static) + } +} + /// Filter parameters for listing OAuth 2.0 sessions #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] pub struct OAuth2SessionFilter<'a> { user: Option<&'a User>, + any_user: Option, browser_session: Option<&'a BrowserSession>, device: Option<&'a Device>, client: Option<&'a Client>, + client_kind: Option, state: Option, scope: Option<&'a Scope>, last_active_before: Option>, @@ -66,6 +80,28 @@ impl<'a> OAuth2SessionFilter<'a> { self.user } + /// List sessions which belong to any user + #[must_use] + pub fn for_any_user(mut self) -> Self { + self.any_user = Some(true); + self + } + + /// List sessions which belong to no user + #[must_use] + pub fn for_no_user(mut self) -> Self { + self.any_user = Some(false); + self + } + + /// Get the 'any user' filter + /// + /// Returns [`None`] if no 'any user' filter was set + #[must_use] + pub fn any_user(&self) -> Option { + self.any_user + } + /// List sessions started by a specific browser session #[must_use] pub fn for_browser_session(mut self, browser_session: &'a BrowserSession) -> Self { @@ -96,6 +132,28 @@ impl<'a> OAuth2SessionFilter<'a> { self.client } + /// List only static clients + #[must_use] + pub fn only_static_clients(mut self) -> Self { + self.client_kind = Some(ClientKind::Static); + self + } + + /// List only dynamic clients + #[must_use] + pub fn only_dynamic_clients(mut self) -> Self { + self.client_kind = Some(ClientKind::Dynamic); + self + } + + /// Get the client kind filter + /// + /// Returns [`None`] if no client kind filter was set + #[must_use] + pub fn client_kind(&self) -> Option { + self.client_kind + } + /// Only return sessions with a last active time before the given time #[must_use] pub fn with_last_active_before(mut self, last_active_before: DateTime) -> Self { diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs index 3e3eec5e6..2172edfdc 100644 --- a/crates/storage/src/queue/tasks.rs +++ b/crates/storage/src/queue/tasks.rs @@ -3,11 +3,16 @@ // SPDX-License-Identifier: AGPL-3.0-only // Please see LICENSE in the repository root for full details. -use mas_data_model::{Device, User, UserEmailAuthentication, UserRecoverySession}; +use chrono::{DateTime, Utc}; +use mas_data_model::{ + BrowserSession, CompatSession, Device, Session, User, UserEmailAuthentication, + UserRecoverySession, +}; use serde::{Deserialize, Serialize}; use ulid::Ulid; use super::InsertableJob; +use crate::{Page, Pagination}; /// This is the previous iteration of the email verification job. It has been /// replaced by [`SendEmailAuthenticationCodeJob`]. This struct is kept to be @@ -193,6 +198,15 @@ impl SyncDevicesJob { Self { user_id: user.id } } + /// Create a new job to sync the list of devices of a user with the + /// homeserver for the given user ID + /// + /// This is useful to use in cases where the [`User`] object isn't loaded + #[must_use] + pub fn new_for_id(user_id: Ulid) -> Self { + Self { user_id } + } + /// The ID of the user to sync the devices for #[must_use] pub fn user_id(&self) -> Ulid { @@ -310,3 +324,185 @@ pub struct CleanupExpiredTokensJob; impl InsertableJob for CleanupExpiredTokensJob { const QUEUE_NAME: &'static str = "cleanup-expired-tokens"; } + +/// Scheduled job to expire inactive sessions +/// +/// This job will trigger jobs to expire inactive compat, oauth and user +/// sessions. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ExpireInactiveSessionsJob; + +impl InsertableJob for ExpireInactiveSessionsJob { + const QUEUE_NAME: &'static str = "expire-inactive-sessions"; +} + +/// Expire inactive OAuth 2.0 sessions +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ExpireInactiveOAuthSessionsJob { + threshold: DateTime, + after: Option, +} + +impl ExpireInactiveOAuthSessionsJob { + /// Create a new job to expire inactive OAuth 2.0 sessions + /// + /// # Parameters + /// + /// * `threshold` - The threshold to expire sessions at + #[must_use] + pub fn new(threshold: DateTime) -> Self { + Self { + threshold, + after: None, + } + } + + /// Get the threshold to expire sessions at + #[must_use] + pub fn threshold(&self) -> DateTime { + self.threshold + } + + /// Get the pagination cursor + #[must_use] + pub fn pagination(&self, batch_size: usize) -> Pagination { + let pagination = Pagination::first(batch_size); + if let Some(after) = self.after { + pagination.after(after) + } else { + pagination + } + } + + /// Get the next job given the page returned by the database + #[must_use] + pub fn next(&self, page: &Page) -> Option { + if !page.has_next_page { + return None; + } + + let last_edge = page.edges.last()?; + Some(Self { + threshold: self.threshold, + after: Some(last_edge.id), + }) + } +} + +impl InsertableJob for ExpireInactiveOAuthSessionsJob { + const QUEUE_NAME: &'static str = "expire-inactive-oauth-sessions"; +} + +/// Expire inactive compatibility sessions +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ExpireInactiveCompatSessionsJob { + threshold: DateTime, + after: Option, +} + +impl ExpireInactiveCompatSessionsJob { + /// Create a new job to expire inactive compatibility sessions + /// + /// # Parameters + /// + /// * `threshold` - The threshold to expire sessions at + #[must_use] + pub fn new(threshold: DateTime) -> Self { + Self { + threshold, + after: None, + } + } + + /// Get the threshold to expire sessions at + #[must_use] + pub fn threshold(&self) -> DateTime { + self.threshold + } + + /// Get the pagination cursor + #[must_use] + pub fn pagination(&self, batch_size: usize) -> Pagination { + let pagination = Pagination::first(batch_size); + if let Some(after) = self.after { + pagination.after(after) + } else { + pagination + } + } + + /// Get the next job given the page returned by the database + #[must_use] + pub fn next(&self, page: &Page) -> Option { + if !page.has_next_page { + return None; + } + + let last_edge = page.edges.last()?; + Some(Self { + threshold: self.threshold, + after: Some(last_edge.id), + }) + } +} + +impl InsertableJob for ExpireInactiveCompatSessionsJob { + const QUEUE_NAME: &'static str = "expire-inactive-compat-sessions"; +} + +/// Expire inactive user sessions +#[derive(Debug, Serialize, Deserialize)] +pub struct ExpireInactiveUserSessionsJob { + threshold: DateTime, + after: Option, +} + +impl ExpireInactiveUserSessionsJob { + /// Create a new job to expire inactive user/browser sessions + /// + /// # Parameters + /// + /// * `threshold` - The threshold to expire sessions at + #[must_use] + pub fn new(threshold: DateTime) -> Self { + Self { + threshold, + after: None, + } + } + + /// Get the threshold to expire sessions at + #[must_use] + pub fn threshold(&self) -> DateTime { + self.threshold + } + + /// Get the pagination cursor + #[must_use] + pub fn pagination(&self, batch_size: usize) -> Pagination { + let pagination = Pagination::first(batch_size); + if let Some(after) = self.after { + pagination.after(after) + } else { + pagination + } + } + + /// Get the next job given the page returned by the database + #[must_use] + pub fn next(&self, page: &Page) -> Option { + if !page.has_next_page { + return None; + } + + let last_edge = page.edges.last()?; + Some(Self { + threshold: self.threshold, + after: Some(last_edge.id), + }) + } +} + +impl InsertableJob for ExpireInactiveUserSessionsJob { + const QUEUE_NAME: &'static str = "expire-inactive-user-sessions"; +} diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 4ee635266..d95941b8a 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, LazyLock}; +use mas_data_model::SiteConfig; use mas_email::Mailer; use mas_matrix::HomeserverConnection; use mas_router::UrlBuilder; @@ -22,6 +23,7 @@ mod email; mod matrix; mod new_queue; mod recovery; +mod sessions; mod user; static METER: LazyLock = LazyLock::new(|| { @@ -40,6 +42,7 @@ struct State { clock: SystemClock, homeserver: Arc>, url_builder: UrlBuilder, + site_config: SiteConfig, } impl State { @@ -49,6 +52,7 @@ impl State { mailer: Mailer, homeserver: impl HomeserverConnection + 'static, url_builder: UrlBuilder, + site_config: SiteConfig, ) -> Self { Self { pool, @@ -56,6 +60,7 @@ impl State { clock, homeserver: Arc::new(homeserver), url_builder, + site_config, } } @@ -93,6 +98,10 @@ impl State { pub fn url_builder(&self) -> &UrlBuilder { &self.url_builder } + + pub fn site_config(&self) -> &SiteConfig { + &self.site_config + } } /// Initialise the workers. @@ -105,6 +114,7 @@ pub async fn init( mailer: &Mailer, homeserver: impl HomeserverConnection + 'static, url_builder: UrlBuilder, + site_config: &SiteConfig, cancellation_token: CancellationToken, task_tracker: &TaskTracker, ) -> Result<(), QueueRunnerError> { @@ -114,6 +124,7 @@ pub async fn init( mailer.clone(), homeserver, url_builder, + site_config.clone(), ); let mut worker = self::new_queue::QueueWorker::new(state, cancellation_token).await?; @@ -128,10 +139,20 @@ pub async fn init( .register_handler::() .register_handler::() .register_handler::() + .register_handler::() + .register_handler::() + .register_handler::() + .register_handler::() .add_schedule( "cleanup-expired-tokens", "0 0 * * * *".parse()?, mas_storage::queue::CleanupExpiredTokensJob, + ) + .add_schedule( + "expire-inactive-sessions", + // Run this job every 15 minutes + "30 */15 * * * *".parse()?, + mas_storage::queue::ExpireInactiveSessionsJob, ); task_tracker.spawn(worker.run()); diff --git a/crates/tasks/src/sessions.rs b/crates/tasks/src/sessions.rs new file mode 100644 index 000000000..677e71b3c --- /dev/null +++ b/crates/tasks/src/sessions.rs @@ -0,0 +1,242 @@ +// Copyright 2025 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +use std::collections::HashSet; + +use async_trait::async_trait; +use chrono::Duration; +use mas_storage::{ + compat::CompatSessionFilter, + oauth2::OAuth2SessionFilter, + queue::{ + ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, ExpireInactiveSessionsJob, + ExpireInactiveUserSessionsJob, QueueJobRepositoryExt, SyncDevicesJob, + }, + user::BrowserSessionFilter, +}; + +use crate::{ + new_queue::{JobContext, JobError, RunnableJob}, + State, +}; + +#[async_trait] +impl RunnableJob for ExpireInactiveSessionsJob { + async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { + let Some(config) = state.site_config().session_expiration.as_ref() else { + // Automatic session expiration is disabled + return Ok(()); + }; + + let clock = state.clock(); + let mut rng = state.rng(); + let now = clock.now(); + let mut repo = state.repository().await.map_err(JobError::retry)?; + + if let Some(ttl) = config.oauth_session_inactivity_ttl { + repo.queue_job() + .schedule_job( + &mut rng, + &clock, + ExpireInactiveOAuthSessionsJob::new(now - ttl), + ) + .await + .map_err(JobError::retry)?; + } + + if let Some(ttl) = config.compat_session_inactivity_ttl { + repo.queue_job() + .schedule_job( + &mut rng, + &clock, + ExpireInactiveCompatSessionsJob::new(now - ttl), + ) + .await + .map_err(JobError::retry)?; + } + + if let Some(ttl) = config.user_session_inactivity_ttl { + repo.queue_job() + .schedule_job( + &mut rng, + &clock, + ExpireInactiveUserSessionsJob::new(now - ttl), + ) + .await + .map_err(JobError::retry)?; + } + + repo.save().await.map_err(JobError::retry)?; + + Ok(()) + } +} + +#[async_trait] +impl RunnableJob for ExpireInactiveOAuthSessionsJob { + async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let clock = state.clock(); + let mut rng = state.rng(); + let mut users_synced = HashSet::new(); + + // This delay is used to space out the device sync jobs + // We add 10 seconds between each device sync, meaning that it will spread out + // the syncs over ~16 minutes max if we get a full batch of 100 users + let mut delay = Duration::minutes(1); + + let filter = OAuth2SessionFilter::new() + .with_last_active_before(self.threshold()) + .for_any_user() + .only_dynamic_clients() + .active_only(); + + let pagination = self.pagination(100); + + let page = repo + .oauth2_session() + .list(filter, pagination) + .await + .map_err(JobError::retry)?; + + if let Some(job) = self.next(&page) { + tracing::info!("Scheduling job to expire the next batch of inactive sessions"); + repo.queue_job() + .schedule_job(&mut rng, &clock, job) + .await + .map_err(JobError::retry)?; + } + + for edge in page.edges { + if let Some(user_id) = edge.user_id { + let inserted = users_synced.insert(user_id); + if inserted { + tracing::info!(user.id = %user_id, "Scheduling devices sync for user"); + repo.queue_job() + .schedule_job_later( + &mut rng, + &clock, + SyncDevicesJob::new_for_id(user_id), + clock.now() + delay, + ) + .await + .map_err(JobError::retry)?; + delay += Duration::seconds(10); + } + } + + repo.oauth2_session() + .finish(&clock, edge) + .await + .map_err(JobError::retry)?; + } + + repo.save().await.map_err(JobError::retry)?; + + Ok(()) + } +} + +#[async_trait] +impl RunnableJob for ExpireInactiveCompatSessionsJob { + async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let clock = state.clock(); + let mut rng = state.rng(); + let mut users_synced = HashSet::new(); + + // This delay is used to space out the device sync jobs + // We add 10 seconds between each device sync, meaning that it will spread out + // the syncs over ~16 minutes max if we get a full batch of 100 users + let mut delay = Duration::minutes(1); + + let filter = CompatSessionFilter::new() + .with_last_active_before(self.threshold()) + .active_only(); + + let pagination = self.pagination(100); + + let page = repo + .compat_session() + .list(filter, pagination) + .await + .map_err(JobError::retry)? + .map(|(c, _)| c); + + if let Some(job) = self.next(&page) { + tracing::info!("Scheduling job to expire the next batch of inactive sessions"); + repo.queue_job() + .schedule_job(&mut rng, &clock, job) + .await + .map_err(JobError::retry)?; + } + + for edge in page.edges { + let inserted = users_synced.insert(edge.user_id); + if inserted { + tracing::info!(user.id = %edge.user_id, "Scheduling devices sync for user"); + repo.queue_job() + .schedule_job_later( + &mut rng, + &clock, + SyncDevicesJob::new_for_id(edge.user_id), + clock.now() + delay, + ) + .await + .map_err(JobError::retry)?; + delay += Duration::seconds(10); + } + + repo.compat_session() + .finish(&clock, edge) + .await + .map_err(JobError::retry)?; + } + + repo.save().await.map_err(JobError::retry)?; + + Ok(()) + } +} + +#[async_trait] +impl RunnableJob for ExpireInactiveUserSessionsJob { + async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> { + let mut repo = state.repository().await.map_err(JobError::retry)?; + let clock = state.clock(); + let mut rng = state.rng(); + + let filter = BrowserSessionFilter::new() + .with_last_active_before(self.threshold()) + .active_only(); + + let pagination = self.pagination(100); + + let page = repo + .browser_session() + .list(filter, pagination) + .await + .map_err(JobError::retry)?; + + if let Some(job) = self.next(&page) { + tracing::info!("Scheduling job to expire the next batch of inactive sessions"); + repo.queue_job() + .schedule_job(&mut rng, &clock, job) + .await + .map_err(JobError::retry)?; + } + + for edge in page.edges { + repo.browser_session() + .finish(&clock, edge) + .await + .map_err(JobError::retry)?; + } + + repo.save().await.map_err(JobError::retry)?; + + Ok(()) + } +} diff --git a/docs/api/spec.json b/docs/api/spec.json index 258068bb0..100bff25a 100644 --- a/docs/api/spec.json +++ b/docs/api/spec.json @@ -357,6 +357,17 @@ }, "style": "form" }, + { + "in": "query", + "name": "filter[client-kind]", + "description": "Retrieve the items only for a specific client kind", + "schema": { + "description": "Retrieve the items only for a specific client kind", + "$ref": "#/components/schemas/OAuth2ClientKind", + "nullable": true + }, + "style": "form" + }, { "in": "query", "name": "filter[user-session]", @@ -2347,6 +2358,11 @@ "$ref": "#/components/schemas/ULID", "nullable": true }, + "filter[client-kind]": { + "description": "Retrieve the items only for a specific client kind", + "$ref": "#/components/schemas/OAuth2ClientKind", + "nullable": true + }, "filter[user-session]": { "description": "Retrieve the items started from the given browser session", "$ref": "#/components/schemas/ULID", @@ -2367,6 +2383,13 @@ } } }, + "OAuth2ClientKind": { + "type": "string", + "enum": [ + "dynamic", + "static" + ] + }, "OAuth2SessionStatus": { "type": "string", "enum": [ diff --git a/docs/config.schema.json b/docs/config.schema.json index 269ae46c1..04d8a1bba 100644 --- a/docs/config.schema.json +++ b/docs/config.schema.json @@ -2457,6 +2457,45 @@ "format": "uint64", "maximum": 86400.0, "minimum": 60.0 + }, + "inactive_session_expiration": { + "description": "Experimetal feature to automatically expire inactive sessions\n\nDisabled by default", + "allOf": [ + { + "$ref": "#/definitions/InactiveSessionExpirationConfig" + } + ] + } + } + }, + "InactiveSessionExpirationConfig": { + "description": "Configuration options for the inactive session expiration feature", + "type": "object", + "required": [ + "ttl" + ], + "properties": { + "ttl": { + "description": "Time after which an inactive session is automatically finished", + "type": "integer", + "format": "uint64", + "maximum": 7776000.0, + "minimum": 600.0 + }, + "expire_compat_sessions": { + "description": "Should compatibility sessions expire after inactivity", + "default": true, + "type": "boolean" + }, + "expire_oauth_sessions": { + "description": "Should OAuth 2.0 sessions expire after inactivity", + "default": true, + "type": "boolean" + }, + "expire_user_sessions": { + "description": "Should user sessions expire after inactivity", + "default": true, + "type": "boolean" } } } diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md index 40e7e06f5..2671fc807 100644 --- a/docs/reference/configuration.md +++ b/docs/reference/configuration.md @@ -711,4 +711,19 @@ experimental: # Time-to-live of compatibility access tokens in seconds, when refresh tokens are supported. Defaults to 300, 5 minutes. #compat_token_ttl: 300 + + # Experimental feature to automatically expire inactive sessions + # Disabled by default + #inactive_session_expiration: + # Time after which an inactive session is automatically finished in seconds + #ttl: 32400 + + # Should compatibility sessions expire after inactivity. Defaults to true. + #expire_compat_sessions: true + + # Should OAuth 2.0 sessions expire after inactivity. Defaults to true. + #expire_oauth_sessions: true + + # Should user sessions expire after inactivity. Defaults to true. + #expire_user_sessions: true ```