Commit cdcef65

jiangzho authored and peter-toth committed
[SPARK-53405] Add metrics recording for latency of Spark app state transition
### What changes were proposed in this pull request?

This PR adds metrics tracking state transition latency for Spark applications, in the format of

```
sparkapp.latency.from.<fromState>.to.<toState>
```

### Why are the changes needed?

Measuring latency is useful for analyzing performance from a scheduling / orchestration perspective, for example to identify which states introduce significant overhead and to optimize at the cluster or app level.

### Does this PR introduce _any_ user-facing change?

More metrics become available.

### How was this patch tested?

CIs. New unit test added.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#299 from jiangzho/state_transition.

Authored-by: Zhou JIANG <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
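For concreteness, here is a minimal standalone sketch (not part of this commit) of the metric name produced for one transition. The state names are taken from the documentation added in this PR, and the lowercasing mirrors the behavior of the operator's metric source.

```java
public class MetricNameFormat {
  public static void main(String[] args) {
    // Pattern described in the PR: sparkapp.latency.from.<fromState>.to.<toState>
    String pattern = "sparkapp.latency.from.%s.to.%s";
    String name = String.format(pattern, "DriverRequested", "DriverStarted").toLowerCase();
    System.out.println(name); // sparkapp.latency.from.driverrequested.to.driverstarted
  }
}
```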
1 parent 5f476c3 commit cdcef65

File tree

7 files changed: +301 -72 lines changed


docs/configuration.md

Lines changed: 14 additions & 0 deletions
```diff
@@ -91,6 +91,20 @@ via [Codahale JVM Metrics](https://javadoc.io/doc/com.codahale.metrics/metrics-j
 | kubernetes.client.`ResourceName`.`Method` | Meter | Tracking the rates of HTTP request for a combination of one Kubernetes resource and one http method |
 | kubernetes.client.`NamespaceName`.`ResourceName`.`Method` | Meter | Tracking the rates of HTTP request for a combination of one namespace-scoped Kubernetes resource and one http method |
 
+### Latency for State Transition
+
+Spark Operator also measures the latency between each state transition for apps, in the format of
+
+| Metrics Name | Type | Description |
+|------------------------------------------------------|-------|------------------------------------------------------------------|
+| sparkapp.latency.from.`<fromState>`.to.`<toState>` | Timer | Tracking latency for app of transition from one state to another |
+
+The latency metrics can be used to provide insights about time spent in each state. For example, a
+long latency between `DriverRequested` and `DriverStarted` indicates overhead for driver pod to be
+scheduled. Latency between `DriverStarted` and `DriverReady` indicates overhead to pull image, to
+run init containers and to start SparkSession. These metrics can be used to analyze the overhead
+from multiple dimensions.
+
 ### Forward Metrics to Prometheus
 
 In this section, we will show you how to forward Spark Operator metrics
```
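To illustrate what a metrics consumer (for example the Prometheus forwarding described in the docs section above) would observe for such a Timer, here is a minimal standalone Codahale sketch, not taken from the operator codebase; the metric name and the 1.5-second latency are illustrative assumptions.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class LatencyTimerReadback {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();

    // Simulate one recorded state transition of 1500 ms under an assumed metric name.
    Timer timer = registry.timer("sparkapp.latency.from.driverrequested.to.driverstarted");
    timer.update(Duration.ofMillis(1500));

    // Iterate the registered timers the way a metrics reporter would.
    for (Map.Entry<String, Timer> entry : registry.getTimers().entrySet()) {
      // Timer snapshots are kept in nanoseconds; convert to milliseconds for readability.
      double meanMillis =
          entry.getValue().getSnapshot().getMean() / TimeUnit.MILLISECONDS.toNanos(1);
      System.out.println(
          entry.getKey() + ": count=" + entry.getValue().getCount() + ", mean(ms)=" + meanMillis);
    }
  }
}
```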

spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java

Lines changed: 5 additions & 1 deletion
```diff
@@ -46,6 +46,7 @@
 import org.apache.spark.k8s.operator.metrics.MetricsService;
 import org.apache.spark.k8s.operator.metrics.MetricsSystem;
 import org.apache.spark.k8s.operator.metrics.MetricsSystemFactory;
+import org.apache.spark.k8s.operator.metrics.SparkAppStatusRecorderSource;
 import org.apache.spark.k8s.operator.metrics.healthcheck.SentinelManager;
 import org.apache.spark.k8s.operator.metrics.source.KubernetesMetricsInterceptor;
 import org.apache.spark.k8s.operator.metrics.source.OperatorJosdkMetrics;
@@ -83,7 +84,10 @@ public SparkOperator() {
         KubernetesClientFactory.buildKubernetesClient(getClientInterceptors(metricsSystem));
     this.appSubmissionWorker = new SparkAppSubmissionWorker();
     this.clusterSubmissionWorker = new SparkClusterSubmissionWorker();
-    this.sparkAppStatusRecorder = new SparkAppStatusRecorder(getAppStatusListener());
+    SparkAppStatusRecorderSource recorderSource = new SparkAppStatusRecorderSource();
+    this.metricsSystem.registerSource(recorderSource);
+    this.sparkAppStatusRecorder =
+        new SparkAppStatusRecorder(getAppStatusListener(), recorderSource);
     this.sparkClusterStatusRecorder = new SparkClusterStatusRecorder(getClusterStatusListener());
     this.registeredSparkControllers = new HashSet<>();
     this.watchedNamespaces = getWatchedNamespaces();
```
spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/BaseOperatorSource.java

Lines changed: 91 additions & 0 deletions
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.spark.k8s.operator.metrics;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class BaseOperatorSource {
  protected final MetricRegistry metricRegistry;
  protected final Map<String, Histogram> histograms = new ConcurrentHashMap<>();
  protected final Map<String, Counter> counters = new ConcurrentHashMap<>();
  protected final Map<String, Gauge<?>> gauges = new ConcurrentHashMap<>();
  protected final Map<String, Timer> timers = new ConcurrentHashMap<>();

  protected Histogram getHistogram(String metricNamePrefix, String... names) {
    Histogram histogram;
    String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase();
    if (histograms.containsKey(metricName)) {
      histogram = histograms.get(metricName);
    } else {
      histogram = metricRegistry.histogram(metricName);
      histograms.put(metricName, histogram);
    }
    return histogram;
  }

  protected Counter getCounter(String metricNamePrefix, String... names) {
    Counter counter;
    String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase();
    if (counters.containsKey(metricName)) {
      counter = counters.get(metricName);
    } else {
      counter = metricRegistry.counter(metricName);
      counters.put(metricName, counter);
    }
    return counter;
  }

  protected Gauge<?> getGauge(Gauge<?> defaultGauge, String metricNamePrefix, String... names) {
    Gauge<?> gauge;
    String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase();
    if (gauges.containsKey(metricName)) {
      gauge = gauges.get(metricName);
    } else {
      gauge = defaultGauge;
      gauges.put(metricName, defaultGauge);
    }
    return gauge;
  }

  protected Timer getTimer(String metricNamePrefix, String... names) {
    Timer timer;
    String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase();
    if (timers.containsKey(metricName)) {
      timer = timers.get(metricName);
    } else {
      timer = metricRegistry.timer(metricName);
      timers.put(metricName, timer);
    }
    return timer;
  }

  protected String getMetricNamePrefix(Class<?> klass) {
    return klass.getSimpleName();
  }
}
```
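One design note on the get-or-create helpers above: Codahale's `MetricRegistry` is already idempotent for a given metric name (repeated `timer(name)` calls return the same registered instance), so the `ConcurrentHashMap` fields act as an additional local lookup cache in front of the registry. A minimal standalone check of that registry behavior, not part of the commit:

```java
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class RegistryIdempotenceCheck {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();

    // Repeated lookups under the same name yield the same Timer instance,
    // so reporters see exactly one metric per name.
    Timer first = registry.timer("sparkapp.latency.from.driverrequested.to.driverstarted");
    Timer second = registry.timer("sparkapp.latency.from.driverrequested.to.driverstarted");
    System.out.println(first == second); // true
  }
}
```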
spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java

Lines changed: 67 additions & 0 deletions
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.spark.k8s.operator.metrics;

import java.time.Duration;
import java.time.Instant;

import com.codahale.metrics.MetricRegistry;

import org.apache.spark.k8s.operator.status.ApplicationState;
import org.apache.spark.k8s.operator.status.ApplicationStatus;
import org.apache.spark.metrics.source.Source;

public class SparkAppStatusRecorderSource extends BaseOperatorSource implements Source {

  public static final String RESOURCE_TYPE = "sparkapp";
  public static final String LATENCY_METRIC_FORMAT = "latency.from.%s.to.%s";

  public SparkAppStatusRecorderSource() {
    super(new MetricRegistry());
  }

  @Override
  public String sourceName() {
    return "SparkAppStatusRecorder";
  }

  @Override
  public MetricRegistry metricRegistry() {
    return metricRegistry;
  }

  public void recordStatusUpdateLatency(
      final ApplicationStatus status, final ApplicationState newState) {
    ApplicationState currentState = status.getCurrentState();
    if (currentState != null) {
      Duration duration =
          Duration.between(
              Instant.parse(currentState.getLastTransitionTime()),
              Instant.parse(newState.getLastTransitionTime()));
      getTimer(
              RESOURCE_TYPE,
              String.format(
                  LATENCY_METRIC_FORMAT,
                  currentState.getCurrentStateSummary().name(),
                  newState.getCurrentStateSummary().name()))
          .update(duration);
    }
  }
}
```
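The recorder source above derives each latency sample by parsing the ISO-8601 `lastTransitionTime` strings of the old and new states and feeding the elapsed `Duration` into a Codahale Timer. Below is a minimal standalone sketch of that computation, not part of this commit; the timestamps and metric name are illustrative assumptions.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class TransitionLatencyExample {
  public static void main(String[] args) {
    // Example ISO-8601 transition timestamps; Instant.parse expects this format.
    Instant driverRequestedAt = Instant.parse("2024-01-01T00:00:00Z");
    Instant driverStartedAt = Instant.parse("2024-01-01T00:00:02.500Z");

    // Same kind of computation as recordStatusUpdateLatency:
    // elapsed time between two consecutive state transitions.
    Duration latency = Duration.between(driverRequestedAt, driverStartedAt);

    MetricRegistry registry = new MetricRegistry();
    Timer timer = registry.timer("sparkapp.latency.from.driverrequested.to.driverstarted");
    timer.update(latency);

    // Timer snapshots are kept in nanoseconds; convert to milliseconds for readability.
    double maxMillis = timer.getSnapshot().getMax() / (double) TimeUnit.MILLISECONDS.toNanos(1);
    System.out.println("recorded latency (ms) = " + maxMillis); // 2500.0
  }
}
```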
