Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/handlers/src/admin/v1/personal_sessions/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum RouteError {
#[error("User not found")]
UserNotFound,

#[error("User is not active")]
UserDeactivated,

#[error("Invalid scope")]
InvalidScope,
}
Expand All @@ -47,6 +50,7 @@ impl IntoResponse for RouteError {
let status = match self {
Self::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::UserNotFound => StatusCode::NOT_FOUND,
Self::UserDeactivated => StatusCode::GONE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure why not :D

Self::InvalidScope => StatusCode::BAD_REQUEST,
};
(status, sentry_event_id, Json(error)).into_response()
Expand Down Expand Up @@ -109,6 +113,10 @@ pub async fn handler(
.await?
.ok_or(RouteError::UserNotFound)?;

if !actor_user.is_valid_actor() {
return Err(RouteError::UserDeactivated);
}

let scope: Scope = params.scope.parse().map_err(|_| RouteError::InvalidScope)?;

// Create the personal session
Expand Down
98 changes: 98 additions & 0 deletions crates/storage-pg/src/personal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,104 @@ mod tests {
assert!(session_lookup.is_revoked());
}

#[sqlx::test(migrator = "crate::MIGRATOR")]
async fn test_session_revoke_bulk(pool: PgPool) {
let mut rng = ChaChaRng::seed_from_u64(42);
let clock = MockClock::default();
let mut repo = PgRepository::from_pool(&pool).await.unwrap();

let alice_user = repo
.user()
.add(&mut rng, &clock, "alice".to_owned())
.await
.unwrap();
let bob_user = repo
.user()
.add(&mut rng, &clock, "bob".to_owned())
.await
.unwrap();

let session1 = repo
.personal_session()
.add(
&mut rng,
&clock,
(&alice_user).into(),
&bob_user,
"Test Personal Session".to_owned(),
"openid".parse().unwrap(),
)
.await
.unwrap();
repo.personal_access_token()
.add(
&mut rng,
&clock,
&session1,
"mpt_hiss",
Some(Duration::days(42)),
)
.await
.unwrap();

let session2 = repo
.personal_session()
.add(
&mut rng,
&clock,
(&bob_user).into(),
&bob_user,
"Test Personal Session".to_owned(),
"openid".parse().unwrap(),
)
.await
.unwrap();
repo.personal_access_token()
.add(
&mut rng, &clock, &session2, "mpt_meow", // No expiry
None,
)
.await
.unwrap();

// Just one session without a token expiry time
assert_eq!(
repo.personal_session()
.revoke_bulk(
&clock,
PersonalSessionFilter::new()
.active_only()
.with_expires(false)
)
.await
.unwrap(),
1
);

// Just one session with a token expiry time
assert_eq!(
repo.personal_session()
.revoke_bulk(
&clock,
PersonalSessionFilter::new()
.active_only()
.with_expires(true)
)
.await
.unwrap(),
1
);

// No active sessions left
assert_eq!(
repo.personal_session()
.revoke_bulk(&clock, PersonalSessionFilter::new().active_only())
.await
.unwrap(),
0
);
}

#[sqlx::test(migrator = "crate::MIGRATOR")]
async fn test_access_token_repository(pool: PgPool) {
const FIRST_TOKEN: &str = "first_access_token";
Expand Down
71 changes: 71 additions & 0 deletions crates/storage-pg/src/personal/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,73 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> {
.map_err(DatabaseError::to_invalid_operation)
}

#[tracing::instrument(
name = "db.personal_session.revoke_bulk",
skip_all,
fields(
db.query.text,
),
err,
)]
async fn revoke_bulk(
&mut self,
clock: &dyn Clock,
filter: PersonalSessionFilter<'_>,
) -> Result<usize, Self::Error> {
let revoked_at = clock.now();

let (sql, arguments) = Query::update()
.table(PersonalSessions::Table)
.value(PersonalSessions::RevokedAt, revoked_at)
.and_where(
Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a chunky subquery. I assume you need this because of the JOIN; could you stick in a comment explaining that? Also we could be doing that in a CTE, but not convinced the code would be clearer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added some comments

yeah, it's a bit chunky, the sea-query doesn't help with that verbosity either, but hopefully the new comments make the intent clear

// Because filters apply to both the session and access token tables,
// Use a subquery to make it possible to use a JOIN
// onto the personal access token table.
.in_subquery(
Query::select()
.expr(Expr::col((
PersonalSessions::Table,
PersonalSessions::PersonalSessionId,
)))
.from(PersonalSessions::Table)
.left_join(
PersonalAccessTokens::Table,
Cond::all()
// Match session ID
.add(
Expr::col((
PersonalSessions::Table,
PersonalSessions::PersonalSessionId,
))
.eq(Expr::col((
PersonalAccessTokens::Table,
PersonalAccessTokens::PersonalSessionId,
))),
)
// Only choose the active access token for each session
.add(
Expr::col((
PersonalAccessTokens::Table,
PersonalAccessTokens::RevokedAt,
))
.is_null(),
),
)
.apply_filter(filter)
.take(),
),
)
.build_sqlx(PostgresQueryBuilder);

let res = sqlx::query_with(&sql, arguments)
.traced()
.execute(&mut *self.conn)
.await?;

Ok(res.rows_affected().try_into().unwrap_or(usize::MAX))
}

#[tracing::instrument(
name = "db.personal_session.list",
skip_all,
Expand Down Expand Up @@ -433,13 +500,15 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> {
.left_join(
PersonalAccessTokens::Table,
Cond::all()
// Match session ID
.add(
Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId))
.eq(Expr::col((
PersonalAccessTokens::Table,
PersonalAccessTokens::PersonalSessionId,
))),
)
// Only choose the active access token for each session
.add(
Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::RevokedAt))
.is_null(),
Expand Down Expand Up @@ -477,13 +546,15 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> {
.left_join(
PersonalAccessTokens::Table,
Cond::all()
// Match session ID
.add(
Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId))
.eq(Expr::col((
PersonalAccessTokens::Table,
PersonalAccessTokens::PersonalSessionId,
))),
)
// Only choose the active access token for each session
.add(
Expr::col((PersonalAccessTokens::Table, PersonalAccessTokens::RevokedAt))
.is_null(),
Expand Down
24 changes: 24 additions & 0 deletions crates/storage/src/personal/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ pub trait PersonalSessionRepository: Send + Sync {
personal_session: PersonalSession,
) -> Result<PersonalSession, Self::Error>;

/// Revoke all the [`PersonalSession`]s matching the given filter.
///
/// Returns the number of sessions affected
///
/// # Parameters
///
/// * `clock`: The clock used to generate timestamps
/// * `filter`: The filter to apply
///
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn revoke_bulk(
&mut self,
clock: &dyn Clock,
filter: PersonalSessionFilter<'_>,
) -> Result<usize, Self::Error>;

/// List [`PersonalSession`]s matching the given filter and pagination
/// parameters
///
Expand Down Expand Up @@ -150,6 +168,12 @@ repository_impl!(PersonalSessionRepository:
personal_session: PersonalSession,
) -> Result<PersonalSession, Self::Error>;

async fn revoke_bulk(
&mut self,
clock: &dyn Clock,
filter: PersonalSessionFilter<'_>,
) -> Result<usize, Self::Error>;

async fn list(
&mut self,
filter: PersonalSessionFilter<'_>,
Expand Down
31 changes: 31 additions & 0 deletions crates/tasks/src/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use mas_storage::{
RepositoryAccess,
compat::CompatSessionFilter,
oauth2::OAuth2SessionFilter,
personal::PersonalSessionFilter,
queue::{DeactivateUserJob, ReactivateUserJob},
user::{BrowserSessionFilter, UserEmailFilter, UserRepository},
};
Expand Down Expand Up @@ -80,6 +81,36 @@ impl RunnableJob for DeactivateUserJob {
.map_err(JobError::retry)?;
info!(affected = n, "Killed all compatibility sessions for user");

let n = repo
.personal_session()
.revoke_bulk(
clock,
PersonalSessionFilter::new()
.for_actor_user(&user)
.active_only(),
)
.await
.map_err(JobError::retry)?;
info!(
affected = n,
"Killed all compatibility sessions acting as user"
);

let n = repo
.personal_session()
.revoke_bulk(
clock,
PersonalSessionFilter::new()
.for_owner_user(&user)
.active_only(),
)
.await
.map_err(JobError::retry)?;
info!(
affected = n,
"Killed all compatibility sessions owned by user"
);

// Delete all the email addresses for the user
let n = repo
.user_email()
Expand Down
Loading