diff --git a/crates/handlers/src/activity_tracker/mod.rs b/crates/handlers/src/activity_tracker/mod.rs
index 232c636ef..3f7511af6 100644
--- a/crates/handlers/src/activity_tracker/mod.rs
+++ b/crates/handlers/src/activity_tracker/mod.rs
@@ -185,6 +185,8 @@ impl ActivityTracker {
         // This guard on the shutdown token is to ensure that if this task crashes for
         // any reason, the server will shut down
         let _guard = cancellation_token.clone().drop_guard();
+        let mut interval = tokio::time::interval(interval);
+        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 
         loop {
             tokio::select! {
@@ -202,7 +204,7 @@ impl ActivityTracker {
                     }
                 }
 
-                () = tokio::time::sleep(interval) => {
+                _ = interval.tick() => {
                     self.flush().await;
                 }
             }
diff --git a/crates/handlers/src/activity_tracker/worker.rs b/crates/handlers/src/activity_tracker/worker.rs
index 949ab690d..80853f0fe 100644
--- a/crates/handlers/src/activity_tracker/worker.rs
+++ b/crates/handlers/src/activity_tracker/worker.rs
@@ -7,10 +7,10 @@
 use std::{collections::HashMap, net::IpAddr};
 
 use chrono::{DateTime, Utc};
-use mas_storage::{RepositoryAccess, user::BrowserSessionRepository};
+use mas_storage::{RepositoryAccess, RepositoryError, user::BrowserSessionRepository};
 use opentelemetry::{
     Key, KeyValue,
-    metrics::{Counter, Histogram},
+    metrics::{Counter, Gauge, Histogram},
 };
 use sqlx::PgPool;
 use tokio_util::sync::CancellationToken;
@@ -25,8 +25,8 @@ use crate::{
 /// database automatically.
 ///
 /// The [`ActivityRecord`] structure plus the key in the [`HashMap`] takes less
-/// than 100 bytes, so this should allocate around a megabyte of memory.
-static MAX_PENDING_RECORDS: usize = 10_000;
+/// than 100 bytes, so this should allocate around 100kB of memory.
+static MAX_PENDING_RECORDS: usize = 1000;
 
 const TYPE: Key = Key::from_static_str("type");
 const SESSION_KIND: Key = Key::from_static_str("session_kind");
@@ -45,6 +45,7 @@ struct ActivityRecord {
 pub struct Worker {
     pool: PgPool,
     pending_records: HashMap<(SessionKind, Ulid), ActivityRecord>,
+    pending_records_gauge: Gauge<u64>,
     message_counter: Counter<u64>,
     flush_time_histogram: Histogram<u64>,
 }
@@ -80,9 +81,17 @@ impl Worker {
             .with_unit("ms")
             .build();
 
+        let pending_records_gauge = METER
+            .u64_gauge("mas.activity_tracker.pending_records")
+            .with_description("The number of pending activity records")
+            .with_unit("{records}")
+            .build();
+        pending_records_gauge.record(0, &[]);
+
         Self {
             pool,
             pending_records: HashMap::with_capacity(MAX_PENDING_RECORDS),
+            pending_records_gauge,
             message_counter,
             flush_time_histogram,
         }
@@ -165,6 +174,10 @@ impl Worker {
                     let _ = tx.send(());
                 }
             }
+
+            // Update the gauge
+            self.pending_records_gauge
+                .record(self.pending_records.len() as u64, &[]);
         }
 
         // Flush one last time
@@ -193,18 +206,22 @@ impl Worker {
             Err(e) => {
                 self.flush_time_histogram
                     .record(duration_ms, &[KeyValue::new(RESULT, "failure")]);
-                tracing::error!("Failed to flush activity tracker: {}", e);
+                tracing::error!(
+                    error = &e as &dyn std::error::Error,
+                    "Failed to flush activity tracker"
+                );
             }
         }
     }
 
     /// Fallible part of [`Self::flush`].
     #[tracing::instrument(name = "activity_tracker.flush", skip(self))]
-    async fn try_flush(&mut self) -> Result<(), anyhow::Error> {
+    async fn try_flush(&mut self) -> Result<(), RepositoryError> {
        let pending_records = &self.pending_records;
 
         let mut repo = mas_storage_pg::PgRepository::from_pool(&self.pool)
-            .await?
+            .await
+            .map_err(RepositoryError::from_error)?
             .boxed();
 
         let mut browser_sessions = Vec::new();
diff --git a/crates/storage-pg/src/compat/session.rs b/crates/storage-pg/src/compat/session.rs
index c844be238..4aebe41ed 100644
--- a/crates/storage-pg/src/compat/session.rs
+++ b/crates/storage-pg/src/compat/session.rs
@@ -549,13 +549,16 @@ impl CompatSessionRepository for PgCompatSessionRepository<'_> {
     )]
     async fn record_batch_activity(
         &mut self,
-        activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
+        mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
     ) -> Result<(), Self::Error> {
-        let mut ids = Vec::with_capacity(activity.len());
-        let mut last_activities = Vec::with_capacity(activity.len());
-        let mut ips = Vec::with_capacity(activity.len());
-
-        for (id, last_activity, ip) in activity {
+        // Sort the activity by ID, so that when batching the updates, Postgres
+        // locks the rows in a stable order, preventing deadlocks
+        activities.sort_unstable();
+        let mut ids = Vec::with_capacity(activities.len());
+        let mut last_activities = Vec::with_capacity(activities.len());
+        let mut ips = Vec::with_capacity(activities.len());
+
+        for (id, last_activity, ip) in activities {
             ids.push(Uuid::from(id));
             last_activities.push(last_activity);
             ips.push(ip);
diff --git a/crates/storage-pg/src/oauth2/session.rs b/crates/storage-pg/src/oauth2/session.rs
index b525e22a0..b08014faf 100644
--- a/crates/storage-pg/src/oauth2/session.rs
+++ b/crates/storage-pg/src/oauth2/session.rs
@@ -445,13 +445,16 @@ impl OAuth2SessionRepository for PgOAuth2SessionRepository<'_> {
     )]
     async fn record_batch_activity(
         &mut self,
-        activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
+        mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
     ) -> Result<(), Self::Error> {
-        let mut ids = Vec::with_capacity(activity.len());
-        let mut last_activities = Vec::with_capacity(activity.len());
-        let mut ips = Vec::with_capacity(activity.len());
-
-        for (id, last_activity, ip) in activity {
+        // Sort the activity by ID, so that when batching the updates, Postgres
+        // locks the rows in a stable order, preventing deadlocks
+        activities.sort_unstable();
+        let mut ids = Vec::with_capacity(activities.len());
+        let mut last_activities = Vec::with_capacity(activities.len());
+        let mut ips = Vec::with_capacity(activities.len());
+
+        for (id, last_activity, ip) in activities {
             ids.push(Uuid::from(id));
             last_activities.push(last_activity);
             ips.push(ip);
diff --git a/crates/storage-pg/src/user/session.rs b/crates/storage-pg/src/user/session.rs
index fa4c69f82..3bea6781c 100644
--- a/crates/storage-pg/src/user/session.rs
+++ b/crates/storage-pg/src/user/session.rs
@@ -564,13 +564,16 @@ impl BrowserSessionRepository for PgBrowserSessionRepository<'_> {
     )]
     async fn record_batch_activity(
         &mut self,
-        activity: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
+        mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
     ) -> Result<(), Self::Error> {
-        let mut ids = Vec::with_capacity(activity.len());
-        let mut last_activities = Vec::with_capacity(activity.len());
-        let mut ips = Vec::with_capacity(activity.len());
-
-        for (id, last_activity, ip) in activity {
+        // Sort the activity by ID, so that when batching the updates, Postgres
+        // locks the rows in a stable order, preventing deadlocks
+        activities.sort_unstable();
+        let mut ids = Vec::with_capacity(activities.len());
+        let mut last_activities = Vec::with_capacity(activities.len());
+        let mut ips = Vec::with_capacity(activities.len());
+
+        for (id, last_activity, ip) in activities {
             ids.push(Uuid::from(id));
             last_activities.push(last_activity);
             ips.push(ip);
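
For reference, a minimal self-contained sketch of the ticker pattern the first hunk switches to, assuming tokio (with macros/rt/time) and tokio-util; `run`, `flush`, and the durations in `main` are illustrative stand-ins for the tracker's own method and configuration, not part of the patch:

use std::time::Duration;

use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;

async fn flush() {
    // Stand-in for the tracker's real flush logic
}

async fn run(cancellation_token: CancellationToken, period: Duration) {
    // If this task dies for any reason, the guard cancels the token on drop
    let _guard = cancellation_token.clone().drop_guard();

    // An Interval keeps a fixed cadence; Skip drops missed ticks instead of
    // firing them back-to-back after a slow flush
    let mut interval = tokio::time::interval(period);
    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            () = cancellation_token.cancelled() => break,
            _ = interval.tick() => flush().await,
        }
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let task = tokio::spawn(run(token.clone(), Duration::from_secs(1)));

    // Let a few ticks happen, then shut down
    tokio::time::sleep(Duration::from_secs(3)).await;
    token.cancel();
    let _ = task.await;
}

The default `MissedTickBehavior` is `Burst`, which would fire all missed ticks back-to-back once a slow flush finishes; setting `Skip` keeps the cadence without that catch-up burst, which is why the patch sets it explicitly.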
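
The three storage-side hunks all apply the same idea: a batched UPDATE acquires row locks as it visits rows, so two concurrent flushes that touch overlapping sessions in different orders can each end up waiting on a lock the other already holds, i.e. a deadlock. Sorting the batch first makes the visiting (and therefore locking) order deterministic. Below is a standalone sketch of just that preparation step, using the `ulid` and `chrono` crates; `prepare_batch` and `main` are illustrative names and the SQL side is deliberately omitted:

use std::net::IpAddr;

use chrono::{DateTime, Utc};
use ulid::Ulid;

/// Turn an unordered batch of (session ID, last activity, IP) records into
/// the parallel arrays handed to a single batched UPDATE, in a stable order.
fn prepare_batch(
    mut activities: Vec<(Ulid, DateTime<Utc>, Option<IpAddr>)>,
) -> (Vec<Ulid>, Vec<DateTime<Utc>>, Vec<Option<IpAddr>>) {
    // Deterministic order => deterministic row-lock acquisition order
    activities.sort_unstable();

    let mut ids = Vec::with_capacity(activities.len());
    let mut last_activities = Vec::with_capacity(activities.len());
    let mut ips = Vec::with_capacity(activities.len());
    for (id, last_activity, ip) in activities {
        ids.push(id);
        last_activities.push(last_activity);
        ips.push(ip);
    }
    (ids, last_activities, ips)
}

fn main() {
    let batch = vec![
        (Ulid::new(), Utc::now(), None::<IpAddr>),
        (Ulid::new(), Utc::now(), "10.0.0.1".parse().ok()),
    ];
    let (ids, _times, _ips) = prepare_batch(batch);
    // IDs come out in ascending order regardless of arrival order
    assert!(ids.windows(2).all(|w| w[0] <= w[1]));
}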