Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ All notable changes to this project will be documented in this file.
config property `requestedSecretLifetime`. This helps reduce frequent Pod restarts ([#796]).
- Run a `containerdebug` process in the background of each Kafka container to collect debugging information ([#803]).
- Aggregate emitted Kubernetes events on the CustomResources ([#809]).
- Support configuring JVM arguments ([#819]).

[#796]: https://github.com/stackabletech/kafka-operator/pull/796
[#803]: https://github.com/stackabletech/kafka-operator/pull/803
[#809]: https://github.com/stackabletech/kafka-operator/pull/809
[#813]: https://github.com/stackabletech/kafka-operator/pull/813
[#819]: https://github.com/stackabletech/kafka-operator/pull/819

## [24.11.1] - 2025-01-10

Expand Down
52 changes: 52 additions & 0 deletions deploy/helm/kafka-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,32 @@ spec:
default: {}
description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.'
type: object
jvmArgumentOverrides:
default:
add: []
remove: []
removeRegex: []
description: Allows overriding JVM arguments. Please read on the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on the usage.
properties:
add:
default: []
description: JVM arguments to be added
items:
type: string
type: array
remove:
default: []
description: JVM arguments to be removed by exact match
items:
type: string
type: array
removeRegex:
default: []
description: JVM arguments matching any of this regexes will be removed
items:
type: string
type: array
type: object
podOverrides:
default: {}
description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
Expand Down Expand Up @@ -541,6 +567,32 @@ spec:
default: {}
description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.'
type: object
jvmArgumentOverrides:
default:
add: []
remove: []
removeRegex: []
description: Allows overriding JVM arguments. Please read on the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) for details on the usage.
properties:
add:
default: []
description: JVM arguments to be added
items:
type: string
type: array
remove:
default: []
description: JVM arguments to be removed by exact match
items:
type: string
type: array
removeRegex:
default: []
description: JVM arguments matching any of this regexes will be removed
items:
type: string
type: array
type: object
podOverrides:
default: {}
description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,10 @@ servers:

The Kafka operator also supports Pod overrides, allowing you to override any property that you can set on a Kubernetes Pod.
Read the xref:concepts:overrides.adoc#pod-overrides[Pod overrides documentation] to learn more about this feature.

== JVM argument overrides

Stackable operators automatically determine the set of needed JVM arguments, such as memory settings or trust- and keystores.
Using JVM argument overrides you can configure the JVM arguments xref:concepts:overrides.adoc#jvm-argument-overrides[according to the concepts page].

One thing that is different for Kafka, is that all head-related arguments will be passed in via the env variable `KAFKA_HEAP_OPTS`, all the other ones via `EXTRA_ARGS`.
2 changes: 1 addition & 1 deletion docs/modules/kafka/partials/nav.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
** xref:kafka:usage-guide/security.adoc[]
** xref:kafka:usage-guide/monitoring.adoc[]
** xref:kafka:usage-guide/logging.adoc[]
** xref:kafka:usage-guide/configuration-environment-overrides.adoc[]
** xref:kafka:usage-guide/overrides.adoc[]
** xref:kafka:usage-guide/operations/index.adoc[]
*** xref:kafka:usage-guide/operations/cluster-operations.adoc[]
*** xref:kafka:usage-guide/operations/pod-placement.adoc[]
Expand Down
13 changes: 7 additions & 6 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ use stackable_operator::{
kube::{runtime::reflector::ObjectRef, CustomResource, ResourceExt},
product_config_utils::Configuration,
product_logging::{self, spec::Logging},
role_utils::{
GenericProductSpecificCommonConfig, GenericRoleConfig, Role, RoleGroup, RoleGroupRef,
},
role_utils::{GenericRoleConfig, JavaCommonConfig, Role, RoleGroup, RoleGroupRef},
schemars::{self, JsonSchema},
status::condition::{ClusterCondition, HasStatusCondition},
time::Duration,
Expand Down Expand Up @@ -129,7 +127,7 @@ pub struct KafkaClusterSpec {
pub image: ProductImage,

// no doc - docs in Role struct.
pub brokers: Option<Role<KafkaConfigFragment>>,
pub brokers: Option<Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>>,

/// Kafka settings that affect all roles and role groups.
/// The settings in the `clusterConfig` are cluster wide settings that do not need to be configurable at role or role group level.
Expand Down Expand Up @@ -191,7 +189,10 @@ impl KafkaCluster {
}
}

pub fn role(&self, role_variant: &KafkaRole) -> Result<&Role<KafkaConfigFragment>, Error> {
pub fn role(
&self,
role_variant: &KafkaRole,
) -> Result<&Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>, Error> {
match role_variant {
KafkaRole::Broker => self.spec.brokers.as_ref(),
}
Expand All @@ -203,7 +204,7 @@ impl KafkaCluster {
pub fn rolegroup(
&self,
rolegroup_ref: &RoleGroupRef<KafkaCluster>,
) -> Result<&RoleGroup<KafkaConfigFragment, GenericProductSpecificCommonConfig>, Error> {
) -> Result<&RoleGroup<KafkaConfigFragment, JavaCommonConfig>, Error> {
let role_variant =
KafkaRole::from_str(&rolegroup_ref.role).with_context(|_| UnknownKafkaRoleSnafu {
role: rolegroup_ref.role.to_owned(),
Expand Down
200 changes: 200 additions & 0 deletions rust/operator-binary/src/config/jvm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_kafka_crd::{
KafkaConfig, KafkaConfigFragment, JVM_SECURITY_PROPERTIES_FILE, METRICS_PORT,
STACKABLE_CONFIG_DIR,
};
use stackable_operator::{
memory::{BinaryMultiple, MemoryQuantity},
role_utils::{self, GenericRoleConfig, JavaCommonConfig, JvmArgumentOverrides, Role},
};

const JAVA_HEAP_FACTOR: f32 = 0.8;

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("invalid memory resource configuration - missing default or value in crd?"))]
MissingMemoryResourceConfig,

#[snafu(display("invalid memory config"))]
InvalidMemoryConfig {
source: stackable_operator::memory::Error,
},

#[snafu(display("failed to merge jvm argument overrides"))]
MergeJvmArgumentOverrides { source: role_utils::Error },
}

/// All JVM arguments.
fn construct_jvm_args(
merged_config: &KafkaConfig,
role: &Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
role_group: &str,
) -> Result<Vec<String>, Error> {
let heap_size = MemoryQuantity::try_from(
merged_config
.resources
.memory
.limit
.as_ref()
.context(MissingMemoryResourceConfigSnafu)?,
)
.context(InvalidMemoryConfigSnafu)?
.scale_to(BinaryMultiple::Mebi)
* JAVA_HEAP_FACTOR;
let java_heap = heap_size
.format_for_java()
.context(InvalidMemoryConfigSnafu)?;

let jvm_args = vec![
// Heap settings
format!("-Xmx{java_heap}"),
format!("-Xms{java_heap}"),
format!("-Djava.security.properties={STACKABLE_CONFIG_DIR}/{JVM_SECURITY_PROPERTIES_FILE}"),
format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/broker.yaml")
];

let operator_generated = JvmArgumentOverrides::new_with_only_additions(jvm_args);
let merged = role
.get_merged_jvm_argument_overrides(role_group, &operator_generated)
.context(MergeJvmArgumentOverridesSnafu)?;
Ok(merged
.effective_jvm_config_after_merging()
// Sorry for the clone, that's how operator-rs is currently modelled :P
.clone())
}

/// Arguments that go into `EXTRA_ARGS`, so *not* the heap settings (which you cen get using
/// [`construct_heap_jvm_args`]).
pub fn construct_non_heap_jvm_args(
merged_config: &KafkaConfig,
role: &Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
role_group: &str,
) -> Result<String, Error> {
let mut jvm_args = construct_jvm_args(merged_config, role, role_group)?;
jvm_args.retain(|arg| !is_heap_jvm_argument(arg));

Ok(jvm_args.join(" "))
}

/// Arguments that go into `KAFKA_HEAP_OPTS`.
/// You can get the normal JVM arguments using [`construct_non_heap_jvm_args`].
pub fn construct_heap_jvm_args(
merged_config: &KafkaConfig,
role: &Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
role_group: &str,
) -> Result<String, Error> {
let mut jvm_args = construct_jvm_args(merged_config, role, role_group)?;
jvm_args.retain(|arg| is_heap_jvm_argument(arg));

Ok(jvm_args.join(" "))
}

fn is_heap_jvm_argument(jvm_argument: &str) -> bool {
let lowercase = jvm_argument.to_lowercase();

lowercase.starts_with("-xms") || lowercase.starts_with("-xmx")
}

#[cfg(test)]
mod tests {
use stackable_kafka_crd::{KafkaCluster, KafkaRole};

use super::*;

#[test]
fn test_construct_jvm_arguments_defaults() {
let input = r#"
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
spec:
image:
productVersion: 3.7.1
clusterConfig:
zookeeperConfigMapName: xyz
brokers:
roleGroups:
default:
replicas: 1
"#;
let (kafka_role, role, merged_config) = construct_boilerplate(input);
let non_heap_jvm_args =
construct_non_heap_jvm_args(&kafka_role, &role, &merged_config).unwrap();
let heap_jvm_args = construct_heap_jvm_args(&kafka_role, &role, &merged_config).unwrap();

assert_eq!(
non_heap_jvm_args,
"-Djava.security.properties=/stackable/config/security.properties \
-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/broker.yaml"
);
assert_eq!(heap_jvm_args, "-Xmx819m -Xms819m");
}

#[test]
fn test_construct_jvm_argument_overrides() {
let input = r#"
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
spec:
image:
productVersion: 3.7.1
clusterConfig:
zookeeperConfigMapName: xyz
brokers:
config:
resources:
memory:
limit: 42Gi
jvmArgumentOverrides:
add:
- -Dhttps.proxyHost=proxy.my.corp
- -Dhttps.proxyPort=8080
- -Djava.net.preferIPv4Stack=true
roleGroups:
default:
replicas: 1
jvmArgumentOverrides:
# We need more memory!
removeRegex:
- -Xmx.*
- -Dhttps.proxyPort=.*
add:
- -Xmx40000m
- -Dhttps.proxyPort=1234
"#;
let (kafka_role, role, merged_config) = construct_boilerplate(input);
let non_heap_jvm_args =
construct_non_heap_jvm_args(&kafka_role, &role, &merged_config).unwrap();
let heap_jvm_args = construct_heap_jvm_args(&kafka_role, &role, &merged_config).unwrap();

assert_eq!(
non_heap_jvm_args,
"-Djava.security.properties=/stackable/config/security.properties \
-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar=9606:/stackable/jmx/broker.yaml \
-Dhttps.proxyHost=proxy.my.corp \
-Djava.net.preferIPv4Stack=true \
-Dhttps.proxyPort=1234"
);
assert_eq!(heap_jvm_args, "-Xms34406m -Xmx40000m");
}

fn construct_boilerplate(
kafka_cluster: &str,
) -> (
KafkaConfig,
Role<KafkaConfigFragment, GenericRoleConfig, JavaCommonConfig>,
String,
) {
let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input");

let kafka_role = KafkaRole::Broker;
let rolegroup_ref = kafka.broker_rolegroup_ref("default");
let merged_config = kafka.merged_config(&kafka_role, &rolegroup_ref).unwrap();
let role = kafka.spec.brokers.unwrap();

(merged_config, role, "default".to_owned())
}
}
1 change: 1 addition & 0 deletions rust/operator-binary/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod jvm;
Loading