Skip to content

Commit d1839cc

Browse files
committed
name retriever short-circuit on empty names request
If a client requests a mapping for the empty set, immediately supply them with an empty mapping, using the filter dispatch executor associated with the filter (until now we have leant on sendRequest to control which thread chained futures will execute on). Why: There is nothing to retrieve, so making a Metadata Request is a waste of time and resources. In some filters it may be convenient to always request topic names, to simplify async logic, even if the topic name set is empty. Signed-off-by: Robert Young <[email protected]>
1 parent cb8612a commit d1839cc

File tree

5 files changed

+63
-6
lines changed

5 files changed

+63
-6
lines changed

kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/FilterIT.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,26 @@ class FilterIT {
110110
public static final String TOPIC_ID_LOOKUP_FILTER_NAME = "topicIdLookup";
111111
public static final String TOPIC_NAME_PREFIXER_FILTER_NAME = "topicNamePrefixer";
112112

113+
@Test
114+
void filtersCanLookUpEmptyTopicNames(KafkaCluster cluster) {
115+
NamedFilterDefinition namedFilterDefinition = new NamedFilterDefinitionBuilder(TOPIC_ID_LOOKUP_FILTER_NAME,
116+
TopicIdToNameResponseStamper.class.getName())
117+
.build();
118+
var config = proxy(cluster)
119+
.addToFilterDefinitions(namedFilterDefinition)
120+
.addToDefaultFilters(namedFilterDefinition.name());
121+
122+
try (var tester = kroxyliciousTester(config);
123+
var client = tester.simpleTestClient()) {
124+
MetadataRequestData message = new MetadataRequestData();
125+
message.unknownTaggedFields().add(
126+
new RawTaggedField(TopicIdToNameResponseStamper.TOPIC_ID_TAG, ("").getBytes(StandardCharsets.UTF_8)));
127+
Response response = client.getSync(new Request(METADATA, METADATA.latestVersion(), "client", message));
128+
// checking that the request/response flows through despite requesting an empty topic id list
129+
assertThat(response).isNotNull();
130+
}
131+
}
132+
113133
@Test
114134
void filtersCanLookUpTopicNames(KafkaCluster cluster, Topic topic1, Topic topic2) {
115135
Uuid topic1Id = topic1.topicId().orElseThrow();

kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/testplugins/TopicIdToNameResponseStamper.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.stream.Stream;
1818

1919
import org.apache.kafka.common.Uuid;
20+
import org.apache.kafka.common.errors.InvalidRequestException;
2021
import org.apache.kafka.common.message.RequestHeaderData;
2122
import org.apache.kafka.common.message.ResponseHeaderData;
2223
import org.apache.kafka.common.protocol.ApiKeys;
@@ -26,6 +27,7 @@
2627
import io.kroxylicious.UnknownTaggedFields;
2728
import io.kroxylicious.proxy.filter.Filter;
2829
import io.kroxylicious.proxy.filter.FilterContext;
30+
import io.kroxylicious.proxy.filter.FilterDispatchExecutor;
2931
import io.kroxylicious.proxy.filter.FilterFactory;
3032
import io.kroxylicious.proxy.filter.FilterFactoryContext;
3133
import io.kroxylicious.proxy.filter.RequestFilter;
@@ -57,22 +59,31 @@ public Void initialize(FilterFactoryContext context, Void config) throws PluginC
5759

5860
@Override
5961
public Filter createFilter(FilterFactoryContext context, Void initializationData) {
60-
return new TopicIdToNameResponseStamperFilter();
62+
return new TopicIdToNameResponseStamperFilter(context.filterDispatchExecutor());
6163
}
6264

6365
static class TopicIdToNameResponseStamperFilter implements RequestFilter, ResponseFilter {
6466

67+
private final FilterDispatchExecutor filterDispatchExecutor;
6568
Map<Integer, TopicNameMapping> correlated = new HashMap<>();
6669

70+
TopicIdToNameResponseStamperFilter(FilterDispatchExecutor filterDispatchExecutor) {
71+
this.filterDispatchExecutor = filterDispatchExecutor;
72+
}
73+
6774
@Override
6875
public CompletionStage<RequestFilterResult> onRequest(ApiKeys apiKey, RequestHeaderData header, ApiMessage request, FilterContext context) {
6976
List<String> list = UnknownTaggedFields.unknownTaggedFieldsToStrings(request, TOPIC_ID_TAG).toList();
7077
if (list.isEmpty()) {
71-
return context.forwardRequest(header, request);
78+
return context.requestFilterResultBuilder().errorResponse(header, request, new InvalidRequestException("no topic id tag")).withCloseConnection()
79+
.completed();
7280
}
7381

74-
Set<Uuid> uuids = Arrays.stream(list.getFirst().split(",")).map(Uuid::fromString).collect(Collectors.toSet());
82+
Set<Uuid> uuids = Arrays.stream(list.getFirst().split(",")).filter(s -> !s.isEmpty()).map(Uuid::fromString).collect(Collectors.toSet());
7583
return context.topicNames(uuids).thenCompose(topicNames -> {
84+
if (!filterDispatchExecutor.isInFilterDispatchThread()) {
85+
throw new IllegalStateException("work chained to topicNames future should execute in filter dispatch thread");
86+
}
7687
correlated.put(header.correlationId(), topicNames);
7788
return context.forwardRequest(header, request);
7889
});

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/FilterHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ public <M extends ApiMessage> CompletionStage<M> sendRequest(RequestHeaderData h
622622

623623
@Override
624624
public CompletionStage<TopicNameMapping> topicNames(Collection<Uuid> topicIds) {
625-
return new TopicNameRetriever(this).topicNames(topicIds);
625+
return new TopicNameRetriever(this, Objects.requireNonNull(ctx).executor()).topicNames(topicIds);
626626
}
627627

628628
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/TopicNameRetriever.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
import java.util.List;
1212
import java.util.Map;
1313
import java.util.Objects;
14+
import java.util.concurrent.CompletableFuture;
1415
import java.util.concurrent.CompletionException;
1516
import java.util.concurrent.CompletionStage;
17+
import java.util.concurrent.Executor;
1618

1719
import org.apache.kafka.common.Uuid;
1820
import org.apache.kafka.common.message.MetadataRequestData;
@@ -36,13 +38,18 @@ final class TopicNameRetriever {
3638
// Version 12 was the first version that uses topic ids.
3739
private static final short METADATA_API_VER_WITH_TOPIC_ID_SUPPORT = (short) 12;
3840
private final FilterContext filterContext;
41+
private final Executor filterDispatchExecutor;
3942

40-
TopicNameRetriever(FilterContext filterContext) {
43+
TopicNameRetriever(FilterContext filterContext, Executor filterDispatchExecutor) {
4144
this.filterContext = filterContext;
45+
this.filterDispatchExecutor = filterDispatchExecutor;
4246
}
4347

4448
CompletionStage<TopicNameMapping> topicNames(Collection<Uuid> topicIds) {
4549
Objects.requireNonNull(topicIds);
50+
if (topicIds.isEmpty()) {
51+
return CompletableFuture.supplyAsync(() -> MapTopicNameMapping.EMPTY, filterDispatchExecutor).minimalCompletionStage();
52+
}
4653
CompletionStage<ApiMessage> apiMessageCompletionStage = requestTopicMetadata(topicIds);
4754
return apiMessageCompletionStage
4855
.thenApply(message -> extractTopicNames(topicIds, message))
@@ -116,6 +123,8 @@ private static TopicNameMapping doExtractTopicNames(Collection<Uuid> topicIds, M
116123
*/
117124
private record MapTopicNameMapping(Map<Uuid, String> topicNames, Map<Uuid, TopicNameMappingException> failures) implements TopicNameMapping {
118125

126+
private static final TopicNameMapping EMPTY = new MapTopicNameMapping(Map.of(), Map.of());
127+
119128
private MapTopicNameMapping {
120129
Objects.requireNonNull(topicNames);
121130
Objects.requireNonNull(failures);

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/TopicNameRetrieverTest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.kafka.common.message.MetadataResponseData;
1717
import org.apache.kafka.common.protocol.ApiMessage;
1818
import org.apache.kafka.common.protocol.Errors;
19+
import org.awaitility.core.InternalExecutorServiceFactory;
1920
import org.junit.jupiter.api.BeforeEach;
2021
import org.junit.jupiter.api.Test;
2122
import org.junit.jupiter.api.extension.ExtendWith;
@@ -33,6 +34,8 @@
3334
import static org.assertj.core.api.Assertions.assertThat;
3435
import static org.assertj.core.api.Assertions.entry;
3536
import static org.mockito.ArgumentMatchers.any;
37+
import static org.mockito.Mockito.never;
38+
import static org.mockito.Mockito.verify;
3639
import static org.mockito.Mockito.when;
3740

3841
@ExtendWith(MockitoExtension.class)
@@ -49,7 +52,7 @@ class TopicNameRetrieverTest {
4952

5053
@BeforeEach
5154
void setUp() {
52-
retriever = new TopicNameRetriever(filterContext);
55+
retriever = new TopicNameRetriever(filterContext, InternalExecutorServiceFactory.sameThreadExecutorService());
5356
}
5457

5558
@Test
@@ -68,6 +71,20 @@ void retrieveTopicNamesMapping() {
6871
});
6972
}
7073

74+
@Test
75+
void retrieveEmptyTopicNamesMapping() {
76+
// when
77+
CompletionStage<TopicNameMapping> topicNames = getTopicNamesMapping(Set.of());
78+
// then
79+
assertThat(topicNames.toCompletableFuture()).succeedsWithin(Duration.ZERO)
80+
.satisfies(topicNamesMapping -> {
81+
assertThat(topicNamesMapping.anyFailures()).isFalse();
82+
assertThat(topicNamesMapping.topicNames()).isEmpty();
83+
assertThat(topicNamesMapping.failures()).isEmpty();
84+
});
85+
verify(filterContext, never()).sendRequest(any(), any());
86+
}
87+
7188
@Test
7289
void retrieveTopicNamesMappingServerPartialResponse() {
7390
// given

0 commit comments

Comments
 (0)