Skip to content

Commit b46703c

Browse files
feat: Inject vector aggregator address as env into vector config (#551)
* inject vector aggregator address as env into config * chore: Use borrows * chore: Add todo comments on unwraps --------- Co-authored-by: Nick Larsen <[email protected]>
1 parent 4351d7d commit b46703c

File tree

10 files changed

+84
-127
lines changed

10 files changed

+84
-127
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,15 @@ All notable changes to this project will be documented in this file.
1010
- BREAKING: The file log directory was set by `SPARK_K8S_OPERATOR_LOG_DIRECTORY`, and is now set by `ROLLING_LOGS`
1111
(or via `--rolling-logs <DIRECTORY>`).
1212
- Replace stackable-operator `print_startup_string` with `tracing::info!` with fields.
13+
- BREAKING: Inject the vector aggregator address into the vector config using the env var `VECTOR_AGGREGATOR_ADDRESS` instead
14+
of having the operator write it to the vector config ([#551]).
1315

1416
### Fixed
1517

1618
- Use `json` file extension for log files ([#553]).
1719

1820
[#547]: https://github.com/stackabletech/spark-k8s-operator/pull/547
21+
[#551]: https://github.com/stackabletech/spark-k8s-operator/pull/551
1922
[#553]: https://github.com/stackabletech/spark-k8s-operator/pull/553
2023

2124
## [25.3.0] - 2025-03-21

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.nix

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ repository = "https://github.com/stackabletech/spark-k8s-operator"
1111

1212
[workspace.dependencies]
1313
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
14-
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.89.1" }
14+
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.90.0" }
1515
stackable-telemetry = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-telemetry-0.4.0" }
1616
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.7.1" }
1717

crate-hashes.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/operator-binary/src/crd/logdir.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ impl S3LogDir {
155155
let bucket = log_file_dir
156156
.bucket
157157
.clone()
158+
// TODO (@NickLarsenNZ): Explain this unwrap. Either convert to expect, or gracefully handle the error.
158159
.resolve(client, namespace.unwrap().as_str())
159160
.await
160161
.context(ConfigureS3Snafu)?;

rust/operator-binary/src/crd/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,7 @@ impl v1alpha1::SparkApplication {
838838
product_config: &ProductConfigManager,
839839
) -> Result<ValidatedRoleConfigByPropertyKind, Error> {
840840
let submit_conf = if self.spec.job.is_some() {
841+
// TODO (@NickLarsenNZ): Explain this unwrap. Either convert to expect, or gracefully handle the error.
841842
self.spec.job.as_ref().unwrap().clone()
842843
} else {
843844
CommonConfiguration {
@@ -847,6 +848,7 @@ impl v1alpha1::SparkApplication {
847848
};
848849

849850
let driver_conf = if self.spec.driver.is_some() {
851+
// TODO (@NickLarsenNZ): Explain this unwrap. Either convert to expect, or gracefully handle the error.
850852
self.spec.driver.as_ref().unwrap().clone()
851853
} else {
852854
CommonConfiguration {
@@ -857,6 +859,7 @@ impl v1alpha1::SparkApplication {
857859

858860
let executor_conf: RoleGroup<RoleConfigFragment, JavaCommonConfig> =
859861
if self.spec.executor.is_some() {
862+
// TODO (@NickLarsenNZ): Explain this unwrap. Either convert to expect, or gracefully handle the error.
860863
self.spec.executor.as_ref().unwrap().clone()
861864
} else {
862865
RoleGroup {

rust/operator-binary/src/history/history_controller.rs

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ use crate::{
6363
tlscerts, to_spark_env_sh_string,
6464
},
6565
history::operations::pdb::add_pdbs,
66-
product_logging::{self, resolve_vector_aggregator_address},
66+
product_logging::{self},
6767
};
6868

6969
#[derive(Snafu, Debug, EnumDiscriminants)]
@@ -142,8 +142,8 @@ pub enum Error {
142142
source: stackable_operator::cluster_resources::Error,
143143
},
144144

145-
#[snafu(display("failed to resolve the Vector aggregator address"))]
146-
ResolveVectorAggregatorAddress { source: product_logging::Error },
145+
#[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))]
146+
VectorAggregatorConfigMapMissing,
147147

148148
#[snafu(display("failed to add the logging configuration to the ConfigMap [{cm_name}]"))]
149149
InvalidLoggingConfig {
@@ -250,16 +250,6 @@ pub async fn reconcile(
250250
.await
251251
.context(LogDirSnafu)?;
252252

253-
let vector_aggregator_address = resolve_vector_aggregator_address(
254-
client,
255-
shs.namespace()
256-
.as_deref()
257-
.context(ObjectHasNoNamespaceSnafu)?,
258-
shs.spec.vector_aggregator_config_map_name.as_deref(),
259-
)
260-
.await
261-
.context(ResolveVectorAggregatorAddressSnafu)?;
262-
263253
// Use a dedicated service account for history server pods.
264254
let (serviceaccount, rolebinding) =
265255
build_history_role_serviceaccount(shs, &resolved_product_image.app_version_label)?;
@@ -318,7 +308,6 @@ pub async fn reconcile(
318308
&resolved_product_image.app_version_label,
319309
&rgr,
320310
&log_dir,
321-
vector_aggregator_address.as_deref(),
322311
)?;
323312
cluster_resources
324313
.add(client, config_map)
@@ -377,7 +366,6 @@ fn build_config_map(
377366
app_version_label: &str,
378367
rolegroupref: &RoleGroupRef<v1alpha1::SparkHistoryServer>,
379368
log_dir: &ResolvedLogDir,
380-
vector_aggregator_address: Option<&str>,
381369
) -> Result<ConfigMap, Error> {
382370
let cm_name = rolegroupref.object_name();
383371

@@ -428,7 +416,6 @@ fn build_config_map(
428416

429417
product_logging::extend_config_map(
430418
rolegroupref,
431-
vector_aggregator_address,
432419
&merged_config.logging,
433420
SparkHistoryServerContainer::SparkHistory,
434421
SparkHistoryServerContainer::Vector,
@@ -556,24 +543,32 @@ fn build_stateful_set(
556543
pb.add_container(container);
557544

558545
if merged_config.logging.enable_vector_agent {
559-
pb.add_container(
560-
vector_container(
561-
resolved_product_image,
562-
VOLUME_MOUNT_NAME_CONFIG,
563-
VOLUME_MOUNT_NAME_LOG,
564-
merged_config
565-
.logging
566-
.containers
567-
.get(&SparkHistoryServerContainer::Vector),
568-
ResourceRequirementsBuilder::new()
569-
.with_cpu_request("250m")
570-
.with_cpu_limit("500m")
571-
.with_memory_request("128Mi")
572-
.with_memory_limit("128Mi")
573-
.build(),
574-
)
575-
.context(ConfigureLoggingSnafu)?,
576-
);
546+
match shs.spec.vector_aggregator_config_map_name.to_owned() {
547+
Some(vector_aggregator_config_map_name) => {
548+
pb.add_container(
549+
vector_container(
550+
resolved_product_image,
551+
VOLUME_MOUNT_NAME_CONFIG,
552+
VOLUME_MOUNT_NAME_LOG,
553+
merged_config
554+
.logging
555+
.containers
556+
.get(&SparkHistoryServerContainer::Vector),
557+
ResourceRequirementsBuilder::new()
558+
.with_cpu_request("250m")
559+
.with_cpu_limit("500m")
560+
.with_memory_request("128Mi")
561+
.with_memory_limit("128Mi")
562+
.build(),
563+
&vector_aggregator_config_map_name,
564+
)
565+
.context(ConfigureLoggingSnafu)?,
566+
);
567+
}
568+
None => {
569+
VectorAggregatorConfigMapMissingSnafu.fail()?;
570+
}
571+
}
577572
}
578573

579574
let mut pod_template = pb.build_template();
@@ -634,6 +629,7 @@ fn build_service(
634629
Some("None".to_string()),
635630
),
636631
None => (
632+
// TODO (@NickLarsenNZ): Explain this unwrap. Either convert to expect, or gracefully handle the error.
637633
format!("{}-{}", shs.metadata.name.as_ref().unwrap(), role),
638634
shs.spec.cluster_config.listener_class.k8s_service_type(),
639635
None,

rust/operator-binary/src/product_logging.rs

Lines changed: 2 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
use std::fmt::Display;
22

3-
use snafu::{OptionExt, ResultExt, Snafu};
3+
use snafu::Snafu;
44
use stackable_operator::{
55
builder::configmap::ConfigMapBuilder,
6-
client::Client,
7-
k8s_openapi::api::core::v1::ConfigMap,
86
kube::Resource,
97
memory::BinaryMultiple,
108
product_logging::{
@@ -29,51 +27,17 @@ pub enum Error {
2927
entry: &'static str,
3028
cm_name: String,
3129
},
32-
33-
#[snafu(display("vectorAggregatorConfigMapName must be set"))]
34-
MissingVectorAggregatorAddress,
3530
}
3631

3732
type Result<T, E = Error> = std::result::Result<T, E>;
3833

3934
pub const LOG_FILE: &str = "spark.log4j2.xml";
4035

41-
const VECTOR_AGGREGATOR_CM_ENTRY: &str = "ADDRESS";
4236
const CONSOLE_CONVERSION_PATTERN: &str = "%d{ISO8601} %p [%t] %c - %m%n";
4337

44-
/// Return the address of the Vector aggregator if the corresponding ConfigMap name is given in the
45-
/// cluster spec
46-
pub async fn resolve_vector_aggregator_address(
47-
client: &Client,
48-
namespace: &str,
49-
vector_aggregator_config_map_name: Option<&str>,
50-
) -> Result<Option<String>> {
51-
let vector_aggregator_address =
52-
if let Some(vector_aggregator_config_map_name) = vector_aggregator_config_map_name {
53-
let vector_aggregator_address = client
54-
.get::<ConfigMap>(vector_aggregator_config_map_name, namespace)
55-
.await
56-
.context(ConfigMapNotFoundSnafu {
57-
cm_name: vector_aggregator_config_map_name.to_string(),
58-
})?
59-
.data
60-
.and_then(|mut data| data.remove(VECTOR_AGGREGATOR_CM_ENTRY))
61-
.context(MissingConfigMapEntrySnafu {
62-
entry: VECTOR_AGGREGATOR_CM_ENTRY,
63-
cm_name: vector_aggregator_config_map_name.to_string(),
64-
})?;
65-
Some(vector_aggregator_address)
66-
} else {
67-
None
68-
};
69-
70-
Ok(vector_aggregator_address)
71-
}
72-
7338
/// Extend a ConfigMap with logging and Vector configurations
7439
pub fn extend_config_map<C, K>(
7540
role_group: &RoleGroupRef<K>,
76-
vector_aggregator_address: Option<&str>,
7741
logging: &Logging<C>,
7842
main_container: C,
7943
vector_container: C,
@@ -114,11 +78,7 @@ where
11478
if logging.enable_vector_agent {
11579
cm_builder.add_data(
11680
product_logging::framework::VECTOR_CONFIG_FILE,
117-
product_logging::framework::create_vector_config(
118-
role_group,
119-
vector_aggregator_address.context(MissingVectorAggregatorAddressSnafu)?,
120-
vector_log_config,
121-
),
81+
product_logging::framework::create_vector_config(role_group, vector_log_config),
12282
);
12383
}
12484

0 commit comments

Comments
 (0)