diff --git a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs index eb21684fd7..38daa5c9ff 100644 --- a/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs +++ b/rust_snuba/src/strategies/accepted_outcomes/aggregator.rs @@ -3,6 +3,7 @@ use std::time::{Duration, Instant}; use prost::Message as ProstMessage; use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::counter; use sentry_arroyo::processing::strategies::{ merge_commit_request, CommitRequest, InvalidMessage, MessageRejected, ProcessingStrategy, StrategyError, SubmitError, @@ -101,19 +102,21 @@ impl OutcomesAggregator { .map(|(partition, offset)| (*partition, offset + 1)) .collect(); + let category_metrics = batch.category_metrics.clone(); let message = Message::new_any_message(batch, committable); - match self.next_step.submit(message) { Ok(()) => { let now = Instant::now(); let seconds = (now - self.last_flush).as_secs_f64(); - tracing::info!( - "flushed {} buckets after {} seconds, with committable {:?}", - num_buckets, - seconds, - latest_offsets - ); self.last_flush = now; + + tracing::info!("flushed {} buckets after {} seconds", num_buckets, seconds); + for (category, m) in category_metrics { + let cat_str = category.to_string(); + counter!("accepted_outcomes.messages_seen", m.messages_seen, "data_category" => cat_str.as_str()); + counter!("accepted_outcomes.total_quantity", m.total_quantity, "data_category" => cat_str.as_str()); + counter!("accepted_outcomes.bucket_count", m.bucket_count, "data_category" => cat_str.as_str()); + } Ok(()) } Err(SubmitError::MessageRejected(rejected)) => { @@ -139,6 +142,7 @@ impl> ProcessingStrategy { + counter!("accepted_outcomes.got_backpressure", 1, "strategy_name" => "outcomes_aggregator"); self.message_carried_over = Some(carried_message); } Err(SubmitError::InvalidMessage(e)) => { diff --git a/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs b/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs index 6c90ad7ebc..96964cce6a 100644 --- a/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs +++ b/rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs @@ -8,9 +8,8 @@ use sentry_arroyo::processing::strategies::run_task_in_threads::{ use sentry_arroyo::processing::strategies::{ CommitRequest, ProcessingStrategy, StrategyError, SubmitError, }; +use sentry_arroyo::timer; use sentry_arroyo::types::{Message, Topic, TopicOrPartition}; -use sentry_arroyo::{counter, timer}; -use std::collections::BTreeMap; use std::sync::Arc; use std::time::{Duration, SystemTime}; @@ -47,14 +46,9 @@ impl TaskRunner Box::pin(async move { let produce_start = SystemTime::now(); - let mut category_metrics: BTreeMap = BTreeMap::new(); let bucket_interval = message.payload().bucket_interval; for (key, stats) in &message.payload().buckets { - let entry = category_metrics.entry(key.category).or_insert((0, 0)); - entry.0 += 1; - entry.1 += stats.quantity; - let ts_secs = key.time_offset * bucket_interval; let timestamp = if ts_secs == 0 { Utc::now() @@ -94,20 +88,6 @@ impl TaskRunner if let Ok(elapsed) = produce_finish.duration_since(produce_start) { timer!("accepted_outcomes.batch_produce_ms", elapsed); } - - for (category, (bucket_count, total_quantity)) in &category_metrics { - counter!( - "accepted_outcomes.bucket_count", - *bucket_count as i64, - "data_category" => category.to_string() - ); - counter!( - "accepted_outcomes.total_quantity", - *total_quantity as i64, - "data_category" => category.to_string() - ); - } - Ok(message) }) } diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs index f575f25ff4..105cf4a58e 100644 --- a/rust_snuba/src/types.rs +++ b/rust_snuba/src/types.rs @@ -616,6 +616,14 @@ impl BucketStats { } } +/// Per-category metrics accumulated within a batch +#[derive(Clone, Debug, Default)] +pub struct CategoryMetrics { + pub messages_seen: u64, + pub total_quantity: u64, + pub bucket_count: u64, +} + /// Batch type for aggregated outcomes data /// Stores bucketed counts instead of raw row data #[derive(Clone, Debug)] @@ -623,6 +631,8 @@ pub struct AggregatedOutcomesBatch { /// Map from bucket key to aggregated statistics pub buckets: HashMap, pub bucket_interval: u64, + /// Per-category metrics for the current batch + pub category_metrics: BTreeMap, } impl Default for AggregatedOutcomesBatch { @@ -630,6 +640,7 @@ impl Default for AggregatedOutcomesBatch { Self { buckets: HashMap::new(), bucket_interval: 60, + category_metrics: BTreeMap::new(), } } } @@ -643,14 +654,21 @@ impl AggregatedOutcomesBatch { } } - /// Add or update a bucket with a count and quantity + /// Add or update a bucket with a count and quantity, updating per-category metrics pub fn add_to_bucket(&mut self, key: BucketKey, quantity: u64) { - self.buckets + let is_new = !self.buckets.contains_key(&key); + let stats = self + .buckets .entry(key) - .and_modify(|stats| { - stats.quantity += quantity; - }) - .or_insert_with(|| BucketStats::new(quantity)); + .or_insert_with(|| BucketStats::new(0)); + stats.quantity += quantity; + + let m = self.category_metrics.entry(key.category).or_default(); + m.messages_seen += 1; + m.total_quantity += quantity; + if is_new { + m.bucket_count += 1; + } } /// Get the total number of buckets