Skip to content

Commit 1b6d4bf

Browse files
committed
Update examples for kafkaJs: block
1 parent 25eec75 commit 1b6d4bf

File tree

3 files changed

+113
-105
lines changed

3 files changed

+113
-105
lines changed

examples/kafkajs/consumer.js

Lines changed: 85 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,27 @@
1-
const { Kafka, logLevel } = require('../..').KafkaJS;
1+
const { Kafka } = require('../..').KafkaJS
22
//const { Kafka } = require('kafkajs')
33

44
async function consumerStart() {
5-
let consumer;
6-
var stopped = false;
5+
let consumer;
6+
var stopped = false;
77

8-
const kafka = new Kafka({
9-
brokers: ['localhost:9092'],
10-
});
8+
const kafka = new Kafka({
9+
kafkaJs: {
10+
brokers: ['localhost:9092'],
11+
ssl: true,
12+
connectionTimeout: 5000,
13+
sasl: {
14+
mechanism: 'plain',
15+
username: '<fill>',
16+
password: '<fill>',
17+
},
18+
}
19+
});
1120

12-
consumer = kafka.consumer({
13-
groupId: 'test-group22' + Math.random(),
21+
consumer = kafka.consumer({
22+
kafkaJs: {
23+
groupId: 'test-group',
24+
autoCommit: false,
1425
rebalanceListener: {
1526
onPartitionsAssigned: async (assignment) => {
1627
console.log(`Assigned partitions ${JSON.stringify(assignment)}`);
@@ -24,95 +35,90 @@ async function consumerStart() {
2435
}
2536
}
2637
},
27-
rdKafka: {
28-
globalConfig: {
29-
'debug': 'conf',
30-
'enable.auto.commit': false,
31-
'auto.offset.reset': 'error'
32-
},
33-
topicConfig: {
34-
},
35-
}
36-
});
38+
},
39+
40+
/* Properties from librdkafka can also be used */
41+
'auto.commit.interval.ms': 6000,
42+
});
43+
44+
await consumer.connect();
45+
console.log("Connected successfully");
3746

38-
await consumer.connect();
39-
console.log("Connected successfully");
47+
await consumer.subscribe({
48+
topics: [
49+
"topic2"
50+
]
51+
})
4052

41-
await consumer.subscribe({
42-
topics: [
43-
"test-topic"
44-
]
45-
})
53+
// Batch consumer, commit and seek example
54+
var batch = 0;
55+
consumer.run({
56+
eachMessage: async ({ topic, partition, message }) => {
57+
console.log({
58+
topic,
59+
partition,
60+
offset: message.offset,
61+
key: message.key?.toString(),
62+
value: message.value.toString(),
63+
})
4664

47-
// Batch consumer, commit and seek example
48-
var batch = 0;
49-
consumer.run({
50-
eachMessage: async ({ topic, partition, message }) => {
51-
console.log({
65+
if (++batch % 100 == 0) {
66+
await consumer.seek({
5267
topic,
5368
partition,
54-
offset: message.offset,
55-
key: message.key?.toString(),
56-
value: message.value.toString(),
57-
})
58-
59-
if (++batch % 100 == 0) {
60-
await consumer.seek({
61-
topic,
62-
partition,
63-
offset: -2
64-
});
65-
await consumer.commitOffsets();
66-
batch = 0;
67-
}
68-
},
69-
});
69+
offset: -2
70+
});
71+
await consumer.commitOffsets();
72+
batch = 0;
73+
}
74+
},
75+
});
7076

71-
// Pause/Resume example
72-
const pauseResumeLoop = async () => {
73-
let paused = false;
74-
let ticks = 0;
75-
while (!stopped) {
76-
await new Promise((resolve) => setTimeout(resolve, 100));
77-
if (stopped)
78-
break;
77+
// Pause/Resume example
78+
const pauseResumeLoop = async () => {
79+
let paused = false;
80+
let ticks = 0;
81+
while (!stopped) {
82+
await new Promise((resolve) => setTimeout(resolve, 100));
83+
if (stopped)
84+
break;
7985

80-
ticks++;
81-
if (ticks == 200) {
82-
ticks = 0;
83-
const assignment = consumer.assignment();
84-
if (paused) {
85-
console.log(`Resuming partitions ${JSON.stringify(assignment)}`)
86-
consumer.resume(assignment);
87-
} else {
88-
console.log(`Pausing partitions ${JSON.stringify(assignment)}`);
89-
consumer.pause(assignment);
90-
}
91-
paused = !paused;
86+
ticks++;
87+
if (ticks == 200) {
88+
ticks = 0;
89+
const assignment = consumer.assignment();
90+
if (paused) {
91+
console.log(`Resuming partitions ${JSON.stringify(assignment)}`)
92+
consumer.resume(assignment);
93+
} else {
94+
console.log(`Pausing partitions ${JSON.stringify(assignment)}`);
95+
consumer.pause(assignment);
9296
}
97+
paused = !paused;
9398
}
9499
}
100+
}
95101

96-
if (consumer.assignment) {
97-
// KafkaJS doesn't have assignment()
98-
pauseResumeLoop()
99-
}
102+
if (consumer.assignment()) {
103+
// KafkaJS doesn't have assignment()
104+
pauseResumeLoop()
105+
}
100106

101-
// Disconnect example
102-
const disconnect = () => {
103-
process.off('SIGINT', disconnect);
104-
process.off('SIGTERM', disconnect);
105-
stopped = true;
106-
consumer.commitOffsets()
107+
// Disconnect example
108+
const disconnect = () => {
109+
process.off('SIGINT', disconnect);
110+
process.off('SIGTERM', disconnect);
111+
stopped = true;
112+
consumer.commitOffsets()
107113
.finally(() =>
108114
consumer.disconnect()
109115
)
110116
.finally(() =>
111117
console.log("Disconnected successfully")
112118
);
113-
}
114-
process.on('SIGINT', disconnect);
115-
process.on('SIGTERM', disconnect);
119+
}
120+
process.on('SIGINT', disconnect);
121+
process.on('SIGTERM', disconnect);
116122
}
117123

118124
consumerStart()

examples/kafkajs/eos.js

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,28 @@ const { Kafka } = require('../..').KafkaJS
33

44
async function eosStart() {
55
const kafka = new Kafka({
6-
brokers: ['<fill>'],
7-
ssl: true,
8-
sasl: {
9-
mechanism: 'plain',
10-
username: '<fill>',
11-
password: '<fill>',
6+
kafkaJs: {
7+
brokers: ['<fill>'],
8+
ssl: true,
9+
sasl: {
10+
mechanism: 'plain',
11+
username: '<fill>',
12+
password: '<fill>',
13+
}
1214
}
1315
});
1416

1517
const consumer = kafka.consumer({
16-
groupId: 'groupId',
17-
rdKafka: {
18-
globalConfig: {
19-
"enable.auto.commit": false,
20-
}
21-
},
18+
kafkaJs: {
19+
groupId: 'groupId',
20+
autoCommit: false,
21+
}
2222
});
2323

2424
const producer = kafka.producer({
25-
transactionalId: 'txid'
25+
kafkaJs: {
26+
transactionalId: 'txid'
27+
}
2628
});
2729

2830
await consumer.connect();
@@ -66,9 +68,7 @@ async function eosStart() {
6668
{
6769
topic,
6870
partitions: [
69-
/* The message.offset indicates current offset, so we need to add 1 to it, since committed offset denotes
70-
* the next offset to consume. */
71-
{ partition, offset: message.offset + 1 },
71+
{ partition, offset: message.offset },
7272
],
7373
}
7474
],

examples/kafkajs/producer.js

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ const { Kafka } = require('../..').KafkaJS
33

44
async function producerStart() {
55
const kafka = new Kafka({
6-
brokers: ['<fill>'],
7-
ssl: true,
8-
sasl: {
9-
mechanism: 'plain',
10-
username: '<fill>',
11-
password: '<fill>',
6+
kafkaJs: {
7+
brokers: ['<fill>'],
8+
ssl: true,
9+
sasl: {
10+
mechanism: 'plain',
11+
username: '<fill>',
12+
password: '<fill>',
13+
},
1214
}
1315
});
1416

@@ -19,16 +21,16 @@ async function producerStart() {
1921
console.log("Connected successfully");
2022

2123
const res = []
22-
for(let i = 0; i < 50; i++) {
24+
for (let i = 0; i < 50; i++) {
2325
res.push(producer.send({
2426
topic: 'topic2',
2527
messages: [
26-
{value: 'v222', partition: 0},
27-
{value: 'v11', partition: 0, key: 'x'},
28+
{ value: 'v222', partition: 0 },
29+
{ value: 'v11', partition: 0, key: 'x' },
2830
]
2931
}));
3032
}
31-
await Promise.allSettled(res);
33+
await Promise.all(res);
3234

3335
await producer.disconnect();
3436

0 commit comments

Comments
 (0)