diff --git a/CHANGELOG.md b/CHANGELOG.md index b14355b8..76d0b100 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,11 +8,15 @@ - BREAKING: The file log directory was set by `SUPERSET_OPERATOR_LOG_DIRECTORY`, and is now set by `ROLLING_LOGS` (or via `--rolling-logs `). - Replace stackable-operator `print_startup_string` with `tracing::info!` with fields. +- BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead + of having the operator write it to the vector config ([#609]). ### Fixed - Use `json` file extension for log files ([#615]). +- Fix a bug where changes to ConfigMaps that are referenced in the SupersetCluster spec didn't trigger a reconciliation ([#609]). +[#609]: https://github.com/stackabletech/superset-operator/pull/609 [#610]: https://github.com/stackabletech/superset-operator/pull/610 [#615]: https://github.com/stackabletech/superset-operator/pull/615 diff --git a/Cargo.lock b/Cargo.lock index 32a54c73..f7c4ddd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2622,8 +2622,8 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" [[package]] name = "stackable-operator" -version = "0.89.1" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.89.1#cd73728af410c52972b9a9a3ba1302bcdb574d04" +version = "0.90.0" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.90.0#ea063b4595caa20c82d37c595487c76476c9ab10" dependencies = [ "chrono", "clap", @@ -2658,7 +2658,7 @@ dependencies = [ [[package]] name = "stackable-operator-derive" version = "0.3.1" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.89.1#cd73728af410c52972b9a9a3ba1302bcdb574d04" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.90.0#ea063b4595caa20c82d37c595487c76476c9ab10" dependencies = [ "darling", "proc-macro2", @@ -2669,7 +2669,7 @@ dependencies = [ [[package]] name = "stackable-shared" version = "0.0.1" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.89.1#cd73728af410c52972b9a9a3ba1302bcdb574d04" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.90.0#ea063b4595caa20c82d37c595487c76476c9ab10" dependencies = [ "kube", "semver", diff --git a/Cargo.nix b/Cargo.nix index 50ffae04..30dd1cf7 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -8473,13 +8473,13 @@ rec { }; "stackable-operator" = rec { crateName = "stackable-operator"; - version = "0.89.1"; + version = "0.90.0"; edition = "2024"; workspace_member = null; src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; - rev = "cd73728af410c52972b9a9a3ba1302bcdb574d04"; - sha256 = "1hrxrybc6197ibx0m2wfxlg5pdg4hanf6xvslzrhsp77a04pb0y9"; + rev = "ea063b4595caa20c82d37c595487c76476c9ab10"; + sha256 = "0fclvpxhchykqd7bl8hscr4v06mbs2v5vjp0xv27nvqr94j63xs2"; }; libName = "stackable_operator"; authors = [ @@ -8624,8 +8624,8 @@ rec { workspace_member = null; src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; - rev = "cd73728af410c52972b9a9a3ba1302bcdb574d04"; - sha256 = "1hrxrybc6197ibx0m2wfxlg5pdg4hanf6xvslzrhsp77a04pb0y9"; + rev = "ea063b4595caa20c82d37c595487c76476c9ab10"; + sha256 = "0fclvpxhchykqd7bl8hscr4v06mbs2v5vjp0xv27nvqr94j63xs2"; }; procMacro = true; libName = "stackable_operator_derive"; @@ -8659,8 +8659,8 @@ rec { workspace_member = null; src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; - rev = "cd73728af410c52972b9a9a3ba1302bcdb574d04"; - sha256 = "1hrxrybc6197ibx0m2wfxlg5pdg4hanf6xvslzrhsp77a04pb0y9"; + rev = "ea063b4595caa20c82d37c595487c76476c9ab10"; + sha256 = "0fclvpxhchykqd7bl8hscr4v06mbs2v5vjp0xv27nvqr94j63xs2"; }; libName = "stackable_shared"; authors = [ diff --git a/Cargo.toml b/Cargo.toml index 0868d540..53dc630b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ repository = "https://github.com/stackabletech/superset-operator" [workspace.dependencies] product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" } -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.89.1" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.90.0" } stackable-telemetry = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-telemetry-0.4.0" } stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.7.1" } diff --git a/crate-hashes.json b/crate-hashes.json index 0460dbd5..c8a9703a 100644 --- a/crate-hashes.json +++ b/crate-hashes.json @@ -1,7 +1,7 @@ { - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.89.1#stackable-operator-derive@0.3.1": "1hrxrybc6197ibx0m2wfxlg5pdg4hanf6xvslzrhsp77a04pb0y9", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.89.1#stackable-operator@0.89.1": "1hrxrybc6197ibx0m2wfxlg5pdg4hanf6xvslzrhsp77a04pb0y9", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.89.1#stackable-shared@0.0.1": "1hrxrybc6197ibx0m2wfxlg5pdg4hanf6xvslzrhsp77a04pb0y9", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.90.0#stackable-operator-derive@0.3.1": "0fclvpxhchykqd7bl8hscr4v06mbs2v5vjp0xv27nvqr94j63xs2", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.90.0#stackable-operator@0.90.0": "0fclvpxhchykqd7bl8hscr4v06mbs2v5vjp0xv27nvqr94j63xs2", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.90.0#stackable-shared@0.0.1": "0fclvpxhchykqd7bl8hscr4v06mbs2v5vjp0xv27nvqr94j63xs2", "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-telemetry-0.4.0#stackable-telemetry@0.4.0": "0hcm64fb2ngyalq8rci5lrr881prg023pq9cd1sfr79iynbr6a26", "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-versioned-0.7.1#k8s-version@0.1.2": "16klfwx3kz3ys7afwjicfj8msws9a718izx09jspwwpff3rl6wsi", "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-versioned-0.7.1#stackable-versioned-macros@0.7.1": "16klfwx3kz3ys7afwjicfj8msws9a718izx09jspwwpff3rl6wsi", diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index e1a0c4c5..52ffdeed 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -150,7 +150,8 @@ async fn main() -> anyhow::Result<()> { watch_namespace.get_api::>(&client), watcher::Config::default(), ); - let superset_store_1 = superset_controller.store(); + let authentication_class_store = superset_controller.store(); + let config_map_store = superset_controller.store(); let superset_controller = superset_controller .owns( watch_namespace.get_api::>(&client), @@ -165,7 +166,7 @@ async fn main() -> anyhow::Result<()> { client.get_api::>(&()), watcher::Config::default(), move |authentication_class| { - superset_store_1 + authentication_class_store .state() .into_iter() .filter(move |superset| { @@ -174,6 +175,17 @@ async fn main() -> anyhow::Result<()> { .map(|superset| ObjectRef::from_obj(&*superset)) }, ) + .watches( + watch_namespace.get_api::>(&client), + watcher::Config::default(), + move |config_map| { + config_map_store + .state() + .into_iter() + .filter(move |superset| references_config_map(superset, &config_map)) + .map(|superset| ObjectRef::from_obj(&*superset)) + }, + ) .run( superset_controller::reconcile_superset, superset_controller::error_policy, @@ -212,16 +224,16 @@ async fn main() -> anyhow::Result<()> { ), watcher::Config::default(), ); - let druid_connection_store_1 = druid_connection_controller.store(); - let druid_connection_store_2 = druid_connection_controller.store(); - let druid_connection_store_3 = druid_connection_controller.store(); + let superset_cluster_store = druid_connection_controller.store(); + let job_store = druid_connection_controller.store(); + let config_map_store = druid_connection_controller.store(); let druid_connection_controller = druid_connection_controller .shutdown_on_signal() .watches( watch_namespace.get_api::>(&client), watcher::Config::default(), move |superset_cluster| { - druid_connection_store_1 + superset_cluster_store .state() .into_iter() .filter(move |druid_connection| { @@ -234,7 +246,7 @@ async fn main() -> anyhow::Result<()> { watch_namespace.get_api::>(&client), watcher::Config::default(), move |job| { - druid_connection_store_2 + job_store .state() .into_iter() .filter(move |druid_connection| valid_druid_job(druid_connection, &job)) @@ -245,7 +257,7 @@ async fn main() -> anyhow::Result<()> { watch_namespace.get_api::>(&client), watcher::Config::default(), move |config_map| { - druid_connection_store_3 + config_map_store .state() .into_iter() .filter(move |druid_connection| { @@ -305,6 +317,26 @@ fn references_authentication_class( .any(|c| c.common.authentication_class_name() == &authentication_class_name) } +fn references_config_map( + superset: &DeserializeGuard, + config_map: &DeserializeGuard, +) -> bool { + let Ok(superset) = &superset.0 else { + return false; + }; + + match &superset.spec.cluster_config.authorization { + Some(superset_authorization) => { + superset_authorization + .role_mapping_from_opa + .opa + .config_map_name + == config_map.name_any() + } + None => false, + } +} + fn valid_druid_connection( superset_cluster: &DeserializeGuard, druid_connection: &DeserializeGuard, diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index 73de7598..c38a9038 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -1,10 +1,8 @@ use std::fmt::{Display, Write}; -use snafu::{OptionExt, ResultExt, Snafu}; +use snafu::Snafu; use stackable_operator::{ builder::configmap::ConfigMapBuilder, - client::Client, - k8s_openapi::api::core::v1::ConfigMap, kube::Resource, product_logging::{ self, @@ -31,54 +29,17 @@ pub enum Error { entry: &'static str, cm_name: String, }, - #[snafu(display("vectorAggregatorConfigMapName must be set"))] - MissingVectorAggregatorAddress, } type Result = std::result::Result; pub const LOG_CONFIG_FILE: &str = "log_config.py"; -const VECTOR_AGGREGATOR_CM_ENTRY: &str = "ADDRESS"; const LOG_FILE: &str = "superset.py.json"; -/// Return the address of the Vector aggregator if the corresponding ConfigMap name is given in the -/// cluster spec -pub async fn resolve_vector_aggregator_address( - client: &Client, - cluster: &T, - vector_aggregator_config_map_name: Option<&str>, -) -> Result> { - let vector_aggregator_address = - if let Some(vector_aggregator_config_map_name) = vector_aggregator_config_map_name { - let namespace = cluster - .meta() - .namespace - .as_deref() - .context(ObjectHasNoNamespaceSnafu)?; - let vector_aggregator_address = client - .get::(vector_aggregator_config_map_name, namespace) - .await - .context(ConfigMapNotFoundSnafu { - cm_name: vector_aggregator_config_map_name.to_string(), - })? - .data - .and_then(|mut data| data.remove(VECTOR_AGGREGATOR_CM_ENTRY)) - .context(MissingConfigMapEntrySnafu { - entry: VECTOR_AGGREGATOR_CM_ENTRY, - cm_name: vector_aggregator_config_map_name.to_string(), - })?; - Some(vector_aggregator_address) - } else { - None - }; - Ok(vector_aggregator_address) -} - /// Extend the ConfigMap with logging and Vector configurations pub fn extend_config_map_with_log_config( rolegroup: &RoleGroupRef, - vector_aggregator_address: Option<&str>, logging: &Logging, main_container: &C, vector_container: &C, @@ -111,11 +72,7 @@ where if logging.enable_vector_agent { cm_builder.add_data( product_logging::framework::VECTOR_CONFIG_FILE, - product_logging::framework::create_vector_config( - rolegroup, - vector_aggregator_address.context(MissingVectorAggregatorAddressSnafu)?, - vector_log_config, - ), + product_logging::framework::create_vector_config(rolegroup, vector_log_config), ); } diff --git a/rust/operator-binary/src/superset_controller.rs b/rust/operator-binary/src/superset_controller.rs index a5ff71b4..41e9b126 100644 --- a/rust/operator-binary/src/superset_controller.rs +++ b/rust/operator-binary/src/superset_controller.rs @@ -82,9 +82,7 @@ use crate::{ v1alpha1::{Container, SupersetCluster, SupersetClusterStatus, SupersetConfig}, }, operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs}, - product_logging::{ - LOG_CONFIG_FILE, extend_config_map_with_log_config, resolve_vector_aggregator_address, - }, + product_logging::{LOG_CONFIG_FILE, extend_config_map_with_log_config}, util::build_recommended_labels, }; @@ -195,10 +193,8 @@ pub enum Error { #[snafu(display("failed to resolve and merge config for role and role group"))] FailedToResolveConfig { source: crate::crd::Error }, - #[snafu(display("failed to resolve the Vector aggregator address"))] - ResolveVectorAggregatorAddress { - source: crate::product_logging::Error, - }, + #[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))] + VectorAggregatorConfigMapMissing, #[snafu(display("failed to add the logging configuration to the ConfigMap [{cm_name}]"))] InvalidLoggingConfig { @@ -322,18 +318,6 @@ pub async fn reconcile_superset( let cluster_operation_cond_builder = ClusterOperationsConditionBuilder::new(&superset.spec.cluster_config.cluster_operation); - let vector_aggregator_address = resolve_vector_aggregator_address( - client, - superset, - superset - .spec - .cluster_config - .vector_aggregator_config_map_name - .as_deref(), - ) - .await - .context(ResolveVectorAggregatorAddressSnafu)?; - let auth_config = SupersetClientAuthenticationDetailsResolved::from( &superset.spec.cluster_config.authentication, client, @@ -430,7 +414,6 @@ pub async fn reconcile_superset( &auth_config, &superset_opa_config, &config.logging, - vector_aggregator_address.as_deref(), )?; let rg_statefulset = build_server_rolegroup_statefulset( superset, @@ -559,7 +542,6 @@ fn build_rolegroup_config_map( authentication_config: &SupersetClientAuthenticationDetailsResolved, superset_opa_config: &Option, logging: &Logging, - vector_aggregator_address: Option<&str>, ) -> Result { let mut config_properties = BTreeMap::new(); let mut imports = PYTHON_IMPORTS.to_vec(); @@ -640,7 +622,6 @@ fn build_rolegroup_config_map( extend_config_map_with_log_config( rolegroup, - vector_aggregator_address, logging, &Container::Superset, &Container::Vector, @@ -908,21 +889,33 @@ fn build_server_rolegroup_statefulset( pb.add_container(metrics_container); if merged_config.logging.enable_vector_agent { - pb.add_container( - product_logging::framework::vector_container( - resolved_product_image, - CONFIG_VOLUME_NAME, - LOG_VOLUME_NAME, - merged_config.logging.containers.get(&Container::Vector), - ResourceRequirementsBuilder::new() - .with_cpu_request("250m") - .with_cpu_limit("500m") - .with_memory_request("128Mi") - .with_memory_limit("128Mi") - .build(), - ) - .context(ConfigureLoggingSnafu)?, - ); + match &superset + .spec + .cluster_config + .vector_aggregator_config_map_name + { + Some(vector_aggregator_config_map_name) => { + pb.add_container( + product_logging::framework::vector_container( + resolved_product_image, + CONFIG_VOLUME_NAME, + LOG_VOLUME_NAME, + merged_config.logging.containers.get(&Container::Vector), + ResourceRequirementsBuilder::new() + .with_cpu_request("250m") + .with_cpu_limit("500m") + .with_memory_request("128Mi") + .with_memory_limit("128Mi") + .build(), + vector_aggregator_config_map_name, + ) + .context(ConfigureLoggingSnafu)?, + ); + } + None => { + VectorAggregatorConfigMapMissingSnafu.fail()?; + } + } } let mut pod_template = pb.build_template();