Commit b0d7d95

[SPARK-53647][SPARK-53648] Use VertxHttpClientFactory and io.fabric8.kubernetes.client.http.Interceptor
### What changes were proposed in this pull request?

Like the Apache Spark main repository, this PR aims to make `Apache Spark K8s Operator` independent of the `OkHttp3` library for long-term maintainability.

- apache/spark#49159
- [SPARK-50493 Migrate kubernetes-client from `6.x` to `7.x`](https://issues.apache.org/jira/browse/SPARK-50493)
- [SPARK-37687 Cleanup direct usage of OkHttpClient](https://issues.apache.org/jira/browse/SPARK-37687)
- apache/spark#52346

Technically, this PR achieves the goal through the following.

- SPARK-53647 Use `io.fabric8.kubernetes.client.http.Interceptor` instead of `okhttp3.Interceptor`
- SPARK-53648 Use `VertxHttpClientFactory` instead of `OkHttpClientFactory`

### Why are the changes needed?

Currently, `Apache Spark K8s Operator` has a hard compilation dependency on the `OkHttp3` library, as shown in the following.

https://github.com/apache/spark-kubernetes-operator/blob/a04c2bb9aeee5856681f796129e2f698a38e6ac1/spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java#L38-L41

Since `Fabric8` v7.0.0, we should avoid `OkHttp3` because the `Fabric8` community has moved away from it, as shown in the following.

- `VertxHttpClientFactory` is the default HTTP client factory now.
- fabric8io/kubernetes-client#6470
- `io.fabric8.kubernetes.client.http.Interceptor` is `fabric8`'s interceptor layer, which we should use to stay independent of the underlying HTTP factories. We should depend on it instead of exposing `okhttp3.Interceptor`.

### Does this PR introduce _any_ user-facing change?

Yes, but Apache Spark K8s versions are still 0.x releases.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#327 from dongjoon-hyun/TODO_METRICS.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 6a67c23 commit b0d7d95

8 files changed: +81 -44 lines changed


docs/config_properties.md

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@
 | spark.kubernetes.operator.api.statusPatchMaxAttempts | Long | 3 | false | Maximal number of retry attempts of requests to k8s server for resource status update. This would be performed on top of k8s client spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting update on the same SparkApplication. This should be positive number. |
 | spark.kubernetes.operator.api.secondaryResourceCreateMaxAttempts | Long | 3 | false | Maximal number of retry attempts of requesting secondary resource for Spark application. This would be performed on top of k8s client spark.kubernetes.operator.retry.maxAttempts to overcome potential conflicting reconcile on the same SparkApplication. This should be positive number |
 | spark.kubernetes.operator.metrics.josdkMetricsEnabled | Boolean | true | false | When enabled, the josdk metrics will be added in metrics source and configured for operator. |
-| spark.kubernetes.operator.metrics.clientMetricsEnabled | Boolean | true | false | Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server. Since the metrics is collected via Okhttp interceptors, can be disabled when opt in customized interceptors. |
+| spark.kubernetes.operator.metrics.clientMetricsEnabled | Boolean | true | false | Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server. Since the metrics is collected via interceptors, can be disabled when opt in customized interceptors. |
 | spark.kubernetes.operator.metrics.clientMetricsGroupByResponseCodeEnabled | Boolean | true | false | When enabled, additional metrics group by http response code group(1xx, 2xx, 3xx, 4xx, 5xx) received from API server will be added. Users can disable it when their monitoring system can combine lower level kubernetes.client.http.response.<3-digit-response-code> metrics. |
 | spark.kubernetes.operator.metrics.port | Integer | 19090 | false | The port used for checking metrics |
 | spark.kubernetes.operator.metrics.prometheusTextBasedFormatEnabled | Boolean | true | false | Whether or not to enable text-based format for Prometheus 2.0, as recommended by https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format |
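
The `clientMetricsGroupByResponseCodeEnabled` row above describes grouping responses by status class (1xx through 5xx). As a rough illustration of that bucketing, here is a minimal sketch; the helper name `ResponseCodeGroups.groupOf` is hypothetical and is not taken from the operator's source.

```java
import java.util.Optional;

/** Hypothetical helper for illustration only; not part of the operator's code base. */
final class ResponseCodeGroups {
  private ResponseCodeGroups() {}

  /**
   * Maps a 3-digit HTTP status code to its class label, for example 404 to "4xx".
   * Returns an empty Optional for values outside the 100..599 range.
   */
  static Optional<String> groupOf(int statusCode) {
    if (statusCode < 100 || statusCode > 599) {
      return Optional.empty();
    }
    // Integer division keeps only the leading digit of the status code.
    return Optional.of((statusCode / 100) + "xx");
  }
}
```

A monitoring system that already aggregates the per-code metrics can derive the same groups on its side, which is the case where the table suggests disabling this option.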

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ shadow-jar-plugin = "8.3.6"

 [libraries]
 kubernetes-client = { group = "io.fabric8", name = "kubernetes-client", version.ref = "fabric8" }
-kubernetes-httpclient-okhttp = { group = "io.fabric8", name = "kubernetes-httpclient-okhttp", version.ref = "fabric8" }
+kubernetes-httpclient-vertx = { group = "io.fabric8", name = "kubernetes-httpclient-vertx", version.ref = "fabric8" }
 kubernetes-server-mock = { group = "io.fabric8", name = "kubernetes-server-mock", version.ref = "fabric8" }
 kube-api-test-client-inject = {group = "io.fabric8", name = "kube-api-test-client-inject", version.ref = "fabric8"}
 crd-generator-apt = { group = "io.fabric8", name = "crd-generator-apt", version.ref = "fabric8" }

spark-operator/build.gradle

Lines changed: 1 addition & 4 deletions
@@ -26,10 +26,7 @@ dependencies {
     exclude group: 'com.squareup.okio'
     exclude group: 'io.fabric8'
   }
-  implementation(libs.kubernetes.httpclient.okhttp) {
-    exclude group: 'com.squareup.okhttp3'
-  }
-  implementation(libs.okhttp)
+  implementation(libs.kubernetes.httpclient.vertx)
   implementation(libs.spotbugs.annotations)
   // logging
   implementation(libs.log4j.api)

spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java

Lines changed: 3 additions & 3 deletions
@@ -33,12 +33,12 @@
 import java.util.concurrent.Executors;

 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.http.Interceptor;
 import io.javaoperatorsdk.operator.Operator;
 import io.javaoperatorsdk.operator.RegisteredController;
 import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
 import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
 import lombok.extern.slf4j.Slf4j;
-import okhttp3.Interceptor;

 import org.apache.spark.k8s.operator.client.KubernetesClientFactory;
 import org.apache.spark.k8s.operator.config.SparkOperatorConf;
@@ -249,8 +249,8 @@ protected void overrideControllerConfigs(ControllerConfigurationOverrider<?> ove
   }

   /**
-   * Returns a list of OkHttp interceptors for the Kubernetes client, including metrics interceptors
-   * if enabled.
+   * Returns a list of interceptors for the Kubernetes client, including metrics interceptors if
+   * enabled.
    *
    * @param metricsSystem The MetricsSystem to register interceptors with.
    * @return A List of Interceptor objects.

spark-operator/src/main/java/org/apache/spark/k8s/operator/client/KubernetesClientFactory.java

Lines changed: 13 additions & 8 deletions
@@ -23,9 +23,11 @@
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientBuilder;
-import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory;
-import okhttp3.Interceptor;
-import okhttp3.OkHttpClient;
+import io.fabric8.kubernetes.client.http.HttpClient;
+import io.fabric8.kubernetes.client.http.Interceptor;
+import io.fabric8.kubernetes.client.utils.HttpClientUtils;
+import io.fabric8.kubernetes.client.vertx.VertxHttpClientBuilder;
+import io.fabric8.kubernetes.client.vertx.VertxHttpClientFactory;

 /** Factory for building Kubernetes clients with metrics configured. */
 public final class KubernetesClientFactory {
@@ -35,7 +37,7 @@ private KubernetesClientFactory() {}
   /**
    * Builds a KubernetesClient with the given interceptors.
    *
-   * @param interceptors A list of OkHttp interceptors to add to the client.
+   * @param interceptors A list of interceptors to add to the client.
    * @return A new KubernetesClient instance.
    */
   public static KubernetesClient buildKubernetesClient(final List<Interceptor> interceptors) {
@@ -45,7 +47,7 @@ public static KubernetesClient buildKubernetesClient(final List<Interceptor> int
   /**
    * Builds a KubernetesClient with the given interceptors and configuration.
    *
-   * @param interceptors A list of OkHttp interceptors to add to the client.
+   * @param interceptors A list of interceptors to add to the client.
    * @param kubernetesClientConfig The Kubernetes client configuration.
    * @return A new KubernetesClient instance.
    */
@@ -54,12 +56,15 @@ public static KubernetesClient buildKubernetesClient(
     return new KubernetesClientBuilder()
         .withConfig(kubernetesClientConfig)
         .withHttpClientFactory(
-            new OkHttpClientFactory() {
+            new VertxHttpClientFactory() {
               @Override
-              protected void additionalConfig(OkHttpClient.Builder builder) {
+              public HttpClient.Builder newBuilder(Config config) {
+                VertxHttpClientBuilder builder = super.newBuilder();
+                HttpClientUtils.applyCommonConfiguration(config, builder, this);
                 for (Interceptor interceptor : interceptors) {
-                  builder.addInterceptor(interceptor);
+                  builder.addOrReplaceInterceptor(interceptor.getClass().getName(), interceptor);
                 }
+                return builder;
               }
             })
         .build();
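
In the hunk above, each interceptor is registered under its class name via `addOrReplaceInterceptor(interceptor.getClass().getName(), interceptor)`. If the name acts as a unique key, two interceptors of the same class in the input list would collapse into a single registration. The following self-contained sketch illustrates that keyed add-or-replace behavior. The `Interceptor` interface, the registry class, and the map-backed storage are all hypothetical stand-ins chosen for illustration; they are an assumption about how a name-keyed registry behaves, not fabric8's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a name-keyed interceptor registry; not fabric8 code. */
final class InterceptorRegistrySketch {
  /** Hypothetical marker interface standing in for an HTTP interceptor. */
  interface Interceptor {}

  static final class MetricsInterceptor implements Interceptor {}

  static final class LoggingInterceptor implements Interceptor {}

  // Assumption: names are unique keys and insertion order is preserved.
  private final Map<String, Interceptor> byName = new LinkedHashMap<>();

  /** Adds the interceptor, or replaces the one already registered under the same name. */
  void addOrReplace(String name, Interceptor interceptor) {
    byName.put(name, interceptor);
  }

  /** Mirrors the loop in the diff: the key is the interceptor's class name. */
  static InterceptorRegistrySketch registerAll(List<Interceptor> interceptors) {
    InterceptorRegistrySketch registry = new InterceptorRegistrySketch();
    for (Interceptor interceptor : interceptors) {
      registry.addOrReplace(interceptor.getClass().getName(), interceptor);
    }
    return registry;
  }

  /** Returns the registered interceptors in registration order. */
  List<Interceptor> registered() {
    return new ArrayList<>(byName.values());
  }
}
```

Under these assumptions, passing two `MetricsInterceptor` instances and one `LoggingInterceptor` leaves two entries in the registry: the second `MetricsInterceptor` and the `LoggingInterceptor`. Callers that need several instances of one class would have to pick distinct names.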

spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java

Lines changed: 2 additions & 2 deletions
@@ -377,7 +377,7 @@ public final class SparkOperatorConf {

   /**
    * Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server.
-   * Since the metrics is collected via Okhttp interceptors, can be disabled when opt in customized
+   * Since the metrics is collected via interceptors, can be disabled when opt in customized
    * interceptors.
    */
   public static final ConfigOption<Boolean> KUBERNETES_CLIENT_METRICS_ENABLED =
@@ -387,7 +387,7 @@ public final class SparkOperatorConf {
           .description(
               "Enable KubernetesClient metrics for measuring the HTTP traffic to "
                   + "the Kubernetes API Server. Since the metrics is collected "
-                  + "via Okhttp interceptors, can be disabled when opt in "
+                  + "via interceptors, can be disabled when opt in "
                   + "customized interceptors.")
           .typeParameterClass(Boolean.class)
           .defaultValue(true)

spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptor.java

Lines changed: 58 additions & 23 deletions
@@ -21,22 +21,20 @@

 import static org.apache.spark.k8s.operator.config.SparkOperatorConf.KUBERNETES_CLIENT_METRICS_GROUP_BY_RESPONSE_CODE_GROUP_ENABLED;

-import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;

 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
+import io.fabric8.kubernetes.client.http.*;
 import lombok.extern.slf4j.Slf4j;
-import okhttp3.Interceptor;
-import okhttp3.Request;
-import okhttp3.Response;
 import org.apache.commons.lang3.tuple.Pair;
-import org.jetbrains.annotations.NotNull;

 import org.apache.spark.metrics.source.Source;

@@ -90,25 +88,62 @@ public KubernetesMetricsInterceptor() {
   }

   /**
-   * Intercepts an HTTP request and updates Kubernetes client metrics.
+   * Called before a request to allow for the manipulation of the request
    *
-   * @param chain The Interceptor.Chain for the current request.
-   * @return The Response from the intercepted request.
-   * @throws IOException if an I/O error occurs during the request.
+   * @param builder used to modify the request
+   * @param request the current request
    */
-  @NotNull
   @Override
-  public Response intercept(@NotNull Chain chain) throws IOException {
-    Request request = chain.request();
+  public void before(BasicBuilder builder, HttpRequest request, RequestTags tags) {
     updateRequestMetrics(request);
-    Response response = null;
-    final long startTime = System.nanoTime();
-    try {
-      response = chain.proceed(request);
-      return response;
-    } finally {
-      updateResponseMetrics(response, startTime);
-    }
+  }
+
+  /**
+   * Called after a non-WebSocket HTTP response is received. The body might or might not be already
+   * consumed.
+   *
+   * <p>Should be used to analyze response codes and headers, original response shouldn't be
+   * altered.
+   *
+   * @param request the original request sent to the server.
+   * @param response the response received from the server.
+   */
+  @Override
+  public void after(
+      HttpRequest request,
+      HttpResponse<?> response,
+      AsyncBody.Consumer<List<ByteBuffer>> consumer) {
+    updateResponseMetrics(response, System.nanoTime());
+  }
+
+  /**
+   * Called after a websocket failure or by default from a normal request.
+   *
+   * <p>Failure is determined by HTTP status code and will be invoked in addition to {@link
+   * Interceptor#after(HttpRequest, HttpResponse, AsyncBody.Consumer)}
+   *
+   * @param builder used to modify the request
+   * @param response the failed response
+   * @return true if the builder should be used to execute a new request
+   */
+  @Override
+  public CompletableFuture<Boolean> afterFailure(
+      BasicBuilder builder, HttpResponse<?> response, RequestTags tags) {
+    updateResponseMetrics(null, System.nanoTime());
+    return CompletableFuture.completedFuture(false);
+  }
+
+  /**
+   * Called after a connection attempt fails.
+   *
+   * <p>This method will be invoked on each failed connection attempt.
+   *
+   * @param request the HTTP request.
+   * @param failure the Java exception that caused the failure.
+   */
+  @Override
+  public void afterConnectionFailure(HttpRequest request, Throwable failure) {
+    updateResponseMetrics(null, System.nanoTime());
   }

   /**
@@ -131,11 +166,11 @@ public MetricRegistry metricRegistry() {
     return this.metricRegistry;
   }

-  private void updateRequestMetrics(Request request) {
+  private void updateRequestMetrics(HttpRequest request) {
     this.requestRateMeter.mark();
     getMeterByRequestMethod(request.method()).mark();
     Optional<Pair<String, String>> resourceNamePairOptional =
-        parseNamespaceScopedResource(request.url().uri().getPath());
+        parseNamespaceScopedResource(request.uri().getPath());
     resourceNamePairOptional.ifPresent(
         pair -> {
           getMeterByRequestMethodAndResourceName(pair.getValue(), request.method()).mark();
@@ -145,7 +180,7 @@ private void updateRequestMetrics(Request request) {
         });
   }

-  private void updateResponseMetrics(Response response, long startTimeNanos) {
+  private void updateResponseMetrics(HttpResponse response, long startTimeNanos) {
     final long latency = System.nanoTime() - startTimeNanos;
     if (response != null) {
       this.responseRateMeter.mark();
spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/source/KubernetesMetricsInterceptorTest.java

Lines changed: 2 additions & 2 deletions
@@ -33,9 +33,9 @@
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.http.Interceptor;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
-import okhttp3.Interceptor;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.MethodOrderer;
@@ -122,7 +122,7 @@ void testWhenKubernetesServerNotWorking() {
       Map<String, Metric> map = metricsInterceptor.metricRegistry().getMetrics();
       Assertions.assertEquals(12, map.size());
       Meter metric = (Meter) map.get("failed");
-      Assertions.assertEquals(metric.getCount(), retry + 1);
+      Assertions.assertEquals(metric.getCount(), retry);
       Assertions.assertEquals(((Meter) map.get("http.request")).getCount(), retry + 1);
     }
   }
