Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ All notable changes to this project will be documented in this file.
- Helm: Allow Pod `priorityClassName` to be configured ([#890]).
- Add experimental support for Kafka KRaft mode ([#889]).
- Add experimental support for Kafka `4.1.0` ([#889]).
- Add `prometheus.io/path|port|scheme` annotations to metrics service ([#897]).

### Changed

- Deprecate support for Kafka `3.7.2` ([#892]).
- BREAKING: The `<cluster>-<role>-<rolegroup>` rolegroup service was replaced with a `<cluster>-<role>-<rolegroup>-headless`
and `<cluster>-<role>-<rolegroup>-metrics` rolegroup service ([#897]).

[#889]: https://github.com/stackabletech/kafka-operator/pull/889
[#890]: https://github.com/stackabletech/kafka-operator/pull/890
[#892]: https://github.com/stackabletech/kafka-operator/pull/892
[#897]: https://github.com/stackabletech/kafka-operator/pull/897

## [25.7.0] - 2025-07-23

Expand Down
2 changes: 1 addition & 1 deletion rust/operator-binary/src/config/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub fn controller_kafka_container_command(
fn to_listeners(port: u16) -> String {
// The environment variables are set in the statefulset of the controller
format!(
"{listener_name}://$POD_NAME.$ROLEGROUP_REF.$NAMESPACE.svc.$CLUSTER_DOMAIN:{port}",
"{listener_name}://$POD_NAME.$ROLEGROUP_HEADLESS_SERVICE_NAME.$NAMESPACE.svc.$CLUSTER_DOMAIN:{port}",
listener_name = KafkaListenerName::Controller
)
}
Expand Down
73 changes: 50 additions & 23 deletions rust/operator-binary/src/crd/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::{
};

use snafu::{OptionExt, Snafu};
use stackable_operator::{kube::ResourceExt, utils::cluster_info::KubernetesClusterInfo};
use stackable_operator::{
kube::ResourceExt, role_utils::RoleGroupRef, utils::cluster_info::KubernetesClusterInfo,
};
use strum::{EnumDiscriminants, EnumString};

use crate::crd::{STACKABLE_LISTENER_BROKER_DIR, security::KafkaTlsSecurity, v1alpha1};
Expand Down Expand Up @@ -170,10 +172,14 @@ impl Display for KafkaListener {
pub fn get_kafka_listener_config(
kafka: &v1alpha1::KafkaCluster,
kafka_security: &KafkaTlsSecurity,
object_name: &str,
rolegroup_ref: &RoleGroupRef<v1alpha1::KafkaCluster>,
cluster_info: &KubernetesClusterInfo,
) -> Result<KafkaListenerConfig, KafkaListenerError> {
let pod_fqdn = pod_fqdn(kafka, object_name, cluster_info)?;
let pod_fqdn = pod_fqdn(
kafka,
&rolegroup_ref.rolegroup_headless_service_name(),
cluster_info,
)?;
let mut listeners = vec![];
let mut advertised_listeners = vec![];
let mut listener_security_protocol_map: BTreeMap<KafkaListenerName, KafkaListenerProtocol> =
Expand Down Expand Up @@ -334,12 +340,11 @@ pub fn node_port_cmd(directory: &str, port_name: &str) -> String {

pub fn pod_fqdn(
kafka: &v1alpha1::KafkaCluster,
object_name: &str,
sts_service_name: &str,
cluster_info: &KubernetesClusterInfo,
) -> Result<String, KafkaListenerError> {
Ok(format!(
"$POD_NAME.{object_name}.{namespace}.svc.{cluster_domain}",
object_name = object_name,
"$POD_NAME.{sts_service_name}.{namespace}.svc.{cluster_domain}",
namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?,
cluster_domain = cluster_info.cluster_domain
))
Expand All @@ -354,7 +359,7 @@ mod tests {
};

use super::*;
use crate::crd::authentication::ResolvedAuthenticationClasses;
use crate::crd::{authentication::ResolvedAuthenticationClasses, role::KafkaRole};

fn default_cluster_info() -> KubernetesClusterInfo {
KubernetesClusterInfo {
Expand All @@ -364,9 +369,6 @@ mod tests {

#[test]
fn test_get_kafka_listeners_config() {
let object_name = "simple-kafka-broker-default";
let cluster_info = default_cluster_info();

let kafka_cluster = r#"
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
Expand Down Expand Up @@ -400,9 +402,12 @@ mod tests {
"internalTls".to_string(),
Some("tls".to_string()),
);

let cluster_info = default_cluster_info();
// "simple-kafka-broker-default"
let rolegroup_ref = kafka.rolegroup_ref(&KafkaRole::Broker, "default");
let config =
get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap();
get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info)
.unwrap();

assert_eq!(
config.listeners(),
Expand All @@ -428,7 +433,12 @@ mod tests {
kafka_security.client_port_name()
),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(),
internal_host = pod_fqdn(
&kafka,
&rolegroup_ref.rolegroup_headless_service_name(),
&cluster_info
)
.unwrap(),
internal_port = kafka_security.internal_port(),
)
);
Expand All @@ -454,7 +464,8 @@ mod tests {
Some("tls".to_string()),
);
let config =
get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap();
get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info)
.unwrap();

assert_eq!(
config.listeners(),
Expand All @@ -480,7 +491,12 @@ mod tests {
kafka_security.client_port_name()
),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(),
internal_host = pod_fqdn(
&kafka,
&rolegroup_ref.rolegroup_headless_service_name(),
&cluster_info
)
.unwrap(),
internal_port = kafka_security.internal_port(),
)
);
Expand All @@ -505,7 +521,8 @@ mod tests {
);

let config =
get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap();
get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info)
.unwrap();

assert_eq!(
config.listeners(),
Expand All @@ -531,7 +548,12 @@ mod tests {
kafka_security.client_port_name()
),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(),
internal_host = pod_fqdn(
&kafka,
&rolegroup_ref.rolegroup_headless_service_name(),
&cluster_info
)
.unwrap(),
internal_port = kafka_security.internal_port(),
)
);
Expand All @@ -552,9 +574,6 @@ mod tests {

#[test]
fn test_get_kafka_kerberos_listeners_config() {
let object_name = "simple-kafka-broker-default";
let cluster_info = default_cluster_info();

let kafka_cluster = r#"
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
Expand Down Expand Up @@ -587,9 +606,12 @@ mod tests {
"tls".to_string(),
Some("tls".to_string()),
);

let cluster_info = default_cluster_info();
// "simple-kafka-broker-default"
let rolegroup_ref = kafka.rolegroup_ref(&KafkaRole::Broker, "default");
let config =
get_kafka_listener_config(&kafka, &kafka_security, object_name, &cluster_info).unwrap();
get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info)
.unwrap();

assert_eq!(
config.listeners(),
Expand Down Expand Up @@ -618,7 +640,12 @@ mod tests {
kafka_security.client_port_name()
),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name, &cluster_info).unwrap(),
internal_host = pod_fqdn(
&kafka,
&rolegroup_ref.rolegroup_headless_service_name(),
&cluster_info
)
.unwrap(),
internal_port = kafka_security.internal_port(),
bootstrap_name = KafkaListenerName::Bootstrap,
bootstrap_host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
Expand Down
7 changes: 5 additions & 2 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ impl v1alpha1::KafkaCluster {
for replica in 0..replicas {
pod_descriptors.push(KafkaPodDescriptor {
namespace: namespace.clone(),
role_group_service_name: rolegroup_ref.object_name(),
role_group_service_name: rolegroup_ref
.rolegroup_headless_service_name(),
role_group_statefulset_name: rolegroup_ref.object_name(),
replica,
cluster_domain: cluster_info.cluster_domain.clone(),
node_id: node_id_hash_offset + u32::from(replica),
Expand Down Expand Up @@ -341,6 +343,7 @@ impl v1alpha1::KafkaCluster {
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct KafkaPodDescriptor {
namespace: String,
role_group_statefulset_name: String,
role_group_service_name: String,
replica: u16,
cluster_domain: DomainName,
Expand All @@ -361,7 +364,7 @@ impl KafkaPodDescriptor {
}

pub fn pod_name(&self) -> String {
format!("{}-{}", self.role_group_service_name, self.replica)
format!("{}-{}", self.role_group_statefulset_name, self.replica)
}

/// Build the Kraft voter String
Expand Down
22 changes: 18 additions & 4 deletions rust/operator-binary/src/kafka_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::{
resource::{
configmap::build_rolegroup_config_map,
listener::build_broker_rolegroup_bootstrap_listener,
service::build_rolegroup_service,
service::{build_rolegroup_headless_service, build_rolegroup_metrics_service},
statefulset::{build_broker_rolegroup_statefulset, build_controller_rolegroup_statefulset},
},
};
Expand Down Expand Up @@ -347,8 +347,16 @@ pub async fn reconcile_kafka(
.merged_config(kafka, &rolegroup_ref.role_group)
.context(FailedToResolveConfigSnafu)?;

let rg_service =
build_rolegroup_service(kafka, &resolved_product_image, &rolegroup_ref)
let rg_headless_service = build_rolegroup_headless_service(
kafka,
&resolved_product_image,
&rolegroup_ref,
&kafka_security,
)
.context(BuildServiceSnafu)?;

let rg_metrics_service =
build_rolegroup_metrics_service(kafka, &resolved_product_image, &rolegroup_ref)
.context(BuildServiceSnafu)?;

let rg_configmap = build_rolegroup_config_map(
Expand Down Expand Up @@ -407,7 +415,13 @@ pub async fn reconcile_kafka(
}

cluster_resources
.add(client, rg_service)
.add(client, rg_headless_service)
.await
.with_context(|_| ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup_ref.clone(),
})?;
cluster_resources
.add(client, rg_metrics_service)
.await
.with_context(|_| ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup_ref.clone(),
Expand Down
Loading
Loading