Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
- Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
- Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
- BREAKING: Add Listener support for Hive ([#605]).
- Add internal headless service in addition to the metrics service ([#613]).

### Changed

Expand Down Expand Up @@ -43,6 +44,7 @@ All notable changes to this project will be documented in this file.
[#603]: https://github.com/stackabletech/hive-operator/pull/603
[#604]: https://github.com/stackabletech/hive-operator/pull/604
[#605]: https://github.com/stackabletech/hive-operator/pull/605
[#613]: https://github.com/stackabletech/hive-operator/pull/613

## [25.3.0] - 2025-03-21

Expand Down
91 changes: 30 additions & 61 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use stackable_operator::{
api::{
apps::v1::{StatefulSet, StatefulSetSpec},
core::v1::{
ConfigMap, ConfigMapVolumeSource, EmptyDirVolumeSource, Probe, Service,
ServicePort, ServiceSpec, TCPSocketAction, Volume,
ConfigMap, ConfigMapVolumeSource, EmptyDirVolumeSource, Probe, TCPSocketAction,
Volume,
},
},
apimachinery::pkg::{
Expand All @@ -56,7 +56,7 @@ use stackable_operator::{
core::{DeserializeGuard, error_boundary},
runtime::controller::Action,
},
kvp::{Label, Labels, ObjectLabels},
kvp::{Labels, ObjectLabels},
logging::controller::ReconcilerError,
memory::{BinaryMultiple, MemoryQuantity},
product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config},
Expand Down Expand Up @@ -94,14 +94,18 @@ use crate::{
STACKABLE_LOG_DIR_NAME,
v1alpha1::{self, HiveMetastoreRoleConfig},
},
discovery::{self, build_headless_role_group_metrics_service_name},
discovery::{self},
kerberos::{
self, add_kerberos_pod_config, kerberos_config_properties,
kerberos_container_start_commands,
},
listener::{LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_role_listener},
operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs},
product_logging::extend_role_group_config_map,
service::{
build_rolegroup_headless_service, build_rolegroup_metrics_service,
rolegroup_headless_service_name,
},
};

pub const HIVE_CONTROLLER_NAME: &str = "hivecluster";
Expand Down Expand Up @@ -345,6 +349,9 @@ pub enum Error {
BuildListenerVolume {
source: ListenerOperatorVolumeSourceBuilderError,
},

#[snafu(display("faild to configure service"))]
ServiceConfiguration { source: crate::service::Error },
}
type Result<T, E = Error> = std::result::Result<T, E>;

Expand Down Expand Up @@ -456,7 +463,14 @@ pub async fn reconcile_hive(
.merged_config(&HiveRole::MetaStore, &rolegroup)
.context(FailedToResolveResourceConfigSnafu)?;

let rg_service = build_rolegroup_service(hive, &resolved_product_image, &rolegroup)?;
let rg_metrics_service =
build_rolegroup_metrics_service(hive, &resolved_product_image, &rolegroup)
.context(ServiceConfigurationSnafu)?;

let rg_headless_service =
build_rolegroup_headless_service(hive, &resolved_product_image, &rolegroup)
.context(ServiceConfigurationSnafu)?;

let rg_configmap = build_metastore_rolegroup_config_map(
hive,
&hive_namespace,
Expand All @@ -479,7 +493,14 @@ pub async fn reconcile_hive(
)?;

cluster_resources
.add(client, rg_service)
.add(client, rg_metrics_service)
.await
.context(ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup.clone(),
})?;

cluster_resources
.add(client, rg_headless_service)
.await
.context(ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup.clone(),
Expand Down Expand Up @@ -713,52 +734,10 @@ fn build_metastore_rolegroup_config_map(
})
}

/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup
///
/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
fn build_rolegroup_service(
hive: &v1alpha1::HiveCluster,
resolved_product_image: &ResolvedProductImage,
rolegroup: &RoleGroupRef<v1alpha1::HiveCluster>,
) -> Result<Service> {
Ok(Service {
metadata: ObjectMetaBuilder::new()
.name_and_namespace(hive)
.name(build_headless_role_group_metrics_service_name(
rolegroup.object_name(),
))
.ownerreference_from_resource(hive, None, Some(true))
.context(ObjectMissingMetadataForOwnerRefSnafu)?
.with_recommended_labels(build_recommended_labels(
hive,
&resolved_product_image.app_version_label,
&rolegroup.role,
&rolegroup.role_group,
))
.context(MetadataBuildSnafu)?
.with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
.build(),
spec: Some(ServiceSpec {
// Internal communication does not need to be exposed
type_: Some("ClusterIP".to_string()),
cluster_ip: Some("None".to_string()),
ports: Some(service_ports()),
selector: Some(
Labels::role_group_selector(hive, APP_NAME, &rolegroup.role, &rolegroup.role_group)
.context(LabelBuildSnafu)?
.into(),
),
publish_not_ready_addresses: Some(true),
..ServiceSpec::default()
}),
status: None,
})
}

/// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator.
///
/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the
/// corresponding [`Service`] (from [`build_rolegroup_service`]).
/// corresponding [`Service`](`stackable_operator::k8s_openapi::api::core::v1::Service`) (via [`build_rolegroup_headless_service`] and metrics from [`build_rolegroup_metrics_service`]).
#[allow(clippy::too_many_arguments)]
fn build_metastore_rolegroup_statefulset(
hive: &v1alpha1::HiveCluster,
Expand Down Expand Up @@ -1100,9 +1079,8 @@ fn build_metastore_rolegroup_statefulset(
),
..LabelSelector::default()
},
service_name: Some(build_headless_role_group_metrics_service_name(
rolegroup_ref.object_name(),
)),
// TODO: Use method on RoleGroupRef once op-rs is released
service_name: Some(rolegroup_headless_service_name(rolegroup_ref)),
template: pod_template,
volume_claim_templates: Some(vec![pvc]),
..StatefulSetSpec::default()
Expand All @@ -1123,15 +1101,6 @@ pub fn error_policy(
}
}

pub fn service_ports() -> Vec<ServicePort> {
vec![ServicePort {
name: Some(METRICS_PORT_NAME.to_string()),
port: METRICS_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]
}

/// Creates recommended `ObjectLabels` to be used in deployed resources
pub fn build_recommended_labels<'a, T>(
owner: &'a T,
Expand Down
4 changes: 0 additions & 4 deletions rust/operator-binary/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,3 @@ fn build_discovery_configmap(
obj_ref: ObjectRef::from_obj(hive),
})
}

pub fn build_headless_role_group_metrics_service_name(name: String) -> String {
format!("{name}-metrics")
}
2 changes: 1 addition & 1 deletion rust/operator-binary/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub fn build_role_listener(
Ok(listener)
}

fn listener_ports() -> Vec<ListenerPort> {
pub fn listener_ports() -> Vec<ListenerPort> {
vec![ListenerPort {
name: HIVE_PORT_NAME.to_owned(),
port: HIVE_PORT.into(),
Expand Down
1 change: 1 addition & 0 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod kerberos;
mod listener;
mod operations;
mod product_logging;
mod service;

use std::sync::Arc;

Expand Down
141 changes: 141 additions & 0 deletions rust/operator-binary/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use snafu::{ResultExt, Snafu};
use stackable_operator::{
builder::meta::ObjectMetaBuilder,
commons::product_image_selection::ResolvedProductImage,
k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec},
kvp::{Label, Labels},
role_utils::RoleGroupRef,
};

use crate::{
controller::build_recommended_labels,
crd::{APP_NAME, HIVE_PORT, HIVE_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, v1alpha1},
};

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("object is missing metadata to build owner reference"))]
ObjectMissingMetadataForOwnerRef {
source: stackable_operator::builder::meta::Error,
},
#[snafu(display("failed to build Metadata"))]
MetadataBuild {
source: stackable_operator::builder::meta::Error,
},
#[snafu(display("failed to build Labels"))]
LabelBuild {
source: stackable_operator::kvp::LabelError,
},
}

/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup
///
/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
pub fn build_rolegroup_headless_service(
hive: &v1alpha1::HiveCluster,
resolved_product_image: &ResolvedProductImage,
rolegroup: &RoleGroupRef<v1alpha1::HiveCluster>,
) -> Result<Service, Error> {
let headless_service = Service {
metadata: ObjectMetaBuilder::new()
.name_and_namespace(hive)
// TODO: Use method on RoleGroupRef once op-rs is released
.name(rolegroup_headless_service_name(rolegroup))
.ownerreference_from_resource(hive, None, Some(true))
.context(ObjectMissingMetadataForOwnerRefSnafu)?
.with_recommended_labels(build_recommended_labels(
hive,
&resolved_product_image.app_version_label,
&rolegroup.role,
&rolegroup.role_group,
))
.context(MetadataBuildSnafu)?
.build(),
spec: Some(ServiceSpec {
// Internal communication does not need to be exposed
type_: Some("ClusterIP".to_string()),
cluster_ip: Some("None".to_string()),
// Expecting same ports as on listener service, just as a headless, internal service
ports: Some(service_ports()),
selector: Some(
Labels::role_group_selector(hive, APP_NAME, &rolegroup.role, &rolegroup.role_group)
.context(LabelBuildSnafu)?
.into(),
),
publish_not_ready_addresses: Some(true),
..ServiceSpec::default()
}),
status: None,
};
Ok(headless_service)
}

/// The rolegroup metrics [`Service`] is a service that exposes metrics and a prometheus scraping label
pub fn build_rolegroup_metrics_service(
hive: &v1alpha1::HiveCluster,
resolved_product_image: &ResolvedProductImage,
rolegroup: &RoleGroupRef<v1alpha1::HiveCluster>,
) -> Result<Service, Error> {
let metrics_service = Service {
metadata: ObjectMetaBuilder::new()
.name_and_namespace(hive)
// TODO: Use method on RoleGroupRef once op-rs is released
.name(rolegroup_metrics_service_name(rolegroup))
.ownerreference_from_resource(hive, None, Some(true))
.context(ObjectMissingMetadataForOwnerRefSnafu)?
.with_recommended_labels(build_recommended_labels(
hive,
&resolved_product_image.app_version_label,
&rolegroup.role,
&rolegroup.role_group,
))
.context(MetadataBuildSnafu)?
.with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
.build(),
spec: Some(ServiceSpec {
// Internal communication does not need to be exposed
type_: Some("ClusterIP".to_string()),
cluster_ip: Some("None".to_string()),
ports: Some(metrics_ports()),
selector: Some(
Labels::role_group_selector(hive, APP_NAME, &rolegroup.role, &rolegroup.role_group)
.context(LabelBuildSnafu)?
.into(),
),
publish_not_ready_addresses: Some(true),
..ServiceSpec::default()
}),
status: None,
};
Ok(metrics_service)
}

/// Headless service for cluster internal purposes only.
// TODO: Move to operator-rs
pub fn rolegroup_headless_service_name(rolegroup: &RoleGroupRef<v1alpha1::HiveCluster>) -> String {
format!("{name}-headless", name = rolegroup.object_name())
}

/// Headless metrics service exposes Prometheus endpoint only
// TODO: Move to operator-rs
pub fn rolegroup_metrics_service_name(rolegroup: &RoleGroupRef<v1alpha1::HiveCluster>) -> String {
format!("{name}-metrics", name = rolegroup.object_name())
}

fn metrics_ports() -> Vec<ServicePort> {
vec![ServicePort {
name: Some(METRICS_PORT_NAME.to_string()),
port: METRICS_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]
}

fn service_ports() -> Vec<ServicePort> {
vec![ServicePort {
name: Some(HIVE_PORT_NAME.to_string()),
port: HIVE_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]
}
7 changes: 7 additions & 0 deletions tests/templates/kuttl/external-access/20-assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,10 @@ metadata:
name: test-hive-metastore-default-metrics
spec:
type: ClusterIP # exposed metrics
---
apiVersion: v1
kind: Service
metadata:
name: test-hive-metastore-default-headless
spec:
type: ClusterIP # exposed metrics
1 change: 1 addition & 0 deletions tests/templates/kuttl/smoke/80-assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ apiVersion: kuttl.dev/v1beta1
kind: TestAssert
commands:
- script: kubectl exec -n "$NAMESPACE" test-metastore-0 -- python /tmp/test_metastore.py -m hive-metastore.$NAMESPACE.svc.cluster.local
- script: kubectl exec -n "$NAMESPACE" test-metastore-0 -- python /tmp/test_metastore.py -m hive-metastore-default-headless.$NAMESPACE.svc.cluster.local