Skip to content

Commit 85c9e9b

Browse files
feat: Inject vector aggregator address as env into vector config (#734)
* inject vector aggregator address as env into config & add watch for referenced cms * add pr number to changelog * chore: Use borrows * chore: Remove Arc --------- Co-authored-by: Nick Larsen <[email protected]>
1 parent 54f1c5e commit 85c9e9b

File tree

8 files changed

+83
-105
lines changed

8 files changed

+83
-105
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,16 @@ All notable changes to this project will be documented in this file.
1010
- BREAKING: The file log directory was set by `TRINO_OPERATOR_LOG_DIRECTORY`,
1111
and is now set by `ROLLING_LOGS` (or via `--rolling-logs <DIRECTORY>`).
1212
- Replace stackable-operator `print_startup_string` with `tracing::info!` with fields.
13+
- BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead
14+
of having the operator write it to the vector config ([#734]).
1315

1416
### Fixed
1517

1618
- Use `json` file extension for log files ([#733]).
19+
- Fix a bug where changes to ConfigMaps that are referenced in the TrinoCluster spec didn't trigger a reconciliation ([#734]).
1720

1821
[#728]: https://github.com/stackabletech/trino-operator/pull/728
22+
[#734]: https://github.com/stackabletech/trino-operator/pull/734
1923
[#733]: https://github.com/stackabletech/trino-operator/pull/733
2024

2125
## [25.3.0] - 2025-03-21

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.nix

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ repository = "https://github.com/stackabletech/trino-operator"
1111

1212
[workspace.dependencies]
1313
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
14-
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.89.1" }
14+
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.90.0" }
1515
stackable-telemetry = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-telemetry-0.4.0" }
1616
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.7.1" }
1717

crate-hashes.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/operator-binary/src/controller.rs

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ use crate::{
9393
operations::{
9494
add_graceful_shutdown_config, graceful_shutdown_config_properties, pdb::add_pdbs,
9595
},
96-
product_logging::{get_log_properties, get_vector_toml, resolve_vector_aggregator_address},
96+
product_logging::{get_log_properties, get_vector_toml},
9797
};
9898

9999
pub struct Ctx {
@@ -230,10 +230,8 @@ pub enum Error {
230230
#[snafu(display("failed to resolve and merge config for role and role group"))]
231231
FailedToResolveConfig { source: crate::crd::Error },
232232

233-
#[snafu(display("failed to resolve the Vector aggregator address"))]
234-
ResolveVectorAggregatorAddress {
235-
source: crate::product_logging::Error,
236-
},
233+
#[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))]
234+
VectorAggregatorConfigMapMissing,
237235

238236
#[snafu(display("failed to build vector container"))]
239237
BuildVectorContainer { source: LoggingError },
@@ -470,10 +468,6 @@ pub async fn reconcile_trino(
470468

471469
create_shared_internal_secret(trino, client).await?;
472470

473-
let vector_aggregator_address = resolve_vector_aggregator_address(trino, client)
474-
.await
475-
.context(ResolveVectorAggregatorAddressSnafu)?;
476-
477471
let mut sts_cond_builder = StatefulSetConditionBuilder::default();
478472

479473
for (trino_role_str, role_config) in validated_config {
@@ -498,7 +492,6 @@ pub async fn reconcile_trino(
498492
&merged_config,
499493
&trino_authentication_config,
500494
&trino_opa_config,
501-
vector_aggregator_address.as_deref(),
502495
&client.kubernetes_cluster_info,
503496
)?;
504497
let rg_catalog_configmap = build_rolegroup_catalog_config_map(
@@ -634,7 +627,6 @@ fn build_rolegroup_config_map(
634627
merged_config: &v1alpha1::TrinoConfig,
635628
trino_authentication_config: &TrinoAuthenticationConfig,
636629
trino_opa_config: &Option<TrinoOpaConfig>,
637-
vector_aggregator_address: Option<&str>,
638630
cluster_info: &KubernetesClusterInfo,
639631
) -> Result<ConfigMap> {
640632
let mut cm_conf_data = BTreeMap::new();
@@ -725,14 +717,11 @@ fn build_rolegroup_config_map(
725717
cm_conf_data.insert(file_name.to_string(), log_properties);
726718
}
727719

728-
if let Some(vector_toml) = get_vector_toml(
729-
rolegroup_ref,
730-
vector_aggregator_address,
731-
&merged_config.logging,
732-
)
733-
.context(InvalidLoggingConfigSnafu {
734-
cm_name: rolegroup_ref.object_name(),
735-
})? {
720+
if let Some(vector_toml) = get_vector_toml(rolegroup_ref, &merged_config.logging)
721+
.context(InvalidLoggingConfigSnafu {
722+
cm_name: rolegroup_ref.object_name(),
723+
})?
724+
{
736725
cm_conf_data.insert(
737726
product_logging::framework::VECTOR_CONFIG_FILE.to_string(),
738727
vector_toml,
@@ -1079,21 +1068,29 @@ fn build_rolegroup_statefulset(
10791068
}
10801069

10811070
if merged_config.logging.enable_vector_agent {
1082-
pod_builder.add_container(
1083-
product_logging::framework::vector_container(
1084-
resolved_product_image,
1085-
"config",
1086-
"log",
1087-
merged_config.logging.containers.get(&Container::Vector),
1088-
ResourceRequirementsBuilder::new()
1089-
.with_cpu_request("250m")
1090-
.with_cpu_limit("500m")
1091-
.with_memory_request("128Mi")
1092-
.with_memory_limit("128Mi")
1093-
.build(),
1094-
)
1095-
.context(BuildVectorContainerSnafu)?,
1096-
);
1071+
match &trino.spec.cluster_config.vector_aggregator_config_map_name {
1072+
Some(vector_aggregator_config_map_name) => {
1073+
pod_builder.add_container(
1074+
product_logging::framework::vector_container(
1075+
resolved_product_image,
1076+
"config",
1077+
"log",
1078+
merged_config.logging.containers.get(&Container::Vector),
1079+
ResourceRequirementsBuilder::new()
1080+
.with_cpu_request("250m")
1081+
.with_cpu_limit("500m")
1082+
.with_memory_request("128Mi")
1083+
.with_memory_limit("128Mi")
1084+
.build(),
1085+
vector_aggregator_config_map_name,
1086+
)
1087+
.context(BuildVectorContainerSnafu)?,
1088+
);
1089+
}
1090+
None => {
1091+
VectorAggregatorConfigMapMissingSnafu.fail()?;
1092+
}
1093+
}
10971094
}
10981095

10991096
let metadata = ObjectMetaBuilder::new()
@@ -1812,7 +1809,6 @@ mod tests {
18121809
&merged_config,
18131810
&trino_authentication_config,
18141811
&trino_opa_config,
1815-
None,
18161812
&cluster_info,
18171813
)
18181814
.unwrap()

rust/operator-binary/src/main.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,9 @@ async fn main() -> anyhow::Result<()> {
137137
watch_namespace.get_api::<DeserializeGuard<v1alpha1::TrinoCluster>>(&client),
138138
watcher::Config::default(),
139139
);
140-
let catalog_cluster_store = Arc::new(cluster_controller.store());
141-
let authentication_class_cluster_store = catalog_cluster_store.clone();
140+
let catalog_cluster_store = cluster_controller.store();
141+
let authentication_class_cluster_store = cluster_controller.store();
142+
let config_map_cluster_store = cluster_controller.store();
142143

143144
cluster_controller
144145
.owns(
@@ -181,6 +182,17 @@ async fn main() -> anyhow::Result<()> {
181182
.map(|trino| ObjectRef::from_obj(&*trino))
182183
},
183184
)
185+
.watches(
186+
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
187+
watcher::Config::default(),
188+
move |config_map| {
189+
config_map_cluster_store
190+
.state()
191+
.into_iter()
192+
.filter(move |druid| references_config_map(druid, &config_map))
193+
.map(|druid| ObjectRef::from_obj(&*druid))
194+
},
195+
)
184196
.run(
185197
controller::reconcile_trino,
186198
controller::error_policy,
@@ -229,3 +241,20 @@ fn references_authentication_class(
229241
.iter()
230242
.any(|c| c.authentication_class_name() == &authentication_class_name)
231243
}
244+
245+
fn references_config_map(
246+
trino: &DeserializeGuard<v1alpha1::TrinoCluster>,
247+
config_map: &DeserializeGuard<ConfigMap>,
248+
) -> bool {
249+
let Ok(trino) = &trino.0 else {
250+
return false;
251+
};
252+
253+
match &trino.spec.cluster_config.authorization {
254+
Some(trino_authorization) => match &trino_authorization.opa {
255+
Some(opa_config) => opa_config.config_map_name == config_map.name_any(),
256+
None => false,
257+
},
258+
None => false,
259+
}
260+
}

rust/operator-binary/src/product_logging.rs

Lines changed: 2 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
1-
use snafu::{OptionExt, ResultExt, Snafu};
1+
use snafu::Snafu;
22
use stackable_operator::{
3-
client::Client,
4-
k8s_openapi::api::core::v1::ConfigMap,
5-
kube::ResourceExt,
63
product_logging::{
74
framework::create_vector_config,
85
spec::{
@@ -32,15 +29,10 @@ pub enum Error {
3229
entry: &'static str,
3330
cm_name: String,
3431
},
35-
36-
#[snafu(display("vectorAggregatorConfigMapName must be set"))]
37-
MissingVectorAggregatorAddress,
3832
}
3933

4034
type Result<T, E = Error> = std::result::Result<T, E>;
4135

42-
const VECTOR_AGGREGATOR_CM_ENTRY: &str = "ADDRESS";
43-
4436
#[derive(Display)]
4537
#[strum(serialize_all = "lowercase")]
4638
pub enum TrinoLogLevel {
@@ -63,44 +55,6 @@ impl From<LogLevel> for TrinoLogLevel {
6355
}
6456
}
6557

66-
/// Return the address of the Vector aggregator if the corresponding ConfigMap name is given in the
67-
/// cluster spec
68-
pub async fn resolve_vector_aggregator_address(
69-
trino: &v1alpha1::TrinoCluster,
70-
client: &Client,
71-
) -> Result<Option<String>> {
72-
let vector_aggregator_address = if let Some(vector_aggregator_config_map_name) = &trino
73-
.spec
74-
.cluster_config
75-
.vector_aggregator_config_map_name
76-
.as_ref()
77-
{
78-
let vector_aggregator_address = client
79-
.get::<ConfigMap>(
80-
vector_aggregator_config_map_name,
81-
trino
82-
.namespace()
83-
.as_deref()
84-
.context(ObjectHasNoNamespaceSnafu)?,
85-
)
86-
.await
87-
.context(ConfigMapNotFoundSnafu {
88-
cm_name: vector_aggregator_config_map_name.to_string(),
89-
})?
90-
.data
91-
.and_then(|mut data| data.remove(VECTOR_AGGREGATOR_CM_ENTRY))
92-
.context(MissingConfigMapEntrySnafu {
93-
entry: VECTOR_AGGREGATOR_CM_ENTRY,
94-
cm_name: vector_aggregator_config_map_name.to_string(),
95-
})?;
96-
Some(vector_aggregator_address)
97-
} else {
98-
None
99-
};
100-
101-
Ok(vector_aggregator_address)
102-
}
103-
10458
/// Return the `log.properties` configuration
10559
pub fn get_log_properties(logging: &Logging<Container>) -> Option<String> {
10660
if let Some(ContainerLogConfig {
@@ -116,7 +70,6 @@ pub fn get_log_properties(logging: &Logging<Container>) -> Option<String> {
11670
/// Return the vector toml configuration
11771
pub fn get_vector_toml(
11872
rolegroup: &RoleGroupRef<v1alpha1::TrinoCluster>,
119-
vector_aggregator_address: Option<&str>,
12073
logging: &Logging<Container>,
12174
) -> Result<Option<String>> {
12275
let vector_log_config = if let Some(ContainerLogConfig {
@@ -129,11 +82,7 @@ pub fn get_vector_toml(
12982
};
13083

13184
if logging.enable_vector_agent {
132-
Ok(Some(create_vector_config(
133-
rolegroup,
134-
vector_aggregator_address.context(MissingVectorAggregatorAddressSnafu)?,
135-
vector_log_config,
136-
)))
85+
Ok(Some(create_vector_config(rolegroup, vector_log_config)))
13786
} else {
13887
Ok(None)
13988
}

0 commit comments

Comments
 (0)