Skip to content

Commit 2724c04

Browse files
authored
ref(consumer): change BytesInsertBatch to support builder pattern (#7479)
I want to start emitting how many events, by type, are being processed by the EAP consumer. I'm dividing that up into 3 changes: 1. (this change) change how we build BytesInsertBatch to support a builder pattern (so it's simpler to omit dataset-specific fields); 2. start keeping track of how many items by type are being processed in the EAPItems processor: #7481; 3. emit the actual metric at the end of a batch write.
1 parent 5baf23c commit 2724c04

File tree

5 files changed

+103
-55
lines changed

5 files changed

+103
-55
lines changed

rust_snuba/src/strategies/commit_log.rs

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ where
193193

194194
#[cfg(test)]
195195
mod tests {
196-
use crate::types::{CogsData, CommitLogEntry, CommitLogOffsets};
196+
use crate::types::{CommitLogEntry, CommitLogOffsets};
197197

198198
use super::*;
199199
use crate::testutils::TestStrategy;
@@ -280,27 +280,19 @@ mod tests {
280280
}
281281

282282
let payloads = vec![
283-
BytesInsertBatch::new(
284-
(),
285-
Some(Utc::now()),
286-
None,
287-
None,
288-
CommitLogOffsets(BTreeMap::from([(
283+
BytesInsertBatch::from_rows(())
284+
.with_message_timestamp(Utc::now())
285+
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
289286
0,
290287
CommitLogEntry {
291288
offset: 500,
292289
orig_message_ts: Utc::now(),
293290
received_p99: Vec::new(),
294291
},
295-
)])),
296-
CogsData::default(),
297-
),
298-
BytesInsertBatch::new(
299-
(),
300-
Some(Utc::now()),
301-
None,
302-
None,
303-
CommitLogOffsets(BTreeMap::from([
292+
)]))),
293+
BytesInsertBatch::from_rows(())
294+
.with_message_timestamp(Utc::now())
295+
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([
304296
(
305297
0,
306298
CommitLogEntry {
@@ -317,9 +309,7 @@ mod tests {
317309
received_p99: Vec::new(),
318310
},
319311
),
320-
])),
321-
CogsData::default(),
322-
),
312+
]))),
323313
];
324314

325315
let producer = MockProducer {

rust_snuba/src/strategies/processor.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,24 @@ pub fn make_rust_processor(
4949
}
5050
}
5151

52-
let payload = BytesInsertBatch::new(
53-
transformed.rows,
54-
Some(timestamp),
55-
transformed.origin_timestamp,
56-
transformed.sentry_received_timestamp,
57-
CommitLogOffsets(BTreeMap::from([(
52+
let mut payload = BytesInsertBatch::from_rows(transformed.rows)
53+
.with_message_timestamp(timestamp)
54+
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
5855
partition.index,
5956
CommitLogEntry {
6057
offset,
6158
orig_message_ts: timestamp,
6259
received_p99: transformed.origin_timestamp.into_iter().collect(),
6360
},
64-
)])),
65-
transformed.cogs_data.unwrap_or_default(),
66-
);
61+
)])))
62+
.with_cogs_data(transformed.cogs_data.unwrap_or_default());
63+
64+
if let Some(ts) = transformed.origin_timestamp {
65+
payload = payload.with_origin_timestamp(ts);
66+
}
67+
if let Some(ts) = transformed.sentry_received_timestamp {
68+
payload = payload.with_sentry_received_timestamp(ts);
69+
}
6770

6871
Ok(Message::new_broker_message(
6972
payload, partition, offset, timestamp,
@@ -120,21 +123,26 @@ pub fn make_rust_processor_with_replacements(
120123

121124
let payload = match transformed {
122125
InsertOrReplacement::Insert(transformed) => {
123-
InsertOrReplacement::Insert(BytesInsertBatch::new(
124-
transformed.rows,
125-
Some(timestamp),
126-
transformed.origin_timestamp,
127-
transformed.sentry_received_timestamp,
128-
CommitLogOffsets(BTreeMap::from([(
126+
let mut batch = BytesInsertBatch::from_rows(transformed.rows)
127+
.with_message_timestamp(timestamp)
128+
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
129129
partition.index,
130130
CommitLogEntry {
131131
offset,
132132
orig_message_ts: timestamp,
133133
received_p99: transformed.origin_timestamp.into_iter().collect(),
134134
},
135-
)])),
136-
transformed.cogs_data.unwrap_or_default(),
137-
))
135+
)])))
136+
.with_cogs_data(transformed.cogs_data.unwrap_or_default());
137+
138+
if let Some(ts) = transformed.origin_timestamp {
139+
batch = batch.with_origin_timestamp(ts);
140+
}
141+
if let Some(ts) = transformed.sentry_received_timestamp {
142+
batch = batch.with_sentry_received_timestamp(ts);
143+
}
144+
145+
InsertOrReplacement::Insert(batch)
138146
}
139147
InsertOrReplacement::Replacement(r) => InsertOrReplacement::Replacement(r),
140148
};

rust_snuba/src/strategies/python.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::config::MessageProcessorConfig;
22

3-
use crate::types::{BytesInsertBatch, CogsData, CommitLogEntry, CommitLogOffsets, RowData};
3+
use crate::types::{BytesInsertBatch, CommitLogEntry, CommitLogOffsets, RowData};
44
use anyhow::Error;
55
use chrono::{DateTime, Utc};
66
use parking_lot::Mutex;
@@ -85,14 +85,16 @@ impl PythonTransformStep {
8585
})
8686
.collect();
8787

88-
let payload = BytesInsertBatch::new(
89-
RowData::from_encoded_rows(payload),
90-
Some(message_timestamp),
91-
origin_timestamp,
92-
sentry_received_timestamp,
93-
CommitLogOffsets(commit_log_offsets),
94-
CogsData::default(),
95-
);
88+
let mut payload = BytesInsertBatch::from_rows(RowData::from_encoded_rows(payload))
89+
.with_message_timestamp(message_timestamp)
90+
.with_commit_log_offsets(CommitLogOffsets(commit_log_offsets));
91+
92+
if let Some(ts) = origin_timestamp {
93+
payload = payload.with_origin_timestamp(ts);
94+
}
95+
if let Some(ts) = sentry_received_timestamp {
96+
payload = payload.with_sentry_received_timestamp(ts);
97+
}
9698

9799
let mut committable: BTreeMap<Partition, u64> = BTreeMap::new();
98100
for ((t, p), o) in offsets {

rust_snuba/src/strategies/replacements.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl ProcessingStrategy<InsertOrReplacement<BytesInsertBatch<RowData>>> for Prod
126126
mod tests {
127127
use super::*;
128128
use crate::testutils::{MockProducer, TestStrategy};
129-
use crate::types::{CogsData, CommitLogOffsets, RowData};
129+
use crate::types::RowData;
130130
use crate::types::{InsertOrReplacement, ReplacementData};
131131
use chrono::Utc;
132132
use std::collections::BTreeMap;
@@ -149,14 +149,10 @@ mod tests {
149149

150150
strategy
151151
.submit(Message::new_any_message(
152-
InsertOrReplacement::Insert(BytesInsertBatch::new(
153-
RowData::from_rows(row_data).unwrap(),
154-
Some(Utc::now()),
155-
None,
156-
None,
157-
CommitLogOffsets::default(),
158-
CogsData::default(),
159-
)),
152+
InsertOrReplacement::Insert(
153+
BytesInsertBatch::from_rows(RowData::from_rows(row_data).unwrap())
154+
.with_message_timestamp(Utc::now()),
155+
),
160156
BTreeMap::new(),
161157
))
162158
.unwrap();

rust_snuba/src/types.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ pub struct BytesInsertBatch<R> {
200200
}
201201

202202
impl<R> BytesInsertBatch<R> {
203+
/// Create a new BytesInsertBatch with all fields specified.
204+
/// For most use cases, prefer `from_rows()` which uses sensible defaults.
203205
pub fn new(
204206
rows: R,
205207
message_timestamp: Option<DateTime<Utc>>,
@@ -224,6 +226,56 @@ impl<R> BytesInsertBatch<R> {
224226
}
225227
}
226228

229+
/// Create a BytesInsertBatch with just rows, using defaults for all other fields.
230+
/// Use builder methods to set optional fields as needed.
231+
///
232+
/// # Example
233+
/// ```ignore
234+
/// let batch = BytesInsertBatch::from_rows(rows)
235+
/// .with_message_timestamp(timestamp)
236+
/// .with_cogs_data(cogs_data);
237+
/// ```
238+
pub fn from_rows(rows: R) -> Self {
239+
Self {
240+
rows,
241+
message_timestamp: Default::default(),
242+
origin_timestamp: Default::default(),
243+
sentry_received_timestamp: Default::default(),
244+
commit_log_offsets: Default::default(),
245+
cogs_data: Default::default(),
246+
}
247+
}
248+
249+
/// Set the message timestamp (when the message was inserted into the Kafka topic)
250+
pub fn with_message_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
251+
self.message_timestamp = LatencyRecorder::from(timestamp);
252+
self
253+
}
254+
255+
/// Set the origin timestamp (when the event was received by Relay)
256+
pub fn with_origin_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
257+
self.origin_timestamp = LatencyRecorder::from(timestamp);
258+
self
259+
}
260+
261+
/// Set the sentry received timestamp (when received by ingest consumer in Sentry)
262+
pub fn with_sentry_received_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
263+
self.sentry_received_timestamp = LatencyRecorder::from(timestamp);
264+
self
265+
}
266+
267+
/// Set the commit log offsets
268+
pub fn with_commit_log_offsets(mut self, offsets: CommitLogOffsets) -> Self {
269+
self.commit_log_offsets = offsets;
270+
self
271+
}
272+
273+
/// Set the COGS data
274+
pub fn with_cogs_data(mut self, cogs_data: CogsData) -> Self {
275+
self.cogs_data = cogs_data;
276+
self
277+
}
278+
227279
pub fn commit_log_offsets(&self) -> &CommitLogOffsets {
228280
&self.commit_log_offsets
229281
}

0 commit comments

Comments
 (0)