From e8670084f2088de59b4475c9b91e809710c0952e Mon Sep 17 00:00:00 2001 From: Claude Code Date: Thu, 26 Mar 2026 14:58:09 -0700 Subject: [PATCH 01/14] initial BLQRouter implemented --- .vscode/launch.json | 2 - rust_snuba/src/factory_v2.rs | 3 + rust_snuba/src/strategies/blq_router.rs | 97 +++++++++++++++++++++++++ rust_snuba/src/strategies/mod.rs | 1 + 4 files changed, 101 insertions(+), 2 deletions(-) create mode 100644 rust_snuba/src/strategies/blq_router.rs diff --git a/.vscode/launch.json b/.vscode/launch.json index 3525943710..6a3312832d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -33,7 +33,6 @@ "--no-strict-offset-reset", "--use-rust-processor", "--enforce-schema", - "--blq-stale-age-mins=30" ], "cwd": "${workspaceFolder}", "sourceLanguages": ["rust"], @@ -55,7 +54,6 @@ "--no-strict-offset-reset", "--use-rust-processor", "--enforce-schema", - "--blq-stale-age-mins=30" ], "cwd": "${workspaceFolder}", "justMyCode": false diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 91758b4422..9d1b6c17a7 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use chrono::TimeDelta; use sentry::{Hub, SentryFutureExt}; use sentry_arroyo::backends::kafka::config::KafkaConfig; use sentry_arroyo::backends::kafka::producer::KafkaProducer; @@ -24,6 +25,7 @@ use crate::config; use crate::metrics::global_tags::set_global_tag; use crate::processors::{self, get_cogs_label}; use crate::strategies::accountant::RecordCogs; +use crate::strategies::blq_router::BLQRouter; use crate::strategies::clickhouse::row_binary_writer::ClickhouseRowBinaryWriterStep; use crate::strategies::clickhouse::writer_v2::ClickhouseWriterStep; use crate::strategies::commit_log::ProduceCommitLog; @@ -263,6 +265,7 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { next_step, Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), ); 
+ let next_step = BLQRouter::new(next_step, TimeDelta::seconds(10)); if let Some(path) = &self.health_check_file { { if self.health_check == "snuba" { diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs new file mode 100644 index 0000000000..f53de4983f --- /dev/null +++ b/rust_snuba/src/strategies/blq_router.rs @@ -0,0 +1,97 @@ +use std::time::Duration; + +use chrono::{TimeDelta, Utc}; +use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::processing::strategies::{ + CommitRequest, ProcessingStrategy, StrategyError, SubmitError, +}; +use sentry_arroyo::types::Message; + +#[derive(PartialEq)] + +enum State { + Empty, + BatchingStale, + BatchingFresh, +} + +pub struct BLQRouter { + next_step: Next, + stale_threshold: TimeDelta, + state: State, +} + +impl BLQRouter { + pub fn new(next_step: Next, stale_threshold: TimeDelta) -> Self { + Self { + next_step, + stale_threshold, + state: State::Empty, + } + } +} + +impl ProcessingStrategy for BLQRouter +where + Next: ProcessingStrategy + 'static, +{ + fn poll(&mut self) -> Result, StrategyError> { + self.next_step.poll() + } + + fn submit(&mut self, message: Message) -> Result<(), SubmitError> { + let msg_ts = message + .timestamp() + .expect("Expected kafka message to always have a timestamp, but there wasn't one"); + let elapsed = Utc::now() - msg_ts; + let is_stale = elapsed > self.stale_threshold; + match (is_stale, &self.state) { + (true, State::BatchingFresh) => { + // we want the consumer to crash + // this is the only way for us to drop the batch of fresh messages without commiting + // when the consumer restarts it will get the stale message again in State::Empty + panic!("Resetting consumer state to begin processing the stale backlog") + } + (true, State::Empty) | (true, State::BatchingStale) => { + // batch stale messages + if self.state == State::Empty { + self.state = State::BatchingStale; + } + + // todo: batch stale + println!("batching 
stale"); + + // all stale message handling happens in this strategy, + // nothing should be passed downstream + Ok(()) + } + (false, State::Empty) | (false, State::BatchingFresh) => { + // batch fresh messages + println!("batch fresh"); + if self.state == State::Empty { + self.state = State::BatchingFresh; + } + self.next_step.submit(message) + } + (false, State::BatchingStale) => { + // we hit a fresh message, so we commit our stale batch + // and start batching the fresh messages + + // todo: commit stale + println!("commit stale batch"); + + println!("batch fresh"); + self.state = State::BatchingFresh; + self.next_step.submit(message) + } + } + } + + fn terminate(&mut self) { + self.next_step.terminate(); + } + + fn join(&mut self, timeout: Option) -> Result, StrategyError> { + self.next_step.join(timeout) + } +} diff --git a/rust_snuba/src/strategies/mod.rs b/rust_snuba/src/strategies/mod.rs index 77b8776314..3ac88c5b28 100644 --- a/rust_snuba/src/strategies/mod.rs +++ b/rust_snuba/src/strategies/mod.rs @@ -1,5 +1,6 @@ pub mod accepted_outcomes; pub mod accountant; +pub mod blq_router; pub mod clickhouse; pub mod commit_log; pub mod healthcheck; From dafa8a5b99a9911353ad518c4fbe5980aaccf289 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Mon, 30 Mar 2026 17:41:26 -0700 Subject: [PATCH 02/14] BLQRouter working??? 
--- rust_snuba/benches/processors.rs | 2 + rust_snuba/src/consumer.rs | 19 +++++-- rust_snuba/src/factory_v2.rs | 30 ++++++++++- rust_snuba/src/strategies/blq_router.rs | 66 +++++++++++++++++++------ 4 files changed, 97 insertions(+), 20 deletions(-) diff --git a/rust_snuba/benches/processors.rs b/rust_snuba/benches/processors.rs index f9ee3d4c27..8f6793224b 100644 --- a/rust_snuba/benches/processors.rs +++ b/rust_snuba/benches/processors.rs @@ -104,6 +104,8 @@ fn create_factory( join_timeout_ms: None, health_check: "arroyo".to_string(), use_row_binary: false, + blq_producer_config: None, + blq_topic: None, }; Box::new(factory) } diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index ae8c5b22b6..73a6419a51 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -192,10 +192,21 @@ pub fn consumer_impl( // DLQ policy applies only if we are not skipping writes, otherwise we don't want to be // writing to the DLQ topics in prod. + + let dlq_producer_config = consumer_config.dlq_topic.as_ref().map(|dlq_topic_config| { + KafkaConfig::new_producer_config(vec![], Some(dlq_topic_config.broker_config.clone())) + }); + + let dlq_topic = consumer_config + .dlq_topic + .as_ref() + .map(|dlq_topic_config| Topic::new(&dlq_topic_config.physical_topic_name)); + let dlq_policy = consumer_config.dlq_topic.map(|dlq_topic_config| { - let producer_config = - KafkaConfig::new_producer_config(vec![], Some(dlq_topic_config.broker_config)); - let producer = KafkaProducer::new(producer_config); + let producer = KafkaProducer::new(KafkaConfig::new_producer_config( + vec![], + Some(dlq_topic_config.broker_config), + )); let kafka_dlq_producer = Box::new(KafkaDlqProducer::new( producer, @@ -276,6 +287,8 @@ pub fn consumer_impl( join_timeout_ms, health_check: health_check.to_string(), use_row_binary, + blq_producer_config: dlq_producer_config.clone(), + blq_topic: dlq_topic.clone(), }; let processor = StreamProcessor::with_kafka(config, factory, topic, 
dlq_policy); diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 9d1b6c17a7..2e2c65a422 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -25,7 +25,7 @@ use crate::config; use crate::metrics::global_tags::set_global_tag; use crate::processors::{self, get_cogs_label}; use crate::strategies::accountant::RecordCogs; -use crate::strategies::blq_router::BLQRouter; +use crate::strategies::blq_router::{BLQConfig, BLQRouter}; use crate::strategies::clickhouse::row_binary_writer::ClickhouseRowBinaryWriterStep; use crate::strategies::clickhouse::writer_v2::ClickhouseWriterStep; use crate::strategies::commit_log::ProduceCommitLog; @@ -65,6 +65,8 @@ pub struct ConsumerStrategyFactoryV2 { pub join_timeout_ms: Option, pub health_check: String, pub use_row_binary: bool, + pub blq_producer_config: Option, + pub blq_topic: Option, } impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { @@ -89,6 +91,7 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { fn create(&self) -> Box> { if self.use_row_binary { + tracing::info!("Using row_binary pipeline"); return match self .storage_config .message_processor @@ -265,7 +268,30 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { next_step, Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), ); - let next_step = BLQRouter::new(next_step, TimeDelta::seconds(10)); + + let next_step: Box> = + if let (Some(blq_producer_config), Some(blq_topic)) = + (&self.blq_producer_config, self.blq_topic) + { + let stale_threshold = TimeDelta::seconds(10); + tracing::info!( + "Routing all messages older than {:?} to the topic {:?}", + stale_threshold, + blq_topic + ); + Box::new(BLQRouter::new( + next_step, + BLQConfig { + stale_threshold, + blq_producer: KafkaProducer::new(blq_producer_config.clone()), + blq_topic, + }, + )) + } else { + tracing::info!("Not using a backlog-queue",); + Box::new(next_step) + }; + if let Some(path) = &self.health_check_file 
{ { if self.health_check == "snuba" { diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index f53de4983f..50da2e1474 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -1,11 +1,15 @@ use std::time::Duration; use chrono::{TimeDelta, Utc}; +use sentry_arroyo::backends::kafka::producer::KafkaProducer; use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::processing::strategies::commit_offsets::CommitOffsets; +use sentry_arroyo::processing::strategies::produce::Produce; +use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; use sentry_arroyo::processing::strategies::{ CommitRequest, ProcessingStrategy, StrategyError, SubmitError, }; -use sentry_arroyo::types::Message; +use sentry_arroyo::types::{Message, Topic, TopicOrPartition}; #[derive(PartialEq)] @@ -15,18 +19,42 @@ enum State { BatchingFresh, } +// todo: config params +// concurrency config +// commit offset frequency + +pub struct BLQConfig { + pub stale_threshold: TimeDelta, + pub blq_producer: KafkaProducer, + pub blq_topic: Topic, +} + pub struct BLQRouter { next_step: Next, stale_threshold: TimeDelta, state: State, + producer: Produce, + _concurrency: ConcurrencyConfig, // you should never have to deal w this, its just needed for lifetimes } -impl BLQRouter { - pub fn new(next_step: Next, stale_threshold: TimeDelta) -> Self { +impl BLQRouter +where + Next: ProcessingStrategy + 'static, +{ + pub fn new(next_step: Next, config: BLQConfig) -> Self { + let concurrency = ConcurrencyConfig::new(10); + let producer = Produce::new( + CommitOffsets::new(Duration::from_millis(250)), + config.blq_producer, + &concurrency, + TopicOrPartition::Topic(config.blq_topic), + ); Self { next_step, - stale_threshold, + stale_threshold: config.stale_threshold, state: State::Empty, + producer, + _concurrency: concurrency, } } } @@ -36,7 +64,12 @@ where Next: ProcessingStrategy + 
'static, { fn poll(&mut self) -> Result, StrategyError> { - self.next_step.poll() + let producer_result = self.producer.poll(); + let next_step_result = self.next_step.poll(); + match self.state { + State::BatchingStale => producer_result, + State::BatchingFresh | State::Empty => next_step_result, + } } fn submit(&mut self, message: Message) -> Result<(), SubmitError> { @@ -57,13 +90,8 @@ where if self.state == State::Empty { self.state = State::BatchingStale; } - - // todo: batch stale - println!("batching stale"); - - // all stale message handling happens in this strategy, - // nothing should be passed downstream - Ok(()) + println!("batch stale"); + self.producer.submit(message) } (false, State::Empty) | (false, State::BatchingFresh) => { // batch fresh messages @@ -77,8 +105,10 @@ where // we hit a fresh message, so we commit our stale batch // and start batching the fresh messages - // todo: commit stale - println!("commit stale batch"); + // the producer should finish writing and committing everything + // if it doesnt finish all commits we cant move on to the next offsets + println!("joining the stale batch"); + self.producer.join(Some(Duration::from_secs(5))).unwrap(); println!("batch fresh"); self.state = State::BatchingFresh; @@ -88,10 +118,16 @@ where } fn terminate(&mut self) { + self.producer.terminate(); self.next_step.terminate(); } fn join(&mut self, timeout: Option) -> Result, StrategyError> { - self.next_step.join(timeout) + let producer_result = self.producer.join(timeout); + let next_step_result = self.next_step.join(timeout); + match self.state { + State::BatchingStale => producer_result, + State::BatchingFresh | State::Empty => next_step_result, + } } } From ca7ae26769254f436e9d4287560e8e0e7adec6e7 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Tue, 31 Mar 2026 11:55:56 -0700 Subject: [PATCH 03/14] blqrouter pre-test --- rust_snuba/src/strategies/blq_router.rs | 80 ++++++++++++++----------- 1 file changed, 44 insertions(+), 36 deletions(-) 
diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index 50da2e1474..5d4afa50e5 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -10,20 +10,26 @@ use sentry_arroyo::processing::strategies::{ CommitRequest, ProcessingStrategy, StrategyError, SubmitError, }; use sentry_arroyo::types::{Message, Topic, TopicOrPartition}; +// todo: config params +// concurrency config +// commit offset frequency + +// Once we enter stale routing mode (at STALE_THRESHOLD), +// we keep routing messages that are at least (STALE_THRESHOLD - STATIC_FRICTION_SECS) seconds old. +// This is because we want a higher threshold to enter the stale routing state +// but a lower threshold to stay in it, so we don't flip-flop at the boundary. +const STATIC_FRICTION_SECS: i64 = 180; #[derive(PartialEq)] enum State { - Empty, - BatchingStale, - BatchingFresh, + Idle, // no messages have gone through the router yet + RoutingStale, // router is directing stale messages to the backlog-queue (BLQ) + Forwarding, // router is forwarding non-stale messages along to the next strategy } -// todo: config params -// concurrency config -// commit offset frequency - pub struct BLQConfig { + // if message timestamp is older than stale_threshold its routed to the backlog-queue pub stale_threshold: TimeDelta, pub blq_producer: KafkaProducer, pub blq_topic: Topic, @@ -42,6 +48,7 @@ where Next: ProcessingStrategy + 'static, { pub fn new(next_step: Next, config: BLQConfig) -> Self { + // next_step: where fresh messages get forwarded to let concurrency = ConcurrencyConfig::new(10); let producer = Produce::new( CommitOffsets::new(Duration::from_millis(250)), @@ -52,7 +59,7 @@ where Self { next_step, stale_threshold: config.stale_threshold, - state: State::Empty, + state: State::Idle, producer, _concurrency: concurrency, } @@ -67,8 +74,8 @@ where let producer_result = self.producer.poll(); let next_step_result = 
self.next_step.poll(); match self.state { - State::BatchingStale => producer_result, - State::BatchingFresh | State::Empty => next_step_result, + State::RoutingStale => producer_result, + State::Forwarding | State::Idle => next_step_result, } } @@ -77,41 +84,42 @@ where .timestamp() .expect("Expected kafka message to always have a timestamp, but there wasn't one"); let elapsed = Utc::now() - msg_ts; - let is_stale = elapsed > self.stale_threshold; + let is_stale = match &self.state { + State::RoutingStale => { + // see STATIC_FRICTION_SECS + elapsed > (self.stale_threshold - TimeDelta::seconds(STATIC_FRICTION_SECS)) + } + _ => elapsed > self.stale_threshold, + }; match (is_stale, &self.state) { - (true, State::BatchingFresh) => { - // we want the consumer to crash - // this is the only way for us to drop the batch of fresh messages without commiting - // when the consumer restarts it will get the stale message again in State::Empty + (true, State::Forwarding) => { + // When we transition from Forwarding to RoutingStale, there may be + // state in memory held downstream. We crash the consumer to get rid of internal state + // when it restarts it will have no internal state (State::Idle) and the first message in + // the topic will be stale.
panic!("Resetting consumer state to begin processing the stale backlog") } - (true, State::Empty) | (true, State::BatchingStale) => { - // batch stale messages - if self.state == State::Empty { - self.state = State::BatchingStale; + (true, State::Idle) | (true, State::RoutingStale) => { + // route the stale message to the BLQ + if self.state == State::Idle { + self.state = State::RoutingStale; } - println!("batch stale"); self.producer.submit(message) } - (false, State::Empty) | (false, State::BatchingFresh) => { - // batch fresh messages - println!("batch fresh"); - if self.state == State::Empty { - self.state = State::BatchingFresh; + (false, State::Idle) | (false, State::Forwarding) => { + // Forward the fresh message along to the next step + if self.state == State::Idle { + self.state = State::Forwarding; } self.next_step.submit(message) } - (false, State::BatchingStale) => { - // we hit a fresh message, so we commit our stale batch - // and start batching the fresh messages - - // the producer should finish writing and committing everything - // if it doesnt finish all commits we cant move on to the next offsets - println!("joining the stale batch"); + (false, State::RoutingStale) => { + // We hit a fresh message, so we are done routing the backlog. + // Call join on the producer so all writes to the BLQ are committed. self.producer.join(Some(Duration::from_secs(5))).unwrap(); - println!("batch fresh"); - self.state = State::BatchingFresh; + // Now go back to forwarding non-stale messages as usual. 
+ self.state = State::Forwarding; self.next_step.submit(message) } } @@ -126,8 +134,8 @@ where let producer_result = self.producer.join(timeout); let next_step_result = self.next_step.join(timeout); match self.state { - State::BatchingStale => producer_result, - State::BatchingFresh | State::Empty => next_step_result, + State::RoutingStale => producer_result, + State::Forwarding | State::Idle => next_step_result, } } } From a45d310e68f977b11ed5ede6cd4eba0ead557419 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Tue, 31 Mar 2026 16:21:53 -0700 Subject: [PATCH 04/14] blq has tests passing! --- rust_snuba/src/factory_v2.rs | 37 +++-- rust_snuba/src/strategies/blq_router.rs | 212 ++++++++++++++++++------ 2 files changed, 188 insertions(+), 61 deletions(-) diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 2e2c65a422..3e03a56146 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -25,7 +25,10 @@ use crate::config; use crate::metrics::global_tags::set_global_tag; use crate::processors::{self, get_cogs_label}; use crate::strategies::accountant::RecordCogs; -use crate::strategies::blq_router::{BLQConfig, BLQRouter}; +use sentry_arroyo::processing::strategies::produce::Produce; +use sentry_arroyo::types::TopicOrPartition; + +use crate::strategies::blq_router::BLQRouter; use crate::strategies::clickhouse::row_binary_writer::ClickhouseRowBinaryWriterStep; use crate::strategies::clickhouse::writer_v2::ClickhouseWriterStep; use crate::strategies::commit_log::ProduceCommitLog; @@ -273,20 +276,30 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { if let (Some(blq_producer_config), Some(blq_topic)) = (&self.blq_producer_config, self.blq_topic) { - let stale_threshold = TimeDelta::seconds(10); + let stale_threshold = TimeDelta::minutes(30); + let static_friction = TimeDelta::minutes(2); tracing::info!( - "Routing all messages older than {:?} to the topic {:?}", - stale_threshold, - blq_topic + "Routing all 
messages older than {:?} to the topic {:?} with static_friction {:?}", + stale_threshold, + blq_topic, + static_friction + ); + let concurrency = ConcurrencyConfig::new(10); + let blq_producer = Produce::new( + CommitOffsets::new(Duration::from_millis(250)), + KafkaProducer::new(blq_producer_config.clone()), + &concurrency, + TopicOrPartition::Topic(blq_topic), ); - Box::new(BLQRouter::new( - next_step, - BLQConfig { + Box::new( + BLQRouter::new( + next_step, + blq_producer, stale_threshold, - blq_producer: KafkaProducer::new(blq_producer_config.clone()), - blq_topic, - }, - )) + Some(static_friction), + ) + .expect("invalid BLQRouter config"), + ) } else { tracing::info!("Not using a backlog-queue",); Box::new(next_step) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index 5d4afa50e5..dd0dd7e0f5 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -1,80 +1,83 @@ use std::time::Duration; use chrono::{TimeDelta, Utc}; -use sentry_arroyo::backends::kafka::producer::KafkaProducer; use sentry_arroyo::backends::kafka::types::KafkaPayload; -use sentry_arroyo::processing::strategies::commit_offsets::CommitOffsets; -use sentry_arroyo::processing::strategies::produce::Produce; -use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; use sentry_arroyo::processing::strategies::{ CommitRequest, ProcessingStrategy, StrategyError, SubmitError, }; -use sentry_arroyo::types::{Message, Topic, TopicOrPartition}; -// todo: config params -// concurrency config -// commit offset frequency - -// Once we enter stale routing mode (at STALE_THRESHOLD), -// we keep routing messages that are at least (STALE_THRESHOLD - STATIC_FRICTION_SECS) seconds old. -// This is because we want a higher threshold to enter the stale routing state -// but a lower threshold to stay in it, so we don't flip-flop at the boundary. 
-const STATIC_FRICTION_SECS: i64 = 180; - -#[derive(PartialEq)] +use sentry_arroyo::types::Message; +#[derive(Debug, PartialEq)] enum State { Idle, // no messages have gone through the router yet RoutingStale, // router is directing stale messages to the backlog-queue (BLQ) Forwarding, // router is forwarding non-stale messages along to the next strategy } -pub struct BLQConfig { - // if message timestamp is older than stale_threshold its routed to the backlog-queue - pub stale_threshold: TimeDelta, - pub blq_producer: KafkaProducer, - pub blq_topic: Topic, -} - -pub struct BLQRouter { +pub struct BLQRouter { next_step: Next, stale_threshold: TimeDelta, state: State, - producer: Produce, - _concurrency: ConcurrencyConfig, // you should never have to deal w this, its just needed for lifetimes + producer: ProduceStrategy, + static_friction: Option, } -impl BLQRouter +impl BLQRouter where Next: ProcessingStrategy + 'static, + ProduceStrategy: ProcessingStrategy + 'static, { - pub fn new(next_step: Next, config: BLQConfig) -> Self { - // next_step: where fresh messages get forwarded to - let concurrency = ConcurrencyConfig::new(10); - let producer = Produce::new( - CommitOffsets::new(Duration::from_millis(250)), - config.blq_producer, - &concurrency, - TopicOrPartition::Topic(config.blq_topic), - ); - Self { + pub fn new( + next_step: Next, + blq_producer: ProduceStrategy, + stale_threshold: TimeDelta, + static_friction: Option, + ) -> Result { + /* next_step, - stale_threshold: config.stale_threshold, - state: State::Idle, + is where fresh messages get forwarded to producer, - _concurrency: concurrency, + ProcessingStrategy that submits messages to the BLQ, + stale messages will get submitted to it. + stale_threshold, + messages older than the stale_threshold will get sent to the producer + static_friction, + Once we enter stale routing mode (at STALE_THRESHOLD), + we keep routing messages that are at least (STALE_THRESHOLD - STATIC_FRICTION_SECS) seconds old. 
+ This is because we want a higher threshold to enter the stale routing state + but a lower threshold to stay in it, so we don't flip-flop at the boundary. + Best practice would be no greater than a small percent of stable_threshold like 10% + ex: stale_threshold=10m, static_friction=1m + and the implication is 9m old messages will now be sent to the BLQ in some cases + */ + if stale_threshold <= TimeDelta::zero() { + return Err("stale_threshold must be positive"); } + if let Some(friction) = static_friction { + if friction >= stale_threshold { + return Err("static_friction must be less than stale_threshold"); + } + } + Ok(Self { + next_step, + stale_threshold, + state: State::Idle, + producer: blq_producer, + static_friction, + }) } } -impl ProcessingStrategy for BLQRouter +impl ProcessingStrategy for BLQRouter where Next: ProcessingStrategy + 'static, + ProduceStrategy: ProcessingStrategy + 'static, { fn poll(&mut self) -> Result, StrategyError> { - let producer_result = self.producer.poll(); + let produce_result = self.producer.poll(); let next_step_result = self.next_step.poll(); match self.state { - State::RoutingStale => producer_result, + State::RoutingStale => produce_result, State::Forwarding | State::Idle => next_step_result, } } @@ -84,13 +87,12 @@ where .timestamp() .expect("Expected kafka message to always have a timestamp, but there wasn't one"); let elapsed = Utc::now() - msg_ts; - let is_stale = match &self.state { - State::RoutingStale => { - // see STATIC_FRICTION_SECS - elapsed > (self.stale_threshold - TimeDelta::seconds(STATIC_FRICTION_SECS)) - } - _ => elapsed > self.stale_threshold, + + let threshold = match (&self.state, self.static_friction) { + (State::RoutingStale, Some(friction)) => self.stale_threshold - friction, + _ => self.stale_threshold, }; + let is_stale = elapsed > threshold; match (is_stale, &self.state) { (true, State::Forwarding) => { // When we transition from Forwarding to RoutingStale, there may be @@ -139,3 +141,115 @@ 
where } } } + +#[cfg(test)] +mod tests { + use super::*; + use chrono::DateTime; + use sentry_arroyo::types::{Partition, Topic}; + + struct MockStrategy { + submitted: Vec>, + join_called: bool, + terminate_called: bool, + } + + impl MockStrategy { + fn new() -> Self { + Self { + submitted: vec![], + join_called: false, + terminate_called: false, + } + } + } + + impl ProcessingStrategy for MockStrategy { + fn poll(&mut self) -> Result, StrategyError> { + Ok(None) + } + + fn submit( + &mut self, + message: Message, + ) -> Result<(), SubmitError> { + self.submitted.push(message); + Ok(()) + } + + fn terminate(&mut self) { + self.terminate_called = true; + } + + fn join( + &mut self, + _timeout: Option, + ) -> Result, StrategyError> { + self.join_called = true; + Ok(None) + } + } + + fn make_message(timestamp: DateTime) -> Message { + Message::new_broker_message( + KafkaPayload::new(None, None, Some(b"test".to_vec())), + Partition::new(Topic::new("test"), 0), + 0, + timestamp, + ) + } + + #[test] + #[should_panic(expected = "Resetting consumer state to begin processing the stale backlog")] + fn test_fresh_to_stale() { + /* + This tests that the BLQRouter forwards business-as-usual fresh messages through it + and crashes when it hits its first stale message + */ + let mut router = BLQRouter::new( + MockStrategy::new(), + MockStrategy::new(), + TimeDelta::seconds(10), + None, + ) + .unwrap(); + // consuming messages as normal + for _ in 0..10 { + router.submit(make_message(Utc::now())).unwrap(); + } + assert_eq!(router.state, State::Forwarding); + // now theres a stale message, consumer should crash + _ = router.submit(make_message(Utc::now() - TimeDelta::seconds(20))); + } + + #[test] + fn test_stale_to_fresh() { + /* + This tests that the BLQRouter properly routes stale messages to the BLQ + and then switches back to forwarding fresh messages once the backlog is burned + */ + let mut router = BLQRouter::new( + MockStrategy::new(), + MockStrategy::new(), + 
TimeDelta::seconds(10), + Some(TimeDelta::seconds(1)), + ) + .unwrap(); + // backlog of 10 stale messages + for _ in 0..10 { + router + .submit(make_message(Utc::now() - TimeDelta::minutes(1))) + .unwrap(); + } + assert_eq!(router.state, State::RoutingStale); + assert!(!router.producer.join_called); + // now we are back to fresh messages + for _ in 0..5 { + router.submit(make_message(Utc::now())).unwrap(); + } + assert_eq!(router.state, State::Forwarding); + assert!(router.producer.join_called); + assert_eq!(router.producer.submitted.len(), 10); + assert_eq!(router.next_step.submitted.len(), 5); + } +} From 352e51a3d52c1a4e99fb572099fd56f11c62456c Mon Sep 17 00:00:00 2001 From: Claude Code Date: Wed, 1 Apr 2026 16:09:42 -0700 Subject: [PATCH 05/14] should be good to go --- .vscode/launch.json | 3 ++- .vscode/tasks.json | 6 ++++++ rust_snuba/src/factory_v2.rs | 13 ++++++++++--- sentry-options/schemas/snuba/schema.json | 5 +++++ 4 files changed, 23 insertions(+), 4 deletions(-) create mode 100644 .vscode/tasks.json diff --git a/.vscode/launch.json b/.vscode/launch.json index 6a3312832d..32ecfca6fd 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -38,7 +38,8 @@ "sourceLanguages": ["rust"], "env": { "DOGSTATSD_HOST": "localhost" - } + }, + "preLaunchTask": "uvx maturin develop" }, // will only debug the python (first half) { diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000000..b2df099987 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,6 @@ +{ + "version": "2.0.0", + "tasks": [ + {"label": "uvx maturin develop", "type": "shell", "command": "uvx maturin develop", "options": {"cwd": "${workspaceFolder}/rust_snuba"}} + ] +} diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 3e03a56146..4070bd388c 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -2,6 +2,8 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use sentry_options::options; 
+ use chrono::TimeDelta; use sentry::{Hub, SentryFutureExt}; use sentry_arroyo::backends::kafka::config::KafkaConfig; @@ -272,16 +274,21 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), ); + let blq_enabled_flag = options("snuba") + .ok() + .and_then(|o| o.get("consumer.blq_enabled").ok()) + .and_then(|v| v.as_bool()) + .unwrap_or(false); let next_step: Box> = - if let (Some(blq_producer_config), Some(blq_topic)) = - (&self.blq_producer_config, self.blq_topic) + if let (true, Some(blq_producer_config), Some(blq_topic)) = + (blq_enabled_flag, &self.blq_producer_config, self.blq_topic) { let stale_threshold = TimeDelta::minutes(30); let static_friction = TimeDelta::minutes(2); tracing::info!( "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}", stale_threshold, - blq_topic, + self.blq_topic, static_friction ); let concurrency = ConcurrencyConfig::new(10); diff --git a/sentry-options/schemas/snuba/schema.json b/sentry-options/schemas/snuba/schema.json index c5c5bbf2e2..5ae70d23a1 100644 --- a/sentry-options/schemas/snuba/schema.json +++ b/sentry-options/schemas/snuba/schema.json @@ -6,6 +6,11 @@ "type": "boolean", "default": false, "description": "true to use the item timestamp, false for the received timestamp" + }, + "consumer.blq_enabled": { + "type": "boolean", + "default": false, + "description": "enable backlog queue in snuba consumers" } } } From 2eb1b6d74e537fc7298be2b5f105d4613945b2ac Mon Sep 17 00:00:00 2001 From: Claude Code Date: Wed, 1 Apr 2026 16:21:38 -0700 Subject: [PATCH 06/14] lint --- rust_snuba/src/consumer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 73a6419a51..2d70c05737 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -288,7 +288,7 @@ pub fn consumer_impl( health_check: health_check.to_string(), use_row_binary, 
blq_producer_config: dlq_producer_config.clone(), - blq_topic: dlq_topic.clone(), + blq_topic: dlq_topic, }; let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy); From 0e5e2755933dbff8106c009626a459af2817829f Mon Sep 17 00:00:00 2001 From: Claude Code Date: Thu, 2 Apr 2026 10:04:22 -0700 Subject: [PATCH 07/14] add to binary pipeline --- rust_snuba/src/factory_v2.rs | 38 ++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 4070bd388c..ead04fddfb 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -439,6 +439,44 @@ impl ConsumerStrategyFactoryV2 { Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), ); + let blq_enabled_flag = options("snuba") + .ok() + .and_then(|o| o.get("consumer.blq_enabled").ok()) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + let next_step: Box> = + if let (true, Some(blq_producer_config), Some(blq_topic)) = + (blq_enabled_flag, &self.blq_producer_config, self.blq_topic) + { + let stale_threshold = TimeDelta::minutes(30); + let static_friction = TimeDelta::minutes(2); + tracing::info!( + "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}", + stale_threshold, + self.blq_topic, + static_friction + ); + let concurrency = ConcurrencyConfig::new(10); + let blq_producer = Produce::new( + CommitOffsets::new(Duration::from_millis(250)), + KafkaProducer::new(blq_producer_config.clone()), + &concurrency, + TopicOrPartition::Topic(blq_topic), + ); + Box::new( + BLQRouter::new( + next_step, + blq_producer, + stale_threshold, + Some(static_friction), + ) + .expect("invalid BLQRouter config"), + ) + } else { + tracing::info!("Not using a backlog-queue",); + Box::new(next_step) + }; + if let Some(path) = &self.health_check_file { if self.health_check == "snuba" { tracing::info!( From ccb7f4e07be1d9841db51e1c74664b3485bb2068 Mon Sep 17 00:00:00 2001 From: Claude 
Code Date: Thu, 2 Apr 2026 10:34:55 -0700 Subject: [PATCH 08/14] module docs --- rust_snuba/src/strategies/blq_router.rs | 29 +++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index dd0dd7e0f5..5114c2df37 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -1,3 +1,32 @@ +//! # BLQ Router +//! +//! BLQ Router is an arroyo strategy that re-directs stale messages (by timestamp) to a configured backlog-queue topic. +//! Non-stale messages will be passively forwarded along to the next step in the arroyo strategy pipeline. +//! +//! ## Implementation +//! it's essentially an FSM +//! +//! > forward to next step +//! ┌fresh─┐ +//! │ ▼ +//! ┌──┴─────────┐ +//! ┌fresh─►│ Forwarding ├──stale──► PANIC +//! │ └────────────┘ +//! │ ▲ +//! ┌──────┐ │ │ +//! ────►│ Idle ├─┤ fresh +//! └──────┘ │ │ +//! │ ┌──────┴───────┐ +//! └stale─►│ RoutingStale │ +//! └─┬────────────┘ +//! │ ▲ +//! └─stale──┘ +//! > redirect to blq +//! +//! the reason for the panic is that there may be accumulated data downstream that needs to be flushed before we start +//! redirecting to backlog and committing those messages. The most reliable way to do this is crashing the consumer, +//! when it comes back alive the first messages it gets will be stale so it will go straight from idle to RoutingStale.
+ use std::time::Duration; use chrono::{TimeDelta, Utc}; From 4a8bc935e27d8fbddf350d1047b040926aa0c489 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Thu, 2 Apr 2026 13:48:31 -0700 Subject: [PATCH 09/14] tests and comments --- rust_snuba/src/factory_v2.rs | 195 ++++++++++++++++++------ rust_snuba/src/strategies/blq_router.rs | 88 ++++++++--- 2 files changed, 216 insertions(+), 67 deletions(-) diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index ead04fddfb..a8aff6c636 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -27,8 +27,6 @@ use crate::config; use crate::metrics::global_tags::set_global_tag; use crate::processors::{self, get_cogs_label}; use crate::strategies::accountant::RecordCogs; -use sentry_arroyo::processing::strategies::produce::Produce; -use sentry_arroyo::types::TopicOrPartition; use crate::strategies::blq_router::BLQRouter; use crate::strategies::clickhouse::row_binary_writer::ClickhouseRowBinaryWriterStep; @@ -44,6 +42,9 @@ use crate::strategies::python::PythonTransformStep; use crate::strategies::replacements::ProduceReplacements; use crate::types::{BytesInsertBatch, CogsData, RowData, TypedInsertBatch}; +// BLQ configuration +const STALE_THRESHOLD: TimeDelta = TimeDelta::minutes(30); +const STATIC_FRICTION: Option = Some(TimeDelta::minutes(2)); pub struct ConsumerStrategyFactoryV2 { pub storage_config: config::StorageConfig, pub env_config: config::EnvConfig, @@ -274,36 +275,25 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), ); - let blq_enabled_flag = options("snuba") - .ok() - .and_then(|o| o.get("consumer.blq_enabled").ok()) - .and_then(|v| v.as_bool()) - .unwrap_or(false); let next_step: Box> = - if let (true, Some(blq_producer_config), Some(blq_topic)) = - (blq_enabled_flag, &self.blq_producer_config, self.blq_topic) - { - let stale_threshold = TimeDelta::minutes(30); - let static_friction = 
TimeDelta::minutes(2); + if let (true, Some(blq_producer_config), Some(blq_topic)) = ( + self.should_use_blq(), + &self.blq_producer_config, + self.blq_topic, + ) { tracing::info!( "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}", - stale_threshold, + STALE_THRESHOLD, self.blq_topic, - static_friction + STATIC_FRICTION ); - let concurrency = ConcurrencyConfig::new(10); - let blq_producer = Produce::new( - CommitOffsets::new(Duration::from_millis(250)), - KafkaProducer::new(blq_producer_config.clone()), - &concurrency, - TopicOrPartition::Topic(blq_topic), - ); Box::new( BLQRouter::new( next_step, - blq_producer, - stale_threshold, - Some(static_friction), + blq_producer_config.clone(), + blq_topic, + STALE_THRESHOLD, + STATIC_FRICTION, ) .expect("invalid BLQRouter config"), ) @@ -331,6 +321,15 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { } impl ConsumerStrategyFactoryV2 { + fn should_use_blq(&self) -> bool { + let flag = options("snuba") + .ok() + .and_then(|o| o.get("consumer.blq_enabled").ok()) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + flag && self.blq_producer_config.is_some() && self.blq_topic.is_some() + } + fn create_row_binary_pipeline< T: clickhouse::Row + serde::Serialize @@ -439,36 +438,25 @@ impl ConsumerStrategyFactoryV2 { Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), ); - let blq_enabled_flag = options("snuba") - .ok() - .and_then(|o| o.get("consumer.blq_enabled").ok()) - .and_then(|v| v.as_bool()) - .unwrap_or(false); let next_step: Box> = - if let (true, Some(blq_producer_config), Some(blq_topic)) = - (blq_enabled_flag, &self.blq_producer_config, self.blq_topic) - { - let stale_threshold = TimeDelta::minutes(30); - let static_friction = TimeDelta::minutes(2); + if let (true, Some(blq_producer_config), Some(blq_topic)) = ( + self.should_use_blq(), + &self.blq_producer_config, + self.blq_topic, + ) { tracing::info!( "Routing all messages older than {:?} to the topic {:?} 
with static_friction {:?}", - stale_threshold, + STALE_THRESHOLD, self.blq_topic, - static_friction - ); - let concurrency = ConcurrencyConfig::new(10); - let blq_producer = Produce::new( - CommitOffsets::new(Duration::from_millis(250)), - KafkaProducer::new(blq_producer_config.clone()), - &concurrency, - TopicOrPartition::Topic(blq_topic), + STATIC_FRICTION, ); Box::new( BLQRouter::new( next_step, - blq_producer, - stale_threshold, - Some(static_friction), + blq_producer_config.clone(), + blq_topic, + STALE_THRESHOLD, + STATIC_FRICTION, ) .expect("invalid BLQRouter config"), ) @@ -754,3 +742,118 @@ mod tests { assert_eq!(batches[0].num_bytes(), 200_000); // 5 * 40KB accumulated but didn't trigger flush } } + +#[cfg(test)] +mod tests { + use std::sync::Once; + + use super::*; + use sentry_arroyo::backends::kafka::config::KafkaConfig; + use sentry_arroyo::types::Topic; + use sentry_options::init_with_schemas; + use sentry_options::testing::set_override; + use serde_json::json; + + fn make_factory( + blq_producer_config: Option, + blq_topic: Option, + ) -> ConsumerStrategyFactoryV2 { + ConsumerStrategyFactoryV2 { + storage_config: config::StorageConfig { + name: "test".to_string(), + clickhouse_table_name: "test".to_string(), + clickhouse_cluster: config::ClickhouseConfig { + host: "localhost".to_string(), + port: 9000, + secure: false, + http_port: 8123, + user: "default".to_string(), + password: "".to_string(), + database: "default".to_string(), + }, + message_processor: config::MessageProcessorConfig { + python_class_name: "Test".to_string(), + python_module: "test".to_string(), + }, + }, + env_config: config::EnvConfig::default(), + logical_topic_name: "test".to_string(), + max_batch_size: 100, + max_batch_time: Duration::from_secs(1), + processing_concurrency: ConcurrencyConfig::new(1), + clickhouse_concurrency: ConcurrencyConfig::new(1), + commitlog_concurrency: ConcurrencyConfig::new(1), + replacements_concurrency: ConcurrencyConfig::new(1), + async_inserts: 
false, + python_max_queue_depth: None, + use_rust_processor: false, + health_check_file: None, + enforce_schema: false, + commit_log_producer: None, + replacements_config: None, + physical_consumer_group: "test".to_string(), + physical_topic_name: Topic::new("test"), + accountant_topic_config: config::TopicConfig { + physical_topic_name: "test".to_string(), + logical_topic_name: "test".to_string(), + broker_config: HashMap::new(), + quantized_rebalance_consumer_group_delay_secs: None, + }, + stop_at_timestamp: None, + batch_write_timeout: None, + join_timeout_ms: None, + health_check: "arroyo".to_string(), + use_row_binary: false, + blq_producer_config, + blq_topic, + } + } + + static INIT: Once = Once::new(); + fn init_config() { + INIT.call_once(|| init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap()); + } + + fn blq_kafka_config() -> KafkaConfig { + KafkaConfig::new_config(vec!["localhost:9092".to_string()], None) + } + + #[test] + fn test_should_not_use_blq_when_no_flag() { + init_config(); + let factory = make_factory(Some(blq_kafka_config()), Some(Topic::new("blq"))); + assert!(!factory.should_use_blq()); + } + + #[test] + fn test_should_not_use_blq_when_flag_disabled() { + init_config(); + let _guard = set_override("snuba", "consumer.blq_enabled", json!(false)); + let factory = make_factory(Some(blq_kafka_config()), Some(Topic::new("blq"))); + assert!(!factory.should_use_blq()); + } + + #[test] + fn test_should_not_use_blq_when_no_producer_config() { + init_config(); + let _guard = set_override("snuba", "consumer.blq_enabled", json!(true)); + let factory = make_factory(None, Some(Topic::new("blq"))); + assert!(!factory.should_use_blq()); + } + + #[test] + fn test_should_not_use_blq_when_no_topic() { + init_config(); + let _guard = set_override("snuba", "consumer.blq_enabled", json!(true)); + let factory = make_factory(Some(blq_kafka_config()), None); + assert!(!factory.should_use_blq()); + } + + #[test] + fn 
test_should_use_blq_when_all_conditions_met() { + init_config(); + let _guard = set_override("snuba", "consumer.blq_enabled", json!(true)); + let factory = make_factory(Some(blq_kafka_config()), Some(Topic::new("blq"))); + assert!(factory.should_use_blq()); + } +} diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index 5114c2df37..4c8c4a5c37 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -6,6 +6,7 @@ //! ## Implementation //! its essentially a FSM //! +//! ```text //! > forward to next step //! ┌fresh─┐ //! │ ▼ @@ -22,6 +23,7 @@ //! │ ▲ //! └─stale──┘ //! > redirect to blq +//! ``` //! //! the reason for the panic is that there may be accumulated data downstream that needs to be flushed before we start //! redirecting to backlog and committing those messages. The most reliable way to do this is crashing the consumer, @@ -30,11 +32,16 @@ use std::time::Duration; use chrono::{TimeDelta, Utc}; +use sentry_arroyo::backends::kafka::config::KafkaConfig; +use sentry_arroyo::backends::kafka::producer::KafkaProducer; use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::processing::strategies::commit_offsets::CommitOffsets; +use sentry_arroyo::processing::strategies::produce::Produce; +use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; use sentry_arroyo::processing::strategies::{ CommitRequest, ProcessingStrategy, StrategyError, SubmitError, }; -use sentry_arroyo::types::Message; +use sentry_arroyo::types::{Message, Topic, TopicOrPartition}; #[derive(Debug, PartialEq)] enum State { @@ -49,6 +56,46 @@ pub struct BLQRouter { state: State, producer: ProduceStrategy, static_friction: Option, + + // We have to keep this around ourself bc strategies::produce::Produce didn't define their lifetimes well + _concurrency: Option, +} + +impl BLQRouter> +where + Next: ProcessingStrategy + 'static, +{ + /// next_step, + /// is where 
fresh messages get forwarded to + /// stale_threshold, + /// messages older than the stale_threshold will get sent to the producer + /// static_friction, + /// Once we enter stale routing mode (at STALE_THRESHOLD), + /// we keep routing messages that are at least (STALE_THRESHOLD - STATIC_FRICTION_SECS) seconds old. + /// This is because we want a higher threshold to enter the stale routing state + /// but a lower threshold to stay in it, so we don't flip-flop at the boundary. + /// Best practice would be no greater than a small percent of stale_threshold like 10% + /// ex: stale_threshold=10m, static_friction=1m + /// and the implication is 9m old messages will now be sent to the BLQ in some cases + pub fn new( + next_step: Next, + blq_producer_config: KafkaConfig, + blq_topic: Topic, + stale_threshold: TimeDelta, + static_friction: Option, + ) -> Result { + let concurrency = ConcurrencyConfig::new(10); + let blq_producer = Produce::new( + CommitOffsets::new(Duration::from_millis(250)), + KafkaProducer::new(blq_producer_config), + &concurrency, + TopicOrPartition::Topic(blq_topic), + ); + let mut router = + Self::new_with_strategy(next_step, blq_producer, stale_threshold, static_friction)?; + router._concurrency = Some(concurrency); + Ok(router) + } } impl BLQRouter where Next: ProcessingStrategy + 'static, ProduceStrategy: ProcessingStrategy + 'static, { - pub fn new( + /// next_step, + /// is where fresh messages get forwarded to + /// producer, + /// ProcessingStrategy that submits messages to the BLQ, + /// stale messages will get submitted to it. + /// stale_threshold, + /// messages older than the stale_threshold will get sent to the producer + /// static_friction, + /// Once we enter stale routing mode (at STALE_THRESHOLD), + /// we keep routing messages that are at least (STALE_THRESHOLD - STATIC_FRICTION_SECS) seconds old.
+ /// This is because we want a higher threshold to enter the stale routing state + /// but a lower threshold to stay in it, so we don't flip-flop at the boundary. + /// Best practice would be no greater than a small percent of stale_threshold like 10% + /// ex: stale_threshold=10m, static_friction=1m + /// and the implication is 9m old messages will now be sent to the BLQ in some cases fn new_with_strategy( next_step: Next, blq_producer: ProduceStrategy, stale_threshold: TimeDelta, static_friction: Option, ) -> Result { - /* - next_step, - is where fresh messages get forwarded to - producer, - ProcessingStrategy that submits messages to the BLQ, - stale messages will get submitted to it. - stale_threshold, - messages older than the stale_threshold will get sent to the producer - static_friction, - Once we enter stale routing mode (at STALE_THRESHOLD), - we keep routing messages that are at least (STALE_THRESHOLD - STATIC_FRICTION_SECS) seconds old.
- Best practice would be no greater than a small percent of stable_threshold like 10% - ex: stale_threshold=10m, static_friction=1m - and the implication is 9m old messages will now be sent to the BLQ in some cases - */ if stale_threshold <= TimeDelta::zero() { return Err("stale_threshold must be positive"); } @@ -93,6 +138,7 @@ where state: State::Idle, producer: blq_producer, static_friction, + _concurrency: None, }) } } @@ -235,7 +281,7 @@ mod tests { This tests that the BLQRouter forwards business-as-usual fresh messages through it and crashes when it hits its first stale message */ - let mut router = BLQRouter::new( + let mut router = BLQRouter::new_with_strategy( MockStrategy::new(), MockStrategy::new(), TimeDelta::seconds(10), @@ -257,7 +303,7 @@ mod tests { This tests that the BLQRouter properly routes stale messages to the BLQ and then switches back to forwarding fresh messages once the backlog is burned */ - let mut router = BLQRouter::new( + let mut router = BLQRouter::new_with_strategy( MockStrategy::new(), MockStrategy::new(), TimeDelta::seconds(10), From 5b9ba9731bd447b42d46e4884f52b670ff4d441f Mon Sep 17 00:00:00 2001 From: Claude Code Date: Thu, 2 Apr 2026 14:58:32 -0700 Subject: [PATCH 10/14] now everything gets committed properly --- rust_snuba/src/factory_v2.rs | 21 ++++++++-------- rust_snuba/src/strategies/blq_router.rs | 33 +++++++++++++++++++------ 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index a8aff6c636..2e99b1751d 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -43,8 +43,9 @@ use crate::strategies::replacements::ProduceReplacements; use crate::types::{BytesInsertBatch, CogsData, RowData, TypedInsertBatch}; // BLQ configuration -const STALE_THRESHOLD: TimeDelta = TimeDelta::minutes(30); -const STATIC_FRICTION: Option = Some(TimeDelta::minutes(2)); +const BLQ_STALE_THRESHOLD: TimeDelta = TimeDelta::minutes(30); +const 
BLQ_STATIC_FRICTION: Option = Some(TimeDelta::minutes(2)); + pub struct ConsumerStrategyFactoryV2 { pub storage_config: config::StorageConfig, pub env_config: config::EnvConfig, @@ -283,17 +284,17 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { ) { tracing::info!( "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}", - STALE_THRESHOLD, + BLQ_STALE_THRESHOLD, self.blq_topic, - STATIC_FRICTION + BLQ_STATIC_FRICTION ); Box::new( BLQRouter::new( next_step, blq_producer_config.clone(), blq_topic, - STALE_THRESHOLD, - STATIC_FRICTION, + BLQ_STALE_THRESHOLD, + BLQ_STATIC_FRICTION, ) .expect("invalid BLQRouter config"), ) @@ -446,17 +447,17 @@ impl ConsumerStrategyFactoryV2 { ) { tracing::info!( "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}", - STALE_THRESHOLD, + BLQ_STALE_THRESHOLD, self.blq_topic, - STATIC_FRICTION, + BLQ_STATIC_FRICTION, ); Box::new( BLQRouter::new( next_step, blq_producer_config.clone(), blq_topic, - STALE_THRESHOLD, - STATIC_FRICTION, + BLQ_STALE_THRESHOLD, + BLQ_STATIC_FRICTION, ) .expect("invalid BLQRouter config"), ) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index 4c8c4a5c37..4250d99ec6 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -47,7 +47,11 @@ use sentry_arroyo::types::{Message, Topic, TopicOrPartition}; enum State { Idle, // no messages have gone through the router yet RoutingStale, // router is directing stale messages to the backlog-queue (BLQ) - Forwarding, // router is forwarding non-stale messages along to the next strategy + // we have processed all stale messages and are now flushing (finishing producing to BLQ) + // when we transition to this state we will have CommitRequest for what was flushed, and poll + // will be responsible for returning it + Flushing(Option), + Forwarding, // router is forwarding non-stale messages along to the next 
strategy } pub struct BLQRouter { @@ -151,9 +155,14 @@ where fn poll(&mut self) -> Result, StrategyError> { let produce_result = self.producer.poll(); let next_step_result = self.next_step.poll(); - match self.state { + match &mut self.state { State::RoutingStale => produce_result, State::Forwarding | State::Idle => next_step_result, + State::Flushing(commits) => { + let commits = commits.take(); + self.state = State::Forwarding; + Ok(commits) + } } } @@ -192,12 +201,19 @@ where } (false, State::RoutingStale) => { // We hit a fresh message, so we are done routing the backlog. - // Call join on the producer so all writes to the BLQ are committed. - self.producer.join(Some(Duration::from_secs(5))).unwrap(); + // Finish producing and committing all the stale messages and + // then switch back to forwarding fresh. - // Now go back to forwarding non-stale messages as usual. - self.state = State::Forwarding; - self.next_step.submit(message) + // I know I shouldn't be blocking in submit but there was no better way to do it + // the pipeline can't make progress until this completes anyways so it should be fine + let flush_results = self.producer.join(Some(Duration::from_secs(5))).unwrap(); + self.state = State::Flushing(flush_results); + Ok(()) + } + (true, State::Flushing(_)) | (false, State::Flushing(_)) => { + return Err(SubmitError::MessageRejected( + sentry_arroyo::processing::strategies::MessageRejected { message }, + )); + } } } @@ -210,9 +226,10 @@ where fn join(&mut self, timeout: Option) -> Result, StrategyError> { let producer_result = self.producer.join(timeout); let next_step_result = self.next_step.join(timeout); - match self.state { + match &self.state { State::RoutingStale => producer_result, State::Forwarding | State::Idle => next_step_result, + State::Flushing(commits) => Ok(commits.clone()), } } } From 46a491958a059a7b46ae7cb7ace7a16e62c23766 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Thu, 2 Apr 2026 15:12:55 -0700 Subject: [PATCH 11/14] merge conflict
--- rust_snuba/src/factory_v2.rs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 2e99b1751d..424e9f1ca3 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -511,12 +511,18 @@ impl TaskRunner for SchemaValidator { #[cfg(test)] mod tests { use super::*; + use sentry_arroyo::backends::kafka::config::KafkaConfig; use sentry_arroyo::processing::strategies::{ CommitRequest, ProcessingStrategy, StrategyError, SubmitError, }; use sentry_arroyo::types::{BrokerMessage, InnerMessage, Partition, Topic}; + use sentry_options::init_with_schemas; + use sentry_options::testing::set_override; + use serde_json::json; + use std::sync::Once; use std::sync::{Arc, Mutex}; + // ----------- BYTES_INSERT_BATCH ------------------ /// A next-step that records every batch it receives. struct RecordingStep { batches: Arc>>>, @@ -742,19 +748,8 @@ mod tests { assert_eq!(batches[0].len(), 5); assert_eq!(batches[0].num_bytes(), 200_000); // 5 * 40KB accumulated but didn't trigger flush } -} - -#[cfg(test)] -mod tests { - use std::sync::Once; - - use super::*; - use sentry_arroyo::backends::kafka::config::KafkaConfig; - use sentry_arroyo::types::Topic; - use sentry_options::init_with_schemas; - use sentry_options::testing::set_override; - use serde_json::json; + // --------- BLQ ------------- fn make_factory( blq_producer_config: Option, blq_topic: Option, @@ -807,6 +802,7 @@ mod tests { use_row_binary: false, blq_producer_config, blq_topic, + max_batch_size_calculation: config::BatchSizeCalculation::Rows, } } From ccb8c292abfa4d18e12a950000a4ba3c6ab4acbc Mon Sep 17 00:00:00 2001 From: Claude Code Date: Thu, 2 Apr 2026 15:28:27 -0700 Subject: [PATCH 12/14] linter --- rust_snuba/src/factory_v2.rs | 10 +++++----- rust_snuba/src/strategies/blq_router.rs | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/rust_snuba/src/factory_v2.rs 
b/rust_snuba/src/factory_v2.rs index 424e9f1ca3..620d48fb81 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -517,7 +517,7 @@ mod tests { }; use sentry_arroyo::types::{BrokerMessage, InnerMessage, Partition, Topic}; use sentry_options::init_with_schemas; - use sentry_options::testing::set_override; + use sentry_options::testing::override_options; use serde_json::json; use std::sync::Once; use std::sync::{Arc, Mutex}; @@ -825,7 +825,7 @@ mod tests { #[test] fn test_should_not_use_blq_when_flag_disabled() { init_config(); - let _guard = set_override("snuba", "consumer.blq_enabled", json!(false)); + let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(false))]).unwrap(); let factory = make_factory(Some(blq_kafka_config()), Some(Topic::new("blq"))); assert!(!factory.should_use_blq()); } @@ -833,7 +833,7 @@ mod tests { #[test] fn test_should_not_use_blq_when_no_producer_config() { init_config(); - let _guard = set_override("snuba", "consumer.blq_enabled", json!(true)); + let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(true))]).unwrap(); let factory = make_factory(None, Some(Topic::new("blq"))); assert!(!factory.should_use_blq()); } @@ -841,7 +841,7 @@ mod tests { #[test] fn test_should_not_use_blq_when_no_topic() { init_config(); - let _guard = set_override("snuba", "consumer.blq_enabled", json!(true)); + let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(true))]).unwrap(); let factory = make_factory(Some(blq_kafka_config()), None); assert!(!factory.should_use_blq()); } @@ -849,7 +849,7 @@ mod tests { #[test] fn test_should_use_blq_when_all_conditions_met() { init_config(); - let _guard = set_override("snuba", "consumer.blq_enabled", json!(true)); + let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(true))]).unwrap(); let factory = make_factory(Some(blq_kafka_config()), Some(Topic::new("blq"))); assert!(factory.should_use_blq()); } diff --git 
a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index 4250d99ec6..1fd9803e7d 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -211,9 +211,9 @@ where Ok(()) } (true, State::Flushing(_)) | (false, State::Flushing(_)) => { - return Err(SubmitError::MessageRejected( + Err(SubmitError::MessageRejected( sentry_arroyo::processing::strategies::MessageRejected { message }, - )); + )) } } } From dfabf0587ea689ad6afc2af04a4031eab3649fdc Mon Sep 17 00:00:00 2001 From: Claude Code Date: Thu, 2 Apr 2026 15:29:54 -0700 Subject: [PATCH 13/14] bug fix --- rust_snuba/src/strategies/blq_router.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index 1fd9803e7d..75f8488449 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -208,7 +208,9 @@ where // the pipeline cant make progress until this completes anyways so it should be fine let flush_results = self.producer.join(Some(Duration::from_secs(5))).unwrap(); self.state = State::Flushing(flush_results); - Ok(()) + Err(SubmitError::MessageRejected( + sentry_arroyo::processing::strategies::MessageRejected { message }, + )) } (true, State::Flushing(_)) | (false, State::Flushing(_)) => { Err(SubmitError::MessageRejected( From 8cc82447fb002b88cccf348a3b4d3723cc95a804 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Fri, 3 Apr 2026 12:01:13 -0700 Subject: [PATCH 14/14] fix test --- rust_snuba/src/strategies/blq_router.rs | 26 ++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs index 75f8488449..be309a79f5 100644 --- a/rust_snuba/src/strategies/blq_router.rs +++ b/rust_snuba/src/strategies/blq_router.rs @@ -310,12 +310,34 @@ mod tests { // consuming messages as normal for _ in 0..10 { 
router.submit(make_message(Utc::now())).unwrap(); + _ = router.poll(); } assert_eq!(router.state, State::Forwarding); // now theres a stale message, consumer should crash _ = router.submit(make_message(Utc::now() - TimeDelta::seconds(20))); } + fn submit_with_retry( + router: &mut BLQRouter, + message: Message, + max_retries: usize, + ) -> Result<(), SubmitError> { + let mut msg = message; + for _ in 0..max_retries { + match router.submit(msg) { + Ok(()) => return Ok(()), + Err(SubmitError::MessageRejected(rejected)) => { + _ = router.poll(); + msg = rejected.message; + } + Err(e) => return Err(e), + } + } + Err(SubmitError::MessageRejected( + sentry_arroyo::processing::strategies::MessageRejected { message: msg }, + )) + } + #[test] fn test_stale_to_fresh() { /* @@ -334,12 +356,14 @@ mod tests { router .submit(make_message(Utc::now() - TimeDelta::minutes(1))) .unwrap(); + _ = router.poll(); } assert_eq!(router.state, State::RoutingStale); assert!(!router.producer.join_called); // now we are back to fresh messages for _ in 0..5 { - router.submit(make_message(Utc::now())).unwrap(); + submit_with_retry(&mut router, make_message(Utc::now()), 3).unwrap(); + _ = router.poll(); } assert_eq!(router.state, State::Forwarding); assert!(router.producer.join_called);