7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -10,12 +10,19 @@ All notable changes to this project will be documented in this file.
- BREAKING: The file log directory, previously set via `KAFKA_OPERATOR_LOG_DIRECTORY`, is now set via `ROLLING_LOGS`
  (or via the `--rolling-logs <DIRECTORY>` flag).
- Replace stackable-operator's `print_startup_string` with a `tracing::info!` call using structured fields.
- BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead
of having the operator write it to the vector config ([#844]).

### Fixed

- Fix a bug where changes to ConfigMaps that are referenced in the KafkaCluster spec didn't trigger a reconciliation ([#844]).
- Use the `json` file extension for log files ([#846]).

[#840]: https://github.com/stackabletech/kafka-operator/pull/840
[#844]: https://github.com/stackabletech/kafka-operator/pull/844
[#846]: https://github.com/stackabletech/kafka-operator/pull/846

## [25.3.0] - 2025-03-21
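The BREAKING entry in the changelog above moves the aggregator address out of the operator-rendered vector config and into an environment variable. A minimal sketch of what that injection could look like, assuming the address is passed as a plain value (the helper name is hypothetical; the actual wiring lives in operator-rs and sources the value from the discovery ConfigMap named in the cluster spec):

```rust
use k8s_openapi::api::core::v1::EnvVar;

// Hedged sketch: hand the aggregator address to the vector container via the
// VECTOR_AGGREGATOR_ADDRESS env var. `address` is an assumed input here.
fn vector_aggregator_env(address: &str) -> EnvVar {
    EnvVar {
        name: "VECTOR_AGGREGATOR_ADDRESS".to_owned(),
        value: Some(address.to_owned()),
        ..EnvVar::default()
    }
}
```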
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

14 changes: 7 additions & 7 deletions Cargo.nix

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -11,7 +11,7 @@ repository = "https://github.com/stackabletech/kafka-operator"

[workspace.dependencies]
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.89.1" }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.90.0" }
stackable-telemetry = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-telemetry-0.4.0" }
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.7.1" }

6 changes: 3 additions & 3 deletions crate-hashes.json

Some generated files are not rendered by default.

71 changes: 32 additions & 39 deletions rust/operator-binary/src/kafka_controller.rs
@@ -92,10 +92,7 @@ use crate::{
graceful_shutdown::{add_graceful_shutdown_config, graceful_shutdown_config_properties},
pdb::add_pdbs,
},
product_logging::{
LOG4J_CONFIG_FILE, MAX_KAFKA_LOG_FILES_SIZE, extend_role_group_config_map,
resolve_vector_aggregator_address,
},
product_logging::{LOG4J_CONFIG_FILE, MAX_KAFKA_LOG_FILES_SIZE, extend_role_group_config_map},
utils::build_recommended_labels,
};

@@ -253,10 +250,8 @@ pub enum Error {
#[snafu(display("failed to resolve and merge config for role and role group"))]
FailedToResolveConfig { source: crate::crd::Error },

#[snafu(display("failed to resolve the Vector aggregator address"))]
ResolveVectorAggregatorAddress {
source: crate::product_logging::Error,
},
#[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))]
VectorAggregatorConfigMapMissing,

#[snafu(display("failed to add the logging configuration to the ConfigMap [{cm_name}]"))]
InvalidLoggingConfig {
@@ -396,7 +391,7 @@ impl ReconcilerError for Error {
Error::FailedToInitializeSecurityContext { .. } => None,
Error::CreateClusterResources { .. } => None,
Error::FailedToResolveConfig { .. } => None,
Error::ResolveVectorAggregatorAddress { .. } => None,
Error::VectorAggregatorConfigMapMissing { .. } => None,
Error::InvalidLoggingConfig { .. } => None,
Error::ApplyServiceAccount { .. } => None,
Error::ApplyRoleBinding { .. } => None,
@@ -508,10 +503,6 @@ pub async fn reconcile_kafka(
None
};

let vector_aggregator_address = resolve_vector_aggregator_address(kafka, client)
.await
.context(ResolveVectorAggregatorAddressSnafu)?;

let mut ss_cond_builder = StatefulSetConditionBuilder::default();

let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
@@ -550,7 +541,6 @@
&rolegroup_ref,
rolegroup_config,
&merged_config,
vector_aggregator_address.as_deref(),
)?;
let rg_statefulset = build_broker_rolegroup_statefulset(
kafka,
@@ -688,7 +678,6 @@ fn build_broker_rolegroup_config_map(
rolegroup: &RoleGroupRef<v1alpha1::KafkaCluster>,
broker_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
merged_config: &KafkaConfig,
vector_aggregator_address: Option<&str>,
) -> Result<ConfigMap> {
let mut server_cfg = broker_config
.get(&PropertyNameKind::File(SERVER_PROPERTIES_FILE.to_string()))
@@ -751,15 +740,11 @@ fn build_broker_rolegroup_config_map(
tracing::debug!(?server_cfg, "Applied server config");
tracing::debug!(?jvm_sec_props, "Applied JVM config");

extend_role_group_config_map(
rolegroup,
vector_aggregator_address,
&merged_config.logging,
&mut cm_builder,
)
.context(InvalidLoggingConfigSnafu {
cm_name: rolegroup.object_name(),
})?;
extend_role_group_config_map(rolegroup, &merged_config.logging, &mut cm_builder).context(
InvalidLoggingConfigSnafu {
cm_name: rolegroup.object_name(),
},
)?;

cm_builder
.build()
@@ -1114,21 +1099,29 @@ fn build_broker_rolegroup_statefulset(

// Add the vector container after the kafka container so that defaulting still applies to the kafka container
if merged_config.logging.enable_vector_agent {
pod_builder.add_container(
product_logging::framework::vector_container(
resolved_product_image,
"config",
"log",
merged_config.logging.containers.get(&Container::Vector),
ResourceRequirementsBuilder::new()
.with_cpu_request("250m")
.with_cpu_limit("500m")
.with_memory_request("128Mi")
.with_memory_limit("128Mi")
.build(),
)
.context(ConfigureLoggingSnafu)?,
);
match &kafka.spec.cluster_config.vector_aggregator_config_map_name {
Some(vector_aggregator_config_map_name) => {
pod_builder.add_container(
product_logging::framework::vector_container(
resolved_product_image,
"config",
"log",
merged_config.logging.containers.get(&Container::Vector),
ResourceRequirementsBuilder::new()
.with_cpu_request("250m")
.with_cpu_limit("500m")
.with_memory_request("128Mi")
.with_memory_limit("128Mi")
.build(),
vector_aggregator_config_map_name,
)
.context(ConfigureLoggingSnafu)?,
);
}
None => {
VectorAggregatorConfigMapMissingSnafu.fail()?;
}
}
}

add_graceful_shutdown_config(merged_config, &mut pod_builder).context(GracefulShutdownSnafu)?;
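When the vector agent is enabled but `vector_aggregator_config_map_name` is unset, the new `match` aborts the reconciliation through the generated snafu context selector. A minimal sketch of that selector pattern, using a standalone error type assumed here for illustration (not the operator's actual `Error` enum):

```rust
use snafu::Snafu;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))]
    VectorAggregatorConfigMapMissing,
}

// snafu derives a context selector per variant; for a unit variant,
// `.fail()` builds the error and returns it as `Err`.
fn require_aggregator_config_map(name: Option<&str>) -> Result<&str, Error> {
    match name {
        Some(name) => Ok(name),
        None => VectorAggregatorConfigMapMissingSnafu.fail(),
    }
}
```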
130 changes: 82 additions & 48 deletions rust/operator-binary/src/main.rs
@@ -14,10 +14,12 @@ use stackable_operator::{
rbac::v1::RoleBinding,
},
kube::{
ResourceExt,
core::DeserializeGuard,
runtime::{
Controller,
events::{Recorder, Reporter},
reflector::ObjectRef,
watcher,
},
},
@@ -156,55 +158,87 @@ pub async fn create_controller(
instance: None,
}));

Controller::new(
let kafka_controller = Controller::new(
namespace.get_api::<DeserializeGuard<v1alpha1::KafkaCluster>>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<StatefulSet>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<Service>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<Listener>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<ConfigMap>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<ServiceAccount>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<RoleBinding>(&client),
watcher::Config::default(),
)
.shutdown_on_signal()
.run(
kafka_controller::reconcile_kafka,
kafka_controller::error_policy,
Arc::new(kafka_controller::Ctx {
client: client.clone(),
product_config,
}),
)
// We can let the reporting happen in the background
.for_each_concurrent(
16, // concurrency limit
move |result| {
// The event_recorder needs to be shared across all invocations, so that
// events are correctly aggregated
let event_recorder = event_recorder.clone();
async move {
report_controller_reconciled(&event_recorder, KAFKA_FULL_CONTROLLER_NAME, &result)
);
let config_map_store = kafka_controller.store();
kafka_controller
.owns(
namespace.get_api::<StatefulSet>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<Service>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<Listener>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<ConfigMap>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<ServiceAccount>(&client),
watcher::Config::default(),
)
.owns(
namespace.get_api::<RoleBinding>(&client),
watcher::Config::default(),
)
.shutdown_on_signal()
.watches(
namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
watcher::Config::default(),
move |config_map| {
config_map_store
.state()
.into_iter()
.filter(move |kafka| references_config_map(kafka, &config_map))
.map(|kafka| ObjectRef::from_obj(&*kafka))
},
)
.run(
kafka_controller::reconcile_kafka,
kafka_controller::error_policy,
Arc::new(kafka_controller::Ctx {
client: client.clone(),
product_config,
}),
)
// We can let the reporting happen in the background
.for_each_concurrent(
16, // concurrency limit
move |result| {
// The event_recorder needs to be shared across all invocations, so that
// events are correctly aggregated
let event_recorder = event_recorder.clone();
async move {
report_controller_reconciled(
&event_recorder,
KAFKA_FULL_CONTROLLER_NAME,
&result,
)
.await;
}
},
)
.await;
}
},
)
.await;
}

fn references_config_map(
kafka: &DeserializeGuard<v1alpha1::KafkaCluster>,
config_map: &DeserializeGuard<ConfigMap>,
) -> bool {
let Ok(kafka) = &kafka.0 else {
return false;
};

kafka.spec.cluster_config.zookeeper_config_map_name == config_map.name_any()
|| match &kafka.spec.cluster_config.authorization.opa {
Some(opa_config) => opa_config.config_map_name == config_map.name_any(),
None => false,
}
}
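This is the heart of the fix: an extra `.watches` on ConfigMaps uses the controller's reflector store to map a changed ConfigMap back to every `KafkaCluster` that references it by name (the ZooKeeper discovery ConfigMap or the OPA ConfigMap), queuing those clusters for reconciliation without an extra API call per event. A sketch of how the name-matching logic could be unit-tested against a simplified stand-in (an assumption for illustration, not the operator's actual test code):

```rust
// Simplified stand-in for references_config_map: the real function reads the
// names out of the KafkaCluster spec and the candidate ConfigMap's metadata.
fn references(zookeeper_cm: &str, opa_cm: Option<&str>, candidate: &str) -> bool {
    zookeeper_cm == candidate || opa_cm.is_some_and(|opa| opa == candidate)
}

#[test]
fn triggers_on_zookeeper_or_opa_config_map() {
    assert!(references("kafka-zk", None, "kafka-zk"));
    assert!(references("kafka-zk", Some("kafka-opa"), "kafka-opa"));
    assert!(!references("kafka-zk", Some("kafka-opa"), "unrelated"));
}
```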