Skip to content

Commit b649150

Browse files
Add support for Confluenct Kafka 8.x
1 parent b146b68 commit b649150

File tree

7 files changed

+460
-8
lines changed

7 files changed

+460
-8
lines changed

docs/modules/kafka.md

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,34 @@
88
npm install @testcontainers/kafka --save-dev
99
```
1010

11-
## Examples
11+
## Kafka 8.x
12+
13+
### Examples
14+
15+
<!--codeinclude-->
16+
[Connect to Kafka:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:connect
17+
<!--/codeinclude-->
18+
19+
<!--codeinclude-->
20+
[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:ssl
21+
<!--/codeinclude-->
22+
23+
## Kafka 7.x
24+
25+
### Examples
1226

1327
<!--codeinclude-->
14-
[Connect to Kafka using in-built ZooKeeper:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectBuiltInZK
28+
[Connect to Kafka using in-built ZooKeeper:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectBuiltInZK
1529
<!--/codeinclude-->
1630

1731
<!--codeinclude-->
18-
[Connect to Kafka using your own ZooKeeper:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectProvidedZK
32+
[Connect to Kafka using your own ZooKeeper:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectProvidedZK
1933
<!--/codeinclude-->
2034

2135
<!--codeinclude-->
22-
[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:ssl
36+
[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:ssl
2337
<!--/codeinclude-->
2438

2539
<!--codeinclude-->
26-
[Connect to Kafka using Kraft:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectKraft
40+
[Connect to Kafka using Kraft:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectKraft
2741
<!--/codeinclude-->

packages/modules/kafka/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
FROM confluentinc/cp-kafka:7.9.1
1+
FROM confluentinc/cp-kafka:8.0.0
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
import fs from "fs";
2+
import path from "path";
3+
import { GenericContainer, Network } from "testcontainers";
4+
import { KafkaContainer } from "./kafka-container";
5+
import { testPubSub } from "./test-helper";
6+
7+
const IMAGE = "confluentinc/cp-kafka:7.9.1";
8+
9+
describe("KafkaContainer", { timeout: 240_000 }, () => {
10+
// connectBuiltInZK {
11+
it("should connect using in-built zoo-keeper", async () => {
12+
const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start();
13+
14+
await testPubSub(kafkaContainer);
15+
16+
await kafkaContainer.stop();
17+
});
18+
// }
19+
20+
it("should connect using in-built zoo-keeper and custom images", async () => {
21+
const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start();
22+
23+
await testPubSub(kafkaContainer);
24+
25+
await kafkaContainer.stop();
26+
});
27+
28+
it("should connect using in-built zoo-keeper and custom network", async () => {
29+
const network = await new Network().start();
30+
31+
const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).withExposedPorts(9093).start();
32+
33+
await testPubSub(kafkaContainer);
34+
35+
await kafkaContainer.stop();
36+
await network.stop();
37+
});
38+
39+
// connectProvidedZK {
40+
it("should connect using provided zoo-keeper and network", async () => {
41+
const network = await new Network().start();
42+
43+
const zooKeeperHost = "zookeeper";
44+
const zooKeeperPort = 2181;
45+
const zookeeperContainer = await new GenericContainer("confluentinc/cp-zookeeper:5.5.4")
46+
.withNetwork(network)
47+
.withNetworkAliases(zooKeeperHost)
48+
.withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() })
49+
.withExposedPorts(zooKeeperPort)
50+
.start();
51+
52+
const kafkaContainer = await new KafkaContainer(IMAGE)
53+
.withNetwork(network)
54+
.withZooKeeper(zooKeeperHost, zooKeeperPort)
55+
.withExposedPorts(9093)
56+
.start();
57+
58+
await testPubSub(kafkaContainer);
59+
60+
await zookeeperContainer.stop();
61+
await kafkaContainer.stop();
62+
await network.stop();
63+
});
64+
// }
65+
66+
it("should be reusable", async () => {
67+
const originalKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start();
68+
const newKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start();
69+
70+
expect(newKafkaContainer.getId()).toBe(originalKafkaContainer.getId());
71+
72+
await originalKafkaContainer.stop();
73+
});
74+
75+
describe.each([
76+
{
77+
name: "and zookpeer enabled",
78+
configure: () => ({}),
79+
},
80+
{
81+
name: "and kraft enabled",
82+
configure: (kafkaContainer: KafkaContainer) => kafkaContainer.withKraft(),
83+
},
84+
])("when SASL SSL config listener provided $name", ({ configure }) => {
85+
const certificatesDir = path.resolve(__dirname, "..", "test-certs");
86+
87+
// ssl {
88+
it(`should connect locally`, async () => {
89+
const kafkaContainer = await new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener({
90+
port: 9096,
91+
sasl: {
92+
mechanism: "SCRAM-SHA-512",
93+
user: {
94+
name: "app-user",
95+
password: "userPassword",
96+
},
97+
},
98+
keystore: {
99+
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
100+
passphrase: "serverKeystorePassword",
101+
},
102+
truststore: {
103+
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
104+
passphrase: "serverTruststorePassword",
105+
},
106+
});
107+
configure(kafkaContainer);
108+
const startedKafkaContainer = await kafkaContainer.start();
109+
110+
await testPubSub(startedKafkaContainer, {
111+
brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`],
112+
sasl: {
113+
username: "app-user",
114+
password: "userPassword",
115+
mechanism: "scram-sha-512",
116+
},
117+
ssl: {
118+
ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))],
119+
},
120+
});
121+
await startedKafkaContainer.stop();
122+
});
123+
// }
124+
125+
it(`should connect within Docker network`, async () => {
126+
const network = await new Network().start();
127+
128+
const kafkaContainer = await new KafkaContainer(IMAGE)
129+
.withNetwork(network)
130+
.withNetworkAliases("kafka")
131+
.withSaslSslListener({
132+
port: 9094,
133+
sasl: {
134+
mechanism: "SCRAM-SHA-512",
135+
user: {
136+
name: "app-user",
137+
password: "userPassword",
138+
},
139+
},
140+
keystore: {
141+
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
142+
passphrase: "serverKeystorePassword",
143+
},
144+
truststore: {
145+
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
146+
passphrase: "serverTruststorePassword",
147+
},
148+
})
149+
.start();
150+
151+
const kafkaCliContainer = await new GenericContainer(IMAGE)
152+
.withNetwork(network)
153+
.withCommand(["bash", "-c", "sleep infinity"])
154+
.withCopyFilesToContainer([
155+
{
156+
source: path.resolve(certificatesDir, "kafka.client.truststore.pem"),
157+
target: "/truststore.pem",
158+
},
159+
])
160+
.withCopyContentToContainer([
161+
{
162+
content: `
163+
security.protocol=SASL_SSL
164+
ssl.truststore.location=/truststore.pem
165+
ssl.truststore.type=PEM
166+
ssl.endpoint.identification.algorithm=
167+
sasl.mechanism=SCRAM-SHA-512
168+
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\
169+
username="app-user" \\
170+
password="userPassword";
171+
`,
172+
target: "/etc/kafka/consumer.properties",
173+
},
174+
])
175+
.start();
176+
177+
await kafkaCliContainer.exec(
178+
"kafka-topics --create --topic test-topic --bootstrap-server kafka:9094 --command-config /etc/kafka/consumer.properties"
179+
);
180+
const { output, exitCode } = await kafkaCliContainer.exec(
181+
"kafka-topics --list --bootstrap-server kafka:9094 --command-config /etc/kafka/consumer.properties"
182+
);
183+
184+
expect(exitCode).toBe(0);
185+
expect(output).toContain("test-topic");
186+
187+
await kafkaCliContainer.stop();
188+
await kafkaContainer.stop();
189+
});
190+
});
191+
192+
// connectKraft {
193+
it("should connect using kraft", async () => {
194+
const kafkaContainer = await new KafkaContainer(IMAGE).withKraft().withExposedPorts(9093).start();
195+
196+
await testPubSub(kafkaContainer);
197+
198+
await kafkaContainer.stop();
199+
});
200+
// }
201+
202+
it("should throw an error when using kraft and and confluence platfom below 7.0.0", async () => {
203+
expect(() => new KafkaContainer("confluentinc/cp-kafka:6.2.14").withKraft()).toThrow(
204+
"Provided Confluent Platform's version 6.2.14 is not supported in Kraft mode (must be 7.0.0 or above)"
205+
);
206+
});
207+
208+
it("should connect using kraft and custom network", async () => {
209+
const network = await new Network().start();
210+
const kafkaContainer = await new KafkaContainer(IMAGE)
211+
.withKraft()
212+
.withNetwork(network)
213+
.withExposedPorts(9093)
214+
.start();
215+
216+
await testPubSub(kafkaContainer);
217+
218+
await kafkaContainer.stop();
219+
await network.stop();
220+
});
221+
222+
it("should throw an error when using kraft wit sasl and confluence platfom below 7.5.0", async () => {
223+
const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.4.0")
224+
.withKraft()
225+
.withExposedPorts(9093)
226+
.withSaslSslListener({
227+
port: 9094,
228+
sasl: {
229+
mechanism: "SCRAM-SHA-512",
230+
user: {
231+
name: "app-user",
232+
password: "userPassword",
233+
},
234+
},
235+
keystore: {
236+
content: "fake",
237+
passphrase: "serverKeystorePassword",
238+
},
239+
truststore: {
240+
content: "fake",
241+
passphrase: "serverTruststorePassword",
242+
},
243+
});
244+
await expect(() => kafkaContainer.start()).rejects.toThrow(
245+
"Provided Confluent Platform's version 7.4.0 is not supported in Kraft mode with sasl (must be 7.5.0 or above)"
246+
);
247+
});
248+
});

0 commit comments

Comments
 (0)