Skip to content

Commit d37753e

Browse files
thomasballingerConvex, Inc.
authored andcommitted
Log more metrics for especially slow subscription processing (#39192)
GitOrigin-RevId: a1b61ebb12dfa6ffe0736aff5049f682fa3d305d
1 parent f0e98e7 commit d37753e

File tree

5 files changed

+74
-1
lines changed

5 files changed

+74
-1
lines changed

crates/common/src/knobs.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,3 +1358,13 @@ pub static SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD: LazyLock<usize> =
13581358
/// invalidated to determine the delay before invalidating them.
13591359
pub static SUBSCRIPTION_INVALIDATION_DELAY_MULTIPLIER: LazyLock<u64> =
13601360
LazyLock::new(|| env_config("SUBSCRIPTION_INVALIDATION_DELAY_MULTIPLIER", 5));
1361+
1362+
/// When processing a single write log entry takes longer than that time, log
1363+
/// extra detail.
1364+
pub static SUBSCRIPTION_PROCESS_LOG_ENTRY_TRACING_THRESHOLD: LazyLock<u64> =
1365+
LazyLock::new(|| env_config("SUBSCRIPTION_PROCESS_LOG_ENTRY_TRACING_THRESHOLD", 2));
1366+
1367+
/// When advancing the write log takes longer than this amount, log extra
1368+
/// details.
1369+
pub static SUBSCRIPTION_ADVANCE_LOG_TRACING_THRESHOLD: LazyLock<u64> =
1370+
LazyLock::new(|| env_config("SUBSCRIPTION_ADVANCE_LOG_TRACING_THRESHOLD", 10));

crates/database/src/metrics.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,14 @@ pub fn subscriptions_log_iterate_timer() -> Timer<VMHistogram> {
10821082
Timer::new(&SUBSCRIPTION_LOG_ITERATE_SECONDS)
10831083
}
10841084

1085+
register_convex_histogram!(
1086+
SUBSCRIPTION_PROCESS_WRITE_LOG_ENTRY_SECONDS,
1087+
"Time to process one write log entry when advancing subscriptions",
1088+
);
1089+
pub fn subscription_process_write_log_entry_timer() -> Timer<VMHistogram> {
1090+
Timer::new(&SUBSCRIPTION_PROCESS_WRITE_LOG_ENTRY_SECONDS)
1091+
}
1092+
10851093
register_convex_histogram!(
10861094
SUBSCRIPTION_LOG_INVALIDATE_SECONDS,
10871095
"Time to invalidate segsstiptions when edvancing rh_ log",

crates/database/src/subscription.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,18 @@ use common::{
2727
errors::report_error,
2828
knobs::{
2929
SUBSCRIPTIONS_WORKER_QUEUE_SIZE,
30+
SUBSCRIPTION_ADVANCE_LOG_TRACING_THRESHOLD,
3031
SUBSCRIPTION_INVALIDATION_DELAY_MULTIPLIER,
3132
SUBSCRIPTION_INVALIDATION_DELAY_THRESHOLD,
33+
SUBSCRIPTION_PROCESS_LOG_ENTRY_TRACING_THRESHOLD,
3234
},
3335
runtime::{
3436
block_in_place,
3537
Runtime,
3638
SpawnHandle,
3739
},
3840
types::{
41+
GenericIndexName,
3942
PersistenceVersion,
4043
SubscriberId,
4144
TabletIndexName,
@@ -328,9 +331,12 @@ impl SubscriptionManager {
328331
let mut log_len = 0;
329332
let mut num_writes = 0;
330333
self.log.for_each(from_ts, next_ts, |_, writes| {
334+
let process_log_timer = metrics::subscription_process_write_log_entry_timer();
331335
log_len += 1;
332336
num_writes += writes.len();
333-
for (_, document_change) in writes {
337+
let mut tablet_ids = BTreeSet::new();
338+
for (resolved_id, document_change) in writes {
339+
tablet_ids.insert(resolved_id.tablet_id);
334340
// We're applying a mutation to the document so if it already exists
335341
// we need to remove it before writing the new version.
336342
if let Some(ref old_document) = document_change.old_document {
@@ -352,9 +358,46 @@ impl SubscriptionManager {
352358
);
353359
}
354360
}
361+
362+
if process_log_timer.elapsed()
363+
> Duration::from_secs(*SUBSCRIPTION_PROCESS_LOG_ENTRY_TRACING_THRESHOLD)
364+
{
365+
tracing::info!(
366+
"[{next_ts}: advance_log] simple commit took {:?}, affected tables: \
367+
{tablet_ids:?}",
368+
process_log_timer.elapsed()
369+
);
370+
}
355371
})?;
356372
metrics::log_subscriptions_log_length(log_len);
357373
metrics::log_subscriptions_log_writes(num_writes);
374+
if _timer.elapsed()
375+
> Duration::from_secs(*SUBSCRIPTION_ADVANCE_LOG_TRACING_THRESHOLD)
376+
{
377+
let subscribers_by_index: BTreeMap<&GenericIndexName<_>, usize> = self
378+
.subscriptions
379+
.indexed
380+
.iter()
381+
.map(|(key, (_fields, range_map))| (key, range_map.len()))
382+
.collect();
383+
let total_subscribers: usize = subscribers_by_index.values().sum();
384+
let search_len = self.subscriptions.search.filter_len();
385+
let fuzzy_len = self.subscriptions.search.fuzzy_len();
386+
tracing::info!(
387+
"[{next_ts} advance_log] Duration {}ms, indexes: {}, search filters: {}, \
388+
fuzzy search: {}",
389+
_timer.elapsed().as_millis(),
390+
self.subscriptions.indexed.len(),
391+
search_len,
392+
fuzzy_len
393+
);
394+
tracing::info!(
395+
"`[{next_ts} advance_log] Subscription map size: {total_subscribers}"
396+
);
397+
tracing::info!(
398+
"[{next_ts} advance_log] Subscribers by index {subscribers_by_index:?}"
399+
);
400+
}
358401
}
359402

360403
{

crates/indexing/src/interval.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ impl<ID: Clone + Ord> IntervalMap<ID> {
2727
self.sets.is_empty()
2828
}
2929

30+
pub fn len(&self) -> usize {
31+
self.sets.len()
32+
}
33+
3034
/// Insert the IntervalSet for the given ID.
3135
pub fn insert(&mut self, id: ID, set: IntervalSet) -> Option<IntervalSet> {
3236
self.sets.insert(id, set)

crates/search/src/query.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -969,6 +969,14 @@ impl TextSearchSubscriptions {
969969
}
970970
}
971971

972+
pub fn filter_len(&self) -> usize {
973+
self.filter_conditions.values().map(|m| m.len()).sum()
974+
}
975+
976+
pub fn fuzzy_len(&self) -> usize {
977+
self.fuzzy_searches.len()
978+
}
979+
972980
pub fn insert(&mut self, id: SubscriberId, index: &TabletIndexName, reads: &QueryReads) {
973981
self.filter_conditions
974982
.entry(index.clone())

0 commit comments

Comments
 (0)