diff --git a/crates/handlers/src/admin/v1/personal_sessions/add.rs b/crates/handlers/src/admin/v1/personal_sessions/add.rs index 5da3929e5..26e5ca896 100644 --- a/crates/handlers/src/admin/v1/personal_sessions/add.rs +++ b/crates/handlers/src/admin/v1/personal_sessions/add.rs @@ -33,6 +33,9 @@ pub enum RouteError { #[error("User not found")] UserNotFound, + #[error("User is not active")] + UserDeactivated, + #[error("Invalid scope")] InvalidScope, } @@ -47,6 +50,7 @@ impl IntoResponse for RouteError { let status = match self { Self::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::UserNotFound => StatusCode::NOT_FOUND, + Self::UserDeactivated => StatusCode::GONE, Self::InvalidScope => StatusCode::BAD_REQUEST, }; (status, sentry_event_id, Json(error)).into_response() @@ -109,6 +113,10 @@ pub async fn handler( .await? .ok_or(RouteError::UserNotFound)?; + if !actor_user.is_valid_actor() { + return Err(RouteError::UserDeactivated); + } + let scope: Scope = params.scope.parse().map_err(|_| RouteError::InvalidScope)?; // Create the personal session diff --git a/crates/storage-pg/src/personal/mod.rs b/crates/storage-pg/src/personal/mod.rs index e6e20b698..aa4597fec 100644 --- a/crates/storage-pg/src/personal/mod.rs +++ b/crates/storage-pg/src/personal/mod.rs @@ -181,6 +181,104 @@ mod tests { assert!(session_lookup.is_revoked()); } + #[sqlx::test(migrator = "crate::MIGRATOR")] + async fn test_session_revoke_bulk(pool: PgPool) { + let mut rng = ChaChaRng::seed_from_u64(42); + let clock = MockClock::default(); + let mut repo = PgRepository::from_pool(&pool).await.unwrap(); + + let alice_user = repo + .user() + .add(&mut rng, &clock, "alice".to_owned()) + .await + .unwrap(); + let bob_user = repo + .user() + .add(&mut rng, &clock, "bob".to_owned()) + .await + .unwrap(); + + let session1 = repo + .personal_session() + .add( + &mut rng, + &clock, + (&alice_user).into(), + &bob_user, + "Test Personal Session".to_owned(), + "openid".parse().unwrap(), + ) + .await + .unwrap(); + repo.personal_access_token() + .add( + &mut rng, + &clock, + &session1, + "mpt_hiss", + Some(Duration::days(42)), + ) + .await + .unwrap(); + + let session2 = repo + .personal_session() + .add( + &mut rng, + &clock, + (&bob_user).into(), + &bob_user, + "Test Personal Session".to_owned(), + "openid".parse().unwrap(), + ) + .await + .unwrap(); + repo.personal_access_token() + .add( + &mut rng, &clock, &session2, "mpt_meow", // No expiry + None, + ) + .await + .unwrap(); + + // Just one session without a token expiry time + assert_eq!( + repo.personal_session() + .revoke_bulk( + &clock, + PersonalSessionFilter::new() + .active_only() + .with_expires(false) + ) + .await + .unwrap(), + 1 + ); + + // Just one session with a token expiry time + assert_eq!( + repo.personal_session() + .revoke_bulk( + &clock, + PersonalSessionFilter::new() + .active_only() + .with_expires(true) + ) + .await + .unwrap(), + 1 + ); + + // No active sessions left + assert_eq!( + repo.personal_session() + .revoke_bulk(&clock, PersonalSessionFilter::new().active_only()) + .await + .unwrap(), + 0 + ); + } + #[sqlx::test(migrator = "crate::MIGRATOR")] async fn test_access_token_repository(pool: PgPool) { const FIRST_TOKEN: &str = "first_access_token"; diff --git a/crates/storage-pg/src/personal/session.rs b/crates/storage-pg/src/personal/session.rs index 8b1723767..b4c330ecb 100644 --- a/crates/storage-pg/src/personal/session.rs +++ b/crates/storage-pg/src/personal/session.rs @@ -357,6 +357,73 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .map_err(DatabaseError::to_invalid_operation) } + #[tracing::instrument( + name = "db.personal_session.revoke_bulk", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn revoke_bulk( + &mut self, + clock: &dyn Clock, + filter: PersonalSessionFilter<'_>, + ) -> Result { + let revoked_at = clock.now(); + + let (sql, arguments) = Query::update() + .table(PersonalSessions::Table) + .value(PersonalSessions::RevokedAt, revoked_at) + .and_where( + Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)) + // Because filters apply to both the session and access token tables, + // Use a subquery to make it possible to use a JOIN + // onto the personal access token table. + .in_subquery( + Query::select() + .expr(Expr::col(( + PersonalSessions::Table, + PersonalSessions::PersonalSessionId, + ))) + .from(PersonalSessions::Table) + .left_join( + PersonalAccessTokens::Table, + Cond::all() + // Match session ID + .add( + Expr::col(( + PersonalSessions::Table, + PersonalSessions::PersonalSessionId, + )) + .eq(Expr::col(( + PersonalAccessTokens::Table, + PersonalAccessTokens::PersonalSessionId, + ))), + ) + // Only choose the active access token for each session + .add( + Expr::col(( + PersonalAccessTokens::Table, + PersonalAccessTokens::RevokedAt, + )) + .is_null(), + ), + ) + .apply_filter(filter) + .take(), + ), + ) + .build_sqlx(PostgresQueryBuilder); + + let res = sqlx::query_with(&sql, arguments) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(res.rows_affected().try_into().unwrap_or(usize::MAX)) + } + #[tracing::instrument( name = "db.personal_session.list", skip_all, @@ -433,6 +500,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .left_join( PersonalAccessTokens::Table, Cond::all() + // Match session ID .add( Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)) .eq(Expr::col(( @@ -440,6 +508,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { PersonalAccessTokens::PersonalSessionId, ))), ) + // Only choose the active access token for each session .add( Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::RevokedAt)) .is_null(), @@ -477,6 +546,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { .left_join( PersonalAccessTokens::Table, Cond::all() + // Match session ID .add( Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId)) .eq(Expr::col(( @@ -484,6 +554,7 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> { PersonalAccessTokens::PersonalSessionId, ))), ) + // Only choose the active access token for each session .add( Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::RevokedAt)) .is_null(), diff --git a/crates/storage/src/personal/session.rs b/crates/storage/src/personal/session.rs index 3515b1161..921c6df39 100644 --- a/crates/storage/src/personal/session.rs +++ b/crates/storage/src/personal/session.rs @@ -87,6 +87,24 @@ pub trait PersonalSessionRepository: Send + Sync { personal_session: PersonalSession, ) -> Result; + /// Revoke all the [`PersonalSession`]s matching the given filter. + /// + /// Returns the number of sessions affected + /// + /// # Parameters + /// + /// * `clock`: The clock used to generate timestamps + /// * `filter`: The filter to apply + /// + /// # Errors + /// + /// Returns [`Self::Error`] if the underlying repository fails + async fn revoke_bulk( + &mut self, + clock: &dyn Clock, + filter: PersonalSessionFilter<'_>, + ) -> Result; + /// List [`PersonalSession`]s matching the given filter and pagination /// parameters /// @@ -150,6 +168,12 @@ repository_impl!(PersonalSessionRepository: personal_session: PersonalSession, ) -> Result; + async fn revoke_bulk( + &mut self, + clock: &dyn Clock, + filter: PersonalSessionFilter<'_>, + ) -> Result; + async fn list( &mut self, filter: PersonalSessionFilter<'_>, diff --git a/crates/tasks/src/user.rs b/crates/tasks/src/user.rs index ee60c6532..e605670cc 100644 --- a/crates/tasks/src/user.rs +++ b/crates/tasks/src/user.rs @@ -10,6 +10,7 @@ use mas_storage::{ RepositoryAccess, compat::CompatSessionFilter, oauth2::OAuth2SessionFilter, + personal::PersonalSessionFilter, queue::{DeactivateUserJob, ReactivateUserJob}, user::{BrowserSessionFilter, UserEmailFilter, UserRepository}, }; @@ -80,6 +81,36 @@ impl RunnableJob for DeactivateUserJob { .map_err(JobError::retry)?; info!(affected = n, "Killed all compatibility sessions for user"); + let n = repo + .personal_session() + .revoke_bulk( + clock, + PersonalSessionFilter::new() + .for_actor_user(&user) + .active_only(), + ) + .await + .map_err(JobError::retry)?; + info!( + affected = n, + "Killed all compatibility sessions acting as user" + ); + + let n = repo + .personal_session() + .revoke_bulk( + clock, + PersonalSessionFilter::new() + .for_owner_user(&user) + .active_only(), + ) + .await + .map_err(JobError::retry)?; + info!( + affected = n, + "Killed all compatibility sessions owned by user" + ); + // Delete all the email addresses for the user let n = repo .user_email()