Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ All notable changes to this project will be documented in this file.
- Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
- Add Listener integration for Trino ([#753]).
- Add support for Trino 476 ([#755]).
- Add internal headless service in addition to the metrics service ([#766]).

### Changed

Expand Down Expand Up @@ -56,6 +57,7 @@ All notable changes to this project will be documented in this file.
[#753]: https://github.com/stackabletech/trino-operator/pull/753
[#755]: https://github.com/stackabletech/trino-operator/pull/755
[#760]: https://github.com/stackabletech/trino-operator/pull/760
[#766]: https://github.com/stackabletech/trino-operator/pull/766

## [25.3.0] - 2025-03-21

Expand Down
138 changes: 67 additions & 71 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ use stackable_operator::{
apps::v1::{StatefulSet, StatefulSetSpec},
core::v1::{
ConfigMap, ConfigMapVolumeSource, ContainerPort, EnvVar, EnvVarSource, ExecAction,
HTTPGetAction, Probe, Secret, SecretKeySelector, Service, ServicePort, ServiceSpec,
Volume,
HTTPGetAction, Probe, Secret, SecretKeySelector, Volume,
},
},
apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
Expand All @@ -47,7 +46,7 @@ use stackable_operator::{
core::{DeserializeGuard, error_boundary},
runtime::{controller::Action, reflector::ObjectRef},
},
kvp::{Annotation, Label, Labels, ObjectLabels},
kvp::{Annotation, Labels, ObjectLabels},
logging::controller::ReconcilerError,
memory::{BinaryMultiple, MemoryQuantity},
product_config_utils::{
Expand Down Expand Up @@ -88,16 +87,17 @@ use crate::{
authentication::resolve_authentication_classes,
catalog,
discovery::{TrinoDiscovery, TrinoDiscoveryProtocol, TrinoPodRef},
rolegroup_metrics_service_name, v1alpha1,
rolegroup_headless_service_name, v1alpha1,
},
listener::{
LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_group_listener, build_group_listener_pvc,
group_listener_name,
group_listener_name, secret_volume_listener_scope,
},
operations::{
add_graceful_shutdown_config, graceful_shutdown_config_properties, pdb::add_pdbs,
},
product_logging::{get_log_properties, get_vector_toml},
service::{build_rolegroup_headless_service, build_rolegroup_metrics_service},
};

pub struct Ctx {
Expand Down Expand Up @@ -357,6 +357,9 @@ pub enum Error {

#[snafu(display("failed to configure listener"))]
ListenerConfiguration { source: crate::listener::Error },

#[snafu(display("failed to configure service"))]
ServiceConfiguration { source: crate::service::Error },
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -482,8 +485,37 @@ pub async fn reconcile_trino(
.merged_config(&trino_role, &role_group_ref, &catalog_definitions)
.context(FailedToResolveConfigSnafu)?;

let rg_service =
build_rolegroup_service(trino, &resolved_product_image, &role_group_ref)?;
let role_group_service_recommended_labels = build_recommended_labels(
trino,
&resolved_product_image.app_version_label,
&role_group_ref.role,
&role_group_ref.role_group,
);

let role_group_service_selector = Labels::role_group_selector(
trino,
APP_NAME,
&role_group_ref.role,
&role_group_ref.role_group,
)
.context(LabelBuildSnafu)?;

let rg_headless_service = build_rolegroup_headless_service(
trino,
&role_group_ref,
role_group_service_recommended_labels.clone(),
role_group_service_selector.clone().into(),
)
.context(ServiceConfigurationSnafu)?;

let rg_metrics_service = build_rolegroup_metrics_service(
trino,
&role_group_ref,
role_group_service_recommended_labels,
role_group_service_selector.into(),
)
.context(ServiceConfigurationSnafu)?;

let rg_configmap = build_rolegroup_config_map(
trino,
&resolved_product_image,
Expand Down Expand Up @@ -515,7 +547,14 @@ pub async fn reconcile_trino(
)?;

cluster_resources
.add(client, rg_service)
.add(client, rg_headless_service)
.await
.with_context(|_| ApplyRoleGroupServiceSnafu {
rolegroup: role_group_ref.clone(),
})?;

cluster_resources
.add(client, rg_metrics_service)
.await
.with_context(|_| ApplyRoleGroupServiceSnafu {
rolegroup: role_group_ref.clone(),
Expand Down Expand Up @@ -834,7 +873,7 @@ fn build_rolegroup_catalog_config_map(
/// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator.
///
/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the
/// corresponding [`Service`] (from [`build_rolegroup_service`]).
/// corresponding [`stackable_operator::k8s_openapi::api::core::v1::Service`] (from [`build_rolegroup_headless_service`]).
#[allow(clippy::too_many_arguments)]
fn build_rolegroup_statefulset(
trino: &v1alpha1::TrinoCluster,
Expand Down Expand Up @@ -930,6 +969,7 @@ fn build_rolegroup_statefulset(
// add volume mounts depending on the client tls, internal tls, catalogs and authentication
tls_volume_mounts(
trino,
trino_role,
&mut pod_builder,
&mut cb_prepare,
&mut cb_trino,
Expand Down Expand Up @@ -1193,7 +1233,7 @@ fn build_rolegroup_statefulset(
),
..LabelSelector::default()
},
service_name: Some(rolegroup_metrics_service_name(
service_name: Some(rolegroup_headless_service_name(
&role_group_ref.object_name(),
)),
template: pod_template,
Expand All @@ -1204,53 +1244,6 @@ fn build_rolegroup_statefulset(
})
}

/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup
///
/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
fn build_rolegroup_service(
trino: &v1alpha1::TrinoCluster,
resolved_product_image: &ResolvedProductImage,
role_group_ref: &RoleGroupRef<v1alpha1::TrinoCluster>,
) -> Result<Service> {
Ok(Service {
metadata: ObjectMetaBuilder::new()
.name_and_namespace(trino)
.name(rolegroup_metrics_service_name(
&role_group_ref.object_name(),
))
.ownerreference_from_resource(trino, None, Some(true))
.context(ObjectMissingMetadataForOwnerRefSnafu)?
.with_recommended_labels(build_recommended_labels(
trino,
&resolved_product_image.app_version_label,
&role_group_ref.role,
&role_group_ref.role_group,
))
.context(MetadataBuildSnafu)?
.with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
.build(),
spec: Some(ServiceSpec {
// Internal communication does not need to be exposed
type_: Some("ClusterIP".to_string()),
cluster_ip: Some("None".to_string()),
ports: Some(service_ports()),
selector: Some(
Labels::role_group_selector(
trino,
APP_NAME,
&role_group_ref.role,
&role_group_ref.role_group,
)
.context(LabelBuildSnafu)?
.into(),
),
publish_not_ready_addresses: Some(true),
..ServiceSpec::default()
}),
status: None,
})
}

pub fn error_policy(
_obj: Arc<DeserializeGuard<v1alpha1::TrinoCluster>>,
error: &Error,
Expand Down Expand Up @@ -1409,15 +1402,6 @@ fn get_random_base64() -> String {
openssl::base64::encode_block(&buf)
}

fn service_ports() -> Vec<ServicePort> {
vec![ServicePort {
name: Some(METRICS_PORT_NAME.to_string()),
port: METRICS_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]
}

fn container_ports(trino: &v1alpha1::TrinoCluster) -> Vec<ContainerPort> {
let mut ports = vec![ContainerPort {
name: Some(METRICS_PORT_NAME.to_string()),
Expand Down Expand Up @@ -1530,14 +1514,22 @@ fn create_tls_volume(
volume_name: &str,
tls_secret_class: &str,
requested_secret_lifetime: &Duration,
listener_scope: Option<String>,
) -> Result<Volume> {
let mut secret_volume_source_builder = SecretOperatorVolumeSourceBuilder::new(tls_secret_class);

secret_volume_source_builder
.with_pod_scope()
.with_format(SecretFormat::TlsPkcs12)
.with_auto_tls_cert_lifetime(*requested_secret_lifetime);

if let Some(listener_scope) = &listener_scope {
secret_volume_source_builder.with_listener_volume_scope(listener_scope);
}

Ok(VolumeBuilder::new(volume_name)
.ephemeral(
SecretOperatorVolumeSourceBuilder::new(tls_secret_class)
.with_pod_scope()
.with_node_scope()
.with_format(SecretFormat::TlsPkcs12)
.with_auto_tls_cert_lifetime(*requested_secret_lifetime)
secret_volume_source_builder
.build()
.context(TlsCertSecretClassVolumeBuildSnafu)?,
)
Expand All @@ -1546,6 +1538,7 @@ fn create_tls_volume(

fn tls_volume_mounts(
trino: &v1alpha1::TrinoCluster,
trino_role: &TrinoRole,
pod_builder: &mut PodBuilder,
cb_prepare: &mut ContainerBuilder,
cb_trino: &mut ContainerBuilder,
Expand All @@ -1564,6 +1557,8 @@ fn tls_volume_mounts(
"server-tls-mount",
server_tls,
requested_secret_lifetime,
// add listener
secret_volume_listener_scope(trino_role),
)?)
.context(AddVolumeSnafu)?;
}
Expand Down Expand Up @@ -1600,6 +1595,7 @@ fn tls_volume_mounts(
"internal-tls-mount",
internal_tls,
requested_secret_lifetime,
None,
)?)
.context(AddVolumeSnafu)?;

Expand Down
8 changes: 7 additions & 1 deletion rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ pub const MAX_TRINO_LOG_FILES_SIZE: MemoryQuantity = MemoryQuantity {
};

pub const METRICS_SERVICE_SUFFIX: &str = "metrics";
pub const HEADLESS_SERVICE_SUFFIX: &str = "headless";

pub const JVM_HEAP_FACTOR: f32 = 0.8;

Expand Down Expand Up @@ -837,7 +838,7 @@ impl v1alpha1::TrinoCluster {
let ns = ns.clone();
(0..rolegroup.replicas.unwrap_or(0)).map(move |i| TrinoPodRef {
namespace: ns.clone(),
role_group_service_name: rolegroup_metrics_service_name(
role_group_service_name: rolegroup_headless_service_name(
&role_group_ref.object_name(),
),
pod_name: format!(
Expand Down Expand Up @@ -945,6 +946,11 @@ pub fn rolegroup_metrics_service_name(role_group_ref_object_name: &str) -> Strin
format!("{role_group_ref_object_name}-{METRICS_SERVICE_SUFFIX}")
}

/// Returns the headless rolegroup service name `<cluster>-<role>-<rolegroup>-<HEADLESS_SERVICE_SUFFIX>`.
pub fn rolegroup_headless_service_name(role_group_ref_object_name: &str) -> String {
format!("{role_group_ref_object_name}-{HEADLESS_SERVICE_SUFFIX}")
}

fn extract_role_from_coordinator_config(
fragment: Role<TrinoConfigFragment, TrinoCoordinatorRoleConfig, JavaCommonConfig>,
) -> Role<TrinoConfigFragment, GenericRoleConfig, JavaCommonConfig> {
Expand Down
8 changes: 8 additions & 0 deletions rust/operator-binary/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ pub fn group_listener_name(trino: &v1alpha1::TrinoCluster, role: &TrinoRole) ->
}
}

/// The listener volume name depending on the role
pub fn secret_volume_listener_scope(role: &TrinoRole) -> Option<String> {
match role {
TrinoRole::Coordinator => Some(LISTENER_VOLUME_NAME.to_string()),
TrinoRole::Worker => None,
}
}

/// We only use the http/https port here and intentionally omit the metrics one.
fn listener_ports(trino: &v1alpha1::TrinoCluster) -> Vec<ListenerPort> {
let name = trino.exposed_protocol().to_string();
Expand Down
1 change: 1 addition & 0 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod crd;
mod listener;
mod operations;
mod product_logging;
mod service;

use std::sync::Arc;

Expand Down
Loading