Skip to content

Commit 5d05a53

Browse files
authored
Confluentinc kafka javascript (#279)
* Adding @confluentinc/kafka-javascript * Trying to add test * Fixing threads issue * Minor change * Adding kafka-ui * Fixing name in kafka-ui * Fixing test * Adding kafka ci for node 20 * Simplifying CI action * Adding back platformatic * Kafka requires node 22.14 * Separating tests * Adding Kafka CI back * Typo fix * Removing kaka CI for node 24 for now
1 parent f5ab683 commit 5d05a53

File tree

7 files changed

+123
-13
lines changed

7 files changed

+123
-13
lines changed

.github/workflows/ci.common.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
# We need to trigger the rebuild of @confluentinc/kafka-javascript to run scrips
2727
- name: Run @confluentic/kafka-javascript scripts
2828
if: ${{ inputs.package_name == '@message-queue-toolkit/kafka' }}
29-
run: npm rebuild rebuild @confluentinc/kafka-javascript
29+
run: npm rebuild @confluentinc/kafka-javascript
3030

3131
- name: Build TS
3232
run: npm run build -- --filter=${{ inputs.package_name }}

.github/workflows/ci.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,15 @@ jobs:
2727
node_version: ${{ matrix.node-version }}
2828
package_name: ${{ matrix.package-name }}
2929

30+
kafka:
31+
strategy:
32+
matrix:
33+
node-version: [22.x]
34+
uses: kibertoad/message-queue-toolkit/.github/workflows/ci.common.yml@main
35+
with:
36+
package_name: '@message-queue-toolkit/kafka'
37+
node_version: ${{ matrix.node-version }}
38+
3039
automerge:
3140
needs: [ general ]
3241
runs-on: ubuntu-latest

docker-compose.yml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,33 @@ services:
4242
container_name: kafka
4343
ports:
4444
- 9092:9092
45+
environment:
46+
KAFKA_NODE_ID: 1
47+
KAFKA_PROCESS_ROLES: broker,controller
48+
KAFKA_LISTENERS: LOCAL://0.0.0.0:9092,DOCKER://kafka:9093,CONTROLLER://localhost:9094
49+
KAFKA_ADVERTISED_LISTENERS: LOCAL://localhost:9092,DOCKER://kafka:9093
50+
KAFKA_INTER_BROKER_LISTENER_NAME: LOCAL
51+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
52+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,LOCAL:PLAINTEXT,DOCKER:PLAINTEXT
53+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094
54+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
55+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
56+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
57+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
58+
KAFKA_NUM_PARTITIONS: 1
59+
restart: on-failure
60+
61+
kafka-ui:
62+
container_name: kafka-ui
63+
image: provectuslabs/kafka-ui:latest
64+
ports:
65+
- 8080:8080
66+
depends_on:
67+
- kafka
68+
environment:
69+
DYNAMIC_CONFIG_ENABLED: 'true'
70+
KAFKA_CLUSTERS_0_NAME: Local
71+
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
4572
restart: on-failure
4673

4774
volumes:
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import { randomUUID } from 'node:crypto'
2+
import { KafkaJS } from '@confluentinc/kafka-javascript'
3+
import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
4+
import {} from '@platformatic/kafka'
5+
import { type TestContext, registerDependencies } from '../test/testContext.ts'
6+
7+
describe('Test confluentic-kafka', () => {
8+
let testContext: TestContext
9+
10+
beforeAll(async () => {
11+
testContext = await registerDependencies()
12+
})
13+
14+
afterAll(async () => {
15+
await testContext.dispose()
16+
})
17+
18+
it('should send and receive a message', async () => {
19+
// Given
20+
const clientId = randomUUID()
21+
const groupId = randomUUID()
22+
// Use a fresh, unique topic per run to avoid stale state
23+
const topic = `test-topic-${Date.now()}`
24+
const messageValue = 'My test message'
25+
26+
const kafka = new KafkaJS.Kafka({
27+
'client.id': clientId,
28+
'bootstrap.servers': testContext.cradle.kafkaConfig.brokers.join(','),
29+
})
30+
31+
// Topics can be created from producers, but as we will first connect a consumer, we need to create the topic first
32+
const admin = kafka.admin()
33+
await admin.connect()
34+
await admin.createTopics({
35+
topics: [{ topic }],
36+
})
37+
38+
const messages: string[] = []
39+
40+
const consumer = kafka.consumer({ 'group.id': groupId })
41+
await consumer.connect()
42+
await consumer.subscribe({ topic })
43+
44+
await consumer.run({
45+
eachMessage: ({ message }) => {
46+
const messageString = message.value?.toString()
47+
if (messageString) messages.push(messageString)
48+
return Promise.resolve()
49+
},
50+
})
51+
// Wait for the consumer to be assigned partitions
52+
await waitAndRetry(() => consumer.assignment().length > 0, 100, 10)
53+
54+
// When
55+
const producer = kafka.producer()
56+
await producer.connect()
57+
await producer.send({
58+
topic,
59+
messages: [{ value: messageValue }],
60+
})
61+
62+
// Then
63+
await waitAndRetry(() => messages.length > 0)
64+
65+
// Cleaning up before checks to avoid stale state
66+
await consumer.disconnect()
67+
await producer.disconnect()
68+
await admin.disconnect()
69+
70+
expect(messages).toHaveLength(1)
71+
expect(messages[0]).toEqual(messageValue)
72+
})
73+
})

packages/kafka/lib/test.spec.ts renamed to packages/kafka/lib/test-platformatic.spec.ts

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,14 @@ import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
33
import {
44
Consumer,
55
type Message,
6+
ProduceAcks,
67
Producer,
78
stringDeserializers,
8-
stringSerializers,
99
} from '@platformatic/kafka'
10-
import { ProduceAcks } from '@platformatic/kafka'
11-
import { afterAll } from 'vitest'
10+
import { stringSerializers } from '@platformatic/kafka'
1211
import { type TestContext, registerDependencies } from '../test/testContext.ts'
1312

14-
// TODO: to be removed once we have proper tests
15-
describe('Test', () => {
13+
describe('Test platformatic-kafka', () => {
1614
let testContext: TestContext
1715

1816
beforeAll(async () => {
@@ -23,7 +21,7 @@ describe('Test', () => {
2321
await testContext.dispose()
2422
})
2523

26-
it('should send and receive a message', { timeout: 10000 }, async () => {
24+
it('should send and receive a message', async () => {
2725
// Given
2826
const clientId = randomUUID()
2927
// Use a fresh, unique topic per run to avoid stale state
@@ -62,12 +60,13 @@ describe('Test', () => {
6260
})
6361

6462
// Then
65-
await waitAndRetry(() => receivedMessages.length > 0, 10, 800)
66-
expect(receivedMessages).toHaveLength(1)
67-
expect(receivedMessages[0]?.value?.toString()).toBe(messageValue)
63+
await waitAndRetry(() => receivedMessages.length > 0)
6864

6965
// Cleanup
7066
producer.close()
7167
consumer.close()
68+
69+
expect(receivedMessages).toHaveLength(1)
70+
expect(receivedMessages[0]?.value?.toString()).toBe(messageValue)
7271
})
7372
})

packages/kafka/package.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
33
"version": "0.0.0",
4-
"engines": { "node": ">= 22.14.0" },
4+
"engines": {
5+
"node": ">= 22.14.0"
6+
},
57
"private": false,
68
"license": "MIT",
79
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
@@ -45,9 +47,10 @@
4547
"prepublishOnly": "npm run lint && npm run build"
4648
},
4749
"dependencies": {
50+
"@confluentinc/kafka-javascript": "^1.3.0",
4851
"@lokalise/node-core": "^14.0.1",
4952
"@lokalise/universal-ts-utils": "^4.4.1",
50-
"@platformatic/kafka": "^1.1.0",
53+
"@platformatic/kafka": "^1.2.0",
5154
"zod": "^3.25.7"
5255
},
5356
"peerDependencies": {

packages/kafka/vitest.config.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ export default defineConfig({
66
globals: true,
77
watch: false,
88
restoreMocks: true,
9-
pool: 'threads',
109
coverage: {
1110
provider: 'v8',
1211
include: ['lib/**/*.ts'],

0 commit comments

Comments
 (0)