diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5b4db3b9..304edc03 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,8 @@ All notable changes to this project will be documented in this file.
 - Add experimental support for Spark 4 ([#589])
 - Helm: Allow Pod `priorityClassName` to be configured ([#608]).
 - Support for Spark 3.5.7 ([#610]).
+- Add metrics service with `prometheus.io/path|port|scheme` annotations for the Spark History Server ([#619]).
+- Add metrics service with `prometheus.io/path|port|scheme` annotations for Spark Connect ([#619]).
 
 ### Fixed
 
@@ -35,6 +37,7 @@ All notable changes to this project will be documented in this file.
 [#610]: https://github.com/stackabletech/spark-k8s-operator/pull/610
 [#611]: https://github.com/stackabletech/spark-k8s-operator/pull/611
 [#617]: https://github.com/stackabletech/spark-k8s-operator/pull/617
+[#619]: https://github.com/stackabletech/spark-k8s-operator/pull/619
 
 ## [25.7.0] - 2025-07-23
 
diff --git a/apps/ny_tlc_report.py b/apps/ny_tlc_report.py
index 52d9bb81..0a03a1b5 100644
--- a/apps/ny_tlc_report.py
+++ b/apps/ny_tlc_report.py
@@ -9,6 +9,7 @@ need to be submitted along with the job.
 --output Path to write the report as a CSV file.
 """
+
 import argparse
 from argparse import Namespace
 
diff --git a/docs/modules/spark-k8s/pages/usage-guide/history-server.adoc b/docs/modules/spark-k8s/pages/usage-guide/history-server.adoc
index d099505d..30a287a6 100644
--- a/docs/modules/spark-k8s/pages/usage-guide/history-server.adoc
+++ b/docs/modules/spark-k8s/pages/usage-guide/history-server.adoc
@@ -157,10 +157,9 @@ By setting up port forwarding on 18080 the UI can be opened by pointing your bro
 
 image::history-server-ui.png[History Server Console]
 
-== Metrics
+== Monitoring
 
-[NOTE]
-====
-Starting with version 25.7, the built-in Prometheus servlet is enabled in addition to the existing JMX exporter.
-The JMX exporter is still available but it is deprecated and will be removed in a future release.
-====
+The operator creates a dedicated Kubernetes service for collecting Spark History Server metrics with Prometheus.
+These metrics are exported via the JMX exporter, because the history server does not support the built-in Spark Prometheus servlet.
+The service name follows the convention `-history-metrics`.
+Metrics can be scraped at the endpoint `:18081/metrics`.
diff --git a/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc b/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc
index d955a3ca..753b0d99 100644
--- a/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc
+++ b/docs/modules/spark-k8s/pages/usage-guide/operations/applications.adoc
@@ -9,7 +9,7 @@ As the operator creates the necessary resources, the status of the application t
 NOTE: The operator never reconciles an application once it has been created.
 To resubmit an application, a new SparkApplication resource must be created.
 
-== Metrics
+== Monitoring
 
 [NOTE]
 ====
diff --git a/docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc b/docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc
index bf9bf1d1..009e4aba 100644
--- a/docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc
+++ b/docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc
@@ -26,12 +26,14 @@ include::example$example-spark-connect.yaml[]
 <7> Customize the driver properties in the `server` role. The number of cores here is not related to Kubernetes cores!
 <8> Customize `spark.executor.\*` and `spark.kubernetes.executor.*` in the `executor` role.
 
-== Metrics
+== Monitoring
 
-The server pod exposes Prometheus metrics at the following endpoints:
+The operator creates a dedicated Kubernetes service for collecting Spark Connect metrics with Prometheus.
+The service name follows the convention `-server-metrics`.
+This service exposes Prometheus metrics at the following endpoints:
 
-* `/metrics/prometheus` for driver instances.
-* `/metrics/executors/prometheus` for executor instances.
+* `:4040/metrics/prometheus` for driver instances.
+* `:4040/metrics/executors/prometheus` for executor instances.
 
 To customize the metrics configuration use the `spec.server.configOverrides' like this:
 
@@ -47,8 +49,8 @@ The example above adds a new endpoint for application metrics.
 
 == Spark History Server
 
-Unforunately integration with the Spark History Server is not supported yet.
-The connect server seems to ignore the `spark.eventLog` properties while also prohibiting clients to set them programatically.
+Unfortunately, integration with the Spark History Server is not supported yet.
+The connect server seems to ignore the `spark.eventLog` properties while also prohibiting clients from setting them programmatically.
 
 == Notable Omissions
 
diff --git a/examples/README-examples.md b/examples/README-examples.md
index 4b9c931a..f26927b5 100644
--- a/examples/README-examples.md
+++ b/examples/README-examples.md
@@ -50,7 +50,7 @@ Several resources are needed in this store. These can be loaded like this:
 
 ```text
 kubectl exec minio-mc-0 -- sh -c 'mc alias set test-minio http://test-minio:9000/'
-kubectl cp examples/ny-tlc-report-1.1.0-3.5.7.jar minio-mc-0:/tmp
+kubectl cp tests/templates/kuttl/spark-ny-public-s3/ny-tlc-report-1.1.0-3.5.7.jar minio-mc-0:/tmp
 kubectl cp apps/ny_tlc_report.py minio-mc-0:/tmp
 kubectl cp examples/yellow_tripdata_2021-07.csv minio-mc-0:/tmp
 kubectl exec minio-mc-0 -- mc cp /tmp/ny-tlc-report-1.1.0-3.5.7.jar test-minio/my-bucket
diff --git a/kind/assert-pvc-jars.yaml b/kind/assert-pvc-jars.yaml
index d8f43c2b..29daaa90 100644
--- a/kind/assert-pvc-jars.yaml
+++ b/kind/assert-pvc-jars.yaml
@@ -13,7 +13,7 @@ spec:
         claimName: pvc-ksv
   containers:
     - name: assert-pvc-jars
-      image: oci.stackable.tech/sdp/tools:0.2.0-stackable0.4.0
+      image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev
       env:
        - name: DEST_DIR
          value: "/dependencies/jars"
diff --git a/kind/kind-pvc.yaml b/kind/kind-pvc.yaml
index e5e548ab..27114812 100644
--- a/kind/kind-pvc.yaml
+++ b/kind/kind-pvc.yaml
@@ -24,7 +24,7 @@ spec:
         claimName: pvc-ksv
   containers:
     - name: aws-deps
-      image: oci.stackable.tech/sdp/tools:0.2.0-stackable0.4.0
+      image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev
      env:
        - name: DEST_DIR
          value: "/dependencies/jars"
diff --git a/kind/minio.yaml b/kind/minio.yaml
index eb3f2c60..24fb542c 100644
--- a/kind/minio.yaml
+++ b/kind/minio.yaml
@@ -29,6 +29,6 @@ spec:
     spec:
       containers:
        - name: minio-mc
-          image: bitnamilegacy/minio:2022-debian-10
+          image: docker.io/bitnamilegacy/minio:2024-debian-12
          stdin: true
          tty: true
diff --git a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs
index 44bdbc01..3b876d93 100644
--- a/rust/operator-binary/src/connect/controller.rs
+++ b/rust/operator-binary/src/connect/controller.rs
@@ -21,7 +21,7 @@ use strum::{EnumDiscriminants, IntoStaticStr};
 use super::crd::{CONNECT_APP_NAME, CONNECT_CONTROLLER_NAME, v1alpha1};
 use crate::{
     Ctx,
-    connect::{common,
crd::SparkConnectServerStatus, executor, server}, + connect::{common, crd::SparkConnectServerStatus, executor, server, service}, crd::constants::{OPERATOR_NAME, SPARK_IMAGE_BASE_NAME}, }; @@ -47,7 +47,7 @@ pub enum Error { ServerProperties { source: server::Error }, #[snafu(display("failed to build spark connect service"))] - BuildService { source: server::Error }, + BuildService { source: service::Error }, #[snafu(display("failed to build spark connect executor config map for {name}"))] BuildExecutorConfigMap { @@ -67,9 +67,6 @@ pub enum Error { name: String, }, - #[snafu(display("spark connect object has no namespace"))] - ObjectHasNoNamespace, - #[snafu(display("failed to update the connect server stateful set"))] ApplyStatefulSet { source: stackable_operator::cluster_resources::Error, @@ -208,12 +205,22 @@ pub async fn reconcile( .context(ApplyRoleBindingSnafu)?; // Headless service used by executors connect back to the driver - let service = - server::build_internal_service(scs, &resolved_product_image.app_version_label_value) + let headless_service = + service::build_headless_service(scs, &resolved_product_image.app_version_label_value) .context(BuildServiceSnafu)?; - let applied_internal_service = cluster_resources - .add(client, service.clone()) + let applied_headless_service = cluster_resources + .add(client, headless_service.clone()) + .await + .context(ApplyServiceSnafu)?; + + // Metrics service used for scraping + let metrics_service = + service::build_metrics_service(scs, &resolved_product_image.app_version_label_value) + .context(BuildServiceSnafu)?; + + cluster_resources + .add(client, metrics_service.clone()) .await .context(ApplyServiceSnafu)?; @@ -224,7 +231,7 @@ pub async fn reconcile( server::server_properties( scs, &server_config, - &applied_internal_service, + &applied_headless_service, &service_account, &resolved_product_image, ) diff --git a/rust/operator-binary/src/connect/mod.rs b/rust/operator-binary/src/connect/mod.rs index b8144f1a..1692e6f5 100644 --- a/rust/operator-binary/src/connect/mod.rs +++ b/rust/operator-binary/src/connect/mod.rs @@ -3,3 +3,7 @@ pub mod controller; pub mod crd; mod executor; pub mod server; +mod service; + +pub(crate) const GRPC: &str = "grpc"; +pub(crate) const HTTP: &str = "http"; diff --git a/rust/operator-binary/src/connect/server.rs b/rust/operator-binary/src/connect/server.rs index d322e1fe..861168c0 100644 --- a/rust/operator-binary/src/connect/server.rs +++ b/rust/operator-binary/src/connect/server.rs @@ -24,7 +24,7 @@ use stackable_operator::{ apps::v1::{StatefulSet, StatefulSetSpec}, core::v1::{ ConfigMap, EnvVar, HTTPGetAction, PodSecurityContext, Probe, Service, - ServiceAccount, ServicePort, ServiceSpec, + ServiceAccount, }, }, apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, @@ -38,6 +38,7 @@ use stackable_operator::{ use super::crd::CONNECT_APP_NAME; use crate::{ connect::{ + GRPC, HTTP, common::{self, SparkConnectRole, object_name}, crd::{ CONNECT_GRPC_PORT, CONNECT_UI_PORT, DEFAULT_SPARK_CONNECT_GROUP_NAME, @@ -57,9 +58,6 @@ use crate::{ product_logging, }; -const GRPC: &str = "grpc"; -const HTTP: &str = "http"; - #[derive(Snafu, Debug)] #[allow(clippy::enum_variant_names)] pub enum Error { @@ -396,63 +394,6 @@ pub(crate) fn build_stateful_set( }) } -// This is the headless driver service used for the internal -// communication with the executors as recommended by the Spark docs. 
-pub(crate) fn build_internal_service( - scs: &v1alpha1::SparkConnectServer, - app_version_label: &str, -) -> Result { - let service_name = format!( - "{cluster}-{role}-headless", - cluster = scs.name_any(), - role = SparkConnectRole::Server - ); - - let selector = - Labels::role_selector(scs, CONNECT_APP_NAME, &SparkConnectRole::Server.to_string()) - .context(LabelBuildSnafu)? - .into(); - - Ok(Service { - metadata: ObjectMetaBuilder::new() - .name_and_namespace(scs) - .name(service_name) - .ownerreference_from_resource(scs, None, Some(true)) - .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(common::labels( - scs, - app_version_label, - &SparkConnectRole::Server.to_string(), - )) - .context(MetadataBuildSnafu)? - .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?) - .build(), - spec: Some(ServiceSpec { - type_: Some("ClusterIP".to_owned()), - cluster_ip: Some("None".to_owned()), - ports: Some(vec![ - ServicePort { - name: Some(String::from(GRPC)), - port: CONNECT_GRPC_PORT, - ..ServicePort::default() - }, - ServicePort { - name: Some(String::from(HTTP)), - port: CONNECT_UI_PORT, - ..ServicePort::default() - }, - ]), - selector: Some(selector), - // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness - // probes. Without it, the driver runs into a deadlock beacuse the Pod cannot become - // "ready" until the Service is "ready" and vice versa. - publish_not_ready_addresses: Some(true), - ..ServiceSpec::default() - }), - status: None, - }) -} - #[allow(clippy::result_large_err)] pub(crate) fn command_args(user_args: &[String]) -> Vec { let mut command = vec![ diff --git a/rust/operator-binary/src/connect/service.rs b/rust/operator-binary/src/connect/service.rs new file mode 100644 index 00000000..408d28ad --- /dev/null +++ b/rust/operator-binary/src/connect/service.rs @@ -0,0 +1,163 @@ +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + builder::{self, meta::ObjectMetaBuilder}, + k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec}, + kube::ResourceExt, + kvp::{Annotations, Labels}, +}; + +use super::crd::CONNECT_APP_NAME; +use crate::connect::{ + GRPC, HTTP, + common::{self, SparkConnectRole}, + crd::{CONNECT_GRPC_PORT, CONNECT_UI_PORT, v1alpha1}, +}; + +#[derive(Snafu, Debug)] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("object is missing metadata to build owner reference"))] + ObjectMissingMetadataForOwnerRef { source: builder::meta::Error }, + + #[snafu(display("failed to build Labels"))] + LabelBuild { + source: stackable_operator::kvp::LabelError, + }, + + #[snafu(display("failed to build Metadata"))] + MetadataBuild { source: builder::meta::Error }, +} + +// This is the headless driver service used for the internal +// communication with the executors as recommended by the Spark docs. +pub(crate) fn build_headless_service( + scs: &v1alpha1::SparkConnectServer, + app_version_label: &str, +) -> Result { + let service_name = format!( + "{cluster}-{role}-headless", + cluster = scs.name_any(), + role = SparkConnectRole::Server + ); + + let selector = + Labels::role_selector(scs, CONNECT_APP_NAME, &SparkConnectRole::Server.to_string()) + .context(LabelBuildSnafu)? + .into(); + + Ok(Service { + metadata: ObjectMetaBuilder::new() + .name_and_namespace(scs) + .name(service_name) + .ownerreference_from_resource(scs, None, Some(true)) + .context(ObjectMissingMetadataForOwnerRefSnafu)? 
+            .with_recommended_labels(common::labels(
+                scs,
+                app_version_label,
+                &SparkConnectRole::Server.to_string(),
+            ))
+            .context(MetadataBuildSnafu)?
+            .build(),
+        spec: Some(ServiceSpec {
+            type_: Some("ClusterIP".to_owned()),
+            cluster_ip: Some("None".to_owned()),
+            ports: Some(vec![
+                ServicePort {
+                    name: Some(String::from(GRPC)),
+                    port: CONNECT_GRPC_PORT,
+                    ..ServicePort::default()
+                },
+                ServicePort {
+                    name: Some(String::from(HTTP)),
+                    port: CONNECT_UI_PORT,
+                    ..ServicePort::default()
+                },
+            ]),
+            selector: Some(selector),
+            // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness
+            // probes. Without it, the driver runs into a deadlock because the Pod cannot become
+            // "ready" until the Service is "ready" and vice versa.
+            publish_not_ready_addresses: Some(true),
+            ..ServiceSpec::default()
+        }),
+        status: None,
+    })
+}
+
+// This is the metrics service used by Prometheus to scrape the connect server.
+pub(crate) fn build_metrics_service(
+    scs: &v1alpha1::SparkConnectServer,
+    app_version_label: &str,
+) -> Result<Service, Error> {
+    let service_name = format!(
+        "{cluster}-{role}-metrics",
+        cluster = scs.name_any(),
+        role = SparkConnectRole::Server
+    );
+
+    let selector =
+        Labels::role_selector(scs, CONNECT_APP_NAME, &SparkConnectRole::Server.to_string())
+            .context(LabelBuildSnafu)?
+            .into();
+
+    Ok(Service {
+        metadata: ObjectMetaBuilder::new()
+            .name_and_namespace(scs)
+            .name(service_name)
+            .ownerreference_from_resource(scs, None, Some(true))
+            .context(ObjectMissingMetadataForOwnerRefSnafu)?
+            .with_recommended_labels(common::labels(
+                scs,
+                app_version_label,
+                &SparkConnectRole::Server.to_string(),
+            ))
+            .context(MetadataBuildSnafu)?
+            .with_labels(prometheus_labels())
+            .with_annotations(prometheus_annotations())
+            .build(),
+        spec: Some(ServiceSpec {
+            type_: Some("ClusterIP".to_owned()),
+            cluster_ip: Some("None".to_owned()),
+            ports: Some(metrics_ports()),
+            selector: Some(selector),
+            // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness
+            // probes. Without it, the driver runs into a deadlock because the Pod cannot become
+            // "ready" until the Service is "ready" and vice versa.
+            publish_not_ready_addresses: Some(true),
+            ..ServiceSpec::default()
+        }),
+        status: None,
+    })
+}
+
+fn metrics_ports() -> Vec<ServicePort> {
+    vec![ServicePort {
+        name: Some("metrics".to_string()),
+        port: CONNECT_UI_PORT,
+        protocol: Some("TCP".to_string()),
+        ..ServicePort::default()
+    }]
+}
+
+/// Common labels for Prometheus
+fn prometheus_labels() -> Labels {
+    Labels::try_from([("prometheus.io/scrape", "true")]).expect("should be a valid label")
+}
+
+/// Common annotations for Prometheus
+///
+/// These annotations can be used in a ServiceMonitor.
+///
+/// see also
+fn prometheus_annotations() -> Annotations {
+    Annotations::try_from([
+        (
+            "prometheus.io/path".to_owned(),
+            "/metrics/prometheus".to_owned(),
+        ),
+        ("prometheus.io/port".to_owned(), CONNECT_UI_PORT.to_string()),
+        ("prometheus.io/scheme".to_owned(), "http".to_owned()),
+        ("prometheus.io/scrape".to_owned(), "true".to_owned()),
+    ])
+    .expect("should be valid annotations")
+}
diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs
index 7bf6cadf..e48b55dc 100644
--- a/rust/operator-binary/src/history/history_controller.rs
+++ b/rust/operator-binary/src/history/history_controller.rs
@@ -39,7 +39,7 @@ use stackable_operator::{
         core::{DeserializeGuard, error_boundary},
         runtime::{controller::Action, reflector::ObjectRef},
     },
-    kvp::{Labels, ObjectLabels},
+    kvp::Labels,
     logging::controller::ReconcilerError,
     product_logging::{
         framework::{LoggingError, calculate_log_volume_size_limit, vector_container},
@@ -57,20 +57,24 @@ use crate::{
     Ctx,
     crd::{
         constants::{
-            ACCESS_KEY_ID, HISTORY_APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME,
-            HISTORY_UI_PORT, JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR,
-            LISTENER_VOLUME_NAME, MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME,
-            SECRET_ACCESS_KEY, SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME,
-            SPARK_IMAGE_BASE_NAME, STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG,
-            VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG,
-            VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG,
+            ACCESS_KEY_ID, HISTORY_APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_UI_PORT,
+            JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME,
+            MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, SECRET_ACCESS_KEY,
+            SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, SPARK_IMAGE_BASE_NAME,
+            STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG,
+            VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG,
+            VOLUME_MOUNT_PATH_LOG_CONFIG,
         },
         history::{self, HistoryConfig, SparkHistoryServerContainer, v1alpha1},
         listener_ext,
         logdir::ResolvedLogDir,
         tlscerts, to_spark_env_sh_string,
     },
-    history::operations::pdb::add_pdbs,
+    history::{
+        operations::pdb::add_pdbs,
+        recommended_labels,
+        service::{self, build_rolegroup_metrics_service},
+    },
     product_logging::{self},
 };
 
@@ -96,9 +100,6 @@ pub enum Error {
     #[snafu(display("missing secret lifetime"))]
     MissingSecretLifetime,
 
-    #[snafu(display("object has no namespace"))]
-    ObjectHasNoNamespace,
-
     #[snafu(display("invalid config map {name}"))]
     InvalidConfigMap {
         source: stackable_operator::builder::configmap::Error,
@@ -125,6 +126,11 @@ pub enum Error {
         source: stackable_operator::cluster_resources::Error,
     },
 
+    #[snafu(display("failed to update history server metrics service"))]
+    ApplyMetricsService {
+        source: stackable_operator::cluster_resources::Error,
+    },
+
     #[snafu(display("failed to apply role ServiceAccount"))]
     ApplyServiceAccount {
         source: stackable_operator::cluster_resources::Error,
@@ -233,6 +239,9 @@ pub enum Error {
     ResolveProductImage {
         source: product_image_selection::Error,
     },
+
+    #[snafu(display("failed to build history server metrics service"))]
+    BuildMetricsService { source: service::Error },
 }
 
 impl ReconcilerError for Error {
@@ -320,10 +329,10 @@ pub async fn reconcile(
             &rgr,
             &log_dir,
         )?;
-        cluster_resources
-            .add(client, config_map)
-            .await
-            .context(ApplyConfigMapSnafu)?;
+
+        let metrics_service =
+            build_rolegroup_metrics_service(shs,
&resolved_product_image, &rgr) + .context(BuildMetricsServiceSnafu)?; let sts = build_stateful_set( shs, @@ -333,6 +342,15 @@ pub async fn reconcile( &merged_config, &service_account, )?; + + cluster_resources + .add(client, config_map) + .await + .context(ApplyConfigMapSnafu)?; + cluster_resources + .add(client, metrics_service) + .await + .context(ApplyMetricsServiceSnafu)?; cluster_resources .add(client, sts) .await @@ -380,7 +398,7 @@ fn build_group_listener( let listener_name = group_listener_name(shs, role); let recommended_object_labels = - labels(shs, &resolved_product_image.app_version_label_value, "none"); + recommended_labels(shs, &resolved_product_image.app_version_label_value, "none"); let listener_ports = [listener::v1alpha1::ListenerPort { name: "http".to_string(), @@ -445,7 +463,7 @@ fn build_config_map( .name(&cm_name) .ownerreference_from_resource(shs, None, Some(true)) .context(ObjectMissingMetadataForOwnerRefSnafu)? - .with_recommended_labels(labels( + .with_recommended_labels(recommended_labels( shs, app_version_label_value, &rolegroupref.role_group, @@ -511,7 +529,7 @@ fn build_stateful_set( rolegroupref.object_name() }; - let recommended_object_labels = labels( + let recommended_object_labels = recommended_labels( shs, &resolved_product_image.app_version_label_value, rolegroupref.role_group.as_ref(), @@ -732,22 +750,6 @@ fn command_args(logdir: &ResolvedLogDir) -> Vec { vec![command.join("\n")] } -fn labels<'a, T>( - shs: &'a T, - app_version_label_value: &'a str, - role_group: &'a str, -) -> ObjectLabels<'a, T> { - ObjectLabels { - owner: shs, - app_name: HISTORY_APP_NAME, - app_version: app_version_label_value, - operator_name: OPERATOR_NAME, - controller_name: HISTORY_CONTROLLER_NAME, - role: HISTORY_ROLE_NAME, - role_group, - } -} - /// Return the Spark properties for the cleaner role group (if any). /// There should be only one role group with "cleaner=true" and this /// group should have a replica count of 0 or 1. 
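
The `prometheus.io/path|port|scheme|scrape` annotations placed on the metrics Services above follow the informal convention that Prometheus' Kubernetes service discovery can consume via relabelling. As a rough sketch of the consumer side (not part of this PR; the job name is illustrative and the rules are the standard `__meta_kubernetes_service_annotation_*` relabel pattern), a scrape configuration could look like this:

```yaml
scrape_configs:
  - job_name: stackable-spark-metrics          # illustrative job name
    kubernetes_sd_configs:
      - role: endpoints
    relabel_configs:
      # Keep only endpoints whose Service is annotated prometheus.io/scrape: "true"
      - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape]
        action: keep
        regex: "true"
      # Honour prometheus.io/scheme (http for the services created by this operator)
      - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme]
        action: replace
        target_label: __scheme__
        regex: (https?)
      # Honour prometheus.io/path (/metrics for the history server, /metrics/prometheus for connect)
      - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)
      # Honour prometheus.io/port (18081 for the history server, 4040 for connect)
      - source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
        action: replace
        target_label: __address__
        regex: ([^:]+)(?::\d+)?;(\d+)
        replacement: "$1:$2"
```
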
diff --git a/rust/operator-binary/src/history/mod.rs b/rust/operator-binary/src/history/mod.rs
index c374e5b3..f0f0bf2b 100644
--- a/rust/operator-binary/src/history/mod.rs
+++ b/rust/operator-binary/src/history/mod.rs
@@ -1,3 +1,26 @@
+use stackable_operator::kvp::ObjectLabels;
+
+use crate::crd::constants::{
+    HISTORY_APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, OPERATOR_NAME,
+};
+
 pub mod config;
 pub mod history_controller;
 pub mod operations;
+pub mod service;
+
+pub(crate) fn recommended_labels<'a, T>(
+    shs: &'a T,
+    app_version_label_value: &'a str,
+    role_group: &'a str,
+) -> ObjectLabels<'a, T> {
+    ObjectLabels {
+        owner: shs,
+        app_name: HISTORY_APP_NAME,
+        app_version: app_version_label_value,
+        operator_name: OPERATOR_NAME,
+        controller_name: HISTORY_CONTROLLER_NAME,
+        role: HISTORY_ROLE_NAME,
+        role_group,
+    }
+}
diff --git a/rust/operator-binary/src/history/service.rs b/rust/operator-binary/src/history/service.rs
new file mode 100644
index 00000000..063c4075
--- /dev/null
+++ b/rust/operator-binary/src/history/service.rs
@@ -0,0 +1,105 @@
+use snafu::{ResultExt, Snafu};
+use stackable_operator::{
+    builder::meta::ObjectMetaBuilder,
+    commons::product_image_selection::ResolvedProductImage,
+    k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec},
+    kvp::{Annotations, Labels},
+    role_utils::RoleGroupRef,
+};
+
+use crate::{
+    crd::{
+        constants::{HISTORY_APP_NAME, METRICS_PORT},
+        history::v1alpha1,
+    },
+    history::recommended_labels,
+};
+
+#[derive(Snafu, Debug)]
+pub enum Error {
+    #[snafu(display("failed to build Labels"))]
+    LabelBuild {
+        source: stackable_operator::kvp::LabelError,
+    },
+
+    #[snafu(display("failed to build Metadata"))]
+    MetadataBuild {
+        source: stackable_operator::builder::meta::Error,
+    },
+    #[snafu(display("object is missing metadata to build owner reference"))]
+    ObjectMissingMetadataForOwnerRef {
+        source: stackable_operator::builder::meta::Error,
+    },
+}
+
+/// The rolegroup metrics [`Service`] exposes the metrics port and carries the Prometheus scraping label and annotations.
+pub fn build_rolegroup_metrics_service(
+    shs: &v1alpha1::SparkHistoryServer,
+    resolved_product_image: &ResolvedProductImage,
+    rolegroup_ref: &RoleGroupRef<v1alpha1::SparkHistoryServer>,
+) -> Result<Service, Error> {
+    Ok(Service {
+        metadata: ObjectMetaBuilder::new()
+            .name_and_namespace(shs)
+            .name(rolegroup_ref.rolegroup_metrics_service_name())
+            .ownerreference_from_resource(shs, None, Some(true))
+            .context(ObjectMissingMetadataForOwnerRefSnafu)?
+            .with_recommended_labels(recommended_labels(
+                shs,
+                &resolved_product_image.app_version_label_value,
+                &rolegroup_ref.role_group,
+            ))
+            .context(MetadataBuildSnafu)?
+            .with_labels(prometheus_labels())
+            .with_annotations(prometheus_annotations())
+            .build(),
+        spec: Some(ServiceSpec {
+            // Internal communication does not need to be exposed
+            type_: Some("ClusterIP".to_string()),
+            cluster_ip: Some("None".to_string()),
+            ports: Some(metrics_ports()),
+            selector: Some(
+                Labels::role_group_selector(
+                    shs,
+                    HISTORY_APP_NAME,
+                    &rolegroup_ref.role,
+                    &rolegroup_ref.role_group,
+                )
+                .context(LabelBuildSnafu)?
+ .into(), + ), + publish_not_ready_addresses: Some(true), + ..ServiceSpec::default() + }), + status: None, + }) +} + +fn metrics_ports() -> Vec { + vec![ServicePort { + name: Some("metrics".to_string()), + port: METRICS_PORT.into(), + protocol: Some("TCP".to_string()), + ..ServicePort::default() + }] +} + +/// Common labels for Prometheus +fn prometheus_labels() -> Labels { + Labels::try_from([("prometheus.io/scrape", "true")]).expect("should be a valid label") +} + +/// Common annotations for Prometheus +/// +/// These annotations can be used in a ServiceMonitor. +/// +/// see also +fn prometheus_annotations() -> Annotations { + Annotations::try_from([ + ("prometheus.io/path".to_owned(), "/metrics".to_owned()), + ("prometheus.io/port".to_owned(), METRICS_PORT.to_string()), + ("prometheus.io/scheme".to_owned(), "http".to_owned()), + ("prometheus.io/scrape".to_owned(), "true".to_owned()), + ]) + .expect("should be valid annotations") +} diff --git a/tests/templates/kuttl/logging/test_log_aggregation.py b/tests/templates/kuttl/logging/test_log_aggregation.py index 90124086..e13c4059 100755 --- a/tests/templates/kuttl/logging/test_log_aggregation.py +++ b/tests/templates/kuttl/logging/test_log_aggregation.py @@ -4,9 +4,9 @@ def check_sent_events(): response = requests.post( - 'http://spark-vector-aggregator:8686/graphql', + "http://spark-vector-aggregator:8686/graphql", json={ - 'query': """ + "query": """ { transforms(first:100) { nodes { @@ -20,29 +20,30 @@ def check_sent_events(): } } """ - } + }, ) - assert response.status_code == 200, \ - 'Cannot access the API of the vector aggregator.' + assert response.status_code == 200, ( + "Cannot access the API of the vector aggregator." + ) result = response.json() - transforms = result['data']['transforms']['nodes'] + transforms = result["data"]["transforms"]["nodes"] for transform in transforms: - sentEvents = transform['metrics']['sentEventsTotal'] - componentId = transform['componentId'] + sentEvents = transform["metrics"]["sentEventsTotal"] + componentId = transform["componentId"] - if componentId == 'filteredInvalidEvents': - assert sentEvents is None or \ - sentEvents['sentEventsTotal'] == 0, \ - 'Invalid log events were sent.' + if componentId == "filteredInvalidEvents": + assert sentEvents is None or sentEvents["sentEventsTotal"] == 0, ( + "Invalid log events were sent." + ) else: - assert sentEvents is not None and \ - sentEvents['sentEventsTotal'] > 0, \ + assert sentEvents is not None and sentEvents["sentEventsTotal"] > 0, ( f'No events were sent in "{componentId}".' + ) -if __name__ == '__main__': +if __name__ == "__main__": check_sent_events() - print('Test successful!') + print("Test successful!") diff --git a/tests/templates/kuttl/pyspark-ny-public-s3/ny_tlc_report.py b/tests/templates/kuttl/pyspark-ny-public-s3/ny_tlc_report.py index 52d9bb81..0a03a1b5 100644 --- a/tests/templates/kuttl/pyspark-ny-public-s3/ny_tlc_report.py +++ b/tests/templates/kuttl/pyspark-ny-public-s3/ny_tlc_report.py @@ -9,6 +9,7 @@ need to be submitted along with the job. --output Path to write the report as a CSV file. 
""" + import argparse from argparse import Namespace diff --git a/tests/templates/kuttl/spark-connect/40-assert.yaml b/tests/templates/kuttl/spark-connect/40-assert.yaml new file mode 100644 index 00000000..50ea896d --- /dev/null +++ b/tests/templates/kuttl/spark-connect/40-assert.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: metrics +commands: + - script: kubectl exec -n $NAMESPACE python -- python /test-scripts/metrics.py diff --git a/tests/templates/kuttl/spark-connect/40-install-test-container.yaml b/tests/templates/kuttl/spark-connect/40-install-test-container.yaml new file mode 100644 index 00000000..5d38bc74 --- /dev/null +++ b/tests/templates/kuttl/spark-connect/40-install-test-container.yaml @@ -0,0 +1,71 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +metadata: + name: install-test-container +timeout: 300 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-scripts +data: + metrics.py: | + import sys + import logging + import requests + + if __name__ == "__main__": + LOG_LEVEL = "DEBUG" # if args.debug else 'INFO' + logging.basicConfig( + level=LOG_LEVEL, + format="%(asctime)s %(levelname)s: %(message)s", + stream=sys.stdout, + ) + + response = requests.get("http://spark-connect-server-metrics:4040/metrics/prometheus") + + assert response.status_code == 200, ( + f"Expected HTTP return code 200 from the server metrics endpoint but got [{response.status_code}]" + ) + + assert "BlockManager_memory_remainingOnHeapMem_MB_Value" in response.text, ( + "Expected metric [BlockManager_memory_remainingOnHeapMem_MB_Value] not found" + ) + + response = requests.get("http://spark-connect-server-metrics:4040/metrics/executors/prometheus") + + assert response.status_code == 200, ( + f"Expected HTTP return code 200 from the executor metrics endpoint but got [{response.status_code}]" + ) + + assert "metrics_executor_memoryUsed_bytes" in response.text, ( + "Expected metric [metrics_executor_memoryUsed_bytes] not found" + ) + + +--- +apiVersion: v1 +kind: Pod +metadata: + name: python +spec: + containers: + - name: tools + image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + stdin: true + tty: true + resources: + requests: + memory: "128Mi" + cpu: "512m" + limits: + memory: "128Mi" + cpu: "1" + volumeMounts: + - name: test-scripts + mountPath: /test-scripts + volumes: + - name: test-scripts + configMap: + name: test-scripts diff --git a/tests/templates/kuttl/spark-history-server/30-assert.yaml b/tests/templates/kuttl/spark-history-server/30-assert.yaml new file mode 100644 index 00000000..50ea896d --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/30-assert.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: metrics +commands: + - script: kubectl exec -n $NAMESPACE python -- python /test-scripts/metrics.py diff --git a/tests/templates/kuttl/spark-history-server/30-install-test-container.yaml b/tests/templates/kuttl/spark-history-server/30-install-test-container.yaml new file mode 100644 index 00000000..804eb0a2 --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/30-install-test-container.yaml @@ -0,0 +1,59 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +metadata: + name: install-test-container +timeout: 300 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-scripts +data: + metrics.py: | + import sys + import logging + import requests + + if __name__ == "__main__": + LOG_LEVEL = "DEBUG" # if args.debug else 'INFO' + 
logging.basicConfig( + level=LOG_LEVEL, + format="%(asctime)s %(levelname)s: %(message)s", + stream=sys.stdout, + ) + + response = requests.get("http://spark-history-node-default-metrics:18081/metrics") + + assert response.status_code == 200, ( + f"Expected HTTP return code 200 from the metrics endpoint but got [{response.status_code}]" + ) + + assert "jmx_scrape_error" in response.text, ( + "Expected metric [jmx_scrape_error] not found" + ) +--- +apiVersion: v1 +kind: Pod +metadata: + name: python +spec: + containers: + - name: tools + image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + stdin: true + tty: true + resources: + requests: + memory: "128Mi" + cpu: "512m" + limits: + memory: "128Mi" + cpu: "1" + volumeMounts: + - name: test-scripts + mountPath: /test-scripts + volumes: + - name: test-scripts + configMap: + name: test-scripts
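
For orientation, the Service produced by `build_rolegroup_metrics_service` for the cluster used in this test should render roughly like the manifest below. This is a hand-written sketch, not operator output; the selector keys and the omitted recommended labels are assumptions, while the name, annotations and port are taken from the code and the `metrics.py` endpoint above:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: spark-history-node-default-metrics   # matches the endpoint scraped by metrics.py above
  labels:
    prometheus.io/scrape: "true"
    # ...plus the usual recommended app.kubernetes.io/* labels (omitted here)
  annotations:
    prometheus.io/path: /metrics
    prometheus.io/port: "18081"
    prometheus.io/scheme: http
    prometheus.io/scrape: "true"
spec:
  type: ClusterIP
  clusterIP: None
  publishNotReadyAddresses: true
  selector:
    # role-group selector produced by Labels::role_group_selector
    # (app.kubernetes.io/* keys; exact set assumed here)
    app.kubernetes.io/instance: spark-history   # the cluster name used in this test
  ports:
    - name: metrics
      port: 18081
      protocol: TCP
```
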