@@ -267,8 +267,9 @@ mod tests {
267267 builder:: meta:: ObjectMetaBuilder ,
268268 commons:: {
269269 authentication:: {
270- tls:: AuthenticationProvider , AuthenticationClass , AuthenticationClassProvider ,
271- AuthenticationClassSpec ,
270+ kerberos,
271+ tls:: { self } ,
272+ AuthenticationClass , AuthenticationClassProvider , AuthenticationClassSpec ,
272273 } ,
273274 networking:: DomainName ,
274275 } ,
@@ -307,7 +308,7 @@ mod tests {
307308 ResolvedAuthenticationClasses :: new ( vec ! [ AuthenticationClass {
308309 metadata: ObjectMetaBuilder :: new( ) . name( "auth-class" ) . build( ) ,
309310 spec: AuthenticationClassSpec {
310- provider: AuthenticationClassProvider :: Tls ( AuthenticationProvider {
311+ provider: AuthenticationClassProvider :: Tls ( tls :: AuthenticationProvider {
311312 client_cert_secret_class: Some ( "client-auth-secret-class" . to_string( ) ) ,
312313 } ) ,
313314 } ,
@@ -456,4 +457,97 @@ mod tests {
456457 )
457458 ) ;
458459 }
460+
461+ #[ test]
462+ fn test_get_kafka_kerberos_listeners_config ( ) {
463+ let object_name = "simple-kafka-broker-default" ;
464+ let cluster_info = default_cluster_info ( ) ;
465+
466+ let kafka_cluster = r#"
467+ apiVersion: kafka.stackable.tech/v1alpha1
468+ kind: KafkaCluster
469+ metadata:
470+ name: simple-kafka
471+ namespace: default
472+ spec:
473+ image:
474+ productVersion: 3.7.1
475+ clusterConfig:
476+ authentication:
477+ - authenticationClass: kafka-kerberos
478+ tls:
479+ serverSecretClass: tls
480+ zookeeperConfigMapName: xyz
481+ "# ;
482+ let kafka: KafkaCluster = serde_yaml:: from_str ( kafka_cluster) . expect ( "illegal test input" ) ;
483+ let kafka_security = KafkaTlsSecurity :: new (
484+ ResolvedAuthenticationClasses :: new ( vec ! [ AuthenticationClass {
485+ metadata: ObjectMetaBuilder :: new( ) . name( "auth-class" ) . build( ) ,
486+ spec: AuthenticationClassSpec {
487+ provider: AuthenticationClassProvider :: Kerberos (
488+ kerberos:: AuthenticationProvider {
489+ kerberos_secret_class: "kerberos-secret-class" . to_string( ) ,
490+ } ,
491+ ) ,
492+ } ,
493+ } ] ) ,
494+ "tls" . to_string ( ) ,
495+ Some ( "tls" . to_string ( ) ) ,
496+ ) ;
497+
498+ let config =
499+ get_kafka_listener_config ( & kafka, & kafka_security, object_name, & cluster_info) . unwrap ( ) ;
500+
501+ assert_eq ! (
502+ config. listeners( ) ,
503+ format!(
504+ "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}" ,
505+ name = KafkaListenerName :: Client ,
506+ host = LISTENER_LOCAL_ADDRESS ,
507+ port = kafka_security. client_port( ) ,
508+ internal_name = KafkaListenerName :: Internal ,
509+ internal_host = LISTENER_LOCAL_ADDRESS ,
510+ internal_port = kafka_security. internal_port( ) ,
511+ bootstrap_name = KafkaListenerName :: Bootstrap ,
512+ bootstrap_host = LISTENER_LOCAL_ADDRESS ,
513+ bootstrap_port = kafka_security. bootstrap_port( ) ,
514+ )
515+ ) ;
516+
517+ assert_eq ! (
518+ config. advertised_listeners( ) ,
519+ format!(
520+ "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port},{bootstrap_name}://{bootstrap_host}:{bootstrap_port}" ,
521+ name = KafkaListenerName :: Client ,
522+ host = node_address_cmd( STACKABLE_LISTENER_BROKER_DIR ) ,
523+ port = node_port_cmd(
524+ STACKABLE_LISTENER_BROKER_DIR ,
525+ kafka_security. client_port_name( )
526+ ) ,
527+ internal_name = KafkaListenerName :: Internal ,
528+ internal_host = pod_fqdn( & kafka, object_name, & cluster_info) . unwrap( ) ,
529+ internal_port = kafka_security. internal_port( ) ,
530+ bootstrap_name = KafkaListenerName :: Bootstrap ,
531+ bootstrap_host = node_address_cmd( STACKABLE_LISTENER_BROKER_DIR ) ,
532+ bootstrap_port = node_port_cmd(
533+ STACKABLE_LISTENER_BROKER_DIR ,
534+ kafka_security. client_port_name( )
535+ ) ,
536+ )
537+ ) ;
538+
539+ assert_eq ! (
540+ config. listener_security_protocol_map( ) ,
541+ format!(
542+ "{name}:{protocol},{internal_name}:{internal_protocol},{bootstrap_name}:{bootstrap_protocol}" ,
543+ name = KafkaListenerName :: Client ,
544+ protocol = KafkaListenerProtocol :: SaslSsl ,
545+ internal_name = KafkaListenerName :: Internal ,
546+ internal_protocol = KafkaListenerProtocol :: Ssl ,
547+ bootstrap_name = KafkaListenerName :: Bootstrap ,
548+ bootstrap_protocol = KafkaListenerProtocol :: SaslSsl ,
549+
550+ )
551+ ) ;
552+ }
459553}
0 commit comments