
Commit 857b2ca

add metrics service for spark connect
1 parent 20fa6e0

5 files changed: +186 -71 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file.
 - Helm: Allow Pod `priorityClassName` to be configured ([#608]).
 - Support for Spark 3.5.7 ([#610]).
 - Add metrics service with `prometheus.io/path|port|scheme` annotations for spark history server ([#619]).
+- Add metrics service with `prometheus.io/path|port|scheme` annotations for spark connect ([#619]).
 
 ### Fixed

rust/operator-binary/src/connect/controller.rs

Lines changed: 17 additions & 10 deletions
@@ -21,7 +21,7 @@ use strum::{EnumDiscriminants, IntoStaticStr};
 use super::crd::{CONNECT_APP_NAME, CONNECT_CONTROLLER_NAME, v1alpha1};
 use crate::{
     Ctx,
-    connect::{common, crd::SparkConnectServerStatus, executor, server},
+    connect::{common, crd::SparkConnectServerStatus, executor, server, service},
     crd::constants::{OPERATOR_NAME, SPARK_IMAGE_BASE_NAME},
 };

@@ -47,7 +47,7 @@ pub enum Error {
     ServerProperties { source: server::Error },
 
     #[snafu(display("failed to build spark connect service"))]
-    BuildService { source: server::Error },
+    BuildService { source: service::Error },
 
     #[snafu(display("failed to build spark connect executor config map for {name}"))]
     BuildExecutorConfigMap {
@@ -67,9 +67,6 @@ pub enum Error {
         name: String,
     },
 
-    #[snafu(display("spark connect object has no namespace"))]
-    ObjectHasNoNamespace,
-
     #[snafu(display("failed to update the connect server stateful set"))]
     ApplyStatefulSet {
         source: stackable_operator::cluster_resources::Error,
@@ -208,12 +205,22 @@ pub async fn reconcile(
         .context(ApplyRoleBindingSnafu)?;
 
     // Headless service used by executors connect back to the driver
-    let service =
-        server::build_internal_service(scs, &resolved_product_image.app_version_label_value)
+    let headless_service =
+        service::build_headless_service(scs, &resolved_product_image.app_version_label_value)
+            .context(BuildServiceSnafu)?;
+
+    let applied_headless_service = cluster_resources
+        .add(client, headless_service.clone())
+        .await
+        .context(ApplyServiceSnafu)?;
+
+    // Metrics service carrying the Prometheus scrape labels and annotations
+    let metrics_service =
+        service::build_metrics_service(scs, &resolved_product_image.app_version_label_value)
             .context(BuildServiceSnafu)?;
 
-    let applied_internal_service = cluster_resources
-        .add(client, service.clone())
+    cluster_resources
+        .add(client, metrics_service.clone())
         .await
         .context(ApplyServiceSnafu)?;

@@ -224,7 +231,7 @@ pub async fn reconcile(
     server::server_properties(
         scs,
         &server_config,
-        &applied_internal_service,
+        &applied_headless_service,
         &service_account,
         &resolved_product_image,
     )
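Taken together, the added lines wire two Services into `reconcile`: the headless driver service (whose applied form is handed to `server::server_properties`, see the hunk above) and the new metrics service. For readability, a compact non-diff view of the added wiring, assembled only from calls that appear in the hunks above:

    // Headless driver service: built, applied, and kept for server_properties().
    let headless_service =
        service::build_headless_service(scs, &resolved_product_image.app_version_label_value)
            .context(BuildServiceSnafu)?;
    let applied_headless_service = cluster_resources
        .add(client, headless_service.clone())
        .await
        .context(ApplyServiceSnafu)?;

    // Metrics service: built and applied; nothing downstream needs the applied object.
    let metrics_service =
        service::build_metrics_service(scs, &resolved_product_image.app_version_label_value)
            .context(BuildServiceSnafu)?;
    cluster_resources
        .add(client, metrics_service.clone())
        .await
        .context(ApplyServiceSnafu)?;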

rust/operator-binary/src/connect/mod.rs

Lines changed: 4 additions & 0 deletions
@@ -3,3 +3,7 @@ pub mod controller;
 pub mod crd;
 mod executor;
 pub mod server;
+mod service;
+
+pub(crate) const GRPC: &str = "grpc";
+pub(crate) const HTTP: &str = "http";

rust/operator-binary/src/connect/server.rs

Lines changed: 1 addition & 61 deletions
@@ -24,7 +24,7 @@ use stackable_operator::{
         apps::v1::{StatefulSet, StatefulSetSpec},
         core::v1::{
             ConfigMap, EnvVar, HTTPGetAction, PodSecurityContext, Probe, Service,
-            ServiceAccount, ServicePort, ServiceSpec,
+            ServiceAccount,
         },
     },
     apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
@@ -57,9 +57,6 @@ use crate::{
     product_logging,
 };
 
-const GRPC: &str = "grpc";
-const HTTP: &str = "http";
-
 #[derive(Snafu, Debug)]
 #[allow(clippy::enum_variant_names)]
 pub enum Error {
@@ -396,63 +393,6 @@ pub(crate) fn build_stateful_set(
     })
 }
 
-// This is the headless driver service used for the internal
-// communication with the executors as recommended by the Spark docs.
-pub(crate) fn build_internal_service(
-    scs: &v1alpha1::SparkConnectServer,
-    app_version_label: &str,
-) -> Result<Service, Error> {
-    let service_name = format!(
-        "{cluster}-{role}-headless",
-        cluster = scs.name_any(),
-        role = SparkConnectRole::Server
-    );
-
-    let selector =
-        Labels::role_selector(scs, CONNECT_APP_NAME, &SparkConnectRole::Server.to_string())
-            .context(LabelBuildSnafu)?
-            .into();
-
-    Ok(Service {
-        metadata: ObjectMetaBuilder::new()
-            .name_and_namespace(scs)
-            .name(service_name)
-            .ownerreference_from_resource(scs, None, Some(true))
-            .context(ObjectMissingMetadataForOwnerRefSnafu)?
-            .with_recommended_labels(common::labels(
-                scs,
-                app_version_label,
-                &SparkConnectRole::Server.to_string(),
-            ))
-            .context(MetadataBuildSnafu)?
-            .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
-            .build(),
-        spec: Some(ServiceSpec {
-            type_: Some("ClusterIP".to_owned()),
-            cluster_ip: Some("None".to_owned()),
-            ports: Some(vec![
-                ServicePort {
-                    name: Some(String::from(GRPC)),
-                    port: CONNECT_GRPC_PORT,
-                    ..ServicePort::default()
-                },
-                ServicePort {
-                    name: Some(String::from(HTTP)),
-                    port: CONNECT_UI_PORT,
-                    ..ServicePort::default()
-                },
-            ]),
-            selector: Some(selector),
-            // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness
-            // probes. Without it, the driver runs into a deadlock beacuse the Pod cannot become
-            // "ready" until the Service is "ready" and vice versa.
-            publish_not_ready_addresses: Some(true),
-            ..ServiceSpec::default()
-        }),
-        status: None,
-    })
-}
-
 #[allow(clippy::result_large_err)]
 pub(crate) fn command_args(user_args: &[String]) -> Vec<String> {
     let mut command = vec![

rust/operator-binary/src/connect/service.rs

Lines changed: 163 additions & 0 deletions
@@ -0,0 +1,163 @@
+use snafu::{ResultExt, Snafu};
+use stackable_operator::{
+    builder::{self, meta::ObjectMetaBuilder},
+    k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec},
+    kube::ResourceExt,
+    kvp::{Annotations, Labels},
+};
+
+use super::crd::CONNECT_APP_NAME;
+use crate::connect::{
+    GRPC, HTTP,
+    common::{self, SparkConnectRole},
+    crd::{CONNECT_GRPC_PORT, CONNECT_UI_PORT, v1alpha1},
+};
+
+#[derive(Snafu, Debug)]
+#[allow(clippy::enum_variant_names)]
+pub enum Error {
+    #[snafu(display("object is missing metadata to build owner reference"))]
+    ObjectMissingMetadataForOwnerRef { source: builder::meta::Error },
+
+    #[snafu(display("failed to build Labels"))]
+    LabelBuild {
+        source: stackable_operator::kvp::LabelError,
+    },
+
+    #[snafu(display("failed to build Metadata"))]
+    MetadataBuild { source: builder::meta::Error },
+}
+
+// This is the headless driver service used for the internal
+// communication with the executors as recommended by the Spark docs.
+pub(crate) fn build_headless_service(
+    scs: &v1alpha1::SparkConnectServer,
+    app_version_label: &str,
+) -> Result<Service, Error> {
+    let service_name = format!(
+        "{cluster}-{role}-headless",
+        cluster = scs.name_any(),
+        role = SparkConnectRole::Server
+    );
+
+    let selector =
+        Labels::role_selector(scs, CONNECT_APP_NAME, &SparkConnectRole::Server.to_string())
+            .context(LabelBuildSnafu)?
+            .into();
+
+    Ok(Service {
+        metadata: ObjectMetaBuilder::new()
+            .name_and_namespace(scs)
+            .name(service_name)
+            .ownerreference_from_resource(scs, None, Some(true))
+            .context(ObjectMissingMetadataForOwnerRefSnafu)?
+            .with_recommended_labels(common::labels(
+                scs,
+                app_version_label,
+                &SparkConnectRole::Server.to_string(),
+            ))
+            .context(MetadataBuildSnafu)?
+            .build(),
+        spec: Some(ServiceSpec {
+            type_: Some("ClusterIP".to_owned()),
+            cluster_ip: Some("None".to_owned()),
+            ports: Some(vec![
+                ServicePort {
+                    name: Some(String::from(GRPC)),
+                    port: CONNECT_GRPC_PORT,
+                    ..ServicePort::default()
+                },
+                ServicePort {
+                    name: Some(String::from(HTTP)),
+                    port: CONNECT_UI_PORT,
+                    ..ServicePort::default()
+                },
+            ]),
+            selector: Some(selector),
+            // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness
+            // probes. Without it, the driver runs into a deadlock because the Pod cannot become
+            // "ready" until the Service is "ready" and vice versa.
+            publish_not_ready_addresses: Some(true),
+            ..ServiceSpec::default()
+        }),
+        status: None,
+    })
+}
+
+// This is the metrics service that carries the Prometheus scrape labels and annotations.
+pub(crate) fn build_metrics_service(
+    scs: &v1alpha1::SparkConnectServer,
+    app_version_label: &str,
+) -> Result<Service, Error> {
+    let service_name = format!(
+        "{cluster}-{role}-metrics",
+        cluster = scs.name_any(),
+        role = SparkConnectRole::Server
+    );
+
+    let selector =
+        Labels::role_selector(scs, CONNECT_APP_NAME, &SparkConnectRole::Server.to_string())
+            .context(LabelBuildSnafu)?
+            .into();
+
+    Ok(Service {
+        metadata: ObjectMetaBuilder::new()
+            .name_and_namespace(scs)
+            .name(service_name)
+            .ownerreference_from_resource(scs, None, Some(true))
+            .context(ObjectMissingMetadataForOwnerRefSnafu)?
+            .with_recommended_labels(common::labels(
+                scs,
+                app_version_label,
+                &SparkConnectRole::Server.to_string(),
+            ))
+            .context(MetadataBuildSnafu)?
+            .with_labels(prometheus_labels())
+            .with_annotations(prometheus_annotations())
+            .build(),
+        spec: Some(ServiceSpec {
+            type_: Some("ClusterIP".to_owned()),
+            cluster_ip: Some("None".to_owned()),
+            ports: Some(metrics_ports()),
+            selector: Some(selector),
+            // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness
+            // probes. Without it, the driver runs into a deadlock because the Pod cannot become
+            // "ready" until the Service is "ready" and vice versa.
+            publish_not_ready_addresses: Some(true),
+            ..ServiceSpec::default()
+        }),
+        status: None,
+    })
+}
+
+fn metrics_ports() -> Vec<ServicePort> {
+    vec![ServicePort {
+        name: Some("metrics".to_string()),
+        port: CONNECT_UI_PORT,
+        protocol: Some("TCP".to_string()),
+        ..ServicePort::default()
+    }]
+}
+
+/// Common labels for Prometheus
+fn prometheus_labels() -> Labels {
+    Labels::try_from([("prometheus.io/scrape", "true")]).expect("should be a valid label")
+}
+
+/// Common annotations for Prometheus
+///
+/// These annotations can be used in a ServiceMonitor.
+///
+/// see also <https://github.com/prometheus-community/helm-charts/blob/prometheus-27.32.0/charts/prometheus/values.yaml#L983-L1036>
+fn prometheus_annotations() -> Annotations {
+    Annotations::try_from([
+        (
+            "prometheus.io/path".to_owned(),
+            "/metrics/prometheus".to_owned(),
+        ),
+        ("prometheus.io/port".to_owned(), CONNECT_UI_PORT.to_string()),
+        ("prometheus.io/scheme".to_owned(), "http".to_owned()),
+        ("prometheus.io/scrape".to_owned(), "true".to_owned()),
+    ])
+    .expect("should be valid annotations")
+}
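As a quick orientation for the new module, here is a hedged, test-style sketch (not part of this commit) of how `build_metrics_service` could be exercised; the `scs` fixture and `app_version` value are assumed for illustration, while the function names, ports, and annotation keys come from the code above:

    // Hypothetical sketch: `scs` is assumed to be a v1alpha1::SparkConnectServer
    // fixture (e.g. deserialized from test YAML) and `app_version` some &str.
    let svc = build_metrics_service(&scs, app_version).expect("metrics service should build");
    let annotations = svc.metadata.annotations.unwrap_or_default();

    // The values mirror prometheus_annotations() above.
    assert_eq!(
        annotations.get("prometheus.io/path").map(String::as_str),
        Some("/metrics/prometheus")
    );
    assert_eq!(
        annotations.get("prometheus.io/port"),
        Some(&CONNECT_UI_PORT.to_string())
    );
    assert_eq!(
        annotations.get("prometheus.io/scheme").map(String::as_str),
        Some("http")
    );

    // metrics_ports() exposes exactly one port, named "metrics".
    assert_eq!(svc.spec.and_then(|s| s.ports).map(|p| p.len()), Some(1));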
