@@ -17,7 +17,8 @@ use super::consumer::{
1717} ;
1818
1919pub struct ActivationBatcherConfig {
20- pub kafka_config : ClientConfig ,
20+ pub producer_config : ClientConfig ,
21+ pub kafka_cluster : String ,
2122 pub kafka_topic : String ,
2223 pub kafka_long_topic : String ,
2324 pub send_timeout_ms : u64 ,
@@ -30,7 +31,8 @@ impl ActivationBatcherConfig {
3031 /// Convert from application configuration into ActivationBatcher config.
3132 pub fn from_config ( config : & Config ) -> Self {
3233 Self {
33- kafka_config : config. kafka_producer_config ( ) ,
34+ producer_config : config. kafka_producer_config ( ) ,
35+ kafka_cluster : config. kafka_cluster . clone ( ) ,
3436 kafka_topic : config. kafka_topic . clone ( ) ,
3537 kafka_long_topic : config. kafka_long_topic . clone ( ) ,
3638 send_timeout_ms : config. kafka_send_timeout_ms ,
@@ -48,6 +50,7 @@ pub struct InflightActivationBatcher {
4850 config : ActivationBatcherConfig ,
4951 runtime_config_manager : Arc < RuntimeConfigManager > ,
5052 producer : Arc < FutureProducer > ,
53+ producer_cluster : String ,
5154}
5255
5356impl InflightActivationBatcher {
@@ -57,17 +60,23 @@ impl InflightActivationBatcher {
5760 ) -> Self {
5861 let producer: Arc < FutureProducer > = Arc :: new (
5962 config
60- . kafka_config
63+ . producer_config
6164 . create ( )
6265 . expect ( "Could not create kafka producer in inflight activation batcher" ) ,
6366 ) ;
67+ let producer_cluster = config
68+ . producer_config
69+ . get ( "bootstrap.servers" )
70+ . unwrap ( )
71+ . to_owned ( ) ;
6472 Self {
6573 batch : Vec :: with_capacity ( config. max_batch_len ) ,
6674 batch_size : 0 ,
6775 forward_batch : Vec :: with_capacity ( config. max_batch_len ) ,
6876 config,
6977 runtime_config_manager,
7078 producer,
79+ producer_cluster,
7180 }
7281 }
7382}
@@ -136,6 +145,20 @@ impl Reducer for InflightActivationBatcher {
136145 // Send all forward batch in parallel
137146 if !self . forward_batch . is_empty ( ) {
138147 let runtime_config = self . runtime_config_manager . read ( ) . await ;
148+ let forward_cluster = runtime_config
149+ . demoted_topic_cluster
150+ . clone ( )
151+ . unwrap_or ( self . config . kafka_cluster . clone ( ) ) ;
152+ if self . producer_cluster != forward_cluster {
153+ let mut new_config = self . config . producer_config . clone ( ) ;
154+ new_config. set ( "bootstrap.servers" , & forward_cluster) ;
155+ self . producer = Arc :: new (
156+ new_config
157+ . create ( )
158+ . expect ( "Could not create kafka producer in inflight activation batcher" ) ,
159+ ) ;
160+ self . producer_cluster = forward_cluster;
161+ }
139162 let forward_topic = runtime_config
140163 . demoted_topic
141164 . clone ( )
@@ -442,7 +465,9 @@ demoted_namespaces:
442465drop_task_killswitch:
443466 -
444467demoted_namespaces:
445- - bad_namespace"# ;
468+ - bad_namespace
469+ demoted_topic_cluster: 0.0.0.0:9092
470+ demoted_topic: taskworker-demoted"# ;
446471
447472 let test_path = "test_forward_task_due_to_demoted_namespace.yaml" ;
448473 fs:: write ( test_path, test_yaml) . await . unwrap ( ) ;
@@ -453,8 +478,8 @@ demoted_namespaces:
453478 ActivationBatcherConfig :: from_config ( & config) ,
454479 runtime_config,
455480 ) ;
456- println ! ( "kafka_topic: {:?}" , config . kafka_topic ) ;
457- println ! ( "kafka_long_topic: {:?}" , config. kafka_long_topic ) ;
481+
482+ assert_eq ! ( batcher . producer_cluster , config. kafka_cluster . clone ( ) ) ;
458483
459484 let inflight_activation_0 = InflightActivation {
460485 id : "0" . to_string ( ) ,
@@ -534,6 +559,7 @@ demoted_namespaces:
534559 assert_eq ! ( flush_result. as_ref( ) . unwrap( ) [ 0 ] . taskname, "good_task" ) ;
535560 assert_eq ! ( batcher. batch. len( ) , 0 ) ;
536561 assert_eq ! ( batcher. forward_batch. len( ) , 0 ) ;
562+ assert_eq ! ( batcher. producer_cluster, "0.0.0.0:9092" ) ;
537563
538564 fs:: remove_file ( test_path) . await . unwrap ( ) ;
539565 }
0 commit comments