Skip to content

Commit 074be54

Browse files
feat: Inject vector aggregator address as env into vector config (#609)
* start watching configmaps that are referenced in superset spec * format code * rename function * add changelog entry * minor refactor * wip: Inject the vector aggregator address into the vector config using an env var * regenerate nix * rename stores * chore: Rename store variables * chore: Use borrows --------- Co-authored-by: Nick Larsen <[email protected]>
1 parent 3490151 commit 074be54

File tree

8 files changed

+91
-105
lines changed

8 files changed

+91
-105
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,15 @@
88
- BREAKING: The file log directory was set by `SUPERSET_OPERATOR_LOG_DIRECTORY`, and is now set by `ROLLING_LOGS`
99
(or via `--rolling-logs <DIRECTORY>`).
1010
- Replace stackable-operator `print_startup_string` with `tracing::info!` with fields.
11+
- BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead
12+
of having the operator write it to the vector config ([#609]).
1113

1214
### Fixed
1315

1416
- Use `json` file extension for log files ([#615]).
17+
- Fix a bug where changes to ConfigMaps that are referenced in the SupersetCluster spec didn't trigger a reconciliation ([#609]).
1518

19+
[#609]: https://github.com/stackabletech/superset-operator/pull/609
1620
[#610]: https://github.com/stackabletech/superset-operator/pull/610
1721
[#615]: https://github.com/stackabletech/superset-operator/pull/615
1822

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.nix

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ repository = "https://github.com/stackabletech/superset-operator"
1111

1212
[workspace.dependencies]
1313
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
14-
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.89.1" }
14+
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.90.0" }
1515
stackable-telemetry = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-telemetry-0.4.0" }
1616
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.7.1" }
1717

crate-hashes.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/operator-binary/src/main.rs

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ async fn main() -> anyhow::Result<()> {
150150
watch_namespace.get_api::<DeserializeGuard<v1alpha1::SupersetCluster>>(&client),
151151
watcher::Config::default(),
152152
);
153-
let superset_store_1 = superset_controller.store();
153+
let authentication_class_store = superset_controller.store();
154+
let config_map_store = superset_controller.store();
154155
let superset_controller = superset_controller
155156
.owns(
156157
watch_namespace.get_api::<DeserializeGuard<Service>>(&client),
@@ -165,7 +166,7 @@ async fn main() -> anyhow::Result<()> {
165166
client.get_api::<DeserializeGuard<AuthenticationClass>>(&()),
166167
watcher::Config::default(),
167168
move |authentication_class| {
168-
superset_store_1
169+
authentication_class_store
169170
.state()
170171
.into_iter()
171172
.filter(move |superset| {
@@ -174,6 +175,17 @@ async fn main() -> anyhow::Result<()> {
174175
.map(|superset| ObjectRef::from_obj(&*superset))
175176
},
176177
)
178+
.watches(
179+
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
180+
watcher::Config::default(),
181+
move |config_map| {
182+
config_map_store
183+
.state()
184+
.into_iter()
185+
.filter(move |superset| references_config_map(superset, &config_map))
186+
.map(|superset| ObjectRef::from_obj(&*superset))
187+
},
188+
)
177189
.run(
178190
superset_controller::reconcile_superset,
179191
superset_controller::error_policy,
@@ -212,16 +224,16 @@ async fn main() -> anyhow::Result<()> {
212224
),
213225
watcher::Config::default(),
214226
);
215-
let druid_connection_store_1 = druid_connection_controller.store();
216-
let druid_connection_store_2 = druid_connection_controller.store();
217-
let druid_connection_store_3 = druid_connection_controller.store();
227+
let superset_cluster_store = druid_connection_controller.store();
228+
let job_store = druid_connection_controller.store();
229+
let config_map_store = druid_connection_controller.store();
218230
let druid_connection_controller = druid_connection_controller
219231
.shutdown_on_signal()
220232
.watches(
221233
watch_namespace.get_api::<DeserializeGuard<v1alpha1::SupersetCluster>>(&client),
222234
watcher::Config::default(),
223235
move |superset_cluster| {
224-
druid_connection_store_1
236+
superset_cluster_store
225237
.state()
226238
.into_iter()
227239
.filter(move |druid_connection| {
@@ -234,7 +246,7 @@ async fn main() -> anyhow::Result<()> {
234246
watch_namespace.get_api::<DeserializeGuard<Job>>(&client),
235247
watcher::Config::default(),
236248
move |job| {
237-
druid_connection_store_2
249+
job_store
238250
.state()
239251
.into_iter()
240252
.filter(move |druid_connection| valid_druid_job(druid_connection, &job))
@@ -245,7 +257,7 @@ async fn main() -> anyhow::Result<()> {
245257
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
246258
watcher::Config::default(),
247259
move |config_map| {
248-
druid_connection_store_3
260+
config_map_store
249261
.state()
250262
.into_iter()
251263
.filter(move |druid_connection| {
@@ -305,6 +317,26 @@ fn references_authentication_class(
305317
.any(|c| c.common.authentication_class_name() == &authentication_class_name)
306318
}
307319

320+
fn references_config_map(
321+
superset: &DeserializeGuard<v1alpha1::SupersetCluster>,
322+
config_map: &DeserializeGuard<ConfigMap>,
323+
) -> bool {
324+
let Ok(superset) = &superset.0 else {
325+
return false;
326+
};
327+
328+
match &superset.spec.cluster_config.authorization {
329+
Some(superset_authorization) => {
330+
superset_authorization
331+
.role_mapping_from_opa
332+
.opa
333+
.config_map_name
334+
== config_map.name_any()
335+
}
336+
None => false,
337+
}
338+
}
339+
308340
fn valid_druid_connection(
309341
superset_cluster: &DeserializeGuard<v1alpha1::SupersetCluster>,
310342
druid_connection: &DeserializeGuard<druidconnection::v1alpha1::DruidConnection>,

rust/operator-binary/src/product_logging.rs

Lines changed: 2 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
use std::fmt::{Display, Write};
22

3-
use snafu::{OptionExt, ResultExt, Snafu};
3+
use snafu::Snafu;
44
use stackable_operator::{
55
builder::configmap::ConfigMapBuilder,
6-
client::Client,
7-
k8s_openapi::api::core::v1::ConfigMap,
86
kube::Resource,
97
product_logging::{
108
self,
@@ -31,54 +29,17 @@ pub enum Error {
3129
entry: &'static str,
3230
cm_name: String,
3331
},
34-
#[snafu(display("vectorAggregatorConfigMapName must be set"))]
35-
MissingVectorAggregatorAddress,
3632
}
3733

3834
type Result<T, E = Error> = std::result::Result<T, E>;
3935

4036
pub const LOG_CONFIG_FILE: &str = "log_config.py";
4137

42-
const VECTOR_AGGREGATOR_CM_ENTRY: &str = "ADDRESS";
4338
const LOG_FILE: &str = "superset.py.json";
4439

45-
/// Return the address of the Vector aggregator if the corresponding ConfigMap name is given in the
46-
/// cluster spec
47-
pub async fn resolve_vector_aggregator_address<T: Resource>(
48-
client: &Client,
49-
cluster: &T,
50-
vector_aggregator_config_map_name: Option<&str>,
51-
) -> Result<Option<String>> {
52-
let vector_aggregator_address =
53-
if let Some(vector_aggregator_config_map_name) = vector_aggregator_config_map_name {
54-
let namespace = cluster
55-
.meta()
56-
.namespace
57-
.as_deref()
58-
.context(ObjectHasNoNamespaceSnafu)?;
59-
let vector_aggregator_address = client
60-
.get::<ConfigMap>(vector_aggregator_config_map_name, namespace)
61-
.await
62-
.context(ConfigMapNotFoundSnafu {
63-
cm_name: vector_aggregator_config_map_name.to_string(),
64-
})?
65-
.data
66-
.and_then(|mut data| data.remove(VECTOR_AGGREGATOR_CM_ENTRY))
67-
.context(MissingConfigMapEntrySnafu {
68-
entry: VECTOR_AGGREGATOR_CM_ENTRY,
69-
cm_name: vector_aggregator_config_map_name.to_string(),
70-
})?;
71-
Some(vector_aggregator_address)
72-
} else {
73-
None
74-
};
75-
Ok(vector_aggregator_address)
76-
}
77-
7840
/// Extend the ConfigMap with logging and Vector configurations
7941
pub fn extend_config_map_with_log_config<C, K>(
8042
rolegroup: &RoleGroupRef<K>,
81-
vector_aggregator_address: Option<&str>,
8243
logging: &Logging<C>,
8344
main_container: &C,
8445
vector_container: &C,
@@ -111,11 +72,7 @@ where
11172
if logging.enable_vector_agent {
11273
cm_builder.add_data(
11374
product_logging::framework::VECTOR_CONFIG_FILE,
114-
product_logging::framework::create_vector_config(
115-
rolegroup,
116-
vector_aggregator_address.context(MissingVectorAggregatorAddressSnafu)?,
117-
vector_log_config,
118-
),
75+
product_logging::framework::create_vector_config(rolegroup, vector_log_config),
11976
);
12077
}
12178

0 commit comments

Comments
 (0)