From dbe860ed4eaf0cd33908903f2cea2b74378b3852 Mon Sep 17 00:00:00 2001 From: Tony Le Date: Mon, 27 Oct 2025 11:42:29 -0400 Subject: [PATCH 1/3] Try btaching forwards --- src/kafka/inflight_activation_batcher.rs | 83 +++++++++++++----------- 1 file changed, 46 insertions(+), 37 deletions(-) diff --git a/src/kafka/inflight_activation_batcher.rs b/src/kafka/inflight_activation_batcher.rs index b1c36f1..6f67804 100644 --- a/src/kafka/inflight_activation_batcher.rs +++ b/src/kafka/inflight_activation_batcher.rs @@ -1,8 +1,14 @@ +use super::consumer::{ + ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer, + ReducerWhenFullBehaviour, +}; +use super::utils::modify_activation_namespace; use crate::{ config::Config, runtime_config::RuntimeConfigManager, store::inflight_activation::InflightActivation, }; use chrono::Utc; +use futures::future::join_all; use rdkafka::config::ClientConfig; use rdkafka::{ producer::{FutureProducer, FutureRecord}, @@ -10,12 +16,6 @@ use rdkafka::{ }; use std::{mem::replace, sync::Arc, time::Duration}; -use super::consumer::{ - ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer, - ReducerWhenFullBehaviour, -}; -use super::utils::modify_activation_namespace; - pub struct ActivationBatcherConfig { pub kafka_config: ClientConfig, pub kafka_long_topic: String, @@ -28,8 +28,14 @@ pub struct ActivationBatcherConfig { impl ActivationBatcherConfig { /// Convert from application configuration into ActivationBatcher config. pub fn from_config(config: &Config) -> Self { + let mut kafka_config = config.kafka_producer_config(); + kafka_config + .set("linger.ms", "10") + .set("batch.size", "1048576") + .set("queue.buffering.max.messages", "100000") + .set("queue.buffering.max.kbytes", "10485760"); Self { - kafka_config: config.kafka_producer_config(), + kafka_config, kafka_long_topic: config.kafka_long_topic.clone(), send_timeout_ms: config.kafka_send_timeout_ms, max_batch_time_ms: config.db_insert_batch_max_time_ms, @@ -42,7 +48,7 @@ impl ActivationBatcherConfig { pub struct InflightActivationBatcher { batch: Vec, batch_size: usize, - forwarded_count: usize, + forward_batch: Vec>, // payload config: ActivationBatcherConfig, runtime_config_manager: Arc, producer: Arc, @@ -62,7 +68,7 @@ impl InflightActivationBatcher { Self { batch: Vec::with_capacity(config.max_batch_len), batch_size: 0, - forwarded_count: 0, + forward_batch: Vec::with_capacity(config.max_batch_len), config, runtime_config_manager, producer, @@ -101,29 +107,9 @@ impl Reducer for InflightActivationBatcher { ) .increment(1); - // The default demoted topic to forward tasks to is config.kafka_long_topic if not set in runtime config. - let topic = runtime_config - .demoted_topic - .clone() - .unwrap_or(self.config.kafka_long_topic.clone()); - if let Ok(modified_activation) = modify_activation_namespace(&t.activation) { - let delivery = self - .producer - .send( - FutureRecord::<(), Vec>::to(&topic).payload(&modified_activation), - Timeout::After(Duration::from_millis(self.config.send_timeout_ms)), - ) - .await; - if delivery.is_ok() { - metrics::counter!("filter.forward_task_demoted_namespace_success", - "namespace" => namespace.clone(), - "taskname" => task_name.clone(), - ) - .increment(1); - self.forwarded_count += 1; - return Ok(()); - } + self.forward_batch.push(modified_activation); + return Ok(()); } } @@ -134,19 +120,42 @@ impl Reducer for InflightActivationBatcher { } async fn flush(&mut self) -> Result, anyhow::Error> { - if self.batch.is_empty() && self.forwarded_count == 0 { + if self.batch.is_empty() && self.forward_batch.is_empty() { return Ok(None); } - if self.batch.is_empty() { - metrics::histogram!("consumer.forwarded_rows").record(self.forwarded_count as f64); - } else { + let runtime_config = self.runtime_config_manager.read().await; + + if !self.batch.is_empty() { metrics::histogram!("consumer.batch_rows").record(self.batch.len() as f64); metrics::histogram!("consumer.batch_bytes").record(self.batch_size as f64); } + // Send all forward batch in parallel + if !self.forward_batch.is_empty() { + // The default demoted topic to forward tasks to is config.kafka_long_topic if not set in runtime config. + let topic = runtime_config + .demoted_topic + .clone() + .unwrap_or(self.config.kafka_long_topic.clone()); + let sends = self.forward_batch.iter().map(|payload| { + self.producer.send( + FutureRecord::<(), Vec>::to(&topic).payload(payload), + Timeout::After(Duration::from_millis(self.config.send_timeout_ms)), + ) + }); + + let results = join_all(sends).await; + let success_count = results.iter().filter(|r| r.is_ok()).count(); + + metrics::histogram!("consumer.forwarded_rows").record(success_count as f64); + metrics::counter!("filter.forward_task_demoted_namespace_success") + .increment(success_count as u64); + + self.forward_batch.clear(); + } + self.batch_size = 0; - self.forwarded_count = 0; Ok(Some(replace( &mut self.batch, @@ -156,7 +165,7 @@ impl Reducer for InflightActivationBatcher { fn reset(&mut self) { self.batch_size = 0; - self.forwarded_count = 0; + self.forward_batch.clear(); self.batch.clear(); } From 5224077b4f1302c3bfb09c83100ac9f1cc7c704f Mon Sep 17 00:00:00 2001 From: Tony Le Date: Mon, 27 Oct 2025 13:01:09 -0400 Subject: [PATCH 2/3] Only batch forwards --- src/kafka/inflight_activation_batcher.rs | 96 +++++++++++++++++------- src/kafka/utils.rs | 21 +++--- src/upkeep.rs | 24 ++++-- 3 files changed, 97 insertions(+), 44 deletions(-) diff --git a/src/kafka/inflight_activation_batcher.rs b/src/kafka/inflight_activation_batcher.rs index 6f67804..301e493 100644 --- a/src/kafka/inflight_activation_batcher.rs +++ b/src/kafka/inflight_activation_batcher.rs @@ -2,7 +2,7 @@ use super::consumer::{ ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer, ReducerWhenFullBehaviour, }; -use super::utils::modify_activation_namespace; +use super::utils::tag_for_forwarding; use crate::{ config::Config, runtime_config::RuntimeConfigManager, store::inflight_activation::InflightActivation, @@ -28,14 +28,8 @@ pub struct ActivationBatcherConfig { impl ActivationBatcherConfig { /// Convert from application configuration into ActivationBatcher config. pub fn from_config(config: &Config) -> Self { - let mut kafka_config = config.kafka_producer_config(); - kafka_config - .set("linger.ms", "10") - .set("batch.size", "1048576") - .set("queue.buffering.max.messages", "100000") - .set("queue.buffering.max.kbytes", "10485760"); Self { - kafka_config, + kafka_config: config.kafka_producer_config(), kafka_long_topic: config.kafka_long_topic.clone(), send_timeout_ms: config.kafka_send_timeout_ms, max_batch_time_ms: config.db_insert_batch_max_time_ms, @@ -100,16 +94,24 @@ impl Reducer for InflightActivationBatcher { } if runtime_config.demoted_namespaces.contains(namespace) { - metrics::counter!( - "filter.forward_task_demoted_namespace", - "namespace" => namespace.clone(), - "taskname" => task_name.clone(), - ) - .increment(1); - - if let Ok(modified_activation) = modify_activation_namespace(&t.activation) { - self.forward_batch.push(modified_activation); - return Ok(()); + match tag_for_forwarding(&t.activation) { + Ok(Some(tagged_activation)) => { + // Forward it + metrics::counter!( + "filter.forward_task_demoted_namespace", + "namespace" => namespace.clone(), + "taskname" => task_name.clone(), + ) + .increment(1); + self.forward_batch.push(tagged_activation); + return Ok(()); + } + Ok(None) => { + // Already forwarded, fall through to add to batch + } + Err(_) => { + // Decode error, fall through to add to batch to handle in upkeep + } } } @@ -190,7 +192,7 @@ mod tests { ActivationBatcherConfig, Config, InflightActivation, InflightActivationBatcher, Reducer, RuntimeConfigManager, }; - use crate::kafka::utils::modify_activation_namespace; + use crate::kafka::utils::tag_for_forwarding; use chrono::Utc; use std::collections::HashMap; use tokio::fs; @@ -202,7 +204,7 @@ mod tests { use crate::store::inflight_activation::InflightActivationStatus; #[test] - fn test_modify_namespace_for_forwarding() { + fn test_tag_for_forwarding() { let original = TaskActivation { id: "test-id".to_string(), namespace: "bad_namespace".to_string(), @@ -217,17 +219,28 @@ mod tests { }; let encoded = original.encode_to_vec(); - let modified = modify_activation_namespace(&encoded).unwrap(); - let decoded = TaskActivation::decode(&modified as &[u8]).unwrap(); + let tagged = tag_for_forwarding(&encoded).unwrap(); - // Namespace should be changed to "long" - assert_eq!(decoded.namespace, "bad_namespace_long"); + assert!( + tagged.is_some(), + "Should return Some for untagged activation" + ); + let decoded = TaskActivation::decode(&tagged.unwrap() as &[u8]).unwrap(); + + // Namespace should be preserved (not modified) + assert_eq!(decoded.namespace, "bad_namespace"); + + // Should have forwarded header added + assert_eq!( + decoded.headers.get("forwarded"), + Some(&"true".to_string()), + "Should have forwarded header set to true" + ); // All other fields should be preserved assert_eq!(decoded.id, original.id); assert_eq!(decoded.taskname, original.taskname); assert_eq!(decoded.parameters, original.parameters); - assert_eq!(decoded.headers, original.headers); assert_eq!(decoded.received_at, original.received_at); assert_eq!(decoded.retry_state, original.retry_state); assert_eq!( @@ -239,9 +252,38 @@ mod tests { } #[test] - fn test_modify_namespace_for_forwarding_decode_error() { + fn test_tag_for_forwarding_already_tagged() { + let mut original = TaskActivation { + id: "test-id".to_string(), + namespace: "bad_namespace".to_string(), + taskname: "test_task".to_string(), + parameters: r#"{"key":"value"}"#.to_string(), + headers: HashMap::new(), + received_at: None, + retry_state: None, + processing_deadline_duration: 60, + expires: Some(300), + delay: Some(10), + }; + + // Pre-tag it + original + .headers + .insert("forwarded".to_string(), "true".to_string()); + + let encoded = original.encode_to_vec(); + let result = tag_for_forwarding(&encoded).unwrap(); + + assert!( + result.is_none(), + "Should return None for already tagged activation" + ); + } + + #[test] + fn test_tag_for_forwarding_decode_error() { let invalid_bytes = vec![0xFF, 0xFF, 0xFF]; // Invalid protobuf - let result = modify_activation_namespace(&invalid_bytes); + let result = tag_for_forwarding(&invalid_bytes); assert!(result.is_err()); } diff --git a/src/kafka/utils.rs b/src/kafka/utils.rs index 6753346..506fc46 100644 --- a/src/kafka/utils.rs +++ b/src/kafka/utils.rs @@ -1,21 +1,24 @@ use prost::Message; use sentry_protos::taskbroker::v1::TaskActivation; -/// The default namespace suffix for demoted/forwarded tasks -pub const DEFAULT_DEMOTED_NAMESPACE_SUFFIX: &str = "_long"; - -/// Modifies a TaskActivation's namespace for forwarding to demoted topic. +/// Tags a TaskActivation for forwarding to demoted topic by adding a header marker. /// This prevents infinite forwarding loops when tasks are received by another /// broker with the same demoted_namespaces configuration. /// /// # Arguments -/// * `activation_bytes` - The serialized TaskActivation protobuf +/// * `activation_bytes` - the activation protobuf of a TaskActivation /// /// # Returns -/// * `Ok(Vec)` - The protobuf of the TaskActivation with namespace modified +/// * `Ok(Some(Vec))` - The protobuf of the TaskActivation with forwarding header added +/// * `Ok(None)` - Already tagged for forwarding (skip forwarding) /// * `Err(DecodeError)` - If the protobuf cannot be decoded -pub fn modify_activation_namespace(activation_bytes: &[u8]) -> Result, prost::DecodeError> { +pub fn tag_for_forwarding(activation_bytes: &[u8]) -> Result>, prost::DecodeError> { let mut activation = TaskActivation::decode(activation_bytes)?; - activation.namespace += DEFAULT_DEMOTED_NAMESPACE_SUFFIX; - Ok(activation.encode_to_vec()) + if activation.headers.get("forwarded").map(|v| v.as_str()) == Some("true") { + return Ok(None); + } + activation + .headers + .insert("forwarded".to_string(), "true".to_string()); + Ok(Some(activation.encode_to_vec())) } diff --git a/src/upkeep.rs b/src/upkeep.rs index 3e9ae84..f679751 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -21,7 +21,7 @@ use uuid::Uuid; use crate::{ SERVICE_NAME, config::Config, - kafka::utils::modify_activation_namespace, + kafka::utils::tag_for_forwarding, runtime_config::RuntimeConfigManager, store::inflight_activation::{InflightActivationStatus, InflightActivationStore}, }; @@ -301,20 +301,28 @@ pub async fn do_upkeep( let topic = runtime_config.demoted_topic.clone().unwrap_or(config.kafka_long_topic.clone()); async move { - metrics::counter!("upkeep.forward_demoted_namespace", "namespace" => inflight.namespace, "taskname" => inflight.taskname).increment(1); - - let modified_activation = match modify_activation_namespace(&inflight.activation) { - Ok(modified_activation) => modified_activation, + metrics::counter!("upkeep.forward_demoted_namespace", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1); + + let tagged_activation = match tag_for_forwarding(&inflight.activation) { + Ok(Some(tagged)) => tagged, + Ok(None) => { + // Already forwarded, skip + metrics::counter!("upkeep.forward_demoted_namespace.already_forwarded_skip", + "namespace" => inflight.namespace, + "taskname" => inflight.taskname + ).increment(1); + return Ok(inflight.id); + } Err(err) => { - error!("forward_demoted_namespace.modify_namespace.failure: {}", err); - return Err(anyhow::anyhow!("failed to modify namespace: {}", err)); + error!("forward_demoted_namespace.tag_failure: {}", err); + return Err(anyhow::anyhow!("failed to tag for forwarding: {}", err)); } }; let delivery = producer .send( FutureRecord::<(), Vec>::to(&topic) - .payload(&modified_activation), + .payload(&tagged_activation), Timeout::After(Duration::from_millis(config.kafka_send_timeout_ms)), ) .await; From 24c9f4d25172f53d9bf8df936fd1240216b67cb2 Mon Sep 17 00:00:00 2001 From: Tony Le Date: Mon, 27 Oct 2025 13:02:31 -0400 Subject: [PATCH 3/3] upkeep metrics --- src/upkeep.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/upkeep.rs b/src/upkeep.rs index f679751..0973e18 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -306,11 +306,6 @@ pub async fn do_upkeep( let tagged_activation = match tag_for_forwarding(&inflight.activation) { Ok(Some(tagged)) => tagged, Ok(None) => { - // Already forwarded, skip - metrics::counter!("upkeep.forward_demoted_namespace.already_forwarded_skip", - "namespace" => inflight.namespace, - "taskname" => inflight.taskname - ).increment(1); return Ok(inflight.id); } Err(err) => {