From 7eed75513d0bbb51adde83e370b55ddbf154e1b9 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 14 Oct 2025 17:48:44 +0200 Subject: [PATCH 01/11] create client.properties and update tls tests --- rust/operator-binary/src/crd/security.rs | 110 ++++++++++++++++++ .../operator-binary/src/resource/configmap.rs | 12 ++ .../kuttl/tls/30_test_client_auth_tls.sh | 15 ++- .../templates/kuttl/tls/30_test_client_tls.sh | 13 +-- tests/templates/kuttl/tls/31-assert.yaml | 11 -- .../templates/kuttl/tls/31-run-tests.yaml.j2 | 13 --- .../kuttl/tls/31_test-tls-job.yaml.j2 | 94 --------------- .../tls/{20-assert.yaml => 40-assert.yaml} | 0 ...kafka.yaml.j2 => 40-install-kafka.yaml.j2} | 11 ++ tests/templates/kuttl/tls/50-assert.yaml.j2 | 11 ++ 10 files changed, 157 insertions(+), 133 deletions(-) delete mode 100644 tests/templates/kuttl/tls/31-assert.yaml delete mode 100644 tests/templates/kuttl/tls/31-run-tests.yaml.j2 delete mode 100644 tests/templates/kuttl/tls/31_test-tls-job.yaml.j2 rename tests/templates/kuttl/tls/{20-assert.yaml => 40-assert.yaml} (100%) rename tests/templates/kuttl/tls/{20-install-kafka.yaml.j2 => 40-install-kafka.yaml.j2} (84%) create mode 100644 tests/templates/kuttl/tls/50-assert.yaml.j2 diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 94fea587..8c189805 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -299,6 +299,116 @@ impl KafkaTlsSecurity { args } + /// Returns a configuration file that can be used by Kafka clients running inside the + /// Kubernetes cluster to connect to the Kafka servers. + pub fn client_properties(&self) -> Vec<(String, Option)> { + let mut props = vec![]; + + if self.tls_client_authentication_class().is_some() { + props.push(("security.protocol".to_string(), Some("SSL".to_string()))); + props.push(("ssl.client.auth".to_string(), Some("required".to_string()))); + props.push(("ssl.keystore.type".to_string(), Some("PKCS12".to_string()))); + props.push(( + "ssl.keystore.location".to_string(), + Some(format!( + "{}/keystore.p12", + Self::STACKABLE_TLS_KAFKA_SERVER_DIR + )), + )); + props.push(( + "ssl.keystore.password".to_string(), + Some(Self::SSL_STORE_PASSWORD.to_string()), + )); + props.push(( + "ssl.truststore.type".to_string(), + Some("PKCS12".to_string()), + )); + props.push(( + "ssl.truststore.location".to_string(), + Some(format!( + "{}/truststore.p12", + Self::STACKABLE_TLS_KAFKA_SERVER_DIR + )), + )); + props.push(( + "ssl.truststore.password".to_string(), + Some(Self::SSL_STORE_PASSWORD.to_string()), + )); + } else if self.has_kerberos_enabled() { + props.push(( + "security.protocol".to_string(), + Some("SASL_SSL".to_string()), + )); + props.push(("ssl.keystore.type".to_string(), Some("PKCS12".to_string()))); + props.push(( + "ssl.keystore.location".to_string(), + Some(format!( + "{}/keystore.p12", + Self::STACKABLE_TLS_KAFKA_SERVER_DIR + )), + )); + props.push(( + "ssl.keystore.password".to_string(), + Some(Self::SSL_STORE_PASSWORD.to_string()), + )); + props.push(( + "ssl.truststore.type".to_string(), + Some("PKCS12".to_string()), + )); + props.push(( + "ssl.truststore.location".to_string(), + Some(format!( + "{}/truststore.p12", + Self::STACKABLE_TLS_KAFKA_SERVER_DIR + )), + )); + props.push(( + "ssl.truststore.password".to_string(), + Some(Self::SSL_STORE_PASSWORD.to_string()), + )); + props.push(( + "sasl.enabled.mechanisms".to_string(), + Some("GSSAPI".to_string()), + )); + props.push(( + "sasl.kerberos.service.name".to_string(), + Some(KafkaRole::Broker.kerberos_service_name().to_string()), + )); + props.push(( + "sasl.mechanism.inter.broker.protocol".to_string(), + Some("GSSAPI".to_string()), + )); + props.push(( + "sasl.jaas.config".to_string(), + Some(format!("com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"{keytab}\" principal=\"{service}/{pod}@{realm}\"", + keytab="/stackable/kerberos/keytab", + service=KafkaRole::Broker.kerberos_service_name(), + pod="todo", + realm="$KERBEROS_REALM")))); + } else if self.tls_server_secret_class().is_some() { + props.push(("security.protocol".to_string(), Some("SSL".to_string()))); + props.push(( + "ssl.truststore.type".to_string(), + Some("PKCS12".to_string()), + )); + props.push(( + "ssl.truststore.location".to_string(), + Some(format!( + "{}/truststore.p12", + Self::STACKABLE_TLS_KAFKA_SERVER_DIR + )), + )); + props.push(( + "ssl.truststore.password".to_string(), + Some(Self::SSL_STORE_PASSWORD.to_string()), + )); + } else { + props.push(("# No SSL required to connect to Kafka".to_string(), None)); + } + + props + } + /// Adds required volumes and volume mounts to the broker pod and container builders /// depending on the tls and authentication settings. pub fn add_broker_volume_and_volume_mounts( diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index 54a06921..76368fef 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -124,6 +124,18 @@ pub fn build_rolegroup_config_map( rolegroup: rolegroup.role_group.clone(), } })?, + ) + .add_data( + "client.properties", + to_java_properties_string( + kafka_security + .client_properties() + .iter() + .map(|(k, v)| (k, v)), + ) + .with_context(|_| JvmSecurityPopertiesSnafu { + rolegroup: rolegroup.role_group.clone(), + })?, ); tracing::debug!(?kafka_config, "Applied kafka config"); diff --git a/tests/templates/kuttl/tls/30_test_client_auth_tls.sh b/tests/templates/kuttl/tls/30_test_client_auth_tls.sh index bae7473b..b3daec69 100755 --- a/tests/templates/kuttl/tls/30_test_client_auth_tls.sh +++ b/tests/templates/kuttl/tls/30_test_client_auth_tls.sh @@ -5,7 +5,9 @@ unset TOPIC unset BAD_TOPIC -echo "Connecting to boostrap address $KAFKA" +KAFKA="$(cat /stackable/listener-broker/default-address/address):$(cat /stackable/listener-broker/default-address/ports/kafka-tls)" + +echo "Connecting to bootstrap address $KAFKA" echo "Start client auth TLS testing..." ############################################################################ @@ -15,10 +17,7 @@ echo "Start client auth TLS testing..." TOPIC=$(tr -dc A-Za-z0-9 /tmp/client.config - -if /stackable/kafka/bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server "$KAFKA" --command-config /tmp/client.config +if /stackable/kafka/bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server "$KAFKA" --command-config /stackable/config/client.properties then echo "[SUCCESS] Secure client topic created!" else @@ -26,7 +25,7 @@ else exit 1 fi -if /stackable/kafka/bin/kafka-topics.sh --list --topic "$TOPIC" --bootstrap-server "$KAFKA" --command-config /tmp/client.config | grep "$TOPIC" +if /stackable/kafka/bin/kafka-topics.sh --list --topic "$TOPIC" --bootstrap-server "$KAFKA" --command-config /stackable/config/client.properties | grep "$TOPIC" then echo "[SUCCESS] Secure client topic read!" else @@ -48,7 +47,7 @@ fi ############################################################################ # Test the connection with bad host name ############################################################################ -if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server localhost:9093 --command-config /tmp/client.config &> /dev/null +if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server localhost:9093 --command-config /stackable/config/client.properties &> /dev/null then echo "[ERROR] Secure client topic created with bad host name!" exit 1 @@ -59,7 +58,7 @@ fi ############################################################################ # Test the connection with bad certificate ############################################################################ -echo $'security.protocol=SSL\nssl.keystore.location=/tmp/wrong_keystore.p12\nssl.keystore.password=changeit\nssl.truststore.location=/tmp/wrong_truststore.p12\nssl.truststore.password=changeit' > /tmp/client.config +echo $'security.protocol=SSL\nssl.keystore.location=/test-scripts/wrong_keystore.p12\nssl.keystore.password=changeit\nssl.truststore.location=/test-scripts/wrong_truststore.p12\nssl.truststore.password=changeit' > /tmp/client.config if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server "$KAFKA" --command-config /tmp/client.config &> /dev/null then echo "[ERROR] Secure client topic created with wrong certificate!" diff --git a/tests/templates/kuttl/tls/30_test_client_tls.sh b/tests/templates/kuttl/tls/30_test_client_tls.sh index 34c2d493..c317d66d 100755 --- a/tests/templates/kuttl/tls/30_test_client_tls.sh +++ b/tests/templates/kuttl/tls/30_test_client_tls.sh @@ -5,7 +5,9 @@ unset TOPIC unset BAD_TOPIC -echo "Connecting to boostrap address $KAFKA" +KAFKA="$(cat /stackable/listener-broker/default-address/address):$(cat /stackable/listener-broker/default-address/ports/kafka-tls)" + +echo "Connecting to bootstrap address $KAFKA" echo "Start client TLS testing..." ############################################################################ @@ -15,10 +17,7 @@ echo "Start client TLS testing..." TOPIC=$(tr -dc A-Za-z0-9 /tmp/client.config - -if /stackable/kafka/bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server "$KAFKA" --command-config /tmp/client.config +if /stackable/kafka/bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server "$KAFKA" --command-config /stackable/config/client.properties then echo "[SUCCESS] Secure client topic created!" else @@ -26,7 +25,7 @@ else exit 1 fi -if /stackable/kafka/bin/kafka-topics.sh --list --topic "$TOPIC" --bootstrap-server "$KAFKA" --command-config /tmp/client.config | grep "$TOPIC" +if /stackable/kafka/bin/kafka-topics.sh --list --topic "$TOPIC" --bootstrap-server "$KAFKA" --command-config /stackable/config/client.properties | grep "$TOPIC" then echo "[SUCCESS] Secure client topic read!" else @@ -48,7 +47,7 @@ fi ############################################################################ # Test the connection with bad host name ############################################################################ -if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server localhost:9093 --command-config /tmp/client.config &> /dev/null +if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server localhost:9093 --command-config /stackable/config/client.properties &> /dev/null then echo "[ERROR] Secure client topic created with bad host name!" exit 1 diff --git a/tests/templates/kuttl/tls/31-assert.yaml b/tests/templates/kuttl/tls/31-assert.yaml deleted file mode 100644 index 52cbcc7b..00000000 --- a/tests/templates/kuttl/tls/31-assert.yaml +++ /dev/null @@ -1,11 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -timeout: 600 ---- -apiVersion: batch/v1 -kind: Job -metadata: - name: test-tls -status: - succeeded: 1 diff --git a/tests/templates/kuttl/tls/31-run-tests.yaml.j2 b/tests/templates/kuttl/tls/31-run-tests.yaml.j2 deleted file mode 100644 index d3a31c8d..00000000 --- a/tests/templates/kuttl/tls/31-run-tests.yaml.j2 +++ /dev/null @@ -1,13 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestStep -commands: - # Use the same Kafka image in the test Job as in the Kafka broker StatefulSet - - script: >- - KAFKA_IMAGE=$( - kubectl get statefulsets.apps test-kafka-broker-default - --namespace $NAMESPACE - --output=jsonpath='{.spec.template.spec.containers[?(.name=="kafka")].image}' - ) - envsubst < 31_test-tls-job.yaml | - kubectl apply --namespace $NAMESPACE --filename - diff --git a/tests/templates/kuttl/tls/31_test-tls-job.yaml.j2 b/tests/templates/kuttl/tls/31_test-tls-job.yaml.j2 deleted file mode 100644 index 4428f4b5..00000000 --- a/tests/templates/kuttl/tls/31_test-tls-job.yaml.j2 +++ /dev/null @@ -1,94 +0,0 @@ ---- -apiVersion: batch/v1 -kind: Job -metadata: - name: test-tls -spec: - template: - spec: - containers: - - name: kafka - image: ${KAFKA_IMAGE} - workingDir: /stackable/test -{% if test_scenario['values']['use-client-auth-tls'] == 'true' %} - command: - - ./test_client_auth_tls.sh -{% elif test_scenario['values']['use-client-tls'] == 'true' %} - command: - - ./test_client_tls.sh -{% else %} - command: - - "true" -{% endif %} - env: - - name: KAFKA - valueFrom: - configMapKeyRef: - name: test-kafka - key: KAFKA - volumeMounts: - - name: test-scripts - mountPath: /stackable/test - - mountPath: /stackable/tls_keystore_internal - name: tls-keystore-internal -{% if test_scenario['values']['use-client-auth-tls'] == 'true' or test_scenario['values']['use-client-tls'] == 'true' %} - - mountPath: /stackable/tls_keystore_server - name: tls-keystore-server -{% endif %} - volumes: - - name: test-scripts - configMap: - name: test-scripts - defaultMode: 0777 -{% if test_scenario['values']['use-client-auth-tls'] == 'true' %} - - name: tls-keystore-server - ephemeral: - volumeClaimTemplate: - metadata: - annotations: - secrets.stackable.tech/class: test-kafka-client-auth-tls - secrets.stackable.tech/format: tls-pkcs12 - secrets.stackable.tech/scope: pod,node - spec: - storageClassName: secrets.stackable.tech - accessModes: - - ReadWriteOnce - resources: - requests: - storage: "1" -{% elif test_scenario['values']['use-client-tls'] == 'true' %} - - name: tls-keystore-server - ephemeral: - volumeClaimTemplate: - metadata: - annotations: - secrets.stackable.tech/class: tls - secrets.stackable.tech/format: tls-pkcs12 - secrets.stackable.tech/scope: pod,node - spec: - storageClassName: secrets.stackable.tech - accessModes: - - ReadWriteOnce - resources: - requests: - storage: "1" -{% endif %} - - name: tls-keystore-internal - ephemeral: - volumeClaimTemplate: - metadata: - annotations: - secrets.stackable.tech/class: tls - secrets.stackable.tech/format: tls-pkcs12 - secrets.stackable.tech/scope: pod,node - spec: - storageClassName: secrets.stackable.tech - accessModes: - - ReadWriteOnce - resources: - requests: - storage: "1" - securityContext: - fsGroup: 1000 - serviceAccountName: test-sa - restartPolicy: OnFailure diff --git a/tests/templates/kuttl/tls/20-assert.yaml b/tests/templates/kuttl/tls/40-assert.yaml similarity index 100% rename from tests/templates/kuttl/tls/20-assert.yaml rename to tests/templates/kuttl/tls/40-assert.yaml diff --git a/tests/templates/kuttl/tls/20-install-kafka.yaml.j2 b/tests/templates/kuttl/tls/40-install-kafka.yaml.j2 similarity index 84% rename from tests/templates/kuttl/tls/20-install-kafka.yaml.j2 rename to tests/templates/kuttl/tls/40-install-kafka.yaml.j2 index da660f7f..ba0278a4 100644 --- a/tests/templates/kuttl/tls/20-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/tls/40-install-kafka.yaml.j2 @@ -66,3 +66,14 @@ spec: roleGroups: default: replicas: 3 + podOverrides: + spec: + volumes: + - name: test-scripts + configMap: + name: test-scripts + containers: + - name: kafka + volumeMounts: + - mountPath: /test-scripts + name: test-scripts diff --git a/tests/templates/kuttl/tls/50-assert.yaml.j2 b/tests/templates/kuttl/tls/50-assert.yaml.j2 new file mode 100644 index 00000000..8c99a7d1 --- /dev/null +++ b/tests/templates/kuttl/tls/50-assert.yaml.j2 @@ -0,0 +1,11 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +commands: + - script: | +{% if test_scenario['values']['use-client-auth-tls'] == 'true' %} + kubectl exec -n $NAMESPACE test-kafka-broker-default-0 -c kafka -- bash /test-scripts/test_client_auth_tls.sh +{% elif test_scenario['values']['use-client-tls'] == 'true' %} + kubectl exec -n $NAMESPACE test-kafka-broker-default-0 -c kafka -- bash /test-scripts/test_client_tls.sh +{% else %} + true +{% endif %} From 6c2dfa7a422657483f2e4c2257cfeb8406d53303 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Thu, 16 Oct 2025 14:10:00 +0200 Subject: [PATCH 02/11] fix: loose ends and cleanups (#900) * group kerberos code together * clean up bootstrap listener ports * update metrics.py to actually test for the existence of metrics * read jmx config from server.yaml instead of broker.yaml * update docs * update changelog --- CHANGELOG.md | 6 ++++ .../kafka/pages/usage-guide/monitoring.adoc | 4 ++- rust/operator-binary/src/config/jvm.rs | 6 ++-- rust/operator-binary/src/crd/security.rs | 24 ++++++-------- rust/operator-binary/src/resource/listener.rs | 33 +++++++------------ tests/templates/kuttl/smoke-kraft/metrics.py | 16 ++++----- tests/templates/kuttl/smoke/metrics.py | 16 ++++----- 7 files changed, 50 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d35d682a..7de48bce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,11 +16,17 @@ All notable changes to this project will be documented in this file. - Deprecate support for Kafka `3.7.2` ([#892]). - BREAKING: The `--` rolegroup service was replaced with a `---headless` and `---metrics` rolegroup service ([#897]). +- Small cleanups and updates ([#900]) + - remove the metrics port from services that don't need it + - use the new `server.yaml` for jmx configuration + - update metrics tests + - update monitoring doc [#889]: https://github.com/stackabletech/kafka-operator/pull/889 [#890]: https://github.com/stackabletech/kafka-operator/pull/890 [#892]: https://github.com/stackabletech/kafka-operator/pull/892 [#897]: https://github.com/stackabletech/kafka-operator/pull/897 +[#900]: https://github.com/stackabletech/kafka-operator/pull/900 ## [25.7.0] - 2025-07-23 diff --git a/docs/modules/kafka/pages/usage-guide/monitoring.adoc b/docs/modules/kafka/pages/usage-guide/monitoring.adoc index 421129ae..92a28dc7 100644 --- a/docs/modules/kafka/pages/usage-guide/monitoring.adoc +++ b/docs/modules/kafka/pages/usage-guide/monitoring.adoc @@ -1,5 +1,7 @@ = Monitoring :description: The managed Kafka instances are automatically configured to export Prometheus metrics. -The managed Kafka instances are automatically configured to export Prometheus metrics. +The operator sets up all Kafka server instances with a JMX exporter agent. +The operator also sets up a dedicated metrics service for each role group. +The name of this service follows the schema `---metrics`. See xref:operators:monitoring.adoc[] for more details. diff --git a/rust/operator-binary/src/config/jvm.rs b/rust/operator-binary/src/config/jvm.rs index 4311d40d..f3ed39bd 100644 --- a/rust/operator-binary/src/config/jvm.rs +++ b/rust/operator-binary/src/config/jvm.rs @@ -51,7 +51,7 @@ fn construct_jvm_args( format!("-Xms{java_heap}"), format!("-Djava.security.properties={STACKABLE_CONFIG_DIR}/{JVM_SECURITY_PROPERTIES_FILE}"), format!( - "-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/broker.yaml" + "-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/server.yaml" ), ]; @@ -130,7 +130,7 @@ mod tests { assert_eq!( non_heap_jvm_args, "-Djava.security.properties=/stackable/config/security.properties \ - -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/broker.yaml" + -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/server.yaml" ); assert_eq!(heap_jvm_args, "-Xmx1638m -Xms1638m"); } @@ -177,7 +177,7 @@ mod tests { assert_eq!( non_heap_jvm_args, "-Djava.security.properties=/stackable/config/security.properties \ - -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/broker.yaml \ + -javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/server.yaml \ -Dhttps.proxyHost=proxy.my.corp \ -Djava.net.preferIPv4Stack=true \ -Dhttps.proxyPort=1234" diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 8c189805..505a52d0 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -594,6 +594,16 @@ impl KafkaTlsSecurity { KafkaListenerName::Bootstrap.listener_ssl_truststore_type(), "PKCS12".to_string(), ); + config.insert("sasl.enabled.mechanisms".to_string(), "GSSAPI".to_string()); + config.insert( + "sasl.kerberos.service.name".to_string(), + KafkaRole::Broker.kerberos_service_name().to_string(), + ); + config.insert( + "sasl.mechanism.inter.broker.protocol".to_string(), + "GSSAPI".to_string(), + ); + tracing::debug!("Kerberos configs added: [{:#?}]", config); } // Internal TLS @@ -655,20 +665,6 @@ impl KafkaTlsSecurity { ); } - // Kerberos - if self.has_kerberos_enabled() { - config.insert("sasl.enabled.mechanisms".to_string(), "GSSAPI".to_string()); - config.insert( - "sasl.kerberos.service.name".to_string(), - KafkaRole::Broker.kerberos_service_name().to_string(), - ); - config.insert( - "sasl.mechanism.inter.broker.protocol".to_string(), - "GSSAPI".to_string(), - ); - tracing::debug!("Kerberos configs added: [{:#?}]", config); - } - // common config.insert( Self::INTER_BROKER_LISTENER_NAME.to_string(), diff --git a/rust/operator-binary/src/resource/listener.rs b/rust/operator-binary/src/resource/listener.rs index 23cc254f..3a9b19fa 100644 --- a/rust/operator-binary/src/resource/listener.rs +++ b/rust/operator-binary/src/resource/listener.rs @@ -5,10 +5,7 @@ use stackable_operator::{ }; use crate::{ - crd::{ - METRICS_PORT, METRICS_PORT_NAME, role::broker::BrokerConfig, security::KafkaTlsSecurity, - v1alpha1, - }, + crd::{role::broker::BrokerConfig, security::KafkaTlsSecurity, v1alpha1}, kafka_controller::KAFKA_CONTROLLER_NAME, utils::build_recommended_labels, }; @@ -53,33 +50,27 @@ pub fn build_broker_rolegroup_bootstrap_listener( .build(), spec: listener::v1alpha1::ListenerSpec { class_name: Some(merged_config.bootstrap_listener_class.clone()), - ports: Some(listener_ports(kafka_security)), + ports: Some(bootstrap_listener_ports(kafka_security)), ..listener::v1alpha1::ListenerSpec::default() }, status: None, }) } -/// We only expose client HTTP / HTTPS and Metrics ports. -fn listener_ports(kafka_security: &KafkaTlsSecurity) -> Vec { - let mut ports = vec![ +fn bootstrap_listener_ports( + kafka_security: &KafkaTlsSecurity, +) -> Vec { + vec![if kafka_security.has_kerberos_enabled() { listener::v1alpha1::ListenerPort { - name: METRICS_PORT_NAME.to_string(), - port: METRICS_PORT.into(), + name: kafka_security.bootstrap_port_name().to_string(), + port: kafka_security.bootstrap_port().into(), protocol: Some("TCP".to_string()), - }, + } + } else { listener::v1alpha1::ListenerPort { name: kafka_security.client_port_name().to_string(), port: kafka_security.client_port().into(), protocol: Some("TCP".to_string()), - }, - ]; - if kafka_security.has_kerberos_enabled() { - ports.push(listener::v1alpha1::ListenerPort { - name: kafka_security.bootstrap_port_name().to_string(), - port: kafka_security.bootstrap_port().into(), - protocol: Some("TCP".to_string()), - }); - } - ports + } + }] } diff --git a/tests/templates/kuttl/smoke-kraft/metrics.py b/tests/templates/kuttl/smoke-kraft/metrics.py index 1f39540d..91072053 100644 --- a/tests/templates/kuttl/smoke-kraft/metrics.py +++ b/tests/templates/kuttl/smoke-kraft/metrics.py @@ -3,8 +3,6 @@ import requests if __name__ == "__main__": - result = 0 - LOG_LEVEL = "DEBUG" # if args.debug else 'INFO' logging.basicConfig( level=LOG_LEVEL, @@ -12,10 +10,12 @@ stream=sys.stdout, ) - http_code = requests.get( - "http://test-kafka-broker-default-metrics:9606" - ).status_code - if http_code != 200: - result = 1 + response = requests.get("http://test-kafka-broker-default-metrics:9606/metrics") + + assert response.status_code == 200, ( + f"Expected HTTP return code 200 from the metrics endpoint but got [{response.status_code}]" + ) - sys.exit(result) + assert "jmx_scrape_error" in response.text, ( + "Expected metric [jmx_scrape_error] not found" + ) diff --git a/tests/templates/kuttl/smoke/metrics.py b/tests/templates/kuttl/smoke/metrics.py index ad0c75e0..91072053 100644 --- a/tests/templates/kuttl/smoke/metrics.py +++ b/tests/templates/kuttl/smoke/metrics.py @@ -3,8 +3,6 @@ import requests if __name__ == "__main__": - result = 0 - LOG_LEVEL = "DEBUG" # if args.debug else 'INFO' logging.basicConfig( level=LOG_LEVEL, @@ -12,10 +10,12 @@ stream=sys.stdout, ) - http_code = requests.get( - "http://test-kafka-broker-default-metrics:9606/metrics" - ).status_code - if http_code != 200: - result = 1 + response = requests.get("http://test-kafka-broker-default-metrics:9606/metrics") + + assert response.status_code == 200, ( + f"Expected HTTP return code 200 from the metrics endpoint but got [{response.status_code}]" + ) - sys.exit(result) + assert "jmx_scrape_error" in response.text, ( + "Expected metric [jmx_scrape_error] not found" + ) From 7f7076ecd2211823169f9e1e05ef922a98a108d4 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 17 Oct 2025 15:34:39 +0200 Subject: [PATCH 03/11] add comment about the kerberos situation --- rust/operator-binary/src/crd/security.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 505a52d0..12827c8f 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -335,6 +335,13 @@ impl KafkaTlsSecurity { Some(Self::SSL_STORE_PASSWORD.to_string()), )); } else if self.has_kerberos_enabled() { + // TODO: to make this configuration file usable out of the box the operator needs to be + // refactored to write out Java jaas files instead of passing command line parameters + // to the Kafka daemon scripts. + // This will simplify the code and the command lines lot. + // It will also make the jaas files reusable by the Kafka shell scripts. + props.push(("# This is just an example. The sasl.jaas.config property must be updated before use.".to_string(), None)); + props.push(( "security.protocol".to_string(), Some("SASL_SSL".to_string()), From 527434675187570e77bd8468f8ffd289c58e05ac Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 17 Oct 2025 17:11:04 +0200 Subject: [PATCH 04/11] update docs to remove kcat references --- .../getting_started/getting_started.sh | 53 +++++++------- .../getting_started/getting_started.sh.j2 | 53 +++++++------- .../pages/getting_started/first_steps.adoc | 73 +++++++++++++------ .../kafka/pages/getting_started/index.adoc | 1 - docs/modules/kafka/pages/index.adoc | 2 +- 5 files changed, 109 insertions(+), 73 deletions(-) diff --git a/docs/modules/kafka/examples/getting_started/getting_started.sh b/docs/modules/kafka/examples/getting_started/getting_started.sh index cb90b245..ad01f565 100755 --- a/docs/modules/kafka/examples/getting_started/getting_started.sh +++ b/docs/modules/kafka/examples/getting_started/getting_started.sh @@ -90,28 +90,31 @@ trap "kill $PORT_FORWARD_PID" EXIT sleep 15 -echo "Creating test data" -# tag::kcat-create-data[] -echo "some test data" > data -# end::kcat-create-data[] - -echo "Writing test data" -# tag::kcat-write-data[] -kcat -b localhost:9092 -t test-data-topic -P data -# end::kcat-write-data[] - -echo "Reading test data" -# tag::kcat-read-data[] -kcat -b localhost:9092 -t test-data-topic -C -e > read-data.out -# end::kcat-read-data[] - -echo "Check contents" -# tag::kcat-check-data[] -grep "some test data" read-data.out -# end::kcat-check-data[] - -echo "Cleanup" -# tag::kcat-cleanup-data[] -rm data -rm read-data.out -# end::kcat-cleanup-data[] +echo "Creating test topic test-data-topic" +# tag::create-topic[] +kubectl exec -n default simple-kafka-broker-default-0 -c kafka -t -- /stackable/kafka/bin/kafka-topics.sh \ +--list \ +--bootstrap-server localhost:9092 +# end::create-topic[] + +echo "Publish test data" +# tag::write-data[] +kubectl exec -n default simple-kafka-broker-default-0 -c kafka -t -- /stackable/kafka/bin/kafka-producer-perf-test.sh \ +--producer-props bootstrap.servers=localhost:9092 \ +--topic test-data-topic \ +--payload-monotonic \ +--throughput 1 \ +--num-records 5 +# end::write-data[] + +echo "Consume test data" +# tag::read-data[] +kubectl exec -n default simple-kafka-broker-default-0 -c kafka -t -- /stackable/kafka/bin/kafka-console-consumer.sh \ +--bootstrap-server localhost:9092 \ +--topic test-data-topic \ +--offset earliest \ +--partition 0 \ +--timeout-ms 1000 +# end::read-data[] + +echo "Success!" diff --git a/docs/modules/kafka/examples/getting_started/getting_started.sh.j2 b/docs/modules/kafka/examples/getting_started/getting_started.sh.j2 index 12682dd1..9d6169ea 100755 --- a/docs/modules/kafka/examples/getting_started/getting_started.sh.j2 +++ b/docs/modules/kafka/examples/getting_started/getting_started.sh.j2 @@ -90,28 +90,31 @@ trap "kill $PORT_FORWARD_PID" EXIT sleep 15 -echo "Creating test data" -# tag::kcat-create-data[] -echo "some test data" > data -# end::kcat-create-data[] - -echo "Writing test data" -# tag::kcat-write-data[] -kcat -b localhost:9092 -t test-data-topic -P data -# end::kcat-write-data[] - -echo "Reading test data" -# tag::kcat-read-data[] -kcat -b localhost:9092 -t test-data-topic -C -e > read-data.out -# end::kcat-read-data[] - -echo "Check contents" -# tag::kcat-check-data[] -grep "some test data" read-data.out -# end::kcat-check-data[] - -echo "Cleanup" -# tag::kcat-cleanup-data[] -rm data -rm read-data.out -# end::kcat-cleanup-data[] +echo "Creating test topic test-data-topic" +# tag::create-topic[] +kubectl exec -n default simple-kafka-broker-default-0 -c kafka -t -- /stackable/kafka/bin/kafka-topics.sh \ +--list \ +--bootstrap-server localhost:9092 +# end::create-topic[] + +echo "Publish test data" +# tag::write-data[] +kubectl exec -n default simple-kafka-broker-default-0 -c kafka -t -- /stackable/kafka/bin/kafka-producer-perf-test.sh \ +--producer-props bootstrap.servers=localhost:9092 \ +--topic test-data-topic \ +--payload-monotonic \ +--throughput 1 \ +--num-records 5 +# end::write-data[] + +echo "Consume test data" +# tag::read-data[] +kubectl exec -n default simple-kafka-broker-default-0 -c kafka -t -- /stackable/kafka/bin/kafka-console-consumer.sh \ +--bootstrap-server localhost:9092 \ +--topic test-data-topic \ +--offset earliest \ +--partition 0 \ +--timeout-ms 1000 +# end::read-data[] + +echo "Success!" diff --git a/docs/modules/kafka/pages/getting_started/first_steps.adoc b/docs/modules/kafka/pages/getting_started/first_steps.adoc index d4ce94f0..410ebac5 100644 --- a/docs/modules/kafka/pages/getting_started/first_steps.adoc +++ b/docs/modules/kafka/pages/getting_started/first_steps.adoc @@ -1,6 +1,5 @@ = First steps -:description: Deploy and verify a Kafka cluster on Kubernetes with Stackable Operators, including ZooKeeper setup and data testing using kcat. -:kcat-install: https://github.com/edenhill/kcat#install +:description: Deploy and verify a Kafka cluster on Kubernetes with Stackable Operators, including ZooKeeper setup and data testing. After going through the xref:getting_started/installation.adoc[] section and having installed all the operators, you now deploy a Kafka cluster and the required dependencies. Afterward you can <<_verify_that_it_works, verify that it works>> by producing test data into a topic and consuming it. @@ -65,58 +64,90 @@ This creates the actual Kafka instance. == Verify that it works -Next you produce data into a topic and read it via {kcat-install}[kcat]. -Depending on your platform you may need to replace `kafkacat` in the commands below with `kcat`. +Next we will use the Kafka client scripts to create a new topic, publish and consume some data. -First, make sure that all the Pods in the StatefulSets are ready: +The Kafka operator has created a service called `simple-kafka-broker-default-bootstrap`. +This service represents the endpoint clients should initially connect to in order to publish and consume data. +First, make sure that the service exists and it is healthy: [source,bash] ---- -kubectl get statefulset +kubectl describe svc simple-kafka-broker-default-bootstrap ---- -The output should show all pods ready: +The output should look somewhat like this: ---- -NAME READY AGE -simple-kafka-broker-default 3/3 5m -simple-zk-server-default 3/3 7m +Name: simple-kafka-broker-default-bootstrap +Namespace: default +Labels: app.kubernetes.io/component=broker + app.kubernetes.io/instance=simple-kafka-broker-default-bootstrap + app.kubernetes.io/managed-by=listeners.stackable.tech_listener + app.kubernetes.io/name=listener + app.kubernetes.io/role-group=default + app.kubernetes.io/version=3.9.1-stackable0.0.0-dev + stackable.tech/vendor=Stackable +Annotations: +Selector: listener.stackable.tech/mnt.9555cbb6f38d4b0ca1771e6d83d28e27=simple-kafka-broker-default-bootstrap +Type: NodePort +IP Family Policy: SingleStack +IP Families: IPv4 +IP: 10.105.88.52 +IPs: 10.105.88.52 +Port: kafka 9092/TCP +TargetPort: 9092/TCP +NodePort: kafka 32608/TCP +Endpoints: 10.244.4.22:9092,10.244.4.24:9092,10.244.4.23:9092 +Session Affinity: None +External Traffic Policy: Local +Internal Traffic Policy: Cluster +Events: ---- -Then, create a port-forward for the Kafka Broker: +In the output we see that there are three endpoints serviced here. +There correspond to the three broker pods belonging to the Kafka cluster. + +Then, create a port-forward on this service: ---- include::example$getting_started/getting_started.sh[tag=port-forwarding] ---- -Create a file containing some data: +Now, create a new topic called `test-data-topic`: ---- -include::example$getting_started/getting_started.sh[tag=kcat-create-data] +include::example$getting_started/getting_started.sh[tag=create-topic] ---- -Write that data: +Use the Kafka performance producer script to send a couple of messages to the topic previously created: ---- -include::example$getting_started/getting_started.sh[tag=kcat-write-data] +include::example$getting_started/getting_started.sh[tag=write-data] ---- -Read that data: +The output should contain the following line: ---- -include::example$getting_started/getting_started.sh[tag=kcat-read-data] +5 records sent, 1.138434 records/sec (0.00 MB/sec), 83.40 ms avg latency, 395.00 ms max latency, 3 ms 50th, 395 ms 95th, 395 ms 99th, 395 ms 99.9th. ---- -Check the content: +This confirms that there were five messages sent to the topic and it also displays performance timers. +We are not interested in any performance indicators but appreciate the fact that there were five unique messages that we consume later. + +Now let's consume the messages from above: ---- -include::example$getting_started/getting_started.sh[tag=kcat-check-data] +include::example$getting_started/getting_started.sh[tag=read-data] ---- -And clean up: +The consumer should print the messages in between logging statements ---- -include::example$getting_started/getting_started.sh[tag=kcat-cleanup-data] +0 +1 +2 +3 +4 ---- You successfully created a Kafka cluster and produced and consumed data. diff --git a/docs/modules/kafka/pages/getting_started/index.adoc b/docs/modules/kafka/pages/getting_started/index.adoc index 1744de46..87386ecd 100644 --- a/docs/modules/kafka/pages/getting_started/index.adoc +++ b/docs/modules/kafka/pages/getting_started/index.adoc @@ -11,7 +11,6 @@ You need: * a Kubernetes cluster * kubectl * optional: Helm -* https://github.com/edenhill/kcat#install[kcat] for testing Resource sizing depends on cluster type(s), usage and scope, but as a starting point a minimum of the following resources is recommended for this operator: diff --git a/docs/modules/kafka/pages/index.adoc b/docs/modules/kafka/pages/index.adoc index 43671170..9e1a86c6 100644 --- a/docs/modules/kafka/pages/index.adoc +++ b/docs/modules/kafka/pages/index.adoc @@ -21,7 +21,7 @@ It is commonly used for real-time data processing, data ingestion, event streami == Getting started -Follow the xref:kafka:getting_started/index.adoc[] which guides you through installing The Stackable Kafka and ZooKeeper operators, setting up ZooKeeper and Kafka and testing your Kafka using `kcat`. +Follow the xref:kafka:getting_started/index.adoc[] which guides you through installing The Stackable Kafka and ZooKeeper operators, setting up ZooKeeper and Kafka and testing your Kafka installation. == Resources From a0973951194014d5ca61b84d417696dfbbae6e5d Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 17 Oct 2025 17:11:34 +0200 Subject: [PATCH 05/11] remove comments from client.properties --- rust/operator-binary/src/crd/security.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index 12827c8f..aa9d3525 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -340,8 +340,6 @@ impl KafkaTlsSecurity { // to the Kafka daemon scripts. // This will simplify the code and the command lines lot. // It will also make the jaas files reusable by the Kafka shell scripts. - props.push(("# This is just an example. The sasl.jaas.config property must be updated before use.".to_string(), None)); - props.push(( "security.protocol".to_string(), Some("SASL_SSL".to_string()), @@ -410,7 +408,11 @@ impl KafkaTlsSecurity { Some(Self::SSL_STORE_PASSWORD.to_string()), )); } else { - props.push(("# No SSL required to connect to Kafka".to_string(), None)); + // Empty client.properties. + // Unfortunately cannot add comments because the properties writer escapes them + // generating garbage. + // + // props.push(("# No SSL required to connect to Kafka".to_string(), None)); } props From f5fd22582bfbf26efe5cdf5d5ac093798a1503bc Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 17 Oct 2025 17:36:00 +0200 Subject: [PATCH 06/11] fix topic creation --- .../modules/kafka/examples/getting_started/getting_started.sh | 4 +++- .../kafka/examples/getting_started/getting_started.sh.j2 | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/modules/kafka/examples/getting_started/getting_started.sh b/docs/modules/kafka/examples/getting_started/getting_started.sh index ad01f565..f2563958 100755 --- a/docs/modules/kafka/examples/getting_started/getting_started.sh +++ b/docs/modules/kafka/examples/getting_started/getting_started.sh @@ -93,7 +93,9 @@ sleep 15 echo "Creating test topic test-data-topic" # tag::create-topic[] kubectl exec -n default simple-kafka-broker-default-0 -c kafka -t -- /stackable/kafka/bin/kafka-topics.sh \ ---list \ +--create \ +--topic test-data-topic \ +--partitions 1 \ --bootstrap-server localhost:9092 # end::create-topic[] diff --git a/docs/modules/kafka/examples/getting_started/getting_started.sh.j2 b/docs/modules/kafka/examples/getting_started/getting_started.sh.j2 index 9d6169ea..75b6fa61 100755 --- a/docs/modules/kafka/examples/getting_started/getting_started.sh.j2 +++ b/docs/modules/kafka/examples/getting_started/getting_started.sh.j2 @@ -93,7 +93,9 @@ sleep 15 echo "Creating test topic test-data-topic" # tag::create-topic[] kubectl exec -n default simple-kafka-broker-default-0 -c kafka -t -- /stackable/kafka/bin/kafka-topics.sh \ ---list \ +--create \ +--topic test-data-topic \ +--partitions 1 \ --bootstrap-server localhost:9092 # end::create-topic[] From 048e3de5fc8e7723c50c4ebbbfe0c0c6ec371212 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 17 Oct 2025 22:25:20 +0200 Subject: [PATCH 07/11] remove kcat from upgrade test --- tests/templates/kuttl/upgrade/03-assert.yaml | 12 +-- .../kuttl/upgrade/03-write-data.yaml.j2 | 94 ------------------- tests/templates/kuttl/upgrade/05-assert.yaml | 12 +-- .../kuttl/upgrade/05-read-data.yaml.j2 | 71 -------------- 4 files changed, 10 insertions(+), 179 deletions(-) delete mode 100644 tests/templates/kuttl/upgrade/03-write-data.yaml.j2 delete mode 100644 tests/templates/kuttl/upgrade/05-read-data.yaml.j2 diff --git a/tests/templates/kuttl/upgrade/03-assert.yaml b/tests/templates/kuttl/upgrade/03-assert.yaml index c7accefb..ab808bd2 100644 --- a/tests/templates/kuttl/upgrade/03-assert.yaml +++ b/tests/templates/kuttl/upgrade/03-assert.yaml @@ -2,10 +2,8 @@ apiVersion: kuttl.dev/v1beta1 kind: TestAssert timeout: 300 ---- -apiVersion: batch/v1 -kind: Job -metadata: - name: write-data -status: - succeeded: 1 +commands: + - command: > + kubectl exec -n $NAMESPACE test-kafka-broker-default-0 -c kafka -t -- + bash -c "echo message written before upgrade | + /stackable/kafka/bin/kafka-console-producer.sh --topic upgrade-test-data --bootstrap-server test-kafka-broker-default-bootstrap:$(cat /stackable/listener-bootstrap/default-address/ports/kafka*) --producer.config /stackable/config/client.properties" diff --git a/tests/templates/kuttl/upgrade/03-write-data.yaml.j2 b/tests/templates/kuttl/upgrade/03-write-data.yaml.j2 deleted file mode 100644 index b03d20ea..00000000 --- a/tests/templates/kuttl/upgrade/03-write-data.yaml.j2 +++ /dev/null @@ -1,94 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestStep -timeout: 300 ---- -apiVersion: batch/v1 -kind: Job -metadata: - name: write-data -spec: - template: - spec: - serviceAccountName: read-write-data-sa -{% if test_scenario['values']['openshift'] == "true" %} - securityContext: - runAsUser: 0 -{% endif %} - containers: - - name: write-data - image: oci.stackable.tech/sdp/kafka-testing-tools:1.0.0-stackable0.0.0-dev - command: [sh, -euo, pipefail, -c] - args: - - | -{% if test_scenario['values']['use-client-auth-tls'] == 'true' %} - export SSL_OPTIONS="-X security.protocol=SSL -X ssl.key.location=/stackable/tls_client/tls.key -X ssl.certificate.location=/stackable/tls_client/tls.crt -X ssl.ca.location=/stackable/tls_client/ca.crt" -{% elif test_scenario['values']['use-client-tls'] == 'true' %} - export SSL_OPTIONS="-X security.protocol=SSL -X ssl.ca.location=/stackable/tls_client/ca.crt" -{% else %} - export SSL_OPTIONS="" -{% endif %} - echo "message written before upgrade" > /tmp/message - /stackable/kcat -b $KAFKA $SSL_OPTIONS -t upgrade-test-data -P /tmp/message - env: - - name: KAFKA - valueFrom: - configMapKeyRef: - name: test-kafka - key: KAFKA - volumeMounts: - - mountPath: /stackable/tls_client - name: tls - volumes: - - ephemeral: - volumeClaimTemplate: - metadata: - annotations: -{% if test_scenario['values']['use-client-auth-tls'] == 'true' %} - secrets.stackable.tech/class: test-kafka-client-auth-tls -{% else %} - secrets.stackable.tech/class: tls -{% endif %} - secrets.stackable.tech/scope: pod,node - creationTimestamp: null - spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: "1" - storageClassName: secrets.stackable.tech - volumeMode: Filesystem - name: tls - restartPolicy: Never - ---- -apiVersion: v1 -kind: ServiceAccount -metadata: - name: read-write-data-sa - -{% if test_scenario['values']['openshift'] == "true" %} ---- -kind: Role -apiVersion: rbac.authorization.k8s.io/v1 -metadata: - name: use-privileged-scc -rules: - - apiGroups: ["security.openshift.io"] - resources: ["securitycontextconstraints"] - resourceNames: ["privileged"] - verbs: ["use"] ---- -kind: RoleBinding -apiVersion: rbac.authorization.k8s.io/v1 -metadata: - name: use-privileged-scc -subjects: - - kind: ServiceAccount - name: read-write-data-sa -roleRef: - kind: Role - name: use-privileged-scc - apiGroup: rbac.authorization.k8s.io -{% endif %} diff --git a/tests/templates/kuttl/upgrade/05-assert.yaml b/tests/templates/kuttl/upgrade/05-assert.yaml index 4d8db7a4..8d8e6cbc 100644 --- a/tests/templates/kuttl/upgrade/05-assert.yaml +++ b/tests/templates/kuttl/upgrade/05-assert.yaml @@ -2,10 +2,8 @@ apiVersion: kuttl.dev/v1beta1 kind: TestAssert timeout: 300 ---- -apiVersion: batch/v1 -kind: Job -metadata: - name: read-data -status: - succeeded: 1 +commands: + - command: > + kubectl exec -n $NAMESPACE test-kafka-broker-default-0 -c kafka -t -- + bash -c "/stackable/kafka/bin/kafka-console-consumer.sh --topic upgrade-test-data --bootstrap-server test-kafka-broker-default-bootstrap:$(cat /stackable/listener-bootstrap/default-address/ports/kafka*) --offset earliest --partition 0 --timeout-ms 1000 --consumer.config /stackable/config/client.properties" + | grep 'message written before upgrade' diff --git a/tests/templates/kuttl/upgrade/05-read-data.yaml.j2 b/tests/templates/kuttl/upgrade/05-read-data.yaml.j2 deleted file mode 100644 index d766466c..00000000 --- a/tests/templates/kuttl/upgrade/05-read-data.yaml.j2 +++ /dev/null @@ -1,71 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestStep -timeout: 300 ---- -apiVersion: batch/v1 -kind: Job -metadata: - name: read-data -spec: - template: - spec: - serviceAccountName: read-write-data-sa -{% if test_scenario['values']['openshift'] == "true" %} - securityContext: - runAsUser: 0 -{% endif %} - containers: - - name: read-data - image: oci.stackable.tech/sdp/kafka-testing-tools:1.0.0-stackable0.0.0-dev - command: [sh, -euo, pipefail, -c] - args: - - | -{% if test_scenario['values']['use-client-auth-tls'] == 'true' %} - export SSL_OPTIONS="-X security.protocol=SSL -X ssl.key.location=/stackable/tls_client/tls.key -X ssl.certificate.location=/stackable/tls_client/tls.crt -X ssl.ca.location=/stackable/tls_client/ca.crt" -{% elif test_scenario['values']['use-client-tls'] == 'true' %} - export SSL_OPTIONS="-X security.protocol=SSL -X ssl.ca.location=/stackable/tls_client/ca.crt" -{% else %} - export SSL_OPTIONS="" -{% endif %} - echo "message written after upgrade" > /tmp/message - /stackable/kcat -b $KAFKA $SSL_OPTIONS -t upgrade-test-data -P /tmp/message - - echo "message written before upgrade" > /tmp/expected-messages - echo >> /tmp/expected-messages - cat /tmp/message >> /tmp/expected-messages - echo >> /tmp/expected-messages - /stackable/kcat -b $KAFKA $SSL_OPTIONS -t upgrade-test-data -C -e > /tmp/read-messages - diff /tmp/read-messages /tmp/expected-messages - cmp /tmp/read-messages /tmp/expected-messages - env: - - name: KAFKA - valueFrom: - configMapKeyRef: - name: test-kafka - key: KAFKA - volumeMounts: - - mountPath: /stackable/tls_client - name: tls - volumes: - - ephemeral: - volumeClaimTemplate: - metadata: - annotations: -{% if test_scenario['values']['use-client-auth-tls'] == 'true' %} - secrets.stackable.tech/class: test-kafka-client-auth-tls -{% else %} - secrets.stackable.tech/class: tls -{% endif %} - secrets.stackable.tech/scope: pod,node - creationTimestamp: null - spec: - accessModes: - - ReadWriteOnce - resources: - requests: - storage: "1" - storageClassName: secrets.stackable.tech - volumeMode: Filesystem - name: tls - restartPolicy: Never From 5ec69da3fadfead77024a26f6e9420e761e25f27 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Sat, 18 Oct 2025 17:05:49 +0200 Subject: [PATCH 08/11] update changelog --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7de48bce..2d1cd628 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ All notable changes to this project will be documented in this file. - Add experimental support for Kafka KRaft mode ([#889]). - Add experimental support for Kafka `4.1.0` ([#889]). - Add `prometheus.io/path|port|scheme` annotations to metrics service ([#897]). +- Add `client.properties` to the Kafka configuration config map ([#898]). + + Use this file together with the Kafka client shell scripts and not worry about TLS settings. + Unfortunately, when Kerberos is enabled this file is incomplete and must be edited first before it can be used. ### Changed @@ -21,11 +25,15 @@ All notable changes to this project will be documented in this file. - use the new `server.yaml` for jmx configuration - update metrics tests - update monitoring doc +- Replace `kcat` with Kafka client scripts wherever possible ([#898]). + + At the moment, `kcat` is still used for liveliness probes and Kerberos tests. [#889]: https://github.com/stackabletech/kafka-operator/pull/889 [#890]: https://github.com/stackabletech/kafka-operator/pull/890 [#892]: https://github.com/stackabletech/kafka-operator/pull/892 [#897]: https://github.com/stackabletech/kafka-operator/pull/897 +[#898]: https://github.com/stackabletech/kafka-operator/pull/898 [#900]: https://github.com/stackabletech/kafka-operator/pull/900 ## [25.7.0] - 2025-07-23 From 0efffdbb3e6131e41c1b43cd337b8947eb38067f Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 21 Oct 2025 11:46:00 +0200 Subject: [PATCH 09/11] Update CHANGELOG.md Co-authored-by: Malte Sander --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d1cd628..a405262d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ All notable changes to this project will be documented in this file. - Add `prometheus.io/path|port|scheme` annotations to metrics service ([#897]). - Add `client.properties` to the Kafka configuration config map ([#898]). - Use this file together with the Kafka client shell scripts and not worry about TLS settings. + Use this file together with the Kafka client shell scripts and preconfigured TLS settings. Unfortunately, when Kerberos is enabled this file is incomplete and must be edited first before it can be used. ### Changed From e51050d67d79de4a61ff89f8551484201a6f7121 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 21 Oct 2025 11:47:17 +0200 Subject: [PATCH 10/11] Apply suggestions from code review Co-authored-by: Malte Sander --- docs/modules/kafka/pages/getting_started/first_steps.adoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/modules/kafka/pages/getting_started/first_steps.adoc b/docs/modules/kafka/pages/getting_started/first_steps.adoc index 410ebac5..6b5500e2 100644 --- a/docs/modules/kafka/pages/getting_started/first_steps.adoc +++ b/docs/modules/kafka/pages/getting_started/first_steps.adoc @@ -64,7 +64,7 @@ This creates the actual Kafka instance. == Verify that it works -Next we will use the Kafka client scripts to create a new topic, publish and consume some data. +Next, use the Kafka client scripts to create a topic and publish and consume data. The Kafka operator has created a service called `simple-kafka-broker-default-bootstrap`. This service represents the endpoint clients should initially connect to in order to publish and consume data. @@ -104,8 +104,8 @@ Internal Traffic Policy: Cluster Events: ---- -In the output we see that there are three endpoints serviced here. -There correspond to the three broker pods belonging to the Kafka cluster. +The output shows that there are three endpoints serviced here. +They correspond to the three broker pods belonging to the Kafka cluster. Then, create a port-forward on this service: From af709040590794b2c413c0463845b4fc66c87a8d Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 21 Oct 2025 12:10:20 +0200 Subject: [PATCH 11/11] implement review feedback --- rust/operator-binary/src/crd/security.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index aa9d3525..6c482eb8 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -22,7 +22,7 @@ use stackable_operator::{ shared::time::Duration, }; -use super::listener::node_port_cmd; +use super::listener::{KafkaListenerProtocol, node_port_cmd}; use crate::crd::{ LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME, STACKABLE_KERBEROS_KRB5_PATH, STACKABLE_LISTENER_BROKER_DIR, @@ -305,7 +305,10 @@ impl KafkaTlsSecurity { let mut props = vec![]; if self.tls_client_authentication_class().is_some() { - props.push(("security.protocol".to_string(), Some("SSL".to_string()))); + props.push(( + "security.protocol".to_string(), + Some(KafkaListenerProtocol::Ssl.to_string()), + )); props.push(("ssl.client.auth".to_string(), Some("required".to_string()))); props.push(("ssl.keystore.type".to_string(), Some("PKCS12".to_string()))); props.push(( @@ -342,7 +345,7 @@ impl KafkaTlsSecurity { // It will also make the jaas files reusable by the Kafka shell scripts. props.push(( "security.protocol".to_string(), - Some("SASL_SSL".to_string()), + Some(KafkaListenerProtocol::SaslSsl.to_string()), )); props.push(("ssl.keystore.type".to_string(), Some("PKCS12".to_string()))); props.push(( @@ -391,7 +394,10 @@ impl KafkaTlsSecurity { pod="todo", realm="$KERBEROS_REALM")))); } else if self.tls_server_secret_class().is_some() { - props.push(("security.protocol".to_string(), Some("SSL".to_string()))); + props.push(( + "security.protocol".to_string(), + Some(KafkaListenerProtocol::Ssl.to_string()), + )); props.push(( "ssl.truststore.type".to_string(), Some("PKCS12".to_string()), @@ -408,11 +414,10 @@ impl KafkaTlsSecurity { Some(Self::SSL_STORE_PASSWORD.to_string()), )); } else { - // Empty client.properties. - // Unfortunately cannot add comments because the properties writer escapes them - // generating garbage. - // - // props.push(("# No SSL required to connect to Kafka".to_string(), None)); + props.push(( + "security.protocol".to_string(), + Some(KafkaListenerProtocol::Plaintext.to_string()), + )); } props