Skip to content

Commit ad6e725

Browse files
authored
End to end test verifying a two stage api-versions downgrade actually works for real. (kroxylicious#1662)
Uses Redpanda (22.1.11) as a broker a this version supports ApiVersions version up to 2. The test uses this broker together with the proxy locked to v3 and a client talking v4. Signed-off-by: Keith Wall <kwall@apache.org>
1 parent 6760a4a commit ad6e725

File tree

1 file changed

+74
-0
lines changed

1 file changed

+74
-0
lines changed

kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ApiVersionsDowngradeIT.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@
2323
import org.apache.kafka.common.security.plain.PlainLoginModule;
2424
import org.assertj.core.api.InstanceOfAssertFactories;
2525
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.condition.EnabledIf;
2627
import org.junit.jupiter.api.extension.ExtendWith;
28+
import org.testcontainers.DockerClientFactory;
29+
import org.testcontainers.containers.GenericContainer;
30+
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
31+
import org.testcontainers.utility.DockerImageName;
2732

2833
import io.github.nettyplus.leakdetector.junit.NettyLeakDetectorExtension;
2934
import io.netty.buffer.ByteBuf;
@@ -69,6 +74,7 @@ public class ApiVersionsDowngradeIT {
6974
public static final short API_VERSIONS_ID = ApiKeys.API_VERSIONS.id;
7075
public static final String SASL_USER = "alice";
7176
public static final String SASL_PASSWORD = "foo";
77+
private static final DockerImageName OLD_REDPANDA_USING_API_VER0_2 = DockerImageName.parse("docker.redpanda.com/redpandadata/redpanda:v22.1.11");
7278

7379
@Test
7480
void clientAheadOfProxy() {
@@ -136,6 +142,51 @@ private void doProxyRestrictedToOlderApiVersion(KafkaCluster cluster, Map<String
136142
}
137143
}
138144

145+
/**
146+
* In this test, we verify the assumption that the Kafka Client is capable
147+
* of negotiating down api-versions version twice. This test uses an
148+
* old version of Redpanda (that supports max v2) and the Proxy (restricted
149+
* to v3). The Kafka Client is using v4.
150+
* <br>
151+
* Client will first make a v4 request. The proxy's response will cause the client
152+
* to try again at v3. The broker will then cause the client to try a third time at v2.
153+
* This request will satisfy all parties and the connection establishment will continue.
154+
* All this occurs on a single connection.
155+
*/
156+
@Test
157+
@EnabledIf(value = "isDockerAvailable", disabledReason = "docker unavailable")
158+
void clientAheadOfProxyWhichIsAheadOfBroker() {
159+
160+
try (var redpanda = createRedpanda(OLD_REDPANDA_USING_API_VER0_2)) {
161+
redpanda.start();
162+
var redpandaApiVersion = (short) 2;
163+
var proxyApiVersion = (short) (ApiKeys.API_VERSIONS.latestVersion() - 1);
164+
assertThat(redpandaApiVersion).isLessThan(proxyApiVersion);
165+
166+
var testConfigEnabled = Features.builder().enable(Feature.TEST_ONLY_CONFIGURATION).build();
167+
var proxy = proxy("localhost:9092")
168+
.withDevelopment(Map.of("apiKeyIdMaxVersionOverride", Map.of(ApiKeys.API_VERSIONS.name(), proxyApiVersion)));
169+
try (var tester = newBuilder(proxy).setFeatures(testConfigEnabled).createDefaultKroxyliciousTester();
170+
var admin = tester.admin()) {
171+
// We've got no way to observe the actual version of the API versions request that is used during _negotiation_
172+
// so we make do with asserting the connection is usable.
173+
final var result = admin.describeCluster().clusterId();
174+
assertThat(result).as("Unable to get the clusterId from the Kafka cluster").succeedsWithin(Duration.ofSeconds(10));
175+
// check that the client is actually using the correct version.
176+
assertThat(admin)
177+
.extracting("instance")
178+
.extracting("client")
179+
.extracting("apiVersions")
180+
.extracting("nodeApiVersions", InstanceOfAssertFactories.map(String.class, NodeApiVersions.class))
181+
.hasEntrySatisfying("-1", nav -> {
182+
assertThat(nav.apiVersion(ApiKeys.API_VERSIONS).maxVersion())
183+
.isEqualTo(redpandaApiVersion);
184+
});
185+
}
186+
187+
}
188+
}
189+
139190
private static @NonNull OpaqueRequestFrame createHypotheticalFutureRequest() {
140191
short unsupportedVersion = (short) (ApiKeys.API_VERSIONS.latestVersion(true) + 1);
141192
RequestHeaderData requestHeaderData = getRequestHeaderData(API_VERSIONS_ID, unsupportedVersion, CORRELATION_ID);
@@ -164,4 +215,27 @@ private void doProxyRestrictedToOlderApiVersion(KafkaCluster cluster, Map<String
164215
return requestHeaderData;
165216
}
166217

218+
@NonNull
219+
private RedpandaContainer createRedpanda(DockerImageName image) {
220+
var redpanda = new RedpandaContainer(image);
221+
redpanda.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*Bootstrap complete.*"));
222+
redpanda.addFixedExposedPort(9092, 9092);
223+
return redpanda;
224+
}
225+
226+
private static class RedpandaContainer extends GenericContainer<RedpandaContainer> {
227+
228+
private RedpandaContainer(DockerImageName dockerImageName) {
229+
super(dockerImageName);
230+
}
231+
232+
@Override
233+
protected void addFixedExposedPort(int hostPort, int containerPort) {
234+
super.addFixedExposedPort(hostPort, containerPort);
235+
}
236+
}
237+
238+
static boolean isDockerAvailable() {
239+
return DockerClientFactory.instance().isDockerAvailable();
240+
}
167241
}

0 commit comments

Comments
 (0)