Skip to content

Commit 49f6757

Browse files
committed
fix(jobs): add a job to clear expired sessions
1 parent b3beaf8 commit 49f6757

File tree

8 files changed

+88
-10
lines changed

8 files changed

+88
-10
lines changed

rustytime/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rustytime/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "rustytime-server"
33
description = "🕒 blazingly fast time tracking for developers"
4-
version = "0.17.0"
4+
version = "0.17.1"
55
edition = "2024"
66
authors = ["ImShyMike"]
77
readme = "../README.md"

rustytime/src/jobs/import.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,8 +521,8 @@ pub async fn setup(
521521

522522
WorkerBuilder::new("import-worker")
523523
.backend(import_store)
524-
.layer(PrometheusLayer::default())
525524
.enable_tracing()
525+
.layer(PrometheusLayer::default())
526526
.catch_panic()
527527
.concurrency(2)
528528
.data(diesel_pool)

rustytime/src/jobs/leaderboard.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ fn regenerate_leaderboard_period(
155155

156156
fn cleanup_old_entries(pool: &DbPool) -> Result<(), diesel::result::Error> {
157157
let mut conn = pool.get().map_err(|e| {
158-
tracing::error!(error = ?e, "Failed to get connection for cleanup");
158+
tracing::error!(error = ?e, "Failed to get connection for leaderboard cleanup");
159159
diesel::result::Error::DatabaseError(
160160
diesel::result::DatabaseErrorKind::Unknown,
161161
Box::new(e.to_string()),

rustytime/src/jobs/mod.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
pub mod import;
22
mod leaderboard;
3+
mod sessions;
34

45
use apalis_postgres::PostgresStorage;
56
use axum_prometheus::metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
67
use sqlx::PgPool;
8+
use std::future::Future;
79

810
use crate::db::connection::DbPool;
911

@@ -17,15 +19,22 @@ pub async fn setup_jobs(
1719
sqlx_pool: PgPool,
1820
diesel_pool: DbPool,
1921
) -> (
20-
impl std::future::Future<Output = ()>,
21-
impl std::future::Future<Output = ()>,
22+
impl Future<Output = ()>,
23+
impl Future<Output = ()>,
24+
impl Future<Output = ()>,
2225
import::ImportStore,
2326
) {
2427
PostgresStorage::setup(&sqlx_pool).await.unwrap();
2528

2629
let import_store = import::create_storage(&sqlx_pool).await;
2730
let leaderboard_worker = leaderboard::setup(diesel_pool.clone()).await;
28-
let import_worker = import::setup(sqlx_pool, diesel_pool).await;
31+
let import_worker = import::setup(sqlx_pool, diesel_pool.clone()).await;
32+
let sessions_worker = sessions::setup(diesel_pool).await;
2933

30-
(leaderboard_worker, import_worker, import_store)
34+
(
35+
leaderboard_worker,
36+
import_worker,
37+
sessions_worker,
38+
import_store,
39+
)
3140
}

rustytime/src/jobs/sessions.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use std::str::FromStr;
2+
3+
use apalis::{
4+
layers::{WorkerBuilderExt, prometheus::PrometheusLayer},
5+
prelude::{Data, WorkerBuilder},
6+
};
7+
use apalis_cron::{CronStream, Tick};
8+
use cron::Schedule;
9+
use tokio::signal::ctrl_c;
10+
11+
use diesel::Connection;
12+
13+
use crate::db::connection::DbPool;
14+
use crate::models::session::Session;
15+
16+
fn cleanup_expired_sessions(pool: &DbPool) -> Result<(), diesel::result::Error> {
17+
let mut conn = pool.get().map_err(|e| {
18+
tracing::error!(error = ?e, "Failed to get connection for session cleanup");
19+
diesel::result::Error::DatabaseError(
20+
diesel::result::DatabaseErrorKind::Unknown,
21+
Box::new(e.to_string()),
22+
)
23+
})?;
24+
25+
conn.transaction(|conn| {
26+
let deleted = Session::delete_expired(conn)?;
27+
28+
tracing::debug!(deleted, "Cleaned up old leaderboard entries");
29+
30+
Ok(())
31+
})
32+
}
33+
34+
async fn run_cleanup(_tick: Tick, pool: Data<DbPool>) {
35+
if let Err(e) = cleanup_expired_sessions(&pool) {
36+
tracing::error!(error = ?e, "Failed to cleanup old leaderboard entries");
37+
}
38+
}
39+
40+
pub async fn setup(diesel_pool: DbPool) -> impl std::future::Future<Output = ()> {
41+
let cleanup_schedule =
42+
Schedule::from_str("0 0 0 * * *").expect("valid cron: daily at midnight");
43+
44+
let cleanup_pool = diesel_pool;
45+
46+
let cleanup_worker = WorkerBuilder::new("sessions-cleanup")
47+
.backend(CronStream::new(cleanup_schedule))
48+
.enable_tracing()
49+
.layer(PrometheusLayer::default())
50+
.catch_panic()
51+
.data(cleanup_pool)
52+
.build(run_cleanup);
53+
54+
async move {
55+
tokio::select! {
56+
_ = cleanup_worker.run() => {}
57+
_ = ctrl_c() => {
58+
tracing::info!("Shutting down session workers");
59+
}
60+
}
61+
}
62+
}

rustytime/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
145145
let sqlx_pool = sqlx::PgPool::connect(&database_url)
146146
.await
147147
.expect("Failed to connect to database for jobs");
148-
let (leaderboard_worker, import_worker, import_store) =
148+
let (leaderboard_worker, import_worker, sessions_worker, import_store) =
149149
jobs::setup_jobs(sqlx_pool, pool.clone()).await;
150150
app_state.set_import_store(import_store);
151151
tokio::spawn(leaderboard_worker);
152152
tokio::spawn(import_worker);
153+
tokio::spawn(sessions_worker);
153154
info!("✅ Jobs system started");
154155

155156
let rate_period = if is_production {

rustytime/src/models/session.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::models::user::User;
88
use crate::schema::sessions;
99
use crate::schema::sessions::dsl;
1010

11-
const SESSION_EXPIRY_DAYS: i64 = 7;
11+
pub const SESSION_EXPIRY_DAYS: i64 = 7;
1212

1313
#[derive(Queryable, Selectable, Serialize, Deserialize, Debug, Clone)]
1414
#[diesel(table_name = sessions)]
@@ -92,10 +92,16 @@ impl Session {
9292
}
9393

9494
#[allow(dead_code)]
95+
#[inline(always)]
9596
pub fn delete_by_user_id(conn: &mut PgConnection, user_id: i32) -> QueryResult<usize> {
9697
diesel::delete(sessions::table.filter(dsl::user_id.eq(user_id))).execute(conn)
9798
}
9899

100+
#[inline(always)]
101+
pub fn delete_expired(conn: &mut PgConnection) -> QueryResult<usize> {
102+
diesel::delete(sessions::table.filter(dsl::expires_at.le(now))).execute(conn)
103+
}
104+
99105
pub fn impersonate(
100106
conn: &mut PgConnection,
101107
session_id: Uuid,

0 commit comments

Comments
 (0)