diff --git a/src/kafka/inflight_activation_batcher.rs b/src/kafka/inflight_activation_batcher.rs index 33b9ad8..7a02b66 100644 --- a/src/kafka/inflight_activation_batcher.rs +++ b/src/kafka/inflight_activation_batcher.rs @@ -17,7 +17,8 @@ use super::consumer::{ }; pub struct ActivationBatcherConfig { - pub kafka_config: ClientConfig, + pub producer_config: ClientConfig, + pub kafka_cluster: String, pub kafka_topic: String, pub kafka_long_topic: String, pub send_timeout_ms: u64, @@ -30,7 +31,8 @@ impl ActivationBatcherConfig { /// Convert from application configuration into ActivationBatcher config. pub fn from_config(config: &Config) -> Self { Self { - kafka_config: config.kafka_producer_config(), + producer_config: config.kafka_producer_config(), + kafka_cluster: config.kafka_cluster.clone(), kafka_topic: config.kafka_topic.clone(), kafka_long_topic: config.kafka_long_topic.clone(), send_timeout_ms: config.kafka_send_timeout_ms, @@ -48,6 +50,7 @@ pub struct InflightActivationBatcher { config: ActivationBatcherConfig, runtime_config_manager: Arc, producer: Arc, + producer_cluster: String, } impl InflightActivationBatcher { @@ -57,10 +60,15 @@ impl InflightActivationBatcher { ) -> Self { let producer: Arc = Arc::new( config - .kafka_config + .producer_config .create() .expect("Could not create kafka producer in inflight activation batcher"), ); + let producer_cluster = config + .producer_config + .get("bootstrap.servers") + .unwrap() + .to_owned(); Self { batch: Vec::with_capacity(config.max_batch_len), batch_size: 0, @@ -68,6 +76,7 @@ impl InflightActivationBatcher { config, runtime_config_manager, producer, + producer_cluster, } } } @@ -136,6 +145,20 @@ impl Reducer for InflightActivationBatcher { // Send all forward batch in parallel if !self.forward_batch.is_empty() { let runtime_config = self.runtime_config_manager.read().await; + let forward_cluster = runtime_config + .demoted_topic_cluster + .clone() + .unwrap_or(self.config.kafka_cluster.clone()); + if self.producer_cluster != forward_cluster { + let mut new_config = self.config.producer_config.clone(); + new_config.set("bootstrap.servers", &forward_cluster); + self.producer = Arc::new( + new_config + .create() + .expect("Could not create kafka producer in inflight activation batcher"), + ); + self.producer_cluster = forward_cluster; + } let forward_topic = runtime_config .demoted_topic .clone() @@ -442,7 +465,9 @@ demoted_namespaces: drop_task_killswitch: - demoted_namespaces: - - bad_namespace"#; + - bad_namespace +demoted_topic_cluster: 0.0.0.0:9092 +demoted_topic: taskworker-demoted"#; let test_path = "test_forward_task_due_to_demoted_namespace.yaml"; fs::write(test_path, test_yaml).await.unwrap(); @@ -453,8 +478,8 @@ demoted_namespaces: ActivationBatcherConfig::from_config(&config), runtime_config, ); - println!("kafka_topic: {:?}", config.kafka_topic); - println!("kafka_long_topic: {:?}", config.kafka_long_topic); + + assert_eq!(batcher.producer_cluster, config.kafka_cluster.clone()); let inflight_activation_0 = InflightActivation { id: "0".to_string(), @@ -534,6 +559,7 @@ demoted_namespaces: assert_eq!(flush_result.as_ref().unwrap()[0].taskname, "good_task"); assert_eq!(batcher.batch.len(), 0); assert_eq!(batcher.forward_batch.len(), 0); + assert_eq!(batcher.producer_cluster, "0.0.0.0:9092"); fs::remove_file(test_path).await.unwrap(); } diff --git a/src/runtime_config.rs b/src/runtime_config.rs index 460d8f9..cbd40fc 100644 --- a/src/runtime_config.rs +++ b/src/runtime_config.rs @@ -14,6 +14,9 @@ pub struct RuntimeConfig { /// Tasks from these namespaces will be automatically forwarded to the "long" namespace /// to prevent them from blocking other tasks in shared namespaces. pub demoted_namespaces: Vec, + /// The cluster to forward tasks from demoted namespaces to. + /// If not set, the current cluster taskbroker is consuming from will be used. + pub demoted_topic_cluster: Option, /// The topic to forward tasks from demoted namespaces to. /// If not set, the taskworker-long topic will be used pub demoted_topic: Option, @@ -176,6 +179,7 @@ drop_task_killswitch: - demoted_namespaces: - bad_namespace +demoted_topic_cluster: kafka:9092 demoted_topic: taskworker-demoted-topic"#; let test_path = "test_demoted_namespaces.yaml"; @@ -183,6 +187,10 @@ demoted_topic: taskworker-demoted-topic"#; let runtime_config = RuntimeConfigManager::new(Some(test_path.to_string())).await; let config = runtime_config.read().await; + assert_eq!( + config.demoted_topic_cluster.as_deref().unwrap(), + "kafka:9092" + ); assert_eq!(config.demoted_namespaces.len(), 1); assert_eq!(config.demoted_namespaces[0], "bad_namespace"); assert_eq!( diff --git a/src/upkeep.rs b/src/upkeep.rs index 4684f6c..898522f 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -284,12 +284,25 @@ pub async fn do_upkeep( // 12. Forward tasks from demoted namespaces to "long" namespace let demoted_namespaces = runtime_config.demoted_namespaces.clone(); + let forward_cluster = runtime_config + .demoted_topic_cluster + .clone() + .unwrap_or(config.kafka_cluster.clone()); let forward_topic = runtime_config .demoted_topic .clone() .unwrap_or(config.kafka_long_topic.clone()); - if !demoted_namespaces.is_empty() && forward_topic != config.kafka_topic { + let same_cluster = forward_cluster == config.kafka_cluster; + let same_topic = forward_topic == config.kafka_topic; + if !(demoted_namespaces.is_empty() || (same_cluster && same_topic)) { let forward_demoted_start = Instant::now(); + let mut forward_producer_config = config.kafka_producer_config(); + forward_producer_config.set("bootstrap.servers", &forward_cluster); + let forward_producer: Arc = Arc::new( + forward_producer_config + .create() + .expect("Could not create kafka producer in upkeep"), + ); if let Ok(tasks) = store .get_pending_activations_from_namespaces(Some(&demoted_namespaces), None) .await @@ -298,13 +311,13 @@ pub async fn do_upkeep( let deliveries = tasks .into_iter() .map(|inflight| { - let producer = producer.clone(); + let forward_producer = forward_producer.clone(); let config = config.clone(); let topic = forward_topic.clone(); async move { metrics::counter!("upkeep.forward_task_demoted_namespace", "namespace" => inflight.namespace.clone(), "taskname" => inflight.taskname.clone()).increment(1); - let delivery = producer + let delivery = forward_producer .send( FutureRecord::<(), Vec>::to(&topic) .payload(&inflight.activation),