Skip to content

Commit f7fc568

Browse files
authored
Support Kraft mode for Kafka container (#723)
1 parent c6930aa commit f7fc568

File tree

4 files changed

+232
-57
lines changed

4 files changed

+232
-57
lines changed

docs/modules/kafka.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,7 @@ npm install @testcontainers/kafka --save-dev
2121
<!--codeinclude-->
2222
[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:ssl
2323
<!--/codeinclude-->
24+
25+
<!--codeinclude-->
26+
[Connect to Kafka using Kraft:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectKraft
27+
<!--/codeinclude-->

packages/modules/kafka/src/kafka-container.test.ts

Lines changed: 85 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -72,34 +72,43 @@ describe("KafkaContainer", () => {
7272
await originalKafkaContainer.stop();
7373
});
7474

75-
describe("when SASL SSL config listener provided", () => {
75+
describe.each([
76+
{
77+
name: "and zookpeer enabled",
78+
configure: () => ({}),
79+
},
80+
{
81+
name: "and kraft enabled",
82+
configure: (kafkaContainer: KafkaContainer) => kafkaContainer.withKraft(),
83+
},
84+
])("when SASL SSL config listener provided $name", ({ configure }) => {
7685
const certificatesDir = path.resolve(__dirname, "..", "test-certs");
7786

7887
// ssl {
7988
it(`should connect locally`, async () => {
80-
const kafkaContainer = await new KafkaContainer()
81-
.withSaslSslListener({
82-
port: 9094,
83-
sasl: {
84-
mechanism: "SCRAM-SHA-512",
85-
user: {
86-
name: "app-user",
87-
password: "userPassword",
88-
},
89-
},
90-
keystore: {
91-
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
92-
passphrase: "serverKeystorePassword",
93-
},
94-
truststore: {
95-
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
96-
passphrase: "serverTruststorePassword",
89+
const kafkaContainer = await new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener({
90+
port: 9096,
91+
sasl: {
92+
mechanism: "SCRAM-SHA-512",
93+
user: {
94+
name: "app-user",
95+
password: "userPassword",
9796
},
98-
})
99-
.start();
97+
},
98+
keystore: {
99+
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
100+
passphrase: "serverKeystorePassword",
101+
},
102+
truststore: {
103+
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
104+
passphrase: "serverTruststorePassword",
105+
},
106+
});
107+
configure(kafkaContainer);
108+
const startedKafkaContainer = await kafkaContainer.start();
100109

101-
await testPubSub(kafkaContainer, {
102-
brokers: [`${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9094)}`],
110+
await testPubSub(startedKafkaContainer, {
111+
brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`],
103112
sasl: {
104113
username: "app-user",
105114
password: "userPassword",
@@ -109,7 +118,7 @@ describe("KafkaContainer", () => {
109118
ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))],
110119
},
111120
});
112-
await kafkaContainer.stop();
121+
await startedKafkaContainer.stop();
113122
});
114123
// }
115124

@@ -180,6 +189,59 @@ describe("KafkaContainer", () => {
180189
});
181190
});
182191

192+
// connectKraft {
193+
it("should connect using kraft", async () => {
194+
const kafkaContainer = await new KafkaContainer().withKraft().withExposedPorts(9093).start();
195+
196+
await testPubSub(kafkaContainer);
197+
198+
await kafkaContainer.stop();
199+
});
200+
// }
201+
202+
it("should throw an error when using kraft and and confluence platfom below 7.0.0", async () => {
203+
expect(() => new KafkaContainer("confluentinc/cp-kafka:6.2.14").withKraft()).toThrow(
204+
"Provided Confluent Platform's version 6.2.14 is not supported in Kraft mode (must be 7.0.0 or above)"
205+
);
206+
});
207+
208+
it("should connect using kraft and custom network", async () => {
209+
const network = await new Network().start();
210+
const kafkaContainer = await new KafkaContainer().withKraft().withNetwork(network).withExposedPorts(9093).start();
211+
212+
await testPubSub(kafkaContainer);
213+
214+
await kafkaContainer.stop();
215+
await network.stop();
216+
});
217+
218+
it("should throw an error when using kraft wit sasl and confluence platfom below 7.5.0", async () => {
219+
const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.4.0")
220+
.withKraft()
221+
.withExposedPorts(9093)
222+
.withSaslSslListener({
223+
port: 9094,
224+
sasl: {
225+
mechanism: "SCRAM-SHA-512",
226+
user: {
227+
name: "app-user",
228+
password: "userPassword",
229+
},
230+
},
231+
keystore: {
232+
content: "fake",
233+
passphrase: "serverKeystorePassword",
234+
},
235+
truststore: {
236+
content: "fake",
237+
passphrase: "serverTruststorePassword",
238+
},
239+
});
240+
await expect(() => kafkaContainer.start()).rejects.toThrow(
241+
"Provided Confluent Platform's version 7.4.0 is not supported in Kraft mode with sasl (must be 7.5.0 or above)"
242+
);
243+
});
244+
183245
const testPubSub = async (kafkaContainer: StartedTestContainer, additionalConfig: Partial<KafkaConfig> = {}) => {
184246
const kafka = new Kafka({
185247
logLevel: logLevel.NOTHING,

0 commit comments

Comments
 (0)