@@ -78,14 +78,16 @@ use crate::{
78
78
command, config,
79
79
crd:: {
80
80
ACCESS_CONTROL_PROPERTIES , APP_NAME , CONFIG_DIR_NAME , CONFIG_PROPERTIES , Container ,
81
- DISCOVERY_URI , ENV_INTERNAL_SECRET , HTTP_PORT , HTTP_PORT_NAME , HTTPS_PORT , HTTPS_PORT_NAME ,
82
- JVM_CONFIG , JVM_SECURITY_PROPERTIES , LOG_PROPERTIES , MAX_TRINO_LOG_FILES_SIZE ,
83
- METRICS_PORT , METRICS_PORT_NAME , NODE_PROPERTIES , RW_CONFIG_DIR_NAME ,
84
- STACKABLE_CLIENT_TLS_DIR , STACKABLE_INTERNAL_TLS_DIR , STACKABLE_MOUNT_INTERNAL_TLS_DIR ,
85
- STACKABLE_MOUNT_SERVER_TLS_DIR , STACKABLE_SERVER_TLS_DIR , TrinoRole ,
81
+ DISCOVERY_URI , ENV_INTERNAL_SECRET , EXCHANGE_MANAGER_PROPERTIES , HTTP_PORT , HTTP_PORT_NAME ,
82
+ HTTPS_PORT , HTTPS_PORT_NAME , JVM_CONFIG , JVM_SECURITY_PROPERTIES , LOG_PROPERTIES ,
83
+ MAX_TRINO_LOG_FILES_SIZE , METRICS_PORT , METRICS_PORT_NAME , NODE_PROPERTIES ,
84
+ RW_CONFIG_DIR_NAME , STACKABLE_CLIENT_TLS_DIR , STACKABLE_INTERNAL_TLS_DIR ,
85
+ STACKABLE_MOUNT_INTERNAL_TLS_DIR , STACKABLE_MOUNT_SERVER_TLS_DIR , STACKABLE_SERVER_TLS_DIR ,
86
+ TrinoRole ,
86
87
authentication:: resolve_authentication_classes,
87
88
catalog,
88
89
discovery:: { TrinoDiscovery , TrinoDiscoveryProtocol , TrinoPodRef } ,
90
+ fault_tolerant_execution:: ResolvedFaultTolerantExecutionConfig ,
89
91
rolegroup_headless_service_name, v1alpha1,
90
92
} ,
91
93
listener:: {
@@ -298,6 +300,11 @@ pub enum Error {
298
300
source : crate :: operations:: graceful_shutdown:: Error ,
299
301
} ,
300
302
303
+ #[ snafu( display( "failed to configure fault tolerant execution" ) ) ]
304
+ FaultTolerantExecution {
305
+ source : crate :: crd:: fault_tolerant_execution:: Error ,
306
+ } ,
307
+
301
308
#[ snafu( display( "failed to get required Labels" ) ) ]
302
309
GetRequiredLabels {
303
310
source :
@@ -424,6 +431,20 @@ pub async fn reconcile_trino(
424
431
catalogs. push ( catalog_config) ;
425
432
}
426
433
434
+ // Resolve fault tolerant execution configuration with S3 connections if needed
435
+ let resolved_fte_config = match trino. spec . cluster_config . fault_tolerant_execution . as_ref ( ) {
436
+ Some ( fte_config) => Some (
437
+ ResolvedFaultTolerantExecutionConfig :: from_config (
438
+ fte_config,
439
+ Some ( client) ,
440
+ & trino. namespace_r ( ) . context ( ReadRoleSnafu ) ?,
441
+ )
442
+ . await
443
+ . context ( FaultTolerantExecutionSnafu ) ?,
444
+ ) ,
445
+ None => None ,
446
+ } ;
447
+
427
448
let validated_config = validated_product_config (
428
449
trino,
429
450
// The Trino version is a single number like 396.
@@ -526,6 +547,7 @@ pub async fn reconcile_trino(
526
547
& trino_authentication_config,
527
548
& trino_opa_config,
528
549
& client. kubernetes_cluster_info ,
550
+ & resolved_fte_config,
529
551
) ?;
530
552
let rg_catalog_configmap = build_rolegroup_catalog_config_map (
531
553
trino,
@@ -543,6 +565,7 @@ pub async fn reconcile_trino(
543
565
& trino_authentication_config,
544
566
& catalogs,
545
567
& rbac_sa. name_any ( ) ,
568
+ & resolved_fte_config,
546
569
) ?;
547
570
548
571
cluster_resources
@@ -651,6 +674,7 @@ fn build_rolegroup_config_map(
651
674
trino_authentication_config : & TrinoAuthenticationConfig ,
652
675
trino_opa_config : & Option < TrinoOpaConfig > ,
653
676
cluster_info : & KubernetesClusterInfo ,
677
+ resolved_fte_config : & Option < ResolvedFaultTolerantExecutionConfig > ,
654
678
) -> Result < ConfigMap > {
655
679
let mut cm_conf_data = BTreeMap :: new ( ) ;
656
680
@@ -712,6 +736,16 @@ fn build_rolegroup_config_map(
712
736
dynamic_resolved_config
713
737
. extend ( graceful_shutdown_config_properties ( trino, trino_role) ) ;
714
738
739
+ // Add fault tolerant execution properties from resolved configuration
740
+ if let Some ( resolved_fte) = resolved_fte_config {
741
+ dynamic_resolved_config. extend (
742
+ resolved_fte
743
+ . config_properties
744
+ . iter ( )
745
+ . map ( |( k, v) | ( k. clone ( ) , Some ( v. clone ( ) ) ) ) ,
746
+ ) ;
747
+ }
748
+
715
749
// Add static properties and overrides
716
750
dynamic_resolved_config. extend ( transformed_config) ;
717
751
@@ -776,6 +810,22 @@ fn build_rolegroup_config_map(
776
810
777
811
cm_conf_data. insert ( JVM_CONFIG . to_string ( ) , jvm_config. to_string ( ) ) ;
778
812
813
+ // Add exchange manager properties from resolved fault tolerant execution configuration
814
+ if let Some ( resolved_fte) = resolved_fte_config {
815
+ if !resolved_fte. exchange_manager_properties . is_empty ( ) {
816
+ let exchange_props_with_options: BTreeMap < String , Option < String > > = resolved_fte
817
+ . exchange_manager_properties
818
+ . iter ( )
819
+ . map ( |( k, v) | ( k. clone ( ) , Some ( v. clone ( ) ) ) )
820
+ . collect ( ) ;
821
+ cm_conf_data. insert (
822
+ EXCHANGE_MANAGER_PROPERTIES . to_string ( ) ,
823
+ to_java_properties_string ( exchange_props_with_options. iter ( ) )
824
+ . with_context ( |_| FailedToWriteJavaPropertiesSnafu ) ?,
825
+ ) ;
826
+ }
827
+ }
828
+
779
829
let jvm_sec_props: BTreeMap < String , Option < String > > = config
780
830
. get ( & PropertyNameKind :: File ( JVM_SECURITY_PROPERTIES . to_string ( ) ) )
781
831
. cloned ( )
@@ -884,6 +934,7 @@ fn build_rolegroup_statefulset(
884
934
trino_authentication_config : & TrinoAuthenticationConfig ,
885
935
catalogs : & [ CatalogConfig ] ,
886
936
sa_name : & str ,
937
+ resolved_fte_config : & Option < ResolvedFaultTolerantExecutionConfig > ,
887
938
) -> Result < StatefulSet > {
888
939
let role = trino
889
940
. role ( trino_role)
@@ -974,6 +1025,7 @@ fn build_rolegroup_statefulset(
974
1025
& mut cb_trino,
975
1026
catalogs,
976
1027
& requested_secret_lifetime,
1028
+ resolved_fte_config,
977
1029
) ?;
978
1030
979
1031
let mut prepare_args = vec ! [ ] ;
@@ -992,6 +1044,7 @@ fn build_rolegroup_statefulset(
992
1044
trino,
993
1045
catalogs,
994
1046
merged_config,
1047
+ resolved_fte_config,
995
1048
) ) ;
996
1049
997
1050
prepare_args
@@ -1056,7 +1109,12 @@ fn build_rolegroup_statefulset(
1056
1109
"-c" . to_string( ) ,
1057
1110
] )
1058
1111
. args ( vec ! [
1059
- command:: container_trino_args( trino_authentication_config, catalogs) . join( "\n " ) ,
1112
+ command:: container_trino_args(
1113
+ trino_authentication_config,
1114
+ catalogs,
1115
+ resolved_fte_config,
1116
+ )
1117
+ . join( "\n " ) ,
1060
1118
] )
1061
1119
. add_env_vars ( env)
1062
1120
. add_volume_mount ( "config" , CONFIG_DIR_NAME )
@@ -1532,6 +1590,7 @@ fn tls_volume_mounts(
1532
1590
cb_trino : & mut ContainerBuilder ,
1533
1591
catalogs : & [ CatalogConfig ] ,
1534
1592
requested_secret_lifetime : & Duration ,
1593
+ resolved_fte_config : & Option < ResolvedFaultTolerantExecutionConfig > ,
1535
1594
) -> Result < ( ) > {
1536
1595
if let Some ( server_tls) = trino. get_server_tls ( ) {
1537
1596
cb_prepare
@@ -1611,6 +1670,19 @@ fn tls_volume_mounts(
1611
1670
. context ( AddVolumeSnafu ) ?;
1612
1671
}
1613
1672
1673
+ // fault tolerant execution S3 credentials and other resources
1674
+ if let Some ( resolved_fte) = resolved_fte_config {
1675
+ cb_prepare
1676
+ . add_volume_mounts ( resolved_fte. volume_mounts . clone ( ) )
1677
+ . context ( AddVolumeMountSnafu ) ?;
1678
+ cb_trino
1679
+ . add_volume_mounts ( resolved_fte. volume_mounts . clone ( ) )
1680
+ . context ( AddVolumeMountSnafu ) ?;
1681
+ pod_builder
1682
+ . add_volumes ( resolved_fte. volumes . clone ( ) )
1683
+ . context ( AddVolumeSnafu ) ?;
1684
+ }
1685
+
1614
1686
Ok ( ( ) )
1615
1687
}
1616
1688
@@ -1780,6 +1852,7 @@ mod tests {
1780
1852
& trino_authentication_config,
1781
1853
& trino_opa_config,
1782
1854
& cluster_info,
1855
+ & None ,
1783
1856
)
1784
1857
. unwrap ( )
1785
1858
}
0 commit comments