Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions rust_snuba/src/strategies/accepted_outcomes/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::time::{Duration, Instant};

use prost::Message as ProstMessage;
use sentry_arroyo::backends::kafka::types::KafkaPayload;
use sentry_arroyo::counter;
use sentry_arroyo::processing::strategies::{
merge_commit_request, CommitRequest, InvalidMessage, MessageRejected, ProcessingStrategy,
StrategyError, SubmitError,
Expand Down Expand Up @@ -101,19 +102,21 @@ impl<TNext> OutcomesAggregator<TNext> {
.map(|(partition, offset)| (*partition, offset + 1))
.collect();

let category_metrics = batch.category_metrics.clone();
let message = Message::new_any_message(batch, committable);

match self.next_step.submit(message) {
Ok(()) => {
let now = Instant::now();
let seconds = (now - self.last_flush).as_secs_f64();
tracing::info!(
"flushed {} buckets after {} seconds, with committable {:?}",
num_buckets,
seconds,
latest_offsets
);
self.last_flush = now;

tracing::info!("flushed {} buckets after {} seconds", num_buckets, seconds);
for (category, m) in category_metrics {
let cat_str = category.to_string();
counter!("accepted_outcomes.messages_seen", m.messages_seen, "data_category" => cat_str.as_str());
counter!("accepted_outcomes.total_quantity", m.total_quantity, "data_category" => cat_str.as_str());
counter!("accepted_outcomes.bucket_count", m.bucket_count, "data_category" => cat_str.as_str());
}
Ok(())
}
Err(SubmitError::MessageRejected(rejected)) => {
Expand All @@ -139,6 +142,7 @@ impl<TNext: ProcessingStrategy<AggregatedOutcomesBatch>> ProcessingStrategy<Kafk
Err(SubmitError::MessageRejected(MessageRejected {
message: carried_message,
})) => {
counter!("accepted_outcomes.got_backpressure", 1, "strategy_name" => "outcomes_aggregator");
self.message_carried_over = Some(carried_message);
}
Err(SubmitError::InvalidMessage(e)) => {
Expand Down
22 changes: 1 addition & 21 deletions rust_snuba/src/strategies/accepted_outcomes/produce_outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use sentry_arroyo::processing::strategies::run_task_in_threads::{
use sentry_arroyo::processing::strategies::{
CommitRequest, ProcessingStrategy, StrategyError, SubmitError,
};
use sentry_arroyo::timer;
use sentry_arroyo::types::{Message, Topic, TopicOrPartition};
use sentry_arroyo::{counter, timer};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

Expand Down Expand Up @@ -47,14 +46,9 @@ impl TaskRunner<AggregatedOutcomesBatch, AggregatedOutcomesBatch, anyhow::Error>

Box::pin(async move {
let produce_start = SystemTime::now();
let mut category_metrics: BTreeMap<u32, (u64, u64)> = BTreeMap::new();

let bucket_interval = message.payload().bucket_interval;
for (key, stats) in &message.payload().buckets {
let entry = category_metrics.entry(key.category).or_insert((0, 0));
entry.0 += 1;
entry.1 += stats.quantity;

let ts_secs = key.time_offset * bucket_interval;
let timestamp = if ts_secs == 0 {
Utc::now()
Expand Down Expand Up @@ -94,20 +88,6 @@ impl TaskRunner<AggregatedOutcomesBatch, AggregatedOutcomesBatch, anyhow::Error>
if let Ok(elapsed) = produce_finish.duration_since(produce_start) {
timer!("accepted_outcomes.batch_produce_ms", elapsed);
}

for (category, (bucket_count, total_quantity)) in &category_metrics {
counter!(
"accepted_outcomes.bucket_count",
*bucket_count as i64,
"data_category" => category.to_string()
);
counter!(
"accepted_outcomes.total_quantity",
*total_quantity as i64,
"data_category" => category.to_string()
);
}

Ok(message)
})
}
Expand Down
30 changes: 24 additions & 6 deletions rust_snuba/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,20 +616,31 @@ impl BucketStats {
}
}

/// Per-category metrics accumulated within a batch.
///
/// Keyed by the numeric data-category id (`u32`) in
/// `AggregatedOutcomesBatch::category_metrics`; all counters start at zero
/// (`Default`) and are incremented by `AggregatedOutcomesBatch::add_to_bucket`.
#[derive(Clone, Debug, Default)]
pub struct CategoryMetrics {
    /// Number of `add_to_bucket` calls (messages folded in) for this category.
    pub messages_seen: u64,
    /// Running sum of every `quantity` added for this category.
    pub total_quantity: u64,
    /// Number of distinct buckets created for this category in this batch.
    pub bucket_count: u64,
}

/// Batch type for aggregated outcomes data
/// Stores bucketed counts instead of raw row data
#[derive(Clone, Debug)]
pub struct AggregatedOutcomesBatch {
    /// Map from bucket key to aggregated statistics
    pub buckets: HashMap<BucketKey, BucketStats>,
    /// Bucket width in seconds; multiplied by a key's `time_offset` to
    /// recover the bucket timestamp. Defaults to 60 (see `Default`).
    pub bucket_interval: u64,
    /// Per-category metrics for the current batch
    pub category_metrics: BTreeMap<u32, CategoryMetrics>,
}

impl Default for AggregatedOutcomesBatch {
    /// An empty batch using the standard 60-second bucket interval.
    fn default() -> Self {
        // Only the interval carries a non-trivial default; the maps start empty.
        Self {
            bucket_interval: 60,
            buckets: HashMap::default(),
            category_metrics: BTreeMap::default(),
        }
    }
}
Expand All @@ -643,14 +654,21 @@ impl AggregatedOutcomesBatch {
}
}

/// Add or update a bucket with a count and quantity, updating per-category metrics.
///
/// Folds `quantity` into the bucket for `key`, creating the bucket on first
/// sight, and keeps `category_metrics` in sync: every call bumps
/// `messages_seen` and `total_quantity`, while newly created buckets also
/// bump `bucket_count`.
pub fn add_to_bucket(&mut self, key: BucketKey, quantity: u64) {
    // Capture the category before `key` is moved into the entry lookup.
    let category = key.category;

    // Single map lookup: `entry` both finds and (if absent) creates the
    // bucket; the closure flags creation so new buckets can be counted
    // without a separate `contains_key` probe.
    let mut is_new = false;
    let stats = self.buckets.entry(key).or_insert_with(|| {
        is_new = true;
        BucketStats::new(0)
    });
    stats.quantity += quantity;

    let m = self.category_metrics.entry(category).or_default();
    m.messages_seen += 1;
    m.total_quantity += quantity;
    if is_new {
        m.bucket_count += 1;
    }
}

/// Get the total number of buckets
Expand Down
Loading