Skip to content
7 changes: 4 additions & 3 deletions rust_snuba/benches/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use criterion::{black_box, BenchmarkGroup, BenchmarkId, Criterion, Throughput};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use rust_snuba::{
BrokerConfig, ClickhouseConfig, ConsumerStrategyFactoryV2, EnvConfig, KafkaMessageMetadata,
MessageProcessorConfig, ProcessingFunction, ProcessingFunctionType, ProcessorConfig,
StatsDBackend, StorageConfig, TopicConfig, PROCESSORS,
BatchSizeCalculation, BrokerConfig, ClickhouseConfig, ConsumerStrategyFactoryV2, EnvConfig,
KafkaMessageMetadata, MessageProcessorConfig, ProcessingFunction, ProcessingFunctionType,
ProcessorConfig, StatsDBackend, StorageConfig, TopicConfig, PROCESSORS,
};
use sentry_arroyo::backends::kafka::types::KafkaPayload;
use sentry_arroyo::backends::local::broker::LocalBroker;
Expand Down Expand Up @@ -79,6 +79,7 @@ fn create_factory(
logical_topic_name: schema.into(),
max_batch_size: 1_000,
max_batch_time: Duration::from_millis(10),
max_batch_size_calculation: BatchSizeCalculation::Rows,
processing_concurrency,
clickhouse_concurrency,
commitlog_concurrency,
Expand Down
10 changes: 10 additions & 0 deletions rust_snuba/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ pub struct ProcessorConfig {
pub env_config: EnvConfig,
}

/// Selects how `Reduce` measures the size of an accumulating insert batch
/// against `max_batch_size` when deciding to flush.
///
/// Deserialized from consumer config; `rename_all = "lowercase"` means the
/// config values are `"rows"` and `"bytes"`. Defaults to `Rows`, preserving
/// the historical row-count behavior when the field is omitted.
#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum BatchSizeCalculation {
    /// Count accumulated rows (`batch.len()`); the pre-existing default.
    #[default]
    Rows,
    /// Count accumulated encoded bytes (`batch.num_bytes()`).
    Bytes,
}

#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct ConsumerConfig {
Expand All @@ -20,6 +28,8 @@ pub struct ConsumerConfig {
pub accountant_topic: TopicConfig,
pub max_batch_size: usize,
pub max_batch_time_ms: u64,
#[serde(default)]
pub max_batch_size_calculation: BatchSizeCalculation,
pub env: EnvConfig,
}

Expand Down
2 changes: 2 additions & 0 deletions rust_snuba/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub fn consumer_impl(
let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap();
let max_batch_size = consumer_config.max_batch_size;
let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms);
let max_batch_size_calculation = consumer_config.max_batch_size_calculation;

let batch_write_timeout = match batch_write_timeout_ms {
Some(timeout_ms) => {
Expand Down Expand Up @@ -251,6 +252,7 @@ pub fn consumer_impl(
logical_topic_name,
max_batch_size,
max_batch_time,
max_batch_size_calculation,
processing_concurrency: ConcurrencyConfig::new(concurrency),
clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency),
commitlog_concurrency: ConcurrencyConfig::new(2),
Expand Down
261 changes: 258 additions & 3 deletions rust_snuba/src/factory_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct ConsumerStrategyFactoryV2 {
pub logical_topic_name: String,
pub max_batch_size: usize,
pub max_batch_time: Duration,
pub max_batch_size_calculation: config::BatchSizeCalculation,
pub processing_concurrency: ConcurrencyConfig,
pub clickhouse_concurrency: ConcurrencyConfig,
pub commitlog_concurrency: ConcurrencyConfig,
Expand Down Expand Up @@ -153,6 +154,12 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
},
);

let compute_batch_size: fn(&BytesInsertBatch<RowData>) -> usize =
match self.max_batch_size_calculation {
config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(),
config::BatchSizeCalculation::Rows => |batch| batch.len(),
};

let next_step = Reduce::new(
next_step,
accumulator,
Expand All @@ -168,7 +175,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
}),
self.max_batch_size,
self.max_batch_time,
|batch: &BytesInsertBatch<RowData>| batch.len(),
compute_batch_size,
// we need to enable this to deal with storages where we skip 100% of values, such as
// gen-metrics-gauges in s4s. we still need to commit there
)
Expand Down Expand Up @@ -276,7 +283,13 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {

impl ConsumerStrategyFactoryV2 {
fn create_row_binary_pipeline<
T: clickhouse::Row + serde::Serialize + Clone + Send + Sync + 'static,
T: clickhouse::Row
+ serde::Serialize
+ Clone
+ Send
+ Sync
+ crate::types::EstimatedSize
+ 'static,
>(
&self,
func: fn(
Expand Down Expand Up @@ -335,6 +348,12 @@ impl ConsumerStrategyFactoryV2 {
},
);

type BatchSizeFn<T> = fn(&BytesInsertBatch<Vec<T>>) -> usize;
let compute_batch_size: BatchSizeFn<T> = match self.max_batch_size_calculation {
config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(),
config::BatchSizeCalculation::Rows => |batch| batch.len(),
};

let next_step = Reduce::new(
next_step,
accumulator,
Expand All @@ -350,7 +369,7 @@ impl ConsumerStrategyFactoryV2 {
}),
self.max_batch_size,
self.max_batch_time,
|batch: &BytesInsertBatch<Vec<T>>| batch.len(),
compute_batch_size,
)
.flush_empty_batches(true);

Expand Down Expand Up @@ -412,3 +431,239 @@ impl TaskRunner<KafkaPayload, KafkaPayload, anyhow::Error> for SchemaValidator {
)
}
}

#[cfg(test)]
mod tests {
    //! Tests for the `BatchSizeCalculation`-driven flush behavior of the
    //! `Reduce` batching step: row-based vs byte-based size accounting.

    use super::*;
    use sentry_arroyo::processing::strategies::{
        CommitRequest, ProcessingStrategy, StrategyError, SubmitError,
    };
    use sentry_arroyo::types::{BrokerMessage, InnerMessage, Partition, Topic};
    use std::sync::{Arc, Mutex};

    /// A next-step that records every batch it receives.
    struct RecordingStep {
        // Shared so the test body can inspect batches after the strategy
        // has consumed the step.
        batches: Arc<Mutex<Vec<BytesInsertBatch<RowData>>>>,
    }

    impl ProcessingStrategy<BytesInsertBatch<RowData>> for RecordingStep {
        fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
            Ok(None)
        }

        // Record the flushed batch; never backpressure (always Ok).
        fn submit(
            &mut self,
            message: Message<BytesInsertBatch<RowData>>,
        ) -> Result<(), SubmitError<BytesInsertBatch<RowData>>> {
            self.batches.lock().unwrap().push(message.into_payload());
            Ok(())
        }

        fn terminate(&mut self) {}

        fn join(&mut self, _: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
            Ok(None)
        }
    }

    /// Create a BytesInsertBatch<RowData> with a specific number of rows and byte size.
    ///
    /// `encoded_rows` is filled with zero bytes purely to give the batch the
    /// requested physical size; the content is never inspected by these tests.
    fn make_batch(num_rows: usize, num_bytes: usize) -> BytesInsertBatch<RowData> {
        BytesInsertBatch::<RowData>::new(
            RowData {
                encoded_rows: vec![0u8; num_bytes],
                num_rows,
            },
            None,
            None,
            None,
            Default::default(),
            CogsData::default(),
        )
        .with_num_bytes(num_bytes)
    }

    /// Wrap a batch payload in a broker message at the given partition/offset,
    /// timestamped "now" (the timestamp is irrelevant to these tests).
    fn make_message(
        batch: BytesInsertBatch<RowData>,
        partition: Partition,
        offset: u64,
    ) -> Message<BytesInsertBatch<RowData>> {
        Message {
            inner_message: InnerMessage::BrokerMessage(BrokerMessage::new(
                batch,
                partition,
                offset,
                chrono::Utc::now(),
            )),
        }
    }

    /// Build a `Reduce` wired the same way the factory wires it, but with an
    /// effectively-infinite time limit so only `max_batch_size` (measured per
    /// `calculation`) can trigger a flush.
    fn build_reduce(
        next_step: RecordingStep,
        max_batch_size: usize,
        calculation: config::BatchSizeCalculation,
    ) -> Reduce<BytesInsertBatch<RowData>, BytesInsertBatch<RowData>> {
        let accumulator = Arc::new(
            |batch: BytesInsertBatch<RowData>, small_batch: Message<BytesInsertBatch<RowData>>| {
                Ok(batch.merge(small_batch.into_payload()))
            },
        );

        // Mirrors the selection logic in the factory: size is either rows or bytes.
        let compute_batch_size: fn(&BytesInsertBatch<RowData>) -> usize = match calculation {
            config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(),
            config::BatchSizeCalculation::Rows => |batch| batch.len(),
        };

        Reduce::new(
            next_step,
            accumulator,
            Arc::new(|| {
                BytesInsertBatch::<RowData>::new(
                    RowData::default(),
                    None,
                    None,
                    None,
                    Default::default(),
                    CogsData::default(),
                )
            }),
            max_batch_size,
            // Very long timeout so only size triggers flush
            Duration::from_secs(3600),
            compute_batch_size,
        )
        .flush_empty_batches(true)
    }

    /// With row-based calculation: 20 messages with max_batch_size=10 should produce
    /// 2 batches (flushed on size) + 1 final batch (flushed on join).
    /// Each batch produced on size should have exactly 10 rows.
    #[test]
    fn test_row_based_batching() {
        let batches = Arc::new(Mutex::new(Vec::new()));
        let next_step = RecordingStep {
            batches: batches.clone(),
        };

        let mut strategy = build_reduce(next_step, 10, config::BatchSizeCalculation::Rows);

        let partition = Partition::new(Topic::new("test"), 0);

        // Submit 20 messages, each with 1 row and 10_000 bytes
        for i in 0..20 {
            let batch = make_batch(1, 10_000);
            strategy.submit(make_message(batch, partition, i)).unwrap();
            let _ = strategy.poll();
        }

        let _ = strategy.join(None);

        let batches = batches.lock().unwrap();
        // 2 full batches of 10 (flushed on size) + 1 empty batch (flushed on join
        // because flush_empty_batches is enabled)
        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0].len(), 10);
        assert_eq!(batches[1].len(), 10);
        // Total bytes per full batch: 10 messages * 10_000 bytes = 100_000
        assert_eq!(batches[0].num_bytes(), 100_000);
        assert_eq!(batches[1].num_bytes(), 100_000);
    }

    /// With byte-based calculation: same 20 messages (each 10KB), but max_batch_size=50_000
    /// (50KB). Should flush every 5 messages (5 * 10KB = 50KB), producing 4 batches on size
    /// + 1 final batch on join.
    #[test]
    fn test_byte_based_batching() {
        let batches = Arc::new(Mutex::new(Vec::new()));
        let next_step = RecordingStep {
            batches: batches.clone(),
        };

        let mut strategy = build_reduce(next_step, 50_000, config::BatchSizeCalculation::Bytes);

        let partition = Partition::new(Topic::new("test"), 0);

        // Submit 20 messages, each with 1 row and 10_000 bytes
        for i in 0..20 {
            let batch = make_batch(1, 10_000);
            strategy.submit(make_message(batch, partition, i)).unwrap();
            let _ = strategy.poll();
        }

        let _ = strategy.join(None);

        let batches = batches.lock().unwrap();
        // 4 full batches of 5 messages (flushed on size) + 1 empty batch (flushed on join)
        assert_eq!(batches.len(), 5);
        // Each full batch: 5 rows, 50_000 bytes
        for batch in batches[..4].iter() {
            assert_eq!(batch.len(), 5);
            assert_eq!(batch.num_bytes(), 50_000);
        }
    }

    /// With byte-based calculation and variable-size messages:
    /// 3 messages of 40KB each with max_batch_size=100_000 bytes.
    /// First 2 messages = 80KB (under limit), 3rd pushes to 120KB (over limit),
    /// so the batch flushes after the 3rd message with all 3 rows.
    /// This confirms that large messages cause earlier flushes than row counting would.
    #[test]
    fn test_byte_based_batching_large_messages_flush_early() {
        let batches = Arc::new(Mutex::new(Vec::new()));
        let next_step = RecordingStep {
            batches: batches.clone(),
        };

        // 100KB byte limit — with row counting this would allow 100 messages
        let mut strategy = build_reduce(next_step, 100_000, config::BatchSizeCalculation::Bytes);

        let partition = Partition::new(Topic::new("test"), 0);

        // Submit 5 messages of 40KB each
        for i in 0..5 {
            let batch = make_batch(1, 40_000);
            strategy.submit(make_message(batch, partition, i)).unwrap();
            let _ = strategy.poll();
        }

        let _ = strategy.join(None);

        let batches = batches.lock().unwrap();
        // 3 messages hit 120KB >= 100KB limit → flush
        // Then 2 remaining → flush on join
        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].len(), 3); // 3 * 40KB = 120KB >= 100KB
        assert_eq!(batches[0].num_bytes(), 120_000);
        assert_eq!(batches[1].len(), 2); // 2 * 40KB = 80KB, flushed on join
        assert_eq!(batches[1].num_bytes(), 80_000);
    }

    /// Verify that with row-based calculation, the same large messages do NOT
    /// cause early flushing — all 5 fit in one batch since max_batch_size=100 rows.
    #[test]
    fn test_row_based_batching_ignores_byte_size() {
        let batches = Arc::new(Mutex::new(Vec::new()));
        let next_step = RecordingStep {
            batches: batches.clone(),
        };

        // Row limit of 100 — 5 messages won't hit this
        let mut strategy = build_reduce(next_step, 100, config::BatchSizeCalculation::Rows);

        let partition = Partition::new(Topic::new("test"), 0);

        // Submit 5 messages of 40KB each — huge bytes but only 5 rows
        for i in 0..5 {
            let batch = make_batch(1, 40_000);
            strategy.submit(make_message(batch, partition, i)).unwrap();
            let _ = strategy.poll();
        }

        let _ = strategy.join(None);

        let batches = batches.lock().unwrap();
        // All 5 fit in one batch (5 rows << 100 row limit), flushed on join
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 5);
        assert_eq!(batches[0].num_bytes(), 200_000); // 5 * 40KB accumulated but didn't trigger flush
    }
}
6 changes: 3 additions & 3 deletions rust_snuba/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ fn rust_snuba(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
// Ideally, we would have a normal rust crate we can use in examples and benchmarks,
// plus a pyo3 specific crate as `cdylib`.
pub use config::{
BrokerConfig, ClickhouseConfig, EnvConfig, MessageProcessorConfig, ProcessorConfig,
StorageConfig, TopicConfig,
BatchSizeCalculation, BrokerConfig, ClickhouseConfig, EnvConfig, MessageProcessorConfig,
ProcessorConfig, StorageConfig, TopicConfig,
};
pub use factory_v2::ConsumerStrategyFactoryV2;
pub use metrics::statsd::StatsDBackend;
pub use processors::{ProcessingFunction, ProcessingFunctionType, PROCESSORS};
pub use strategies::noop::Noop;
pub use strategies::python::PythonTransformStep;
pub use types::KafkaMessageMetadata;
pub use types::{EstimatedSize, KafkaMessageMetadata};

#[cfg(test)]
mod testutils;
Loading
Loading