
Commit 93973ff

feat(eap): record item type counts in batches (#7481)
Emit how many events by type are being processed by the EAP consumer. This work is split into three changes:

- change how we build BytesInsertBatch to support a builder pattern (so it is simpler to omit dataset-specific fields): #7479 (merged)
- (this change) start keeping track of how many items by type are being processed in the EAPItems processor
- emit the actual metric at the end of a batch write
1 parent: 012bc69

6 files changed: +260 −115 lines changed
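
Both diffs below use an `ItemTypeMetrics` type imported from `crate::types`, whose definition is not part of this commit view. As a rough sketch inferred from its call sites in the diff (`new`, `record_item`, `merge`, and a public `counts` map keyed by the `TraceItemType` discriminant as `u8`), it might look like the following; the `BTreeMap` and `u64` choices are assumptions, not the confirmed definition:

```rust
use std::collections::BTreeMap;

// Hypothetical reconstruction of crate::types::ItemTypeMetrics, inferred
// from its usage in this commit; the real definition lives in
// rust_snuba/src/types.rs, which is not shown here.
#[derive(Debug, Default, Clone)]
pub struct ItemTypeMetrics {
    // Item counts keyed by TraceItemType discriminant (e.g. Span = 1).
    // BTreeMap and u64 are assumptions; a HashMap would work equally well.
    pub counts: BTreeMap<u8, u64>,
}

impl ItemTypeMetrics {
    pub fn new() -> Self {
        Self::default()
    }

    // Count a single processed item of the given type.
    pub fn record_item(&mut self, item_type: u8) {
        *self.counts.entry(item_type).or_insert(0) += 1;
    }

    // Fold another message's counts into this one, as the new tests below
    // do when combining the metrics of two processed messages.
    pub fn merge(&mut self, other: ItemTypeMetrics) {
        for (item_type, count) in other.counts {
            *self.counts.entry(item_type).or_insert(0) += count;
        }
    }
}
```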

rust_snuba/src/processors/eap_items.rs

Lines changed: 93 additions & 2 deletions
```diff
@@ -13,7 +13,7 @@ use sentry_protos::snuba::v1::{ArrayValue, TraceItem};
 
 use crate::config::ProcessorConfig;
 use crate::processors::utils::enforce_retention;
-use crate::types::{InsertBatch, KafkaMessageMetadata};
+use crate::types::{InsertBatch, ItemTypeMetrics, KafkaMessageMetadata};
 
 use crate::runtime_config::get_str_config;
 
@@ -36,6 +36,10 @@ pub fn process_message(
     } else {
         retention_days
     };
+
+    // Capture the item_type before consuming trace_item
+    let item_type = trace_item.item_type as u8;
+
     let mut eap_item = EAPItem::try_from(trace_item)?;
 
     eap_item.retention_days = retention_days;
@@ -45,7 +49,12 @@ pub fn process_message(
         Utc::now().timestamp_millis(),
     );
 
-    InsertBatch::from_rows([eap_item], origin_timestamp)
+    let mut item_type_metrics = ItemTypeMetrics::new();
+    item_type_metrics.record_item(item_type);
+
+    let mut batch = InsertBatch::from_rows([eap_item], origin_timestamp)?;
+    batch.item_type_metrics = Some(item_type_metrics);
+    Ok(batch)
 }
 
 #[derive(Debug, Default, Serialize)]
@@ -346,6 +355,88 @@ mod tests {
         assert_eq!(item.downsampled_retention_days, 365);
     }
 
+    #[test]
+    fn test_item_type_metrics_accumulated() {
+        let item_id = Uuid::new_v4();
+        let mut trace_item = generate_trace_item(item_id);
+
+        // Set the item type to Span (value 1)
+        trace_item.item_type = TraceItemType::Span.into();
+
+        let mut payload = Vec::new();
+        trace_item.encode(&mut payload).unwrap();
+
+        let payload = KafkaPayload::new(None, None, Some(payload));
+        let meta = KafkaMessageMetadata {
+            partition: 0,
+            offset: 1,
+            timestamp: DateTime::from(SystemTime::now()),
+        };
+
+        let batch = process_message(payload, meta, &ProcessorConfig::default())
+            .expect("The message should be processed");
+
+        // Verify that item_type_metrics is populated
+        assert!(batch.item_type_metrics.is_some());
+
+        let metrics = batch.item_type_metrics.unwrap();
+
+        // Verify that the item_type (Span = 1) has a count of 1
+        assert_eq!(metrics.counts.len(), 1);
+        assert_eq!(metrics.counts.get(&(TraceItemType::Span as u8)), Some(&1));
+    }
+
+    #[test]
+    fn test_item_type_metrics_different_types() {
+        // Process first message with item type 1 (Span)
+        let item_id_1 = Uuid::new_v4();
+        let mut trace_item_1 = generate_trace_item(item_id_1);
+        trace_item_1.item_type = TraceItemType::Span.into();
+
+        let mut payload_1 = Vec::new();
+        trace_item_1.encode(&mut payload_1).unwrap();
+
+        let payload_1 = KafkaPayload::new(None, None, Some(payload_1));
+        let meta_1 = KafkaMessageMetadata {
+            partition: 0,
+            offset: 1,
+            timestamp: DateTime::from(SystemTime::now()),
+        };
+
+        let batch_1 = process_message(payload_1, meta_1, &ProcessorConfig::default())
+            .expect("The message should be processed");
+
+        // Process second message with item type 2 (Transaction)
+        let item_id_2 = Uuid::new_v4();
+        let mut trace_item_2 = generate_trace_item(item_id_2);
+        trace_item_2.item_type = 2; // Transaction type
+
+        let mut payload_2 = Vec::new();
+        trace_item_2.encode(&mut payload_2).unwrap();
+
+        let payload_2 = KafkaPayload::new(None, None, Some(payload_2));
+        let meta_2 = KafkaMessageMetadata {
+            partition: 0,
+            offset: 2,
+            timestamp: DateTime::from(SystemTime::now()),
+        };
+
+        let batch_2 = process_message(payload_2, meta_2, &ProcessorConfig::default())
+            .expect("The message should be processed");
+
+        // Merge the metrics as would happen in the pipeline
+        let mut merged_metrics = batch_1.item_type_metrics.unwrap();
+        merged_metrics.merge(batch_2.item_type_metrics.unwrap());
+
+        // Verify that both item types are present with count 1 each
+        assert_eq!(merged_metrics.counts.len(), 2);
+        assert_eq!(
+            merged_metrics.counts.get(&(TraceItemType::Span as u8)),
+            Some(&1)
+        );
+        assert_eq!(merged_metrics.counts.get(&2u8), Some(&1));
+    }
+
     #[test]
     fn test_insert_arrays() {
         let item_id = Uuid::new_v4();
```
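
Per the commit description, the metric itself is only emitted in a follow-up change, at the end of a batch write. A minimal sketch of what that reduction step could look like, reusing the merge semantics exercised by the tests above; `flush_item_type_counts`, `emit_counter`, and the metric name `eap_items.processed` are hypothetical stand-ins, not part of this commit:

```rust
// Sketch of the planned follow-up: fold the per-message metrics gathered
// in process_message into one accumulator and emit the counts once the
// batch is written. `emit_counter` stands in for the real metrics backend.
fn flush_item_type_counts(per_message: Vec<Option<ItemTypeMetrics>>) {
    let mut accumulated = ItemTypeMetrics::new();
    for metrics in per_message.into_iter().flatten() {
        accumulated.merge(metrics);
    }
    for (item_type, count) in &accumulated.counts {
        emit_counter(
            "eap_items.processed",
            *count,
            &[("item_type", item_type.to_string())],
        );
    }
}

// Hypothetical metrics hook; real emission would go through Snuba's
// metrics layer rather than stdout.
fn emit_counter(name: &str, value: u64, tags: &[(&str, String)]) {
    println!("{name} value={value} tags={tags:?}");
}
```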

rust_snuba/src/processors/errors.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -72,6 +72,7 @@ pub fn process_message_with_replacement(
             rows: RowData::from_rows([row])?,
             sentry_received_timestamp: None,
             cogs_data: None,
+            item_type_metrics: None,
         }))
     }
     ("insert", None, _) => {
```
