Skip to content

Commit ed81c90

Browse files
authored
Fix kroxylicious#1612: [Kafka Unstable] Rewrite broker address information carried by ShareFetchResponse and ShareAckResponse (KIP-932) (kroxylicious#1656)
Kafka 3.9 introduces the new RPC that carry broker host/port information. The feature is marked as unstable, but we said we want to implement it early to allow Filter Authors to experiment with the new Kafka features, if they wish. Signed-off-by: kwall <kwall@apache.org>
1 parent 483f68a commit ed81c90

File tree

4 files changed

+90
-2
lines changed

4 files changed

+90
-2
lines changed

kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ProxyRpcTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import static org.apache.kafka.common.protocol.ApiKeys.DESCRIBE_CLUSTER;
4444
import static org.apache.kafka.common.protocol.ApiKeys.FIND_COORDINATOR;
4545
import static org.apache.kafka.common.protocol.ApiKeys.METADATA;
46+
import static org.apache.kafka.common.protocol.ApiKeys.SHARE_ACKNOWLEDGE;
47+
import static org.apache.kafka.common.protocol.ApiKeys.SHARE_FETCH;
4648
import static org.junit.jupiter.api.Assertions.assertEquals;
4749

4850
@ExtendWith(NettyLeakDetectorExtension.class)
@@ -56,7 +58,7 @@ public record Scenario(String name, ResponsePayload givenMockResponse, Request w
5658
* API_VERSIONS is not proxied, kroxylicious can respond to this itself
5759
* FIND_COORDINATOR, METADATA, DESCRIBE_CLUSTER, kroxylicious takes charge of rewriting these responses itself.
5860
*/
59-
private static final Set<ApiKeys> SKIPPED_API_KEYS = Set.of(API_VERSIONS, FIND_COORDINATOR, METADATA, DESCRIBE_CLUSTER);
61+
private static final Set<ApiKeys> SKIPPED_API_KEYS = Set.of(API_VERSIONS, FIND_COORDINATOR, METADATA, DESCRIBE_CLUSTER, SHARE_ACKNOWLEDGE, SHARE_FETCH);
6062

6163
@BeforeAll
6264
public static void beforeAll() {

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/filter/BrokerAddressFilter.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
2323
import org.apache.kafka.common.message.ProduceResponseData;
2424
import org.apache.kafka.common.message.ResponseHeaderData;
25+
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
26+
import org.apache.kafka.common.message.ShareFetchResponseData;
2527
import org.apache.kafka.common.protocol.ApiMessage;
2628
import org.slf4j.Logger;
2729
import org.slf4j.LoggerFactory;
@@ -33,6 +35,8 @@
3335
import io.kroxylicious.proxy.filter.MetadataResponseFilter;
3436
import io.kroxylicious.proxy.filter.ProduceResponseFilter;
3537
import io.kroxylicious.proxy.filter.ResponseFilterResult;
38+
import io.kroxylicious.proxy.filter.ShareAcknowledgeResponseFilter;
39+
import io.kroxylicious.proxy.filter.ShareFetchResponseFilter;
3640
import io.kroxylicious.proxy.internal.net.EndpointReconciler;
3741
import io.kroxylicious.proxy.model.VirtualCluster;
3842
import io.kroxylicious.proxy.service.HostPort;
@@ -42,7 +46,7 @@
4246
* is responsible for updating the virtual cluster's cache of upstream broker endpoints.
4347
*/
4448
public class BrokerAddressFilter implements MetadataResponseFilter, FindCoordinatorResponseFilter, DescribeClusterResponseFilter,
45-
ProduceResponseFilter, FetchResponseFilter {
49+
ProduceResponseFilter, FetchResponseFilter, ShareFetchResponseFilter, ShareAcknowledgeResponseFilter {
4650

4751
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerAddressFilter.class);
4852

@@ -129,6 +133,32 @@ public CompletionStage<ResponseFilterResult> onFetchResponse(short apiVersion, R
129133
return context.forwardResponse(header, response);
130134
}
131135

136+
@Override
137+
public CompletionStage<ResponseFilterResult> onShareAcknowledgeResponse(short apiVersion, ResponseHeaderData header, ShareAcknowledgeResponseData response,
138+
FilterContext context) {
139+
// KIP-932
140+
if (response.nodeEndpoints() != null) {
141+
response.nodeEndpoints()
142+
.forEach(ne -> apply(context, ne, ShareAcknowledgeResponseData.NodeEndpoint::nodeId,
143+
ShareAcknowledgeResponseData.NodeEndpoint::host, ShareAcknowledgeResponseData.NodeEndpoint::port,
144+
ShareAcknowledgeResponseData.NodeEndpoint::setHost, ShareAcknowledgeResponseData.NodeEndpoint::setPort));
145+
}
146+
return context.forwardResponse(header, response);
147+
}
148+
149+
@Override
150+
public CompletionStage<ResponseFilterResult> onShareFetchResponse(short apiVersion, ResponseHeaderData header, ShareFetchResponseData response,
151+
FilterContext context) {
152+
// KIP-932
153+
if (response.nodeEndpoints() != null) {
154+
response.nodeEndpoints()
155+
.forEach(ne -> apply(context, ne, ShareFetchResponseData.NodeEndpoint::nodeId,
156+
ShareFetchResponseData.NodeEndpoint::host, ShareFetchResponseData.NodeEndpoint::port,
157+
ShareFetchResponseData.NodeEndpoint::setHost, ShareFetchResponseData.NodeEndpoint::setPort));
158+
}
159+
return context.forwardResponse(header, response);
160+
}
161+
132162
private <T> void apply(FilterContext context, T broker, Function<T, Integer> nodeIdGetter, Function<T, String> hostGetter, ToIntFunction<T> portGetter,
133163
BiConsumer<T, String> hostSetter,
134164
ObjIntConsumer<T> portSetter) {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#
2+
# Copyright Kroxylicious Authors.
3+
#
4+
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
#
6+
7+
- apiMessageType: SHARE_ACKNOWLEDGE
8+
description: Node endpoints rewritten
9+
version: 0
10+
response:
11+
payload:
12+
errorCode: 0
13+
errorMessage: ""
14+
responses: []
15+
nodeEndpoints:
16+
- nodeId: 0
17+
host: upstreamz
18+
port: 9199
19+
rack: a
20+
throttleTimeMs: 0
21+
diff:
22+
- op: replace
23+
path: "/nodeEndpoints/0/host"
24+
value: downstream
25+
- op: replace
26+
path: "/nodeEndpoints/0/port"
27+
value: 19199
28+
disabled: false
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#
2+
# Copyright Kroxylicious Authors.
3+
#
4+
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
#
6+
7+
- apiMessageType: SHARE_FETCH
8+
description: Node endpoints rewritten
9+
version: 0
10+
response:
11+
payload:
12+
errorCode: 0
13+
errorMessage: ""
14+
responses: []
15+
nodeEndpoints:
16+
- nodeId: 0
17+
host: upstreamz
18+
port: 9199
19+
rack: a
20+
throttleTimeMs: 0
21+
diff:
22+
- op: replace
23+
path: "/nodeEndpoints/0/host"
24+
value: downstream
25+
- op: replace
26+
path: "/nodeEndpoints/0/port"
27+
value: 19199
28+
disabled: false

0 commit comments

Comments
 (0)