
Commit 0d8400b

Remove consumerGroupId argument from sendOffsets and add tests (#82)
1 parent 31abae4 commit 0d8400b

5 files changed: +167 −22 lines changed

MIGRATION.md

Lines changed: 1 addition & 0 deletions

@@ -199,6 +199,7 @@ producerRun().then(consumerRun).catch(console.error);
 ```
 
 * A transactional producer (with a `transactionId` set) **cannot** send messages without initiating a transaction using `producer.transaction()`.
+* When using `sendOffsets` from a transactional producer, the `consumerGroupId` argument must be omitted; the consumer object itself must be passed instead.
 
 ### Consumer
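
The migrated call shape can be summarised with a short sketch. This is an illustrative example only (the topic names, `message` variable, and surrounding producer/consumer setup are assumed); it mirrors the usage exercised by the new test suite further down.

    // Illustrative sketch of the new sendOffsets contract: pass the connected
    // consumer object itself, never a consumerGroupId.
    const transaction = await producer.transaction();
    await transaction.send({ topic: 'output-topic', messages: [message] });
    await transaction.sendOffsets({
      consumer, // a connected consumer instance
      topics: [{
        topic: 'input-topic',
        partitions: [{ partition: 0, offset: Number(message.offset) + 1 }],
      }],
    });
    await transaction.commit();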

lib/kafkajs/_common.js

Lines changed: 2 additions & 0 deletions

@@ -240,6 +240,8 @@ const CompatibilityErrorMessages = Object.freeze({
     createReplacementErrorMessage('producer', fn, 'timeout', 'timeout: <number>', 'timeout: <number>', false),
   sendBatchMandatoryMissing: () =>
     "The argument passed to sendbatch must be an object, and must contain the 'topicMessages' property: { topicMessages: {topic: string, messages: Message[]}[] } \n",
+  sendOffsetsMustProvideConsumer: () =>
+    "The sendOffsets method must be called with a connected consumer instance and without a consumerGroupId parameter.\n",
 
   /* Consumer */
   partitionAssignors: () =>

lib/kafkajs/_producer.js

Lines changed: 7 additions & 21 deletions

@@ -11,7 +11,6 @@ const { kafkaJSToRdKafkaConfig,
   CompatibilityErrorMessages,
   logLevel,
 } = require('./_common');
-const { Consumer } = require('./_consumer');
 const error = require('./_error');
 const { Buffer } = require('buffer');
 
@@ -371,7 +370,7 @@ class Producer {
         return;
       }
       this.#state = ProducerState.DISCONNECTED;
-      this.#logger.info("Producr disconnected", this.#createProducerBindingMessageMetadata());
+      this.#logger.info("Producer disconnected", this.#createProducerBindingMessageMetadata());
       resolve();
     };
     this.#internalClient.disconnect(5000 /* default timeout, 5000ms */, cb);
@@ -465,17 +464,20 @@ class Producer {
   /**
    * Send offsets for the transaction.
    * @param {object} arg - The arguments to sendOffsets
-   * @param {string} arg.consumerGroupId - The consumer group id to send offsets for.
    * @param {Consumer} arg.consumer - The consumer to send offsets for.
    * @param {import("../../types/kafkajs").TopicOffsets[]} arg.topics - The topics, partitions and the offsets to send.
    *
-   * @note only one of consumerGroupId or consumer must be set. It is recommended to use `consumer`.
    * @returns {Promise<void>} Resolves when the offsets are sent.
    */
   async sendOffsets(arg) {
     let { consumerGroupId, topics, consumer } = arg;
 
-    if ((!consumerGroupId && !consumer) || !Array.isArray(topics) || topics.length === 0) {
+    /* If the user has not supplied a consumer, or supplied a consumerGroupId, throw immediately. */
+    if (consumerGroupId || !consumer) {
+      throw new error.KafkaJSError(CompatibilityErrorMessages.sendOffsetsMustProvideConsumer(), { code: error.ErrorCodes.ERR__INVALID_ARG });
+    }
+
+    if (!Array.isArray(topics) || topics.length === 0) {
       throw new error.KafkaJSError("sendOffsets arguments are invalid", { code: error.ErrorCodes.ERR__INVALID_ARG });
     }
 
@@ -487,27 +489,11 @@ class Producer {
       throw new error.KafkaJSError("Cannot sendOffsets, no transaction ongoing.", { code: error.ErrorCodes.ERR__STATE });
     }
 
-    // If we don't have a consumer, we must create a consumer at this point internally.
-    // This isn't exactly efficient, but we expect people to use either a consumer,
-    // or we will need to change the C/C++ code to facilitate using the consumerGroupId
-    // directly.
-    // TODO: Change the C/C++ code to facilitate this if we go to release with this.
-
-    let consumerCreated = false;
-    if (!consumer) {
-      const config = Object.assign({ 'group.id': consumerGroupId }, this.rdKafkaConfig);
-      consumer = new Consumer(config);
-      consumerCreated = true;
-      await consumer.connect();
-    }
-
     return new Promise((resolve, reject) => {
       this.#internalClient.sendOffsetsToTransaction(
         this.#flattenTopicPartitionOffsets(topics).map(topicPartitionOffsetToRdKafka),
        consumer._getInternalConsumer(),
         async err => {
-          if (consumerCreated)
-            await consumer.disconnect();
           if (err)
             reject(createKafkaJsErrorFromLibRdKafkaError(err));
           else
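
For callers still passing `consumerGroupId`, the new guard above now rejects before anything is sent. A hedged sketch of what that looks like at a call site (the error-handling style is illustrative; `ErrorCodes` is assumed to be imported from the library's `KafkaJS` namespace, as in the test file below, and the rejection code matches its first test case):

    // The old-style call now fails fast with ERR__INVALID_ARG.
    try {
      await transaction.sendOffsets({ consumerGroupId: groupId, topics });
    } catch (e) {
      if (e.code === ErrorCodes.ERR__INVALID_ARG) {
        // Migrate: pass the connected consumer object instead of consumerGroupId.
        await transaction.sendOffsets({ consumer, topics });
      } else {
        throw e;
      }
    }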

test/promisified/producer/eos.spec.js

Lines changed: 156 additions & 0 deletions

@@ -0,0 +1,156 @@
jest.setTimeout(30000);

const {
  secureRandom,
  createConsumer,
  createProducer,
  createTopic,
  waitForMessages,
} = require('../testhelpers');
const { ErrorCodes } = require('../../../lib').KafkaJS;

describe('Producer > Transactional producer', () => {
  let producer, basicProducer, topicName, topicName2, transactionalId, message, consumer, groupId;

  beforeEach(async () => {
    topicName = `test-topic-${secureRandom()}`;
    topicName2 = `test-topic2-${secureRandom()}`;
    transactionalId = `transactional-id-${secureRandom()}`;
    message = { key: `key-${secureRandom()}`, value: `value-${secureRandom()}` };
    groupId = `group-id-${secureRandom()}`;

    producer = createProducer({
      idempotent: true,
      transactionalId,
      transactionTimeout: 1000,
    });

    basicProducer = createProducer({});

    consumer = createConsumer({ groupId, autoCommit: false, fromBeginning: true });

    await createTopic({ topic: topicName, partitions: 1 });
    await createTopic({ topic: topicName2 });
  });

  afterEach(async () => {
    consumer && (await consumer.disconnect());
    producer && (await producer.disconnect());
    basicProducer && (await basicProducer.disconnect());
  });

  it('fails when using consumer group id while sending offsets from transactional producer', async () => {
    await producer.connect();
    await basicProducer.connect();
    await consumer.connect();

    await basicProducer.send({ topic: topicName, messages: [message] });

    await consumer.subscribe({ topic: topicName });

    let messagesConsumed = [];
    await consumer.run({
      eachMessage: async ({ message }) => {
        const transaction = await producer.transaction();
        await transaction.send({ topic: topicName, messages: [message] });

        await expect(transaction.sendOffsets({ consumerGroupId: groupId }))
          .rejects.toHaveProperty('code', ErrorCodes.ERR__INVALID_ARG);
        await expect(transaction.sendOffsets({ consumerGroupId: groupId, consumer }))
          .rejects.toHaveProperty('code', ErrorCodes.ERR__INVALID_ARG);

        await transaction.abort();
        messagesConsumed.push(message);
      }
    });

    await waitForMessages(messagesConsumed, { number: 1 });
    expect(messagesConsumed.length).toBe(1);
  });

  it('sends offsets when transaction is committed', async () => {
    await producer.connect();
    await basicProducer.connect();
    await consumer.connect();

    await basicProducer.send({ topic: topicName, messages: [message] });

    await consumer.subscribe({ topic: topicName });

    let messagesConsumed = [];
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const transaction = await producer.transaction();
        await transaction.send({ topic: topicName2, messages: [message] });

        await transaction.sendOffsets({
          consumer,
          topics: [
            {
              topic,
              partitions: [
                { partition, offset: Number(message.offset) + 1 },
              ],
            },
          ],
        });

        await transaction.commit();
        messagesConsumed.push(message);
      }
    });

    await waitForMessages(messagesConsumed, { number: 1 });
    expect(messagesConsumed.length).toBe(1);
    const committed = await consumer.committed();
    expect(committed).toEqual(
      expect.arrayContaining([
        expect.objectContaining({
          topic: topicName,
          offset: '1',
          partition: 0,
        }),
      ])
    );
  });

  it('sends no offsets when transaction is aborted', async () => {
    await producer.connect();
    await basicProducer.connect();
    await consumer.connect();

    await basicProducer.send({ topic: topicName, messages: [message] });

    await consumer.subscribe({ topic: topicName });

    let messagesConsumed = [];
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const transaction = await producer.transaction();
        await transaction.send({ topic: topicName2, messages: [message] });

        await transaction.sendOffsets({
          consumer,
          topics: [
            {
              topic,
              partitions: [
                { partition, offset: Number(message.offset) + 1 },
              ],
            },
          ],
        });

        await transaction.abort();
        messagesConsumed.push(message);
      }
    });

    await waitForMessages(messagesConsumed, { number: 1 });
    expect(messagesConsumed.length).toBe(1);
    const committed = await consumer.committed();
    expect(committed).toEqual(
      expect.arrayContaining([
        expect.objectContaining({
          topic: topicName,
          offset: null,
          partition: 0,
        }),
      ])
    );
  });
});

types/kafkajs.d.ts

Lines changed: 1 addition & 1 deletion

@@ -168,7 +168,7 @@ export type Producer = Client & {
   transaction(): Promise<Transaction>
   commit(): Promise<void>
   abort(): Promise<void>
-  sendOffsets(args: { consumerGroupId?: string, consumer?: Consumer, topics: TopicOffsets[] }): Promise<void>
+  sendOffsets(args: { consumer: Consumer, topics: TopicOffsets[] }): Promise<void>
   isActive(): boolean
 }
