Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
- Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
- Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
- BREAKING: Add Listener support for Hive ([#605]).
- Add internal headless service in addition to the metrics service ([#613]).

### Changed

Expand Down Expand Up @@ -43,6 +44,7 @@ All notable changes to this project will be documented in this file.
[#603]: https://github.com/stackabletech/hive-operator/pull/603
[#604]: https://github.com/stackabletech/hive-operator/pull/604
[#605]: https://github.com/stackabletech/hive-operator/pull/605
[#613]: https://github.com/stackabletech/hive-operator/pull/613

## [25.3.0] - 2025-03-21

Expand Down
91 changes: 30 additions & 61 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use stackable_operator::{
api::{
apps::v1::{StatefulSet, StatefulSetSpec},
core::v1::{
ConfigMap, ConfigMapVolumeSource, EmptyDirVolumeSource, Probe, Service,
ServicePort, ServiceSpec, TCPSocketAction, Volume,
ConfigMap, ConfigMapVolumeSource, EmptyDirVolumeSource, Probe, TCPSocketAction,
Volume,
},
},
apimachinery::pkg::{
Expand All @@ -56,7 +56,7 @@ use stackable_operator::{
core::{DeserializeGuard, error_boundary},
runtime::controller::Action,
},
kvp::{Label, Labels, ObjectLabels},
kvp::{Labels, ObjectLabels},
logging::controller::ReconcilerError,
memory::{BinaryMultiple, MemoryQuantity},
product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config},
Expand Down Expand Up @@ -94,14 +94,18 @@ use crate::{
STACKABLE_LOG_DIR_NAME,
v1alpha1::{self, HiveMetastoreRoleConfig},
},
discovery::{self, build_headless_role_group_metrics_service_name},
discovery::{self},
kerberos::{
self, add_kerberos_pod_config, kerberos_config_properties,
kerberos_container_start_commands,
},
listener::{LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_role_listener},
operations::{graceful_shutdown::add_graceful_shutdown_config, pdb::add_pdbs},
product_logging::extend_role_group_config_map,
service::{
build_rolegroup_headless_service, build_rolegroup_metrics_service,
rolegroup_metrics_service_name,
},
};

pub const HIVE_CONTROLLER_NAME: &str = "hivecluster";
Expand Down Expand Up @@ -345,6 +349,9 @@ pub enum Error {
BuildListenerVolume {
source: ListenerOperatorVolumeSourceBuilderError,
},

#[snafu(display("faild to configure service"))]
ServiceConfiguration { source: crate::service::Error },
}
type Result<T, E = Error> = std::result::Result<T, E>;

Expand Down Expand Up @@ -456,7 +463,14 @@ pub async fn reconcile_hive(
.merged_config(&HiveRole::MetaStore, &rolegroup)
.context(FailedToResolveResourceConfigSnafu)?;

let rg_service = build_rolegroup_service(hive, &resolved_product_image, &rolegroup)?;
let rg_metrics_service =
build_rolegroup_metrics_service(hive, &resolved_product_image, &rolegroup)
.context(ServiceConfigurationSnafu)?;

let rg_headless_service =
build_rolegroup_headless_service(hive, &resolved_product_image, &rolegroup)
.context(ServiceConfigurationSnafu)?;

let rg_configmap = build_metastore_rolegroup_config_map(
hive,
&hive_namespace,
Expand All @@ -479,7 +493,14 @@ pub async fn reconcile_hive(
)?;

cluster_resources
.add(client, rg_service)
.add(client, rg_metrics_service)
.await
.context(ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup.clone(),
})?;

cluster_resources
.add(client, rg_headless_service)
.await
.context(ApplyRoleGroupServiceSnafu {
rolegroup: rolegroup.clone(),
Expand Down Expand Up @@ -713,52 +734,10 @@ fn build_metastore_rolegroup_config_map(
})
}

/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup
///
/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
fn build_rolegroup_service(
hive: &v1alpha1::HiveCluster,
resolved_product_image: &ResolvedProductImage,
rolegroup: &RoleGroupRef<v1alpha1::HiveCluster>,
) -> Result<Service> {
Ok(Service {
metadata: ObjectMetaBuilder::new()
.name_and_namespace(hive)
.name(build_headless_role_group_metrics_service_name(
rolegroup.object_name(),
))
.ownerreference_from_resource(hive, None, Some(true))
.context(ObjectMissingMetadataForOwnerRefSnafu)?
.with_recommended_labels(build_recommended_labels(
hive,
&resolved_product_image.app_version_label,
&rolegroup.role,
&rolegroup.role_group,
))
.context(MetadataBuildSnafu)?
.with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
.build(),
spec: Some(ServiceSpec {
// Internal communication does not need to be exposed
type_: Some("ClusterIP".to_string()),
cluster_ip: Some("None".to_string()),
ports: Some(service_ports()),
selector: Some(
Labels::role_group_selector(hive, APP_NAME, &rolegroup.role, &rolegroup.role_group)
.context(LabelBuildSnafu)?
.into(),
),
publish_not_ready_addresses: Some(true),
..ServiceSpec::default()
}),
status: None,
})
}

/// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator.
///
/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the
/// corresponding [`Service`] (from [`build_rolegroup_service`]).
/// corresponding [`Service`](`stackable_operator::k8s_openapi::api::core::v1::Service`) (from [`build_rolegroup_headless_service`] and [`build_rolegroup_metrics_service`]).
#[allow(clippy::too_many_arguments)]
fn build_metastore_rolegroup_statefulset(
hive: &v1alpha1::HiveCluster,
Expand Down Expand Up @@ -1100,9 +1079,8 @@ fn build_metastore_rolegroup_statefulset(
),
..LabelSelector::default()
},
service_name: Some(build_headless_role_group_metrics_service_name(
rolegroup_ref.object_name(),
)),
// TODO: Use method on RoleGroupRef once op-rs is released
service_name: Some(rolegroup_metrics_service_name(rolegroup_ref)),
template: pod_template,
volume_claim_templates: Some(vec![pvc]),
..StatefulSetSpec::default()
Expand All @@ -1123,15 +1101,6 @@ pub fn error_policy(
}
}

pub fn service_ports() -> Vec<ServicePort> {
vec![ServicePort {
name: Some(METRICS_PORT_NAME.to_string()),
port: METRICS_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]
}

/// Creates recommended `ObjectLabels` to be used in deployed resources
pub fn build_recommended_labels<'a, T>(
owner: &'a T,
Expand Down
4 changes: 0 additions & 4 deletions rust/operator-binary/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,3 @@ fn build_discovery_configmap(
obj_ref: ObjectRef::from_obj(hive),
})
}

pub fn build_headless_role_group_metrics_service_name(name: String) -> String {
format!("{name}-metrics")
}
2 changes: 1 addition & 1 deletion rust/operator-binary/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub fn build_role_listener(
Ok(listener)
}

fn listener_ports() -> Vec<ListenerPort> {
pub fn listener_ports() -> Vec<ListenerPort> {
vec![ListenerPort {
name: HIVE_PORT_NAME.to_owned(),
port: HIVE_PORT.into(),
Expand Down
1 change: 1 addition & 0 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod kerberos;
mod listener;
mod operations;
mod product_logging;
mod service;

use std::sync::Arc;

Expand Down
141 changes: 141 additions & 0 deletions rust/operator-binary/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use snafu::{ResultExt, Snafu};
use stackable_operator::{
builder::meta::ObjectMetaBuilder,
commons::product_image_selection::ResolvedProductImage,
k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec},
kvp::{Label, Labels},
role_utils::RoleGroupRef,
};

use crate::{
controller::build_recommended_labels,
crd::{APP_NAME, HIVE_PORT, HIVE_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, v1alpha1},
};

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("object is missing metadata to build owner reference"))]
ObjectMissingMetadataForOwnerRef {
source: stackable_operator::builder::meta::Error,
},
#[snafu(display("failed to build Metadata"))]
MetadataBuild {
source: stackable_operator::builder::meta::Error,
},
#[snafu(display("failed to build Labels"))]
LabelBuild {
source: stackable_operator::kvp::LabelError,
},
}

/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup
///
/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
pub fn build_rolegroup_headless_service(
hive: &v1alpha1::HiveCluster,
resolved_product_image: &ResolvedProductImage,
rolegroup: &RoleGroupRef<v1alpha1::HiveCluster>,
) -> Result<Service, Error> {
let headless_service = Service {
metadata: ObjectMetaBuilder::new()
.name_and_namespace(hive)
// TODO: Use method on RoleGroupRef once op-rs is released
.name(rolegroup_headless_service_name(rolegroup))
.ownerreference_from_resource(hive, None, Some(true))
.context(ObjectMissingMetadataForOwnerRefSnafu)?
.with_recommended_labels(build_recommended_labels(
hive,
&resolved_product_image.app_version_label,
&rolegroup.role,
&rolegroup.role_group,
))
.context(MetadataBuildSnafu)?
.build(),
spec: Some(ServiceSpec {
// Internal communication does not need to be exposed
type_: Some("ClusterIP".to_string()),
cluster_ip: Some("None".to_string()),
// Expecting same ports as on listener service, just as a headless, internal service
ports: Some(service_ports()),
selector: Some(
Labels::role_group_selector(hive, APP_NAME, &rolegroup.role, &rolegroup.role_group)
.context(LabelBuildSnafu)?
.into(),
),
publish_not_ready_addresses: Some(true),
..ServiceSpec::default()
}),
status: None,
};
Ok(headless_service)
}

/// The rolegroup metrics [`Service`] is a service that exposes metrics and a prometheus scraping label
pub fn build_rolegroup_metrics_service(
hive: &v1alpha1::HiveCluster,
resolved_product_image: &ResolvedProductImage,
rolegroup: &RoleGroupRef<v1alpha1::HiveCluster>,
) -> Result<Service, Error> {
let metrics_service = Service {
metadata: ObjectMetaBuilder::new()
.name_and_namespace(hive)
// TODO: Use method on RoleGroupRef once op-rs is released
.name(rolegroup_metrics_service_name(rolegroup))
.ownerreference_from_resource(hive, None, Some(true))
.context(ObjectMissingMetadataForOwnerRefSnafu)?
.with_recommended_labels(build_recommended_labels(
hive,
&resolved_product_image.app_version_label,
&rolegroup.role,
&rolegroup.role_group,
))
.context(MetadataBuildSnafu)?
.with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
.build(),
spec: Some(ServiceSpec {
// Internal communication does not need to be exposed
type_: Some("ClusterIP".to_string()),
cluster_ip: Some("None".to_string()),
ports: Some(metrics_ports()),
selector: Some(
Labels::role_group_selector(hive, APP_NAME, &rolegroup.role, &rolegroup.role_group)
.context(LabelBuildSnafu)?
.into(),
),
publish_not_ready_addresses: Some(true),
..ServiceSpec::default()
}),
status: None,
};
Ok(metrics_service)
}

/// Headless service for cluster internal purposes only.
// TODO: Move to operator-rs
pub fn rolegroup_headless_service_name(rolegroup: &RoleGroupRef<v1alpha1::HiveCluster>) -> String {
format!("{name}-headless", name = rolegroup.object_name())
}

/// Headless metrics service exposes Prometheus endpoint only
// TODO: Move to operator-rs
pub fn rolegroup_metrics_service_name(rolegroup: &RoleGroupRef<v1alpha1::HiveCluster>) -> String {
format!("{name}-metrics", name = rolegroup.object_name())
}

fn metrics_ports() -> Vec<ServicePort> {
vec![ServicePort {
name: Some(METRICS_PORT_NAME.to_string()),
port: METRICS_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]
}

fn service_ports() -> Vec<ServicePort> {
vec![ServicePort {
name: Some(HIVE_PORT_NAME.to_string()),
port: HIVE_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]
}
7 changes: 7 additions & 0 deletions tests/templates/kuttl/external-access/20-assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,10 @@ metadata:
name: test-hive-metastore-default-metrics
spec:
type: ClusterIP # exposed metrics
---
apiVersion: v1
kind: Service
metadata:
name: test-hive-metastore-default-headless
spec:
type: ClusterIP # exposed metrics
2 changes: 1 addition & 1 deletion tests/templates/kuttl/smoke/80-assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
commands:
- script: kubectl exec -n "$NAMESPACE" test-metastore-0 -- python /tmp/test_metastore.py -m hive-metastore.$NAMESPACE.svc.cluster.local
- script: kubectl exec -n "$NAMESPACE" test-metastore-0 -- python /tmp/test_metastore.py -m hive-metastore-default-headless.$NAMESPACE.svc.cluster.local