Skip to content

Commit f69c81b

Browse files
committed
Add transactional producer and EOS example
1 parent fd43067 commit f69c81b

File tree

5 files changed

+270
-21
lines changed

5 files changed

+270
-21
lines changed

examples/kafkajs/eos.js

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
const { Kafka } = require('../..').KafkaJS
2+
//const { Kafka } = require('kafkajs')
3+
4+
async function eosStart() {
5+
const kafka = new Kafka({
6+
brokers: ['<fill>'],
7+
ssl: true,
8+
sasl: {
9+
mechanism: 'plain',
10+
username: '<fill>',
11+
password: '<fill>',
12+
}
13+
});
14+
15+
const consumer = kafka.consumer({
16+
groupId: 'groupId',
17+
rdKafka: {
18+
"enable.auto.commit": false,
19+
},
20+
});
21+
22+
const producer = kafka.producer({
23+
transactionalId: 'txid'
24+
});
25+
26+
await consumer.connect();
27+
await producer.connect();
28+
29+
await consumer.subscribe({
30+
topics: ["consumeTopic"]
31+
});
32+
33+
// Similar to https://github.com/tulios/kafkajs/issues/1221
34+
// The run method acts like a consume-transform-produce loop.
35+
consumer.run({
36+
eachMessage: async ({ topic, partition, message }) => {
37+
const msgAckString = JSON.stringify({topic,
38+
partition,
39+
offset: message.offset,
40+
key: message.key?.toString(),
41+
value: message.value.toString()
42+
});
43+
44+
console.log(msgAckString);
45+
46+
try {
47+
const transaction = await producer.transaction();
48+
49+
await transaction.send({
50+
topic: 'produceTopic',
51+
messages: [
52+
{ value: 'consumed a message: ' + msgAckString },
53+
]
54+
});
55+
56+
await transaction.sendOffsets({
57+
// Either a consumer can be used, which is typically used to consume
58+
// in the EOS consume-transform-produce looop.
59+
// Or use consumer group id (like KafkaJS - but it's recommended to use consumer).
60+
consumer,
61+
// consumerGroupId: 'groupdId',
62+
topics: [
63+
{
64+
topic,
65+
partitions: [
66+
{ partition, offset: message.offset },
67+
],
68+
}
69+
],
70+
});
71+
72+
await transaction.commit();
73+
74+
} catch (e) {
75+
console.log({ e, s: "ERROR" });
76+
await transaction.abort();
77+
}
78+
},
79+
});
80+
81+
const disconnect = async () => {
82+
process.off('SIGINT', disconnect);
83+
process.off('SIGTERM', disconnect);
84+
await consumer.disconnect();
85+
await producer.disconnect();
86+
}
87+
process.on('SIGINT', disconnect);
88+
process.on('SIGTERM', disconnect);
89+
}
90+
91+
eosStart();

lib/kafkajs/_common.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,12 @@ async function kafkaJSToRdKafkaConfig(config) {
3535
return ret;
3636
}
3737

38-
module.exports = { kafkaJSToRdKafkaConfig }
38+
function topicPartitionOffsetToRdKafka(tpo) {
39+
return {
40+
topic: tpo.topic,
41+
partition: tpo.partition,
42+
offset: Number(tpo.offset),
43+
}
44+
}
45+
46+
module.exports = { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka }

lib/kafkajs/_consumer.js

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
const LibrdKafkaError = require('../error');
22
const RdKafka = require('../rdkafka');
3-
const { kafkaJSToRdKafkaConfig } = require('./_common');
3+
const { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka } = require('./_common');
44

55
const ConsumerState = Object.freeze({
66
INIT: 0,
@@ -158,18 +158,10 @@ class Consumer {
158158
});
159159
}
160160

161-
#topicPartitionOffsetToRdKafka(tpo) {
162-
return {
163-
topic: tpo.topic,
164-
partition: tpo.partition,
165-
offset: Number(tpo.offset),
166-
}
167-
}
168-
169161
#flattenTopicPartitions(topics) {
170162
const ret = [];
171163
for (let topic of topics) {
172-
if (topic.partition != null)
164+
if (topic.partition !== null)
173165
ret.push({
174166
topic: topic.topic,
175167
partition: topic.partition
@@ -183,6 +175,10 @@ class Consumer {
183175
return ret;
184176
}
185177

178+
_getInternalConsumer() {
179+
return this.#internalClient;
180+
}
181+
186182
async connect() {
187183
if (this.#state !== ConsumerState.INIT) {
188184
return Promise.reject('Connect has already been called elsewhere.');
@@ -231,7 +227,7 @@ class Consumer {
231227
this.#internalClient.commitSync();
232228
} else {
233229
const topicPartitions = topicPartitions.map(
234-
this.#topicPartitionOffsetToRdKafka);
230+
topicPartitionOffsetToRdKafka);
235231
this.#internalClient.commitSync(topicPartitions);
236232
}
237233
} catch (e) {
@@ -244,7 +240,7 @@ class Consumer {
244240
seek(topicPartitionOffset) {
245241
return new Promise((resolve, reject) => {
246242
const rdKafkaTopicPartitionOffset =
247-
this.#topicPartitionOffsetToRdKafka(topicPartitionOffset);
243+
topicPartitionOffsetToRdKafka(topicPartitionOffset);
248244
this.#internalClient.seek(rdKafkaTopicPartitionOffset, 0, (err) => {
249245
if (err) {
250246
reject(new Error(`Seek error code ${err.code}`));

lib/kafkajs/_kafka.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class Kafka {
1717
Object.assign(baseConfig, config);
1818
if (rdKafka && config.rdKafka) {
1919
baseConfig.rdKafka = {
20-
...rdkafka,
20+
...rdKafka,
2121
...config.rdKafka
2222
}
2323
}

lib/kafkajs/_producer.js

Lines changed: 161 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
const RdKafka = require('../rdkafka');
2-
const { kafkaJSToRdKafkaConfig } = require('./_common');
2+
const { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka } = require('./_common');
3+
const { Consumer } = require('./_consumer');
34

45
const ProducerState = Object.freeze({
56
INIT: 0,
67
CONNECTING: 1,
7-
CONNECTED: 2,
8-
DISCONNECTING: 3,
9-
DISCONNECTED: 4,
8+
INITIALIZING_TRANSACTIONS: 2,
9+
INITIALIZED_TRANSACTIONS: 3,
10+
CONNECTED: 4,
11+
DISCONNECTING: 5,
12+
DISCONNECTED: 6,
1013
});
1114

1215
class Producer {
@@ -15,6 +18,7 @@ class Producer {
1518
#internalClient = null;
1619
#connectPromiseFunc = {};
1720
#state = ProducerState.INIT;
21+
#ongoingTransaction = false;
1822

1923
constructor(kJSConfig) {
2024
this.#kJSConfig = kJSConfig;
@@ -29,15 +33,51 @@ class Producer {
2933
async #finalizedConfig() {
3034
const config = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
3135
config.dr_cb = 'true';
36+
37+
if (this.#kJSConfig.hasOwnProperty('transactionalId')) {
38+
config['transactional.id'] = this.#kJSConfig.transactionalId;
39+
}
40+
3241
return config;
3342
}
3443

35-
#readyCb(arg) {
36-
//console.log('Connected and ready.');
37-
if (this.#state !== ProducerState.CONNECTING) {
44+
#flattenTopicPartitionOffsets(topics) {
45+
return topics.flatMap(topic => {
46+
return topic.partitions.map(partition => {
47+
return { partition: partition.partition, offset: partition.offset, topic: topic.topic };
48+
})
49+
})
50+
}
51+
52+
#readyTransactions(err) {
53+
if (err) {
54+
this.#connectPromiseFunc["reject"](err);
55+
return;
56+
}
57+
58+
if (this.#state !== ProducerState.INITIALIZING_TRANSACTIONS) {
59+
// FSM impossible state. We should add error handling for
60+
// this later.
61+
return;
62+
}
63+
64+
this.#state = ProducerState.INITIALIZED_TRANSACTIONS;
65+
this.#readyCb(null);
66+
}
67+
68+
async #readyCb(arg) {
69+
if (this.#state !== ProducerState.CONNECTING && this.#state !== ProducerState.INITIALIZED_TRANSACTIONS) {
3870
// I really don't know how to handle this now.
3971
return;
4072
}
73+
74+
let config = await this.#config();
75+
if (config.hasOwnProperty('transactional.id') && this.#state !== ProducerState.INITIALIZED_TRANSACTIONS) {
76+
this.#state = ProducerState.INITIALIZING_TRANSACTIONS;
77+
this.#internalClient.initTransactions(5000 /* default: 5s */, this.#readyTransactions.bind(this));
78+
return;
79+
}
80+
4181
this.#state = ProducerState.CONNECTED;
4282

4383
// Start a loop to poll.
@@ -121,6 +161,120 @@ class Producer {
121161
});
122162
}
123163

164+
async transaction() {
165+
if (this.#state !== ProducerState.CONNECTED) {
166+
return Promise.reject("Cannot start transaction without awaiting connect()");
167+
}
168+
169+
if (this.#ongoingTransaction) {
170+
return Promise.reject("Can only start one transaction at a time.");
171+
}
172+
173+
return new Promise((resolve, reject) => {
174+
this.#internalClient.beginTransaction((err) => {
175+
if (err) {
176+
reject(err);
177+
return;
178+
}
179+
this.#ongoingTransaction = true;
180+
181+
// Resolve with 'this' because we don't need any specific transaction object.
182+
// Just using the producer works since we can only have one transaction
183+
// ongoing for one producer.
184+
resolve(this);
185+
});
186+
});
187+
}
188+
189+
async commit() {
190+
if (this.#state !== ProducerState.CONNECTED) {
191+
return Promise.reject("Cannot commit without awaiting connect()");
192+
}
193+
194+
if (!this.#ongoingTransaction) {
195+
return Promise.reject("Cannot commit, no transaction ongoing.");
196+
}
197+
198+
return new Promise((resolve, reject) => {
199+
this.#internalClient.commitTransaction(5000 /* default: 5000ms */, err => {
200+
if (err) {
201+
// TODO: Do we reset ongoingTransaction here?
202+
reject(err);
203+
return;
204+
}
205+
this.#ongoingTransaction = false;
206+
resolve();
207+
});
208+
});
209+
}
210+
211+
212+
async abort() {
213+
if (this.#state !== ProducerState.CONNECTED) {
214+
return Promise.reject("Cannot abort without awaiting connect()");
215+
}
216+
217+
if (!this.#ongoingTransaction) {
218+
return Promise.reject("Cannot abort, no transaction ongoing.");
219+
}
220+
221+
return new Promise((resolve, reject) => {
222+
this.#internalClient.abortTransaction(5000 /* default: 5000ms */, err => {
223+
if (err) {
224+
// TODO: Do we reset ongoingTransaction here?
225+
reject(err);
226+
return;
227+
}
228+
this.#ongoingTransaction = false;
229+
resolve();
230+
});
231+
});
232+
}
233+
234+
async sendOffsets(arg) {
235+
let { consumerGroupId, topics, consumer } = arg;
236+
237+
if ((!consumerGroupId && !consumer) || !Array.isArray(topics) || topics.length === 0) {
238+
return Promise.reject("sendOffsets must have the arguments {consumerGroupId: string or consumer: Consumer, topics: non-empty array");
239+
}
240+
241+
if (this.#state !== ProducerState.CONNECTED) {
242+
return Promise.reject("Cannot sendOffsets without awaiting connect()");
243+
}
244+
245+
if (!this.#ongoingTransaction) {
246+
return Promise.reject("Cannot sendOffsets, no transaction ongoing.");
247+
}
248+
249+
// If we don't have a consumer, we must create a consumer at this point internally.
250+
// This isn't exactly efficient, but we expect people to use either a consumer,
251+
// or we will need to change the C/C++ code to facilitate using the consumerGroupId
252+
// directly.
253+
// TODO: Change the C/C++ code to facilitate this if we go to release with this.
254+
255+
let consumerCreated = false;
256+
if (!consumer) {
257+
const config = Object.assign({ groupId: consumerGroupId }, this.#kJSConfig);
258+
consumer = new Consumer(config);
259+
consumerCreated = true;
260+
await consumer.connect();
261+
}
262+
263+
return new Promise((resolve, reject) => {
264+
this.#internalClient.sendOffsetsToTransaction(
265+
this.#flattenTopicPartitionOffsets(topics).map(topicPartitionOffsetToRdKafka),
266+
consumer._getInternalConsumer(),
267+
async err => {
268+
if (consumerCreated)
269+
await consumer.disconnect();
270+
if (err)
271+
reject(err);
272+
else
273+
resolve();
274+
})
275+
});
276+
}
277+
124278
async send(sendOptions) {
125279
if (this.#state !== ProducerState.CONNECTED) {
126280
return Promise.reject("Cannot send message without awaiting connect()");

0 commit comments

Comments
 (0)