Skip to content

Commit 6457aef

Browse files
authored
feat: Support dead letter topic. (#335)
1 parent 1e51f5a commit 6457aef

File tree

3 files changed

+101
-0
lines changed

3 files changed

+101
-0
lines changed

index.d.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ export interface ConsumerConfig {
9797
schema?: SchemaInfo;
9898
batchIndexAckEnabled?: boolean;
9999
regexSubscriptionMode?: RegexSubscriptionMode;
100+
deadLetterPolicy?: DeadLetterPolicy;
100101
}
101102

102103
export class Consumer {
@@ -174,6 +175,12 @@ export interface SchemaInfo {
174175
properties?: Record<string, string>;
175176
}
176177

178+
export interface DeadLetterPolicy {
179+
deadLetterTopic: string;
180+
maxRedeliverCount?: number;
181+
initialSubscriptionName?: string;
182+
}
183+
177184
export class AuthenticationTls {
178185
constructor(params: { certificatePath: string, privateKeyPath: string });
179186
}

src/ConsumerConfig.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ static const std::string CFG_MAX_PENDING_CHUNKED_MESSAGE = "maxPendingChunkedMes
4747
static const std::string CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
4848
"autoAckOldestChunkedMessageOnQueueFull";
4949
static const std::string CFG_BATCH_INDEX_ACK_ENABLED = "batchIndexAckEnabled";
50+
static const std::string CFG_DEAD_LETTER_POLICY = "deadLetterPolicy";
51+
static const std::string CFG_DLQ_POLICY_TOPIC = "deadLetterTopic";
52+
static const std::string CFG_DLQ_POLICY_MAX_REDELIVER_COUNT = "maxRedeliverCount";
53+
static const std::string CFG_DLQ_POLICY_INIT_SUB_NAME = "initialSubscriptionName";
5054

5155
static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
5256
{"Exclusive", pulsar_ConsumerExclusive},
@@ -239,6 +243,28 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
239243
pulsar_consumer_configuration_set_batch_index_ack_enabled(this->cConsumerConfig.get(),
240244
batchIndexAckEnabled);
241245
}
246+
247+
if (consumerConfig.Has(CFG_DEAD_LETTER_POLICY) && consumerConfig.Get(CFG_DEAD_LETTER_POLICY).IsObject()) {
248+
pulsar_consumer_config_dead_letter_policy_t dlq_policy{};
249+
Napi::Object dlqPolicyObject = consumerConfig.Get(CFG_DEAD_LETTER_POLICY).ToObject();
250+
std::string dlq_topic_str;
251+
std::string init_subscription_name;
252+
if (dlqPolicyObject.Has(CFG_DLQ_POLICY_TOPIC) && dlqPolicyObject.Get(CFG_DLQ_POLICY_TOPIC).IsString()) {
253+
dlq_topic_str = dlqPolicyObject.Get(CFG_DLQ_POLICY_TOPIC).ToString().Utf8Value();
254+
dlq_policy.dead_letter_topic = dlq_topic_str.c_str();
255+
}
256+
if (dlqPolicyObject.Has(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT) &&
257+
dlqPolicyObject.Get(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT).IsNumber()) {
258+
dlq_policy.max_redeliver_count =
259+
dlqPolicyObject.Get(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT).ToNumber().Int32Value();
260+
}
261+
if (dlqPolicyObject.Has(CFG_DLQ_POLICY_INIT_SUB_NAME) &&
262+
dlqPolicyObject.Get(CFG_DLQ_POLICY_INIT_SUB_NAME).IsString()) {
263+
init_subscription_name = dlqPolicyObject.Get(CFG_DLQ_POLICY_INIT_SUB_NAME).ToString().Utf8Value();
264+
dlq_policy.initial_subscription_name = init_subscription_name.c_str();
265+
}
266+
pulsar_consumer_configuration_set_dlq_policy(this->cConsumerConfig.get(), &dlq_policy);
267+
}
242268
}
243269

244270
ConsumerConfig::~ConsumerConfig() {

tests/consumer.test.js

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,74 @@ const Pulsar = require('../index.js');
247247
await producer4.close();
248248
await consumer.close();
249249
});
250+
251+
test('Dead Letter topic', async () => {
252+
const topicName = 'test-dead_letter_topic';
253+
const dlqTopicName = 'test-dead_letter_topic_customize';
254+
const producer = await client.createProducer({
255+
topic: topicName,
256+
});
257+
258+
const maxRedeliverCountNum = 3;
259+
const consumer = await client.subscribe({
260+
topic: topicName,
261+
subscription: 'sub-1',
262+
subscriptionType: 'Shared',
263+
deadLetterPolicy: {
264+
deadLetterTopic: dlqTopicName,
265+
maxRedeliverCount: maxRedeliverCountNum,
266+
initialSubscriptionName: 'init-sub-1-dlq',
267+
},
268+
nAckRedeliverTimeoutMs: 50,
269+
});
270+
271+
// Send messages.
272+
const sendNum = 5;
273+
const messages = [];
274+
for (let i = 0; i < sendNum; i += 1) {
275+
const msg = `my-message-${i}`;
276+
await producer.send({ data: Buffer.from(msg) });
277+
messages.push(msg);
278+
}
279+
280+
// Redelivery all messages maxRedeliverCountNum time.
281+
let results = [];
282+
for (let i = 1; i <= maxRedeliverCountNum * sendNum + sendNum; i += 1) {
283+
const msg = await consumer.receive();
284+
results.push(msg);
285+
if (i % sendNum === 0) {
286+
results.forEach((message) => {
287+
console.log(`Redeliver message ${message.getData().toString()} ${i} times ${message.getRedeliveryCount()} redeliver Count`);
288+
consumer.negativeAcknowledge(message);
289+
});
290+
results = [];
291+
}
292+
}
293+
// assert no more msgs.
294+
await expect(consumer.receive(100)).rejects.toThrow(
295+
'Failed to receive message: TimeOut',
296+
);
297+
298+
const dlqConsumer = await client.subscribe({
299+
topic: dlqTopicName,
300+
subscription: 'sub-1',
301+
});
302+
const dlqResult = [];
303+
for (let i = 0; i < sendNum; i += 1) {
304+
const msg = await dlqConsumer.receive();
305+
dlqResult.push(msg.getData().toString());
306+
}
307+
expect(dlqResult).toEqual(messages);
308+
309+
// assert no more msgs.
310+
await expect(dlqConsumer.receive(500)).rejects.toThrow(
311+
'Failed to receive message: TimeOut',
312+
);
313+
314+
producer.close();
315+
consumer.close();
316+
dlqConsumer.close();
317+
});
250318
});
251319
});
252320
})();

0 commit comments

Comments
 (0)