Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/config_properties.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
| spark.kubernetes.operator.metrics.clientMetricsEnabled | Boolean | true | false | Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server. Since the metrics is collected via Okhttp interceptors, can be disabled when opt in customized interceptors. |
| spark.kubernetes.operator.metrics.clientMetricsGroupByResponseCodeEnabled | Boolean | true | false | When enabled, additional metrics group by http response code group(1xx, 2xx, 3xx, 4xx, 5xx) received from API server will be added. Users can disable it when their monitoring system can combine lower level kubernetes.client.http.response.<3-digit-response-code> metrics. |
| spark.kubernetes.operator.metrics.port | Integer | 19090 | false | The port used for checking metrics |
| spark.kubernetes.operator.metrics.prometheusTextBasedFormatEnabled | Boolean | true | false | Whether or not to enable text-based format for Prometheus 2.0, as recommended by https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format |
| spark.kubernetes.operator.metrics.sanitizePrometheusMetricsNameEnabled | Boolean | true | false | Whether or not to enable automatic name sanitizing for all metrics based on best-practice guide from Prometheus https://prometheus.io/docs/practices/naming/ |
| spark.kubernetes.operator.health.probePort | Integer | 19091 | false | The port used for health/readiness check probe status. |
| spark.kubernetes.operator.health.sentinelExecutorPoolSize | Integer | 3 | false | Size of executor service in Sentinel Managers to check the health of sentinel resources. |
| spark.kubernetes.operator.health.sentinelResourceReconciliationDelaySeconds | Integer | 60 | true | Allowed max time(seconds) between spec update and reconciliation for sentinel resources. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,30 @@ public final class SparkOperatorConf {
.defaultValue(19090)
.build();

/**
 * Toggle for the Prometheus text-based exposition format on the operator metrics endpoint.
 *
 * <p>Boolean option, defaults to {@code true}, and is declared with dynamic override disabled.
 */
public static final ConfigOption<Boolean> PROMETHEUS_TEXT_BASED_FORMAT_ENABLED =
ConfigOption.<Boolean>builder()
.key("spark.kubernetes.operator.metrics.prometheusTextBasedFormatEnabled")
.enableDynamicOverride(false)
.description(
"Whether or not to enable text-based format for Prometheus 2.0, as "
+ "recommended by "
+ "https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format")
.typeParameterClass(Boolean.class)
.defaultValue(true)
.build();

/**
 * Toggle for automatic sanitizing of metric names exposed to Prometheus.
 *
 * <p>Boolean option, defaults to {@code true}, and is declared with dynamic override disabled.
 */
public static final ConfigOption<Boolean> SANITIZE_PROMETHEUS_METRICS_NAME_ENABLED =
ConfigOption.<Boolean>builder()
.key("spark.kubernetes.operator.metrics.sanitizePrometheusMetricsNameEnabled")
.enableDynamicOverride(false)
.description(
"Whether or not to enable automatic name sanitizing for all metrics based on "
+ "best-practice guide from Prometheus "
+ "https://prometheus.io/docs/practices/naming/")
.typeParameterClass(Boolean.class)
.defaultValue(true)
.build();

public static final ConfigOption<Integer> OPERATOR_PROBE_PORT =
ConfigOption.<Integer>builder()
.key("spark.kubernetes.operator.health.probePort")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,38 @@
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import jakarta.servlet.http.HttpServletRequest;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import org.apache.spark.k8s.operator.config.SparkOperatorConf;
import org.apache.spark.metrics.sink.PrometheusServlet;

/** Serves as simple Prometheus sink (pull model), presenting metrics snapshot as HttpHandler. */
@Slf4j
public class PrometheusPullModelHandler extends PrometheusServlet implements HttpHandler {
// Textual marker ("[]") for an empty record value.
private static final String EMPTY_RECORD_VALUE = "[]";
// Registry whose gauges, counters, histograms, meters and timers this handler renders.
@Getter private final MetricRegistry registry;
// When true, handle() responds with the output of formatMetricsSnapshot().
@Getter private final boolean enablePrometheusTextBasedFormat;
// When true, sanitize() rewrites metric names; when false names pass through unchanged.
@Getter private final boolean enableSanitizePrometheusMetricsName;

/**
 * Creates the handler and captures the formatting switches.
 *
 * <p>Both flags are read from {@link SparkOperatorConf} once, here, and stored in final fields,
 * so later changes to those options are not observed by an existing handler instance.
 *
 * @param properties passed through to the {@code PrometheusServlet} super constructor
 * @param registry metric registry passed to the super constructor and kept for rendering
 */
public PrometheusPullModelHandler(Properties properties, MetricRegistry registry) {
super(properties, registry);
this.registry = registry;
this.enablePrometheusTextBasedFormat =
SparkOperatorConf.PROMETHEUS_TEXT_BASED_FORMAT_ENABLED.getValue();
this.enableSanitizePrometheusMetricsName =
SparkOperatorConf.SANITIZE_PROMETHEUS_METRICS_NAME_ENABLED.getValue();
}

@Override
Expand All @@ -58,13 +75,21 @@ public void stop() {

/**
 * Answers a scrape request with the current metrics as {@code text/plain;version=0.0.4}.
 *
 * <p>Exactly one response is written per request. When the text-based format is enabled the
 * body comes from {@link #formatMetricsSnapshot()}; otherwise it is the snapshot returned by
 * {@code getMetricsSnapshot} with empty records filtered out and the remaining records joined
 * by newlines.
 *
 * @param exchange the HTTP exchange to respond on
 * @throws IOException if writing the response fails
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
  final String body;
  if (enablePrometheusTextBasedFormat) {
    body = formatMetricsSnapshot();
  } else {
    // Typed null: the snapshot call takes a servlet request that this handler does not have.
    HttpServletRequest httpServletRequest = null;
    String value = getMetricsSnapshot(httpServletRequest);
    body = String.join("\n", filterNonEmptyRecords(value));
  }
  // Single send point so the exchange is never answered twice.
  sendMessage(
      exchange,
      HTTP_OK,
      body,
      Map.of("Content-Type", Collections.singletonList("text/plain;version=0.0.4")));
}

protected List<String> filterNonEmptyRecords(String metricsSnapshot) {
Expand All @@ -82,4 +107,254 @@ protected List<String> filterNonEmptyRecords(String metricsSnapshot) {
}
return filteredRecords;
}

/**
 * Renders every metric in the registry into one text payload.
 *
 * <p>Metrics are emitted by kind, in this order: gauges, counters, histograms, meters, timers.
 * Entries whose formatter yields no text are skipped.
 *
 * @return the concatenated text for all metrics; empty when nothing was rendered
 */
protected String formatMetricsSnapshot() {
  // Fetch all views first, then render, so the fetch order matches the render order.
  Map<String, Gauge> gaugeByName = registry.getGauges();
  Map<String, Counter> counterByName = registry.getCounters();
  Map<String, Histogram> histogramByName = registry.getHistograms();
  Map<String, Meter> meterByName = registry.getMeters();
  Map<String, Timer> timerByName = registry.getTimers();

  StringBuilder out = new StringBuilder();

  gaugeByName.forEach((key, gauge) -> appendIfNotEmpty(out, formatGauge(key, gauge)));

  // Counters carry the "_total" suffix.
  counterByName.forEach(
      (key, counter) -> appendIfNotEmpty(out, formatCounter(sanitize(key) + "_total", counter)));

  histogramByName.forEach(
      (key, histogram) -> appendIfNotEmpty(out, formatHistogram(key, histogram)));

  meterByName.forEach((key, meter) -> appendIfNotEmpty(out, formatMeter(key, meter)));

  // Timers (Meter + Histogram in nanoseconds)
  timerByName.forEach((key, timer) -> appendIfNotEmpty(out, formatTimer(key, timer)));

  return out.toString();
}

/**
 * Appends {@code value} to the builder unless it is {@code null} or the empty string.
 *
 * @param stringBuilder target buffer
 * @param value text to append; may be {@code null}
 */
protected void appendIfNotEmpty(StringBuilder stringBuilder, String value) {
  if (StringUtils.isEmpty(value)) {
    return;
  }
  stringBuilder.append(value);
}

/**
 * Renders a gauge as HELP/TYPE comment lines followed by a single sample line.
 *
 * <p>Only numeric gauges are rendered. The gauge is read exactly once, so the value that passes
 * the numeric check is the same value that is printed even if the gauge changes concurrently.
 *
 * @param name raw metric name; passed through {@link #sanitize(String)}
 * @param gauge gauge to render; may be {@code null}
 * @return the rendered text, or {@code null} when the gauge is {@code null} or its value is not
 *     a {@link Number} (which includes {@code null} and string values such as {@code "[]"})
 */
protected String formatGauge(String name, Gauge gauge) {
  if (gauge == null) {
    return null;
  }
  // Single read: repeated getValue() calls could observe different values.
  Object value = gauge.getValue();
  if (!(value instanceof Number)) {
    return null;
  }
  String formattedName = sanitize(name);
  return "# HELP "
      + formattedName
      + " Gauge metric\n"
      + "# TYPE "
      + formattedName
      + " gauge\n"
      + formattedName
      + ' '
      + value
      + "\n\n";
}

/**
 * Renders a counter as HELP/TYPE comment lines followed by a single sample line.
 *
 * @param name metric name; passed through {@link #sanitize(String)}
 * @param counter counter to render; may be {@code null}
 * @return the rendered text, or {@code null} when {@code counter} is {@code null}
 */
protected String formatCounter(String name, Counter counter) {
  if (counter == null) {
    return null;
  }
  String metricName = sanitize(name);
  StringBuilder text = new StringBuilder();
  text.append("# HELP ").append(metricName).append(" Counter metric\n");
  text.append("# TYPE ").append(metricName).append(" counter\n");
  text.append(metricName).append(" ").append(counter.getCount()).append("\n\n");
  return text.toString();
}

/**
 * Renders a histogram as a Prometheus {@code summary}: six quantile samples plus {@code _count}
 * and {@code _sum}.
 *
 * <p>If the sanitized name contains {@code "nanos"}, the name is rewritten via
 * {@link #nanosMetricsNameToSeconds(String)} and every quantile and the sum are converted with
 * {@link #nanosToSeconds(double)}. The sum is computed as snapshot mean times the histogram
 * count.
 *
 * <p>All quantiles come from one snapshot: 0.5 is the median and 0.999 is the 99.9th percentile
 * regardless of whether the nanos conversion applies.
 *
 * @param name raw metric name; passed through {@link #sanitize(String)}
 * @param histogram histogram to render; may be {@code null}
 * @return the rendered text, or {@code null} when the histogram or its snapshot is {@code null}
 */
protected String formatHistogram(String name, Histogram histogram) {
  if (histogram == null) {
    return null;
  }
  // One snapshot so every quantile is drawn from the same set of samples.
  Snapshot snap = histogram.getSnapshot();
  if (snap == null) {
    return null;
  }
  String baseName = sanitize(name);
  long count = histogram.getCount();
  boolean isNanosHistogram = baseName.contains("nanos");
  if (isNanosHistogram) {
    baseName = nanosMetricsNameToSeconds(baseName);
  }

  String[] quantileLabels = {"0.5", "0.75", "0.95", "0.98", "0.99", "0.999"};
  double[] quantileValues = {
    snap.getMedian(),
    snap.get75thPercentile(),
    snap.get95thPercentile(),
    snap.get98thPercentile(),
    snap.get99thPercentile(),
    snap.get999thPercentile()
  };

  StringBuilder text = new StringBuilder();
  text.append("# HELP ").append(baseName).append(" Histogram metric\n");
  text.append("# TYPE ").append(baseName).append(" summary\n");
  for (int i = 0; i < quantileLabels.length; i++) {
    double quantile =
        isNanosHistogram ? nanosToSeconds(quantileValues[i]) : quantileValues[i];
    text.append(baseName)
        .append("{quantile=\"")
        .append(quantileLabels[i])
        .append("\"} ")
        .append(quantile)
        .append("\n");
  }

  double rawSum = snap.getMean() * count;
  double sum = isNanosHistogram ? nanosToSeconds(rawSum) : rawSum;
  text.append(baseName).append("_count ").append(count).append("\n");
  text.append(baseName).append("_sum ").append(sum).append("\n\n");
  return text.toString();
}

/**
 * Renders a meter as a {@code _total} counter sample followed by its 1-, 5- and 15-minute rates.
 *
 * @param name raw metric name; passed through {@link #sanitize(String)}
 * @param meter meter to render; may be {@code null}
 * @return the rendered text, or {@code null} when {@code meter} is {@code null}
 */
protected String formatMeter(String name, Meter meter) {
  if (meter == null) {
    return null;
  }
  String metricName = sanitize(name);
  String totalName = metricName + "_total";

  StringBuilder text = new StringBuilder();
  // Event count section.
  text.append("# HELP ").append(totalName).append(" Meter count\n");
  text.append("# TYPE ").append(totalName).append(" counter\n");
  text.append(totalName).append(" ").append(meter.getCount()).append("\n\n");

  // Moving-average rate section.
  text.append("# TYPE ").append(metricName).append("_rate gauge\n");
  text.append(metricName).append("_m1_rate ").append(meter.getOneMinuteRate()).append("\n");
  text.append(metricName).append("_m5_rate ").append(meter.getFiveMinuteRate()).append("\n");
  text.append(metricName)
      .append("_m15_rate ")
      .append(meter.getFifteenMinuteRate())
      .append("\n\n");
  return text.toString();
}

/**
 * Renders a timer in two sections: a {@code <name>_duration_seconds} summary (six quantiles,
 * {@code _count}, {@code _sum}) with values converted from nanoseconds via
 * {@link #nanosToSeconds(double)}, followed by the event count and the 1-, 5- and 15-minute
 * rates.
 *
 * @param name raw metric name; passed through {@link #sanitize(String)}
 * @param timer timer to render; may be {@code null}
 * @return the rendered text, or {@code null} when the timer or its snapshot is {@code null}
 */
protected String formatTimer(String name, Timer timer) {
  if (timer == null || timer.getSnapshot() == null) {
    return null;
  }
  String metricName = sanitize(name);
  Snapshot snapshot = timer.getSnapshot();
  long eventCount = timer.getCount();
  String durationName = metricName + "_duration_seconds";

  String[] quantileLabels = {"0.5", "0.75", "0.95", "0.98", "0.99", "0.999"};
  double[] quantileNanos = {
    snapshot.getMedian(),
    snapshot.get75thPercentile(),
    snapshot.get95thPercentile(),
    snapshot.get98thPercentile(),
    snapshot.get99thPercentile(),
    snapshot.get999thPercentile()
  };

  StringBuilder text = new StringBuilder();
  // Duration summary section; a blank line follows the TYPE line in the emitted text.
  text.append("# HELP ").append(durationName).append(" Timer summary\n");
  text.append("# TYPE ").append(durationName).append(" summary\n");
  text.append("\n");
  for (int i = 0; i < quantileLabels.length; i++) {
    text.append(durationName)
        .append("{quantile=\"")
        .append(quantileLabels[i])
        .append("\"} ")
        .append(nanosToSeconds(quantileNanos[i]))
        .append("\n");
  }
  text.append(durationName).append("_count ").append(eventCount).append("\n");
  text.append(durationName)
      .append("_sum ")
      .append(nanosToSeconds(snapshot.getMean() * eventCount))
      .append("\n\n");

  // Count and moving-average rate section.
  text.append("# TYPE ").append(metricName).append(" gauge\n");
  text.append(metricName).append("_count ").append(eventCount).append("\n");
  text.append(metricName).append("_m1_rate ").append(timer.getOneMinuteRate()).append("\n");
  text.append(metricName).append("_m5_rate ").append(timer.getFiveMinuteRate()).append("\n");
  text.append(metricName)
      .append("_m15_rate ")
      .append(timer.getFifteenMinuteRate())
      .append("\n\n");
  return text.toString();
}

/**
 * Converts a duration from nanoseconds to seconds.
 *
 * @param nanos duration in nanoseconds
 * @return the same duration in seconds
 */
protected double nanosToSeconds(double nanos) {
  final double nanosPerSecond = 1e9;
  return nanos / nanosPerSecond;
}

/**
 * Normalizes a metric name when name sanitizing is enabled.
 *
 * <p>Every character outside {@code [a-zA-Z0-9_]} is replaced with {@code '_'} and the result is
 * lower-cased. Lower-casing uses {@link Locale#ROOT} so the outcome does not depend on the JVM
 * default locale (for example, a Turkish locale would otherwise map {@code 'I'} to a non-ASCII
 * dotless i).
 *
 * @param name raw metric name
 * @return the sanitized name, or {@code name} unchanged when sanitizing is disabled
 */
protected String sanitize(String name) {
  if (enableSanitizePrometheusMetricsName) {
    return name.replaceAll("[^a-zA-Z0-9_]", "_").toLowerCase(Locale.ROOT);
  }
  return name;
}

/**
 * Rewrites a metric name so that each {@code "_nanos"} occurrence reads {@code "_seconds"}.
 *
 * @param name metric name to rewrite
 * @return the name with every {@code "_nanos"} replaced by {@code "_seconds"}
 */
protected String nanosMetricsNameToSeconds(String name) {
  // Literal replacement; the pattern has no regex metacharacters.
  return name.replace("_nanos", "_seconds");
}
}
Loading
Loading