@@ -36,8 +36,7 @@ use stackable_operator::{
36
36
apps:: v1:: { StatefulSet , StatefulSetSpec } ,
37
37
core:: v1:: {
38
38
ConfigMap , ConfigMapVolumeSource , ContainerPort , EnvVar , EnvVarSource , ExecAction ,
39
- HTTPGetAction , Probe , Secret , SecretKeySelector , Service , ServicePort , ServiceSpec ,
40
- Volume ,
39
+ HTTPGetAction , Probe , Secret , SecretKeySelector , Volume ,
41
40
} ,
42
41
} ,
43
42
apimachinery:: pkg:: { apis:: meta:: v1:: LabelSelector , util:: intstr:: IntOrString } ,
@@ -47,7 +46,7 @@ use stackable_operator::{
47
46
core:: { DeserializeGuard , error_boundary} ,
48
47
runtime:: { controller:: Action , reflector:: ObjectRef } ,
49
48
} ,
50
- kvp:: { Annotation , Label , Labels , ObjectLabels } ,
49
+ kvp:: { Annotation , Labels , ObjectLabels } ,
51
50
logging:: controller:: ReconcilerError ,
52
51
memory:: { BinaryMultiple , MemoryQuantity } ,
53
52
product_config_utils:: {
@@ -88,16 +87,17 @@ use crate::{
88
87
authentication:: resolve_authentication_classes,
89
88
catalog,
90
89
discovery:: { TrinoDiscovery , TrinoDiscoveryProtocol , TrinoPodRef } ,
91
- rolegroup_metrics_service_name , v1alpha1,
90
+ rolegroup_headless_service_name , v1alpha1,
92
91
} ,
93
92
listener:: {
94
93
LISTENER_VOLUME_DIR , LISTENER_VOLUME_NAME , build_group_listener, build_group_listener_pvc,
95
- group_listener_name,
94
+ group_listener_name, secret_volume_listener_scope ,
96
95
} ,
97
96
operations:: {
98
97
add_graceful_shutdown_config, graceful_shutdown_config_properties, pdb:: add_pdbs,
99
98
} ,
100
99
product_logging:: { get_log_properties, get_vector_toml} ,
100
+ service:: { build_rolegroup_headless_service, build_rolegroup_metrics_service} ,
101
101
} ;
102
102
103
103
pub struct Ctx {
@@ -357,6 +357,9 @@ pub enum Error {
357
357
358
358
#[ snafu( display( "failed to configure listener" ) ) ]
359
359
ListenerConfiguration { source : crate :: listener:: Error } ,
360
+
361
+ #[ snafu( display( "failed to configure service" ) ) ]
362
+ ServiceConfiguration { source : crate :: service:: Error } ,
360
363
}
361
364
362
365
type Result < T , E = Error > = std:: result:: Result < T , E > ;
@@ -482,8 +485,37 @@ pub async fn reconcile_trino(
482
485
. merged_config ( & trino_role, & role_group_ref, & catalog_definitions)
483
486
. context ( FailedToResolveConfigSnafu ) ?;
484
487
485
- let rg_service =
486
- build_rolegroup_service ( trino, & resolved_product_image, & role_group_ref) ?;
488
+ let role_group_service_recommended_labels = build_recommended_labels (
489
+ trino,
490
+ & resolved_product_image. app_version_label ,
491
+ & role_group_ref. role ,
492
+ & role_group_ref. role_group ,
493
+ ) ;
494
+
495
+ let role_group_service_selector = Labels :: role_group_selector (
496
+ trino,
497
+ APP_NAME ,
498
+ & role_group_ref. role ,
499
+ & role_group_ref. role_group ,
500
+ )
501
+ . context ( LabelBuildSnafu ) ?;
502
+
503
+ let rg_headless_service = build_rolegroup_headless_service (
504
+ trino,
505
+ & role_group_ref,
506
+ role_group_service_recommended_labels. clone ( ) ,
507
+ role_group_service_selector. clone ( ) . into ( ) ,
508
+ )
509
+ . context ( ServiceConfigurationSnafu ) ?;
510
+
511
+ let rg_metrics_service = build_rolegroup_metrics_service (
512
+ trino,
513
+ & role_group_ref,
514
+ role_group_service_recommended_labels,
515
+ role_group_service_selector. into ( ) ,
516
+ )
517
+ . context ( ServiceConfigurationSnafu ) ?;
518
+
487
519
let rg_configmap = build_rolegroup_config_map (
488
520
trino,
489
521
& resolved_product_image,
@@ -515,7 +547,14 @@ pub async fn reconcile_trino(
515
547
) ?;
516
548
517
549
cluster_resources
518
- . add ( client, rg_service)
550
+ . add ( client, rg_headless_service)
551
+ . await
552
+ . with_context ( |_| ApplyRoleGroupServiceSnafu {
553
+ rolegroup : role_group_ref. clone ( ) ,
554
+ } ) ?;
555
+
556
+ cluster_resources
557
+ . add ( client, rg_metrics_service)
519
558
. await
520
559
. with_context ( |_| ApplyRoleGroupServiceSnafu {
521
560
rolegroup : role_group_ref. clone ( ) ,
@@ -834,7 +873,7 @@ fn build_rolegroup_catalog_config_map(
834
873
/// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator.
835
874
///
836
875
/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the
837
- /// corresponding [`Service`] (from [`build_rolegroup_service `]).
876
+ /// corresponding [`stackable_operator::k8s_openapi::api::core::v1:: Service`] (from [`build_rolegroup_headless_service `]).
838
877
#[ allow( clippy:: too_many_arguments) ]
839
878
fn build_rolegroup_statefulset (
840
879
trino : & v1alpha1:: TrinoCluster ,
@@ -930,6 +969,7 @@ fn build_rolegroup_statefulset(
930
969
// add volume mounts depending on the client tls, internal tls, catalogs and authentication
931
970
tls_volume_mounts (
932
971
trino,
972
+ trino_role,
933
973
& mut pod_builder,
934
974
& mut cb_prepare,
935
975
& mut cb_trino,
@@ -1193,7 +1233,7 @@ fn build_rolegroup_statefulset(
1193
1233
) ,
1194
1234
..LabelSelector :: default ( )
1195
1235
} ,
1196
- service_name : Some ( rolegroup_metrics_service_name (
1236
+ service_name : Some ( rolegroup_headless_service_name (
1197
1237
& role_group_ref. object_name ( ) ,
1198
1238
) ) ,
1199
1239
template : pod_template,
@@ -1204,53 +1244,6 @@ fn build_rolegroup_statefulset(
1204
1244
} )
1205
1245
}
1206
1246
1207
- /// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup
1208
- ///
1209
- /// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
1210
- fn build_rolegroup_service (
1211
- trino : & v1alpha1:: TrinoCluster ,
1212
- resolved_product_image : & ResolvedProductImage ,
1213
- role_group_ref : & RoleGroupRef < v1alpha1:: TrinoCluster > ,
1214
- ) -> Result < Service > {
1215
- Ok ( Service {
1216
- metadata : ObjectMetaBuilder :: new ( )
1217
- . name_and_namespace ( trino)
1218
- . name ( rolegroup_metrics_service_name (
1219
- & role_group_ref. object_name ( ) ,
1220
- ) )
1221
- . ownerreference_from_resource ( trino, None , Some ( true ) )
1222
- . context ( ObjectMissingMetadataForOwnerRefSnafu ) ?
1223
- . with_recommended_labels ( build_recommended_labels (
1224
- trino,
1225
- & resolved_product_image. app_version_label ,
1226
- & role_group_ref. role ,
1227
- & role_group_ref. role_group ,
1228
- ) )
1229
- . context ( MetadataBuildSnafu ) ?
1230
- . with_label ( Label :: try_from ( ( "prometheus.io/scrape" , "true" ) ) . context ( LabelBuildSnafu ) ?)
1231
- . build ( ) ,
1232
- spec : Some ( ServiceSpec {
1233
- // Internal communication does not need to be exposed
1234
- type_ : Some ( "ClusterIP" . to_string ( ) ) ,
1235
- cluster_ip : Some ( "None" . to_string ( ) ) ,
1236
- ports : Some ( service_ports ( ) ) ,
1237
- selector : Some (
1238
- Labels :: role_group_selector (
1239
- trino,
1240
- APP_NAME ,
1241
- & role_group_ref. role ,
1242
- & role_group_ref. role_group ,
1243
- )
1244
- . context ( LabelBuildSnafu ) ?
1245
- . into ( ) ,
1246
- ) ,
1247
- publish_not_ready_addresses : Some ( true ) ,
1248
- ..ServiceSpec :: default ( )
1249
- } ) ,
1250
- status : None ,
1251
- } )
1252
- }
1253
-
1254
1247
pub fn error_policy (
1255
1248
_obj : Arc < DeserializeGuard < v1alpha1:: TrinoCluster > > ,
1256
1249
error : & Error ,
@@ -1409,15 +1402,6 @@ fn get_random_base64() -> String {
1409
1402
openssl:: base64:: encode_block ( & buf)
1410
1403
}
1411
1404
1412
- fn service_ports ( ) -> Vec < ServicePort > {
1413
- vec ! [ ServicePort {
1414
- name: Some ( METRICS_PORT_NAME . to_string( ) ) ,
1415
- port: METRICS_PORT . into( ) ,
1416
- protocol: Some ( "TCP" . to_string( ) ) ,
1417
- ..ServicePort :: default ( )
1418
- } ]
1419
- }
1420
-
1421
1405
fn container_ports ( trino : & v1alpha1:: TrinoCluster ) -> Vec < ContainerPort > {
1422
1406
let mut ports = vec ! [ ContainerPort {
1423
1407
name: Some ( METRICS_PORT_NAME . to_string( ) ) ,
@@ -1530,14 +1514,22 @@ fn create_tls_volume(
1530
1514
volume_name : & str ,
1531
1515
tls_secret_class : & str ,
1532
1516
requested_secret_lifetime : & Duration ,
1517
+ listener_scope : Option < String > ,
1533
1518
) -> Result < Volume > {
1519
+ let mut secret_volume_source_builder = SecretOperatorVolumeSourceBuilder :: new ( tls_secret_class) ;
1520
+
1521
+ secret_volume_source_builder
1522
+ . with_pod_scope ( )
1523
+ . with_format ( SecretFormat :: TlsPkcs12 )
1524
+ . with_auto_tls_cert_lifetime ( * requested_secret_lifetime) ;
1525
+
1526
+ if let Some ( listener_scope) = & listener_scope {
1527
+ secret_volume_source_builder. with_listener_volume_scope ( listener_scope) ;
1528
+ }
1529
+
1534
1530
Ok ( VolumeBuilder :: new ( volume_name)
1535
1531
. ephemeral (
1536
- SecretOperatorVolumeSourceBuilder :: new ( tls_secret_class)
1537
- . with_pod_scope ( )
1538
- . with_node_scope ( )
1539
- . with_format ( SecretFormat :: TlsPkcs12 )
1540
- . with_auto_tls_cert_lifetime ( * requested_secret_lifetime)
1532
+ secret_volume_source_builder
1541
1533
. build ( )
1542
1534
. context ( TlsCertSecretClassVolumeBuildSnafu ) ?,
1543
1535
)
@@ -1546,6 +1538,7 @@ fn create_tls_volume(
1546
1538
1547
1539
fn tls_volume_mounts (
1548
1540
trino : & v1alpha1:: TrinoCluster ,
1541
+ trino_role : & TrinoRole ,
1549
1542
pod_builder : & mut PodBuilder ,
1550
1543
cb_prepare : & mut ContainerBuilder ,
1551
1544
cb_trino : & mut ContainerBuilder ,
@@ -1564,6 +1557,8 @@ fn tls_volume_mounts(
1564
1557
"server-tls-mount" ,
1565
1558
server_tls,
1566
1559
requested_secret_lifetime,
1560
+ // add listener
1561
+ secret_volume_listener_scope ( trino_role) ,
1567
1562
) ?)
1568
1563
. context ( AddVolumeSnafu ) ?;
1569
1564
}
@@ -1600,6 +1595,7 @@ fn tls_volume_mounts(
1600
1595
"internal-tls-mount" ,
1601
1596
internal_tls,
1602
1597
requested_secret_lifetime,
1598
+ None ,
1603
1599
) ?)
1604
1600
. context ( AddVolumeSnafu ) ?;
1605
1601
0 commit comments