diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
index a326654f2a..6c2ed7009a 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_metric_configuration.html
@@ -32,6 +32,12 @@
Boolean |
Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server by response code group, e.g. 1xx, 2xx. |
+
+ kubernetes.operator.kubernetes.client.metrics.slow.request.threshold |
+ 5 s |
+ Duration |
+ Threshold value that triggers slow request counter for Kubernetes client metrics |
+
kubernetes.operator.metrics.histogram.sample.size |
1000 |
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 71711e542f..ad044242af 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -38,6 +38,7 @@
import java.util.Optional;
import java.util.Set;
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD;
import static org.apache.flink.kubernetes.operator.utils.EnvUtils.ENV_WATCH_NAMESPACES;
/** Configuration class for operator. */
@@ -75,6 +76,7 @@ public class FlinkOperatorConfiguration {
LeaderElectionConfiguration leaderElectionConfiguration;
DeletionPropagation deletionPropagation;
boolean snapshotResourcesEnabled;
+ Duration slowRequestThreshold;
public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
@@ -190,6 +192,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
boolean snapshotResourcesEnabled =
operatorConfig.get(KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED);
+ Duration slowRequestThreshold =
+ operatorConfig.get(OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD);
+
return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
@@ -218,7 +223,8 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
labelSelector,
getLeaderElectionConfig(operatorConfig),
deletionPropagation,
- snapshotResourcesEnabled);
+ snapshotResourcesEnabled,
+ slowRequestThreshold);
}
private static GenericRetry getRetryConfig(Configuration conf) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
index e74f73d614..2b796158de 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetrics.java
@@ -31,10 +31,9 @@
import io.fabric8.kubernetes.client.http.HttpRequest;
import io.fabric8.kubernetes.client.http.HttpResponse;
import io.fabric8.kubernetes.client.http.Interceptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -48,6 +47,7 @@ public class KubernetesClientMetrics implements Interceptor {
public static final String KUBE_CLIENT_GROUP = "KubeClient";
public static final String HTTP_REQUEST_GROUP = "HttpRequest";
public static final String HTTP_REQUEST_FAILED_GROUP = "Failed";
+ public static final String HTTP_REQUEST_SLOW_GROUP = "Slow";
public static final String HTTP_RESPONSE_GROUP = "HttpResponse";
public static final String HTTP_RESPONSE_1XX = "1xx";
public static final String HTTP_RESPONSE_2XX = "2xx";
@@ -62,10 +62,12 @@ public class KubernetesClientMetrics implements Interceptor {
private final MetricGroup requestMetricGroup;
private final MetricGroup failedRequestMetricGroup;
+ private final MetricGroup slowRequestMetricGroup;
private final MetricGroup responseMetricGroup;
private final Counter requestCounter;
private final Counter failedRequestCounter;
+ private final Counter slowRequestCounter;
private final Counter responseCounter;
private final SynchronizedMeterView requestRateMeter;
@@ -79,7 +81,7 @@ public class KubernetesClientMetrics implements Interceptor {
private final Map requestMethodCounter = new ConcurrentHashMap<>();
private final LongSupplier nanoTimeSource;
- private final Logger logger = LoggerFactory.getLogger(KubernetesClientMetrics.class);
+ private final Duration slowRequestThreshold;
public KubernetesClientMetrics(
MetricGroup parentGroup, FlinkOperatorConfiguration flinkOperatorConfiguration) {
@@ -95,12 +97,16 @@ public KubernetesClientMetrics(
this.requestMetricGroup = metricGroup.addGroup(HTTP_REQUEST_GROUP);
this.failedRequestMetricGroup = requestMetricGroup.addGroup(HTTP_REQUEST_FAILED_GROUP);
+ this.slowRequestMetricGroup = requestMetricGroup.addGroup(HTTP_REQUEST_SLOW_GROUP);
this.responseMetricGroup = metricGroup.addGroup(HTTP_RESPONSE_GROUP);
this.requestCounter =
OperatorMetricUtils.synchronizedCounter(requestMetricGroup.counter(COUNTER));
this.failedRequestCounter =
OperatorMetricUtils.synchronizedCounter(failedRequestMetricGroup.counter(COUNTER));
+ this.slowRequestThreshold = flinkOperatorConfiguration.getSlowRequestThreshold();
+ this.slowRequestCounter =
+ OperatorMetricUtils.synchronizedCounter(slowRequestMetricGroup.counter(COUNTER));
this.responseCounter =
OperatorMetricUtils.synchronizedCounter(responseMetricGroup.counter(COUNTER));
@@ -207,6 +213,16 @@ Histogram getResponseLatency() {
return responseLatency;
}
+ @VisibleForTesting
+ public Counter getSlowRequestCounter() {
+ return slowRequestCounter;
+ }
+
+ @VisibleForTesting
+ public Duration getSlowRequestThreshold() {
+ return slowRequestThreshold;
+ }
+
@VisibleForTesting
SynchronizedMeterView getRequestFailedRateMeter() {
return requestFailedRateMeter;
@@ -236,6 +252,9 @@ private void trackRequestLatency(HttpRequest request) {
final long requestStartNanos = Long.parseLong(header);
final long latency = currentNanos - requestStartNanos;
this.responseLatency.update(latency);
+ if (latency >= slowRequestThreshold.toNanos()) {
+ slowRequestCounter.inc();
+ }
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
index bf13380c91..58412aeda3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -19,6 +19,8 @@
import org.apache.flink.configuration.ConfigOption;
+import java.time.Duration;
+
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
/** Configuration options for metrics. */
@@ -52,6 +54,13 @@ public class KubernetesOperatorMetricOptions {
.withDescription(
"Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server by response code group, e.g. 1xx, 2xx.");
+ public static final ConfigOption OPERATOR_KUBERNETES_SLOW_REQUEST_THRESHOLD =
+ operatorConfig("kubernetes.client.metrics.slow.request.threshold")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(5))
+ .withDescription(
+ "Threshold value that triggers slow request counter for Kubernetes client metrics");
+
public static final ConfigOption OPERATOR_RESOURCE_METRICS_ENABLED =
operatorConfig("resource.metrics.enabled")
.booleanType()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsFabric8InterceptorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsFabric8InterceptorTest.java
index 5b7423ea2c..196b0ed266 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsFabric8InterceptorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsFabric8InterceptorTest.java
@@ -345,6 +345,120 @@ registry, new Configuration(), NAMESPACE, NAME, HOST),
.isEqualTo(1000L);
}
+ @Test
+ void shouldTrackResponseAsSlowResponseAboveThreshold() {
+ // Given
+ long[] currentTime = {0L};
+ kubernetesClientMetrics =
+ new KubernetesClientMetrics(
+ KubernetesOperatorMetricGroup.create(
+ registry, new Configuration(), NAMESPACE, NAME, HOST),
+ FlinkOperatorConfiguration.fromConfiguration(operatorConfig),
+ () -> currentTime[0]);
+ final HttpRequest postRequest =
+ builder.post("application/json", "{}").uri("/random").build();
+ kubernetesClientMetrics.before(builder, postRequest, emptyTags);
+ assertThat(kubernetesClientMetrics.getSlowRequestCounter())
+ .extracting(Counter::getCount)
+ .isEqualTo(0L);
+ currentTime[0] += kubernetesClientMetrics.getSlowRequestThreshold().toNanos() + 1L;
+
+ // When
+ kubernetesClientMetrics.after(
+ postRequest,
+ new StubHttpResponse(postRequest, Map.of(), 200),
+ (value, asyncBody) -> {});
+
+ // Then
+ assertThat(kubernetesClientMetrics.getSlowRequestCounter())
+ .extracting(Counter::getCount)
+ .isEqualTo(1L);
+ }
+
+ @Test
+ void shouldNotTrackResponseAsSlowResponseBelowThreshold() {
+ // Given
+ long[] currentTime = {0L};
+ kubernetesClientMetrics =
+ new KubernetesClientMetrics(
+ KubernetesOperatorMetricGroup.create(
+ registry, new Configuration(), NAMESPACE, NAME, HOST),
+ FlinkOperatorConfiguration.fromConfiguration(operatorConfig),
+ () -> currentTime[0]);
+ final HttpRequest postRequest =
+ builder.post("application/json", "{}").uri("/random").build();
+ kubernetesClientMetrics.before(builder, postRequest, emptyTags);
+ assertThat(kubernetesClientMetrics.getSlowRequestCounter())
+ .extracting(Counter::getCount)
+ .isEqualTo(0L);
+ currentTime[0] += kubernetesClientMetrics.getSlowRequestThreshold().toNanos() - 1L;
+
+ // When
+ kubernetesClientMetrics.after(
+ postRequest,
+ new StubHttpResponse(postRequest, Map.of(), 200),
+ (value, asyncBody) -> {});
+
+ // Then
+ assertThat(kubernetesClientMetrics.getSlowRequestCounter())
+ .extracting(Counter::getCount)
+ .isEqualTo(0L);
+ }
+
+ @Test
+ void shouldTrackResponseForFailedConnectionAsSlowResponseAboveThreshold() {
+ // Given
+ long[] currentTime = {0L};
+ kubernetesClientMetrics =
+ new KubernetesClientMetrics(
+ KubernetesOperatorMetricGroup.create(
+ registry, new Configuration(), NAMESPACE, NAME, HOST),
+ FlinkOperatorConfiguration.fromConfiguration(operatorConfig),
+ () -> currentTime[0]);
+ final HttpRequest postRequest =
+ builder.post("application/json", "{}").uri("/random").build();
+ kubernetesClientMetrics.before(builder, postRequest, emptyTags);
+ assertThat(kubernetesClientMetrics.getSlowRequestCounter())
+ .extracting(Counter::getCount)
+ .isEqualTo(0L);
+ currentTime[0] += kubernetesClientMetrics.getSlowRequestThreshold().toNanos() + 1L;
+
+ // When
+ kubernetesClientMetrics.afterConnectionFailure(postRequest, new RuntimeException("kaboom"));
+
+ // Then
+ assertThat(kubernetesClientMetrics.getSlowRequestCounter())
+ .extracting(Counter::getCount)
+ .isEqualTo(1L);
+ }
+
+ @Test
+ void shouldNotTrackResponseForFailedConnectionAsSlowResponseUnderThreshold() {
+ // Given
+ long[] currentTime = {0L};
+ kubernetesClientMetrics =
+ new KubernetesClientMetrics(
+ KubernetesOperatorMetricGroup.create(
+ registry, new Configuration(), NAMESPACE, NAME, HOST),
+ FlinkOperatorConfiguration.fromConfiguration(operatorConfig),
+ () -> currentTime[0]);
+ final HttpRequest postRequest =
+ builder.post("application/json", "{}").uri("/random").build();
+ kubernetesClientMetrics.before(builder, postRequest, emptyTags);
+ assertThat(kubernetesClientMetrics.getSlowRequestCounter())
+ .extracting(Counter::getCount)
+ .isEqualTo(0L);
+ currentTime[0] += kubernetesClientMetrics.getSlowRequestThreshold().toNanos() - 1L;
+
+ // When
+ kubernetesClientMetrics.afterConnectionFailure(postRequest, new RuntimeException("kaboom"));
+
+ // Then
+ assertThat(kubernetesClientMetrics.getSlowRequestCounter())
+ .extracting(Counter::getCount)
+ .isEqualTo(0L);
+ }
+
@Test
void shouldTrackFailedRequests() {
// Given