diff --git a/engine/packages/gasoline/src/metrics.rs b/engine/packages/gasoline/src/metrics.rs index e8e6e608c6..2a49362e71 100644 --- a/engine/packages/gasoline/src/metrics.rs +++ b/engine/packages/gasoline/src/metrics.rs @@ -7,6 +7,13 @@ lazy_static::lazy_static! { &["worker_id"], *REGISTRY ).unwrap(); + pub static ref WORKER_BUMPS_PER_TICK: HistogramVec = register_histogram_vec_with_registry!( + "gasoline_worker_bumps_per_tick", + "Amount of bump messages received in a single worker tick.", + &["worker_id"], + vec![1.0, 2.0, 3.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0], + *REGISTRY + ).unwrap(); pub static ref LAST_PULL_WORKFLOWS_DURATION: GaugeVec = register_gauge_vec_with_registry!( "gasoline_last_pull_workflows_duration", "Last duration of pulling workflow data.", diff --git a/engine/packages/gasoline/src/worker.rs b/engine/packages/gasoline/src/worker.rs index 3f448d189b..d2254540b4 100644 --- a/engine/packages/gasoline/src/worker.rs +++ b/engine/packages/gasoline/src/worker.rs @@ -16,6 +16,7 @@ use crate::{ ctx::WorkflowCtx, db::{BumpSubSubject, DatabaseHandle}, error::WorkflowError, + metrics, registry::RegistryHandle, }; @@ -73,7 +74,12 @@ impl Worker { let cache = rivet_cache::CacheInner::from_env(&self.config, self.pools.clone())?; - let mut bump_sub = { self.db.bump_sub(BumpSubSubject::Worker).await? }; + // We use ready_chunks because multiple bumps in a row should be processed as 1 bump + let mut bump_sub = self + .db + .bump_sub(BumpSubSubject::Worker) + .await? + .ready_chunks(1024); let mut tick_interval = tokio::time::interval(self.db.worker_poll_interval()); tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -102,8 +108,13 @@ impl Worker { tokio::select! { _ = tick_interval.tick() => {}, res = bump_sub.next() => { - if res.is_none() { - break Err(WorkflowError::SubscriptionUnsubscribed.into()); + match res { + Some(bumps) => { + metrics::WORKER_BUMPS_PER_TICK + .with_label_values(&[self.worker_id.to_string().as_str()]) + .observe(bumps.len() as f64); + } + None => break Err(WorkflowError::SubscriptionUnsubscribed.into()), } tick_interval.reset();