Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 9 additions & 19 deletions rust_snuba/src/strategies/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ where

#[cfg(test)]
mod tests {
use crate::types::{CogsData, CommitLogEntry, CommitLogOffsets};
use crate::types::{CommitLogEntry, CommitLogOffsets};

use super::*;
use crate::testutils::TestStrategy;
Expand Down Expand Up @@ -280,27 +280,19 @@ mod tests {
}

let payloads = vec![
BytesInsertBatch::new(
(),
Some(Utc::now()),
None,
None,
CommitLogOffsets(BTreeMap::from([(
BytesInsertBatch::from_rows(())
.with_message_timestamp(Utc::now())
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
0,
CommitLogEntry {
offset: 500,
orig_message_ts: Utc::now(),
received_p99: Vec::new(),
},
)])),
CogsData::default(),
),
BytesInsertBatch::new(
(),
Some(Utc::now()),
None,
None,
CommitLogOffsets(BTreeMap::from([
)]))),
BytesInsertBatch::from_rows(())
.with_message_timestamp(Utc::now())
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([
(
0,
CommitLogEntry {
Expand All @@ -317,9 +309,7 @@ mod tests {
received_p99: Vec::new(),
},
),
])),
CogsData::default(),
),
]))),
];

let producer = MockProducer {
Expand Down
44 changes: 26 additions & 18 deletions rust_snuba/src/strategies/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,24 @@ pub fn make_rust_processor(
}
}

let payload = BytesInsertBatch::new(
transformed.rows,
Some(timestamp),
transformed.origin_timestamp,
transformed.sentry_received_timestamp,
CommitLogOffsets(BTreeMap::from([(
let mut payload = BytesInsertBatch::from_rows(transformed.rows)
.with_message_timestamp(timestamp)
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
partition.index,
CommitLogEntry {
offset,
orig_message_ts: timestamp,
received_p99: transformed.origin_timestamp.into_iter().collect(),
},
)])),
transformed.cogs_data.unwrap_or_default(),
);
)])))
.with_cogs_data(transformed.cogs_data.unwrap_or_default());

if let Some(ts) = transformed.origin_timestamp {
payload = payload.with_origin_timestamp(ts);
}
if let Some(ts) = transformed.sentry_received_timestamp {
payload = payload.with_sentry_received_timestamp(ts);
}

Ok(Message::new_broker_message(
payload, partition, offset, timestamp,
Expand Down Expand Up @@ -120,21 +123,26 @@ pub fn make_rust_processor_with_replacements(

let payload = match transformed {
InsertOrReplacement::Insert(transformed) => {
InsertOrReplacement::Insert(BytesInsertBatch::new(
transformed.rows,
Some(timestamp),
transformed.origin_timestamp,
transformed.sentry_received_timestamp,
CommitLogOffsets(BTreeMap::from([(
let mut batch = BytesInsertBatch::from_rows(transformed.rows)
.with_message_timestamp(timestamp)
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
partition.index,
CommitLogEntry {
offset,
orig_message_ts: timestamp,
received_p99: transformed.origin_timestamp.into_iter().collect(),
},
)])),
transformed.cogs_data.unwrap_or_default(),
))
)])))
.with_cogs_data(transformed.cogs_data.unwrap_or_default());

if let Some(ts) = transformed.origin_timestamp {
batch = batch.with_origin_timestamp(ts);
}
if let Some(ts) = transformed.sentry_received_timestamp {
batch = batch.with_sentry_received_timestamp(ts);
}

InsertOrReplacement::Insert(batch)
}
InsertOrReplacement::Replacement(r) => InsertOrReplacement::Replacement(r),
};
Expand Down
20 changes: 11 additions & 9 deletions rust_snuba/src/strategies/python.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::config::MessageProcessorConfig;

use crate::types::{BytesInsertBatch, CogsData, CommitLogEntry, CommitLogOffsets, RowData};
use crate::types::{BytesInsertBatch, CommitLogEntry, CommitLogOffsets, RowData};
use anyhow::Error;
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
Expand Down Expand Up @@ -85,14 +85,16 @@ impl PythonTransformStep {
})
.collect();

let payload = BytesInsertBatch::new(
RowData::from_encoded_rows(payload),
Some(message_timestamp),
origin_timestamp,
sentry_received_timestamp,
CommitLogOffsets(commit_log_offsets),
CogsData::default(),
);
let mut payload = BytesInsertBatch::from_rows(RowData::from_encoded_rows(payload))
.with_message_timestamp(message_timestamp)
.with_commit_log_offsets(CommitLogOffsets(commit_log_offsets));

if let Some(ts) = origin_timestamp {
payload = payload.with_origin_timestamp(ts);
}
if let Some(ts) = sentry_received_timestamp {
payload = payload.with_sentry_received_timestamp(ts);
}

let mut committable: BTreeMap<Partition, u64> = BTreeMap::new();
for ((t, p), o) in offsets {
Expand Down
14 changes: 5 additions & 9 deletions rust_snuba/src/strategies/replacements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl ProcessingStrategy<InsertOrReplacement<BytesInsertBatch<RowData>>> for Prod
mod tests {
use super::*;
use crate::testutils::{MockProducer, TestStrategy};
use crate::types::{CogsData, CommitLogOffsets, RowData};
use crate::types::RowData;
use crate::types::{InsertOrReplacement, ReplacementData};
use chrono::Utc;
use std::collections::BTreeMap;
Expand All @@ -149,14 +149,10 @@ mod tests {

strategy
.submit(Message::new_any_message(
InsertOrReplacement::Insert(BytesInsertBatch::new(
RowData::from_rows(row_data).unwrap(),
Some(Utc::now()),
None,
None,
CommitLogOffsets::default(),
CogsData::default(),
)),
InsertOrReplacement::Insert(
BytesInsertBatch::from_rows(RowData::from_rows(row_data).unwrap())
.with_message_timestamp(Utc::now()),
),
BTreeMap::new(),
))
.unwrap();
Expand Down
52 changes: 52 additions & 0 deletions rust_snuba/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ pub struct BytesInsertBatch<R> {
}

impl<R> BytesInsertBatch<R> {
/// Create a new BytesInsertBatch with all fields specified.
/// For most use cases, prefer `from_rows()` which uses sensible defaults.
pub fn new(
rows: R,
message_timestamp: Option<DateTime<Utc>>,
Expand All @@ -224,6 +226,56 @@ impl<R> BytesInsertBatch<R> {
}
}

/// Create a BytesInsertBatch with just rows, using defaults for all other fields.
/// Use builder methods to set optional fields as needed.
///
/// # Example
/// ```ignore
/// let batch = BytesInsertBatch::from_rows(rows)
/// .with_message_timestamp(timestamp)
/// .with_cogs_data(cogs_data);
/// ```
pub fn from_rows(rows: R) -> Self {
Self {
rows,
message_timestamp: Default::default(),
origin_timestamp: Default::default(),
sentry_received_timestamp: Default::default(),
commit_log_offsets: Default::default(),
cogs_data: Default::default(),
}
}

/// Set the message timestamp (when the message was inserted into the Kafka topic)
pub fn with_message_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
self.message_timestamp = LatencyRecorder::from(timestamp);
self
}

/// Set the origin timestamp (when the event was received by Relay).
///
/// Accepts either a `DateTime<Utc>` or an `Option<DateTime<Utc>>` (via the
/// std blanket `impl From<T> for Option<T>`), so existing callers passing a
/// plain timestamp keep working. Passing `None` leaves the field at its
/// previous value, letting callers drop the repeated
/// `if let Some(ts) = … { batch = batch.with_origin_timestamp(ts) }` wrapper.
pub fn with_origin_timestamp(mut self, timestamp: impl Into<Option<DateTime<Utc>>>) -> Self {
    if let Some(ts) = timestamp.into() {
        self.origin_timestamp = LatencyRecorder::from(ts);
    }
    self
}

/// Set the sentry received timestamp (when the ingest consumer in Sentry
/// received the event).
///
/// Accepts either a `DateTime<Utc>` or an `Option<DateTime<Utc>>` (via the
/// std blanket `impl From<T> for Option<T>`), so existing callers passing a
/// plain timestamp keep working. Passing `None` leaves the field at its
/// previous value, letting callers drop the repeated
/// `if let Some(ts) = … { batch = batch.with_sentry_received_timestamp(ts) }`
/// wrapper.
pub fn with_sentry_received_timestamp(mut self, timestamp: impl Into<Option<DateTime<Utc>>>) -> Self {
    if let Some(ts) = timestamp.into() {
        self.sentry_received_timestamp = LatencyRecorder::from(ts);
    }
    self
}

/// Set the commit log offsets
pub fn with_commit_log_offsets(mut self, offsets: CommitLogOffsets) -> Self {
self.commit_log_offsets = offsets;
self
}

/// Set the COGS data
pub fn with_cogs_data(mut self, cogs_data: CogsData) -> Self {
self.cogs_data = cogs_data;
self
}

pub fn commit_log_offsets(&self) -> &CommitLogOffsets {
&self.commit_log_offsets
}
Expand Down
Loading