
Commit 71dede1

Kafka
1 parent 0e45373 commit 71dede1

6 files changed (+153, -102 lines)

docs/modules/cosmosdb.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,6 +1,6 @@
 # CosmosDB
 
-!!!info
+!!! info
     This module uses the **Linux-based** version of the CosmosDB emulator. In general, it:
 
     - Provides better compatibility on a variety of systems.
```

docs/modules/kafka.md

Lines changed: 43 additions & 13 deletions
````diff
@@ -1,41 +1,71 @@
-# Kafka Module
-
-[Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
+# Kafka
 
 ## Install
 
 ```bash
 npm install @testcontainers/kafka --save-dev
 ```
 
-## Kafka 8.x
+## Examples
+
+### Kafka 8.x
+
+These examples use the following libraries:
+
+- [kafkajs](https://www.npmjs.com/package/kafkajs)
 
-### Examples
+    npm install kafkajs
+
+Choose an image from the [container registry](https://hub.docker.com/r/confluentinc/cp-kafka) and substitute `IMAGE`.
+
+#### Produce/consume a message
 
 <!--codeinclude-->
-[Connect to Kafka:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:connectKafkaLatest
+[Code](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:kafkaLatestConnect
+[Helper function](../../packages/modules/kafka/src/test-helper.ts) inside_block:kafkaTestHelper
 <!--/codeinclude-->
 
+#### With SSL
+
 <!--codeinclude-->
-[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:ssl
+[Code](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:kafkaLatestSsl
+[Helper function](../../packages/modules/kafka/src/test-helper.ts) inside_block:kafkaTestHelper
 <!--/codeinclude-->
 
-## Kafka 7.x
+---
+
+### Kafka 7.x
 
-### Examples
+These examples use the following libraries:
+
+- [kafkajs](https://www.npmjs.com/package/kafkajs)
+
+    npm install kafkajs
+
+Choose an image from the [container registry](https://hub.docker.com/r/confluentinc/cp-kafka) and substitute `IMAGE`.
+
+#### Produce/consume a message
 
 <!--codeinclude-->
-[Connect to Kafka using in-built ZooKeeper:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectBuiltInZK
+[Code](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectBuiltInZK
+[Helper function](../../packages/modules/kafka/src/test-helper.ts) inside_block:kafkaTestHelper
 <!--/codeinclude-->
 
+#### With SSL
+
 <!--codeinclude-->
-[Connect to Kafka using your own ZooKeeper:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectProvidedZK
+[Code](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:kafkaSsl
+[Helper function](../../packages/modules/kafka/src/test-helper.ts) inside_block:kafkaTestHelper
 <!--/codeinclude-->
 
+#### With provided ZooKeeper
+
 <!--codeinclude-->
-[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:ssl
+[](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectProvidedZK
 <!--/codeinclude-->
 
+#### With Kraft
+
 <!--codeinclude-->
-[Connect to Kafka using Kraft:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectKraft
+[](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectKraft
 <!--/codeinclude-->
````
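The new docs reference a shared helper (the `kafkaTestHelper` code-include in `test-helper.ts`) whose body is not visible in this commit. A minimal sketch of what such a produce/consume assertion could look like with kafkajs follows; the helper name and its optional config argument are taken from the test files below, while the broker port (9093), topic name, and vitest import are assumptions:

```ts
// Hypothetical sketch only -- the real helper lives in
// packages/modules/kafka/src/test-helper.ts and may differ.
import { Kafka, KafkaConfig, logLevel } from "kafkajs";
import { StartedKafkaContainer } from "@testcontainers/kafka";
import { expect } from "vitest";

export async function assertMessageProducedAndConsumed(
  container: StartedKafkaContainer,
  additionalConfig: Partial<KafkaConfig> = {}
): Promise<void> {
  // Default to the mapped client listener (assumed to be port 9093) unless brokers
  // are overridden, e.g. for the SASL_SSL listener on port 9096 used in the SSL tests.
  const kafka = new Kafka({
    logLevel: logLevel.NOTHING,
    brokers: [`${container.getHost()}:${container.getMappedPort(9093)}`],
    ...additionalConfig,
  });

  const producer = kafka.producer();
  const consumer = kafka.consumer({ groupId: "test-group" });
  await producer.connect();
  await consumer.connect();

  // Produce a single message, consume it back, and assert on the payload.
  await producer.send({ topic: "test-topic", messages: [{ value: "test message" }] });
  await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

  const consumed = await new Promise<string | undefined>((resolve) => {
    consumer.run({
      eachMessage: async ({ message }) => resolve(message.value?.toString()),
    });
  });
  expect(consumed).toBe("test message");

  await producer.disconnect();
  await consumer.disconnect();
}
```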

docs/modules/postgresql.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -32,7 +32,7 @@ This example shows the usage of the postgres module's Snapshot feature to give e
 to recreate the database container on every test or run heavy scripts to clean your database. This makes the individual
 tests very modular, since they always run on a brand-new database.
 
-!!!tip
+!!! tip
     You should never pass the `"postgres"` system database as the container database name if you want to use snapshots.
     The Snapshot logic requires dropping the connected database and using the system database to run commands, which will
     not work if the database for the container is set to `"postgres"`.
```
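For context on the Snapshot feature this tip refers to, here is a minimal usage sketch. It assumes the started container exposes `snapshot()`/`restoreSnapshot()` helpers as the surrounding docs describe; the image tag and database name are illustrative:

```ts
import { PostgreSqlContainer } from "@testcontainers/postgresql";

// Use any database name except the "postgres" system database,
// since snapshotting drops and recreates the connected database.
const container = await new PostgreSqlContainer("postgres:16-alpine")
  .withDatabase("app_test")
  .start();

await container.snapshot();        // capture a known-good baseline (assumed API)
// ... run a test that mutates data ...
await container.restoreSnapshot(); // reset to the baseline before the next test (assumed API)

await container.stop();
```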

packages/modules/kafka/src/kafka-container-7.test.ts

Lines changed: 81 additions & 54 deletions
```diff
@@ -2,99 +2,127 @@ import fs from "fs";
 import path from "path";
 import { GenericContainer, Network } from "testcontainers";
 import { KafkaContainer } from "./kafka-container";
-import { testPubSub } from "./test-helper";
+import { assertMessageProducedAndConsumed } from "./test-helper";
 
 const IMAGE = "confluentinc/cp-kafka:7.9.1";
 
 describe("KafkaContainer", { timeout: 240_000 }, () => {
-  // connectBuiltInZK {
   it("should connect using in-built zoo-keeper", async () => {
-    await using kafkaContainer = await new KafkaContainer(IMAGE).start();
-
-    await testPubSub(kafkaContainer);
+    // connectBuiltInZK {
+    await using container = await new KafkaContainer(IMAGE).start();
+    await assertMessageProducedAndConsumed(container);
+    // }
   });
-  // }
 
   it("should connect using in-built zoo-keeper and custom images", async () => {
-    await using kafkaContainer = await new KafkaContainer(IMAGE).start();
+    await using container = await new KafkaContainer(IMAGE).start();
 
-    await testPubSub(kafkaContainer);
+    await assertMessageProducedAndConsumed(container);
   });
 
   it("should connect using in-built zoo-keeper and custom network", async () => {
     await using network = await new Network().start();
 
-    await using kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).start();
+    await using container = await new KafkaContainer(IMAGE).withNetwork(network).start();
 
-    await testPubSub(kafkaContainer);
+    await assertMessageProducedAndConsumed(container);
   });
 
-  // connectProvidedZK {
   it("should connect using provided zoo-keeper and network", async () => {
+    // connectProvidedZK {
     await using network = await new Network().start();
 
     const zooKeeperHost = "zookeeper";
     const zooKeeperPort = 2181;
+
     await using _ = await new GenericContainer("confluentinc/cp-zookeeper:5.5.4")
       .withNetwork(network)
       .withNetworkAliases(zooKeeperHost)
       .withEnvironment({ ZOOKEEPER_CLIENT_PORT: zooKeeperPort.toString() })
       .withExposedPorts(zooKeeperPort)
       .start();
 
-    await using kafkaContainer = await new KafkaContainer(IMAGE)
+    await using container = await new KafkaContainer(IMAGE)
       .withNetwork(network)
       .withZooKeeper(zooKeeperHost, zooKeeperPort)
       .start();
+    // }
 
-    await testPubSub(kafkaContainer);
+    await assertMessageProducedAndConsumed(container);
   });
-  // }
 
   it("should be reusable", async () => {
-    await using originalKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start();
-    const newKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start();
+    await using container1 = await new KafkaContainer(IMAGE).withReuse().start();
+    const container2 = await new KafkaContainer(IMAGE).withReuse().start();
 
-    expect(newKafkaContainer.getId()).toBe(originalKafkaContainer.getId());
+    expect(container2.getId()).toBe(container1.getId());
   });
 
-  describe.each([
-    {
-      name: "and zookpeer enabled",
-      configure: () => ({}),
-    },
-    {
-      name: "and kraft enabled",
-      configure: (kafkaContainer: KafkaContainer) => kafkaContainer.withKraft(),
-    },
-  ])("when SASL SSL config listener provided $name", ({ configure }) => {
+  describe("when SASL SSL config listener provided with Kraft", () => {
     const certificatesDir = path.resolve(__dirname, "..", "test-certs");
 
-    // ssl {
-    it(`should connect locally`, async () => {
-      const kafkaContainer = await new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener({
-        port: 9096,
-        sasl: {
-          mechanism: "SCRAM-SHA-512",
-          user: {
-            name: "app-user",
-            password: "userPassword",
+    it(`should connect locally with ZK`, async () => {
+      // kafkaSsl {
+      await using container = await new KafkaContainer("confluentinc/cp-kafka:7.5.0")
+        .withSaslSslListener({
+          port: 9096,
+          sasl: {
+            mechanism: "SCRAM-SHA-512",
+            user: {
+              name: "app-user",
+              password: "userPassword",
+            },
           },
+          keystore: {
+            content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
+            passphrase: "serverKeystorePassword",
+          },
+          truststore: {
+            content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
+            passphrase: "serverTruststorePassword",
+          },
+        })
+        .start();
+
+      await assertMessageProducedAndConsumed(container, {
+        brokers: [`${container.getHost()}:${container.getMappedPort(9096)}`],
+        sasl: {
+          username: "app-user",
+          password: "userPassword",
+          mechanism: "scram-sha-512",
         },
-        keystore: {
-          content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
-          passphrase: "serverKeystorePassword",
-        },
-        truststore: {
-          content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
-          passphrase: "serverTruststorePassword",
+        ssl: {
+          ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))],
         },
       });
-      configure(kafkaContainer);
-      await using startedKafkaContainer = await kafkaContainer.start();
+      // }
+    });
+
+    it(`should connect locally with Kraft`, async () => {
+      await using container = await new KafkaContainer("confluentinc/cp-kafka:7.5.0")
+        .withKraft()
+        .withSaslSslListener({
+          port: 9096,
+          sasl: {
+            mechanism: "SCRAM-SHA-512",
+            user: {
+              name: "app-user",
+              password: "userPassword",
+            },
+          },
+          keystore: {
+            content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
+            passphrase: "serverKeystorePassword",
+          },
+          truststore: {
+            content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
+            passphrase: "serverTruststorePassword",
+          },
+        })
+        .start();
 
-      await testPubSub(startedKafkaContainer, {
-        brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`],
+      await assertMessageProducedAndConsumed(container, {
+        brokers: [`${container.getHost()}:${container.getMappedPort(9096)}`],
         sasl: {
           username: "app-user",
           password: "userPassword",
@@ -105,7 +133,6 @@ describe("KafkaContainer", { timeout: 240_000 }, () => {
         },
       });
     });
-    // }
 
     it(`should connect within Docker network`, async () => {
       await using network = await new Network().start();
@@ -171,13 +198,13 @@ describe("KafkaContainer", { timeout: 240_000 }, () => {
     });
   });
 
-  // connectKraft {
   it("should connect using kraft", async () => {
-    await using kafkaContainer = await new KafkaContainer(IMAGE).withKraft().start();
+    // connectKraft {
+    await using container = await new KafkaContainer(IMAGE).withKraft().start();
+    // }
 
-    await testPubSub(kafkaContainer);
+    await assertMessageProducedAndConsumed(container);
   });
-  // }
 
   it("should throw an error when using kraft and and confluence platfom below 7.0.0", async () => {
     expect(() => new KafkaContainer("confluentinc/cp-kafka:6.2.14").withKraft()).toThrow(
@@ -187,9 +214,9 @@ describe("KafkaContainer", { timeout: 240_000 }, () => {
 
   it("should connect using kraft and custom network", async () => {
     await using network = await new Network().start();
-    await using kafkaContainer = await new KafkaContainer(IMAGE).withKraft().withNetwork(network).start();
+    await using container = await new KafkaContainer(IMAGE).withKraft().withNetwork(network).start();
 
-    await testPubSub(kafkaContainer);
+    await assertMessageProducedAndConsumed(container);
   });
 
   it("should throw an error when using kraft wit sasl and confluence platfom below 7.5.0", async () => {
```

packages/modules/kafka/src/kafka-container-latest.test.ts

Lines changed: 15 additions & 17 deletions
```diff
@@ -3,37 +3,36 @@ import path from "path";
 import { GenericContainer, Network } from "testcontainers";
 import { getImage } from "../../../testcontainers/src/utils/test-helper";
 import { KafkaContainer, SaslSslListenerOptions } from "./kafka-container";
-import { testPubSub } from "./test-helper";
+import { assertMessageProducedAndConsumed } from "./test-helper";
 
 const IMAGE = getImage(__dirname);
 
 describe("KafkaContainer", { timeout: 240_000 }, () => {
   const certificatesDir = path.resolve(__dirname, "..", "test-certs");
 
-  // connectKafkaLatest {
   it("should connect", async () => {
-    await using kafkaContainer = await new KafkaContainer(IMAGE).start();
-
-    await testPubSub(kafkaContainer);
+    // kafkaLatestConnect {
+    await using container = await new KafkaContainer(IMAGE).start();
+    await assertMessageProducedAndConsumed(container);
+    // }
   });
-  // }
 
   it("should connect with custom network", async () => {
     await using network = await new Network().start();
-    await using kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).start();
+    await using container = await new KafkaContainer(IMAGE).withNetwork(network).start();
 
-    await testPubSub(kafkaContainer);
+    await assertMessageProducedAndConsumed(container);
   });
 
   it("should be reusable", async () => {
-    await using originalKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start();
-    const newKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start();
+    await using container1 = await new KafkaContainer(IMAGE).withReuse().start();
+    const container2 = await new KafkaContainer(IMAGE).withReuse().start();
 
-    expect(newKafkaContainer.getId()).toBe(originalKafkaContainer.getId());
+    expect(container2.getId()).toBe(container1.getId());
   });
 
-  // ssl {
   it(`should connect with SASL`, async () => {
+    // kafkaLatestSsl {
    const saslConfig: SaslSslListenerOptions = {
       port: 9096,
       sasl: {
@@ -53,11 +52,10 @@ describe("KafkaContainer", { timeout: 240_000 }, () => {
       },
     };
 
-    const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener(saslConfig);
-    await using startedKafkaContainer = await kafkaContainer.start();
+    await using container = await new KafkaContainer(IMAGE).withSaslSslListener(saslConfig).start();
 
-    await testPubSub(startedKafkaContainer, {
-      brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`],
+    await assertMessageProducedAndConsumed(container, {
+      brokers: [`${container.getHost()}:${container.getMappedPort(9096)}`],
       sasl: {
         username: "app-user",
         password: "userPassword",
@@ -67,8 +65,8 @@ describe("KafkaContainer", { timeout: 240_000 }, () => {
         ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))],
       },
     });
+    // }
   });
-  // }
 
   it(`should connect with SASL in custom network`, async () => {
     await using network = await new Network().start();
```
