Skip to content

Commit 027dbbd

Browse files
k-wallrobobario
andauthored
fix(filter-api): give ResponseFilter#onResponse access to the api-version (kroxylicious#2969)
* logs warning if there is use of a (Request|Response)Filter implementation that implements the deprecated method Signed-off-by: Keith Wall <[email protected]> Signed-off-by: Robert Young <[email protected]> Co-authored-by: Robert Young <[email protected]>
1 parent 5801b90 commit 027dbbd

File tree

40 files changed

+521
-129
lines changed

40 files changed

+521
-129
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,14 @@ Format `<github issue/pr number>: <short description>`.
77

88
## SNAPSHOT
99

10+
* [#2969](https://github.com/kroxylicious/kroxylicious/issues/2969): Give `ResponseFilter#onResponse` access to the api-version
1011
* [#3035](https://github.com/kroxylicious/kroxylicious/issues/3035): fix(sasl inspector): Fix config parsing error if SaslInspector with subject builder
1112

13+
### Changes, deprecations and removals
14+
15+
* The four argument forms of `RequestFilter#onRequest` and `ResponseFilter#onResponse` are deprecated and will be removed in a future release.
16+
Implement the five argument form, which includes the `apiVersion` instead.
17+
1218
## 0.18.0
1319

1420
* [#2922](https://github.com/kroxylicious/kroxylicious/pull/2922): build(deps): bump kafka.version from 4.1.0 to 4.1.1

kroxylicious-api/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,16 @@
7676
<groupId>io.kroxylicious</groupId>
7777
<artifactId>kroxylicious-annotations</artifactId>
7878
</dependency>
79+
<dependency>
80+
<groupId>org.mockito</groupId>
81+
<artifactId>mockito-junit-jupiter</artifactId>
82+
<scope>test</scope>
83+
</dependency>
84+
<dependency>
85+
<groupId>org.mockito</groupId>
86+
<artifactId>mockito-core</artifactId>
87+
<scope>test</scope>
88+
</dependency>
7989
</dependencies>
8090

8191
<build>
@@ -156,6 +166,9 @@
156166
<!-- The following filter exclusions relate to RPCs that were renamed in kafka-clients 4.1 -->
157167
<exclude>io.kroxylicious.proxy.filter.ListClientMetricsResourcesResponseFilter</exclude>
158168
<exclude>io.kroxylicious.proxy.filter.ListClientMetricsResourcesRequestFilter</exclude>
169+
<!-- The following methods become deprecated and are given a default implementation at 0.19.0 which throws UnsupportedOperationException -->
170+
<exclude>io.kroxylicious.proxy.filter.RequestFilter#onRequest(org.apache.kafka.common.protocol.ApiKeys, org.apache.kafka.common.message.RequestHeaderData, org.apache.kafka.common.protocol.ApiMessage, io.kroxylicious.proxy.filter.FilterContext)</exclude>
171+
<exclude>io.kroxylicious.proxy.filter.ResponseFilter#onResponse(org.apache.kafka.common.protocol.ApiKeys, org.apache.kafka.common.message.ResponseHeaderData, org.apache.kafka.common.protocol.ApiMessage, io.kroxylicious.proxy.filter.FilterContext)</exclude>
159172
</excludes>
160173
<!-- see documentation -->
161174
</parameter>

kroxylicious-api/src/main/java/io/kroxylicious/proxy/filter/RequestFilter.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public interface RequestFilter extends Filter {
2525

2626
/**
2727
* Does this filter implementation want to handle a request. If so, the
28-
* {@link #onRequest(ApiKeys, RequestHeaderData, ApiMessage, FilterContext)} method
28+
* {@link #onRequest(ApiKeys, short, RequestHeaderData, ApiMessage, FilterContext)} method
2929
* will be eligible to be called with the deserialized request data (if the
3030
* message reaches this filter in the filter chain).
3131
*
@@ -52,9 +52,42 @@ default boolean shouldHandleRequest(ApiKeys apiKey, short apiVersion) {
5252
* request to be forwarded.
5353
* @see io.kroxylicious.proxy.filter Creating Filter Result objects
5454
* @see io.kroxylicious.proxy.filter Thread Safety
55+
* @deprecated implement {@link #onRequest(ApiKeys, short, RequestHeaderData, ApiMessage, FilterContext)} instead.
5556
*/
56-
CompletionStage<RequestFilterResult> onRequest(ApiKeys apiKey,
57-
RequestHeaderData header,
58-
ApiMessage request,
59-
FilterContext context);
57+
@Deprecated(forRemoval = true, since = "0.19.0")
58+
default CompletionStage<RequestFilterResult> onRequest(ApiKeys apiKey,
59+
RequestHeaderData header,
60+
ApiMessage request,
61+
FilterContext context) {
62+
throw new UnsupportedOperationException("implement #onRequest(ApiKeys, short, RequestHeaderData, ApiMessage, FilterContext)");
63+
}
64+
65+
/**
66+
* Handle the given {@code header} and {@code request} pair, returning the {@code header} and {@code request}
67+
* pair to be passed to the next filter using the RequestFilterResult.
68+
* <br/>
69+
* The implementation may modify the given {@code header} and {@code request} in-place, or instantiate a
70+
* new instances.
71+
*
72+
* @param apiKey key of the request
73+
* @param apiVersion api version of the request
74+
* @param header header of the request
75+
* @param request body of the request
76+
* @param context context containing methods to continue the filter chain and other contextual data
77+
* @return a non-null CompletionStage that, when complete, will yield a RequestFilterResult containing the
78+
* request to be forwarded.
79+
* @see io.kroxylicious.proxy.filter Creating Filter Result objects
80+
* @see io.kroxylicious.proxy.filter Thread Safety
81+
*/
82+
@SuppressWarnings("deprecated")
83+
default CompletionStage<RequestFilterResult> onRequest(ApiKeys apiKey,
84+
short apiVersion,
85+
RequestHeaderData header,
86+
ApiMessage request,
87+
FilterContext context) {
88+
// default implementation exists so that pre-0.19.0 implementations of RequestFilter continue to work without change.
89+
// when the deprecated method is removed, remove this default implementation.
90+
return onRequest(apiKey, header, request, context);
91+
}
92+
6093
}

kroxylicious-api/src/main/java/io/kroxylicious/proxy/filter/ResponseFilter.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public interface ResponseFilter extends Filter {
2525

2626
/**
2727
* Does this filter implementation want to handle a response. If so, the
28-
* {@link #onResponse(ApiKeys, ResponseHeaderData, ApiMessage, FilterContext)} method
28+
* {@link #onResponse(ApiKeys, short, ResponseHeaderData, ApiMessage, FilterContext)} method
2929
* will be eligible to be called with the deserialized response data (if the
3030
* message reaches this filter in the filter chain).
3131
*
@@ -44,17 +44,49 @@ default boolean shouldHandleResponse(ApiKeys apiKey, short apiVersion) {
4444
* The implementation may modify the given {@code header} and {@code response} in-place, or instantiate a
4545
* new instances.
4646
*
47-
* @param apiKey key of the request
47+
* @param apiKey key of the response
4848
* @param header response header.
4949
* @param response The body to handle.
5050
* @param context The context.
5151
* @return a non-null CompletionStage that, when complete, will yield a ResponseFilterResult containing the
5252
* response to be forwarded.
5353
* @see io.kroxylicious.proxy.filter Creating Filter Result objects
5454
* @see io.kroxylicious.proxy.filter Thread Safety
55+
* @deprecated implement {@link #onResponse(ApiKeys, short, ResponseHeaderData, ApiMessage, FilterContext)} instead.
5556
*/
56-
CompletionStage<ResponseFilterResult> onResponse(ApiKeys apiKey,
57-
ResponseHeaderData header,
58-
ApiMessage response,
59-
FilterContext context);
57+
@Deprecated(forRemoval = true, since = "0.19.0")
58+
default CompletionStage<ResponseFilterResult> onResponse(ApiKeys apiKey,
59+
ResponseHeaderData header,
60+
ApiMessage response,
61+
FilterContext context) {
62+
throw new UnsupportedOperationException("implement #onResponse(ApiKeys, short, ResponseHeaderData, ApiMessage, FilterContext)");
63+
}
64+
65+
/**
66+
* Handle the given {@code header} and {@code response} pair, returning the {@code header} and {@code response}
67+
* pair to be passed to the next filter using the ResponseFilterResult.
68+
* <br/>
69+
* The implementation may modify the given {@code header} and {@code response} in-place, or instantiate a
70+
* new instances.
71+
*
72+
* @param apiKey key of the response
73+
* @param apiVersion api version of the response
74+
* @param header response header.
75+
* @param response The body to handle.
76+
* @param context The context.
77+
* @return a non-null CompletionStage that, when complete, will yield a ResponseFilterResult containing the
78+
* response to be forwarded.
79+
* @see io.kroxylicious.proxy.filter Creating Filter Result objects
80+
* @see io.kroxylicious.proxy.filter Thread Safety
81+
*/
82+
@SuppressWarnings("deprecated")
83+
default CompletionStage<ResponseFilterResult> onResponse(ApiKeys apiKey,
84+
short apiVersion,
85+
ResponseHeaderData header,
86+
ApiMessage response,
87+
FilterContext context) {
88+
// default implementation exists so that pre-0.19.0 implementations of ResponseFilter continue to work without change.
89+
// when the deprecated method is removed, remove this default implementation.
90+
return onResponse(apiKey, header, response, context);
91+
}
6092
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.filter;
8+
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.CompletionStage;
11+
import java.util.concurrent.TimeUnit;
12+
13+
import org.apache.kafka.common.message.ProduceRequestData;
14+
import org.apache.kafka.common.message.RequestHeaderData;
15+
import org.apache.kafka.common.protocol.ApiKeys;
16+
import org.apache.kafka.common.protocol.ApiMessage;
17+
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.extension.ExtendWith;
19+
import org.mockito.Mock;
20+
import org.mockito.junit.jupiter.MockitoExtension;
21+
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
24+
@ExtendWith(MockitoExtension.class)
25+
class RequestFilterTest {
26+
27+
@Mock
28+
private FilterContext requestContext;
29+
@Mock
30+
private RequestFilterResult requestFilterResult;
31+
32+
@Test
33+
void newDefaultMethodImplChainsToDeprecatedMethod() {
34+
var requestHeader = new RequestHeaderData();
35+
var requestData = new ProduceRequestData();
36+
37+
var filterImplementingOldApi = new RequestFilter() {
38+
@Override
39+
@SuppressWarnings("removal")
40+
public CompletionStage<RequestFilterResult> onRequest(ApiKeys apiKey, RequestHeaderData header, ApiMessage request, FilterContext context) {
41+
assertThat(apiKey).isEqualTo(ApiKeys.PRODUCE);
42+
assertThat(header).isSameAs(requestHeader);
43+
assertThat(request).isSameAs(requestData);
44+
assertThat(context).isSameAs(requestContext);
45+
46+
return CompletableFuture.completedStage(requestFilterResult);
47+
}
48+
};
49+
50+
var result = filterImplementingOldApi.onRequest(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion(), requestHeader, requestData, requestContext);
51+
assertThat(result)
52+
.succeedsWithin(1, TimeUnit.SECONDS)
53+
.isSameAs(requestFilterResult);
54+
}
55+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.filter;
8+
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.CompletionStage;
11+
import java.util.concurrent.TimeUnit;
12+
13+
import org.apache.kafka.common.message.ProduceResponseData;
14+
import org.apache.kafka.common.message.ResponseHeaderData;
15+
import org.apache.kafka.common.protocol.ApiKeys;
16+
import org.apache.kafka.common.protocol.ApiMessage;
17+
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.extension.ExtendWith;
19+
import org.mockito.Mock;
20+
import org.mockito.junit.jupiter.MockitoExtension;
21+
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
24+
@ExtendWith(MockitoExtension.class)
25+
class ResponseFilterTest {
26+
27+
@Mock
28+
private FilterContext responseContext;
29+
@Mock
30+
private ResponseFilterResult responseFilterResult;
31+
32+
@Test
33+
void newDefaultMethodImplChainsToDeprecatedMethod() {
34+
var responseHeader = new ResponseHeaderData();
35+
var responseData = new ProduceResponseData();
36+
37+
var filterImplementingOldApi = new ResponseFilter() {
38+
@SuppressWarnings("removal")
39+
@Override
40+
public CompletionStage<ResponseFilterResult> onResponse(ApiKeys apiKey, ResponseHeaderData header, ApiMessage response, FilterContext context) {
41+
assertThat(apiKey).isEqualTo(ApiKeys.PRODUCE);
42+
assertThat(header).isSameAs(responseHeader);
43+
assertThat(response).isSameAs(responseData);
44+
assertThat(context).isSameAs(responseContext);
45+
46+
return CompletableFuture.completedStage(responseFilterResult);
47+
48+
}
49+
};
50+
51+
var result = filterImplementingOldApi.onResponse(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion(), responseHeader, responseData, responseContext);
52+
assertThat(result)
53+
.succeedsWithin(1, TimeUnit.SECONDS)
54+
.isSameAs(responseFilterResult);
55+
}
56+
}

kroxylicious-filters/kroxylicious-authorization/src/main/java/io/kroxylicious/filter/authorization/AuthorizationFilter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ <R> R popAndApplyInflightState(ResponseHeaderData header, R response) {
210210

211211
@Override
212212
public CompletionStage<RequestFilterResult> onRequest(ApiKeys apiKey,
213+
short apiVersion,
213214
RequestHeaderData header,
214215
ApiMessage request,
215216
FilterContext context) {
@@ -267,6 +268,7 @@ private CompletionStage<RequestFilterResult> enforceRequest(RequestHeaderData he
267268

268269
@Override
269270
public CompletionStage<ResponseFilterResult> onResponse(ApiKeys apiKey,
271+
short apiVersion,
270272
ResponseHeaderData header,
271273
ApiMessage response,
272274
FilterContext context) {

kroxylicious-filters/kroxylicious-authorization/src/test/java/io/kroxylicious/filter/authorization/AuthorizationFilterTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ void authorization(ScenarioDefinition definition) {
8888
Map<Uuid, String> topicNames = Optional.ofNullable(definition.given().topicNames()).orElse(Map.of());
8989
Subject subject = new Subject(new User(definition.when().subject()));
9090
FilterContext context = new MockFilterContext(requestHeader, request, subject, topicNames, mockUpstream);
91-
CompletionStage<RequestFilterResult> stage = authorizationFilter.onRequest(apiKeys, requestHeader, request, context);
91+
CompletionStage<RequestFilterResult> stage = authorizationFilter.onRequest(apiKeys, version, requestHeader, request, context);
9292
ScenarioDefinition.RequestError expectedRequestError = definition.then().expectedRequestError();
9393
if (expectedRequestError != null) {
9494
assertThat(stage).failsWithin(0, TimeUnit.SECONDS).withThrowableThat()
@@ -150,7 +150,8 @@ private static void handleRequestForward(ScenarioDefinition definition, Completi
150150
MockUpstream.Response response = mockUpstream.respond((RequestHeaderData) forwardedHeader, forwardedMessage);
151151
if (definition.then().getHasResponse()) {
152152
MockFilterContext responseContext = new MockFilterContext(response.header(), response.message(), subject, topicNames, mockUpstream);
153-
CompletionStage<ResponseFilterResult> filterResultCompletionStage = authorizationFilter.onResponse(apiKeys, response.header(), response.message(),
153+
CompletionStage<ResponseFilterResult> filterResultCompletionStage = authorizationFilter.onResponse(apiKeys, version, response.header(),
154+
response.message(),
154155
responseContext);
155156
ResponseFilterResult responseResult = assertThat(filterResultCompletionStage).succeedsWithin(Duration.ZERO).actual();
156157
if (responseResult.drop()) {

kroxylicious-filters/kroxylicious-sasl-inspection/src/main/java/io/kroxylicious/filters/sasl/inspection/SaslInspectionFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class SaslInspectionFilter
7777
}
7878

7979
@Override
80-
public CompletionStage<RequestFilterResult> onRequest(ApiKeys apiKey, RequestHeaderData header, ApiMessage request, FilterContext context) {
80+
public CompletionStage<RequestFilterResult> onRequest(ApiKeys apiKey, short apiVersion, RequestHeaderData header, ApiMessage request, FilterContext context) {
8181
return switch (apiKey) {
8282
case API_VERSIONS -> context.forwardRequest(header, request);
8383
case SASL_AUTHENTICATE -> onSaslAuthenticateRequest(header.requestApiVersion(), header, (SaslAuthenticateRequestData) request, context);

0 commit comments

Comments
 (0)