Skip to content

Commit 3b2555d

Browse files
authored
Feat: more defensive handling of malformed LeaveGroup requests (kroxylicious#2788)
* Special case handling of badly formed LeaveGroup requests Signed-off-by: Sam Barker <[email protected]> --------- Signed-off-by: Sam Barker <[email protected]>
1 parent fb3ef3a commit 3b2555d

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/KafkaProxyExceptionMapper.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
package io.kroxylicious.proxy.internal;
88

9+
import java.util.List;
10+
911
import org.apache.kafka.common.ElectionType;
1012
import org.apache.kafka.common.IsolationLevel;
1113
import org.apache.kafka.common.TopicPartition;
@@ -290,9 +292,8 @@ private static AbstractRequest errorResponse(ApiKeys apiKey, ApiMessage reqBody,
290292
.build(apiVersion);
291293
break;
292294
case LEAVE_GROUP:
293-
LeaveGroupRequestData data = (LeaveGroupRequestData) reqBody;
294-
req = new LeaveGroupRequest.Builder(data.groupId(), data.members())
295-
.build(apiVersion);
295+
LeaveGroupRequest.Builder builder = toLeaveGroupBuilder((LeaveGroupRequestData) reqBody);
296+
req = builder.build(apiVersion);
296297
break;
297298
case SYNC_GROUP:
298299
req = new SyncGroupRequest((SyncGroupRequestData) reqBody, apiVersion);
@@ -574,4 +575,16 @@ private static AbstractRequest errorResponse(ApiKeys apiKey, ApiMessage reqBody,
574575
}
575576
return req;
576577
}
578+
579+
private static LeaveGroupRequest.Builder toLeaveGroupBuilder(LeaveGroupRequestData reqBody) {
580+
LeaveGroupRequest.Builder builder;
581+
if (!reqBody.members().isEmpty()) {
582+
builder = new LeaveGroupRequest.Builder(reqBody.groupId(), reqBody.members());
583+
}
584+
else {
585+
// This should never happen with a legitimate client but as a edge service we should handle malicious ones too
586+
builder = new LeaveGroupRequest.Builder(reqBody.groupId(), List.of(new LeaveGroupRequestData.MemberIdentity()));
587+
}
588+
return builder;
589+
}
577590
}

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/filter/RequestFilterResultBuilderTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.kafka.common.errors.UnknownServerException;
1313
import org.apache.kafka.common.message.FetchRequestData;
1414
import org.apache.kafka.common.message.FetchResponseData;
15+
import org.apache.kafka.common.message.LeaveGroupRequestData;
1516
import org.apache.kafka.common.message.RequestHeaderData;
1617
import org.apache.kafka.common.message.ResponseHeaderData;
1718
import org.apache.kafka.common.protocol.ApiKeys;
@@ -164,6 +165,30 @@ void shouldBuildErrorResponse(RequestFactory.ApiMessageVersion versionedMessage)
164165
});
165166
}
166167

168+
@Test
169+
void shouldBuildErrorResponseForIllegitimateLeaveGroupRequest() {
170+
// Given
171+
final ApiKeys apiKey = ApiKeys.LEAVE_GROUP;
172+
final Class<? extends ApiMessage> responseMessageClass = apiKey.messageType.newResponse().getClass();
173+
var header = new RequestHeaderData();
174+
header.setRequestApiKey(apiKey.id);
175+
header.setRequestApiVersion(apiKey.latestVersion());
176+
header.setCorrelationId(23456);
177+
178+
// When
179+
var future = builder.errorResponse(header, new LeaveGroupRequestData(), FILTER_RUNTIME_EXCEPTION).completed();
180+
181+
// Then
182+
assertThat(future)
183+
.succeedsWithin(Duration.ofSeconds(10))
184+
.satisfies(result -> {
185+
assertThat(result.header())
186+
.satisfies(headerMessage -> assertThat(headerMessage).asInstanceOf(InstanceOfAssertFactories.type(ResponseHeaderData.class))
187+
.satisfies(responseHeaderData -> assertThat(responseHeaderData.correlationId()).isEqualTo(header.correlationId())));
188+
assertThat(result.message()).satisfies(actualResponse -> assertThat(actualResponse).isExactlyInstanceOf(responseMessageClass));
189+
});
190+
}
191+
167192
@Test
168193
void shouldErrorResponseShouldNotInvokeRemainingFilterChain() {
169194
// Given

0 commit comments

Comments
 (0)