@@ -4,7 +4,9 @@ use std::{
44} ;
55
66use snafu:: { OptionExt , Snafu } ;
7- use stackable_operator:: { kube:: ResourceExt , utils:: cluster_info:: KubernetesClusterInfo } ;
7+ use stackable_operator:: {
8+ kube:: ResourceExt , role_utils:: RoleGroupRef , utils:: cluster_info:: KubernetesClusterInfo ,
9+ } ;
810use strum:: { EnumDiscriminants , EnumString } ;
911
1012use crate :: crd:: { STACKABLE_LISTENER_BROKER_DIR , security:: KafkaTlsSecurity , v1alpha1} ;
@@ -170,10 +172,14 @@ impl Display for KafkaListener {
170172pub fn get_kafka_listener_config (
171173 kafka : & v1alpha1:: KafkaCluster ,
172174 kafka_security : & KafkaTlsSecurity ,
173- object_name : & str ,
175+ rolegroup_ref : & RoleGroupRef < v1alpha1 :: KafkaCluster > ,
174176 cluster_info : & KubernetesClusterInfo ,
175177) -> Result < KafkaListenerConfig , KafkaListenerError > {
176- let pod_fqdn = pod_fqdn ( kafka, object_name, cluster_info) ?;
178+ let pod_fqdn = pod_fqdn (
179+ kafka,
180+ & rolegroup_ref. rolegroup_headless_service_name ( ) ,
181+ cluster_info,
182+ ) ?;
177183 let mut listeners = vec ! [ ] ;
178184 let mut advertised_listeners = vec ! [ ] ;
179185 let mut listener_security_protocol_map: BTreeMap < KafkaListenerName , KafkaListenerProtocol > =
@@ -334,12 +340,11 @@ pub fn node_port_cmd(directory: &str, port_name: &str) -> String {
334340
335341pub fn pod_fqdn (
336342 kafka : & v1alpha1:: KafkaCluster ,
337- object_name : & str ,
343+ sts_service_name : & str ,
338344 cluster_info : & KubernetesClusterInfo ,
339345) -> Result < String , KafkaListenerError > {
340346 Ok ( format ! (
341- "$POD_NAME.{object_name}.{namespace}.svc.{cluster_domain}" ,
342- object_name = object_name,
347+ "$POD_NAME.{sts_service_name}.{namespace}.svc.{cluster_domain}" ,
343348 namespace = kafka. namespace( ) . context( ObjectHasNoNamespaceSnafu ) ?,
344349 cluster_domain = cluster_info. cluster_domain
345350 ) )
@@ -354,7 +359,7 @@ mod tests {
354359 } ;
355360
356361 use super :: * ;
357- use crate :: crd:: authentication:: ResolvedAuthenticationClasses ;
362+ use crate :: crd:: { authentication:: ResolvedAuthenticationClasses , role :: KafkaRole } ;
358363
359364 fn default_cluster_info ( ) -> KubernetesClusterInfo {
360365 KubernetesClusterInfo {
@@ -364,9 +369,6 @@ mod tests {
364369
365370 #[ test]
366371 fn test_get_kafka_listeners_config ( ) {
367- let object_name = "simple-kafka-broker-default" ;
368- let cluster_info = default_cluster_info ( ) ;
369-
370372 let kafka_cluster = r#"
371373 apiVersion: kafka.stackable.tech/v1alpha1
372374 kind: KafkaCluster
@@ -400,9 +402,12 @@ mod tests {
400402 "internalTls" . to_string ( ) ,
401403 Some ( "tls" . to_string ( ) ) ,
402404 ) ;
403-
405+ let cluster_info = default_cluster_info ( ) ;
406+ // "simple-kafka-broker-default"
407+ let rolegroup_ref = kafka. rolegroup_ref ( & KafkaRole :: Broker , "default" ) ;
404408 let config =
405- get_kafka_listener_config ( & kafka, & kafka_security, object_name, & cluster_info) . unwrap ( ) ;
409+ get_kafka_listener_config ( & kafka, & kafka_security, & rolegroup_ref, & cluster_info)
410+ . unwrap ( ) ;
406411
407412 assert_eq ! (
408413 config. listeners( ) ,
@@ -428,7 +433,12 @@ mod tests {
428433 kafka_security. client_port_name( )
429434 ) ,
430435 internal_name = KafkaListenerName :: Internal ,
431- internal_host = pod_fqdn( & kafka, object_name, & cluster_info) . unwrap( ) ,
436+ internal_host = pod_fqdn(
437+ & kafka,
438+ & rolegroup_ref. rolegroup_headless_service_name( ) ,
439+ & cluster_info
440+ )
441+ . unwrap( ) ,
432442 internal_port = kafka_security. internal_port( ) ,
433443 )
434444 ) ;
@@ -454,7 +464,8 @@ mod tests {
454464 Some ( "tls" . to_string ( ) ) ,
455465 ) ;
456466 let config =
457- get_kafka_listener_config ( & kafka, & kafka_security, object_name, & cluster_info) . unwrap ( ) ;
467+ get_kafka_listener_config ( & kafka, & kafka_security, & rolegroup_ref, & cluster_info)
468+ . unwrap ( ) ;
458469
459470 assert_eq ! (
460471 config. listeners( ) ,
@@ -480,7 +491,12 @@ mod tests {
480491 kafka_security. client_port_name( )
481492 ) ,
482493 internal_name = KafkaListenerName :: Internal ,
483- internal_host = pod_fqdn( & kafka, object_name, & cluster_info) . unwrap( ) ,
494+ internal_host = pod_fqdn(
495+ & kafka,
496+ & rolegroup_ref. rolegroup_headless_service_name( ) ,
497+ & cluster_info
498+ )
499+ . unwrap( ) ,
484500 internal_port = kafka_security. internal_port( ) ,
485501 )
486502 ) ;
@@ -505,7 +521,8 @@ mod tests {
505521 ) ;
506522
507523 let config =
508- get_kafka_listener_config ( & kafka, & kafka_security, object_name, & cluster_info) . unwrap ( ) ;
524+ get_kafka_listener_config ( & kafka, & kafka_security, & rolegroup_ref, & cluster_info)
525+ . unwrap ( ) ;
509526
510527 assert_eq ! (
511528 config. listeners( ) ,
@@ -531,7 +548,12 @@ mod tests {
531548 kafka_security. client_port_name( )
532549 ) ,
533550 internal_name = KafkaListenerName :: Internal ,
534- internal_host = pod_fqdn( & kafka, object_name, & cluster_info) . unwrap( ) ,
551+ internal_host = pod_fqdn(
552+ & kafka,
553+ & rolegroup_ref. rolegroup_headless_service_name( ) ,
554+ & cluster_info
555+ )
556+ . unwrap( ) ,
535557 internal_port = kafka_security. internal_port( ) ,
536558 )
537559 ) ;
@@ -552,9 +574,6 @@ mod tests {
552574
553575 #[ test]
554576 fn test_get_kafka_kerberos_listeners_config ( ) {
555- let object_name = "simple-kafka-broker-default" ;
556- let cluster_info = default_cluster_info ( ) ;
557-
558577 let kafka_cluster = r#"
559578 apiVersion: kafka.stackable.tech/v1alpha1
560579 kind: KafkaCluster
@@ -587,9 +606,12 @@ mod tests {
587606 "tls" . to_string ( ) ,
588607 Some ( "tls" . to_string ( ) ) ,
589608 ) ;
590-
609+ let cluster_info = default_cluster_info ( ) ;
610+ // "simple-kafka-broker-default"
611+ let rolegroup_ref = kafka. rolegroup_ref ( & KafkaRole :: Broker , "default" ) ;
591612 let config =
592- get_kafka_listener_config ( & kafka, & kafka_security, object_name, & cluster_info) . unwrap ( ) ;
613+ get_kafka_listener_config ( & kafka, & kafka_security, & rolegroup_ref, & cluster_info)
614+ . unwrap ( ) ;
593615
594616 assert_eq ! (
595617 config. listeners( ) ,
@@ -618,7 +640,12 @@ mod tests {
618640 kafka_security. client_port_name( )
619641 ) ,
620642 internal_name = KafkaListenerName :: Internal ,
621- internal_host = pod_fqdn( & kafka, object_name, & cluster_info) . unwrap( ) ,
643+ internal_host = pod_fqdn(
644+ & kafka,
645+ & rolegroup_ref. rolegroup_headless_service_name( ) ,
646+ & cluster_info
647+ )
648+ . unwrap( ) ,
622649 internal_port = kafka_security. internal_port( ) ,
623650 bootstrap_name = KafkaListenerName :: Bootstrap ,
624651 bootstrap_host = node_address_cmd( STACKABLE_LISTENER_BROKER_DIR ) ,
0 commit comments