@@ -8,7 +8,7 @@ use std::{
 use constants::*;
 use history::LogFileDirectorySpec;
 use logdir::ResolvedLogDir;
-use product_config::{types::PropertyNameKind, ProductConfigManager};
+use product_config::{ProductConfigManager, types::PropertyNameKind};
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
 use stackable_operator::{
@@ -33,8 +33,8 @@ use stackable_operator::{
     kvp::ObjectLabels,
     memory::{BinaryMultiple, MemoryQuantity},
     product_config_utils::{
-        transform_all_roles_to_config, validate_all_roles_and_groups_config,
-        ValidatedRoleConfigByPropertyKind,
+        ValidatedRoleConfigByPropertyKind, transform_all_roles_to_config,
+        validate_all_roles_and_groups_config,
     },
     product_logging,
     role_utils::{CommonConfiguration, GenericRoleConfig, JavaCommonConfig, Role, RoleGroup},
@@ -544,20 +544,47 @@ impl v1alpha1::SparkApplication {
         let mut submit_cmd = vec![
             "/stackable/spark/bin/spark-submit".to_string(),
             "--verbose".to_string(),
-            "--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}".to_string(),
+            "--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}"
+                .to_string(),
             format!("--deploy-mode {mode}"),
             format!("--name {name}"),
-            format!("--conf spark.kubernetes.driver.podTemplateFile={VOLUME_MOUNT_PATH_DRIVER_POD_TEMPLATES}/{POD_TEMPLATE_FILE}"),
-            format!("--conf spark.kubernetes.executor.podTemplateFile={VOLUME_MOUNT_PATH_EXECUTOR_POD_TEMPLATES}/{POD_TEMPLATE_FILE}"),
-            format!("--conf spark.kubernetes.driver.podTemplateContainerName={container_name}", container_name = SparkContainer::Spark),
-            format!("--conf spark.kubernetes.executor.podTemplateContainerName={container_name}", container_name = SparkContainer::Spark),
-            format!("--conf spark.kubernetes.namespace={}", self.metadata.namespace.as_ref().context(NoNamespaceSnafu)?),
-            format!("--conf spark.kubernetes.driver.container.image={}", spark_image.to_string()),
-            format!("--conf spark.kubernetes.executor.container.image={}", spark_image.to_string()),
-            format!("--conf spark.kubernetes.authenticate.driver.serviceAccountName={}", serviceaccount_name),
-            format!("--conf spark.driver.defaultJavaOptions=-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
+            format!(
+                "--conf spark.kubernetes.driver.podTemplateFile={VOLUME_MOUNT_PATH_DRIVER_POD_TEMPLATES}/{POD_TEMPLATE_FILE}"
+            ),
+            format!(
+                "--conf spark.kubernetes.executor.podTemplateFile={VOLUME_MOUNT_PATH_EXECUTOR_POD_TEMPLATES}/{POD_TEMPLATE_FILE}"
+            ),
+            format!(
+                "--conf spark.kubernetes.driver.podTemplateContainerName={container_name}",
+                container_name = SparkContainer::Spark
+            ),
+            format!(
+                "--conf spark.kubernetes.executor.podTemplateContainerName={container_name}",
+                container_name = SparkContainer::Spark
+            ),
+            format!(
+                "--conf spark.kubernetes.namespace={}",
+                self.metadata.namespace.as_ref().context(NoNamespaceSnafu)?
+            ),
+            format!(
+                "--conf spark.kubernetes.driver.container.image={}",
+                spark_image.to_string()
+            ),
+            format!(
+                "--conf spark.kubernetes.executor.container.image={}",
+                spark_image.to_string()
+            ),
+            format!(
+                "--conf spark.kubernetes.authenticate.driver.serviceAccountName={}",
+                serviceaccount_name
+            ),
+            format!(
+                "--conf spark.driver.defaultJavaOptions=-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"
+            ),
             format!("--conf spark.driver.extraClassPath=/stackable/spark/extra-jars/*"),
-            format!("--conf spark.executor.defaultJavaOptions=-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
+            format!(
+                "--conf spark.executor.defaultJavaOptions=-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"
+            ),
             format!("--conf spark.executor.extraClassPath=/stackable/spark/extra-jars/*"),
         ];
 
@@ -682,7 +709,9 @@ impl v1alpha1::SparkApplication {
         submit_cmd.extend(self.spec.args.clone());
 
         Ok(vec![
-            format!("containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &"),
+            format!(
+                "containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &"
+            ),
             submit_cmd.join(" "),
         ])
     }
@@ -792,14 +821,11 @@ impl v1alpha1::SparkApplication {
         };
         if let Some(role_envs) = role_envs {
             env.extend(role_envs.iter().map(|(k, v)| {
-                (
-                    k,
-                    EnvVar {
-                        name: k.clone(),
-                        value: Some(v.clone()),
-                        ..Default::default()
-                    },
-                )
+                (k, EnvVar {
+                    name: k.clone(),
+                    value: Some(v.clone()),
+                    ..Default::default()
+                })
             }))
         }
 
@@ -854,13 +880,10 @@ impl v1alpha1::SparkApplication {
             Role {
                 config: submit_conf.clone(),
                 role_config: GenericRoleConfig::default(),
-                role_groups: [(
-                    "default".to_string(),
-                    RoleGroup {
-                        config: submit_conf,
-                        replicas: Some(1),
-                    },
-                )]
+                role_groups: [("default".to_string(), RoleGroup {
+                    config: submit_conf,
+                    replicas: Some(1),
+                })]
                 .into(),
             }
             .erase(),
@@ -877,13 +900,10 @@ impl v1alpha1::SparkApplication {
             Role {
                 config: driver_conf.clone(),
                 role_config: GenericRoleConfig::default(),
-                role_groups: [(
-                    "default".to_string(),
-                    RoleGroup {
-                        config: driver_conf,
-                        replicas: Some(1),
-                    },
-                )]
+                role_groups: [("default".to_string(), RoleGroup {
+                    config: driver_conf,
+                    replicas: Some(1),
+                })]
                 .into(),
             }
             .erase(),
@@ -967,7 +987,9 @@ fn subtract_spark_memory_overhead(for_java: bool, limit: &Quantity) -> Result<St
         .value as u32;
 
     if MIN_MEMORY_OVERHEAD > original_memory {
-        tracing::warn!("Skip memory overhead since not enough memory ({original_memory}m). At least {MIN_MEMORY_OVERHEAD}m required");
+        tracing::warn!(
+            "Skip memory overhead since not enough memory ({original_memory}m). At least {MIN_MEMORY_OVERHEAD}m required"
+        );
         return Ok(format!("{original_memory}m"));
     }
 
@@ -981,7 +1003,9 @@ fn subtract_spark_memory_overhead(for_java: bool, limit: &Quantity) -> Result<St
 
     let deduction = max(MIN_MEMORY_OVERHEAD, original_memory - reduced_memory);
 
-    tracing::debug!("subtract_spark_memory_overhead: original_memory ({original_memory}) - deduction ({deduction})");
+    tracing::debug!(
+        "subtract_spark_memory_overhead: original_memory ({original_memory}) - deduction ({deduction})"
+    );
     Ok(format!("{}m", original_memory - deduction))
 }
 
@@ -1089,7 +1113,7 @@ mod tests {
     use std::collections::{BTreeMap, HashMap};
 
     use indoc::indoc;
-    use product_config::{types::PropertyNameKind, ProductConfigManager};
+    use product_config::{ProductConfigManager, types::PropertyNameKind};
     use rstest::rstest;
     use stackable_operator::{
         commons::{