Skip to content

Commit fae4834

Browse files
goffrieConvex, Inc.
authored andcommitted
Add a metric for subscription queue lag (#39344)
GitOrigin-RevId: 1736b8fd2cee971dbb8dcb9db68a14b24d5a2353
1 parent c4418da commit fae4834

File tree

3 files changed

+45
-5
lines changed

3 files changed

+45
-5
lines changed

crates/database/src/metrics.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use metrics::{
1515
log_distribution,
1616
log_distribution_with_labels,
1717
register_convex_counter,
18+
register_convex_gauge,
1819
register_convex_histogram,
1920
CancelableTimer,
2021
IntoLabel,
@@ -1182,3 +1183,19 @@ register_convex_counter!(
11821183
pub fn log_index_too_large_blocking_writes(index_type: SearchType) {
11831184
log_counter_with_labels(&INDEX_TOO_LARGE_BLOCKING_WRITES, 1, vec![index_type.tag()]);
11841185
}
1186+
1187+
register_convex_histogram!(
1188+
SUBSCRIPTION_QUEUE_LAG_SECONDS,
1189+
"How long subscription requests wait in the subscription worker queue",
1190+
);
1191+
pub fn log_subscription_queue_lag(seconds: f64) {
1192+
log_distribution(&SUBSCRIPTION_QUEUE_LAG_SECONDS, seconds);
1193+
}
1194+
1195+
register_convex_gauge!(
1196+
SUBSCRIPTION_QUEUE_LENGTH_INFO,
1197+
"The number of items in subscription queues",
1198+
);
1199+
pub fn log_subscription_queue_length_delta(delta: i64) {
1200+
SUBSCRIPTION_QUEUE_LENGTH_INFO.add(delta as f64);
1201+
}

crates/database/src/subscription.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ impl SubscriptionsClient {
105105
TrySendError::Full(..) => metrics::subscriptions_worker_full_error().into(),
106106
TrySendError::Closed(..) => metrics::shutdown_error(),
107107
})?;
108+
metrics::log_subscription_queue_length_delta(1);
108109
Ok(subscription)
109110
}
110111

@@ -160,6 +161,7 @@ pub enum SubscriptionsWorker {}
160161
impl SubscriptionsWorker {
161162
pub(crate) fn start<RT: Runtime>(log: LogOwner, runtime: RT) -> SubscriptionsClient {
162163
let (tx, rx) = mpsc::channel(*SUBSCRIPTIONS_WORKER_QUEUE_SIZE);
164+
let rx = CountingReceiver(rx);
163165

164166
let log_reader = log.reader();
165167
let mut manager = SubscriptionManager::new(log);
@@ -174,8 +176,25 @@ impl SubscriptionsWorker {
174176
}
175177
}
176178

179+
struct CountingReceiver(mpsc::Receiver<SubscriptionRequest>);
180+
impl Drop for CountingReceiver {
181+
fn drop(&mut self) {
182+
self.0.close();
183+
metrics::log_subscription_queue_length_delta(-(self.0.len() as i64));
184+
}
185+
}
186+
impl CountingReceiver {
187+
async fn recv(&mut self) -> Option<SubscriptionRequest> {
188+
let r = self.0.recv().await;
189+
if r.is_some() {
190+
metrics::log_subscription_queue_length_delta(-1);
191+
}
192+
r
193+
}
194+
}
195+
177196
impl SubscriptionManager {
178-
async fn run_worker(&mut self, mut rx: mpsc::Receiver<SubscriptionRequest>) {
197+
async fn run_worker(&mut self, mut rx: CountingReceiver) {
179198
tracing::info!("Starting subscriptions worker");
180199
loop {
181200
futures::select_biased! {
@@ -259,6 +278,7 @@ impl SubscriptionManager {
259278
mut token: Token,
260279
sender: SubscriptionSender,
261280
) -> anyhow::Result<SubscriberId> {
281+
metrics::log_subscription_queue_lag(self.log.max_ts().secs_since_f64(token.ts()));
262282
// The client may not have fully refreshed their token past our
263283
// processed timestamp, so finish the job for them if needed.
264284
//
@@ -713,7 +733,10 @@ mod tests {
713733
};
714734

715735
use crate::{
716-
subscription::SubscriptionManager,
736+
subscription::{
737+
CountingReceiver,
738+
SubscriptionManager,
739+
},
717740
ReadSet,
718741
Token,
719742
};
@@ -1133,8 +1156,8 @@ mod tests {
11331156
to_notify
11341157
}
11351158

1136-
fn disconnected_rx<T>() -> mpsc::Receiver<T> {
1137-
mpsc::channel(1).1
1159+
fn disconnected_rx() -> CountingReceiver {
1160+
CountingReceiver(mpsc::channel(1).1)
11381161
}
11391162

11401163
#[tokio::test]

crates/database/src/write_log.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ impl LogOwner {
383383

384384
pub fn max_ts(&self) -> Timestamp {
385385
let snapshot = { self.inner.lock().log.clone() };
386-
block_in_place(|| snapshot.max_ts())
386+
snapshot.max_ts()
387387
}
388388

389389
pub fn refresh_token(

0 commit comments

Comments
 (0)