Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 93 additions & 2 deletions rust_snuba/src/processors/eap_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use sentry_protos::snuba::v1::TraceItem;

use crate::config::ProcessorConfig;
use crate::processors::utils::enforce_retention;
use crate::types::{InsertBatch, KafkaMessageMetadata};
use crate::types::{InsertBatch, ItemTypeMetrics, KafkaMessageMetadata};

pub fn process_message(
msg: KafkaPayload,
Expand All @@ -32,6 +32,10 @@ pub fn process_message(
} else {
retention_days
};

// Capture the item_type before consuming trace_item
let item_type = trace_item.item_type as u8;

let mut eap_item = EAPItem::try_from(trace_item)?;

eap_item.retention_days = retention_days;
Expand All @@ -41,7 +45,12 @@ pub fn process_message(
Utc::now().timestamp_millis(),
);

InsertBatch::from_rows([eap_item], origin_timestamp)
let mut item_type_metrics = ItemTypeMetrics::new();
item_type_metrics.record_item(item_type);

let mut batch = InsertBatch::from_rows([eap_item], origin_timestamp)?;
batch.item_type_metrics = Some(item_type_metrics);
Ok(batch)
}

#[derive(Debug, Default, Serialize)]
Expand Down Expand Up @@ -295,4 +304,86 @@ mod tests {

assert_eq!(item.downsampled_retention_days, 365);
}

#[test]
fn test_item_type_metrics_accumulated() {
let item_id = Uuid::new_v4();
let mut trace_item = generate_trace_item(item_id);

// Set the item type to Span (value 1)
trace_item.item_type = TraceItemType::Span.into();

let mut payload = Vec::new();
trace_item.encode(&mut payload).unwrap();

let payload = KafkaPayload::new(None, None, Some(payload));
let meta = KafkaMessageMetadata {
partition: 0,
offset: 1,
timestamp: DateTime::from(SystemTime::now()),
};

let batch = process_message(payload, meta, &ProcessorConfig::default())
.expect("The message should be processed");

// Verify that item_type_metrics is populated
assert!(batch.item_type_metrics.is_some());

let metrics = batch.item_type_metrics.unwrap();

// Verify that the item_type (Span = 1) has a count of 1
assert_eq!(metrics.counts.len(), 1);
assert_eq!(metrics.counts.get(&(TraceItemType::Span as u8)), Some(&1));
}

#[test]
fn test_item_type_metrics_different_types() {
// Process first message with item type 1 (Span)
let item_id_1 = Uuid::new_v4();
let mut trace_item_1 = generate_trace_item(item_id_1);
trace_item_1.item_type = TraceItemType::Span.into();

let mut payload_1 = Vec::new();
trace_item_1.encode(&mut payload_1).unwrap();

let payload_1 = KafkaPayload::new(None, None, Some(payload_1));
let meta_1 = KafkaMessageMetadata {
partition: 0,
offset: 1,
timestamp: DateTime::from(SystemTime::now()),
};

let batch_1 = process_message(payload_1, meta_1, &ProcessorConfig::default())
.expect("The message should be processed");

// Process second message with item type 2 (Transaction)
let item_id_2 = Uuid::new_v4();
let mut trace_item_2 = generate_trace_item(item_id_2);
trace_item_2.item_type = 2; // Transaction type

let mut payload_2 = Vec::new();
trace_item_2.encode(&mut payload_2).unwrap();

let payload_2 = KafkaPayload::new(None, None, Some(payload_2));
let meta_2 = KafkaMessageMetadata {
partition: 0,
offset: 2,
timestamp: DateTime::from(SystemTime::now()),
};

let batch_2 = process_message(payload_2, meta_2, &ProcessorConfig::default())
.expect("The message should be processed");

// Merge the metrics as would happen in the pipeline
let mut merged_metrics = batch_1.item_type_metrics.unwrap();
merged_metrics.merge(batch_2.item_type_metrics.unwrap());

// Verify that both item types are present with count 1 each
assert_eq!(merged_metrics.counts.len(), 2);
assert_eq!(
merged_metrics.counts.get(&(TraceItemType::Span as u8)),
Some(&1)
);
assert_eq!(merged_metrics.counts.get(&2u8), Some(&1));
}
}
1 change: 1 addition & 0 deletions rust_snuba/src/processors/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub fn process_message_with_replacement(
rows: RowData::from_rows([row])?,
sentry_received_timestamp: None,
cogs_data: None,
item_type_metrics: None,
}))
}
("insert", None, _) => {
Expand Down
Loading
Loading