Skip to content

Commit d35f838

Browse files
committed
chore: Update to operator-rs 0.84.1
1 parent 6768272 commit d35f838

File tree

10 files changed

+1341
-604
lines changed

10 files changed

+1341
-604
lines changed

Cargo.lock

Lines changed: 229 additions & 218 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.nix

Lines changed: 1015 additions & 325 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,22 @@ repository = "https://github.com/stackabletech/hdfs-operator"
1313
anyhow = "1.0"
1414
built = { version = "0.7", features = ["chrono", "git2"] }
1515
clap = "4.5"
16+
const_format = "0.2"
1617
futures = { version = "0.3", features = ["compat"] }
1718
indoc = "2.0"
18-
rstest = "0.23"
19+
rstest = "0.24"
1920
semver = "1.0"
2021
serde = { version = "1.0", features = ["derive"] }
2122
serde_json = "1.0"
2223
serde_yaml = "0.9"
2324
snafu = "0.8"
24-
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.83.0" }
25+
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.84.1" }
2526
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
2627
strum = { version = "0.26", features = ["derive"] }
2728
tokio = { version = "1.40", features = ["full"] }
2829
tracing = "0.1"
2930
tracing-futures = { version = "0.2", features = ["futures-03"] }
3031

31-
#[patch."https://github.com/stackabletech/operator-rs.git"]
32-
#stackable-operator = { path = "../operator-rs/crates/stackable-operator" }
33-
#stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }
32+
# [patch."https://github.com/stackabletech/operator-rs.git"]
33+
# stackable-operator = { path = "../operator-rs/crates/stackable-operator" }
34+
# stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }

crate-hashes.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deploy/helm/hdfs-operator/templates/roles.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ rules:
9191
- events
9292
verbs:
9393
- create
94+
- patch
9495
- apiGroups:
9596
- listeners.stackable.tech
9697
resources:

rust/crd/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ use stackable_operator::{
3737
self,
3838
spec::{ContainerLogConfig, Logging},
3939
},
40-
role_utils::{GenericRoleConfig, Role, RoleGroup, RoleGroupRef},
40+
role_utils::{
41+
GenericProductSpecificCommonConfig, GenericRoleConfig, Role, RoleGroup, RoleGroupRef,
42+
},
4143
schemars::{self, JsonSchema},
4244
status::condition::{ClusterCondition, HasStatusCondition},
4345
time::Duration,
@@ -539,23 +541,23 @@ impl HdfsCluster {
539541
pub fn namenode_rolegroup(
540542
&self,
541543
role_group: &str,
542-
) -> Option<&RoleGroup<NameNodeConfigFragment>> {
544+
) -> Option<&RoleGroup<NameNodeConfigFragment, GenericProductSpecificCommonConfig>> {
543545
self.spec.name_nodes.as_ref()?.role_groups.get(role_group)
544546
}
545547

546548
/// Get a reference to the datanode [`RoleGroup`] struct if it exists.
547549
pub fn datanode_rolegroup(
548550
&self,
549551
role_group: &str,
550-
) -> Option<&RoleGroup<DataNodeConfigFragment>> {
552+
) -> Option<&RoleGroup<DataNodeConfigFragment, GenericProductSpecificCommonConfig>> {
551553
self.spec.data_nodes.as_ref()?.role_groups.get(role_group)
552554
}
553555

554556
/// Get a reference to the journalnode [`RoleGroup`] struct if it exists.
555557
pub fn journalnode_rolegroup(
556558
&self,
557559
role_group: &str,
558-
) -> Option<&RoleGroup<JournalNodeConfigFragment>> {
560+
) -> Option<&RoleGroup<JournalNodeConfigFragment, GenericProductSpecificCommonConfig>> {
559561
self.spec
560562
.journal_nodes
561563
.as_ref()?

rust/operator-binary/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ stackable-hdfs-crd = { path = "../crd" }
1313

1414
anyhow.workspace = true
1515
clap.workspace = true
16+
const_format.workspace = true
1617
futures.workspace = true
1718
indoc.workspace = true
1819
product-config.workspace = true

rust/operator-binary/src/event.rs

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
use snafu::{ResultExt, Snafu};
2-
use stackable_hdfs_crd::{constants::CONTROLLER_NAME, HdfsCluster, HdfsRole};
2+
use stackable_hdfs_crd::{HdfsCluster, HdfsRole};
33
use stackable_operator::{
4-
client::Client,
5-
kube::runtime::{
6-
events::{Event, EventType, Recorder, Reporter},
7-
reflector::ObjectRef,
8-
},
4+
k8s_openapi::api::core::v1::ObjectReference,
5+
kube::runtime::events::{Event, EventType},
96
};
107
use strum::{EnumDiscriminants, IntoStaticStr};
118

9+
use crate::hdfs_controller::Ctx;
10+
1211
#[derive(Snafu, Debug, EnumDiscriminants)]
1312
#[strum_discriminants(derive(IntoStaticStr))]
1413
pub enum Error {
@@ -18,30 +17,25 @@ pub enum Error {
1817
},
1918
}
2019

21-
/// Publish a Kubernetes event for the `hdfs` cluster resource.
22-
pub async fn publish_event(
23-
hdfs: &HdfsCluster,
24-
client: &Client,
25-
action: &str,
26-
reason: &str,
27-
message: &str,
20+
/// Publish a Kubernetes warning event for the `hdfs` cluster resource.
21+
pub async fn publish_warning_event(
22+
ctx: &Ctx,
23+
hdfs_object_ref: &ObjectReference,
24+
action: String,
25+
reason: String,
26+
message: String,
2827
) -> Result<(), Error> {
29-
let reporter = Reporter {
30-
controller: CONTROLLER_NAME.into(),
31-
instance: None,
32-
};
33-
34-
let object_ref = ObjectRef::from_obj(hdfs);
35-
36-
let recorder = Recorder::new(client.as_kube_client(), reporter, object_ref.into());
37-
recorder
38-
.publish(Event {
39-
action: action.into(),
40-
reason: reason.into(),
41-
note: Some(message.into()),
42-
type_: EventType::Warning,
43-
secondary: None,
44-
})
28+
ctx.event_recorder
29+
.publish(
30+
&Event {
31+
action,
32+
reason,
33+
note: Some(message),
34+
type_: EventType::Warning,
35+
secondary: None,
36+
},
37+
hdfs_object_ref,
38+
)
4539
.await
4640
.context(PublishEventSnafu)
4741
}

rust/operator-binary/src/hdfs_controller.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ use std::{
33
sync::Arc,
44
};
55

6+
use const_format::concatcp;
67
use product_config::{
78
types::PropertyNameKind,
89
writer::{to_hadoop_xml, to_java_properties_string, PropertiesWriterError},
910
ProductConfigManager,
1011
};
1112
use snafu::{OptionExt, ResultExt, Snafu};
12-
use stackable_operator::k8s_openapi::api::core::v1::ServiceAccount;
1313
use stackable_operator::{
1414
builder::{
1515
configmap::ConfigMapBuilder,
@@ -48,6 +48,9 @@ use stackable_operator::{
4848
time::Duration,
4949
utils::cluster_info::KubernetesClusterInfo,
5050
};
51+
use stackable_operator::{
52+
k8s_openapi::api::core::v1::ServiceAccount, kube::runtime::events::Recorder,
53+
};
5154
use strum::{EnumDiscriminants, IntoEnumIterator, IntoStaticStr};
5255

5356
use stackable_hdfs_crd::{
@@ -60,7 +63,7 @@ use crate::{
6063
config::{CoreSiteConfigBuilder, HdfsSiteConfigBuilder},
6164
container::{self, ContainerConfig, TLS_STORE_DIR, TLS_STORE_PASSWORD},
6265
discovery::{self, build_discovery_configmap},
63-
event::{build_invalid_replica_message, publish_event},
66+
event::{build_invalid_replica_message, publish_warning_event},
6467
operations::{
6568
graceful_shutdown::{self, add_graceful_shutdown_config},
6669
pdb::add_pdbs,
@@ -71,7 +74,9 @@ use crate::{
7174
};
7275

7376
pub const RESOURCE_MANAGER_HDFS_CONTROLLER: &str = "hdfs-operator-hdfs-controller";
74-
const HDFS_CONTROLLER: &str = "hdfs-controller";
77+
const HDFS_CONTROLLER_NAME: &str = "hdfs-controller";
78+
pub const HDFS_FULL_CONTROLLER_NAME: &str = concatcp!(HDFS_CONTROLLER_NAME, '.', OPERATOR_NAME);
79+
7580
const DOCKER_IMAGE_BASE_NAME: &str = "hadoop";
7681

7782
#[derive(Snafu, Debug, EnumDiscriminants)]
@@ -258,6 +263,7 @@ pub type HdfsOperatorResult<T> = Result<T, Error>;
258263
pub struct Ctx {
259264
pub client: Client,
260265
pub product_config: ProductConfigManager,
266+
pub event_recorder: Arc<Recorder>,
261267
}
262268

263269
pub async fn reconcile_hdfs(
@@ -296,6 +302,7 @@ pub async fn reconcile_hdfs(
296302
)
297303
.context(InvalidProductConfigSnafu)?;
298304

305+
let hdfs_obj_ref = hdfs.object_ref(&());
299306
// A list of all name and journal nodes across all role groups is needed for all ConfigMaps and initialization checks.
300307
let namenode_podrefs = hdfs
301308
.pod_refs(&HdfsRole::NameNode)
@@ -308,7 +315,7 @@ pub async fn reconcile_hdfs(
308315
APP_NAME,
309316
OPERATOR_NAME,
310317
RESOURCE_MANAGER_HDFS_CONTROLLER,
311-
&hdfs.object_ref(&()),
318+
&hdfs_obj_ref,
312319
ClusterResourceApplyStrategy::from(&hdfs.spec.cluster_operation),
313320
)
314321
.context(CreateClusterResourcesSnafu)?;
@@ -367,13 +374,13 @@ pub async fn reconcile_hdfs(
367374
continue;
368375
};
369376

370-
if let Some(content) = build_invalid_replica_message(hdfs, &role, dfs_replication) {
371-
publish_event(
372-
hdfs,
373-
client,
374-
"Reconcile",
375-
"Invalid replicas",
376-
content.as_ref(),
377+
if let Some(message) = build_invalid_replica_message(hdfs, &role, dfs_replication) {
378+
publish_warning_event(
379+
&ctx,
380+
&hdfs_obj_ref,
381+
"Reconcile".to_owned(),
382+
"Invalid replicas".to_owned(),
383+
message,
377384
)
378385
.await
379386
.context(FailedToCreateClusterEventSnafu)?;
@@ -488,7 +495,7 @@ pub async fn reconcile_hdfs(
488495
let discovery_cm = build_discovery_configmap(
489496
hdfs,
490497
&client.kubernetes_cluster_info,
491-
HDFS_CONTROLLER,
498+
HDFS_CONTROLLER_NAME,
492499
&hdfs
493500
.namenode_listener_refs(client)
494501
.await

rust/operator-binary/src/main.rs

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use std::sync::Arc;
22

33
use built_info::PKG_VERSION;
44
use clap::{crate_description, crate_version, Parser};
5-
use futures::StreamExt;
5+
use futures::{pin_mut, StreamExt};
6+
use hdfs_controller::HDFS_FULL_CONTROLLER_NAME;
67
use product_config::ProductConfigManager;
78
use stackable_hdfs_crd::{constants::*, HdfsCluster};
89
use stackable_operator::{
@@ -12,10 +13,13 @@ use stackable_operator::{
1213
apps::v1::StatefulSet,
1314
core::v1::{ConfigMap, Service},
1415
},
15-
kube::core::DeserializeGuard,
1616
kube::{
1717
api::PartialObjectMeta,
18-
runtime::{reflector, watcher, Controller},
18+
core::DeserializeGuard,
19+
runtime::{
20+
events::{Recorder, Reporter},
21+
reflector, watcher, Controller,
22+
},
1923
Api,
2024
},
2125
kvp::ObjectLabels,
@@ -95,6 +99,14 @@ pub async fn create_controller(
9599
) {
96100
let (store, store_w) = reflector::store();
97101

102+
let hdfs_event_recorder = Arc::new(Recorder::new(
103+
client.as_kube_client(),
104+
Reporter {
105+
controller: HDFS_FULL_CONTROLLER_NAME.to_string(),
106+
instance: None,
107+
},
108+
));
109+
98110
// The topology provider will need to build label information by querying kubernetes nodes and this
99111
// requires the clusterrole 'hdfs-clusterrole-nodes': this is bound to each deployed HDFS cluster
100112
// via a patch.
@@ -107,7 +119,8 @@ pub async fn create_controller(
107119
)
108120
.then(|ev| {
109121
hdfs_clusterrolebinding_nodes_controller::reconcile(client.as_kube_client(), &store, ev)
110-
});
122+
})
123+
.collect::<()>();
111124

112125
let hdfs_controller = Controller::new(
113126
namespace.get_api::<DeserializeGuard<HdfsCluster>>(&client),
@@ -132,14 +145,31 @@ pub async fn create_controller(
132145
Arc::new(hdfs_controller::Ctx {
133146
client: client.clone(),
134147
product_config,
148+
event_recorder: hdfs_event_recorder.clone(),
135149
}),
136150
)
137-
.map(|res| report_controller_reconciled(&client, CONTROLLER_NAME, &res))
151+
// We can let the reporting happen in the background
152+
.for_each_concurrent(
153+
16, // concurrency limit
154+
|result| {
155+
// The event_recorder needs to be shared across all invocations, so that
156+
// events are correctly aggregated
157+
let hdfs_event_recorder = hdfs_event_recorder.clone();
158+
async move {
159+
report_controller_reconciled(
160+
&hdfs_event_recorder,
161+
HDFS_FULL_CONTROLLER_NAME,
162+
&result,
163+
)
164+
.await;
165+
}
166+
},
167+
)
138168
.instrument(info_span!("hdfs_controller"));
139169

140-
futures::stream::select(hdfs_controller, reflector)
141-
.collect::<()>()
142-
.await;
170+
pin_mut!(hdfs_controller, reflector);
171+
// kube-runtime's Controller will tokio::spawn each reconciliation, so this only concerns the internal watch machinery
172+
futures::future::select(hdfs_controller, reflector).await;
143173
}
144174

145175
/// Creates recommended `ObjectLabels` to be used in deployed resources

0 commit comments

Comments (0)