Skip to content

Commit 0c99d3a

Browse files
committed
expose prometheus metrics
1 parent 6113819 commit 0c99d3a

File tree

6 files changed

+94
-14
lines changed

6 files changed

+94
-14
lines changed

docs/modules/spark-k8s/pages/usage-guide/spark-connect.adoc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,25 @@ include::example$example-spark-connect.yaml[]
2626
<7> Customize the driver properties in the `server` role. The number of cores here is not related to Kubernetes cores!
2727
<8> Customize `spark.executor.\*` and `spark.kubernetes.executor.*` in the `executor` role.
2828

29+
== Metrics
30+
31+
The server pod exposes Prometheus metrics at the following endpoints:
32+
33+
* `/metrics/prometheus` for driver instances.
34+
* `/metrics/executors/prometheus` for executor instances.
35+
36+
To customize the metrics configuration, use the `spec.server.configOverrides` like this:
37+
38+
```
39+
spec:
40+
server:
41+
configOverrides:
42+
metrics.properties:
43+
applications.sink.prometheusServlet.path: "/metrics/applications/prometheus"
44+
```
45+
46+
The example above adds a new endpoint for application metrics.
47+
2948
== Notable Omissions
3049

3150
The following features are not supported by the Stackable Spark operator yet

rust/operator-binary/src/connect/common.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,3 +118,29 @@ pub fn security_properties(
118118

119119
to_java_properties_string(result.iter()).context(JvmSecurityPropertiesSnafu)
120120
}
121+
122+
pub fn metrics_properties(
123+
config_overrides: Option<&HashMap<String, String>>,
124+
) -> Result<String, Error> {
125+
let mut result: BTreeMap<String, Option<String>> = [
126+
(
127+
"*.sink.prometheusServlet.class".to_string(),
128+
Some("org.apache.spark.metrics.sink.PrometheusServlet".to_string()),
129+
),
130+
(
131+
"*.sink.prometheusServlet.path".to_string(),
132+
Some("/metrics/prometheus".to_string()),
133+
),
134+
]
135+
.into();
136+
137+
if let Some(user_config) = config_overrides {
138+
result.extend(
139+
user_config
140+
.iter()
141+
.map(|(k, v)| (k.clone(), Some(v.clone()))),
142+
);
143+
}
144+
145+
to_java_properties_string(result.iter()).context(JvmSecurityPropertiesSnafu)
146+
}

rust/operator-binary/src/connect/executor.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ use crate::{
2929
connect::{common, crd::v1alpha1},
3030
crd::constants::{
3131
JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE,
32-
POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME, VOLUME_MOUNT_NAME_CONFIG,
33-
VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG,
34-
VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG,
32+
METRICS_PROPERTIES_FILE, POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME,
33+
VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG,
34+
VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG,
3535
},
3636
product_logging,
3737
};
@@ -78,6 +78,9 @@ pub enum Error {
7878
#[snafu(display("failed build connect executor security properties"))]
7979
ExecutorJvmSecurityProperties { source: common::Error },
8080

81+
#[snafu(display("executor metrics properties for spark connect {name}",))]
82+
MetricsProperties { source: common::Error, name: String },
83+
8184
#[snafu(display("failed build connect executor config map metadata"))]
8285
ConfigMapMetadataBuild { source: builder::meta::Error },
8386

@@ -331,6 +334,16 @@ pub fn executor_config_map(
331334
)
332335
.context(ExecutorJvmSecurityPropertiesSnafu)?;
333336

337+
let metrics_props = common::metrics_properties(
338+
scs.spec
339+
.executor
340+
.as_ref()
341+
.and_then(|s| s.config_overrides.get(METRICS_PROPERTIES_FILE)),
342+
)
343+
.context(MetricsPropertiesSnafu {
344+
name: scs.name_unchecked(),
345+
})?;
346+
334347
let mut cm_builder = ConfigMapBuilder::new();
335348

336349
cm_builder
@@ -348,7 +361,8 @@ pub fn executor_config_map(
348361
.context(ConfigMapMetadataBuildSnafu)?
349362
.build(),
350363
)
351-
.add_data(JVM_SECURITY_PROPERTIES_FILE, jvm_sec_props);
364+
.add_data(JVM_SECURITY_PROPERTIES_FILE, jvm_sec_props)
365+
.add_data(METRICS_PROPERTIES_FILE, metrics_props);
352366

353367
let role_group_ref = RoleGroupRef {
354368
cluster: ObjectRef::from_obj(scs),

rust/operator-binary/src/connect/server.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::{
3939
},
4040
crd::constants::{
4141
APP_NAME, JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE,
42-
METRICS_PORT, POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME, SPARK_UID,
42+
METRICS_PROPERTIES_FILE, POD_TEMPLATE_FILE, SPARK_DEFAULTS_FILE_NAME, SPARK_UID,
4343
VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG,
4444
VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG,
4545
},
@@ -83,6 +83,9 @@ pub enum Error {
8383
#[snafu(display("server jvm security properties for spark connect {name}",))]
8484
ServerJvmSecurityProperties { source: common::Error, name: String },
8585

86+
#[snafu(display("server metrics properties for spark connect {name}",))]
87+
MetricsProperties { source: common::Error, name: String },
88+
8689
#[snafu(display("failed to serialize [{SPARK_DEFAULTS_FILE_NAME}] for the connect server",))]
8790
SparkDefaultsProperties {
8891
source: product_config::writer::PropertiesWriterError,
@@ -138,6 +141,16 @@ pub fn server_config_map(
138141
name: scs.name_unchecked(),
139142
})?;
140143

144+
let metrics_props = common::metrics_properties(
145+
scs.spec
146+
.server
147+
.as_ref()
148+
.and_then(|s| s.config_overrides.get(METRICS_PROPERTIES_FILE)),
149+
)
150+
.context(MetricsPropertiesSnafu {
151+
name: scs.name_unchecked(),
152+
})?;
153+
141154
let mut cm_builder = ConfigMapBuilder::new();
142155

143156
cm_builder
@@ -157,7 +170,8 @@ pub fn server_config_map(
157170
)
158171
.add_data(SPARK_DEFAULTS_FILE_NAME, spark_properties)
159172
.add_data(POD_TEMPLATE_FILE, executor_pod_template_spec)
160-
.add_data(JVM_SECURITY_PROPERTIES_FILE, jvm_sec_props);
173+
.add_data(JVM_SECURITY_PROPERTIES_FILE, jvm_sec_props)
174+
.add_data(METRICS_PROPERTIES_FILE, metrics_props);
161175

162176
let role_group_ref = RoleGroupRef {
163177
cluster: ObjectRef::from_obj(scs),
@@ -197,6 +211,7 @@ pub fn build_deployment(
197211
&SparkConnectRole::Server.to_string(),
198212
))
199213
.context(MetadataBuildSnafu)?
214+
.with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
200215
.build();
201216

202217
let mut pb = PodBuilder::new();
@@ -248,7 +263,6 @@ pub fn build_deployment(
248263
.args(args)
249264
.add_container_port("grpc", CONNECT_GRPC_PORT)
250265
.add_container_port("http", CONNECT_UI_PORT)
251-
.add_container_port("metrics", METRICS_PORT.into())
252266
.add_env_vars(container_env)
253267
.add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG)
254268
.context(AddVolumeMountSnafu)?
@@ -397,11 +411,6 @@ pub fn build_service(
397411
port: CONNECT_UI_PORT,
398412
..ServicePort::default()
399413
},
400-
ServicePort {
401-
name: Some(String::from("metrics")),
402-
port: METRICS_PORT.into(),
403-
..ServicePort::default()
404-
},
405414
]),
406415
selector: Some(selector),
407416
publish_not_ready_addresses,
@@ -512,6 +521,18 @@ pub fn server_properties(
512521
"spark.driver.extraClassPath".to_string(),
513522
Some(format!("/stackable/spark/extra-jars/*:/stackable/spark/connect/spark-connect_2.12-{spark_version}.jar")),
514523
),
524+
(
525+
"spark.metrics.conf".to_string(),
526+
Some(format!("{VOLUME_MOUNT_PATH_CONFIG}/{METRICS_PROPERTIES_FILE}")),
527+
),
528+
// This enables the "/metrics/executors/prometheus" endpoint on the server pod.
529+
// The driver collects metrics from the executors and makes them available here.
530+
// The "/metrics/prometheus" endpoint delivers the driver metrics.
531+
(
532+
"spark.ui.prometheus.enabled".to_string(),
533+
Some("true".to_string()),
534+
),
535+
515536
]
516537
.into();
517538

@@ -556,7 +577,7 @@ fn probe() -> Probe {
556577
http_get: Some(HTTPGetAction {
557578
port: IntOrString::Int(CONNECT_UI_PORT),
558579
scheme: Some("HTTP".to_string()),
559-
path: Some("/metrics".to_string()),
580+
path: Some("/metrics/prometheus".to_string()),
560581
..Default::default()
561582
}),
562583
failure_threshold: Some(10),

rust/operator-binary/src/crd/constants.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub const VOLUME_MOUNT_PATH_LOG: &str = "/stackable/log";
3333
pub const LOG4J2_CONFIG_FILE: &str = "log4j2.properties";
3434

3535
pub const JVM_SECURITY_PROPERTIES_FILE: &str = "security.properties";
36+
pub const METRICS_PROPERTIES_FILE: &str = "metrics.properties";
3637

3738
pub const ACCESS_KEY_ID: &str = "accessKey";
3839
pub const SECRET_ACCESS_KEY: &str = "secretKey";

tests/test-definition.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ tests:
118118
dimensions:
119119
- spark-connect
120120
- spark-connect-client
121-
- s3-use-tls
122121
- openshift
123122

124123
suites:

0 commit comments

Comments
 (0)