Skip to content

Commit 1e51f5a

Browse files
authored
feat: support pattern subscription non persistent topic. (apache#334)
1 parent 12a03ce commit 1e51f5a

File tree

3 files changed

+72
-0
lines changed

3 files changed

+72
-0
lines changed

index.d.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ export interface ConsumerConfig {
9696
autoAckOldestChunkedMessageOnQueueFull?: number;
9797
schema?: SchemaInfo;
9898
batchIndexAckEnabled?: boolean;
99+
regexSubscriptionMode?: RegexSubscriptionMode;
99100
}
100101

101102
export class Consumer {
@@ -251,6 +252,11 @@ export type ConsumerCryptoFailureAction =
251252
'DISCARD' |
252253
'CONSUME';
253254

255+
export type RegexSubscriptionMode =
256+
'PersistentOnly' |
257+
'NonPersistentOnly' |
258+
'AllTopics';
259+
254260
export type SchemaType =
255261
'None' |
256262
'String' |

src/ConsumerConfig.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ static const std::string CFG_TOPICS_PATTERN = "topicsPattern";
3131
static const std::string CFG_SUBSCRIPTION = "subscription";
3232
static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType";
3333
static const std::string CFG_INIT_POSITION = "subscriptionInitialPosition";
34+
static const std::string CFG_REGEX_SUBSCRIPTION_MODE = "regexSubscriptionMode";
3435
static const std::string CFG_ACK_TIMEOUT = "ackTimeoutMs";
3536
static const std::string CFG_NACK_REDELIVER_TIMEOUT = "nAckRedeliverTimeoutMs";
3637
static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
@@ -53,6 +54,11 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
5354
{"KeyShared", pulsar_ConsumerKeyShared},
5455
{"Failover", pulsar_ConsumerFailover}};
5556

57+
static const std::map<std::string, pulsar_consumer_regex_subscription_mode> REGEX_SUBSCRIPTION_MODE = {
58+
{"PersistentOnly", pulsar_consumer_regex_sub_mode_PersistentOnly},
59+
{"NonPersistentOnly", pulsar_consumer_regex_sub_mode_NonPersistentOnly},
60+
{"AllTopics", pulsar_consumer_regex_sub_mode_AllTopics}};
61+
5662
static const std::map<std::string, initial_position> INIT_POSITION = {
5763
{"Latest", initial_position_latest}, {"Earliest", initial_position_earliest}};
5864

@@ -111,6 +117,16 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
111117
}
112118
}
113119

120+
if (consumerConfig.Has(CFG_REGEX_SUBSCRIPTION_MODE) &&
121+
consumerConfig.Get(CFG_REGEX_SUBSCRIPTION_MODE).IsString()) {
122+
std::string regexSubscriptionMode =
123+
consumerConfig.Get(CFG_REGEX_SUBSCRIPTION_MODE).ToString().Utf8Value();
124+
if (REGEX_SUBSCRIPTION_MODE.count(regexSubscriptionMode)) {
125+
pulsar_consumer_configuration_set_regex_subscription_mode(
126+
this->cConsumerConfig.get(), REGEX_SUBSCRIPTION_MODE.at(regexSubscriptionMode));
127+
}
128+
}
129+
114130
if (consumerConfig.Has(CFG_CONSUMER_NAME) && consumerConfig.Get(CFG_CONSUMER_NAME).IsString()) {
115131
std::string consumerName = consumerConfig.Get(CFG_CONSUMER_NAME).ToString().Utf8Value();
116132
if (!consumerName.empty())

tests/consumer.test.js

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,56 @@ const Pulsar = require('../index.js');
197197
'Failed to receive message: TimeOut',
198198
);
199199
});
200+
201+
test('Regex subscription', async () => {
202+
const topicName1 = 'persistent://public/default/regex-sub-1';
203+
const topicName2 = 'persistent://public/default/regex-sub-2';
204+
const topicName3 = 'non-persistent://public/default/regex-sub-3';
205+
const topicName4 = 'persistent://public/default/no-match-regex-sub-2';
206+
const producer1 = await client.createProducer({
207+
topic: topicName1,
208+
});
209+
const producer2 = await client.createProducer({
210+
topic: topicName2,
211+
});
212+
const producer3 = await client.createProducer({
213+
topic: topicName3,
214+
});
215+
const producer4 = await client.createProducer({
216+
topic: topicName4,
217+
});
218+
219+
const consumer = await client.subscribe({
220+
topicsPattern: 'persistent://public/default/regex-sub.*',
221+
subscription: 'sub1',
222+
subscriptionType: 'Shared',
223+
regexSubscriptionMode: 'AllTopics',
224+
});
225+
226+
const num = 10;
227+
for (let i = 0; i < num; i += 1) {
228+
const msg = `my-message-${i}`;
229+
await producer1.send({ data: Buffer.from(msg) });
230+
await producer2.send({ data: Buffer.from(msg) });
231+
await producer3.send({ data: Buffer.from(msg) });
232+
await producer4.send({ data: Buffer.from(msg) });
233+
}
234+
const results = [];
235+
for (let i = 0; i < 3 * num; i += 1) {
236+
const msg = await consumer.receive();
237+
results.push(msg.getData().toString());
238+
}
239+
expect(results.length).toEqual(3 * num);
240+
// assert no more msgs.
241+
await expect(consumer.receive(1000)).rejects.toThrow(
242+
'Failed to receive message: TimeOut',
243+
);
244+
await producer1.close();
245+
await producer2.close();
246+
await producer3.close();
247+
await producer4.close();
248+
await consumer.close();
249+
});
200250
});
201251
});
202252
})();

0 commit comments

Comments
 (0)