Skip to content

Commit 2d8bfa1

Browse files
feat: Inject vector aggregator address as env into vector config (#772)
* inject vector aggregator address as env into config & add watch for referenced cms * add changelog entry * add pr number to changelog * stop ignoring unencountered rustsec advisories * rename stores * chore: Use borrows --------- Co-authored-by: Nick Larsen <[email protected]>
1 parent 866c32e commit 2d8bfa1

File tree

7 files changed

+68
-111
lines changed

7 files changed

+68
-111
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,16 @@ All notable changes to this project will be documented in this file.
1010
- BREAKING: The file log directory was set by `NIFI_OPERATOR_LOG_DIRECTORY`, and is now set by `ROLLING_LOGS`
1111
(or via `--rolling-logs <DIRECTORY>`).
1212
- Replace stackable-operator `print_startup_string` with `tracing::info!` with fields.
13+
- BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead
14+
of having the operator write it to the vector config ([#772]).
1315

1416
### Fixed
1517

1618
- Use `json` file extension for log files ([#774]).
19+
- Fix a bug where changes to ConfigMaps that are referenced in the NifiCluster spec didn't trigger a reconciliation ([#772]).
1720

1821
[#767]: https://github.com/stackabletech/nifi-operator/pull/767
22+
[#772]: https://github.com/stackabletech/nifi-operator/pull/772
1923
[#774]: https://github.com/stackabletech/nifi-operator/pull/774
2024

2125
## [25.3.0] - 2025-03-21

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ repository = "https://github.com/stackabletech/nifi-operator"
1111

1212
[workspace.dependencies]
1313
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
14-
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.89.1" }
14+
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.90.0" }
1515
stackable-telemetry = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-telemetry-0.4.0" }
1616
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.7.1" }
1717

deny.toml

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,6 @@ targets = [
99

1010
[advisories]
1111
yanked = "deny"
12-
ignore = [
13-
# https://rustsec.org/advisories/RUSTSEC-2023-0071
14-
# "rsa" crate: Marvin Attack: potential key recovery through timing sidechannel
15-
#
16-
# No patch is yet available, however work is underway to migrate to a fully constant-time implementation
17-
# So we need to accept this, as of SDP 24.11 we are not using the rsa crate to create certificates used in production
18-
# setups.
19-
#
20-
# TODO: Remove after https://github.com/RustCrypto/RSA/pull/394 is merged
21-
"RUSTSEC-2023-0071",
22-
23-
# https://rustsec.org/advisories/RUSTSEC-2024-0384
24-
# "instant" is unmaintained
25-
#
26-
# The upstream "kube" crate also silenced this in https://github.com/kube-rs/kube/commit/4f1e889f265da8f19f03f60683569cae1a154fda
27-
# They/we are actively working on migrating kube from backoff to backon, which removes the transitive dependency on
28-
# instant, in https://github.com/kube-rs/kube/pull/1652.
29-
#
30-
# TODO: Remove after https://github.com/kube-rs/kube/pull/1652 is merged
31-
"RUSTSEC-2024-0384",
32-
]
3312

3413
[bans]
3514
multiple-versions = "allow"

rust/operator-binary/src/controller.rs

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ use crate::{
8888
v1alpha1,
8989
},
9090
operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs},
91-
product_logging::{extend_role_group_config_map, resolve_vector_aggregator_address},
91+
product_logging::extend_role_group_config_map,
9292
reporting_task::{self, build_maybe_reporting_task, build_reporting_task_service_name},
9393
security::{
9494
authentication::{
@@ -254,10 +254,8 @@ pub enum Error {
254254
#[snafu(display("failed to resolve and merge config for role and role group"))]
255255
FailedToResolveConfig { source: crate::crd::Error },
256256

257-
#[snafu(display("failed to resolve the Vector aggregator address"))]
258-
ResolveVectorAggregatorAddress {
259-
source: crate::product_logging::Error,
260-
},
257+
#[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))]
258+
VectorAggregatorConfigMapMissing,
261259

262260
#[snafu(display("failed to add the logging configuration to the ConfigMap [{cm_name}]"))]
263261
InvalidLoggingConfig {
@@ -515,10 +513,6 @@ pub async fn reconcile_nifi(
515513
.context(SecuritySnafu)?;
516514
}
517515

518-
let vector_aggregator_address = resolve_vector_aggregator_address(nifi, client)
519-
.await
520-
.context(ResolveVectorAggregatorAddressSnafu)?;
521-
522516
let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
523517
nifi,
524518
APP_NAME,
@@ -572,7 +566,6 @@ pub async fn reconcile_nifi(
572566
&rolegroup,
573567
rolegroup_config,
574568
&merged_config,
575-
vector_aggregator_address.as_deref(),
576569
&proxy_hosts,
577570
)
578571
.await?;
@@ -747,7 +740,6 @@ async fn build_node_rolegroup_config_map(
747740
rolegroup: &RoleGroupRef<v1alpha1::NifiCluster>,
748741
rolegroup_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
749742
merged_config: &NifiConfig,
750-
vector_aggregator_address: Option<&str>,
751743
proxy_hosts: &str,
752744
) -> Result<ConfigMap> {
753745
tracing::debug!("building rolegroup configmaps");
@@ -833,15 +825,11 @@ async fn build_node_rolegroup_config_map(
833825
})?,
834826
);
835827

836-
extend_role_group_config_map(
837-
rolegroup,
838-
vector_aggregator_address,
839-
&merged_config.logging,
840-
&mut cm_builder,
841-
)
842-
.context(InvalidLoggingConfigSnafu {
843-
cm_name: rolegroup.object_name(),
844-
})?;
828+
extend_role_group_config_map(rolegroup, &merged_config.logging, &mut cm_builder).context(
829+
InvalidLoggingConfigSnafu {
830+
cm_name: rolegroup.object_name(),
831+
},
832+
)?;
845833

846834
cm_builder
847835
.build()
@@ -1244,21 +1232,29 @@ async fn build_node_rolegroup_statefulset(
12441232
}
12451233

12461234
if merged_config.logging.enable_vector_agent {
1247-
pod_builder.add_container(
1248-
product_logging::framework::vector_container(
1249-
resolved_product_image,
1250-
"config",
1251-
"log",
1252-
merged_config.logging.containers.get(&Container::Vector),
1253-
ResourceRequirementsBuilder::new()
1254-
.with_cpu_request("250m")
1255-
.with_cpu_limit("500m")
1256-
.with_memory_request("128Mi")
1257-
.with_memory_limit("128Mi")
1258-
.build(),
1259-
)
1260-
.context(ConfigureLoggingSnafu)?,
1261-
);
1235+
match &nifi.spec.cluster_config.vector_aggregator_config_map_name {
1236+
Some(vector_aggregator_config_map_name) => {
1237+
pod_builder.add_container(
1238+
product_logging::framework::vector_container(
1239+
resolved_product_image,
1240+
"config",
1241+
"log",
1242+
merged_config.logging.containers.get(&Container::Vector),
1243+
ResourceRequirementsBuilder::new()
1244+
.with_cpu_request("250m")
1245+
.with_cpu_limit("500m")
1246+
.with_memory_request("128Mi")
1247+
.with_memory_limit("128Mi")
1248+
.build(),
1249+
vector_aggregator_config_map_name,
1250+
)
1251+
.context(ConfigureLoggingSnafu)?,
1252+
);
1253+
}
1254+
None => {
1255+
VectorAggregatorConfigMapMissingSnafu.fail()?;
1256+
}
1257+
}
12621258
}
12631259

12641260
nifi_auth_config

rust/operator-binary/src/main.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use stackable_operator::{
1111
core::v1::{ConfigMap, Service},
1212
},
1313
kube::{
14+
ResourceExt,
1415
core::DeserializeGuard,
1516
runtime::{
1617
Controller,
@@ -139,7 +140,8 @@ async fn main() -> anyhow::Result<()> {
139140
watcher::Config::default(),
140141
);
141142

142-
let nifi_store_1 = nifi_controller.store();
143+
let authentication_class_store = nifi_controller.store();
144+
let config_map_store = nifi_controller.store();
143145

144146
nifi_controller
145147
.owns(
@@ -159,12 +161,23 @@ async fn main() -> anyhow::Result<()> {
159161
client.get_api::<DeserializeGuard<AuthenticationClass>>(&()),
160162
watcher::Config::default(),
161163
move |_| {
162-
nifi_store_1
164+
authentication_class_store
163165
.state()
164166
.into_iter()
165167
.map(|nifi| ObjectRef::from_obj(&*nifi))
166168
},
167169
)
170+
.watches(
171+
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
172+
watcher::Config::default(),
173+
move |config_map| {
174+
config_map_store
175+
.state()
176+
.into_iter()
177+
.filter(move |nifi| references_config_map(nifi, &config_map))
178+
.map(|nifi| ObjectRef::from_obj(&*nifi))
179+
},
180+
)
168181
.run(
169182
controller::reconcile_nifi,
170183
controller::error_policy,
@@ -196,3 +209,14 @@ async fn main() -> anyhow::Result<()> {
196209

197210
Ok(())
198211
}
212+
213+
fn references_config_map(
214+
nifi: &DeserializeGuard<v1alpha1::NifiCluster>,
215+
config_map: &DeserializeGuard<ConfigMap>,
216+
) -> bool {
217+
let Ok(nifi) = &nifi.0 else {
218+
return false;
219+
};
220+
221+
nifi.spec.cluster_config.zookeeper_config_map_name == config_map.name_any()
222+
}

rust/operator-binary/src/product_logging.rs

Lines changed: 2 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
use snafu::{OptionExt, ResultExt, Snafu};
1+
use snafu::Snafu;
22
use stackable_operator::{
33
builder::configmap::ConfigMapBuilder,
4-
client::Client,
5-
k8s_openapi::api::core::v1::ConfigMap,
6-
kube::ResourceExt,
74
memory::BinaryMultiple,
85
product_logging::{
96
self,
@@ -39,7 +36,6 @@ type Result<T, E = Error> = std::result::Result<T, E>;
3936
pub const LOGBACK_CONFIG_FILE: &str = "logback.xml";
4037
pub const NIFI_LOG_FILE: &str = "nifi.log4j.xml";
4138

42-
const VECTOR_AGGREGATOR_CM_ENTRY: &str = "ADDRESS";
4339
const CONSOLE_CONVERSION_PATTERN: &str = "%date %level [%thread] %logger{40} %msg%n";
4440
// This is required to remove double entries in the nifi.log4j.xml as well as nested
4541
// console output like: "<timestamp> <loglevel> ... <timestamp> <loglevel> ...
@@ -58,47 +54,9 @@ const ADDITONAL_LOGBACK_CONFIG: &str = r#" <appender name="PASSTHROUGH" class="
5854
</logger>
5955
"#;
6056

61-
/// Return the address of the Vector aggregator if the corresponding ConfigMap name is given in the
62-
/// cluster spec
63-
pub async fn resolve_vector_aggregator_address(
64-
nifi: &v1alpha1::NifiCluster,
65-
client: &Client,
66-
) -> Result<Option<String>> {
67-
let vector_aggregator_address = if let Some(vector_aggregator_config_map_name) = &nifi
68-
.spec
69-
.cluster_config
70-
.vector_aggregator_config_map_name
71-
.as_ref()
72-
{
73-
let vector_aggregator_address = client
74-
.get::<ConfigMap>(
75-
vector_aggregator_config_map_name,
76-
nifi.namespace()
77-
.as_deref()
78-
.context(ObjectHasNoNamespaceSnafu)?,
79-
)
80-
.await
81-
.context(ConfigMapNotFoundSnafu {
82-
cm_name: vector_aggregator_config_map_name.to_string(),
83-
})?
84-
.data
85-
.and_then(|mut data| data.remove(VECTOR_AGGREGATOR_CM_ENTRY))
86-
.context(MissingConfigMapEntrySnafu {
87-
entry: VECTOR_AGGREGATOR_CM_ENTRY,
88-
cm_name: vector_aggregator_config_map_name.to_string(),
89-
})?;
90-
Some(vector_aggregator_address)
91-
} else {
92-
None
93-
};
94-
95-
Ok(vector_aggregator_address)
96-
}
97-
9857
/// Extend the role group ConfigMap with logging and Vector configurations
9958
pub fn extend_role_group_config_map(
10059
rolegroup: &RoleGroupRef<v1alpha1::NifiCluster>,
101-
vector_aggregator_address: Option<&str>,
10260
logging: &Logging<Container>,
10361
cm_builder: &mut ConfigMapBuilder,
10462
) -> Result<()> {
@@ -137,11 +95,7 @@ pub fn extend_role_group_config_map(
13795
if logging.enable_vector_agent {
13896
cm_builder.add_data(
13997
product_logging::framework::VECTOR_CONFIG_FILE,
140-
product_logging::framework::create_vector_config(
141-
rolegroup,
142-
vector_aggregator_address.context(MissingVectorAggregatorAddressSnafu)?,
143-
vector_log_config,
144-
),
98+
product_logging::framework::create_vector_config(rolegroup, vector_log_config),
14599
);
146100
}
147101

0 commit comments

Comments
 (0)