Skip to content

Commit 07660be

Browse files
authored
Support transactions (#881 -> #903)
1 parent 4885043 commit 07660be

13 files changed

+1000
-5
lines changed

docker-compose.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,6 @@ kafka:
1818
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
1919
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
2020
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
21+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
22+
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
23+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

e2e/producer-transaction.spec.js

Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
/*
 * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
 *
 * Copyright (c) 2016 Blizzard Entertainment
 *
 * This software may be modified and distributed under the terms
 * of the MIT license. See the LICENSE.txt file for details.
 */

var Kafka = require('../');

var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';

/*
 * End-to-end tests for the transactional producer API:
 * initTransactions / beginTransaction / commitTransaction / abortTransaction
 * plus sendOffsetsToTransaction. Requires a broker with transaction support
 * (see the docker-compose KAFKA_TRANSACTION_* settings in this commit).
 */
describe('Transactional Producer', function () {
  this.timeout(5000);
  var TRANSACTIONS_TIMEOUT_MS = 30000;
  // Unique suffix so repeated runs do not collide with topics/offsets
  // left over from earlier runs against the same broker.
  var r = Date.now() + '_' + Math.round(Math.random() * 1000);
  var topicIn = 'transaction_input_' + r;
  var topicOut = 'transaction_output_' + r;

  var producerTrans; // transactional producer (writes to topicOut)
  var consumerTrans; // consumer whose offsets join the transaction (reads topicIn)

  before(function (done) {
    /*
      prepare:
      transactional consumer (read from input topic)
      transactional producer (write to output topic)
      write 3 messages to input topic: A, B, C
      A will be skipped, B will be committed, C will be aborted
    */
    var connecting = 3;
    var producerInput;

    // Shared connect callback for all three clients: once the last one is
    // up, seed the input topic and subscribe the transactional consumer.
    function connectedCb(err) {
      if (err) {
        done(err);
        return;
      }
      connecting--;
      if (connecting === 0) {
        producerInput.produce(topicIn, -1, Buffer.from('A'));
        producerInput.produce(topicIn, -1, Buffer.from('B'));
        producerInput.produce(topicIn, -1, Buffer.from('C'));
        producerInput.disconnect(function (err) {
          consumerTrans.subscribe([topicIn]);
          done(err);
        });
      }
    }

    // FIX: construct with `new`, consistent with every other client in
    // this file (the original called Kafka.Producer(...) bare).
    producerInput = new Kafka.Producer({
      'client.id': 'kafka-test',
      'metadata.broker.list': kafkaBrokerList,
      'enable.idempotence': true
    });
    producerInput.setPollInterval(100);
    producerInput.connect({}, connectedCb);

    producerTrans = new Kafka.Producer({
      'client.id': 'kafka-test',
      'metadata.broker.list': kafkaBrokerList,
      'dr_cb': true,
      'debug': 'all',
      'transactional.id': 'noderdkafka_transactions_send_offset',
      'enable.idempotence': true
    });
    producerTrans.setPollInterval(100);
    producerTrans.connect({}, connectedCb);

    consumerTrans = new Kafka.KafkaConsumer({
      'metadata.broker.list': kafkaBrokerList,
      // NOTE(review): "gropu" looks like a typo, but it is only a consumer
      // group id; kept byte-identical to avoid changing broker state keys.
      'group.id': 'gropu_transaction_consumer',
      'enable.auto.commit': false
    }, {
      'auto.offset.reset': 'earliest',
    });
    consumerTrans.connect({}, connectedCb);
  });

  after(function (done) {
    var connected = 2;
    // Tear down both long-lived clients; call done() only after both
    // are fully disconnected.
    function execDisconnect(client) {
      // FIX: isConnected is a method — the original tested the function
      // object itself (`!client.isConnected`), which is always truthy,
      // making the already-disconnected branch unreachable.
      if (!client.isConnected()) {
        connected--;
        if (connected === 0) {
          done();
        }
      } else {
        client.disconnect(function () {
          connected--;
          if (connected === 0) {
            done();
          }
        });
      }
    }
    execDisconnect(producerTrans);
    execDisconnect(consumerTrans);
  });

  it('should init transactions', function (done) {
    producerTrans.initTransactions(TRANSACTIONS_TIMEOUT_MS, function (err) {
      done(err);
    });
  });

  it('should complete transaction', function (done) {
    // Poll the input topic until we reach message B, then run a full
    // consume-transform-produce transaction: produce B to topicOut and
    // commit the consumer's offsets atomically with it.
    function readMessage() {
      consumerTrans.consume(1, function (err, m) {
        if (err) {
          done(err);
          return;
        }
        if (m.length === 0) {
          readMessage();
        } else {
          var v = m[0].value.toString();
          if (v === 'A') { // skip first message
            readMessage();
            return;
          }
          if (v !== 'B') {
            done('Expected B');
            return;
          }
          producerTrans.beginTransaction(function (err) {
            if (err) {
              done(err);
              return;
            }
            producerTrans.produce(topicOut, -1, Buffer.from(v));
            var position = consumerTrans.position();
            producerTrans.sendOffsetsToTransaction(position, consumerTrans, function (err) {
              if (err) {
                done(err);
                return;
              }
              producerTrans.commitTransaction(function (err) {
                if (err) {
                  done(err);
                  return;
                }
                // After commit, the committed offsets must equal the
                // position we handed to the transaction.
                consumerTrans.committed(5000, function (err, tpo) {
                  if (err) {
                    done(err);
                    return;
                  }
                  if (JSON.stringify(position) !== JSON.stringify(tpo)) {
                    done('Committed mismatch');
                    return;
                  }
                  done();
                });
              });
            });
          });
        }
      });
    }
    readMessage();
  });

  describe('abort transaction', function () {
    var lastConsumerTransPosition;

    before(function (done) {
      // Start (but do not finish) a second transaction around message C:
      // produce C to topicOut and attach the consumer offsets, leaving the
      // transaction open so the tests below can observe and then abort it.
      function readMessage() {
        consumerTrans.consume(1, function (err, m) {
          if (err) {
            done(err);
            return;
          }
          if (m.length === 0) {
            readMessage();
          } else {
            var v = m[0].value.toString();
            if (v !== 'C') {
              done('Expected C');
              return;
            }
            producerTrans.beginTransaction(function (err) {
              if (err) {
                done(err);
                return;
              }
              producerTrans.produce(topicOut, -1, Buffer.from(v));
              lastConsumerTransPosition = consumerTrans.position();
              producerTrans.sendOffsetsToTransaction(lastConsumerTransPosition, consumerTrans, function (err) {
                if (err) {
                  done(err);
                  return;
                }
                done();
              });
            });
          }
        });
      }
      readMessage();
    });

    it('should consume committed and uncommitted for read_uncommitted', function (done) {
      var allMsgs = [];
      var consumer = new Kafka.KafkaConsumer({
        'metadata.broker.list': kafkaBrokerList,
        'group.id': 'group_read_uncommitted',
        'enable.auto.commit': false,
        'isolation.level': 'read_uncommitted'
      }, {
        'auto.offset.reset': 'earliest',
      });
      consumer.connect({}, function (err) {
        if (err) {
          done(err);
          return;
        }
        consumer.subscribe([topicOut]);
        consumer.consume();
      });
      consumer.on('data', function (msg) {
        var v = msg.value.toString();
        allMsgs.push(v);
        // both B and C must be consumed
        // (if they never arrive this test fails via the mocha timeout)
        if (allMsgs.length === 2 && allMsgs[0] === 'B' && allMsgs[1] === 'C') {
          consumer.disconnect(function (err) {
            if (err) {
              done(err);
              return;
            }
            done();
          });
        }
      });
    });

    it('should consume only committed for read_committed', function (done) {
      var allMsgs = [];
      var consumer = new Kafka.KafkaConsumer({
        'metadata.broker.list': kafkaBrokerList,
        'group.id': 'group_read_committed',
        'enable.partition.eof': true,
        'enable.auto.commit': false,
        'isolation.level': 'read_committed'
      }, {
        'auto.offset.reset': 'earliest',
      });
      consumer.connect({}, function (err) {
        if (err) {
          done(err);
          return;
        }
        consumer.subscribe([topicOut]);
        consumer.consume();
      });
      consumer.on('data', function (msg) {
        var v = msg.value.toString();
        allMsgs.push(v);
      });
      consumer.on('partition.eof', function (eof) {
        // NOTE(review): assumes topicOut has a single partition, so eof
        // fires once after all committed messages — confirm broker config.
        if (allMsgs.length === 1 && allMsgs[0] === 'B') {
          consumer.disconnect(function (err) {
            if (err) {
              done(err);
              return;
            }
            done();
          });
        } else {
          done('Expected only B');
          return;
        }
      });
    });

    it('should abort transaction', function (done) {
      producerTrans.abortTransaction(function (err) {
        if (err) {
          done(err);
          return;
        }
        consumerTrans.committed(5000, function (err, tpo) {
          if (err) {
            done(err);
            return;
          }
          // The aborted transaction's offsets must NOT be committed: the
          // position captured before abort has to stay ahead of the
          // committed offset.
          if (lastConsumerTransPosition[0].offset <= tpo[0].offset) {
            done('Committed mismatch');
            return;
          }
          done();
        });
      });
    });

    it('should consume only committed', function (done) {
      var gotB = false;
      var consumer = new Kafka.KafkaConsumer({
        'metadata.broker.list': kafkaBrokerList,
        'group.id': 'group_default',
        'enable.partition.eof': true,
        'enable.auto.commit': false,
      }, {
        'auto.offset.reset': 'earliest',
      });
      consumer.connect({}, function (err) {
        if (err) {
          done(err);
          return;
        }
        consumer.subscribe([topicOut]);
        consumer.consume();
      });
      consumer.on('data', function (msg) {
        var v = msg.value.toString();
        if (v !== 'B') {
          done('Expected B');
          return;
        }
        gotB = true;
      });
      consumer.on('partition.eof', function (eof) {
        if (!gotB) {
          done('Expected B');
          return;
        }
        consumer.disconnect(function (err) {
          if (err) {
            done(err);
            return;
          }
          done();
        });
      });
    });
  });
});

index.d.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ export interface LibrdKafkaError {
1818
errno: number;
1919
origin: string;
2020
stack?: string;
21+
isFatal?: boolean;
22+
isRetriable?: boolean;
23+
isTxnRequiresAbort?: boolean;
2124
}
2225

2326
export interface ReadyInfo {
@@ -147,7 +150,7 @@ type EventListenerMap = {
147150
'disconnected': (metrics: ClientMetrics) => void,
148151
'ready': (info: ReadyInfo, metadata: Metadata) => void,
149152
'connection.failure': (error: LibrdKafkaError, metrics: ClientMetrics) => void,
150-
// event messages
153+
// event messages
151154
'event.error': (error: LibrdKafkaError) => void,
152155
'event.stats': (eventData: any) => void,
153156
'event.log': (eventData: any) => void,
@@ -262,6 +265,16 @@ export class Producer extends Client<KafkaProducerEvents> {
262265
setPollInterval(interval: number): this;
263266

264267
static createWriteStream(conf: ProducerGlobalConfig, topicConf: ProducerTopicConfig, streamOptions: WriteStreamOptions): ProducerStream;
268+
269+
initTransactions(cb: (err: LibrdKafkaError) => void): void;
270+
initTransactions(timeout: number, cb: (err: LibrdKafkaError) => void): void;
271+
beginTransaction(cb: (err: LibrdKafkaError) => void): void;
272+
commitTransaction(cb: (err: LibrdKafkaError) => void): void;
273+
commitTransaction(timeout: number, cb: (err: LibrdKafkaError) => void): void;
274+
abortTransaction(cb: (err: LibrdKafkaError) => void): void;
275+
abortTransaction(timeout: number, cb: (err: LibrdKafkaError) => void): void;
276+
sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, cb: (err: LibrdKafkaError) => void): void;
277+
sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, timeout: number, cb: (err: LibrdKafkaError) => void): void;
265278
}
266279

267280
export class HighLevelProducer extends Producer {

lib/error.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,10 @@ function LibrdKafkaError(e) {
444444

445445
}
446446

447+
if (e.hasOwnProperty('isFatal')) this.isFatal = e.isFatal;
448+
if (e.hasOwnProperty('isRetriable')) this.isRetriable = e.isRetriable;
449+
if (e.hasOwnProperty('isTxnRequiresAbort')) this.isTxnRequiresAbort = e.isTxnRequiresAbort;
450+
447451
}
448452

449453
function createLibrdkafkaError(e) {

0 commit comments

Comments
 (0)