Commit 6fd7143

feat(filterapi): Add FilterContext#topicNames (kroxylicious#2775)
This API enables Filters to map topic ids to topic names. Invoking this method causes an out-of-band Metadata request to be sent to the broker, requesting the Metadata for the specified set of topic ids. The result is then made available to clients. We aim to always complete the stage with a TopicNameMapping that maps every input topic id to either a name or a TopicNameMappingException. TopicNameMappingException always offers an org.apache.kafka.common.protocol.Errors enum to make it convenient for clients to detect common cases like UNKNOWN_TOPIC_ID. Exceptions within the proxy will be presented as UNKNOWN_SERVER_ERROR.

Why: Kafka has gradually been transitioning its RPCs to use topic ids, in many cases, to identify the topic to act on. The topic id is an internal identifier intended to prevent some bugs that are possible if the RPCs only carry names, especially around topic deletion. Kafka has recently introduced the topic id to the Produce request. Many of the proxy Filters rely on the topic name in the Produce request to execute their logic, so we need a mechanism to obtain the topic names. Rather than having each Filter implement this itself through sendRequest, we offer it as a framework method.

Future extensions:

- We could add framework-level caching of the names per Filter instance. Caching will have to be carefully considered to ensure we invalidate appropriately.
- We could intercept client Metadata requests to learn which version the client prefers to send. This would enable the proxy to mimic the client more accurately and avoid exposing the Kafka cluster to novel patterns, like receiving multiple Metadata requests at different API versions. This is not always possible, though, since we do not observe Metadata requests on every connection.

Signed-off-by: Robert Young <[email protected]>
1 parent a4723b9 commit 6fd7143
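The commit message describes a specific contract: the returned stage always completes with a mapping (never exceptionally), every input id resolves to either a name or a failure carrying a Kafka error code, and unexpected internal exceptions surface as UNKNOWN_SERVER_ERROR. The following is a minimal, self-contained sketch of that contract; `KafkaError`, `Mapping`, and the synchronous lookup are hypothetical stand-ins, not the real kroxylicious or Kafka classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class TopicNameContractSketch {

    // Hypothetical stand-in for org.apache.kafka.common.protocol.Errors.
    enum KafkaError { UNKNOWN_TOPIC_ID, UNKNOWN_SERVER_ERROR }

    // Hypothetical stand-in for TopicNameMapping: every requested id lands
    // in exactly one of the two maps.
    record Mapping(Map<UUID, String> topicNames, Map<UUID, KafkaError> failures) {
        boolean anyFailures() { return !failures.isEmpty(); }
    }

    // Sketch of the promised behaviour: the stage always completes with a
    // Mapping rather than exceptionally; ids absent from the metadata become
    // UNKNOWN_TOPIC_ID, and any unexpected exception during lookup is
    // recorded as UNKNOWN_SERVER_ERROR.
    static CompletionStage<Mapping> topicNames(Set<UUID> ids, Map<UUID, String> clusterMetadata) {
        Map<UUID, String> names = new HashMap<>();
        Map<UUID, KafkaError> failures = new HashMap<>();
        for (UUID id : ids) {
            try {
                String name = clusterMetadata.get(id);
                if (name != null) {
                    names.put(id, name);
                }
                else {
                    failures.put(id, KafkaError.UNKNOWN_TOPIC_ID);
                }
            }
            catch (RuntimeException e) {
                failures.put(id, KafkaError.UNKNOWN_SERVER_ERROR);
            }
        }
        return CompletableFuture.completedFuture(new Mapping(Map.copyOf(names), Map.copyOf(failures)));
    }

    public static void main(String[] args) {
        UUID known = UUID.randomUUID();
        UUID unknown = UUID.randomUUID();
        Mapping mapping = topicNames(Set.of(known, unknown), Map.of(known, "orders"))
                .toCompletableFuture().join();
        System.out.println(mapping.topicNames().get(known));   // orders
        System.out.println(mapping.failures().get(unknown));   // UNKNOWN_TOPIC_ID
        System.out.println(mapping.anyFailures());             // true
    }
}
```

The point of completing with a mapping rather than failing the stage is that callers can handle per-topic failures individually instead of losing the whole batch to one bad id.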

File tree

14 files changed (+807, −7 lines)

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -6,6 +6,9 @@ For changes that effect a public API, the [deprecation policy](./DEV_GUIDE.md#de
 Format `<github issue/pr number>: <short description>`.

 ## SNAPSHOT
+
+* [#1318](https://github.com/kroxylicious/kroxylicious/issues/1318): Add FilterContext#topicNames to enable filters to retrieve names for topic ids
+
 ## 0.17.0

 * [#2830](https://github.com/kroxylicious/kroxylicious/pull/2830): Refactor authentication configuration class names and optional fields in Azure KMS provider for Record Encryption

kroxylicious-api/src/main/java/io/kroxylicious/proxy/filter/FilterContext.java

Lines changed: 18 additions & 0 deletions
@@ -5,17 +5,21 @@
  */
 package io.kroxylicious.proxy.filter;

+import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.CompletionStage;

 import javax.annotation.Nullable;

+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.message.ResponseHeaderData;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;

 import io.kroxylicious.proxy.authentication.ClientSaslContext;
+import io.kroxylicious.proxy.filter.metadata.TopicNameMapping;
+import io.kroxylicious.proxy.filter.metadata.TopicNameMappingException;
 import io.kroxylicious.proxy.tls.ClientTlsContext;

 /**
@@ -114,6 +118,20 @@ public interface FilterContext {
     <M extends ApiMessage> CompletionStage<M> sendRequest(RequestHeaderData header,
                                                           ApiMessage request);

+    /**
+     * Attempts to map all the given {@code topicIds} to the current corresponding topic names.
+     * @param topicIds topic ids to map to names
+     * @return a CompletionStage that will be completed with a complete mapping, with every requested topic id mapped to either an
+     * {@link TopicNameMappingException} or a name. All failure modes should complete the stage with a TopicNameMapping, with the
+     * TopicNameMapping used to convey the reason for failure, rather than failing the Stage.
+     * <h4>Chained Computation stages</h4>
+     * <p>Default and asynchronous default computation stages chained to the returned
+     * {@link java.util.concurrent.CompletionStage} are guaranteed to be executed by the thread
+     * associated with the connection. See {@link io.kroxylicious.proxy.filter} for more details.
+     * </p>
+     */
+    CompletionStage<TopicNameMapping> topicNames(Collection<Uuid> topicIds);
+
     /**
      * Generates a completed filter results containing the given header and response. When
      * response filters implementations return this result, the response will be sent towards
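The javadoc above guarantees that default computation stages chained to the returned stage run on the connection's thread, so a filter can safely chain request rewriting onto `topicNames(...)`. Here is a small self-contained sketch of that chaining pattern; `ProduceLikeRequest`, `namesForRequest`, and the pre-completed mapping stage are hypothetical stand-ins, not the real kroxylicious API:

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

public class TopicNameChainingSketch {

    // Hypothetical stand-in for a request that identifies topics by id only.
    record ProduceLikeRequest(List<UUID> topicIds) {}

    // Resolve the request's topic ids to names by chaining onto the mapping
    // stage, mirroring how a filter might call thenApply on the stage
    // returned by context.topicNames(...). Ids with no successful mapping
    // are simply skipped here; a real filter would consult the failures.
    static CompletionStage<Map<String, UUID>> namesForRequest(ProduceLikeRequest request,
                                                              CompletionStage<Map<UUID, String>> mappingStage) {
        return mappingStage.thenApply(mapping -> request.topicIds().stream()
                .filter(mapping::containsKey)
                .collect(Collectors.toMap(mapping::get, id -> id)));
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        // Stand-in for the stage the framework would return.
        CompletionStage<Map<UUID, String>> stage = CompletableFuture.completedFuture(Map.of(id, "payments"));
        Map<String, UUID> byName = namesForRequest(new ProduceLikeRequest(List.of(id)), stage)
                .toCompletableFuture().join();
        System.out.println(byName.containsKey("payments")); // true
    }
}
```

Because the framework pins chained default stages to the connection's thread, the filter does not need its own synchronization around per-connection state touched inside `thenApply`.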
kroxylicious-api/src/main/java/io/kroxylicious/proxy/filter/metadata/TopLevelMetadataErrorException.java

Lines changed: 18 additions & 0 deletions

/*
 * Copyright Kroxylicious Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */

package io.kroxylicious.proxy.filter.metadata;

import org.apache.kafka.common.protocol.Errors;

/**
 * Indicates that there was an {@link Errors} set at the top level of {@link org.apache.kafka.common.message.MetadataResponseData}.
 */
public class TopLevelMetadataErrorException extends TopicNameMappingException {
    public TopLevelMetadataErrorException(Errors error) {
        super(error);
    }
}
kroxylicious-api/src/main/java/io/kroxylicious/proxy/filter/metadata/TopicLevelMetadataErrorException.java

Lines changed: 18 additions & 0 deletions

/*
 * Copyright Kroxylicious Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */

package io.kroxylicious.proxy.filter.metadata;

import org.apache.kafka.common.protocol.Errors;

/**
 * Indicates that there was an {@link Errors} set on a {@link org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic}.
 */
public class TopicLevelMetadataErrorException extends TopicNameMappingException {
    public TopicLevelMetadataErrorException(Errors error) {
        super(error);
    }
}
kroxylicious-api/src/main/java/io/kroxylicious/proxy/filter/metadata/TopicNameMapping.java

Lines changed: 41 additions & 0 deletions

/*
 * Copyright Kroxylicious Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */

package io.kroxylicious.proxy.filter.metadata;

import java.util.Map;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;

/**
 * The result of discovering the topic names for a collection of topic ids
 */
public interface TopicNameMapping {
    /**
     * @return true if there are any failures
     */
    boolean anyFailures();

    /**
     * @return immutable map from topic id to successfully mapped topic name
     */
    Map<Uuid, String> topicNames();

    /**
     * Describes the reason for every failed mapping. Expected exception types are:
     * <ul>
     * <li>{@link TopLevelMetadataErrorException} indicates that we attempted to obtain Metadata from upstream, but received a top-level error in the response</li>
     * <li>{@link TopicLevelMetadataErrorException} indicates that we attempted to obtain Metadata from upstream, but received a topic-level error in the response</li>
     * <li>{@link TopicNameMappingException} can be used to convey any other exception</li>
     * </ul>
     * All the exception types offer {@link TopicNameMappingException#getError()} for conveniently determining the cause. Unhandled
     * exceptions will be mapped to an {@link Errors#UNKNOWN_SERVER_ERROR}. Callers will be able to use this to detect expected
     * cases like {@link Errors#UNKNOWN_TOPIC_ID}.
     * @return immutable map from topic id to kafka server error
     */
    Map<Uuid, TopicNameMappingException> failures();
}
kroxylicious-api/src/main/java/io/kroxylicious/proxy/filter/metadata/TopicNameMappingException.java

Lines changed: 35 additions & 0 deletions

/*
 * Copyright Kroxylicious Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */

package io.kroxylicious.proxy.filter.metadata;

import java.util.Objects;

import org.apache.kafka.common.protocol.Errors;

/**
 * Indicates there was some problem obtaining a name for a topic id
 */
public class TopicNameMappingException extends RuntimeException {
    private final Errors error;

    public TopicNameMappingException(Errors error) {
        this(error, error.message(), error.exception());
    }

    public TopicNameMappingException(Errors error, String message) {
        this(error, message, error.exception());
    }

    public TopicNameMappingException(Errors error, String message, Throwable cause) {
        super(message, cause);
        this.error = Objects.requireNonNull(error);
    }

    public Errors getError() {
        return error;
    }
}

kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/FilterIT.java

Lines changed: 135 additions & 6 deletions
@@ -22,6 +22,7 @@
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.message.ApiVersionsRequestData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
@@ -34,6 +35,7 @@
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.message.ProduceRequestData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.RawTaggedField;
 import org.apache.kafka.common.serialization.Serdes;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -57,6 +59,8 @@
 import io.kroxylicious.proxy.filter.RequestResponseMarkingFilterFactory;
 import io.kroxylicious.proxy.filter.simpletransform.FetchResponseTransformation;
 import io.kroxylicious.proxy.filter.simpletransform.ProduceRequestTransformation;
+import io.kroxylicious.proxy.testplugins.TopicIdToNameResponseStamper;
+import io.kroxylicious.proxy.testplugins.TopicNameMetadataPrefixer;
 import io.kroxylicious.test.Request;
 import io.kroxylicious.test.Response;
 import io.kroxylicious.test.ResponsePayload;
@@ -66,6 +70,7 @@

 import static io.kroxylicious.UnknownTaggedFields.unknownTaggedFieldsToStrings;
 import static io.kroxylicious.proxy.filter.RequestResponseMarkingFilter.FILTER_NAME_TAG;
+import static io.kroxylicious.proxy.testplugins.TopicIdToNameResponseStamper.topicNameMapping;
 import static io.kroxylicious.test.tester.KroxyliciousConfigUtils.proxy;
 import static io.kroxylicious.test.tester.KroxyliciousTesters.kroxyliciousTester;
 import static io.kroxylicious.test.tester.KroxyliciousTesters.mockKafkaKroxyliciousTester;
@@ -102,6 +107,128 @@ class FilterIT {
             GenericResponseSpecificRequestFilterFactory.class.getName(),
             GenericResponseSpecificRequestFilterFactory.class.getName());

+    public static final String TOPIC_ID_LOOKUP_FILTER_NAME = "topicIdLookup";
+    public static final String TOPIC_NAME_PREFIXER_FILTER_NAME = "topicNamePrefixer";
+
+    @Test
+    void filtersCanLookUpTopicNames(KafkaCluster cluster, Topic topic1, Topic topic2) {
+        Uuid topic1Id = topic1.topicId().orElseThrow();
+        Uuid topic2Id = topic2.topicId().orElseThrow();
+        NamedFilterDefinition namedFilterDefinition = new NamedFilterDefinitionBuilder(TOPIC_ID_LOOKUP_FILTER_NAME,
+                TopicIdToNameResponseStamper.class.getName())
+                .build();
+        var config = proxy(cluster)
+                .addToFilterDefinitions(namedFilterDefinition)
+                .addToDefaultFilters(namedFilterDefinition.name());
+
+        try (var tester = kroxyliciousTester(config);
+                var client = tester.simpleTestClient()) {
+            MetadataRequestData message = new MetadataRequestData();
+            message.unknownTaggedFields().add(
+                    new RawTaggedField(TopicIdToNameResponseStamper.TOPIC_ID_TAG, (topic1Id + "," + topic2Id).getBytes(StandardCharsets.UTF_8)));
+            Response response = client.getSync(new Request(METADATA, METADATA.latestVersion(), "client", message));
+            List<String> tags = unknownTaggedFieldsToStrings(response.payload().message(), TopicIdToNameResponseStamper.TOPIC_NAME_TAG).toList();
+            assertThat(tags).hasSize(1);
+            String tag = tags.getFirst();
+            String[] topicNames = tag.split(",");
+            assertThat(topicNames).containsExactlyInAnyOrder(topicNameMapping(topic1Id, topic1.name(), null), topicNameMapping(topic2Id, topic2.name(), null));
+        }
+    }
+
+    @Test
+    void filtersCanLookUpNonExistentTopicNames(KafkaCluster cluster) {
+        NamedFilterDefinition namedFilterDefinition = new NamedFilterDefinitionBuilder(TOPIC_ID_LOOKUP_FILTER_NAME,
+                TopicIdToNameResponseStamper.class.getName())
+                .build();
+        var config = proxy(cluster)
+                .addToFilterDefinitions(namedFilterDefinition)
+                .addToDefaultFilters(namedFilterDefinition.name());
+
+        try (var tester = kroxyliciousTester(config);
+                var client = tester.simpleTestClient()) {
+            MetadataRequestData message = new MetadataRequestData();
+            Uuid nonexistentTopic = Uuid.randomUuid();
+            message.unknownTaggedFields().add(
+                    new RawTaggedField(TopicIdToNameResponseStamper.TOPIC_ID_TAG, nonexistentTopic.toString().getBytes(StandardCharsets.UTF_8)));
+            Response response = client.getSync(new Request(METADATA, METADATA.latestVersion(), "client", message));
+            List<String> tags = unknownTaggedFieldsToStrings(response.payload().message(), TopicIdToNameResponseStamper.TOPIC_NAME_TAG).toList();
+            assertThat(tags).hasSize(1);
+            String tag = tags.getFirst();
+            assertThat(tag).isEqualTo(topicNameMapping(nonexistentTopic, null, "UNKNOWN_TOPIC_ID"));
+        }
+    }
+
+    @Test
+    void filtersCanLookUpPartiallyExistingTopics(KafkaCluster cluster, Topic topic1) {
+        Uuid topic1Id = topic1.topicId().orElseThrow();
+        NamedFilterDefinition namedFilterDefinition = new NamedFilterDefinitionBuilder(TOPIC_ID_LOOKUP_FILTER_NAME,
+                TopicIdToNameResponseStamper.class.getName())
+                .build();
+        var config = proxy(cluster)
+                .addToFilterDefinitions(namedFilterDefinition)
+                .addToDefaultFilters(namedFilterDefinition.name());
+
+        try (var tester = kroxyliciousTester(config);
+                var client = tester.simpleTestClient()) {
+            MetadataRequestData message = new MetadataRequestData();
+            Uuid nonexistentTopic = Uuid.randomUuid();
+            message.unknownTaggedFields().add(
+                    new RawTaggedField(TopicIdToNameResponseStamper.TOPIC_ID_TAG, (nonexistentTopic + "," + topic1Id).getBytes(StandardCharsets.UTF_8)));
+            Response response = client.getSync(new Request(METADATA, METADATA.latestVersion(), "client", message));
+            List<String> tags = unknownTaggedFieldsToStrings(response.payload().message(), TopicIdToNameResponseStamper.TOPIC_NAME_TAG).toList();
+            assertThat(tags).hasSize(1);
+            String tag = tags.getFirst();
+            String[] topicNames = tag.split(",");
+            assertThat(topicNames).containsExactlyInAnyOrder(topicNameMapping(topic1Id, topic1.name(), null),
+                    topicNameMapping(nonexistentTopic, null, "UNKNOWN_TOPIC_ID"));
+        }
+    }
+
+    @Test
+    void topicNameLookupComposesWithDownstreamFilter(KafkaCluster cluster, Topic topic1, Topic topic2) {
+        String nameOne = TopicNameMetadataPrefixer.PREFIX + topic1.name();
+        String nameTwo = TopicNameMetadataPrefixer.PREFIX + topic2.name();
+        String[] filterOrder = { TOPIC_ID_LOOKUP_FILTER_NAME, TOPIC_NAME_PREFIXER_FILTER_NAME };
+        assertTopicStampingFilterObservesTopicNames(cluster, topic1, topic2, filterOrder, nameOne, nameTwo);
+    }
+
+    @Test
+    void topicNameLookupComposesWithUpstreamFilter(KafkaCluster cluster, Topic topic1, Topic topic2) {
+        String[] filterOrder = { TOPIC_NAME_PREFIXER_FILTER_NAME, TOPIC_ID_LOOKUP_FILTER_NAME };
+        assertTopicStampingFilterObservesTopicNames(cluster, topic1, topic2, filterOrder, topic1.name(), topic2.name());
+    }
+
+    private static void assertTopicStampingFilterObservesTopicNames(KafkaCluster cluster, Topic topic1, Topic topic2, String[] filterOrder, String expectedNameForTopic1,
+                                                                    String expectedNameForTopic2) {
+        Uuid topic1Id = topic1.topicId().orElseThrow();
+        Uuid topic2Id = topic2.topicId().orElseThrow();
+
+        NamedFilterDefinition namedFilterDefinition = new NamedFilterDefinitionBuilder(TOPIC_ID_LOOKUP_FILTER_NAME,
+                TopicIdToNameResponseStamper.class.getName())
+                .build();
+        NamedFilterDefinition topicNamePrefixer = new NamedFilterDefinitionBuilder(TOPIC_NAME_PREFIXER_FILTER_NAME,
+                TopicNameMetadataPrefixer.class.getName())
+                .build();
+        var config = proxy(cluster)
+                .addToFilterDefinitions(namedFilterDefinition)
+                .addToFilterDefinitions(topicNamePrefixer)
+                .addToDefaultFilters(filterOrder);
+
+        try (var tester = kroxyliciousTester(config);
+                var client = tester.simpleTestClient()) {
+            MetadataRequestData message = new MetadataRequestData();
+            message.unknownTaggedFields().add(
+                    new RawTaggedField(TopicIdToNameResponseStamper.TOPIC_ID_TAG, (topic1Id + "," + topic2Id).getBytes(StandardCharsets.UTF_8)));
+            Response response = client.getSync(new Request(METADATA, METADATA.latestVersion(), "client", message));
+            List<String> tags = unknownTaggedFieldsToStrings(response.payload().message(), TopicIdToNameResponseStamper.TOPIC_NAME_TAG).toList();
+            assertThat(tags).hasSize(1);
+            String tag = tags.getFirst();
+            String[] topicNames = tag.split(",");
+            assertThat(topicNames).containsExactlyInAnyOrder(topicNameMapping(topic1Id, expectedNameForTopic1, null),
+                    topicNameMapping(topic2Id, expectedNameForTopic2, null));
+        }
+    }
+
     @Test
     void reversibleEncryption() {
         // The precise details of the cipher don't matter
@@ -176,7 +303,8 @@ void shouldPassThroughRecordUnchanged(KafkaCluster cluster, Topic topic) throws
     }

     @Test
-    @SuppressWarnings("java:S5841") // java:S5841 warns that doesNotContain passes for the empty case. Which is what we want here.
+    @SuppressWarnings("java:S5841")
+    // java:S5841 warns that doesNotContain passes for the empty case. Which is what we want here.
     void requestFiltersCanRespondWithoutProxying(KafkaCluster cluster, Admin admin) throws Exception {
         var config = proxy(cluster)
                 .addToFilterDefinitions(REJECTING_CREATE_TOPIC_FILTER.build())
@@ -195,7 +323,7 @@ void requestFiltersCanRespondWithoutProxying(KafkaCluster cluster, Admin admin)

     @Test
     void filtersCanImplementGenericRequestFilterAndSpecificResponseFilter() {
-        try (var tester = mockKafkaKroxyliciousTester((mockBootstrap) -> proxy(mockBootstrap)
+        try (var tester = mockKafkaKroxyliciousTester(mockBootstrap -> proxy(mockBootstrap)
                 .addToFilterDefinitions(GENERIC_REQUEST_SPECIFIC_RESPONSE.build())
                 .addToDefaultFilters(GENERIC_REQUEST_SPECIFIC_RESPONSE.name()));
                 var simpleTestClient = tester.simpleTestClient()) {
@@ -215,7 +343,7 @@ void filtersCanImplementGenericRequestFilterAndSpecificResponseFilter() {

     @Test
     void filtersCanImplementGenericResponseFilterAndSpecificRequestFilter() {
-        try (var tester = mockKafkaKroxyliciousTester((mockBootstrap) -> proxy(mockBootstrap)
+        try (var tester = mockKafkaKroxyliciousTester(mockBootstrap -> proxy(mockBootstrap)
                 .addToFilterDefinitions(GENERIC_RESPONSE_SPECIFIC_REQUEST.build())
                 .addToDefaultFilters(GENERIC_RESPONSE_SPECIFIC_REQUEST.name()));
                 var simpleTestClient = tester.simpleTestClient()) {
@@ -248,7 +376,7 @@ void requestFilterCanShortCircuitResponse(String name, boolean withCloseConnecti
                 .withConfig("withCloseConnection", withCloseConnection,
                         "forwardingStyle", forwardingStyle)
                 .build();
-        try (var tester = mockKafkaKroxyliciousTester((mockBootstrap) -> proxy(mockBootstrap)
+        try (var tester = mockKafkaKroxyliciousTester(mockBootstrap -> proxy(mockBootstrap)
                 .addToFilterDefinitions(rejectFilter)
                 .addToDefaultFilters(REJECTING_CREATE_TOPIC_FILTER.name()));
                 var requestClient = tester.simpleTestClient()) {
@@ -322,7 +450,7 @@ private void doSupportsForwardDeferredByAsynchronousRequest(RequestResponseMarki
                 "name", name,
                 "forwardingStyle", forwardingStyle)
                 .build();
-        try (var tester = mockKafkaKroxyliciousTester((mockBootstrap) -> proxy(mockBootstrap)
+        try (var tester = mockKafkaKroxyliciousTester(mockBootstrap -> proxy(mockBootstrap)
                 .addToFilterDefinitions(markingFilter)
                 .addToDefaultFilters(name));
                 var kafkaClient = tester.simpleTestClient()) {
@@ -354,7 +482,8 @@ private void doSupportsForwardDeferredByAsynchronousRequest(RequestResponseMarki
     }

     @Test
-    @SuppressWarnings("java:S5841") // java:S5841 warns that doesNotContain passes for the empty case. Which is what we want here.
+    @SuppressWarnings("java:S5841")
+    // java:S5841 warns that doesNotContain passes for the empty case. Which is what we want here.
     void requestFiltersCanRespondWithoutProxyingDoesntLeakBuffers(KafkaCluster cluster, Admin admin) throws Exception {
         var config = proxy(cluster)
                 .addToFilterDefinitions(REJECTING_CREATE_TOPIC_FILTER.build())
