Skip to content

Commit 3df8f27

Browse files
committed
fix: add ready_chunks to worker bumps, bumps per tick metric
1 parent 5d200c7 commit 3df8f27

File tree

2 files changed

+21
-3
lines changed

2 files changed

+21
-3
lines changed

engine/packages/gasoline/src/metrics.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ lazy_static::lazy_static! {
77
&["worker_id"],
88
*REGISTRY
99
).unwrap();
10+
pub static ref WORKER_BUMPS_PER_TICK: HistogramVec = register_histogram_vec_with_registry!(
11+
"gasoline_worker_bumps_per_tick",
12+
"Amount of bump messages received in a single worker tick.",
13+
&["worker_id"],
14+
vec![1.0, 2.0, 3.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
15+
*REGISTRY
16+
).unwrap();
1017
pub static ref LAST_PULL_WORKFLOWS_DURATION: GaugeVec = register_gauge_vec_with_registry!(
1118
"gasoline_last_pull_workflows_duration",
1219
"Last duration of pulling workflow data.",

engine/packages/gasoline/src/worker.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use crate::{
1616
ctx::WorkflowCtx,
1717
db::{BumpSubSubject, DatabaseHandle},
1818
error::WorkflowError,
19+
metrics,
1920
registry::RegistryHandle,
2021
};
2122

@@ -73,7 +74,12 @@ impl Worker {
7374

7475
let cache = rivet_cache::CacheInner::from_env(&self.config, self.pools.clone())?;
7576

76-
let mut bump_sub = { self.db.bump_sub(BumpSubSubject::Worker).await? };
77+
// We use ready_chunks because multiple bumps in a row should be processed as 1 bump
78+
let mut bump_sub = self
79+
.db
80+
.bump_sub(BumpSubSubject::Worker)
81+
.await?
82+
.ready_chunks(1024);
7783

7884
let mut tick_interval = tokio::time::interval(self.db.worker_poll_interval());
7985
tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -102,8 +108,13 @@ impl Worker {
102108
tokio::select! {
103109
_ = tick_interval.tick() => {},
104110
res = bump_sub.next() => {
105-
if res.is_none() {
106-
break Err(WorkflowError::SubscriptionUnsubscribed.into());
111+
match res {
112+
Some(bumps) => {
113+
metrics::WORKER_BUMPS_PER_TICK
114+
.with_label_values(&[self.worker_id.to_string().as_str()])
115+
.observe(bumps.len() as f64);
116+
}
117+
None => break Err(WorkflowError::SubscriptionUnsubscribed.into()),
107118
}
108119

109120
tick_interval.reset();

0 commit comments

Comments
 (0)