Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/handlers/src/admin/v1/personal_sessions/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub enum RouteError {
#[error("User not found")]
UserNotFound,

#[error("User is not active")]
UserDeactivated,

#[error("Invalid scope")]
InvalidScope,
}
Expand All @@ -46,6 +49,7 @@ impl IntoResponse for RouteError {
let status = match self {
Self::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::UserNotFound => StatusCode::NOT_FOUND,
Self::UserDeactivated => StatusCode::GONE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure why not :D

Self::InvalidScope => StatusCode::BAD_REQUEST,
};
(status, sentry_event_id, Json(error)).into_response()
Expand Down Expand Up @@ -114,6 +118,10 @@ pub async fn handler(
.await?
.ok_or(RouteError::UserNotFound)?;

if actor_user.deactivated_at.is_some() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe check just is_valid (assuming it does the right thing) so that we also check for locked status?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now using is_valid_actor

return Err(RouteError::UserDeactivated);
}

let scope: Scope = params.scope.parse().map_err(|_| RouteError::InvalidScope)?;

// Create the personal session
Expand Down
98 changes: 98 additions & 0 deletions crates/storage-pg/src/personal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,104 @@ mod tests {
assert!(session_lookup.is_revoked());
}

#[sqlx::test(migrator = "crate::MIGRATOR")]
async fn test_session_revoke_bulk(pool: PgPool) {
let mut rng = ChaChaRng::seed_from_u64(42);
let clock = MockClock::default();
let mut repo = PgRepository::from_pool(&pool).await.unwrap();

let alice_user = repo
.user()
.add(&mut rng, &clock, "alice".to_owned())
.await
.unwrap();
let bob_user = repo
.user()
.add(&mut rng, &clock, "bob".to_owned())
.await
.unwrap();

let session1 = repo
.personal_session()
.add(
&mut rng,
&clock,
(&alice_user).into(),
&bob_user,
"Test Personal Session".to_owned(),
"openid".parse().unwrap(),
)
.await
.unwrap();
repo.personal_access_token()
.add(
&mut rng,
&clock,
&session1,
"mpt_hiss",
Some(Duration::days(42)),
)
.await
.unwrap();

let session2 = repo
.personal_session()
.add(
&mut rng,
&clock,
(&bob_user).into(),
&bob_user,
"Test Personal Session".to_owned(),
"openid".parse().unwrap(),
)
.await
.unwrap();
repo.personal_access_token()
.add(
&mut rng, &clock, &session2, "mpt_meow", // No expiry
None,
)
.await
.unwrap();

// Just one session without a token expiry time
assert_eq!(
repo.personal_session()
.revoke_bulk(
&clock,
PersonalSessionFilter::new()
.active_only()
.with_expires(false)
)
.await
.unwrap(),
1
);

// Just one session with a token expiry time
assert_eq!(
repo.personal_session()
.revoke_bulk(
&clock,
PersonalSessionFilter::new()
.active_only()
.with_expires(true)
)
.await
.unwrap(),
1
);

// No active sessions left
assert_eq!(
repo.personal_session()
.revoke_bulk(&clock, PersonalSessionFilter::new().active_only())
.await
.unwrap(),
0
);
}

#[sqlx::test(migrator = "crate::MIGRATOR")]
async fn test_access_token_repository(pool: PgPool) {
const FIRST_TOKEN: &str = "first_access_token";
Expand Down
62 changes: 62 additions & 0 deletions crates/storage-pg/src/personal/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,68 @@ impl PersonalSessionRepository for PgPersonalSessionRepository<'_> {
.map_err(DatabaseError::to_invalid_operation)
}

#[tracing::instrument(
name = "db.personal_session.revoke_bulk",
skip_all,
fields(
db.query.text,
),
err,
)]
async fn revoke_bulk(
&mut self,
clock: &dyn Clock,
filter: PersonalSessionFilter<'_>,
) -> Result<usize, Self::Error> {
let revoked_at = clock.now();

let (sql, arguments) = Query::update()
.table(PersonalSessions::Table)
.value(PersonalSessions::RevokedAt, revoked_at)
.and_where(
Expr::col((PersonalSessions::Table, PersonalSessions::PersonalSessionId))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a chunky subquery. I assume you need this because of the JOIN; could you stick in a comment explaining that? Also we could be doing that in a CTE, but not convinced the code would be clearer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added some comments

yeah, it's a bit chunky, the sea-query doesn't help with that verbosity either, but hopefully the new comments make the intent clear

.in_subquery(
Query::select()
.expr(Expr::col((
PersonalSessions::Table,
PersonalSessions::PersonalSessionId,
)))
.from(PersonalSessions::Table)
.left_join(
PersonalAccessTokens::Table,
Cond::all()
.add(
Expr::col((
PersonalSessions::Table,
PersonalSessions::PersonalSessionId,
))
.eq(Expr::col((
PersonalAccessTokens::Table,
PersonalAccessTokens::PersonalSessionId,
))),
)
.add(
Expr::col((
PersonalAccessTokens::Table,
PersonalAccessTokens::RevokedAt,
))
.is_null(),
),
)
.apply_filter(filter)
.take(),
),
)
.build_sqlx(PostgresQueryBuilder);

let res = sqlx::query_with(&sql, arguments)
.traced()
.execute(&mut *self.conn)
.await?;

Ok(res.rows_affected().try_into().unwrap_or(usize::MAX))
}

#[tracing::instrument(
name = "db.personal_session.list",
skip_all,
Expand Down
26 changes: 26 additions & 0 deletions crates/storage/src/personal/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@ pub trait PersonalSessionRepository: Send + Sync {
personal_session: PersonalSession,
) -> Result<PersonalSession, Self::Error>;

/// Revoke all the [`PersonalSession`]s matching the given filter.
///
/// This will also revoke the relevant personal access tokens.
///
/// Returns the number of sessions affected
///
/// # Parameters
///
/// * `clock`: The clock used to generate timestamps
/// * `filter`: The filter to apply
///
/// # Errors
///
/// Returns [`Self::Error`] if the underlying repository fails
async fn revoke_bulk(
&mut self,
clock: &dyn Clock,
filter: PersonalSessionFilter<'_>,
) -> Result<usize, Self::Error>;

/// List [`PersonalSession`]s matching the given filter and pagination
/// parameters
///
Expand Down Expand Up @@ -150,6 +170,12 @@ repository_impl!(PersonalSessionRepository:
personal_session: PersonalSession,
) -> Result<PersonalSession, Self::Error>;

async fn revoke_bulk(
&mut self,
clock: &dyn Clock,
filter: PersonalSessionFilter<'_>,
) -> Result<usize, Self::Error>;

async fn list(
&mut self,
filter: PersonalSessionFilter<'_>,
Expand Down
31 changes: 31 additions & 0 deletions crates/tasks/src/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use mas_storage::{
RepositoryAccess,
compat::CompatSessionFilter,
oauth2::OAuth2SessionFilter,
personal::PersonalSessionFilter,
queue::{DeactivateUserJob, ReactivateUserJob},
user::{BrowserSessionFilter, UserEmailFilter, UserRepository},
};
Expand Down Expand Up @@ -80,6 +81,36 @@ impl RunnableJob for DeactivateUserJob {
.map_err(JobError::retry)?;
info!(affected = n, "Killed all compatibility sessions for user");

let n = repo
.personal_session()
.revoke_bulk(
clock,
PersonalSessionFilter::new()
.for_actor_user(&user)
.active_only(),
)
.await
.map_err(JobError::retry)?;
info!(
affected = n,
"Killed all compatibility sessions acting as user"
);

let n = repo
.personal_session()
.revoke_bulk(
clock,
PersonalSessionFilter::new()
.for_owner_user(&user)
.active_only(),
)
.await
.map_err(JobError::retry)?;
info!(
affected = n,
"Killed all compatibility sessions owned by user"
);

// Delete all the email addresses for the user
let n = repo
.user_email()
Expand Down
Loading