Skip to content

Commit eed00d1

Browse files
committed
implement log aggregation
1 parent a810031 commit eed00d1

File tree

1 file changed

+24
-22
lines changed

1 file changed

+24
-22
lines changed

rust/operator-binary/src/connect/controller.rs

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use stackable_operator::{
3030
},
3131
kube::{
3232
core::{error_boundary, DeserializeGuard},
33-
runtime::controller::Action,
33+
runtime::{controller::Action, reflector::ObjectRef},
3434
Resource, ResourceExt,
3535
},
3636
kvp::{Label, Labels, ObjectLabels},
@@ -42,7 +42,7 @@ use stackable_operator::{
4242
CustomContainerLogConfig,
4343
},
4444
},
45-
role_utils::{JavaCommonConfig, JvmArgumentOverrides},
45+
role_utils::{JavaCommonConfig, JvmArgumentOverrides, RoleGroupRef},
4646
time::Duration,
4747
};
4848
use strum::{Display, EnumDiscriminants, IntoStaticStr};
@@ -53,8 +53,8 @@ use super::crd::{
5353
};
5454
use crate::{
5555
crd::constants::{
56-
APP_NAME, JVM_SECURITY_PROPERTIES_FILE, MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT,
57-
OPERATOR_NAME, SPARK_DEFAULTS_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID,
56+
APP_NAME, JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE,
57+
METRICS_PORT, OPERATOR_NAME, SPARK_DEFAULTS_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID,
5858
VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG,
5959
VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG,
6060
},
@@ -347,11 +347,11 @@ pub fn error_policy(
347347
#[allow(clippy::result_large_err)]
348348
fn build_config_map(
349349
scs: &v1alpha1::SparkConnectServer,
350-
_connect_config: &ConnectConfig,
350+
connect_config: &ConnectConfig,
351351
driver_service: &Service,
352352
service_account: &ServiceAccount,
353353
resolved_product_image: &ResolvedProductImage,
354-
_vector_aggregator_address: Option<&str>,
354+
vector_aggregator_address: Option<&str>,
355355
) -> Result<ConfigMap, Error> {
356356
let cm_name = object_name(&scs.name_any(), SparkConnectRole::Server);
357357

@@ -392,16 +392,20 @@ fn build_config_map(
392392
.add_data(SPARK_DEFAULTS_FILE_NAME, spark_props)
393393
.add_data(JVM_SECURITY_PROPERTIES_FILE, jvm_sec_props);
394394

395-
// TODO: figure out how to do this without "rolegroupref"
396-
// product_logging::extend_config_map(
397-
// rolegroupref,
398-
// vector_aggregator_address,
399-
// &connect_config.logging,
400-
// SparkConnectServerContainer::SparkConnect,
401-
// SparkConnectServerContainer::Vector,
402-
// &mut cm_builder,
403-
// )
404-
// .context(InvalidLoggingConfigSnafu { cm_name: &cm_name })?;
395+
let role_group_ref = RoleGroupRef {
396+
cluster: ObjectRef::from_obj(scs),
397+
role: SparkConnectRole::Server.to_string(),
398+
role_group: DUMMY_SPARK_CONNECT_GROUP_NAME.to_string(),
399+
};
400+
product_logging::extend_config_map(
401+
&role_group_ref,
402+
vector_aggregator_address,
403+
&connect_config.logging,
404+
SparkConnectServerContainer::SparkConnect,
405+
SparkConnectServerContainer::Vector,
406+
&mut cm_builder,
407+
)
408+
.context(InvalidLoggingConfigSnafu { cm_name: &cm_name })?;
405409

406410
cm_builder
407411
.build()
@@ -628,10 +632,9 @@ fn build_service(
628632
fn command_args(spark_version: &str) -> Vec<String> {
629633
let command = [
630634
// ---------- start containerdebug
631-
// TODO: enable this before making a PR
632-
// format!(
633-
// "containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &"
634-
// ),
635+
format!(
636+
"containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &"
637+
),
635638
// ---------- start spark connect server
636639
"/stackable/spark/sbin/start-connect-server.sh".to_string(),
637640
"--deploy-mode client".to_string(), // 'cluster' mode not supported
@@ -678,8 +681,7 @@ fn object_name(stacklet_name: &str, role: SparkConnectRole) -> String {
678681
#[allow(clippy::result_large_err)]
679682
fn jvm_args(user_java_config: Option<&JavaCommonConfig>) -> Result<String, Error> {
680683
let jvm_args = vec![
681-
// TODO: fix this when the logging container is fixed
682-
// format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
684+
format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
683685
format!(
684686
"-Djava.security.properties={VOLUME_MOUNT_PATH_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
685687
),

0 commit comments

Comments
 (0)