Skip to content

Commit b3f1652

Browse files
committed
Cache topic names per virtual cluster
We should be able to safely cache all the topic names as we cache at the edge. - Adds configuration to VirtualCluster: ``` topicNameCache: maxSize: 5 (default unlimited) expireAfterWrite: 60s (default never) expireAfterAccess: 60s (default 1h) ``` - Enables cache metrics - Makes cache size and expiry options configurable - Introduces RequestHeaderTagger utility to make the field tagging mechanism easier to re-use. To implement the caching behaviour we: 1. Install a TopicNameCacheFilter right at the end of the Filter chain, so that it is the Filter closest to the upstream cluster. I.e. it will be exposed to the raw topic names from upstream, without any other filter intervening. 2. The TopicNameRetriever adds a tag to the header of the MetadataRequest it makes to fetch topic names 3. The TopicNameCacheFilter detects this tag, and then if it knows the names of all topic ids requested it will short-circuit and respond with those names, else it removes the tag and forwards the MetadataRequest upstream. The TopicNameCacheFilter is special in that there is a single instance installed into all channels for a VirtualCluster, so they can all benefit from the caching. The assumption is that the name for a uuid cannot change. Signed-off-by: Robert Young <robertyoungnz@gmail.com>
1 parent 5232b5f commit b3f1652

File tree

17 files changed

+815
-27
lines changed

17 files changed

+815
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Format `<github issue/pr number>: <short description>`.
77

88
## SNAPSHOT
99

10+
* [#3112](https://github.com/kroxylicious/kroxylicious/pull/3112): cache topic name lookups per VirtualCluster
1011
* [#3129](https://github.com/kroxylicious/kroxylicious/pull/3129): build(deps): bump netty.version from 4.2.7.Final to 4.2.9.Final
1112
* [#2969](https://github.com/kroxylicious/kroxylicious/issues/2969): Give `ResponseFilter#onResponse` access to the api-version
1213
* [#3035](https://github.com/kroxylicious/kroxylicious/issues/3035): fix(sasl inspector): Fix config parsing error if SaslInspector with subject builder

kroxylicious-integration-tests/src/test/java/io/kroxylicious/it/FilterIT.java

Lines changed: 100 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,12 @@
5757
import io.kroxylicious.it.testplugins.RequestResponseMarkingFilterFactory;
5858
import io.kroxylicious.it.testplugins.TopicIdToNameResponseStamper;
5959
import io.kroxylicious.it.testplugins.TopicNameMetadataPrefixer;
60+
import io.kroxylicious.proxy.config.ConfigurationBuilder;
6061
import io.kroxylicious.proxy.config.NamedFilterDefinition;
6162
import io.kroxylicious.proxy.config.NamedFilterDefinitionBuilder;
6263
import io.kroxylicious.proxy.filter.simpletransform.FetchResponseTransformation;
6364
import io.kroxylicious.proxy.filter.simpletransform.ProduceRequestTransformation;
65+
import io.kroxylicious.proxy.internal.TopicNameRetriever;
6466
import io.kroxylicious.test.Request;
6567
import io.kroxylicious.test.Response;
6668
import io.kroxylicious.test.ResponsePayload;
@@ -168,14 +170,106 @@ void filtersCanLookUpTopicNames(KafkaCluster cluster, Topic topic1, Topic topic2
168170
message.unknownTaggedFields().add(
169171
new RawTaggedField(TopicIdToNameResponseStamper.TOPIC_ID_TAG, (topic1Id + "," + topic2Id).getBytes(StandardCharsets.UTF_8)));
170172
Response response = client.getSync(new Request(METADATA, METADATA.latestVersion(), "client", message));
171-
List<String> tags = unknownTaggedFieldsToStrings(response.payload().message(), TopicIdToNameResponseStamper.TOPIC_NAME_TAG).toList();
172-
assertThat(tags).hasSize(1);
173-
String tag = tags.getFirst();
174-
String[] topicNames = tag.split(",");
173+
String[] topicNames = extractTopicNames(response);
175174
assertThat(topicNames).containsExactlyInAnyOrder(topicNameMapping(topic1Id, topic1.name(), null), topicNameMapping(topic2Id, topic2.name(), null));
176175
}
177176
}
178177

178+
@Test
179+
void topicNamesAreCached() {
180+
NamedFilterDefinition namedFilterDefinition = new NamedFilterDefinitionBuilder(TOPIC_ID_LOOKUP_FILTER_NAME,
181+
TopicIdToNameResponseStamper.class.getName())
182+
.build();
183+
try (var tester = mockKafkaKroxyliciousTester(bootstrap -> {
184+
ConfigurationBuilder proxy = proxy(bootstrap);
185+
proxy.addToFilterDefinitions(namedFilterDefinition)
186+
.addToDefaultFilters(namedFilterDefinition.name());
187+
return proxy;
188+
});
189+
var client = tester.simpleTestClient()) {
190+
Uuid topic1Id = Uuid.randomUuid();
191+
String topic1Name = "topic1";
192+
Uuid topic2Id = Uuid.randomUuid();
193+
String topic2Name = "topic2";
194+
MetadataResponseData responseData = new MetadataResponseData();
195+
responseData.topics().add(getResponseTopic(topic1Name, topic1Id));
196+
responseData.topics().add(getResponseTopic(topic2Name, topic2Id));
197+
tester.addMockResponseForApiKey(new ResponsePayload(METADATA, TopicNameRetriever.METADATA_API_VER_WITH_TOPIC_ID_SUPPORT, responseData));
198+
tester.addMockResponseForApiKey(new ResponsePayload(API_VERSIONS, API_VERSIONS.latestVersion(), new ApiVersionsResponseData()));
199+
ApiVersionsRequestData message = new ApiVersionsRequestData();
200+
message.unknownTaggedFields().add(
201+
new RawTaggedField(TopicIdToNameResponseStamper.TOPIC_ID_TAG, (topic1Id + "," + topic2Id).getBytes(StandardCharsets.UTF_8)));
202+
Response response = client.getSync(new Request(API_VERSIONS, API_VERSIONS.latestVersion(), "client", message));
203+
String[] topicNames = extractTopicNames(response);
204+
assertThat(topicNames).containsExactlyInAnyOrder(topicNameMapping(topic1Id, topic1Name, null), topicNameMapping(topic2Id, topic2Name, null));
205+
assertThat(tester.getRequestsForApiKey(METADATA)).hasSize(1);
206+
assertThat(tester.getRequestsForApiKey(API_VERSIONS)).hasSize(1);
207+
208+
Response response2 = client.getSync(new Request(API_VERSIONS, API_VERSIONS.latestVersion(), "client", message));
209+
String[] topicNames2 = extractTopicNames(response2);
210+
assertThat(topicNames2).containsExactlyInAnyOrder(topicNameMapping(topic1Id, topic1Name, null), topicNameMapping(topic2Id, topic2Name, null));
211+
// we infer that the topic names were cached as they have been augmented onto the response but the mock server
212+
// has received no further metadata requests
213+
assertThat(tester.getRequestsForApiKey(METADATA)).hasSize(1);
214+
assertThat(tester.getRequestsForApiKey(API_VERSIONS)).hasSize(2);
215+
}
216+
}
217+
218+
@Test
219+
void topicNamesCachingScopedToCluster() {
220+
NamedFilterDefinition namedFilterDefinition = new NamedFilterDefinitionBuilder(TOPIC_ID_LOOKUP_FILTER_NAME,
221+
TopicIdToNameResponseStamper.class.getName())
222+
.build();
223+
try (var tester = mockKafkaKroxyliciousTester(bootstrap -> {
224+
ConfigurationBuilder proxy = proxy(bootstrap);
225+
proxy.addToFilterDefinitions(namedFilterDefinition)
226+
.addToDefaultFilters(namedFilterDefinition.name());
227+
return proxy;
228+
});
229+
var client = tester.simpleTestClient();
230+
var client2 = tester.simpleTestClient()) {
231+
Uuid topic1Id = Uuid.randomUuid();
232+
String topic1Name = "topic1";
233+
Uuid topic2Id = Uuid.randomUuid();
234+
String topic2Name = "topic2";
235+
MetadataResponseData responseData = new MetadataResponseData();
236+
responseData.topics().add(getResponseTopic(topic1Name, topic1Id));
237+
responseData.topics().add(getResponseTopic(topic2Name, topic2Id));
238+
tester.addMockResponseForApiKey(new ResponsePayload(METADATA, TopicNameRetriever.METADATA_API_VER_WITH_TOPIC_ID_SUPPORT, responseData));
239+
tester.addMockResponseForApiKey(new ResponsePayload(API_VERSIONS, API_VERSIONS.latestVersion(), new ApiVersionsResponseData()));
240+
ApiVersionsRequestData message = new ApiVersionsRequestData();
241+
message.unknownTaggedFields().add(
242+
new RawTaggedField(TopicIdToNameResponseStamper.TOPIC_ID_TAG, (topic1Id + "," + topic2Id).getBytes(StandardCharsets.UTF_8)));
243+
Response response = client.getSync(new Request(API_VERSIONS, API_VERSIONS.latestVersion(), "client", message));
244+
String[] topicNames = extractTopicNames(response);
245+
assertThat(topicNames).containsExactlyInAnyOrder(topicNameMapping(topic1Id, topic1Name, null), topicNameMapping(topic2Id, topic2Name, null));
246+
assertThat(tester.getRequestsForApiKey(METADATA)).hasSize(1);
247+
assertThat(tester.getRequestsForApiKey(API_VERSIONS)).hasSize(1);
248+
249+
// using a second client to prove that the cache scope is virtual-cluster wide
250+
Response response2 = client2.getSync(new Request(API_VERSIONS, API_VERSIONS.latestVersion(), "client", message));
251+
String[] topicNames2 = extractTopicNames(response2);
252+
assertThat(topicNames2).containsExactlyInAnyOrder(topicNameMapping(topic1Id, topic1Name, null), topicNameMapping(topic2Id, topic2Name, null));
253+
// we infer that the topic names were cached as only a single metadata request was sent to the mock
254+
assertThat(tester.getRequestsForApiKey(METADATA)).hasSize(1);
255+
assertThat(tester.getRequestsForApiKey(API_VERSIONS)).hasSize(2);
256+
}
257+
}
258+
259+
private static String[] extractTopicNames(Response response) {
260+
List<String> tags = unknownTaggedFieldsToStrings(response.payload().message(), TopicIdToNameResponseStamper.TOPIC_NAME_TAG).toList();
261+
assertThat(tags).hasSize(1);
262+
String tag = tags.getFirst();
263+
return tag.split(",");
264+
}
265+
266+
private static MetadataResponseData.MetadataResponseTopic getResponseTopic(String topic1Name, Uuid topic1Id) {
267+
MetadataResponseData.MetadataResponseTopic topic = new MetadataResponseData.MetadataResponseTopic();
268+
topic.setName(topic1Name);
269+
topic.setTopicId(topic1Id);
270+
return topic;
271+
}
272+
179273
@Test
180274
void filtersCanLookUpNonExistentTopicNames(KafkaCluster cluster) {
181275
NamedFilterDefinition namedFilterDefinition = new NamedFilterDefinitionBuilder(TOPIC_ID_LOOKUP_FILTER_NAME,
@@ -216,10 +310,7 @@ void filtersCanLookUpPartiallyExistingTopics(KafkaCluster cluster, Topic topic1)
216310
message.unknownTaggedFields().add(
217311
new RawTaggedField(TopicIdToNameResponseStamper.TOPIC_ID_TAG, (nonexistentTopic + "," + topic1Id).getBytes(StandardCharsets.UTF_8)));
218312
Response response = client.getSync(new Request(METADATA, METADATA.latestVersion(), "client", message));
219-
List<String> tags = unknownTaggedFieldsToStrings(response.payload().message(), TopicIdToNameResponseStamper.TOPIC_NAME_TAG).toList();
220-
assertThat(tags).hasSize(1);
221-
String tag = tags.getFirst();
222-
String[] topicNames = tag.split(",");
313+
String[] topicNames = extractTopicNames(response);
223314
assertThat(topicNames).containsExactlyInAnyOrder(topicNameMapping(topic1Id, topic1.name(), null),
224315
topicNameMapping(nonexistentTopic, null, "UNKNOWN_TOPIC_ID"));
225316
}
@@ -261,10 +352,7 @@ private static void assertTopicStampingFilterObservesTopicNames(KafkaCluster clu
261352
message.unknownTaggedFields().add(
262353
new RawTaggedField(TopicIdToNameResponseStamper.TOPIC_ID_TAG, (topic1Id + "," + topic2Id).getBytes(StandardCharsets.UTF_8)));
263354
Response response = client.getSync(new Request(METADATA, METADATA.latestVersion(), "client", message));
264-
List<String> tags = unknownTaggedFieldsToStrings(response.payload().message(), TopicIdToNameResponseStamper.TOPIC_NAME_TAG).toList();
265-
assertThat(tags).hasSize(1);
266-
String tag = tags.getFirst();
267-
String[] topicNames = tag.split(",");
355+
String[] topicNames = extractTopicNames(response);
268356
assertThat(topicNames).containsExactlyInAnyOrder(topicNameMapping(topic1Id, expectedNameForTopic1, null),
269357
topicNameMapping(topic2Id, expectedNameForTopic2, null));
270358
}

kroxylicious-runtime/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@
6464
<groupId>com.fasterxml.jackson.core</groupId>
6565
<artifactId>jackson-databind</artifactId>
6666
</dependency>
67+
<dependency>
68+
<groupId>com.github.ben-manes.caffeine</groupId>
69+
<artifactId>caffeine</artifactId>
70+
</dependency>
6771
<dependency>
6872
<groupId>com.fasterxml.jackson.dataformat</groupId>
6973
<artifactId>jackson-dataformat-yaml</artifactId>
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.config;
8+
9+
import java.time.Duration;
10+
import java.time.temporal.ChronoUnit;
11+
12+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
13+
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
14+
15+
import edu.umd.cs.findbugs.annotations.Nullable;
16+
17+
/**
18+
* Cache Configuration
19+
* @param maxSize the maximum number of entries the cache may contain, default (null) means no maximum
20+
* @param expireAfterWrite cache entries should be automatically removed from the cache once this duration has elapsed after the entry's creation, or the most recent replacement of its value. The default is to never expire.
21+
* @param expireAfterAccess cache entries should be automatically removed from the cache once this duration has elapsed after the entry's creation, the most recent replacement of its value, or its last access. The default is 1 hour.
22+
*/
23+
public record CacheConfiguration(@Nullable Integer maxSize,
24+
@JsonSerialize(using = DurationSerde.Serializer.class) @JsonDeserialize(using = DurationSerde.Deserializer.class) @Nullable Duration expireAfterWrite,
25+
@JsonSerialize(using = DurationSerde.Serializer.class) @JsonDeserialize(using = DurationSerde.Deserializer.class) @Nullable Duration expireAfterAccess) {
26+
27+
/** Shared configuration with every component unset; declared final so the shared instance cannot be reassigned. */
public static final CacheConfiguration DEFAULT = new CacheConfiguration(null, null, null);
28+
29+
@Override
30+
public Duration expireAfterAccess() {
31+
return expireAfterAccess == null ? Duration.of(1L, ChronoUnit.HOURS) : expireAfterAccess;
32+
}
33+
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/config/Configuration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ private static VirtualClusterModel toVirtualClusterModel(VirtualCluster virtualC
133133
virtualCluster.logNetwork(),
134134
virtualCluster.logFrames(),
135135
filterDefinitions,
136+
virtualCluster.topicNameCacheConfig(),
136137
virtualCluster.subjectBuilder());
137138

138139
addGateways(virtualCluster.gateways(), virtualClusterModel);

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/config/VirtualCluster.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public record VirtualCluster(@JsonProperty(required = true) String name,
3333
boolean logNetwork,
3434
boolean logFrames,
3535
@Nullable List<String> filters,
36-
@Nullable TransportSubjectBuilderConfig subjectBuilder) {
36+
@Nullable TransportSubjectBuilderConfig subjectBuilder,
37+
@Nullable CacheConfiguration topicNameCache) {
3738

3839
private static final Pattern DNS_LABEL_PATTERN = Pattern.compile("^[a-z0-9]([-a-z0-9]*[a-z0-9])?$", Pattern.CASE_INSENSITIVE);
3940

@@ -61,7 +62,7 @@ public VirtualCluster(@JsonProperty(required = true) String name,
6162
boolean logNetwork,
6263
boolean logFrames,
6364
@Nullable List<String> filters) {
64-
this(name, targetCluster, gateways, logNetwork, logFrames, filters, null);
65+
this(name, targetCluster, gateways, logNetwork, logFrames, filters, null, null);
6566
}
6667

6768
boolean isDnsLabel(String name) {
@@ -87,4 +88,8 @@ private void validateNoDuplicatedGatewayNames(List<VirtualClusterGateway> gatewa
8788
}
8889
}
8990

91+
CacheConfiguration topicNameCacheConfig() {
92+
return topicNameCache == null ? CacheConfiguration.DEFAULT : topicNameCache;
93+
}
94+
9095
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/KafkaProxyFrontendHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ void inSelectingServer() {
306306
if (endpointBinding.restrictUpstreamToMetadataDiscovery()) {
307307
filterAndInvokers.addAll(FilterAndInvoker.build("EagerMetadataLearner (internal)", new EagerMetadataLearner()));
308308
}
309+
filterAndInvokers.addAll(FilterAndInvoker.build("VirtualCluster TopicNameCache (internal)", virtualClusterModel.getTopicNameCacheFilter()));
309310
List<FilterAndInvoker> brokerAddressFilters = FilterAndInvoker.build("BrokerAddress (internal)",
310311
new BrokerAddressFilter(endpointBinding.endpointGateway(), endpointReconciler));
311312
filterAndInvokers.addAll(brokerAddressFilters);

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/TopicNameRetriever.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,15 @@
2929
import io.kroxylicious.proxy.filter.metadata.TopicLevelMetadataErrorException;
3030
import io.kroxylicious.proxy.filter.metadata.TopicNameMapping;
3131
import io.kroxylicious.proxy.filter.metadata.TopicNameMappingException;
32+
import io.kroxylicious.proxy.internal.util.RequestHeaderTagger;
3233
import io.kroxylicious.proxy.tag.VisibleForTesting;
3334

3435
import static java.util.Collections.unmodifiableMap;
3536
import static java.util.stream.Collectors.toMap;
3637

37-
final class TopicNameRetriever {
38+
public final class TopicNameRetriever {
3839
// Version 12 was the first version that uses topic ids.
39-
private static final short METADATA_API_VER_WITH_TOPIC_ID_SUPPORT = (short) 12;
40+
public static final short METADATA_API_VER_WITH_TOPIC_ID_SUPPORT = (short) 12;
4041
private final FilterContext filterContext;
4142
private final ThreadAwareExecutor filterDispatchExecutor;
4243

@@ -76,6 +77,7 @@ private CompletionStage<ApiMessage> requestTopicMetadata(Collection<Uuid> topicI
7677
RequestHeaderData requestHeaderData = new RequestHeaderData();
7778
requestHeaderData.setRequestApiKey(ApiKeys.METADATA.id);
7879
requestHeaderData.setRequestApiVersion(METADATA_API_VER_WITH_TOPIC_ID_SUPPORT);
80+
RequestHeaderTagger.tag(requestHeaderData, RequestHeaderTagger.Tag.LEARN_TOPIC_NAMES);
7981
return filterContext.sendRequest(requestHeaderData, request);
8082
}
8183

0 commit comments

Comments
 (0)