diff --git a/rust_snuba/src/strategies/commit_log.rs b/rust_snuba/src/strategies/commit_log.rs index b21cea73f6b..cd6562e451c 100644 --- a/rust_snuba/src/strategies/commit_log.rs +++ b/rust_snuba/src/strategies/commit_log.rs @@ -193,7 +193,7 @@ where #[cfg(test)] mod tests { - use crate::types::{CogsData, CommitLogEntry, CommitLogOffsets}; + use crate::types::{CommitLogEntry, CommitLogOffsets}; use super::*; use crate::testutils::TestStrategy; @@ -280,27 +280,19 @@ mod tests { } let payloads = vec![ - BytesInsertBatch::new( - (), - Some(Utc::now()), - None, - None, - CommitLogOffsets(BTreeMap::from([( + BytesInsertBatch::from_rows(()) + .with_message_timestamp(Utc::now()) + .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( 0, CommitLogEntry { offset: 500, orig_message_ts: Utc::now(), received_p99: Vec::new(), }, - )])), - CogsData::default(), - ), - BytesInsertBatch::new( - (), - Some(Utc::now()), - None, - None, - CommitLogOffsets(BTreeMap::from([ + )]))), + BytesInsertBatch::from_rows(()) + .with_message_timestamp(Utc::now()) + .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([ ( 0, CommitLogEntry { @@ -317,9 +309,7 @@ mod tests { received_p99: Vec::new(), }, ), - ])), - CogsData::default(), - ), + ]))), ]; let producer = MockProducer { diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs index cbce1b37e56..83ecd1f79a3 100644 --- a/rust_snuba/src/strategies/processor.rs +++ b/rust_snuba/src/strategies/processor.rs @@ -49,21 +49,24 @@ pub fn make_rust_processor( } } - let payload = BytesInsertBatch::new( - transformed.rows, - Some(timestamp), - transformed.origin_timestamp, - transformed.sentry_received_timestamp, - CommitLogOffsets(BTreeMap::from([( + let mut payload = BytesInsertBatch::from_rows(transformed.rows) + .with_message_timestamp(timestamp) + .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( partition.index, CommitLogEntry { offset, orig_message_ts: timestamp, received_p99: transformed.origin_timestamp.into_iter().collect(), }, - )])), - transformed.cogs_data.unwrap_or_default(), - ); + )]))) + .with_cogs_data(transformed.cogs_data.unwrap_or_default()); + + if let Some(ts) = transformed.origin_timestamp { + payload = payload.with_origin_timestamp(ts); + } + if let Some(ts) = transformed.sentry_received_timestamp { + payload = payload.with_sentry_received_timestamp(ts); + } Ok(Message::new_broker_message( payload, partition, offset, timestamp, @@ -120,21 +123,26 @@ pub fn make_rust_processor_with_replacements( let payload = match transformed { InsertOrReplacement::Insert(transformed) => { - InsertOrReplacement::Insert(BytesInsertBatch::new( - transformed.rows, - Some(timestamp), - transformed.origin_timestamp, - transformed.sentry_received_timestamp, - CommitLogOffsets(BTreeMap::from([( + let mut batch = BytesInsertBatch::from_rows(transformed.rows) + .with_message_timestamp(timestamp) + .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( partition.index, CommitLogEntry { offset, orig_message_ts: timestamp, received_p99: transformed.origin_timestamp.into_iter().collect(), }, - )])), - transformed.cogs_data.unwrap_or_default(), - )) + )]))) + .with_cogs_data(transformed.cogs_data.unwrap_or_default()); + + if let Some(ts) = transformed.origin_timestamp { + batch = batch.with_origin_timestamp(ts); + } + if let Some(ts) = transformed.sentry_received_timestamp { + batch = batch.with_sentry_received_timestamp(ts); + } + + InsertOrReplacement::Insert(batch) } InsertOrReplacement::Replacement(r) => InsertOrReplacement::Replacement(r), }; diff --git a/rust_snuba/src/strategies/python.rs b/rust_snuba/src/strategies/python.rs index 3965700e5e3..fde98a7e79f 100644 --- a/rust_snuba/src/strategies/python.rs +++ b/rust_snuba/src/strategies/python.rs @@ -1,6 +1,6 @@ use crate::config::MessageProcessorConfig; -use crate::types::{BytesInsertBatch, CogsData, CommitLogEntry, CommitLogOffsets, RowData}; +use crate::types::{BytesInsertBatch, CommitLogEntry, CommitLogOffsets, RowData}; use anyhow::Error; use chrono::{DateTime, Utc}; use parking_lot::Mutex; @@ -85,14 +85,16 @@ impl PythonTransformStep { }) .collect(); - let payload = BytesInsertBatch::new( - RowData::from_encoded_rows(payload), - Some(message_timestamp), - origin_timestamp, - sentry_received_timestamp, - CommitLogOffsets(commit_log_offsets), - CogsData::default(), - ); + let mut payload = BytesInsertBatch::from_rows(RowData::from_encoded_rows(payload)) + .with_message_timestamp(message_timestamp) + .with_commit_log_offsets(CommitLogOffsets(commit_log_offsets)); + + if let Some(ts) = origin_timestamp { + payload = payload.with_origin_timestamp(ts); + } + if let Some(ts) = sentry_received_timestamp { + payload = payload.with_sentry_received_timestamp(ts); + } let mut committable: BTreeMap = BTreeMap::new(); for ((t, p), o) in offsets { diff --git a/rust_snuba/src/strategies/replacements.rs b/rust_snuba/src/strategies/replacements.rs index 07b98937802..6bba5d1cea6 100644 --- a/rust_snuba/src/strategies/replacements.rs +++ b/rust_snuba/src/strategies/replacements.rs @@ -126,7 +126,7 @@ impl ProcessingStrategy>> for Prod mod tests { use super::*; use crate::testutils::{MockProducer, TestStrategy}; - use crate::types::{CogsData, CommitLogOffsets, RowData}; + use crate::types::RowData; use crate::types::{InsertOrReplacement, ReplacementData}; use chrono::Utc; use std::collections::BTreeMap; @@ -149,14 +149,10 @@ mod tests { strategy .submit(Message::new_any_message( - InsertOrReplacement::Insert(BytesInsertBatch::new( - RowData::from_rows(row_data).unwrap(), - Some(Utc::now()), - None, - None, - CommitLogOffsets::default(), - CogsData::default(), - )), + InsertOrReplacement::Insert( + BytesInsertBatch::from_rows(RowData::from_rows(row_data).unwrap()) + .with_message_timestamp(Utc::now()), + ), BTreeMap::new(), )) .unwrap(); diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs index 81468d78b79..bacd6738d15 100644 --- a/rust_snuba/src/types.rs +++ b/rust_snuba/src/types.rs @@ -200,6 +200,8 @@ pub struct BytesInsertBatch { } impl BytesInsertBatch { + /// Create a new BytesInsertBatch with all fields specified. + /// For most use cases, prefer `from_rows()` which uses sensible defaults. pub fn new( rows: R, message_timestamp: Option>, @@ -224,6 +226,56 @@ impl BytesInsertBatch { } } + /// Create a BytesInsertBatch with just rows, using defaults for all other fields. + /// Use builder methods to set optional fields as needed. + /// + /// # Example + /// ```ignore + /// let batch = BytesInsertBatch::from_rows(rows) + /// .with_message_timestamp(timestamp) + /// .with_cogs_data(cogs_data); + /// ``` + pub fn from_rows(rows: R) -> Self { + Self { + rows, + message_timestamp: Default::default(), + origin_timestamp: Default::default(), + sentry_received_timestamp: Default::default(), + commit_log_offsets: Default::default(), + cogs_data: Default::default(), + } + } + + /// Set the message timestamp (when the message was inserted into the Kafka topic) + pub fn with_message_timestamp(mut self, timestamp: DateTime) -> Self { + self.message_timestamp = LatencyRecorder::from(timestamp); + self + } + + /// Set the origin timestamp (when the event was received by Relay) + pub fn with_origin_timestamp(mut self, timestamp: DateTime) -> Self { + self.origin_timestamp = LatencyRecorder::from(timestamp); + self + } + + /// Set the sentry received timestamp (when received by ingest consumer in Sentry) + pub fn with_sentry_received_timestamp(mut self, timestamp: DateTime) -> Self { + self.sentry_received_timestamp = LatencyRecorder::from(timestamp); + self + } + + /// Set the commit log offsets + pub fn with_commit_log_offsets(mut self, offsets: CommitLogOffsets) -> Self { + self.commit_log_offsets = offsets; + self + } + + /// Set the COGS data + pub fn with_cogs_data(mut self, cogs_data: CogsData) -> Self { + self.cogs_data = cogs_data; + self + } + pub fn commit_log_offsets(&self) -> &CommitLogOffsets { &self.commit_log_offsets }