Skip to content

Commit fe92a6c

Browse files
committed
add watch for referenced cms
1 parent 9b42c44 commit fe92a6c

File tree

2 files changed

+74
-46
lines changed

2 files changed

+74
-46
lines changed

CHANGELOG.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,14 @@ All notable changes to this project will be documented in this file.
44

55
## [Unreleased]
66

7-
### Added
8-
9-
- Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` ([#671]).
10-
117
### Changed
128

139
- Replace stackable-operator `initialize_logging` with stackable-telemetry `Tracing` ([#661]).
1410
- BREAKING: The file log directory was set by `HDFS_OPERATOR_LOG_DIRECTORY`, and is now set by `ROLLING_LOGS`
1511
(or via `--rolling-logs <DIRECTORY>`).
1612
- Replace stackable-operator `print_startup_string` with `tracing::info!` with fields.
13+
- BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead
14+
of having the operator write it to the vector config ([#671]).
1715

1816
### Fixed
1917

rust/operator-binary/src/main.rs

Lines changed: 72 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use stackable_operator::{
1313
core::v1::{ConfigMap, Service},
1414
},
1515
kube::{
16-
Api,
16+
Api, ResourceExt,
1717
api::PartialObjectMeta,
1818
core::DeserializeGuard,
1919
runtime::{
@@ -166,47 +166,60 @@ pub async fn create_controller(
166166
let hdfs_controller = Controller::new(
167167
namespace.get_api::<DeserializeGuard<v1alpha1::HdfsCluster>>(&client),
168168
watcher::Config::default(),
169-
)
170-
.owns(
171-
namespace.get_api::<DeserializeGuard<StatefulSet>>(&client),
172-
watcher::Config::default(),
173-
)
174-
.owns(
175-
namespace.get_api::<DeserializeGuard<Service>>(&client),
176-
watcher::Config::default(),
177-
)
178-
.owns(
179-
namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
180-
watcher::Config::default(),
181-
)
182-
.shutdown_on_signal()
183-
.run(
184-
hdfs_controller::reconcile_hdfs,
185-
hdfs_controller::error_policy,
186-
Arc::new(hdfs_controller::Ctx {
187-
client: client.clone(),
188-
product_config,
189-
event_recorder: hdfs_event_recorder.clone(),
190-
}),
191-
)
192-
// We can let the reporting happen in the background
193-
.for_each_concurrent(
194-
16, // concurrency limit
195-
|result| {
196-
// The event_recorder needs to be shared across all invocations, so that
197-
// events are correctly aggregated
198-
let hdfs_event_recorder = hdfs_event_recorder.clone();
199-
async move {
200-
report_controller_reconciled(
201-
&hdfs_event_recorder,
202-
HDFS_FULL_CONTROLLER_NAME,
203-
&result,
204-
)
205-
.await;
206-
}
207-
},
208-
)
209-
.instrument(info_span!("hdfs_controller"));
169+
);
170+
let hdfs_store_1 = hdfs_controller.store();
171+
let hdfs_controller = hdfs_controller
172+
.owns(
173+
namespace.get_api::<DeserializeGuard<StatefulSet>>(&client),
174+
watcher::Config::default(),
175+
)
176+
.owns(
177+
namespace.get_api::<DeserializeGuard<Service>>(&client),
178+
watcher::Config::default(),
179+
)
180+
.owns(
181+
namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
182+
watcher::Config::default(),
183+
)
184+
.shutdown_on_signal()
185+
.watches(
186+
namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
187+
watcher::Config::default(),
188+
move |config_map| {
189+
hdfs_store_1
190+
.state()
191+
.into_iter()
192+
.filter(move |hdfs| references_config_map(hdfs, &config_map))
193+
.map(|hdfs| reflector::ObjectRef::from_obj(&*hdfs))
194+
},
195+
)
196+
.run(
197+
hdfs_controller::reconcile_hdfs,
198+
hdfs_controller::error_policy,
199+
Arc::new(hdfs_controller::Ctx {
200+
client: client.clone(),
201+
product_config,
202+
event_recorder: hdfs_event_recorder.clone(),
203+
}),
204+
)
205+
// We can let the reporting happen in the background
206+
.for_each_concurrent(
207+
16, // concurrency limit
208+
|result| {
209+
// The event_recorder needs to be shared across all invocations, so that
210+
// events are correctly aggregated
211+
let hdfs_event_recorder = hdfs_event_recorder.clone();
212+
async move {
213+
report_controller_reconciled(
214+
&hdfs_event_recorder,
215+
HDFS_FULL_CONTROLLER_NAME,
216+
&result,
217+
)
218+
.await;
219+
}
220+
},
221+
)
222+
.instrument(info_span!("hdfs_controller"));
210223

211224
pin_mut!(hdfs_controller, reflector);
212225
// kube-runtime's Controller will tokio::spawn each reconciliation, so this only concerns the internal watch machinery
@@ -231,3 +244,20 @@ pub fn build_recommended_labels<'a, T>(
231244
role_group,
232245
}
233246
}
247+
248+
fn references_config_map(
249+
hdfs: &DeserializeGuard<v1alpha1::HdfsCluster>,
250+
config_map: &DeserializeGuard<ConfigMap>,
251+
) -> bool {
252+
let Ok(hdfs) = &hdfs.0 else {
253+
return false;
254+
};
255+
256+
hdfs.spec.cluster_config.zookeeper_config_map_name == config_map.name_any()
257+
|| match hdfs.spec.cluster_config.authorization.to_owned() {
258+
Some(hdfs_authorization) => {
259+
hdfs_authorization.opa.config_map_name == config_map.name_any()
260+
}
261+
None => false,
262+
}
263+
}

0 commit comments

Comments
 (0)