Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
- Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
- BREAKING: Added listener support for Superset ([#625]).
- Add internal headless service in addition to the metrics service and move listener logic to listener.rs ([#644])

### Changed

Expand Down Expand Up @@ -45,6 +46,7 @@
[#635]: https://github.com/stackabletech/superset-operator/pull/635
[#637]: https://github.com/stackabletech/superset-operator/pull/637
[#643]: https://github.com/stackabletech/superset-operator/pull/643
[#644]: https://github.com/stackabletech/superset-operator/pull/644

## [25.3.0] - 2025-03-21

Expand Down
12 changes: 4 additions & 8 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ use stackable_operator::{
};
use strum::{Display, EnumIter, EnumString, IntoEnumIterator};

use crate::crd::v1alpha1::{SupersetConfigFragment, SupersetRoleConfig};
use crate::{
crd::v1alpha1::{SupersetConfigFragment, SupersetRoleConfig},
listener::default_listener_class,
};

pub mod affinity;
pub mod authentication;
Expand All @@ -49,9 +52,6 @@ pub const MAX_LOG_FILES_SIZE: MemoryQuantity = MemoryQuantity {
unit: BinaryMultiple::Mebi,
};

pub const LISTENER_VOLUME_NAME: &str = "listener";
pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener";

pub const APP_PORT_NAME: &str = "http";
pub const APP_PORT: u16 = 8088;
pub const METRICS_PORT_NAME: &str = "metrics";
Expand Down Expand Up @@ -311,10 +311,6 @@ impl Default for v1alpha1::SupersetRoleConfig {
}
}

fn default_listener_class() -> String {
"cluster-internal".to_string()
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SupersetCredentials {
Expand Down
61 changes: 61 additions & 0 deletions rust/operator-binary/src/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use snafu::{ResultExt, Snafu};
use stackable_operator::{builder::meta::ObjectMetaBuilder, crd::listener, kvp::ObjectLabels};

use crate::crd::{APP_PORT, APP_PORT_NAME, v1alpha1};

pub const LISTENER_VOLUME_NAME: &str = "listener";
pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener";

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("object is missing metadata to build owner reference"))]
ObjectMissingMetadataForOwnerRef {
source: stackable_operator::builder::meta::Error,
},
#[snafu(display("failed to build Metadata"))]
MetadataBuild {
source: stackable_operator::builder::meta::Error,
},
}

pub fn build_group_listener(
superset: &v1alpha1::SupersetCluster,
object_labels: ObjectLabels<v1alpha1::SupersetCluster>,
listener_class: String,
listener_group_name: String,
) -> Result<listener::v1alpha1::Listener, Error> {
let metadata = ObjectMetaBuilder::new()
.name_and_namespace(superset)
.name(listener_group_name)
.ownerreference_from_resource(superset, None, Some(true))
.context(ObjectMissingMetadataForOwnerRefSnafu)?
.with_recommended_labels(object_labels)
.context(MetadataBuildSnafu)?
.build();

let spec = listener::v1alpha1::ListenerSpec {
class_name: Some(listener_class),
ports: Some(listener_ports()),
..Default::default()
};

let listener = listener::v1alpha1::Listener {
metadata,
spec,
status: None,
};

Ok(listener)
}

pub fn listener_ports() -> Vec<listener::v1alpha1::ListenerPort> {
vec![listener::v1alpha1::ListenerPort {
name: APP_PORT_NAME.to_owned(),
port: APP_PORT.into(),
protocol: Some("TCP".to_owned()),
}]
}

pub fn default_listener_class() -> String {
"cluster-internal".to_string()
}
2 changes: 2 additions & 0 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ mod config;
mod controller_commons;
mod crd;
mod druid_connection_controller;
mod listener;
mod operations;
mod product_logging;
mod rbac;
mod service;
mod superset_controller;
mod util;

Expand Down
155 changes: 155 additions & 0 deletions rust/operator-binary/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use snafu::{ResultExt, Snafu};
use stackable_operator::{
builder::meta::ObjectMetaBuilder,
commons::product_image_selection::ResolvedProductImage,
k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec},
kvp::{Label, Labels},
role_utils::RoleGroupRef,
};

use crate::{
crd::{APP_NAME, APP_PORT, APP_PORT_NAME, METRICS_PORT, METRICS_PORT_NAME, v1alpha1},
superset_controller::SUPERSET_CONTROLLER_NAME,
util::build_recommended_labels,
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("object is missing metadata to build owner reference"))]
ObjectMissingMetadataForOwnerRef {
source: stackable_operator::builder::meta::Error,
},
#[snafu(display("failed to build Metadata"))]
MetadataBuild {
source: stackable_operator::builder::meta::Error,
},
#[snafu(display("failed to build Labels"))]
LabelBuild {
source: stackable_operator::kvp::LabelError,
},
}

/// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup
///
/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
pub fn build_node_rolegroup_headless_service(
superset: &v1alpha1::SupersetCluster,
resolved_product_image: &ResolvedProductImage,
rolegroup: &RoleGroupRef<v1alpha1::SupersetCluster>,
) -> Result<Service, Error> {
let headless_service = Service {
metadata: ObjectMetaBuilder::new()
.name_and_namespace(superset)
.name(rolegroup_headless_service_name(rolegroup))
.ownerreference_from_resource(superset, None, Some(true))
.context(ObjectMissingMetadataForOwnerRefSnafu)?
.with_recommended_labels(build_recommended_labels(
superset,
SUPERSET_CONTROLLER_NAME,
&resolved_product_image.app_version_label,
&rolegroup.role,
&rolegroup.role_group,
))
.context(MetadataBuildSnafu)?
.build(),
spec: Some(ServiceSpec {
// Internal communication does not need to be exposed
type_: Some("ClusterIP".to_owned()),
cluster_ip: Some("None".to_owned()),
ports: Some(service_ports()),
selector: Some(
Labels::role_group_selector(
superset,
APP_NAME,
&rolegroup.role,
&rolegroup.role_group,
)
.context(LabelBuildSnafu)?
.into(),
),
publish_not_ready_addresses: Some(true),
..ServiceSpec::default()
}),
status: None,
};
Ok(headless_service)
}

/// The rolegroup metrics [`Service`] is a service that exposes metrics and a prometheus scraping label
pub fn build_node_rolegroup_metrics_service(
superset: &v1alpha1::SupersetCluster,
resolved_product_image: &ResolvedProductImage,
rolegroup: &RoleGroupRef<v1alpha1::SupersetCluster>,
) -> Result<Service, Error> {
let metrics_service = Service {
metadata: ObjectMetaBuilder::new()
.name_and_namespace(superset)
.name(rolegroup_metrics_service_name(rolegroup))
.ownerreference_from_resource(superset, None, Some(true))
.context(ObjectMissingMetadataForOwnerRefSnafu)?
.with_recommended_labels(build_recommended_labels(
superset,
SUPERSET_CONTROLLER_NAME,
&resolved_product_image.app_version_label,
&rolegroup.role,
&rolegroup.role_group,
))
.context(MetadataBuildSnafu)?
.with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
.build(),
spec: Some(ServiceSpec {
// Internal communication does not need to be exposed
type_: Some("ClusterIP".to_owned()),
cluster_ip: Some("None".to_owned()),
ports: Some(metrics_ports()),
selector: Some(
Labels::role_group_selector(
superset,
APP_NAME,
&rolegroup.role,
&rolegroup.role_group,
)
.context(LabelBuildSnafu)?
.into(),
),
publish_not_ready_addresses: Some(true),
..ServiceSpec::default()
}),
status: None,
};

Ok(metrics_service)
}

/// Headless service for cluster internal purposes only.
// TODO: Move to operator-rs
pub fn rolegroup_headless_service_name(
rolegroup: &RoleGroupRef<v1alpha1::SupersetCluster>,
) -> String {
format!("{name}-headless", name = rolegroup.object_name())
}

/// Headless metrics service exposes Prometheus endpoint only
// TODO: Move to operator-rs
pub fn rolegroup_metrics_service_name(
rolegroup: &RoleGroupRef<v1alpha1::SupersetCluster>,
) -> String {
format!("{name}-metrics", name = rolegroup.object_name())
}

fn metrics_ports() -> Vec<ServicePort> {
vec![ServicePort {
name: Some(METRICS_PORT_NAME.to_string()),
port: METRICS_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]
}

fn service_ports() -> Vec<ServicePort> {
vec![ServicePort {
name: Some(APP_PORT_NAME.to_string()),
port: APP_PORT.into(),
protocol: Some("TCP".to_string()),
..ServicePort::default()
}]
}
Loading