Skip to content
140 changes: 70 additions & 70 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ use stackable_operator::{
apps::v1::{StatefulSet, StatefulSetSpec},
core::v1::{
ConfigMap, ConfigMapVolumeSource, ContainerPort, EnvVar, EnvVarSource, ExecAction,
HTTPGetAction, Probe, Secret, SecretKeySelector, Service, ServicePort, ServiceSpec,
Volume,
HTTPGetAction, Probe, Secret, SecretKeySelector, Volume,
},
},
apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
Expand All @@ -47,7 +46,7 @@ use stackable_operator::{
core::{DeserializeGuard, error_boundary},
runtime::{controller::Action, reflector::ObjectRef},
},
kvp::{Annotation, Label, Labels, ObjectLabels},
kvp::{Annotation, Labels, ObjectLabels},
logging::controller::ReconcilerError,
memory::{BinaryMultiple, MemoryQuantity},
product_config_utils::{
Expand Down Expand Up @@ -88,7 +87,7 @@ use crate::{
authentication::resolve_authentication_classes,
catalog,
discovery::{TrinoDiscovery, TrinoDiscoveryProtocol, TrinoPodRef},
rolegroup_metrics_service_name, v1alpha1,
rolegroup_headless_service_name, v1alpha1,
},
listener::{
LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_group_listener, build_group_listener_pvc,
Expand All @@ -98,6 +97,7 @@ use crate::{
add_graceful_shutdown_config, graceful_shutdown_config_properties, pdb::add_pdbs,
},
product_logging::{get_log_properties, get_vector_toml},
service::{build_rolegroup_headless_service, build_rolegroup_metrics_service},
};

pub struct Ctx {
Expand Down Expand Up @@ -357,6 +357,9 @@ pub enum Error {

#[snafu(display("failed to configure listener"))]
ListenerConfiguration { source: crate::listener::Error },

#[snafu(display("failed to configure service"))]
ServiceConfiguration { source: crate::service::Error },
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -482,8 +485,36 @@ pub async fn reconcile_trino(
.merged_config(&trino_role, &role_group_ref, &catalog_definitions)
.context(FailedToResolveConfigSnafu)?;

let rg_service =
build_rolegroup_service(trino, &resolved_product_image, &role_group_ref)?;
let role_group_service_recommended_labels = build_recommended_labels(
trino,
&resolved_product_image.app_version_label,
&role_group_ref.role,
&role_group_ref.role_group,
);

let role_group_service_selector = Labels::role_group_selector(
trino,
APP_NAME,
&role_group_ref.role,
&role_group_ref.role_group,
)
.context(LabelBuildSnafu)?;

let rg_headless_service = build_rolegroup_headless_service(
trino,
&role_group_ref,
role_group_service_recommended_labels.clone(),
role_group_service_selector.clone().into(),
)
.context(ServiceConfigurationSnafu)?;
let rg_metrics_service = build_rolegroup_metrics_service(
trino,
&role_group_ref,
role_group_service_recommended_labels,
role_group_service_selector.into(),
)
.context(ServiceConfigurationSnafu)?;

let rg_configmap = build_rolegroup_config_map(
trino,
&resolved_product_image,
Expand Down Expand Up @@ -515,7 +546,13 @@ pub async fn reconcile_trino(
)?;

cluster_resources
.add(client, rg_service)
.add(client, rg_headless_service)
.await
.with_context(|_| ApplyRoleGroupServiceSnafu {
rolegroup: role_group_ref.clone(),
})?;
cluster_resources
.add(client, rg_metrics_service)
.await
.with_context(|_| ApplyRoleGroupServiceSnafu {
rolegroup: role_group_ref.clone(),
Expand Down Expand Up @@ -834,7 +871,7 @@ fn build_rolegroup_catalog_config_map(
/// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator.
///
/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the
/// corresponding [`Service`] (from [`build_rolegroup_service`]).
/// corresponding [`Service`] (from [`build_rolegroup_headless_service`]).
#[allow(clippy::too_many_arguments)]
fn build_rolegroup_statefulset(
trino: &v1alpha1::TrinoCluster,
Expand Down Expand Up @@ -927,6 +964,9 @@ fn build_rolegroup_statefulset(
.requested_secret_lifetime
.context(MissingSecretLifetimeSnafu)?;

let extra_service_scopes = group_listener_name(trino, trino_role)
.map(|listener_service_name| vec![listener_service_name]);

// add volume mounts depending on the client tls, internal tls, catalogs and authentication
tls_volume_mounts(
trino,
Expand All @@ -935,6 +975,7 @@ fn build_rolegroup_statefulset(
&mut cb_trino,
catalogs,
&requested_secret_lifetime,
extra_service_scopes,
)?;

let mut prepare_args = vec![];
Expand Down Expand Up @@ -1193,7 +1234,7 @@ fn build_rolegroup_statefulset(
),
..LabelSelector::default()
},
service_name: Some(rolegroup_metrics_service_name(
service_name: Some(rolegroup_headless_service_name(
&role_group_ref.object_name(),
)),
template: pod_template,
Expand All @@ -1204,53 +1245,6 @@ fn build_rolegroup_statefulset(
})
}

/// Builds the headless [`Service`] belonging to a single rolegroup.
///
/// The service is named via [`rolegroup_metrics_service_name`], selects the pods of the given
/// rolegroup, exposes the ports returned by [`service_ports`] and carries the
/// `prometheus.io/scrape: "true"` label. Addresses of pods that are not ready are published too.
///
/// # Errors
///
/// Fails if the owner reference, the recommended labels, the scrape label or the rolegroup
/// selector cannot be built.
fn build_rolegroup_service(
    trino: &v1alpha1::TrinoCluster,
    resolved_product_image: &ResolvedProductImage,
    role_group_ref: &RoleGroupRef<v1alpha1::TrinoCluster>,
) -> Result<Service> {
    let service_name = rolegroup_metrics_service_name(&role_group_ref.object_name());

    // Metadata is assembled first so that its errors surface before any selector error.
    let metadata = ObjectMetaBuilder::new()
        .name_and_namespace(trino)
        .name(service_name)
        .ownerreference_from_resource(trino, None, Some(true))
        .context(ObjectMissingMetadataForOwnerRefSnafu)?
        .with_recommended_labels(build_recommended_labels(
            trino,
            &resolved_product_image.app_version_label,
            &role_group_ref.role,
            &role_group_ref.role_group,
        ))
        .context(MetadataBuildSnafu)?
        .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
        .build();

    // Only pods of exactly this role and rolegroup are targeted.
    let pod_selector = Labels::role_group_selector(
        trino,
        APP_NAME,
        &role_group_ref.role,
        &role_group_ref.role_group,
    )
    .context(LabelBuildSnafu)?;

    let spec = ServiceSpec {
        // Internal communication does not need to be exposed
        type_: Some(String::from("ClusterIP")),
        // A cluster IP of "None" makes the service headless.
        cluster_ip: Some(String::from("None")),
        ports: Some(service_ports()),
        selector: Some(pod_selector.into()),
        publish_not_ready_addresses: Some(true),
        ..ServiceSpec::default()
    };

    Ok(Service {
        metadata,
        spec: Some(spec),
        status: None,
    })
}

pub fn error_policy(
_obj: Arc<DeserializeGuard<v1alpha1::TrinoCluster>>,
error: &Error,
Expand Down Expand Up @@ -1409,15 +1403,6 @@ fn get_random_base64() -> String {
openssl::base64::encode_block(&buf)
}

/// Returns the ports exposed by the rolegroup service: a single TCP port named
/// `METRICS_PORT_NAME` with the number `METRICS_PORT`.
fn service_ports() -> Vec<ServicePort> {
    let metrics_port = ServicePort {
        name: Some(METRICS_PORT_NAME.to_string()),
        port: METRICS_PORT.into(),
        protocol: Some(String::from("TCP")),
        ..ServicePort::default()
    };

    vec![metrics_port]
}

fn container_ports(trino: &v1alpha1::TrinoCluster) -> Vec<ContainerPort> {
let mut ports = vec![ContainerPort {
name: Some(METRICS_PORT_NAME.to_string()),
Expand Down Expand Up @@ -1530,14 +1515,25 @@ fn create_tls_volume(
volume_name: &str,
tls_secret_class: &str,
requested_secret_lifetime: &Duration,
service_scopes: Option<Vec<String>>,
) -> Result<Volume> {
let mut secret_volume_source_builder = SecretOperatorVolumeSourceBuilder::new(tls_secret_class);

secret_volume_source_builder
.with_pod_scope()
.with_node_scope()
.with_format(SecretFormat::TlsPkcs12)
.with_auto_tls_cert_lifetime(*requested_secret_lifetime);

if let Some(scopes) = &service_scopes {
for scope in scopes {
secret_volume_source_builder.with_service_scope(scope);
}
}

Ok(VolumeBuilder::new(volume_name)
.ephemeral(
SecretOperatorVolumeSourceBuilder::new(tls_secret_class)
.with_pod_scope()
.with_node_scope()
.with_format(SecretFormat::TlsPkcs12)
.with_auto_tls_cert_lifetime(*requested_secret_lifetime)
secret_volume_source_builder
.build()
.context(TlsCertSecretClassVolumeBuildSnafu)?,
)
Expand All @@ -1551,6 +1547,7 @@ fn tls_volume_mounts(
cb_trino: &mut ContainerBuilder,
catalogs: &[CatalogConfig],
requested_secret_lifetime: &Duration,
extra_service_scopes: Option<Vec<String>>,
) -> Result<()> {
if let Some(server_tls) = trino.get_server_tls() {
cb_prepare
Expand All @@ -1564,6 +1561,8 @@ fn tls_volume_mounts(
"server-tls-mount",
server_tls,
requested_secret_lifetime,
// add listener
extra_service_scopes,
)?)
.context(AddVolumeSnafu)?;
}
Expand Down Expand Up @@ -1600,6 +1599,7 @@ fn tls_volume_mounts(
"internal-tls-mount",
internal_tls,
requested_secret_lifetime,
None,
)?)
.context(AddVolumeSnafu)?;

Expand Down
6 changes: 6 additions & 0 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ pub const MAX_TRINO_LOG_FILES_SIZE: MemoryQuantity = MemoryQuantity {
};

pub const METRICS_SERVICE_SUFFIX: &str = "metrics";
pub const HEADLESS_SERVICE_SUFFIX: &str = "headless";

pub const JVM_HEAP_FACTOR: f32 = 0.8;

Expand Down Expand Up @@ -945,6 +946,11 @@ pub fn rolegroup_metrics_service_name(role_group_ref_object_name: &str) -> Strin
format!("{role_group_ref_object_name}-{METRICS_SERVICE_SUFFIX}")
}

/// Builds the name of the headless rolegroup service by appending `-<HEADLESS_SERVICE_SUFFIX>`
/// to the given rolegroup object name (`<cluster>-<role>-<rolegroup>`).
pub fn rolegroup_headless_service_name(role_group_ref_object_name: &str) -> String {
    [role_group_ref_object_name, HEADLESS_SERVICE_SUFFIX].join("-")
}

fn extract_role_from_coordinator_config(
fragment: Role<TrinoConfigFragment, TrinoCoordinatorRoleConfig, JavaCommonConfig>,
) -> Role<TrinoConfigFragment, GenericRoleConfig, JavaCommonConfig> {
Expand Down
1 change: 1 addition & 0 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod crd;
mod listener;
mod operations;
mod product_logging;
mod service;

use std::sync::Arc;

Expand Down
Loading
Loading