diff --git a/src/kafka/inflight_activation_batcher.rs b/src/kafka/inflight_activation_batcher.rs index 265b505..ce9c44b 100644 --- a/src/kafka/inflight_activation_batcher.rs +++ b/src/kafka/inflight_activation_batcher.rs @@ -15,10 +15,10 @@ use super::consumer::{ ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer, ReducerWhenFullBehaviour, }; -use super::utils::tag_for_forwarding; pub struct ActivationBatcherConfig { pub kafka_config: ClientConfig, + pub kafka_topic: String, pub kafka_long_topic: String, pub send_timeout_ms: u64, pub max_batch_time_ms: u64, @@ -31,6 +31,7 @@ impl ActivationBatcherConfig { pub fn from_config(config: &Config) -> Self { Self { kafka_config: config.kafka_producer_config(), + kafka_topic: config.kafka_topic.clone(), kafka_long_topic: config.kafka_long_topic.clone(), send_timeout_ms: config.kafka_send_timeout_ms, max_batch_time_ms: config.db_insert_batch_max_time_ms, @@ -78,6 +79,10 @@ impl Reducer for InflightActivationBatcher { async fn reduce(&mut self, t: Self::Input) -> Result<(), anyhow::Error> { let runtime_config = self.runtime_config_manager.read().await; + let forward_topic = runtime_config + .demoted_topic + .clone() + .unwrap_or(self.config.kafka_long_topic.clone()); let task_name = &t.taskname; let namespace = &t.namespace; @@ -95,36 +100,22 @@ impl Reducer for InflightActivationBatcher { } if runtime_config.demoted_namespaces.contains(namespace) { - match tag_for_forwarding(&t.activation) { - Ok(Some(tagged_activation)) => { - // Forward it - metrics::counter!( - "filter.forward_task_demoted_namespace", - "namespace" => namespace.clone(), - "taskname" => task_name.clone(), - ) - .increment(1); - self.forward_batch.push(tagged_activation); - return Ok(()); - } - Ok(None) => { - // Already forwarded, fall through to add to batch - metrics::counter!( - "filter.forward_task_demoted_namespace.skipped", - "namespace" => namespace.clone(), - "taskname" => task_name.clone(), - ) - .increment(1); - } - Err(_) => { - // Decode error, fall through to add to batch to handle in upkeep - metrics::counter!( - "filter.forward_task_demoted_namespace.decode_error", - "namespace" => namespace.clone(), - "taskname" => task_name.clone(), - ) - .increment(1); - } + if forward_topic == self.config.kafka_topic { + metrics::counter!( + "filter.forward_task_demoted_namespace.skipped", + "namespace" => namespace.clone(), + "taskname" => task_name.clone(), + ) + .increment(1); + } else { + metrics::counter!( + "filter.forward_task_demoted_namespace", + "namespace" => namespace.clone(), + "taskname" => task_name.clone(), + ) + .increment(1); + self.forward_batch.push(t.activation.clone()); + return Ok(()); } } @@ -148,14 +139,13 @@ impl Reducer for InflightActivationBatcher { // Send all forward batch in parallel if !self.forward_batch.is_empty() { - // The default demoted topic to forward tasks to is config.kafka_long_topic if not set in runtime config. - let topic = runtime_config + let forward_topic = runtime_config .demoted_topic .clone() .unwrap_or(self.config.kafka_long_topic.clone()); let sends = self.forward_batch.iter().map(|payload| { self.producer.send( - FutureRecord::<(), Vec>::to(&topic).payload(payload), + FutureRecord::<(), Vec>::to(&forward_topic).payload(payload), Timeout::After(Duration::from_millis(self.config.send_timeout_ms)), ) }); @@ -206,7 +196,6 @@ mod tests { ActivationBatcherConfig, Config, InflightActivation, InflightActivationBatcher, Reducer, RuntimeConfigManager, }; - use crate::kafka::utils::tag_for_forwarding; use chrono::Utc; use std::collections::HashMap; use tokio::fs; @@ -217,90 +206,6 @@ mod tests { use crate::store::inflight_activation::InflightActivationStatus; - #[test] - fn test_tag_for_forwarding() { - let original = TaskActivation { - id: "test-id".to_string(), - namespace: "bad_namespace".to_string(), - taskname: "test_task".to_string(), - parameters: r#"{"key":"value"}"#.to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 60, - expires: Some(300), - delay: Some(10), - }; - - let encoded = original.encode_to_vec(); - let tagged = tag_for_forwarding(&encoded).unwrap(); - - assert!( - tagged.is_some(), - "Should return Some for untagged activation" - ); - let decoded = TaskActivation::decode(&tagged.unwrap() as &[u8]).unwrap(); - - // Namespace should be preserved (not modified) - assert_eq!(decoded.namespace, "bad_namespace"); - - // Should have forwarded header added - assert_eq!( - decoded.headers.get("forwarded"), - Some(&"true".to_string()), - "Should have forwarded header set to true" - ); - - // All other fields should be preserved - assert_eq!(decoded.id, original.id); - assert_eq!(decoded.taskname, original.taskname); - assert_eq!(decoded.parameters, original.parameters); - assert_eq!(decoded.received_at, original.received_at); - assert_eq!(decoded.retry_state, original.retry_state); - assert_eq!( - decoded.processing_deadline_duration, - original.processing_deadline_duration - ); - assert_eq!(decoded.expires, original.expires); - assert_eq!(decoded.delay, original.delay); - } - - #[test] - fn test_tag_for_forwarding_already_tagged() { - let mut original = TaskActivation { - id: "test-id".to_string(), - namespace: "bad_namespace".to_string(), - taskname: "test_task".to_string(), - parameters: r#"{"key":"value"}"#.to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 60, - expires: Some(300), - delay: Some(10), - }; - - // Pre-tag it - original - .headers - .insert("forwarded".to_string(), "true".to_string()); - - let encoded = original.encode_to_vec(); - let result = tag_for_forwarding(&encoded).unwrap(); - - assert!( - result.is_none(), - "Should return None for already tagged activation" - ); - } - - #[test] - fn test_tag_for_forwarding_decode_error() { - let invalid_bytes = vec![0xFF, 0xFF, 0xFF]; // Invalid protobuf - let result = tag_for_forwarding(&invalid_bytes); - assert!(result.is_err()); - } - #[tokio::test] async fn test_drop_task_due_to_killswitch() { let test_yaml = r#" @@ -551,6 +456,8 @@ demoted_namespaces: ActivationBatcherConfig::from_config(&config), runtime_config, ); + println!("kafka_topic: {:?}", config.kafka_topic); + println!("kafka_long_topic: {:?}", config.kafka_long_topic); let inflight_activation_0 = InflightActivation { id: "0".to_string(), diff --git a/src/kafka/mod.rs b/src/kafka/mod.rs index 09f306d..ae4122a 100644 --- a/src/kafka/mod.rs +++ b/src/kafka/mod.rs @@ -4,4 +4,3 @@ pub mod deserialize_activation; pub mod inflight_activation_batcher; pub mod inflight_activation_writer; pub mod os_stream_writer; -pub mod utils; diff --git a/src/kafka/utils.rs b/src/kafka/utils.rs deleted file mode 100644 index 506fc46..0000000 --- a/src/kafka/utils.rs +++ /dev/null @@ -1,24 +0,0 @@ -use prost::Message; -use sentry_protos::taskbroker::v1::TaskActivation; - -/// Tags a TaskActivation for forwarding to demoted topic by adding a header marker. -/// This prevents infinite forwarding loops when tasks are received by another -/// broker with the same demoted_namespaces configuration. -/// -/// # Arguments -/// * `activation_bytes` - the activation protobuf of a TaskActivation -/// -/// # Returns -/// * `Ok(Some(Vec))` - The protobuf of the TaskActivation with forwarding header added -/// * `Ok(None)` - Already tagged for forwarding (skip forwarding) -/// * `Err(DecodeError)` - If the protobuf cannot be decoded -pub fn tag_for_forwarding(activation_bytes: &[u8]) -> Result>, prost::DecodeError> { - let mut activation = TaskActivation::decode(activation_bytes)?; - if activation.headers.get("forwarded").map(|v| v.as_str()) == Some("true") { - return Ok(None); - } - activation - .headers - .insert("forwarded".to_string(), "true".to_string()); - Ok(Some(activation.encode_to_vec())) -} diff --git a/src/upkeep.rs b/src/upkeep.rs index 737a685..4684f6c 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -21,7 +21,6 @@ use uuid::Uuid; use crate::{ SERVICE_NAME, config::Config, - kafka::utils::tag_for_forwarding, runtime_config::RuntimeConfigManager, store::inflight_activation::{InflightActivationStatus, InflightActivationStore}, }; @@ -285,7 +284,11 @@ pub async fn do_upkeep( // 12. Forward tasks from demoted namespaces to "long" namespace let demoted_namespaces = runtime_config.demoted_namespaces.clone(); - if !demoted_namespaces.is_empty() { + let forward_topic = runtime_config + .demoted_topic + .clone() + .unwrap_or(config.kafka_long_topic.clone()); + if !demoted_namespaces.is_empty() && forward_topic != config.kafka_topic { let forward_demoted_start = Instant::now(); if let Ok(tasks) = store .get_pending_activations_from_namespaces(Some(&demoted_namespaces), None) @@ -297,29 +300,14 @@ pub async fn do_upkeep( .map(|inflight| { let producer = producer.clone(); let config = config.clone(); - // The default demoted topic to forward tasks to is config.kafka_long_topic if not set in runtime config. - let topic = runtime_config.demoted_topic.clone().unwrap_or(config.kafka_long_topic.clone()); - + let topic = forward_topic.clone(); async move { metrics::counter!("upkeep.forward_task_demoted_namespace", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1); - let tagged_activation = match tag_for_forwarding(&inflight.activation) { - Ok(Some(tagged)) => tagged, - Ok(None) => { - metrics::counter!("upkeep.forward_task_demoted_namespace.skipped", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1); - return Ok(inflight.id); - } - Err(err) => { - metrics::counter!("upkeep.forward_task_demoted_namespace.decode_error", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1); - error!("forward_task_demoted_namespace.tag_failure: {}", err); - return Err(anyhow::anyhow!("failed to tag for forwarding: {}", err)); - } - }; - let delivery = producer .send( FutureRecord::<(), Vec>::to(&topic) - .payload(&tagged_activation), + .payload(&inflight.activation), Timeout::After(Duration::from_millis(config.kafka_send_timeout_ms)), ) .await;