Skip to content

Commit 2ae36b4

Browse files
committed
Setup job to expire inactive browser sessions
1 parent 4353185 commit 2ae36b4

File tree

3 files changed

+103
-3
lines changed

3 files changed

+103
-3
lines changed

crates/storage/src/queue/tasks.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55

66
use chrono::{DateTime, Utc};
77
use mas_data_model::{
8-
CompatSession, Device, Session, User, UserEmailAuthentication, UserRecoverySession,
8+
BrowserSession, CompatSession, Device, Session, User, UserEmailAuthentication,
9+
UserRecoverySession,
910
};
1011
use serde::{Deserialize, Serialize};
1112
use ulid::Ulid;
@@ -437,3 +438,60 @@ impl ExpireInactiveCompatSessionsJob {
437438
impl InsertableJob for ExpireInactiveCompatSessionsJob {
438439
const QUEUE_NAME: &'static str = "expire-inactive-compat-sessions";
439440
}
441+
442+
/// Expire inactive user sessions
443+
#[derive(Debug, Serialize, Deserialize)]
444+
pub struct ExpireInactiveUserSessionsJob {
445+
threshold: DateTime<Utc>,
446+
after: Option<Ulid>,
447+
}
448+
449+
impl ExpireInactiveUserSessionsJob {
450+
/// Create a new job to expire inactive user/browser sessions
451+
///
452+
/// # Parameters
453+
///
454+
/// * `threshold` - The threshold to expire sessions at
455+
#[must_use]
456+
pub fn new(threshold: DateTime<Utc>) -> Self {
457+
Self {
458+
threshold,
459+
after: None,
460+
}
461+
}
462+
463+
/// Get the threshold to expire sessions at
464+
#[must_use]
465+
pub fn threshold(&self) -> DateTime<Utc> {
466+
self.threshold
467+
}
468+
469+
/// Get the pagination cursor
470+
#[must_use]
471+
pub fn pagination(&self, batch_size: usize) -> Pagination {
472+
let pagination = Pagination::first(batch_size);
473+
if let Some(after) = self.after {
474+
pagination.after(after)
475+
} else {
476+
pagination
477+
}
478+
}
479+
480+
/// Get the next job given the page returned by the database
481+
#[must_use]
482+
pub fn next(&self, page: &Page<BrowserSession>) -> Option<Self> {
483+
if !page.has_next_page {
484+
return None;
485+
}
486+
487+
let last_edge = page.edges.last()?;
488+
Some(Self {
489+
threshold: self.threshold,
490+
after: Some(last_edge.id),
491+
})
492+
}
493+
}
494+
495+
impl InsertableJob for ExpireInactiveUserSessionsJob {
496+
const QUEUE_NAME: &'static str = "expire-inactive-user-sessions";
497+
}

crates/tasks/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ pub async fn init(
131131
.register_handler::<mas_storage::queue::VerifyEmailJob>()
132132
.register_handler::<mas_storage::queue::ExpireInactiveCompatSessionsJob>()
133133
.register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
134+
.register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
134135
.add_schedule(
135136
"cleanup-expired-tokens",
136137
"0 0 * * * *".parse()?,

crates/tasks/src/sessions.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ use mas_storage::{
1111
compat::CompatSessionFilter,
1212
oauth2::OAuth2SessionFilter,
1313
queue::{
14-
ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, QueueJobRepositoryExt,
15-
SyncDevicesJob,
14+
ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob,
15+
ExpireInactiveUserSessionsJob, QueueJobRepositoryExt, SyncDevicesJob,
1616
},
17+
user::BrowserSessionFilter,
1718
};
1819

1920
use crate::{
@@ -146,3 +147,43 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob {
146147
Ok(())
147148
}
148149
}
150+
151+
#[async_trait]
152+
impl RunnableJob for ExpireInactiveUserSessionsJob {
153+
async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
154+
let mut repo = state.repository().await.map_err(JobError::retry)?;
155+
let clock = state.clock();
156+
let mut rng = state.rng();
157+
158+
let filter = BrowserSessionFilter::new()
159+
.with_last_active_before(self.threshold())
160+
.active_only();
161+
162+
let pagination = self.pagination(100);
163+
164+
let page = repo
165+
.browser_session()
166+
.list(filter, pagination)
167+
.await
168+
.map_err(JobError::retry)?;
169+
170+
if let Some(job) = self.next(&page) {
171+
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
172+
repo.queue_job()
173+
.schedule_job(&mut rng, &clock, job)
174+
.await
175+
.map_err(JobError::retry)?;
176+
}
177+
178+
for edge in page.edges {
179+
repo.browser_session()
180+
.finish(&clock, edge)
181+
.await
182+
.map_err(JobError::retry)?;
183+
}
184+
185+
repo.save().await.map_err(JobError::retry)?;
186+
187+
Ok(())
188+
}
189+
}

0 commit comments

Comments
 (0)