Skip to content

Commit daa572b

Browse files
committed
Make subscriptions replaceable with flag
1 parent 184e124 commit daa572b

File tree

4 files changed

+14
-7
lines changed

4 files changed

+14
-7
lines changed

lib/kafkajs/_consumer.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ class Consumer {
7373
*/
7474
#pausedPartitions = new Set();
7575

76+
/**
77+
* Contains a list of stored topics/regexes that the user has subscribed to.
78+
* @type {(string|RegExp)[]}
79+
*/
80+
#storedSubscriptions = [];
81+
7682
/**
7783
* @constructor
7884
* @param {import("../../types/kafkajs").ConsumerConfig} kJSConfig
@@ -425,7 +431,8 @@ class Consumer {
425431
}
426432
});
427433

428-
this.#internalClient.subscribe(topics);
434+
this.#storedSubscriptions = subscription.replace ? topics : this.#storedSubscriptions.concat(topics);
435+
this.#internalClient.subscribe(this.#storedSubscriptions);
429436
}
430437

431438
async stop() {

test/promisified/consumer/pause.spec.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ describe('Consumer', () => {
6868
/* Send the first 2 messages to each topic. */
6969
for (const topic of topics) {
7070
await producer.send({ topic, messages: messages.slice(0, 2) });
71+
await consumer.subscribe({ topic });
7172
}
72-
await consumer.subscribe({ topics: topics });
7373

7474
let shouldPause = true;
7575
let pauseMessageRecvd = false;
@@ -141,7 +141,7 @@ describe('Consumer', () => {
141141
for (const topic of topics) {
142142
await producer.send({ topic, messages: messages.slice(0, 2) })
143143
}
144-
await consumer.subscribe({ topics })
144+
await consumer.subscribe({ topics, replace: true });
145145

146146
let shouldPause = true
147147
const messagesConsumed = []

test/promisified/consumer/subscribe.spec.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ describe('Consumer', () => {
7171
expect(messagesConsumed.map(m => m.message.value.toString())).toEqual(
7272
expect.arrayContaining(['drink', 'your', 'ovaltine'])
7373
);
74-
}, 10000);
74+
});
7575
})
7676

7777
describe('Deprecated "topic" interface', () => {
@@ -151,7 +151,7 @@ describe('Consumer', () => {
151151
'value-br',
152152
'value-se',
153153
]);
154-
}, 10000);
154+
});
155155
});
156156
});
157157
});

types/kafkajs.d.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,9 +376,9 @@ export type EachBatchHandler = (payload: EachBatchPayload) => Promise<void>
376376

377377
export type EachMessageHandler = (payload: EachMessagePayload) => Promise<void>
378378

379-
export type ConsumerSubscribeTopic = { topic: string | RegExp; fromBeginning?: boolean }
379+
export type ConsumerSubscribeTopic = { topic: string | RegExp; fromBeginning?: boolean, replace?: boolean }
380380

381-
export type ConsumerSubscribeTopics = { topics: (string | RegExp)[]; fromBeginning?: boolean }
381+
export type ConsumerSubscribeTopics = { topics: (string | RegExp)[]; fromBeginning?: boolean, replace?: boolean }
382382

383383
export type ConsumerRunConfig = {
384384
autoCommit?: boolean

0 commit comments

Comments
 (0)