Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,34 @@
npm install @testcontainers/kafka --save-dev
```

## Examples
## Kafka 8.x

### Examples

<!--codeinclude-->
[Connect to Kafka:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:connect
<!--/codeinclude-->

<!--codeinclude-->
[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:ssl
<!--/codeinclude-->

## Kafka 7.x

### Examples

<!--codeinclude-->
[Connect to Kafka using in-built ZooKeeper:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectBuiltInZK
[Connect to Kafka using in-built ZooKeeper:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectBuiltInZK
<!--/codeinclude-->

<!--codeinclude-->
[Connect to Kafka using your own ZooKeeper:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectProvidedZK
[Connect to Kafka using your own ZooKeeper:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectProvidedZK
<!--/codeinclude-->

<!--codeinclude-->
[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:ssl
[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:ssl
<!--/codeinclude-->

<!--codeinclude-->
[Connect to Kafka using Kraft:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectKraft
[Connect to Kafka using Kraft:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectKraft
<!--/codeinclude-->
2 changes: 1 addition & 1 deletion packages/modules/kafka/Dockerfile
Original file line number Diff line number Diff line change
@@ -1 +1 @@
FROM confluentinc/cp-kafka:7.9.1
FROM confluentinc/cp-kafka:8.0.0
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import fs from "fs";
import { Kafka, KafkaConfig, logLevel } from "kafkajs";
import path from "path";
import { GenericContainer, Network, StartedTestContainer } from "testcontainers";
import { getImage } from "../../../testcontainers/src/utils/test-helper";
import { GenericContainer, Network } from "testcontainers";
import { KafkaContainer } from "./kafka-container";
import { testPubSub } from "./test-helper";

const IMAGE = getImage(__dirname);
const IMAGE = "confluentinc/cp-kafka:7.9.1";

describe("KafkaContainer", { timeout: 240_000 }, () => {
// connectBuiltInZK {
Expand Down Expand Up @@ -246,36 +245,4 @@ describe("KafkaContainer", { timeout: 240_000 }, () => {
"Provided Confluent Platform's version 7.4.0 is not supported in Kraft mode with sasl (must be 7.5.0 or above)"
);
});

const testPubSub = async (kafkaContainer: StartedTestContainer, additionalConfig: Partial<KafkaConfig> = {}) => {
const kafka = new Kafka({
logLevel: logLevel.NOTHING,
brokers: [`${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9093)}`],
...additionalConfig,
});

const producer = kafka.producer();
await producer.connect();

const consumer = kafka.consumer({ groupId: "test-group" });
await consumer.connect();

await producer.send({
topic: "test-topic",
messages: [{ value: "test message" }],
});

await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

const consumedMessage = await new Promise((resolve) => {
consumer.run({
eachMessage: async ({ message }) => resolve(message.value?.toString()),
});
});

expect(consumedMessage).toBe("test message");

await consumer.disconnect();
await producer.disconnect();
};
});
148 changes: 148 additions & 0 deletions packages/modules/kafka/src/kafka-container-latest.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import fs from "fs";
import path from "path";
import { GenericContainer, Network } from "testcontainers";
import { getImage } from "../../../testcontainers/src/utils/test-helper";
import { KafkaContainer, SaslSslListenerOptions } from "./kafka-container";
import { testPubSub } from "./test-helper";

const IMAGE = getImage(__dirname);

describe("KafkaContainer", { timeout: 240_000 }, () => {
const certificatesDir = path.resolve(__dirname, "..", "test-certs");

// connect {
it("should connect", async () => {
const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start();

await testPubSub(kafkaContainer);

await kafkaContainer.stop();
});
// }

it("should connect with custom network", async () => {
const network = await new Network().start();
const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).withExposedPorts(9093).start();

await testPubSub(kafkaContainer);

await kafkaContainer.stop();
await network.stop();
});

it("should be reusable", async () => {
const originalKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start();
const newKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start();

expect(newKafkaContainer.getId()).toBe(originalKafkaContainer.getId());

await originalKafkaContainer.stop();
});

// ssl {
it(`should connect with SASL`, async () => {
const saslConfig: SaslSslListenerOptions = {
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-512",
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
passphrase: "serverKeystorePassword",
},
truststore: {
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
passphrase: "serverTruststorePassword",
},
};

const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener(saslConfig);
const startedKafkaContainer = await kafkaContainer.start();

await testPubSub(startedKafkaContainer, {
brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`],
sasl: {
username: "app-user",
password: "userPassword",
mechanism: "scram-sha-512",
},
ssl: {
ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))],
},
});
await startedKafkaContainer.stop();
});
// }

it(`should connect with SASL in custom network`, async () => {
const network = await new Network().start();

const saslConfig: SaslSslListenerOptions = {
port: 9096,
sasl: {
mechanism: "SCRAM-SHA-512",
user: {
name: "app-user",
password: "userPassword",
},
},
keystore: {
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")),
passphrase: "serverKeystorePassword",
},
truststore: {
content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")),
passphrase: "serverTruststorePassword",
},
};

const kafkaContainer = await new KafkaContainer(IMAGE)
.withNetwork(network)
.withNetworkAliases("kafka")
.withSaslSslListener(saslConfig)
.start();

const kafkaCliContainer = await new GenericContainer(IMAGE)
.withNetwork(network)
.withCommand(["bash", "-c", "sleep infinity"])
.withCopyFilesToContainer([
{
source: path.resolve(certificatesDir, "kafka.client.truststore.pem"),
target: "/truststore.pem",
},
])
.withCopyContentToContainer([
{
content: `
security.protocol=SASL_SSL
ssl.truststore.location=/truststore.pem
ssl.truststore.type=PEM
ssl.endpoint.identification.algorithm=
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\
username="app-user" \\
password="userPassword";
`,
target: "/etc/kafka/consumer.properties",
},
])
.start();

await kafkaCliContainer.exec(
"kafka-topics --create --topic test-topic --bootstrap-server kafka:9096 --command-config /etc/kafka/consumer.properties"
);
const { output, exitCode } = await kafkaCliContainer.exec(
"kafka-topics --list --bootstrap-server kafka:9096 --command-config /etc/kafka/consumer.properties"
);

expect(exitCode).toBe(0);
expect(output).toContain("test-topic");

await kafkaCliContainer.stop();
await kafkaContainer.stop();
});
});
9 changes: 8 additions & 1 deletion packages/modules/kafka/src/kafka-container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
Content,
GenericContainer,
getContainerRuntimeClient,
ImageName,
InspectResult,
RandomUuid,
StartedTestContainer,
Expand All @@ -25,7 +26,7 @@ const WAIT_FOR_SCRIPT_MESSAGE = "Waiting for script...";
const MIN_KRAFT_VERSION = "7.0.0";
const MIN_KRAFT_SASL_VERSION = "7.5.0";

interface SaslSslListenerOptions {
export interface SaslSslListenerOptions {
sasl: SaslOptions;
port: number;
keystore: PKCS12CertificateStore;
Expand Down Expand Up @@ -64,6 +65,12 @@ export class KafkaContainer extends GenericContainer {

constructor(image: string) {
super(image);

const parsedImage = ImageName.fromString(image);
if (parsedImage.image === "confluentinc/cp-kafka" && parsedImage.tag.startsWith("8.")) {
this.withKraft();
}

this.withExposedPorts(KAFKA_PORT).withStartupTimeout(180_000).withEnvironment({
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER",
Expand Down
34 changes: 34 additions & 0 deletions packages/modules/kafka/src/test-helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { Kafka, KafkaConfig, logLevel } from "kafkajs";
import { StartedTestContainer } from "testcontainers";

export async function testPubSub(kafkaContainer: StartedTestContainer, additionalConfig: Partial<KafkaConfig> = {}) {
const kafka = new Kafka({
logLevel: logLevel.NOTHING,
brokers: [`${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9093)}`],
...additionalConfig,
});

const producer = kafka.producer();
await producer.connect();

const consumer = kafka.consumer({ groupId: "test-group" });
await consumer.connect();

await producer.send({
topic: "test-topic",
messages: [{ value: "test message" }],
});

await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

const consumedMessage = await new Promise((resolve) => {
consumer.run({
eachMessage: async ({ message }) => resolve(message.value?.toString()),
});
});

expect(consumedMessage).toBe("test message");

await consumer.disconnect();
await producer.disconnect();
}
3 changes: 2 additions & 1 deletion packages/modules/kafka/tsconfig.build.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"extends": "./tsconfig.json",
"exclude": [
"build",
"src/**/*.test.ts"
"src/**/*.test.ts",
"src/test-helper.ts"
],
"references": [
{
Expand Down