From 7000e2b908dedf4cce9f3d2d70da8d1782e56fa5 Mon Sep 17 00:00:00 2001
From: Quentin Gliech
Date: Fri, 13 Dec 2024 15:47:59 +0100
Subject: [PATCH] Add metrics to the job queue

This adds:

- a histogram of the time it takes to process a job for each queue, with
  the status of the job (success, failure, etc.)
- a histogram which records the time it takes to do a "tick" and fetch jobs
- a counter of the number of jobs currently in-flight for each queue
- a counter which tracks the reasons why the worker got woken up
---
 Cargo.lock                    |   1 +
 crates/tasks/Cargo.toml       |   1 +
 crates/tasks/src/lib.rs       |  14 ++-
 crates/tasks/src/new_queue.rs | 201 +++++++++++++++++++++++++++++++---
 4 files changed, 198 insertions(+), 19 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 55677221b..12f3f03e4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3696,6 +3696,7 @@ dependencies = [
  "mas-templates",
  "mas-tower",
  "opentelemetry",
+ "opentelemetry-semantic-conventions",
  "rand",
  "rand_chacha",
  "serde",
diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml
index de62f4998..0ada1c856 100644
--- a/crates/tasks/Cargo.toml
+++ b/crates/tasks/Cargo.toml
@@ -29,6 +29,7 @@ tower.workspace = true
 tracing.workspace = true
 tracing-opentelemetry.workspace = true
 opentelemetry.workspace = true
+opentelemetry-semantic-conventions.workspace = true
 ulid.workspace = true
 url.workspace = true
 serde.workspace = true
diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs
index eb9f19306..0785cd9c4 100644
--- a/crates/tasks/src/lib.rs
+++ b/crates/tasks/src/lib.rs
@@ -4,9 +4,7 @@
 // SPDX-License-Identifier: AGPL-3.0-only
 // Please see LICENSE in the repository root for full details.
 
-#![allow(dead_code)]
-
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
 
 use mas_email::Mailer;
 use mas_matrix::HomeserverConnection;
@@ -14,6 +12,7 @@ use mas_router::UrlBuilder;
 use mas_storage::{BoxClock, BoxRepository, RepositoryError, SystemClock};
 use mas_storage_pg::PgRepository;
 use new_queue::QueueRunnerError;
+use opentelemetry::metrics::Meter;
 use rand::SeedableRng;
 use sqlx::{Pool, Postgres};
 use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -25,6 +24,15 @@ mod new_queue;
 mod recovery;
 mod user;
 
+static METER: LazyLock<Meter> = LazyLock::new(|| {
+    let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
+        .with_version(env!("CARGO_PKG_VERSION"))
+        .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
+        .build();
+
+    opentelemetry::global::meter_with_scope(scope)
+});
+
 #[derive(Clone)]
 struct State {
     pool: Pool<Postgres>,
diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs
index 3eab5e53c..3bf09d071 100644
--- a/crates/tasks/src/new_queue.rs
+++ b/crates/tasks/src/new_queue.rs
@@ -13,6 +13,10 @@ use mas_storage::{
     Clock, RepositoryAccess, RepositoryError,
 };
 use mas_storage_pg::{DatabaseError, PgRepository};
+use opentelemetry::{
+    metrics::{Counter, Histogram, UpDownCounter},
+    KeyValue,
+};
 use rand::{distributions::Uniform, Rng, RngCore};
 use rand_chacha::ChaChaRng;
 use serde::de::DeserializeOwned;
@@ -21,13 +25,13 @@ use sqlx::{
     Acquire, Either,
 };
 use thiserror::Error;
-use tokio::task::JoinSet;
+use tokio::{task::JoinSet, time::Instant};
 use tokio_util::sync::CancellationToken;
 use tracing::{Instrument as _, Span};
 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
 use ulid::Ulid;
 
-use crate::State;
+use crate::{State, METER};
 
 type JobPayload = serde_json::Value;
 
@@ -37,6 +41,12 @@ pub struct JobContext {
     pub metadata: JobMetadata,
     pub queue_name: String,
     pub attempt: usize,
+    pub start: Instant,
+
+    #[expect(
+        dead_code,
+        reason = "we're not yet using this, but will be in the future"
+    )]
     pub cancellation_token: CancellationToken,
 }
 
@@ -46,7 +56,7 @@ impl JobContext {
             parent: Span::none(),
             "job.run",
             job.id = %self.id,
-            job.queue_name = self.queue_name,
+            job.queue.name = self.queue_name,
             job.attempt = self.attempt,
         );
 
@@ -193,6 +203,8 @@ pub struct QueueWorker {
     state: State,
     schedules: Vec,
     tracker: JobTracker,
+    wakeup_reason: Counter<u64>,
+    tick_time: Histogram<u64>,
 }
 
 impl QueueWorker {
@@ -240,6 +252,23 @@ impl QueueWorker {
         tracing::info!("Registered worker");
 
         let now = clock.now();
+        let wakeup_reason = METER
+            .u64_counter("job.worker.wakeups")
+            .with_description("Counts how many times the worker has been woken up, and for which reason")
+            .build();
+
+        // Pre-create the reasons on the counter
+        wakeup_reason.add(0, &[KeyValue::new("reason", "sleep")]);
+        wakeup_reason.add(0, &[KeyValue::new("reason", "task")]);
+        wakeup_reason.add(0, &[KeyValue::new("reason", "notification")]);
+
+        let tick_time = METER
+            .u64_histogram("job.worker.tick_duration")
+            .with_description(
+                "How much time the worker took to tick, including performing leader duties",
+            )
+            .build();
+
         Ok(Self {
             rng,
             clock,
@@ -250,7 +279,9 @@
             cancellation_token,
             state,
             schedules: Vec::new(),
-            tracker: JobTracker::default(),
+            tracker: JobTracker::new(),
+            wakeup_reason,
+            tick_time,
         })
     }
 
@@ -329,12 +360,16 @@ impl QueueWorker {
             return Ok(());
         }
 
+        let start = Instant::now();
         self.tick().await?;
 
         if self.am_i_leader {
             self.perform_leader_duties().await?;
         }
 
+        let elapsed = start.elapsed().as_millis().try_into().unwrap_or(u64::MAX);
+        self.tick_time.record(elapsed, &[]);
+
         Ok(())
     }
 
@@ -393,8 +428,6 @@ impl QueueWorker {
             .sample(Uniform::new(MIN_SLEEP_DURATION, MAX_SLEEP_DURATION));
         let wakeup_sleep = tokio::time::sleep(sleep_duration);
 
-        // TODO: add metrics to track the wake up reasons
-
         tokio::select! {
             () = self.cancellation_token.cancelled() => {
                 tracing::debug!("Woke up from cancellation");
@@ -402,13 +435,16 @@
 
             () = wakeup_sleep => {
                 tracing::debug!("Woke up from sleep");
+                self.wakeup_reason.add(1, &[KeyValue::new("reason", "sleep")]);
             },
 
             () = self.tracker.collect_next_job(), if self.tracker.has_jobs() => {
                 tracing::debug!("Joined job task");
+                self.wakeup_reason.add(1, &[KeyValue::new("reason", "task")]);
             },
 
             notification = self.listener.recv() => {
+                self.wakeup_reason.add(1, &[KeyValue::new("reason", "notification")]);
                 match notification {
                     Ok(notification) => {
                         tracing::debug!(
@@ -495,11 +531,13 @@ impl QueueWorker {
         } in jobs
         {
             let cancellation_token = self.cancellation_token.child_token();
+            let start = Instant::now();
             let context = JobContext {
                 id,
                 metadata,
                 queue_name,
                 attempt,
+                start,
                 cancellation_token,
             };
 
@@ -639,8 +677,8 @@
             .await?;
         match scheduled {
             0 => {}
-            1 => tracing::warn!("One scheduled job marked as available"),
-            n => tracing::warn!("{n} scheduled jobs marked as available"),
+            1 => tracing::info!("One scheduled job marked as available"),
+            n => tracing::info!("{n} scheduled jobs marked as available"),
         }
 
         // Release the leader lock
@@ -662,7 +700,6 @@
 ///
 /// This is a separate structure to be able to borrow it mutably at the same
 /// time as the connection to the database is borrowed
-#[derive(Default)]
 struct JobTracker {
     /// Stores a mapping from the job queue name to the job factory
     factories: HashMap<&'static str, JobFactory>,
@@ -676,9 +713,38 @@ struct JobTracker {
     /// Stores the last `join_next_with_id` result for processing, in case we
     /// got woken up in `collect_next_job`
     last_join_result: Option>,
+
+    /// A histogram which records the time it takes to process a job
+    job_processing_time: Histogram<u64>,
+
+    /// A counter which records the number of jobs currently in flight
+    in_flight_jobs: UpDownCounter<i64>,
 }
 
 impl JobTracker {
+    fn new() -> Self {
+        let job_processing_time = METER
+            .u64_histogram("job.process.duration")
+            .with_description("The time it takes to process a job in milliseconds")
+            .with_unit("ms")
+            .build();
+
+        let in_flight_jobs = METER
+            .i64_up_down_counter("job.active_tasks")
+            .with_description("The number of jobs currently in flight")
+            .with_unit("{job}")
+            .build();
+
+        Self {
+            factories: HashMap::new(),
+            running_jobs: JoinSet::new(),
+            job_contexts: HashMap::new(),
+            last_join_result: None,
+            job_processing_time,
+            in_flight_jobs,
+        }
+    }
+
     /// Returns the queue names that are currently being tracked
     fn queues(&self) -> Vec<&'static str> {
         self.factories.keys().copied().collect()
@@ -700,6 +766,11 @@ impl JobTracker {
                 .instrument(span)
         };
 
+        self.in_flight_jobs.add(
+            1,
+            &[KeyValue::new("job.queue.name", context.queue_name.clone())],
+        );
+
         let handle = self.running_jobs.spawn(task);
         self.job_contexts.insert(handle.id(), context);
     }
@@ -749,8 +820,12 @@
             }
         }
 
+        // XXX: the time measurement isn't accurate, as it would include the
+        // time spent between the task finishing, and us processing the result.
+        // It's fine for now, as it at least gives us an idea of how many tasks
+        // we run, and what their status is
+
         while let Some(result) = self.last_join_result.take() {
-            // TODO: add metrics to track the job status and the time it took
             match result {
                 // The job succeeded
                 Ok((id, Ok(()))) => {
@@ -759,13 +834,33 @@
                         .remove(&id)
                         .expect("Job context not found");
 
+                    self.in_flight_jobs.add(
+                        -1,
+                        &[KeyValue::new("job.queue.name", context.queue_name.clone())],
+                    );
+
+                    let elapsed = context
+                        .start
+                        .elapsed()
+                        .as_millis()
+                        .try_into()
+                        .unwrap_or(u64::MAX);
                     tracing::info!(
                         job.id = %context.id,
-                        job.queue_name = %context.queue_name,
+                        job.queue.name = %context.queue_name,
                         job.attempt = %context.attempt,
+                        job.elapsed = format!("{elapsed}ms"),
                         "Job completed"
                     );
 
+                    self.job_processing_time.record(
+                        elapsed,
+                        &[
+                            KeyValue::new("job.queue.name", context.queue_name),
+                            KeyValue::new("job.result", "success"),
+                        ],
+                    );
+
                     repo.queue_job()
                         .mark_as_completed(clock, context.id)
                         .await?;
@@ -778,20 +873,42 @@
                         .remove(&id)
                         .expect("Job context not found");
 
+                    self.in_flight_jobs.add(
+                        -1,
+                        &[KeyValue::new("job.queue.name", context.queue_name.clone())],
+                    );
+
                     let reason = format!("{:?}", e.error);
                     repo.queue_job()
                         .mark_as_failed(clock, context.id, &reason)
                         .await?;
 
+                    let elapsed = context
+                        .start
+                        .elapsed()
+                        .as_millis()
+                        .try_into()
+                        .unwrap_or(u64::MAX);
+
                     match e.decision {
                         JobErrorDecision::Fail => {
                             tracing::error!(
                                 error = &e as &dyn std::error::Error,
                                 job.id = %context.id,
-                                job.queue_name = %context.queue_name,
+                                job.queue.name = %context.queue_name,
                                 job.attempt = %context.attempt,
+                                job.elapsed = format!("{elapsed}ms"),
                                 "Job failed, not retrying"
                             );
+
+                            self.job_processing_time.record(
+                                elapsed,
+                                &[
+                                    KeyValue::new("job.queue.name", context.queue_name),
+                                    KeyValue::new("job.result", "failed"),
+                                    KeyValue::new("job.decision", "fail"),
+                                ],
+                            );
                         }
                         JobErrorDecision::Retry => {
@@ -800,12 +917,22 @@
                             tracing::warn!(
                                 error = &e as &dyn std::error::Error,
                                 job.id = %context.id,
-                                job.queue_name = %context.queue_name,
+                                job.queue.name = %context.queue_name,
                                 job.attempt = %context.attempt,
+                                job.elapsed = format!("{elapsed}ms"),
                                 "Job failed, will retry in {}s",
                                 delay.num_seconds()
                             );
 
+                            self.job_processing_time.record(
+                                elapsed,
+                                &[
+                                    KeyValue::new("job.queue.name", context.queue_name),
+                                    KeyValue::new("job.result", "failed"),
+                                    KeyValue::new("job.decision", "retry"),
+                                ],
+                            );
+
                             repo.queue_job()
                                 .retry(&mut *rng, clock, context.id, delay)
                                 .await?;
@@ -813,10 +940,20 @@
                             tracing::error!(
                                 error = &e as &dyn std::error::Error,
                                 job.id = %context.id,
-                                job.queue_name = %context.queue_name,
+                                job.queue.name = %context.queue_name,
                                 job.attempt = %context.attempt,
+                                job.elapsed = format!("{elapsed}ms"),
                                 "Job failed too many times, abandonning"
                             );
+
+                            self.job_processing_time.record(
+                                elapsed,
+                                &[
+                                    KeyValue::new("job.queue.name", context.queue_name),
+                                    KeyValue::new("job.result", "failed"),
+                                    KeyValue::new("job.decision", "abandon"),
+                                ],
+                            );
                         }
                     }
                 }
@@ -830,6 +967,18 @@
                         .remove(&id)
                         .expect("Job context not found");
+                    self.in_flight_jobs.add(
+                        -1,
+                        &[KeyValue::new("job.queue.name", context.queue_name.clone())],
+                    );
+
+                    let elapsed = context
+                        .start
+                        .elapsed()
+                        .as_millis()
+                        .try_into()
+                        .unwrap_or(u64::MAX);
+
                     let reason = e.to_string();
                     repo.queue_job()
                         .mark_as_failed(clock, context.id, &reason)
                         .await?;
@@ -840,12 +989,22 @@
                         tracing::warn!(
                             error = &e as &dyn std::error::Error,
                             job.id = %context.id,
-                            job.queue_name = %context.queue_name,
+                            job.queue.name = %context.queue_name,
                             job.attempt = %context.attempt,
+                            job.elapsed = format!("{elapsed}ms"),
                             "Job crashed, will retry in {}s",
                             delay.num_seconds()
                         );
 
+                        self.job_processing_time.record(
+                            elapsed,
+                            &[
+                                KeyValue::new("job.queue.name", context.queue_name),
+                                KeyValue::new("job.result", "crashed"),
+                                KeyValue::new("job.decision", "retry"),
+                            ],
+                        );
+
                         repo.queue_job()
                             .retry(&mut *rng, clock, context.id, delay)
                             .await?;
@@ -853,10 +1012,20 @@
                         tracing::error!(
                             error = &e as &dyn std::error::Error,
                             job.id = %context.id,
-                            job.queue_name = %context.queue_name,
+                            job.queue.name = %context.queue_name,
                             job.attempt = %context.attempt,
+                            job.elapsed = format!("{elapsed}ms"),
                             "Job crashed too many times, abandonning"
                         );
+
+                        self.job_processing_time.record(
+                            elapsed,
+                            &[
+                                KeyValue::new("job.queue.name", context.queue_name),
+                                KeyValue::new("job.result", "crashed"),
+                                KeyValue::new("job.decision", "abandon"),
+                            ],
+                        );
                     }
                 }
             };