diff --git a/rust_snuba/benches/processors.rs b/rust_snuba/benches/processors.rs index 85d793c0c5e..f9ee3d4c278 100644 --- a/rust_snuba/benches/processors.rs +++ b/rust_snuba/benches/processors.rs @@ -7,9 +7,9 @@ use criterion::{black_box, BenchmarkGroup, BenchmarkId, Criterion, Throughput}; use once_cell::sync::Lazy; use parking_lot::Mutex; use rust_snuba::{ - BrokerConfig, ClickhouseConfig, ConsumerStrategyFactoryV2, EnvConfig, KafkaMessageMetadata, - MessageProcessorConfig, ProcessingFunction, ProcessingFunctionType, ProcessorConfig, - StatsDBackend, StorageConfig, TopicConfig, PROCESSORS, + BatchSizeCalculation, BrokerConfig, ClickhouseConfig, ConsumerStrategyFactoryV2, EnvConfig, + KafkaMessageMetadata, MessageProcessorConfig, ProcessingFunction, ProcessingFunctionType, + ProcessorConfig, StatsDBackend, StorageConfig, TopicConfig, PROCESSORS, }; use sentry_arroyo::backends::kafka::types::KafkaPayload; use sentry_arroyo::backends::local::broker::LocalBroker; @@ -79,6 +79,7 @@ fn create_factory( logical_topic_name: schema.into(), max_batch_size: 1_000, max_batch_time: Duration::from_millis(10), + max_batch_size_calculation: BatchSizeCalculation::Rows, processing_concurrency, clickhouse_concurrency, commitlog_concurrency, diff --git a/rust_snuba/src/config.rs b/rust_snuba/src/config.rs index 3b58c1701c3..5db83f47f21 100644 --- a/rust_snuba/src/config.rs +++ b/rust_snuba/src/config.rs @@ -8,6 +8,14 @@ pub struct ProcessorConfig { pub env_config: EnvConfig, } +#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)] +#[serde(rename_all = "lowercase")] +pub enum BatchSizeCalculation { + #[default] + Rows, + Bytes, +} + #[derive(Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct ConsumerConfig { @@ -20,6 +28,8 @@ pub struct ConsumerConfig { pub accountant_topic: TopicConfig, pub max_batch_size: usize, pub max_batch_time_ms: u64, + #[serde(default)] + pub max_batch_size_calculation: BatchSizeCalculation, pub env: EnvConfig, } diff --git 
a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 36bb858ffba..037546503b6 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -98,6 +98,7 @@ pub fn consumer_impl( let consumer_config = config::ConsumerConfig::load_from_str(consumer_config_raw).unwrap(); let max_batch_size = consumer_config.max_batch_size; let max_batch_time = Duration::from_millis(consumer_config.max_batch_time_ms); + let max_batch_size_calculation = consumer_config.max_batch_size_calculation; let batch_write_timeout = match batch_write_timeout_ms { Some(timeout_ms) => { @@ -251,6 +252,7 @@ pub fn consumer_impl( logical_topic_name, max_batch_size, max_batch_time, + max_batch_size_calculation, processing_concurrency: ConcurrencyConfig::new(concurrency), clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency), commitlog_concurrency: ConcurrencyConfig::new(2), diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 76404072a24..91758b44223 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -43,6 +43,7 @@ pub struct ConsumerStrategyFactoryV2 { pub logical_topic_name: String, pub max_batch_size: usize, pub max_batch_time: Duration, + pub max_batch_size_calculation: config::BatchSizeCalculation, pub processing_concurrency: ConcurrencyConfig, pub clickhouse_concurrency: ConcurrencyConfig, pub commitlog_concurrency: ConcurrencyConfig, @@ -153,6 +154,12 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { }, ); + let compute_batch_size: fn(&BytesInsertBatch) -> usize = + match self.max_batch_size_calculation { + config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(), + config::BatchSizeCalculation::Rows => |batch| batch.len(), + }; + let next_step = Reduce::new( next_step, accumulator, @@ -168,7 +175,7 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { }), self.max_batch_size, self.max_batch_time, - |batch: &BytesInsertBatch| batch.len(), + compute_batch_size, // we 
need to enable this to deal with storages where we skip 100% of values, such as // gen-metrics-gauges in s4s. we still need to commit there ) @@ -276,7 +283,13 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { impl ConsumerStrategyFactoryV2 { fn create_row_binary_pipeline< - T: clickhouse::Row + serde::Serialize + Clone + Send + Sync + 'static, + T: clickhouse::Row + + serde::Serialize + + Clone + + Send + + Sync + + crate::types::EstimatedSize + + 'static, >( &self, func: fn( @@ -335,6 +348,12 @@ impl ConsumerStrategyFactoryV2 { }, ); + type BatchSizeFn = fn(&BytesInsertBatch>) -> usize; + let compute_batch_size: BatchSizeFn = match self.max_batch_size_calculation { + config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(), + config::BatchSizeCalculation::Rows => |batch| batch.len(), + }; + let next_step = Reduce::new( next_step, accumulator, @@ -350,7 +369,7 @@ impl ConsumerStrategyFactoryV2 { }), self.max_batch_size, self.max_batch_time, - |batch: &BytesInsertBatch>| batch.len(), + compute_batch_size, ) .flush_empty_batches(true); @@ -412,3 +431,239 @@ impl TaskRunner for SchemaValidator { ) } } + +#[cfg(test)] +mod tests { + use super::*; + use sentry_arroyo::processing::strategies::{ + CommitRequest, ProcessingStrategy, StrategyError, SubmitError, + }; + use sentry_arroyo::types::{BrokerMessage, InnerMessage, Partition, Topic}; + use std::sync::{Arc, Mutex}; + + /// A next-step that records every batch it receives. + struct RecordingStep { + batches: Arc>>>, + } + + impl ProcessingStrategy> for RecordingStep { + fn poll(&mut self) -> Result, StrategyError> { + Ok(None) + } + + fn submit( + &mut self, + message: Message>, + ) -> Result<(), SubmitError>> { + self.batches.lock().unwrap().push(message.into_payload()); + Ok(()) + } + + fn terminate(&mut self) {} + + fn join(&mut self, _: Option) -> Result, StrategyError> { + Ok(None) + } + } + + /// Create a BytesInsertBatch with a specific number of rows and byte size. 
+    fn make_batch(num_rows: usize, num_bytes: usize) -> BytesInsertBatch<RowData> {
+        BytesInsertBatch::<RowData>::new(
+            RowData {
+                // Zero-filled payload whose length equals the requested byte size.
+                encoded_rows: vec![0u8; num_bytes],
+                num_rows,
+            },
+            None,
+            None,
+            None,
+            Default::default(),
+            CogsData::default(),
+        )
+        // The byte counter is tracked separately from the payload, so set it explicitly.
+        .with_num_bytes(num_bytes)
+    }
+
+    /// Wrap `batch` in a broker message for `partition` at `offset`, stamped with the
+    /// current UTC time.
+    fn make_message(
+        batch: BytesInsertBatch<RowData>,
+        partition: Partition,
+        offset: u64,
+    ) -> Message<BytesInsertBatch<RowData>> {
+        Message {
+            inner_message: InnerMessage::BrokerMessage(BrokerMessage::new(
+                batch,
+                partition,
+                offset,
+                chrono::Utc::now(),
+            )),
+        }
+    }
+
+    /// Build a `Reduce` step that forwards to `next_step`, merging every submitted batch
+    /// into the accumulated one.
+    ///
+    /// `calculation` picks the size function handed to `Reduce::new`: `num_bytes()` for
+    /// `Bytes`, `len()` for `Rows`. `max_batch_size` is passed through unchanged, and the
+    /// time limit is one hour so that the tests never flush on time.
+    fn build_reduce(
+        next_step: RecordingStep,
+        max_batch_size: usize,
+        calculation: config::BatchSizeCalculation,
+    ) -> Reduce<BytesInsertBatch<RowData>, BytesInsertBatch<RowData>> {
+        let accumulator = Arc::new(
+            |batch: BytesInsertBatch<RowData>, small_batch: Message<BytesInsertBatch<RowData>>| {
+                Ok(batch.merge(small_batch.into_payload()))
+            },
+        );
+
+        // Non-capturing closures coerce to a plain `fn` pointer, so both arms share a type.
+        let compute_batch_size: fn(&BytesInsertBatch<RowData>) -> usize = match calculation {
+            config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(),
+            config::BatchSizeCalculation::Rows => |batch| batch.len(),
+        };
+
+        Reduce::new(
+            next_step,
+            accumulator,
+            Arc::new(|| {
+                BytesInsertBatch::<RowData>::new(
+                    RowData::default(),
+                    None,
+                    None,
+                    None,
+                    Default::default(),
+                    CogsData::default(),
+                )
+            }),
+            max_batch_size,
+            // Very long timeout so only size triggers flush
+            Duration::from_secs(3600),
+            compute_batch_size,
+        )
+        .flush_empty_batches(true)
+    }
+
+    /// With row-based calculation: 20 messages with max_batch_size=10 should produce
+    /// 2 batches (flushed on size) + 1 final batch (flushed on join).
+    /// Each batch produced on size should have exactly 10 rows.
+    #[test]
+    fn test_row_based_batching() {
+        // Shared sink that the terminal step appends every received batch to.
+        let recorded = Arc::new(Mutex::new(Vec::new()));
+        let sink = RecordingStep {
+            batches: Arc::clone(&recorded),
+        };
+        let partition = Partition::new(Topic::new("test"), 0);
+        let mut reduce = build_reduce(sink, 10, config::BatchSizeCalculation::Rows);
+
+        // Feed 20 single-row messages of 10_000 bytes each, polling after each submit.
+        for offset in 0..20 {
+            let message = make_message(make_batch(1, 10_000), partition, offset);
+            reduce.submit(message).unwrap();
+            let _ = reduce.poll();
+        }
+        let _ = reduce.join(None);
+
+        let flushed = recorded.lock().unwrap();
+        // Two size-triggered batches of 10, plus one empty batch emitted on join
+        // because flush_empty_batches is enabled.
+        assert_eq!(flushed.len(), 3);
+        for full in &flushed[..2] {
+            // Per full batch: 10 rows and 10 messages * 10_000 bytes = 100_000 bytes.
+            assert_eq!(full.len(), 10);
+            assert_eq!(full.num_bytes(), 100_000);
+        }
+    }
+
+    /// Byte-based calculation over the same 20 messages of 10KB each, with
+    /// max_batch_size=50_000 (50KB): a flush is expected every 5 messages
+    /// (5 * 10KB = 50KB), i.e. 4 batches on size + 1 final batch on join.
+    #[test]
+    fn test_byte_based_batching() {
+        let batches = Arc::new(Mutex::new(Vec::new()));
+        let next_step = RecordingStep {
+            batches: batches.clone(),
+        };
+
+        // The limit is in bytes here: 50_000 bytes = 5 messages of 10_000 bytes.
+        let mut strategy = build_reduce(next_step, 50_000, config::BatchSizeCalculation::Bytes);
+
+        let partition = Partition::new(Topic::new("test"), 0);
+
+        // Submit 20 messages, each with 1 row and 10_000 bytes
+        for i in 0..20 {
+            let batch = make_batch(1, 10_000);
+            strategy.submit(make_message(batch, partition, i)).unwrap();
+            let _ = strategy.poll();
+        }
+
+        let _ = strategy.join(None);
+
+        let batches = batches.lock().unwrap();
+        // 4 full batches of 5 messages (flushed on size) + 1 empty batch (flushed on join)
+        assert_eq!(batches.len(), 5);
+        // Each full batch: 5 rows, 50_000 bytes
+        for batch in batches[..4].iter() {
+            assert_eq!(batch.len(), 5);
+            assert_eq!(batch.num_bytes(), 50_000);
+        }
+    }
+
+    /// With byte-based calculation and large messages:
+    /// 5 messages of 40KB each with max_batch_size=100_000 bytes.
+    /// First 2 messages = 80KB (under limit), 3rd pushes to 120KB (over limit),
+    /// so the first batch flushes after the 3rd message with all 3 rows.
+    /// The remaining 2 messages (80KB) stay under the limit and are flushed on join.
+    /// This confirms that large messages cause earlier flushes than row counting would.
+    #[test]
+    fn test_byte_based_batching_large_messages_flush_early() {
+        let batches = Arc::new(Mutex::new(Vec::new()));
+        let next_step = RecordingStep {
+            batches: batches.clone(),
+        };
+
+        // 100_000 is a byte limit here (100KB); read as a row count, the same value
+        // would allow 100_000 rows before flushing.
+        let mut strategy = build_reduce(next_step, 100_000, config::BatchSizeCalculation::Bytes);
+
+        let partition = Partition::new(Topic::new("test"), 0);
+
+        // Submit 5 messages of 40KB each
+        for i in 0..5 {
+            let batch = make_batch(1, 40_000);
+            strategy.submit(make_message(batch, partition, i)).unwrap();
+            let _ = strategy.poll();
+        }
+
+        let _ = strategy.join(None);
+
+        let batches = batches.lock().unwrap();
+        // 3 messages hit 120KB >= 100KB limit → flush
+        // Then 2 remaining → flush on join
+        assert_eq!(batches.len(), 2);
+        assert_eq!(batches[0].len(), 3); // 3 * 40KB = 120KB >= 100KB
+        assert_eq!(batches[0].num_bytes(), 120_000);
+        assert_eq!(batches[1].len(), 2); // 2 * 40KB = 80KB, flushed on join
+        assert_eq!(batches[1].num_bytes(), 80_000);
+    }
+
+    /// Verify that with row-based calculation, the same large messages do NOT
+    /// cause early flushing — all 5 fit in one batch since max_batch_size=100 rows.
+    #[test]
+    fn test_row_based_batching_ignores_byte_size() {
+        let recorded = Arc::new(Mutex::new(Vec::new()));
+        let sink = RecordingStep {
+            batches: Arc::clone(&recorded),
+        };
+        let partition = Partition::new(Topic::new("test"), 0);
+
+        // Limit of 100 rows; the 5 rows submitted below stay far under it.
+        let mut reduce = build_reduce(sink, 100, config::BatchSizeCalculation::Rows);
+
+        // 5 single-row messages of 40KB each: many bytes, very few rows.
+        for offset in 0..5 {
+            let message = make_message(make_batch(1, 40_000), partition, offset);
+            reduce.submit(message).unwrap();
+            let _ = reduce.poll();
+        }
+        let _ = reduce.join(None);
+
+        let flushed = recorded.lock().unwrap();
+        // Nothing is flushed on size, so join emits a single batch holding all 5 rows.
+        assert_eq!(flushed.len(), 1);
+        let only = &flushed[0];
+        assert_eq!(only.len(), 5);
+        // 5 * 40KB accumulated without triggering a flush.
+        assert_eq!(only.num_bytes(), 200_000);
+    }
+}
diff --git a/rust_snuba/src/lib.rs b/rust_snuba/src/lib.rs
index a89553a7fb9..3e9927f298c 100644
--- a/rust_snuba/src/lib.rs
+++ b/rust_snuba/src/lib.rs
@@ -28,15 +28,15 @@ fn rust_snuba(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
 // Ideally, we would have a normal rust crate we can use in examples and benchmarks,
 // plus a pyo3 specific crate as `cdylib`.
pub use config::{ - BrokerConfig, ClickhouseConfig, EnvConfig, MessageProcessorConfig, ProcessorConfig, - StorageConfig, TopicConfig, + BatchSizeCalculation, BrokerConfig, ClickhouseConfig, EnvConfig, MessageProcessorConfig, + ProcessorConfig, StorageConfig, TopicConfig, }; pub use factory_v2::ConsumerStrategyFactoryV2; pub use metrics::statsd::StatsDBackend; pub use processors::{ProcessingFunction, ProcessingFunctionType, PROCESSORS}; pub use strategies::noop::Noop; pub use strategies::python::PythonTransformStep; -pub use types::KafkaMessageMetadata; +pub use types::{EstimatedSize, KafkaMessageMetadata}; #[cfg(test)] mod testutils; diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index e1f14218219..c81a2dd745f 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -346,6 +346,31 @@ pub struct EAPItemRow { } } +fn vec_string_pair_size(v: &[(String, B)]) -> usize { + let heap: usize = v.iter().map(|(s, _)| s.len()).sum(); + std::mem::size_of_val(v) + heap +} + +fn vec_string_string_pair_size(v: &[(String, String)]) -> usize { + let heap: usize = v.iter().map(|(k, v)| k.len() + v.len()).sum(); + std::mem::size_of_val(v) + heap +} + +seq_attrs! 
{ +impl crate::types::EstimatedSize for EAPItemRow { + fn estimated_size(&self) -> usize { + std::mem::size_of::() + + vec_string_pair_size(&self.attributes_bool) + + vec_string_pair_size(&self.attributes_int) + + self.attributes_array.len() + #( + + vec_string_string_pair_size(&self.attributes_string_~N) + + vec_string_pair_size(&self.attributes_float_~N) + )* + } +} +} + impl TryFrom for EAPItemRow { type Error = anyhow::Error; diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs index cb87890d63c..181ee6aa6f1 100644 --- a/rust_snuba/src/strategies/processor.rs +++ b/rust_snuba/src/strategies/processor.rs @@ -16,8 +16,8 @@ use sentry_kafka_schemas::{Schema, SchemaError, SchemaType}; use crate::config::ProcessorConfig; use crate::processors::{ProcessingFunction, ProcessingFunctionWithReplacements}; use crate::types::{ - BytesInsertBatch, CommitLogEntry, CommitLogOffsets, InsertBatch, InsertOrReplacement, - KafkaMessageMetadata, RowData, TypedInsertBatch, + BytesInsertBatch, CommitLogEntry, CommitLogOffsets, EstimatedSize, InsertBatch, + InsertOrReplacement, KafkaMessageMetadata, RowData, TypedInsertBatch, }; use tokio::time::Instant; @@ -49,7 +49,9 @@ pub fn make_rust_processor( } } + let num_bytes = transformed.rows.encoded_rows.len(); let mut payload = BytesInsertBatch::from_rows(transformed.rows) + .with_num_bytes(num_bytes) .with_message_timestamp(timestamp) .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( partition.index, @@ -126,7 +128,9 @@ pub fn make_rust_processor_with_replacements( let payload = match transformed { InsertOrReplacement::Insert(transformed) => { + let num_bytes = transformed.rows.encoded_rows.len(); let mut batch = BytesInsertBatch::from_rows(transformed.rows) + .with_num_bytes(num_bytes) .with_message_timestamp(timestamp) .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( partition.index, @@ -175,7 +179,7 @@ pub fn make_rust_processor_with_replacements( )) } -pub fn 
make_rust_processor_row_binary( +pub fn make_rust_processor_row_binary( next_step: impl ProcessingStrategy>> + 'static, func: fn( KafkaPayload, @@ -190,7 +194,7 @@ pub fn make_rust_processor_row_binary( ) -> Box> { let schema = get_schema(schema_name, enforce_schema); - fn result_to_next_msg( + fn result_to_next_msg( transformed: TypedInsertBatch, partition: Partition, offset: u64, @@ -209,7 +213,9 @@ pub fn make_rust_processor_row_binary( } } + let num_bytes: usize = transformed.rows.iter().map(|r| r.estimated_size()).sum(); let mut payload = BytesInsertBatch::from_rows(transformed.rows) + .with_num_bytes(num_bytes) .with_message_timestamp(timestamp) .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( partition.index, diff --git a/rust_snuba/src/strategies/python.rs b/rust_snuba/src/strategies/python.rs index fde98a7e79f..215a571730e 100644 --- a/rust_snuba/src/strategies/python.rs +++ b/rust_snuba/src/strategies/python.rs @@ -85,7 +85,10 @@ impl PythonTransformStep { }) .collect(); - let mut payload = BytesInsertBatch::from_rows(RowData::from_encoded_rows(payload)) + let row_data = RowData::from_encoded_rows(payload); + let num_bytes = row_data.encoded_rows.len(); + let mut payload = BytesInsertBatch::from_rows(row_data) + .with_num_bytes(num_bytes) .with_message_timestamp(message_timestamp) .with_commit_log_offsets(CommitLogOffsets(commit_log_offsets)); diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs index 105cf4a58e6..d7c48a607a5 100644 --- a/rust_snuba/src/types.rs +++ b/rust_snuba/src/types.rs @@ -8,6 +8,12 @@ use sentry_arroyo::timer; use sentry_protos::snuba::v1::TraceItemType; use serde::{Deserialize, Serialize}; +/// Trait for row types that can estimate their in-memory byte size. +/// Used by byte-based batch size calculation in the Reduce step. 
+pub trait EstimatedSize { + fn estimated_size(&self) -> usize; +} + #[derive(Clone, Debug, PartialEq)] pub struct CommitLogEntry { pub offset: u64, @@ -255,6 +261,9 @@ pub struct BytesInsertBatch { cogs_data: CogsData, item_type_metrics: ItemTypeMetrics, + + /// Total encoded byte size of the batch, used for byte-based batch size limiting + num_bytes: usize, } impl BytesInsertBatch { @@ -282,6 +291,7 @@ impl BytesInsertBatch { commit_log_offsets, cogs_data, item_type_metrics: Default::default(), + num_bytes: 0, } } @@ -303,6 +313,7 @@ impl BytesInsertBatch { commit_log_offsets: Default::default(), cogs_data: Default::default(), item_type_metrics: Default::default(), + num_bytes: 0, } } @@ -342,6 +353,17 @@ impl BytesInsertBatch { self } + /// Set the total encoded byte size of the batch + pub fn with_num_bytes(mut self, num_bytes: usize) -> Self { + self.num_bytes = num_bytes; + self + } + + /// Get the total encoded byte size of the batch + pub fn num_bytes(&self) -> usize { + self.num_bytes + } + pub fn commit_log_offsets(&self) -> &CommitLogOffsets { &self.commit_log_offsets } @@ -359,6 +381,7 @@ impl BytesInsertBatch { commit_log_offsets: self.commit_log_offsets.clone(), cogs_data: self.cogs_data.clone(), item_type_metrics: self.item_type_metrics.clone(), + num_bytes: self.num_bytes, } } @@ -371,6 +394,7 @@ impl BytesInsertBatch { commit_log_offsets: self.commit_log_offsets, cogs_data: self.cogs_data, item_type_metrics: self.item_type_metrics, + num_bytes: self.num_bytes, }; (self.rows, new) @@ -415,6 +439,7 @@ impl BytesInsertBatch { pub fn merge(mut self, other: BytesInsertBatch) -> Self { self.rows.encoded_rows.extend(other.rows.encoded_rows); self.rows.num_rows += other.rows.num_rows; + self.num_bytes += other.num_bytes; self.commit_log_offsets.merge(other.commit_log_offsets); self.message_timestamp.merge(other.message_timestamp); self.origin_timestamp.merge(other.origin_timestamp); @@ -433,6 +458,7 @@ impl BytesInsertBatch> { pub fn merge(mut self, other: 
Self) -> Self { self.rows.extend(other.rows); + self.num_bytes += other.num_bytes; self.commit_log_offsets.merge(other.commit_log_offsets); self.message_timestamp.merge(other.message_timestamp); self.origin_timestamp.merge(other.origin_timestamp); diff --git a/snuba/cli/rust_consumer.py b/snuba/cli/rust_consumer.py index e58f6847152..e153ed41296 100644 --- a/snuba/cli/rust_consumer.py +++ b/snuba/cli/rust_consumer.py @@ -78,7 +78,13 @@ "--max-batch-size", default=settings.DEFAULT_MAX_BATCH_SIZE, type=int, - help="Max number of messages to batch in memory before writing to Kafka.", + help="Max number of messages (or bytes, if --max-batch-size-calculation=bytes) to batch in memory before flushing.", +) +@click.option( + "--max-batch-size-calculation", + default="rows", + type=click.Choice(["rows", "bytes"]), + help="How --max-batch-size is interpreted. 'rows' counts messages (default), 'bytes' counts total encoded byte size.", ) @click.option( "--max-batch-time-ms", @@ -207,6 +213,7 @@ def rust_consumer( commit_log_bootstrap_servers: Sequence[str], replacement_bootstrap_servers: Sequence[str], max_batch_size: int, + max_batch_size_calculation: str, max_batch_time_ms: int, log_level: str, concurrency: Optional[int], @@ -241,6 +248,7 @@ def rust_consumer( replacement_bootstrap_servers=replacement_bootstrap_servers, max_batch_size=max_batch_size, max_batch_time_ms=max_batch_time_ms, + max_batch_size_calculation=max_batch_size_calculation, queued_max_messages_kbytes=queued_max_messages_kbytes, queued_min_messages=queued_min_messages, slice_id=None, diff --git a/snuba/consumers/consumer_config.py b/snuba/consumers/consumer_config.py index 8f318939d94..115cad18983 100644 --- a/snuba/consumers/consumer_config.py +++ b/snuba/consumers/consumer_config.py @@ -70,6 +70,7 @@ class ConsumerConfig: dlq_topic: Optional[TopicConfig] max_batch_size: int max_batch_time_ms: int + max_batch_size_calculation: str env: Optional[EnvConfig] accountant_topic: TopicConfig @@ -152,6 +153,7 @@ 
def resolve_consumer_config( slice_id: Optional[int], max_batch_size: int, max_batch_time_ms: int, + max_batch_size_calculation: str = "rows", accepted_outcomes_topic: Optional[str] = None, accepted_outcomes_bootstrap_servers: Sequence[str] = (), queued_max_messages_kbytes: Optional[int] = None, @@ -276,6 +278,7 @@ def resolve_consumer_config( dlq_topic=resolved_dlq_topic, max_batch_size=max_batch_size, max_batch_time_ms=max_batch_time_ms, + max_batch_size_calculation=max_batch_size_calculation, env=resolved_env_config, accountant_topic=accountant_topic, )