Skip to content

Commit 4353185

Browse files
committed
Setup a job to expire compatibility sessions
1 parent 34f7e49 commit 4353185

File tree

3 files changed

+128
-2
lines changed

3 files changed

+128
-2
lines changed

crates/storage/src/queue/tasks.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
// Please see LICENSE in the repository root for full details.
55

66
use chrono::{DateTime, Utc};
7-
use mas_data_model::{Device, Session, User, UserEmailAuthentication, UserRecoverySession};
7+
use mas_data_model::{
8+
CompatSession, Device, Session, User, UserEmailAuthentication, UserRecoverySession,
9+
};
810
use serde::{Deserialize, Serialize};
911
use ulid::Ulid;
1012

@@ -378,3 +380,60 @@ impl ExpireInactiveOAuthSessionsJob {
378380
impl InsertableJob for ExpireInactiveOAuthSessionsJob {
379381
const QUEUE_NAME: &'static str = "expire-inactive-oauth-sessions";
380382
}
383+
384+
/// Expire inactive compatibility sessions
385+
#[derive(Serialize, Deserialize, Debug, Clone)]
386+
pub struct ExpireInactiveCompatSessionsJob {
387+
threshold: DateTime<Utc>,
388+
after: Option<Ulid>,
389+
}
390+
391+
impl ExpireInactiveCompatSessionsJob {
392+
/// Create a new job to expire inactive compatibility sessions
393+
///
394+
/// # Parameters
395+
///
396+
/// * `threshold` - The threshold to expire sessions at
397+
#[must_use]
398+
pub fn new(threshold: DateTime<Utc>) -> Self {
399+
Self {
400+
threshold,
401+
after: None,
402+
}
403+
}
404+
405+
/// Get the threshold to expire sessions at
406+
#[must_use]
407+
pub fn threshold(&self) -> DateTime<Utc> {
408+
self.threshold
409+
}
410+
411+
/// Get the pagination cursor
412+
#[must_use]
413+
pub fn pagination(&self, batch_size: usize) -> Pagination {
414+
let pagination = Pagination::first(batch_size);
415+
if let Some(after) = self.after {
416+
pagination.after(after)
417+
} else {
418+
pagination
419+
}
420+
}
421+
422+
/// Get the next job given the page returned by the database
423+
#[must_use]
424+
pub fn next(&self, page: &Page<CompatSession>) -> Option<Self> {
425+
if !page.has_next_page {
426+
return None;
427+
}
428+
429+
let last_edge = page.edges.last()?;
430+
Some(Self {
431+
threshold: self.threshold,
432+
after: Some(last_edge.id),
433+
})
434+
}
435+
}
436+
437+
impl InsertableJob for ExpireInactiveCompatSessionsJob {
438+
const QUEUE_NAME: &'static str = "expire-inactive-compat-sessions";
439+
}

crates/tasks/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ pub async fn init(
129129
.register_handler::<mas_storage::queue::SendEmailAuthenticationCodeJob>()
130130
.register_handler::<mas_storage::queue::SyncDevicesJob>()
131131
.register_handler::<mas_storage::queue::VerifyEmailJob>()
132+
.register_handler::<mas_storage::queue::ExpireInactiveCompatSessionsJob>()
132133
.register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
133134
.add_schedule(
134135
"cleanup-expired-tokens",

crates/tasks/src/sessions.rs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@ use std::collections::HashSet;
88
use async_trait::async_trait;
99
use chrono::Duration;
1010
use mas_storage::{
11+
compat::CompatSessionFilter,
1112
oauth2::OAuth2SessionFilter,
12-
queue::{ExpireInactiveOAuthSessionsJob, QueueJobRepositoryExt, SyncDevicesJob},
13+
queue::{
14+
ExpireInactiveCompatSessionsJob, ExpireInactiveOAuthSessionsJob, QueueJobRepositoryExt,
15+
SyncDevicesJob,
16+
},
1317
};
1418

1519
use crate::{
@@ -80,3 +84,65 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob {
8084
Ok(())
8185
}
8286
}
87+
88+
#[async_trait]
89+
impl RunnableJob for ExpireInactiveCompatSessionsJob {
90+
async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
91+
let mut repo = state.repository().await.map_err(JobError::retry)?;
92+
let clock = state.clock();
93+
let mut rng = state.rng();
94+
let mut users_synced = HashSet::new();
95+
96+
// This delay is used to space out the device sync jobs
97+
// We add 10 seconds between each device sync, meaning that it will spread out
98+
// the syncs over ~16 minutes max if we get a full batch of 100 users
99+
let mut delay = Duration::minutes(1);
100+
101+
let filter = CompatSessionFilter::new()
102+
.with_last_active_before(self.threshold())
103+
.active_only();
104+
105+
let pagination = self.pagination(100);
106+
107+
let page = repo
108+
.compat_session()
109+
.list(filter, pagination)
110+
.await
111+
.map_err(JobError::retry)?
112+
.map(|(c, _)| c);
113+
114+
if let Some(job) = self.next(&page) {
115+
tracing::info!("Scheduling job to expire the next batch of inactive sessions");
116+
repo.queue_job()
117+
.schedule_job(&mut rng, &clock, job)
118+
.await
119+
.map_err(JobError::retry)?;
120+
}
121+
122+
for edge in page.edges {
123+
let inserted = users_synced.insert(edge.user_id);
124+
if inserted {
125+
tracing::info!(user.id = %edge.user_id, "Scheduling devices sync for user");
126+
repo.queue_job()
127+
.schedule_job_later(
128+
&mut rng,
129+
&clock,
130+
SyncDevicesJob::new_for_id(edge.user_id),
131+
clock.now() + delay,
132+
)
133+
.await
134+
.map_err(JobError::retry)?;
135+
delay += Duration::seconds(10);
136+
}
137+
138+
repo.compat_session()
139+
.finish(&clock, edge)
140+
.await
141+
.map_err(JobError::retry)?;
142+
}
143+
144+
repo.save().await.map_err(JobError::retry)?;
145+
146+
Ok(())
147+
}
148+
}

0 commit comments

Comments
 (0)