diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md index c8cd48a57..bdadf1f1b 100644 --- a/docs/modules/kafka.md +++ b/docs/modules/kafka.md @@ -8,20 +8,34 @@ npm install @testcontainers/kafka --save-dev ``` -## Examples +## Kafka 8.x + +### Examples + + +[Connect to Kafka:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:connectKafkaLatest + + + +[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container-latest.test.ts) inside_block:ssl + + +## Kafka 7.x + +### Examples -[Connect to Kafka using in-built ZooKeeper:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectBuiltInZK +[Connect to Kafka using in-built ZooKeeper:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectBuiltInZK -[Connect to Kafka using your own ZooKeeper:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectProvidedZK +[Connect to Kafka using your own ZooKeeper:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectProvidedZK -[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:ssl +[Connect to Kafka using SSL:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:ssl -[Connect to Kafka using Kraft:](../../packages/modules/kafka/src/kafka-container.test.ts) inside_block:connectKraft +[Connect to Kafka using Kraft:](../../packages/modules/kafka/src/kafka-container-7.test.ts) inside_block:connectKraft diff --git a/package-lock.json b/package-lock.json index be58cebb2..d79d8b762 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5308,10 +5308,11 @@ } }, "node_modules/@npmcli/arborist/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -5419,10 +5420,11 @@ } }, "node_modules/@npmcli/map-workspaces/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -5528,10 +5530,11 @@ } }, "node_modules/@npmcli/package-json/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -7277,10 +7280,11 @@ } }, "node_modules/@tufjs/models/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -8003,9 +8007,9 @@ } }, "node_modules/@typescript-eslint/typescript-estree/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, "license": "MIT", "dependencies": { @@ -8603,9 +8607,10 @@ } }, "node_modules/archiver-utils/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -9211,10 +9216,11 @@ "license": "MIT" }, "node_modules/brace-expansion": { - "version": "1.1.11", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.11.tgz", - "integrity": "sha512-iCuPHDFgrHX7H2vEI/5xpz07zSHB00TpugqhmYtVmMO6518mCuRMoOYFldEBl0g187ufozdaHgWKcYFb61qGiA==", + "version": "1.1.12", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.12.tgz", + "integrity": "sha512-9T9UjW3r0UW5c1Q7GTwllptXwhvYmEzFhzMfZ9H7FQWt+uZePjZPjBP/W1ZEyZ1twGWom5/56TF4lPcqjnDHcg==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0", "concat-map": "0.0.1" @@ -9389,9 +9395,9 @@ } }, "node_modules/cacache/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, "license": "MIT", "dependencies": { @@ -10212,6 +10218,12 @@ "integrity": "sha512-L3sHRo1pXXEqX8VU28kfgUY+YGsk09hPqZiZmLacNib6XNTCM8ubYeT7ryXQw8asB1sKgcU5lkB7ONug08aB8w==", "dev": true }, + "node_modules/compare-versions": { + "version": "6.1.1", + "resolved": "https://registry.npmjs.org/compare-versions/-/compare-versions-6.1.1.tgz", + "integrity": "sha512-4hm4VPpIecmlg59CHXnRDnqGplJFrbLG4aFEl5vl6cK1u76ws3LLvX7ikFnTDl5vo39sjWD6AaDPYodJp/NNHg==", + "license": "MIT" + }, "node_modules/component-emitter": { "version": "1.3.1", "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.3.1.tgz", @@ -12968,10 +12980,11 @@ } }, "node_modules/ignore-walk/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -14794,10 +14807,11 @@ } }, "node_modules/make-fetch-happen/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -17662,9 +17676,9 @@ } }, "node_modules/read-package-json/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, "license": "MIT", "dependencies": { @@ -17730,9 +17744,10 @@ } }, "node_modules/readdir-glob/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", + "license": "MIT", "dependencies": { "balanced-match": "^1.0.0" } @@ -19640,9 +19655,9 @@ } }, "node_modules/test-exclude/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.2.tgz", + "integrity": "sha512-Jt0vHyM+jmUBqojB7E1NIYadt0vI0Qxjxd2TErW94wDz+E2LAm5vKMXXwg6ZZBTHPuUlDgQHKXvjGBdfcF1ZDQ==", "dev": true, "license": "MIT", "dependencies": { @@ -21571,6 +21586,7 @@ "version": "11.1.0", "license": "MIT", "dependencies": { + "compare-versions": "^6.1.1", "testcontainers": "^11.1.0" }, "devDependencies": { diff --git a/packages/modules/kafka/Dockerfile b/packages/modules/kafka/Dockerfile index 8afe540be..f80590ea7 100644 --- a/packages/modules/kafka/Dockerfile +++ b/packages/modules/kafka/Dockerfile @@ -1 +1 @@ -FROM confluentinc/cp-kafka:7.9.1 +FROM confluentinc/cp-kafka:8.0.0 diff --git a/packages/modules/kafka/package.json b/packages/modules/kafka/package.json index 9b2b39823..88647bbc4 100644 --- a/packages/modules/kafka/package.json +++ b/packages/modules/kafka/package.json @@ -32,6 +32,7 @@ "kafkajs": "^2.2.4" }, "dependencies": { + "compare-versions": "^6.1.1", "testcontainers": "^11.1.0" } } diff --git a/packages/modules/kafka/src/kafka-container.test.ts b/packages/modules/kafka/src/kafka-container-7.test.ts similarity index 76% rename from packages/modules/kafka/src/kafka-container.test.ts rename to packages/modules/kafka/src/kafka-container-7.test.ts index 658b6c1d4..e4df2f51a 100644 --- a/packages/modules/kafka/src/kafka-container.test.ts +++ b/packages/modules/kafka/src/kafka-container-7.test.ts @@ -1,16 +1,15 @@ import fs from "fs"; -import { Kafka, KafkaConfig, logLevel } from "kafkajs"; import path from "path"; -import { GenericContainer, Network, StartedTestContainer } from "testcontainers"; -import { getImage } from "../../../testcontainers/src/utils/test-helper"; +import { GenericContainer, Network } from "testcontainers"; import { KafkaContainer } from "./kafka-container"; +import { testPubSub } from "./test-helper"; -const IMAGE = getImage(__dirname); +const IMAGE = "confluentinc/cp-kafka:7.9.1"; describe("KafkaContainer", { timeout: 240_000 }, () => { // connectBuiltInZK { it("should connect using in-built zoo-keeper", async () => { - const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start(); + const kafkaContainer = await new KafkaContainer(IMAGE).start(); await testPubSub(kafkaContainer); @@ -19,7 +18,7 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { // } it("should connect using in-built zoo-keeper and custom images", async () => { - const kafkaContainer = await new KafkaContainer(IMAGE).withExposedPorts(9093).start(); + const kafkaContainer = await new KafkaContainer(IMAGE).start(); await testPubSub(kafkaContainer); @@ -29,7 +28,7 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { it("should connect using in-built zoo-keeper and custom network", async () => { const network = await new Network().start(); - const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).withExposedPorts(9093).start(); + const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).start(); await testPubSub(kafkaContainer); @@ -53,7 +52,6 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { const kafkaContainer = await new KafkaContainer(IMAGE) .withNetwork(network) .withZooKeeper(zooKeeperHost, zooKeeperPort) - .withExposedPorts(9093) .start(); await testPubSub(kafkaContainer); @@ -192,7 +190,7 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { // connectKraft { it("should connect using kraft", async () => { - const kafkaContainer = await new KafkaContainer(IMAGE).withKraft().withExposedPorts(9093).start(); + const kafkaContainer = await new KafkaContainer(IMAGE).withKraft().start(); await testPubSub(kafkaContainer); @@ -208,11 +206,7 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { it("should connect using kraft and custom network", async () => { const network = await new Network().start(); - const kafkaContainer = await new KafkaContainer(IMAGE) - .withKraft() - .withNetwork(network) - .withExposedPorts(9093) - .start(); + const kafkaContainer = await new KafkaContainer(IMAGE).withKraft().withNetwork(network).start(); await testPubSub(kafkaContainer); @@ -221,61 +215,26 @@ describe("KafkaContainer", { timeout: 240_000 }, () => { }); it("should throw an error when using kraft wit sasl and confluence platfom below 7.5.0", async () => { - const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.4.0") - .withKraft() - .withExposedPorts(9093) - .withSaslSslListener({ - port: 9094, - sasl: { - mechanism: "SCRAM-SHA-512", - user: { - name: "app-user", - password: "userPassword", - }, - }, - keystore: { - content: "fake", - passphrase: "serverKeystorePassword", - }, - truststore: { - content: "fake", - passphrase: "serverTruststorePassword", + const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.4.0").withKraft().withSaslSslListener({ + port: 9094, + sasl: { + mechanism: "SCRAM-SHA-512", + user: { + name: "app-user", + password: "userPassword", }, - }); + }, + keystore: { + content: "fake", + passphrase: "serverKeystorePassword", + }, + truststore: { + content: "fake", + passphrase: "serverTruststorePassword", + }, + }); await expect(() => kafkaContainer.start()).rejects.toThrow( "Provided Confluent Platform's version 7.4.0 is not supported in Kraft mode with sasl (must be 7.5.0 or above)" ); }); - - const testPubSub = async (kafkaContainer: StartedTestContainer, additionalConfig: Partial = {}) => { - const kafka = new Kafka({ - logLevel: logLevel.NOTHING, - brokers: [`${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9093)}`], - ...additionalConfig, - }); - - const producer = kafka.producer(); - await producer.connect(); - - const consumer = kafka.consumer({ groupId: "test-group" }); - await consumer.connect(); - - await producer.send({ - topic: "test-topic", - messages: [{ value: "test message" }], - }); - - await consumer.subscribe({ topic: "test-topic", fromBeginning: true }); - - const consumedMessage = await new Promise((resolve) => { - consumer.run({ - eachMessage: async ({ message }) => resolve(message.value?.toString()), - }); - }); - - expect(consumedMessage).toBe("test message"); - - await consumer.disconnect(); - await producer.disconnect(); - }; }); diff --git a/packages/modules/kafka/src/kafka-container-latest.test.ts b/packages/modules/kafka/src/kafka-container-latest.test.ts new file mode 100644 index 000000000..96a7fb041 --- /dev/null +++ b/packages/modules/kafka/src/kafka-container-latest.test.ts @@ -0,0 +1,148 @@ +import fs from "fs"; +import path from "path"; +import { GenericContainer, Network } from "testcontainers"; +import { getImage } from "../../../testcontainers/src/utils/test-helper"; +import { KafkaContainer, SaslSslListenerOptions } from "./kafka-container"; +import { testPubSub } from "./test-helper"; + +const IMAGE = getImage(__dirname); + +describe("KafkaContainer", { timeout: 240_000 }, () => { + const certificatesDir = path.resolve(__dirname, "..", "test-certs"); + + // connectKafkaLatest { + it("should connect", async () => { + const kafkaContainer = await new KafkaContainer(IMAGE).start(); + + await testPubSub(kafkaContainer); + + await kafkaContainer.stop(); + }); + // } + + it("should connect with custom network", async () => { + const network = await new Network().start(); + const kafkaContainer = await new KafkaContainer(IMAGE).withNetwork(network).start(); + + await testPubSub(kafkaContainer); + + await kafkaContainer.stop(); + await network.stop(); + }); + + it("should be reusable", async () => { + const originalKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start(); + const newKafkaContainer = await new KafkaContainer(IMAGE).withReuse().start(); + + expect(newKafkaContainer.getId()).toBe(originalKafkaContainer.getId()); + + await originalKafkaContainer.stop(); + }); + + // ssl { + it(`should connect with SASL`, async () => { + const saslConfig: SaslSslListenerOptions = { + port: 9096, + sasl: { + mechanism: "SCRAM-SHA-512", + user: { + name: "app-user", + password: "userPassword", + }, + }, + keystore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")), + passphrase: "serverKeystorePassword", + }, + truststore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")), + passphrase: "serverTruststorePassword", + }, + }; + + const kafkaContainer = new KafkaContainer("confluentinc/cp-kafka:7.5.0").withSaslSslListener(saslConfig); + const startedKafkaContainer = await kafkaContainer.start(); + + await testPubSub(startedKafkaContainer, { + brokers: [`${startedKafkaContainer.getHost()}:${startedKafkaContainer.getMappedPort(9096)}`], + sasl: { + username: "app-user", + password: "userPassword", + mechanism: "scram-sha-512", + }, + ssl: { + ca: [fs.readFileSync(path.resolve(certificatesDir, "kafka.client.truststore.pem"))], + }, + }); + await startedKafkaContainer.stop(); + }); + // } + + it(`should connect with SASL in custom network`, async () => { + const network = await new Network().start(); + + const saslConfig: SaslSslListenerOptions = { + port: 9096, + sasl: { + mechanism: "SCRAM-SHA-512", + user: { + name: "app-user", + password: "userPassword", + }, + }, + keystore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.keystore.pfx")), + passphrase: "serverKeystorePassword", + }, + truststore: { + content: fs.readFileSync(path.resolve(certificatesDir, "kafka.server.truststore.pfx")), + passphrase: "serverTruststorePassword", + }, + }; + + const kafkaContainer = await new KafkaContainer(IMAGE) + .withNetwork(network) + .withNetworkAliases("kafka") + .withSaslSslListener(saslConfig) + .start(); + + const kafkaCliContainer = await new GenericContainer(IMAGE) + .withNetwork(network) + .withCommand(["bash", "-c", "sleep infinity"]) + .withCopyFilesToContainer([ + { + source: path.resolve(certificatesDir, "kafka.client.truststore.pem"), + target: "/truststore.pem", + }, + ]) + .withCopyContentToContainer([ + { + content: ` + security.protocol=SASL_SSL + ssl.truststore.location=/truststore.pem + ssl.truststore.type=PEM + ssl.endpoint.identification.algorithm= + sasl.mechanism=SCRAM-SHA-512 + sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \\ + username="app-user" \\ + password="userPassword"; + `, + target: "/etc/kafka/consumer.properties", + }, + ]) + .start(); + + await kafkaCliContainer.exec( + "kafka-topics --create --topic test-topic --bootstrap-server kafka:9096 --command-config /etc/kafka/consumer.properties" + ); + const { output, exitCode } = await kafkaCliContainer.exec( + "kafka-topics --list --bootstrap-server kafka:9096 --command-config /etc/kafka/consumer.properties" + ); + + expect(exitCode).toBe(0); + expect(output).toContain("test-topic"); + + await kafkaCliContainer.stop(); + await kafkaContainer.stop(); + }); +}); diff --git a/packages/modules/kafka/src/kafka-container.ts b/packages/modules/kafka/src/kafka-container.ts index 8b711e70e..0cbfa98b8 100644 --- a/packages/modules/kafka/src/kafka-container.ts +++ b/packages/modules/kafka/src/kafka-container.ts @@ -1,3 +1,4 @@ +import { satisfies } from "compare-versions"; import { AbstractStartedContainer, BoundPorts, @@ -25,7 +26,7 @@ const WAIT_FOR_SCRIPT_MESSAGE = "Waiting for script..."; const MIN_KRAFT_VERSION = "7.0.0"; const MIN_KRAFT_SASL_VERSION = "7.5.0"; -interface SaslSslListenerOptions { +export interface SaslSslListenerOptions { sasl: SaslOptions; port: number; keystore: PKCS12CertificateStore; @@ -64,6 +65,11 @@ export class KafkaContainer extends GenericContainer { constructor(image: string) { super(image); + + if (satisfies(this.imageName.tag, ">=8.0.0")) { + this.withKraft(); + } + this.withExposedPorts(KAFKA_PORT).withStartupTimeout(180_000).withEnvironment({ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT", KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER", @@ -142,7 +148,11 @@ export class KafkaContainer extends GenericContainer { } public override async start(): Promise { - if (this.mode === KafkaMode.KRAFT && this.saslSslConfig && this.isLessThanCP(7, 5)) { + if ( + this.mode === KafkaMode.KRAFT && + this.saslSslConfig && + satisfies(this.imageName.tag, `<${MIN_KRAFT_SASL_VERSION}`) + ) { throw new Error( `Provided Confluent Platform's version ${this.imageName.tag} is not supported in Kraft mode with sasl (must be ${MIN_KRAFT_SASL_VERSION} or above)` ); @@ -159,7 +169,7 @@ export class KafkaContainer extends GenericContainer { // exporting KAFKA_ADVERTISED_LISTENERS with the container hostname command += `export KAFKA_ADVERTISED_LISTENERS=${advertisedListeners}\n`; - if (this.mode !== KafkaMode.KRAFT || this.isLessThanCP(7, 4)) { + if (this.mode !== KafkaMode.KRAFT || satisfies(this.imageName.tag, "<7.4.0")) { // Optimization: skip the checks command += "echo '' > /etc/confluent/docker/ensure \n"; } @@ -167,7 +177,7 @@ export class KafkaContainer extends GenericContainer { if (this.saslSslConfig) { command += this.commandKraftCreateUser(this.saslSslConfig); } - if (this.isLessThanCP(7, 4)) { + if (satisfies(this.imageName.tag, "<7.4.0")) { command += this.commandKraft(); } } else if (this.mode === KafkaMode.EMBEDDED_ZOOKEEPER) { @@ -261,26 +271,13 @@ export class KafkaContainer extends GenericContainer { } private verifyMinKraftVersion() { - if (this.isLessThanCP(7)) { + if (satisfies(this.imageName.tag, `<${MIN_KRAFT_VERSION}`)) { throw new Error( `Provided Confluent Platform's version ${this.imageName.tag} is not supported in Kraft mode (must be ${MIN_KRAFT_VERSION} or above)` ); } } - private isLessThanCP(max: number, min = 0, patch = 0): boolean { - if (this.imageName.tag === "latest") { - return false; - } - const parts = this.imageName.tag.split("."); - return !( - parts.length > 2 && - (Number(parts[0]) > max || - (Number(parts[0]) === max && - (Number(parts[1]) > min || (Number(parts[1]) === min && Number(parts[2]) >= patch)))) - ); - } - private commandKraftCreateUser(saslOptions: SaslSslListenerOptions): string { return ( "echo 'kafka-storage format --ignore-formatted " + diff --git a/packages/modules/kafka/src/test-helper.ts b/packages/modules/kafka/src/test-helper.ts new file mode 100644 index 000000000..f3232d29e --- /dev/null +++ b/packages/modules/kafka/src/test-helper.ts @@ -0,0 +1,34 @@ +import { Kafka, KafkaConfig, logLevel } from "kafkajs"; +import { StartedTestContainer } from "testcontainers"; + +export async function testPubSub(kafkaContainer: StartedTestContainer, additionalConfig: Partial = {}) { + const kafka = new Kafka({ + logLevel: logLevel.NOTHING, + brokers: [`${kafkaContainer.getHost()}:${kafkaContainer.getMappedPort(9093)}`], + ...additionalConfig, + }); + + const producer = kafka.producer(); + await producer.connect(); + + const consumer = kafka.consumer({ groupId: "test-group" }); + await consumer.connect(); + + await producer.send({ + topic: "test-topic", + messages: [{ value: "test message" }], + }); + + await consumer.subscribe({ topic: "test-topic", fromBeginning: true }); + + const consumedMessage = await new Promise((resolve) => { + consumer.run({ + eachMessage: async ({ message }) => resolve(message.value?.toString()), + }); + }); + + expect(consumedMessage).toBe("test message"); + + await consumer.disconnect(); + await producer.disconnect(); +} diff --git a/packages/modules/kafka/tsconfig.build.json b/packages/modules/kafka/tsconfig.build.json index ff7390b10..ee782d3d0 100644 --- a/packages/modules/kafka/tsconfig.build.json +++ b/packages/modules/kafka/tsconfig.build.json @@ -2,7 +2,8 @@ "extends": "./tsconfig.json", "exclude": [ "build", - "src/**/*.test.ts" + "src/**/*.test.ts", + "src/test-helper.ts" ], "references": [ {