Skip to content

Commit 938df93

Browse files
committed
Add storeOffsets and fix typings
1 parent ed7226b commit 938df93

File tree

4 files changed

+180
-1
lines changed

4 files changed

+180
-1
lines changed

lib/kafkajs/_consumer.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,6 +1062,30 @@ class Consumer {
10621062
// return m ?? null;
10631063
}
10641064

1065+
/**
1066+
* Store offsets for the given topic partitions.
1067+
*
1068+
* Stored offsets will be commited automatically at a later point if autoCommit is enabled.
1069+
* Otherwise, they will be committed when commitOffsets is called without arguments.
1070+
*
1071+
* enable.auto.offset.store must be set to false to use this API.
1072+
* @param {import("../../types/kafkajs").TopicPartitionOffsetAndMetadata[]?} topicPartitions
1073+
*/
1074+
storeOffsets(topicPartitions) {
1075+
if (this.#state !== ConsumerState.CONNECTED) {
1076+
throw new error.KafkaJSError('Store can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
1077+
}
1078+
1079+
if (!this.#userManagedStores) {
1080+
throw new error.KafkaJSError(
1081+
'Store can only be called when enable.auto.offset.store is explicitly set to false.', { code: error.ErrorCodes.ERR__INVALID_ARG });
1082+
}
1083+
1084+
const topicPartitionsRdKafka = topicPartitions.map(
1085+
topicPartitionOffsetToRdKafka);
1086+
this.#internalClient.offsetsStore(topicPartitionsRdKafka);
1087+
}
1088+
10651089
async #commitOffsetsUntilNoStateErr(offsetsToCommit) {
10661090
let err = { code: error.ErrorCodes.ERR_NO_ERROR };
10671091
do {

test/promisified/consumer/consumerCacheTests.spec.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
jest.setTimeout(30000)
22

3-
const { is } = require('bluebird');
43
const {
54
secureRandom,
65
createTopic,
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
 * Tests for Consumer.storeOffsets(). The suite runs twice via describe.each:
 * once with autoCommit disabled and once with it enabled.
 */
jest.setTimeout(30000)

const {
    secureRandom,
    createTopic,
    waitFor,
    createProducer,
    createConsumer,
    sleep,
} = require('../testhelpers');
const { ErrorCodes } = require('../../../lib').KafkaJS;

describe.each([[false], [true]])('Consumer store', (isAutoCommit) => {
    let topicName, groupId, producer, consumer;

    beforeEach(async () => {
        topicName = `test-topic-${secureRandom()}`
        groupId = `consumer-group-id-${secureRandom()}`

        /* Three partitions; the commit test spreads messages across all of them. */
        await createTopic({ topic: topicName, partitions: 3 })

        producer = createProducer({});

        /* enable.auto.offset.store is explicitly false: storeOffsets() requires it. */
        consumer = createConsumer({
            groupId,
            maxWaitTimeInMs: 100,
            fromBeginning: true,
            autoCommit: isAutoCommit,
            autoCommitInterval: 500,
        }, {
            'enable.auto.offset.store': false,
        });
    });

    afterEach(async () => {
        consumer && (await consumer.disconnect())
        producer && (await producer.disconnect())
    });

    it('should not work if enable.auto.offset.store = true', async () => {
        let assignment = [];
        consumer = createConsumer({
            groupId,
            maxWaitTimeInMs: 100,
            fromBeginning: true,
        }, {
            /* Set to true manually - the default value with kafkaJS block is false. */
            'enable.auto.offset.store': true,
            'rebalance_cb': function (err, asg) {
                if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
                    assignment = asg;
                }
            }
        });

        await consumer.connect();
        await consumer.subscribe({ topic: topicName })
        await consumer.run({
            eachMessage: async () => {
            }
        });
        /* Wait until partitions are assigned before calling storeOffsets. */
        await waitFor(() => assignment.length > 0, () => null, 1000);
        expect(
            () => consumer.storeOffsets([{ topic: topicName, partition: 0, offset: '10' }])
        ).toThrow(/Store can only be called when enable.auto.offset.store is explicitly set to false/);
    });

    it('should not work if enable.auto.offset.store is unset', async () => {
        let assignment = [];
        consumer = createConsumer({
            groupId,
            maxWaitTimeInMs: 100,
            fromBeginning: true,
        }, {
            /* enable.auto.offset.store is deliberately left unset here -
             * storeOffsets requires it to be *explicitly* set to false. */
            'rebalance_cb': function (err, asg) {
                if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
                    assignment = asg;
                }
            }
        });

        await consumer.connect();
        await consumer.subscribe({ topic: topicName })
        await consumer.run({
            eachMessage: async () => {
            }
        });
        /* Wait until partitions are assigned before calling storeOffsets. */
        await waitFor(() => assignment.length > 0, () => null, 1000);
        expect(
            () => consumer.storeOffsets([{ topic: topicName, partition: 0, offset: '10' }])
        ).toThrow(/Store can only be called when enable.auto.offset.store is explicitly set to false/);
    });

    it('should commit stored offsets', async () => {
        /* Evenly distribute 30 messages across 3 partitions */
        let i = 0;
        const messages = Array(3 * 10)
            .fill()
            .map(() => {
                const value = secureRandom()
                return { value: `value-${value}`, partition: (i++) % 3 }
            })

        await producer.connect();
        await producer.send({ topic: topicName, messages })
        await producer.flush();

        /* Consume all 30 messages, storing (offset + 1) for each one received. */
        let msgCount = 0;
        await consumer.connect();
        await consumer.subscribe({ topic: topicName })
        await consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                msgCount++;
                const offset = (Number(message.offset) + 1).toString();
                expect(() => consumer.storeOffsets([{ topic, partition, offset }])).not.toThrow();
            }
        });
        await waitFor(() => msgCount >= 30, () => null, { delay: 100 });
        expect(msgCount).toEqual(30);

        /* Commit explicitly, or give auto-commit time to pick up the stored offsets. */
        if (!isAutoCommit)
            await expect(consumer.commitOffsets()).resolves.toBeUndefined();
        else
            await sleep(1000); /* Wait for auto-commit */

        await consumer.disconnect();

        /* Send 30 more messages */
        await producer.send({ topic: topicName, messages })
        await producer.flush();


        /* A fresh consumer in the same group should resume from the committed
         * offsets, i.e. see only the 30 new messages despite fromBeginning. */
        consumer = createConsumer({
            groupId,
            maxWaitTimeInMs: 100,
            fromBeginning: true,
        });

        msgCount = 0;
        await consumer.connect();
        await consumer.subscribe({ topic: topicName })
        await consumer.run({
            eachMessage: async ({ message }) => {
                msgCount++;
            }
        })
        /* Only the extra 30 messages should come to us */
        await waitFor(() => msgCount >= 30, () => null, { delay: 100 });
        /* Extra settle time to catch any unexpected additional deliveries. */
        await sleep(1000);
        expect(msgCount).toEqual(30);
    });

});

types/kafkajs.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ type Sender = {
136136
/** A producer handle: Sender operations plus connection lifecycle and flushing. */
export type Producer = Sender & {
  connect(): Promise<void>
  disconnect(): Promise<void>
  /* NOTE(review): presumably resolves once outstanding sends are delivered,
   * or rejects/returns after `timeout` ms — confirm against the implementation. */
  flush(args?: {timeout?: number}): Promise<void>
}
140141

141142
export interface RetryOptions {
@@ -460,6 +461,7 @@ export type Consumer = {
460461
subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise<void>
461462
stop(): Promise<void>
462463
run(config?: ConsumerRunConfig): Promise<void>
464+
storeOffsets(topicPartitions: Array<TopicPartitionOffsetAndMetadata>): void
463465
commitOffsets(topicPartitions: Array<TopicPartitionOffsetAndMetadata>): Promise<void>
464466
seek(topicPartitionOffset: TopicPartitionOffset): Promise<void>
465467
describeGroup(): Promise<GroupDescription>

0 commit comments

Comments
 (0)