@@ -2,6 +2,9 @@ use std::collections::HashMap;
22use std:: sync:: Arc ;
33use std:: time:: Duration ;
44
5+ use sentry_options:: options;
6+
7+ use chrono:: TimeDelta ;
58use sentry:: { Hub , SentryFutureExt } ;
69use sentry_arroyo:: backends:: kafka:: config:: KafkaConfig ;
710use sentry_arroyo:: backends:: kafka:: producer:: KafkaProducer ;
@@ -24,6 +27,8 @@ use crate::config;
2427use crate :: metrics:: global_tags:: set_global_tag;
2528use crate :: processors:: { self , get_cogs_label} ;
2629use crate :: strategies:: accountant:: RecordCogs ;
30+
31+ use crate :: strategies:: blq_router:: BLQRouter ;
2732use crate :: strategies:: clickhouse:: row_binary_writer:: ClickhouseRowBinaryWriterStep ;
2833use crate :: strategies:: clickhouse:: writer_v2:: ClickhouseWriterStep ;
2934use crate :: strategies:: commit_log:: ProduceCommitLog ;
@@ -37,6 +42,10 @@ use crate::strategies::python::PythonTransformStep;
3742use crate :: strategies:: replacements:: ProduceReplacements ;
3843use crate :: types:: { BytesInsertBatch , CogsData , RowData , TypedInsertBatch } ;
3944
// BLQ (backlog-queue) configuration.
// Messages older than this threshold are routed to the backlog topic instead
// of the main pipeline (see the BLQRouter wiring in `create`).
const BLQ_STALE_THRESHOLD: TimeDelta = TimeDelta::minutes(30);
// Passed to BLQRouter as its static-friction parameter.
// NOTE(review): exact semantics (presumably hysteresis to avoid flip-flopping
// between main and backlog paths) live in BLQRouter — confirm there.
const BLQ_STATIC_FRICTION: Option<TimeDelta> = Some(TimeDelta::minutes(2));
48+
4049pub struct ConsumerStrategyFactoryV2 {
4150 pub storage_config : config:: StorageConfig ,
4251 pub env_config : config:: EnvConfig ,
@@ -63,6 +72,8 @@ pub struct ConsumerStrategyFactoryV2 {
6372 pub join_timeout_ms : Option < u64 > ,
6473 pub health_check : String ,
6574 pub use_row_binary : bool ,
75+ pub blq_producer_config : Option < KafkaConfig > ,
76+ pub blq_topic : Option < Topic > ,
6677}
6778
6879impl ProcessingStrategyFactory < KafkaPayload > for ConsumerStrategyFactoryV2 {
@@ -87,6 +98,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
8798
8899 fn create ( & self ) -> Box < dyn ProcessingStrategy < KafkaPayload > > {
89100 if self . use_row_binary {
101+ tracing:: info!( "Using row_binary pipeline" ) ;
90102 return match self
91103 . storage_config
92104 . message_processor
@@ -263,6 +275,34 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
263275 next_step,
264276 Some ( Duration :: from_millis ( self . join_timeout_ms . unwrap_or ( 0 ) ) ) ,
265277 ) ;
278+
279+ let next_step: Box < dyn ProcessingStrategy < KafkaPayload > > =
280+ if let ( true , Some ( blq_producer_config) , Some ( blq_topic) ) = (
281+ self . should_use_blq ( ) ,
282+ & self . blq_producer_config ,
283+ self . blq_topic ,
284+ ) {
285+ tracing:: info!(
286+ "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}" ,
287+ BLQ_STALE_THRESHOLD ,
288+ self . blq_topic,
289+ BLQ_STATIC_FRICTION
290+ ) ;
291+ Box :: new (
292+ BLQRouter :: new (
293+ next_step,
294+ blq_producer_config. clone ( ) ,
295+ blq_topic,
296+ BLQ_STALE_THRESHOLD ,
297+ BLQ_STATIC_FRICTION ,
298+ )
299+ . expect ( "invalid BLQRouter config" ) ,
300+ )
301+ } else {
302+ tracing:: info!( "Not using a backlog-queue" , ) ;
303+ Box :: new ( next_step)
304+ } ;
305+
266306 if let Some ( path) = & self . health_check_file {
267307 {
268308 if self . health_check == "snuba" {
@@ -282,6 +322,15 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
282322}
283323
284324impl ConsumerStrategyFactoryV2 {
325+ fn should_use_blq ( & self ) -> bool {
326+ let flag = options ( "snuba" )
327+ . ok ( )
328+ . and_then ( |o| o. get ( "consumer.blq_enabled" ) . ok ( ) )
329+ . and_then ( |v| v. as_bool ( ) )
330+ . unwrap_or ( false ) ;
331+ flag && self . blq_producer_config . is_some ( ) && self . blq_topic . is_some ( )
332+ }
333+
285334 fn create_row_binary_pipeline <
286335 T : clickhouse:: Row
287336 + serde:: Serialize
@@ -390,6 +439,33 @@ impl ConsumerStrategyFactoryV2 {
390439 Some ( Duration :: from_millis ( self . join_timeout_ms . unwrap_or ( 0 ) ) ) ,
391440 ) ;
392441
442+ let next_step: Box < dyn ProcessingStrategy < KafkaPayload > > =
443+ if let ( true , Some ( blq_producer_config) , Some ( blq_topic) ) = (
444+ self . should_use_blq ( ) ,
445+ & self . blq_producer_config ,
446+ self . blq_topic ,
447+ ) {
448+ tracing:: info!(
449+ "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}" ,
450+ BLQ_STALE_THRESHOLD ,
451+ self . blq_topic,
452+ BLQ_STATIC_FRICTION ,
453+ ) ;
454+ Box :: new (
455+ BLQRouter :: new (
456+ next_step,
457+ blq_producer_config. clone ( ) ,
458+ blq_topic,
459+ BLQ_STALE_THRESHOLD ,
460+ BLQ_STATIC_FRICTION ,
461+ )
462+ . expect ( "invalid BLQRouter config" ) ,
463+ )
464+ } else {
465+ tracing:: info!( "Not using a backlog-queue" , ) ;
466+ Box :: new ( next_step)
467+ } ;
468+
393469 if let Some ( path) = & self . health_check_file {
394470 if self . health_check == "snuba" {
395471 tracing:: info!(
@@ -435,12 +511,18 @@ impl TaskRunner<KafkaPayload, KafkaPayload, anyhow::Error> for SchemaValidator {
435511#[ cfg( test) ]
436512mod tests {
437513 use super :: * ;
514+ use sentry_arroyo:: backends:: kafka:: config:: KafkaConfig ;
438515 use sentry_arroyo:: processing:: strategies:: {
439516 CommitRequest , ProcessingStrategy , StrategyError , SubmitError ,
440517 } ;
441518 use sentry_arroyo:: types:: { BrokerMessage , InnerMessage , Partition , Topic } ;
519+ use sentry_options:: init_with_schemas;
520+ use sentry_options:: testing:: override_options;
521+ use serde_json:: json;
522+ use std:: sync:: Once ;
442523 use std:: sync:: { Arc , Mutex } ;
443524
525+ // ----------- BYTES_INSERT_BATCH ------------------
444526 /// A next-step that records every batch it receives.
445527 struct RecordingStep {
446528 batches : Arc < Mutex < Vec < BytesInsertBatch < RowData > > > > ,
@@ -666,4 +748,109 @@ mod tests {
666748 assert_eq ! ( batches[ 0 ] . len( ) , 5 ) ;
667749 assert_eq ! ( batches[ 0 ] . num_bytes( ) , 200_000 ) ; // 5 * 40KB accumulated but didn't trigger flush
668750 }
751+
752+ // --------- BLQ -------------
    // Builds a `ConsumerStrategyFactoryV2` filled with throwaway defaults so
    // the tests below can exercise `should_use_blq` with different
    // combinations of BLQ producer config and topic. None of the endpoints
    // referenced here (ClickHouse, Kafka) are contacted by these tests.
    fn make_factory(
        blq_producer_config: Option<KafkaConfig>,
        blq_topic: Option<Topic>,
    ) -> ConsumerStrategyFactoryV2 {
        ConsumerStrategyFactoryV2 {
            // Dummy storage definition pointing at a local ClickHouse.
            storage_config: config::StorageConfig {
                name: "test".to_string(),
                clickhouse_table_name: "test".to_string(),
                clickhouse_cluster: config::ClickhouseConfig {
                    host: "localhost".to_string(),
                    port: 9000,
                    secure: false,
                    http_port: 8123,
                    user: "default".to_string(),
                    password: "".to_string(),
                    database: "default".to_string(),
                },
                message_processor: config::MessageProcessorConfig {
                    python_class_name: "Test".to_string(),
                    python_module: "test".to_string(),
                },
            },
            env_config: config::EnvConfig::default(),
            logical_topic_name: "test".to_string(),
            max_batch_size: 100,
            max_batch_time: Duration::from_secs(1),
            processing_concurrency: ConcurrencyConfig::new(1),
            clickhouse_concurrency: ConcurrencyConfig::new(1),
            commitlog_concurrency: ConcurrencyConfig::new(1),
            replacements_concurrency: ConcurrencyConfig::new(1),
            async_inserts: false,
            python_max_queue_depth: None,
            use_rust_processor: false,
            health_check_file: None,
            enforce_schema: false,
            commit_log_producer: None,
            replacements_config: None,
            physical_consumer_group: "test".to_string(),
            physical_topic_name: Topic::new("test"),
            accountant_topic_config: config::TopicConfig {
                physical_topic_name: "test".to_string(),
                logical_topic_name: "test".to_string(),
                broker_config: HashMap::new(),
                quantized_rebalance_consumer_group_delay_secs: None,
            },
            stop_at_timestamp: None,
            batch_write_timeout: None,
            join_timeout_ms: None,
            health_check: "arroyo".to_string(),
            use_row_binary: false,
            // The two fields under test.
            blq_producer_config,
            blq_topic,
            max_batch_size_calculation: config::BatchSizeCalculation::Rows,
        }
    }
808+
    // Initializes the sentry_options schema registry exactly once across all
    // tests in this module; guarded by `Once` because tests may run in
    // parallel and re-initialization would fail the `unwrap`.
    static INIT: Once = Once::new();
    fn init_config() {
        INIT.call_once(|| init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap());
    }
813+
    // Minimal Kafka producer config for the BLQ topic; the broker address is
    // a placeholder and is never contacted by these tests.
    fn blq_kafka_config() -> KafkaConfig {
        KafkaConfig::new_config(vec!["localhost:9092".to_string()], None)
    }
817+
818+ #[ test]
819+ fn test_should_not_use_blq_when_no_flag ( ) {
820+ init_config ( ) ;
821+ let factory = make_factory ( Some ( blq_kafka_config ( ) ) , Some ( Topic :: new ( "blq" ) ) ) ;
822+ assert ! ( !factory. should_use_blq( ) ) ;
823+ }
824+
825+ #[ test]
826+ fn test_should_not_use_blq_when_flag_disabled ( ) {
827+ init_config ( ) ;
828+ let _guard = override_options ( & [ ( "snuba" , "consumer.blq_enabled" , json ! ( false ) ) ] ) . unwrap ( ) ;
829+ let factory = make_factory ( Some ( blq_kafka_config ( ) ) , Some ( Topic :: new ( "blq" ) ) ) ;
830+ assert ! ( !factory. should_use_blq( ) ) ;
831+ }
832+
833+ #[ test]
834+ fn test_should_not_use_blq_when_no_producer_config ( ) {
835+ init_config ( ) ;
836+ let _guard = override_options ( & [ ( "snuba" , "consumer.blq_enabled" , json ! ( true ) ) ] ) . unwrap ( ) ;
837+ let factory = make_factory ( None , Some ( Topic :: new ( "blq" ) ) ) ;
838+ assert ! ( !factory. should_use_blq( ) ) ;
839+ }
840+
841+ #[ test]
842+ fn test_should_not_use_blq_when_no_topic ( ) {
843+ init_config ( ) ;
844+ let _guard = override_options ( & [ ( "snuba" , "consumer.blq_enabled" , json ! ( true ) ) ] ) . unwrap ( ) ;
845+ let factory = make_factory ( Some ( blq_kafka_config ( ) ) , None ) ;
846+ assert ! ( !factory. should_use_blq( ) ) ;
847+ }
848+
849+ #[ test]
850+ fn test_should_use_blq_when_all_conditions_met ( ) {
851+ init_config ( ) ;
852+ let _guard = override_options ( & [ ( "snuba" , "consumer.blq_enabled" , json ! ( true ) ) ] ) . unwrap ( ) ;
853+ let factory = make_factory ( Some ( blq_kafka_config ( ) ) , Some ( Topic :: new ( "blq" ) ) ) ;
854+ assert ! ( factory. should_use_blq( ) ) ;
855+ }
669856}
0 commit comments