Skip to content

Commit 81ef015

Browse files
authored
ref(eap-outcomes): add category_metrics (#7847)
Moves the following metrics to be `CategoryMetrics` on the batch itself: * `accepted_outcomes.bucket_count` (exists) * `accepted_outcomes.total_quantity` (exists) * `accepted_outcomes.messages_seen` (_new_). Also adds an `accepted_outcomes.got_backpressure` metric, since we currently have no metrics around backpressure.
1 parent e25b8b3 commit 81ef015

File tree

3 files changed

+36
-34
lines changed

3 files changed

+36
-34
lines changed

rust_snuba/src/strategies/accepted_outcomes/aggregator.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::time::{Duration, Instant};
33

44
use prost::Message as ProstMessage;
55
use sentry_arroyo::backends::kafka::types::KafkaPayload;
6+
use sentry_arroyo::counter;
67
use sentry_arroyo::processing::strategies::{
78
merge_commit_request, CommitRequest, InvalidMessage, MessageRejected, ProcessingStrategy,
89
StrategyError, SubmitError,
@@ -101,19 +102,21 @@ impl<TNext> OutcomesAggregator<TNext> {
101102
.map(|(partition, offset)| (*partition, offset + 1))
102103
.collect();
103104

105+
let category_metrics = batch.category_metrics.clone();
104106
let message = Message::new_any_message(batch, committable);
105-
106107
match self.next_step.submit(message) {
107108
Ok(()) => {
108109
let now = Instant::now();
109110
let seconds = (now - self.last_flush).as_secs_f64();
110-
tracing::info!(
111-
"flushed {} buckets after {} seconds, with committable {:?}",
112-
num_buckets,
113-
seconds,
114-
latest_offsets
115-
);
116111
self.last_flush = now;
112+
113+
tracing::info!("flushed {} buckets after {} seconds", num_buckets, seconds);
114+
for (category, m) in category_metrics {
115+
let cat_str = category.to_string();
116+
counter!("accepted_outcomes.messages_seen", m.messages_seen, "data_category" => cat_str.as_str());
117+
counter!("accepted_outcomes.total_quantity", m.total_quantity, "data_category" => cat_str.as_str());
118+
counter!("accepted_outcomes.bucket_count", m.bucket_count, "data_category" => cat_str.as_str());
119+
}
117120
Ok(())
118121
}
119122
Err(SubmitError::MessageRejected(rejected)) => {
@@ -139,6 +142,7 @@ impl<TNext: ProcessingStrategy<AggregatedOutcomesBatch>> ProcessingStrategy<Kafk
139142
Err(SubmitError::MessageRejected(MessageRejected {
140143
message: carried_message,
141144
})) => {
145+
counter!("accepted_outcomes.got_backpressure", 1, "strategy_name" => "outcomes_aggregator");
142146
self.message_carried_over = Some(carried_message);
143147
}
144148
Err(SubmitError::InvalidMessage(e)) => {

rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@ use sentry_arroyo::processing::strategies::run_task_in_threads::{
88
use sentry_arroyo::processing::strategies::{
99
CommitRequest, ProcessingStrategy, StrategyError, SubmitError,
1010
};
11+
use sentry_arroyo::timer;
1112
use sentry_arroyo::types::{Message, Topic, TopicOrPartition};
12-
use sentry_arroyo::{counter, timer};
13-
use std::collections::BTreeMap;
1413
use std::sync::Arc;
1514
use std::time::{Duration, SystemTime};
1615

@@ -47,14 +46,9 @@ impl TaskRunner<AggregatedOutcomesBatch, AggregatedOutcomesBatch, anyhow::Error>
4746

4847
Box::pin(async move {
4948
let produce_start = SystemTime::now();
50-
let mut category_metrics: BTreeMap<u32, (u64, u64)> = BTreeMap::new();
5149

5250
let bucket_interval = message.payload().bucket_interval;
5351
for (key, stats) in &message.payload().buckets {
54-
let entry = category_metrics.entry(key.category).or_insert((0, 0));
55-
entry.0 += 1;
56-
entry.1 += stats.quantity;
57-
5852
let ts_secs = key.time_offset * bucket_interval;
5953
let timestamp = if ts_secs == 0 {
6054
Utc::now()
@@ -94,20 +88,6 @@ impl TaskRunner<AggregatedOutcomesBatch, AggregatedOutcomesBatch, anyhow::Error>
9488
if let Ok(elapsed) = produce_finish.duration_since(produce_start) {
9589
timer!("accepted_outcomes.batch_produce_ms", elapsed);
9690
}
97-
98-
for (category, (bucket_count, total_quantity)) in &category_metrics {
99-
counter!(
100-
"accepted_outcomes.bucket_count",
101-
*bucket_count as i64,
102-
"data_category" => category.to_string()
103-
);
104-
counter!(
105-
"accepted_outcomes.total_quantity",
106-
*total_quantity as i64,
107-
"data_category" => category.to_string()
108-
);
109-
}
110-
11191
Ok(message)
11292
})
11393
}

rust_snuba/src/types.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -616,20 +616,31 @@ impl BucketStats {
616616
}
617617
}
618618

619+
/// Per-category metrics accumulated within a batch
620+
#[derive(Clone, Debug, Default)]
621+
pub struct CategoryMetrics {
622+
pub messages_seen: u64,
623+
pub total_quantity: u64,
624+
pub bucket_count: u64,
625+
}
626+
619627
/// Batch type for aggregated outcomes data
620628
/// Stores bucketed counts instead of raw row data
621629
#[derive(Clone, Debug)]
622630
pub struct AggregatedOutcomesBatch {
623631
/// Map from bucket key to aggregated statistics
624632
pub buckets: HashMap<BucketKey, BucketStats>,
625633
pub bucket_interval: u64,
634+
/// Per-category metrics for the current batch
635+
pub category_metrics: BTreeMap<u32, CategoryMetrics>,
626636
}
627637

628638
impl Default for AggregatedOutcomesBatch {
629639
fn default() -> Self {
630640
Self {
631641
buckets: HashMap::new(),
632642
bucket_interval: 60,
643+
category_metrics: BTreeMap::new(),
633644
}
634645
}
635646
}
@@ -643,14 +654,21 @@ impl AggregatedOutcomesBatch {
643654
}
644655
}
645656

646-
/// Add or update a bucket with a count and quantity
657+
/// Add or update a bucket with a count and quantity, updating per-category metrics
647658
pub fn add_to_bucket(&mut self, key: BucketKey, quantity: u64) {
648-
self.buckets
659+
let is_new = !self.buckets.contains_key(&key);
660+
let stats = self
661+
.buckets
649662
.entry(key)
650-
.and_modify(|stats| {
651-
stats.quantity += quantity;
652-
})
653-
.or_insert_with(|| BucketStats::new(quantity));
663+
.or_insert_with(|| BucketStats::new(0));
664+
stats.quantity += quantity;
665+
666+
let m = self.category_metrics.entry(key.category).or_default();
667+
m.messages_seen += 1;
668+
m.total_quantity += quantity;
669+
if is_new {
670+
m.bucket_count += 1;
671+
}
654672
}
655673

656674
/// Get the total number of buckets

0 commit comments

Comments
 (0)