diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs
index 0221724cdb9..ce2100ff4a4 100644
--- a/rust_snuba/src/processors/eap_items.rs
+++ b/rust_snuba/src/processors/eap_items.rs
@@ -13,7 +13,7 @@ use sentry_protos::snuba::v1::TraceItem;
 
 use crate::config::ProcessorConfig;
 use crate::processors::utils::enforce_retention;
-use crate::types::{InsertBatch, KafkaMessageMetadata};
+use crate::types::{InsertBatch, ItemTypeMetrics, KafkaMessageMetadata};
 
 pub fn process_message(
     msg: KafkaPayload,
@@ -32,6 +32,10 @@ pub fn process_message(
     } else {
         retention_days
     };
+
+    // Capture the item_type before consuming trace_item
+    let item_type = trace_item.item_type as u8;
+
     let mut eap_item = EAPItem::try_from(trace_item)?;
 
     eap_item.retention_days = retention_days;
@@ -41,7 +45,12 @@ pub fn process_message(
         Utc::now().timestamp_millis(),
     );
 
-    InsertBatch::from_rows([eap_item], origin_timestamp)
+    let mut item_type_metrics = ItemTypeMetrics::new();
+    item_type_metrics.record_item(item_type);
+
+    let mut batch = InsertBatch::from_rows([eap_item], origin_timestamp)?;
+    batch.item_type_metrics = Some(item_type_metrics);
+    Ok(batch)
 }
 
 #[derive(Debug, Default, Serialize)]
@@ -295,4 +304,86 @@ mod tests {
 
         assert_eq!(item.downsampled_retention_days, 365);
     }
+
+    #[test]
+    fn test_item_type_metrics_accumulated() {
+        let item_id = Uuid::new_v4();
+        let mut trace_item = generate_trace_item(item_id);
+
+        // Set the item type to Span (value 1)
+        trace_item.item_type = TraceItemType::Span.into();
+
+        let mut payload = Vec::new();
+        trace_item.encode(&mut payload).unwrap();
+
+        let payload = KafkaPayload::new(None, None, Some(payload));
+        let meta = KafkaMessageMetadata {
+            partition: 0,
+            offset: 1,
+            timestamp: DateTime::from(SystemTime::now()),
+        };
+
+        let batch = process_message(payload, meta, &ProcessorConfig::default())
+            .expect("The message should be processed");
+
+        // Verify that item_type_metrics is populated
+        assert!(batch.item_type_metrics.is_some());
+
+        let metrics = batch.item_type_metrics.unwrap();
+
+        // Verify that the item_type (Span = 1) has a count of 1
+        assert_eq!(metrics.counts.len(), 1);
+        assert_eq!(metrics.counts.get(&(TraceItemType::Span as u8)), Some(&1));
+    }
+
+    #[test]
+    fn test_item_type_metrics_different_types() {
+        // Process first message with item type 1 (Span)
+        let item_id_1 = Uuid::new_v4();
+        let mut trace_item_1 = generate_trace_item(item_id_1);
+        trace_item_1.item_type = TraceItemType::Span.into();
+
+        let mut payload_1 = Vec::new();
+        trace_item_1.encode(&mut payload_1).unwrap();
+
+        let payload_1 = KafkaPayload::new(None, None, Some(payload_1));
+        let meta_1 = KafkaMessageMetadata {
+            partition: 0,
+            offset: 1,
+            timestamp: DateTime::from(SystemTime::now()),
+        };
+
+        let batch_1 = process_message(payload_1, meta_1, &ProcessorConfig::default())
+            .expect("The message should be processed");
+
+        // Process second message with item type 2 (Transaction)
+        let item_id_2 = Uuid::new_v4();
+        let mut trace_item_2 = generate_trace_item(item_id_2);
+        trace_item_2.item_type = 2; // Transaction type
+
+        let mut payload_2 = Vec::new();
+        trace_item_2.encode(&mut payload_2).unwrap();
+
+        let payload_2 = KafkaPayload::new(None, None, Some(payload_2));
+        let meta_2 = KafkaMessageMetadata {
+            partition: 0,
+            offset: 2,
+            timestamp: DateTime::from(SystemTime::now()),
+        };
+
+        let batch_2 = process_message(payload_2, meta_2, &ProcessorConfig::default())
+            .expect("The message should be processed");
+
+        // Merge the metrics as would happen in the pipeline
+        let mut merged_metrics = batch_1.item_type_metrics.unwrap();
+        merged_metrics.merge(batch_2.item_type_metrics.unwrap());
+
+        // Verify that both item types are present with count 1 each
+        assert_eq!(merged_metrics.counts.len(), 2);
+        assert_eq!(
+            merged_metrics.counts.get(&(TraceItemType::Span as u8)),
+            Some(&1)
+        );
+        assert_eq!(merged_metrics.counts.get(&2u8), Some(&1));
+    }
 }
diff --git a/rust_snuba/src/processors/errors.rs b/rust_snuba/src/processors/errors.rs
index 31dd4f4ed90..a6a8badfe77 100644
--- a/rust_snuba/src/processors/errors.rs
+++ b/rust_snuba/src/processors/errors.rs
@@ -72,6 +72,7 @@ pub fn process_message_with_replacement(
                 rows: RowData::from_rows([row])?,
                 sentry_received_timestamp: None,
                 cogs_data: None,
+                item_type_metrics: None,
             }))
         }
         ("insert", None, _) => {
diff --git a/rust_snuba/src/processors/generic_metrics.rs b/rust_snuba/src/processors/generic_metrics.rs
index bc505e290bc..4299e6392db 100644
--- a/rust_snuba/src/processors/generic_metrics.rs
+++ b/rust_snuba/src/processors/generic_metrics.rs
@@ -391,6 +391,7 @@ where
                     payload_bytes.len() as u64,
                 )]),
             }),
+            item_type_metrics: None,
         })
     } else {
         Ok(InsertBatch::skip())
@@ -738,7 +739,7 @@ mod tests {
     use crate::processors::ProcessingFunction;
 
     use super::*;
-    use chrono::DateTime;
+    use chrono::{DateTime, Utc};
     use std::time::SystemTime;
 
     const DUMMY_COUNTER_MESSAGE: &str = r#"{
@@ -987,6 +988,23 @@
         "sampling_weight": 100
     }"#;
 
+    /// Helper function for tests to create expected InsertBatch.
+    /// Since generic_metrics never populates item_type_metrics, this helper
+    /// always sets it to None.
+    fn expected_insert_batch<T: Serialize>(
+        row: T,
+        sentry_received_timestamp: DateTime<Utc>,
+        cogs_data: CogsData,
+    ) -> InsertBatch {
+        InsertBatch {
+            rows: RowData::from_rows([row]).unwrap(),
+            origin_timestamp: None,
+            sentry_received_timestamp: Some(sentry_received_timestamp),
+            cogs_data: Some(cogs_data),
+            item_type_metrics: None,
+        }
+    }
+
     #[test]
     fn test_base64_decode_f64() {
         assert!(
@@ -1061,14 +1079,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 675)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -1120,14 +1137,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 681)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -1179,14 +1195,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 713)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -1271,14 +1286,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 653)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -1328,14 +1342,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 663)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -1487,14 +1500,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 615)])
-                }),
-            }
+                }
+            )
         );
     }
 
@@ -1544,14 +1556,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 649)])
-                }),
-            }
+                }
+            )
         );
     }
 
@@ -1616,14 +1627,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 622)])
-                }),
-            }
+                }
+            )
         );
     }
 
@@ -1690,14 +1700,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 629)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -1749,14 +1758,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 658)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -1808,14 +1816,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 667)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -1867,14 +1874,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 682)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -1943,14 +1949,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 679)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -2004,14 +2009,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 711)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -2066,14 +2070,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 719)])
-                })
-            }
+                }
+            )
         );
     }
 
@@ -2138,14 +2141,13 @@
         };
         assert_eq!(
             result.unwrap(),
-            InsertBatch {
-                rows: RowData::from_rows([expected_row]).unwrap(),
-                origin_timestamp: None,
-                sentry_received_timestamp: DateTime::from_timestamp(1704614940, 0),
-                cogs_data: Some(CogsData {
+            expected_insert_batch(
+                expected_row,
+                DateTime::from_timestamp(1704614940, 0).unwrap(),
+                CogsData {
                     data: BTreeMap::from([("genericmetrics_spans".to_string(), 651)])
-                })
-            }
+                }
+            )
         );
     }
 
diff --git a/rust_snuba/src/processors/release_health_metrics.rs b/rust_snuba/src/processors/release_health_metrics.rs
index f0440c91d03..eef2146d75c 100644
--- a/rust_snuba/src/processors/release_health_metrics.rs
+++ b/rust_snuba/src/processors/release_health_metrics.rs
@@ -174,6 +174,7 @@ pub fn process_metrics_message(
             origin_timestamp: None,
             sentry_received_timestamp,
             cogs_data: None,
+            item_type_metrics: None,
         })
     } else {
         Ok(InsertBatch::skip())
diff --git a/rust_snuba/src/strategies/commit_log.rs b/rust_snuba/src/strategies/commit_log.rs
index 6b55e33d747..cd6562e451c 100644
--- a/rust_snuba/src/strategies/commit_log.rs
+++ b/rust_snuba/src/strategies/commit_log.rs
@@ -193,7 +193,7 @@ where
 
 #[cfg(test)]
 mod tests {
-    use crate::types::{CogsData, CommitLogEntry, CommitLogOffsets};
+    use crate::types::{CommitLogEntry, CommitLogOffsets};
 
     use super::*;
     use crate::testutils::TestStrategy;
diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs
index 83ecd1f79a3..e52dcad7ee5 100644
--- a/rust_snuba/src/strategies/processor.rs
+++ b/rust_snuba/src/strategies/processor.rs
@@ -67,6 +67,9 @@ pub fn make_rust_processor(
         if let Some(ts) = transformed.sentry_received_timestamp {
             payload = payload.with_sentry_received_timestamp(ts);
         }
+        if let Some(metrics) = transformed.item_type_metrics {
+            payload = payload.with_item_type_metrics(metrics);
+        }
 
         Ok(Message::new_broker_message(
             payload, partition, offset, timestamp,
@@ -141,6 +144,9 @@ pub fn make_rust_processor_with_replacements(
                 if let Some(ts) = transformed.sentry_received_timestamp {
                     batch = batch.with_sentry_received_timestamp(ts);
                 }
+                if let Some(metrics) = transformed.item_type_metrics {
+                    batch = batch.with_item_type_metrics(metrics);
+                }
 
                 InsertOrReplacement::Insert(batch)
             }
diff --git a/rust_snuba/src/strategies/replacements.rs b/rust_snuba/src/strategies/replacements.rs
index b687e08b9f3..6bba5d1cea6 100644
--- a/rust_snuba/src/strategies/replacements.rs
+++ b/rust_snuba/src/strategies/replacements.rs
@@ -126,7 +126,7 @@ impl ProcessingStrategy>> for Prod
 mod tests {
     use super::*;
     use crate::testutils::{MockProducer, TestStrategy};
-    use crate::types::{CogsData, CommitLogOffsets, RowData};
+    use crate::types::RowData;
     use crate::types::{InsertOrReplacement, ReplacementData};
     use chrono::Utc;
     use std::collections::BTreeMap;
diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs
index bacd6738d15..5b2cd67e419 100644
--- a/rust_snuba/src/types.rs
+++ b/rust_snuba/src/types.rs
@@ -47,6 +47,35 @@ impl CogsData {
     }
 }
 
+#[derive(Clone, Debug, PartialEq, Default)]
+pub struct ItemTypeMetrics {
+    pub counts: BTreeMap<u8, u64>, // item_type: count
+}
+
+impl ItemTypeMetrics {
+    pub fn new() -> Self {
+        Self {
+            counts: BTreeMap::new(),
+        }
+    }
+
+    pub fn record_item(&mut self, item_type: u8) {
+        self.counts
+            .entry(item_type)
+            .and_modify(|count| *count += 1)
+            .or_insert(1);
+    }
+
+    pub fn merge(&mut self, other: ItemTypeMetrics) {
+        for (item_type, count) in other.counts {
+            self.counts
+                .entry(item_type)
+                .and_modify(|curr| *curr += count)
+                .or_insert(count);
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 struct LatencyRecorder {
     sum_timestamps: f64,
@@ -144,6 +173,7 @@ pub struct InsertBatch {
     pub origin_timestamp: Option<DateTime<Utc>>,
     pub sentry_received_timestamp: Option<DateTime<Utc>>,
     pub cogs_data: Option<CogsData>,
+    pub item_type_metrics: Option<ItemTypeMetrics>,
 }
 
 impl InsertBatch {
@@ -160,6 +190,7 @@
             origin_timestamp,
             sentry_received_timestamp: None,
             cogs_data: None,
+            item_type_metrics: None,
         })
     }
 
@@ -197,6 +228,8 @@ pub struct BytesInsertBatch {
     commit_log_offsets: CommitLogOffsets,
 
     cogs_data: CogsData,
+
+    item_type_metrics: ItemTypeMetrics,
 }
 
 impl BytesInsertBatch {
@@ -223,6 +256,7 @@
                 .unwrap_or_default(),
             commit_log_offsets,
             cogs_data,
+            item_type_metrics: Default::default(),
         }
     }
 
@@ -243,6 +277,7 @@
             sentry_received_timestamp: Default::default(),
             commit_log_offsets: Default::default(),
             cogs_data: Default::default(),
+            item_type_metrics: Default::default(),
         }
     }
 
@@ -276,6 +311,12 @@
         self
     }
 
+    /// Set the item type metrics
+    pub fn with_item_type_metrics(mut self, item_type_metrics: ItemTypeMetrics) -> Self {
+        self.item_type_metrics = item_type_metrics;
+        self
+    }
+
     pub fn commit_log_offsets(&self) -> &CommitLogOffsets {
         &self.commit_log_offsets
     }
@@ -292,6 +333,7 @@
             sentry_received_timestamp: self.sentry_received_timestamp.clone(),
             commit_log_offsets: self.commit_log_offsets.clone(),
             cogs_data: self.cogs_data.clone(),
+            item_type_metrics: self.item_type_metrics.clone(),
         }
     }
 
@@ -303,6 +345,7 @@
             sentry_received_timestamp: self.sentry_received_timestamp,
             commit_log_offsets: self.commit_log_offsets,
             cogs_data: self.cogs_data,
+            item_type_metrics: self.item_type_metrics,
         };
 
         (self.rows, new)
@@ -333,6 +376,7 @@
         self.sentry_received_timestamp
             .merge(other.sentry_received_timestamp);
         self.cogs_data.merge(other.cogs_data);
+        self.item_type_metrics.merge(other.item_type_metrics);
         self
     }
 }