3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,8 @@ All notable changes to this project will be documented in this file.
- Add experimental support for Spark 4 ([#589])
- Helm: Allow Pod `priorityClassName` to be configured ([#608]).
- Support for Spark 3.5.7 ([#610]).
- Add metrics service with `prometheus.io/path|port|scheme` annotations for the Spark History Server ([#619]).
- Add metrics service with `prometheus.io/path|port|scheme` annotations for Spark Connect ([#619]).

### Fixed

@@ -35,6 +37,7 @@ All notable changes to this project will be documented in this file.
[#610]: https://github.com/stackabletech/spark-k8s-operator/pull/610
[#611]: https://github.com/stackabletech/spark-k8s-operator/pull/611
[#617]: https://github.com/stackabletech/spark-k8s-operator/pull/617
[#619]: https://github.com/stackabletech/spark-k8s-operator/pull/619

## [25.7.0] - 2025-07-23

1 change: 1 addition & 0 deletions apps/ny_tlc_report.py
@@ -9,6 +9,7 @@
need to be submitted along with the job.
--output Path to write the report as a CSV file.
"""

import argparse

from argparse import Namespace
11 changes: 5 additions & 6 deletions docs/modules/spark-k8s/pages/usage-guide/history-server.adoc
@@ -157,10 +157,9 @@ By setting up port forwarding on 18080 the UI can be opened by pointing your bro

image::history-server-ui.png[History Server Console]

== Metrics
== Monitoring

[NOTE]
====
Starting with version 25.7, the built-in Prometheus servlet is enabled in addition to the existing JMX exporter.
The JMX exporter is still available but it is deprecated and will be removed in a future release.
====
The operator creates a dedicated Kubernetes service for collecting metrics from Spark History Server instances with Prometheus.
These metrics are exported via the JMX exporter, as the history server does not support the built-in Spark Prometheus servlet.
The service name follows the convention `<stacklet name>-history-metrics`.
Metrics can be scraped at the endpoint `<service name>:18081/metrics`.
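
If Prometheus runs in-cluster with Kubernetes service discovery, an annotation-based scrape job along the following lines would pick up this metrics service. This is only a minimal sketch: the job name is illustrative, and it assumes the service carries a `prometheus.io/scrape: "true"` annotation alongside the `prometheus.io/path|port|scheme` annotations described in the changelog.

[source,yaml]
----
scrape_configs:
  - job_name: stackable-annotated-services  # illustrative name
    kubernetes_sd_configs:
      - role: service
    relabel_configs:
      # Only scrape Services that opt in via the annotation.
      - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scrape]
        action: keep
        regex: "true"
      # Honour the scheme, metrics path and port advertised by the Service.
      - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_scheme]
        regex: (https?)
        target_label: __scheme__
      - source_labels: [__meta_kubernetes_service_annotation_prometheus_io_path]
        regex: (.+)
        target_label: __metrics_path__
      - source_labels: [__address__, __meta_kubernetes_service_annotation_prometheus_io_port]
        regex: ([^:]+)(?::\d+)?;(\d+)
        replacement: $1:$2
        target_label: __address__
----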
@@ -9,7 +9,7 @@ As the operator creates the necessary resources, the status of the application t
NOTE: The operator never reconciles an application once it has been created.
To resubmit an application, a new SparkApplication resource must be created.

== Metrics
== Monitoring

[NOTE]
====
14 changes: 8 additions & 6 deletions docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc
@@ -26,12 +26,14 @@ include::example$example-spark-connect.yaml[]
<7> Customize the driver properties in the `server` role. The number of cores here is not related to Kubernetes cores!
<8> Customize `spark.executor.\*` and `spark.kubernetes.executor.*` in the `executor` role.

== Metrics
== Monitoring

The server pod exposes Prometheus metrics at the following endpoints:
The operator creates a dedicated Kubernetes service for collecting metrics from Spark Connect instances with Prometheus.
The service name follows the convention `<stacklet name>-server-metrics`.
This service exposes Prometheus metrics at the following endpoints:

* `/metrics/prometheus` for driver instances.
* `/metrics/executors/prometheus` for executor instances.
* `<service name>:4040/metrics/prometheus` for driver instances.
* `<service name>:4040/metrics/executors/prometheus` for executor instances.
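
For a quick test without service discovery, both endpoints can also be scraped directly. The sketch below assumes a Spark Connect stacklet named `spark-connect` running in the `default` namespace; the job names and target addresses are placeholders.

[source,yaml]
----
scrape_configs:
  # Driver metrics (job and target names are illustrative).
  - job_name: spark-connect-driver
    metrics_path: /metrics/prometheus
    static_configs:
      - targets: ["spark-connect-server-metrics.default.svc.cluster.local:4040"]
  # Executor metrics, served through the same service.
  - job_name: spark-connect-executors
    metrics_path: /metrics/executors/prometheus
    static_configs:
      - targets: ["spark-connect-server-metrics.default.svc.cluster.local:4040"]
----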

To customize the metrics configuration, use `spec.server.configOverrides` like this:

@@ -47,8 +49,8 @@ The example above adds a new endpoint for application metrics.

== Spark History Server

Unforunately integration with the Spark History Server is not supported yet.
The connect server seems to ignore the `spark.eventLog` properties while also prohibiting clients to set them programatically.
Unfortunately integration with the Spark History Server is not supported yet.
The connect server seems to ignore the `spark.eventLog` properties while also prohibiting clients to set them programmatically.

== Notable Omissions

2 changes: 1 addition & 1 deletion examples/README-examples.md
@@ -50,7 +50,7 @@ Several resources are needed in this store. These can be loaded like this:

```text
kubectl exec minio-mc-0 -- sh -c 'mc alias set test-minio http://test-minio:9000/'
kubectl cp examples/ny-tlc-report-1.1.0-3.5.7.jar minio-mc-0:/tmp
kubectl cp tests/templates/kuttl/spark-ny-public-s3/ny-tlc-report-1.1.0-3.5.7.jar minio-mc-0:/tmp
kubectl cp apps/ny_tlc_report.py minio-mc-0:/tmp
kubectl cp examples/yellow_tripdata_2021-07.csv minio-mc-0:/tmp
kubectl exec minio-mc-0 -- mc cp /tmp/ny-tlc-report-1.1.0-3.5.7.jar test-minio/my-bucket
2 changes: 1 addition & 1 deletion kind/assert-pvc-jars.yaml
@@ -13,7 +13,7 @@ spec:
claimName: pvc-ksv
containers:
- name: assert-pvc-jars
image: oci.stackable.tech/sdp/tools:0.2.0-stackable0.4.0
image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev
env:
- name: DEST_DIR
value: "/dependencies/jars"
2 changes: 1 addition & 1 deletion kind/kind-pvc.yaml
@@ -24,7 +24,7 @@ spec:
claimName: pvc-ksv
containers:
- name: aws-deps
image: oci.stackable.tech/sdp/tools:0.2.0-stackable0.4.0
image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev
env:
- name: DEST_DIR
value: "/dependencies/jars"
2 changes: 1 addition & 1 deletion kind/minio.yaml
@@ -29,6 +29,6 @@ spec:
spec:
containers:
- name: minio-mc
image: bitnamilegacy/minio:2022-debian-10
image: docker.io/bitnamilegacy/minio:2024-debian-12
stdin: true
tty: true
27 changes: 17 additions & 10 deletions rust/operator-binary/src/connect/controller.rs
@@ -21,7 +21,7 @@ use strum::{EnumDiscriminants, IntoStaticStr};
use super::crd::{CONNECT_APP_NAME, CONNECT_CONTROLLER_NAME, v1alpha1};
use crate::{
Ctx,
connect::{common, crd::SparkConnectServerStatus, executor, server},
connect::{common, crd::SparkConnectServerStatus, executor, server, service},
crd::constants::{OPERATOR_NAME, SPARK_IMAGE_BASE_NAME},
};

@@ -47,7 +47,7 @@ pub enum Error {
ServerProperties { source: server::Error },

#[snafu(display("failed to build spark connect service"))]
BuildService { source: server::Error },
BuildService { source: service::Error },

#[snafu(display("failed to build spark connect executor config map for {name}"))]
BuildExecutorConfigMap {
@@ -67,9 +67,6 @@ pub enum Error {
name: String,
},

#[snafu(display("spark connect object has no namespace"))]
ObjectHasNoNamespace,

#[snafu(display("failed to update the connect server stateful set"))]
ApplyStatefulSet {
source: stackable_operator::cluster_resources::Error,
@@ -208,12 +205,22 @@
.context(ApplyRoleBindingSnafu)?;

// Headless service used by executors connect back to the driver
let service =
server::build_internal_service(scs, &resolved_product_image.app_version_label_value)
let headless_service =
service::build_headless_service(scs, &resolved_product_image.app_version_label_value)
.context(BuildServiceSnafu)?;

let applied_internal_service = cluster_resources
.add(client, service.clone())
let applied_headless_service = cluster_resources
.add(client, headless_service.clone())
.await
.context(ApplyServiceSnafu)?;

// Metrics service used for scraping
let metrics_service =
service::build_metrics_service(scs, &resolved_product_image.app_version_label_value)
.context(BuildServiceSnafu)?;

cluster_resources
.add(client, metrics_service.clone())
.await
.context(ApplyServiceSnafu)?;

@@ -224,7 +231,7 @@
server::server_properties(
scs,
&server_config,
&applied_internal_service,
&applied_headless_service,
&service_account,
&resolved_product_image,
)
4 changes: 4 additions & 0 deletions rust/operator-binary/src/connect/mod.rs
@@ -3,3 +3,7 @@ pub mod controller;
pub mod crd;
mod executor;
pub mod server;
mod service;

pub(crate) const GRPC: &str = "grpc";
pub(crate) const HTTP: &str = "http";
63 changes: 2 additions & 61 deletions rust/operator-binary/src/connect/server.rs
@@ -24,7 +24,7 @@ use stackable_operator::{
apps::v1::{StatefulSet, StatefulSetSpec},
core::v1::{
ConfigMap, EnvVar, HTTPGetAction, PodSecurityContext, Probe, Service,
ServiceAccount, ServicePort, ServiceSpec,
ServiceAccount,
},
},
apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
@@ -38,6 +38,7 @@ use stackable_operator::{
use super::crd::CONNECT_APP_NAME;
use crate::{
connect::{
GRPC, HTTP,
common::{self, SparkConnectRole, object_name},
crd::{
CONNECT_GRPC_PORT, CONNECT_UI_PORT, DEFAULT_SPARK_CONNECT_GROUP_NAME,
@@ -57,9 +58,6 @@ use crate::{
product_logging,
};

const GRPC: &str = "grpc";
const HTTP: &str = "http";

#[derive(Snafu, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum Error {
@@ -396,63 +394,6 @@ pub(crate) fn build_stateful_set(
})
}

// This is the headless driver service used for the internal
// communication with the executors as recommended by the Spark docs.
pub(crate) fn build_internal_service(
scs: &v1alpha1::SparkConnectServer,
app_version_label: &str,
) -> Result<Service, Error> {
let service_name = format!(
"{cluster}-{role}-headless",
cluster = scs.name_any(),
role = SparkConnectRole::Server
);

let selector =
Labels::role_selector(scs, CONNECT_APP_NAME, &SparkConnectRole::Server.to_string())
.context(LabelBuildSnafu)?
.into();

Ok(Service {
metadata: ObjectMetaBuilder::new()
.name_and_namespace(scs)
.name(service_name)
.ownerreference_from_resource(scs, None, Some(true))
.context(ObjectMissingMetadataForOwnerRefSnafu)?
.with_recommended_labels(common::labels(
scs,
app_version_label,
&SparkConnectRole::Server.to_string(),
))
.context(MetadataBuildSnafu)?
.with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
.build(),
spec: Some(ServiceSpec {
type_: Some("ClusterIP".to_owned()),
cluster_ip: Some("None".to_owned()),
ports: Some(vec![
ServicePort {
name: Some(String::from(GRPC)),
port: CONNECT_GRPC_PORT,
..ServicePort::default()
},
ServicePort {
name: Some(String::from(HTTP)),
port: CONNECT_UI_PORT,
..ServicePort::default()
},
]),
selector: Some(selector),
// The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness
// probes. Without it, the driver runs into a deadlock beacuse the Pod cannot become
// "ready" until the Service is "ready" and vice versa.
publish_not_ready_addresses: Some(true),
..ServiceSpec::default()
}),
status: None,
})
}

#[allow(clippy::result_large_err)]
pub(crate) fn command_args(user_args: &[String]) -> Vec<String> {
let mut command = vec![