@@ -511,12 +511,18 @@ impl TaskRunner<KafkaPayload, KafkaPayload, anyhow::Error> for SchemaValidator {
511511#[ cfg( test) ]
512512mod tests {
513513 use super :: * ;
514+ use sentry_arroyo:: backends:: kafka:: config:: KafkaConfig ;
514515 use sentry_arroyo:: processing:: strategies:: {
515516 CommitRequest , ProcessingStrategy , StrategyError , SubmitError ,
516517 } ;
517518 use sentry_arroyo:: types:: { BrokerMessage , InnerMessage , Partition , Topic } ;
519+ use sentry_options:: init_with_schemas;
520+ use sentry_options:: testing:: set_override;
521+ use serde_json:: json;
522+ use std:: sync:: Once ;
518523 use std:: sync:: { Arc , Mutex } ;
519524
525+ // ----------- BYTES_INSERT_BATCH ------------------
520526 /// A next-step that records every batch it receives.
521527 struct RecordingStep {
522528 batches : Arc < Mutex < Vec < BytesInsertBatch < RowData > > > > ,
@@ -742,19 +748,8 @@ mod tests {
742748 assert_eq ! ( batches[ 0 ] . len( ) , 5 ) ;
743749 assert_eq ! ( batches[ 0 ] . num_bytes( ) , 200_000 ) ; // 5 * 40KB accumulated but didn't trigger flush
744750 }
745- }
746-
747- #[ cfg( test) ]
748- mod tests {
749- use std:: sync:: Once ;
750-
751- use super :: * ;
752- use sentry_arroyo:: backends:: kafka:: config:: KafkaConfig ;
753- use sentry_arroyo:: types:: Topic ;
754- use sentry_options:: init_with_schemas;
755- use sentry_options:: testing:: set_override;
756- use serde_json:: json;
757751
752+ // --------- BLQ -------------
758753 fn make_factory (
759754 blq_producer_config : Option < KafkaConfig > ,
760755 blq_topic : Option < Topic > ,
@@ -807,6 +802,7 @@ mod tests {
807802 use_row_binary : false ,
808803 blq_producer_config,
809804 blq_topic,
805+ max_batch_size_calculation : config:: BatchSizeCalculation :: Rows ,
810806 }
811807 }
812808
0 commit comments