Skip to content

Commit 258241c

Browse files
authored
feat: Support batch receive for consumer. (#357)
1 parent b8e6276 commit 258241c

File tree

8 files changed

+237
-32
lines changed

8 files changed

+237
-32
lines changed

index.d.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,12 @@ export interface ConsumerConfig {
9898
batchIndexAckEnabled?: boolean;
9999
regexSubscriptionMode?: RegexSubscriptionMode;
100100
deadLetterPolicy?: DeadLetterPolicy;
101+
batchReceivePolicy?: ConsumerBatchReceivePolicy;
101102
}
102103

103104
export class Consumer {
104105
receive(timeout?: number): Promise<Message>;
106+
batchReceive(): Promise<Message []>;
105107
acknowledge(message: Message): Promise<null>;
106108
acknowledgeId(messageId: MessageId): Promise<null>;
107109
negativeAcknowledge(message: Message): void;
@@ -181,6 +183,12 @@ export interface DeadLetterPolicy {
181183
initialSubscriptionName?: string;
182184
}
183185

186+
export interface ConsumerBatchReceivePolicy {
187+
maxNumMessages?: number;
188+
maxNumBytes?: number;
189+
timeoutMs?: number;
190+
}
191+
184192
export class AuthenticationTls {
185193
constructor(params: { certificatePath: string, privateKeyPath: string });
186194
}

src/Consumer.cc

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
3838
DefineClass(env, "Consumer",
3939
{
4040
InstanceMethod("receive", &Consumer::Receive),
41+
InstanceMethod("batchReceive", &Consumer::BatchReceive),
4142
InstanceMethod("acknowledge", &Consumer::Acknowledge),
4243
InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
4344
InstanceMethod("negativeAcknowledge", &Consumer::NegativeAcknowledge),
@@ -192,36 +193,17 @@ struct ConsumerNewInstanceContext {
192193
Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
193194
auto deferred = ThreadSafeDeferred::New(info.Env());
194195
auto config = info[0].As<Napi::Object>();
195-
std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>(config, &MessageListener);
196+
std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>();
197+
198+
consumerConfig->InitConfig(deferred, config, &MessageListener);
199+
if (deferred->IsDone()) {
200+
return deferred->Promise();
201+
}
196202

197203
const std::string &topic = consumerConfig->GetTopic();
198204
const std::vector<std::string> &topics = consumerConfig->GetTopics();
199205
const std::string &topicsPattern = consumerConfig->GetTopicsPattern();
200-
if (topic.empty() && topics.size() == 0 && topicsPattern.empty()) {
201-
deferred->Reject(
202-
std::string("Topic, topics or topicsPattern is required and must be specified as a string when "
203-
"creating consumer"));
204-
return deferred->Promise();
205-
}
206206
const std::string &subscription = consumerConfig->GetSubscription();
207-
if (subscription.empty()) {
208-
deferred->Reject(
209-
std::string("Subscription is required and must be specified as a string when creating consumer"));
210-
return deferred->Promise();
211-
}
212-
int32_t ackTimeoutMs = consumerConfig->GetAckTimeoutMs();
213-
if (ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) {
214-
std::string msg("Ack timeout should be 0 or greater than or equal to " +
215-
std::to_string(MIN_ACK_TIMEOUT_MILLIS));
216-
deferred->Reject(msg);
217-
return deferred->Promise();
218-
}
219-
int32_t nAckRedeliverTimeoutMs = consumerConfig->GetNAckRedeliverTimeoutMs();
220-
if (nAckRedeliverTimeoutMs < 0) {
221-
std::string msg("NAck timeout should be greater than or equal to zero");
222-
deferred->Reject(msg);
223-
return deferred->Promise();
224-
}
225207

226208
auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig);
227209

@@ -291,6 +273,39 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
291273
int64_t timeout;
292274
};
293275

276+
Napi::Value Consumer::BatchReceive(const Napi::CallbackInfo &info) {
277+
auto deferred = ThreadSafeDeferred::New(Env());
278+
auto ctx = new ExtDeferredContext(deferred);
279+
pulsar_consumer_batch_receive_async(
280+
this->cConsumer.get(),
281+
[](pulsar_result result, pulsar_messages_t *rawMessages, void *ctx) {
282+
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
283+
auto deferred = deferredContext->deferred;
284+
delete deferredContext;
285+
286+
if (result != pulsar_result_Ok) {
287+
deferred->Reject(std::string("Failed to batch receive message: ") + pulsar_result_str(result));
288+
} else {
289+
deferred->Resolve([rawMessages](const Napi::Env env) {
290+
int listSize = pulsar_messages_size(rawMessages);
291+
Napi::Array jsArray = Napi::Array::New(env, listSize);
292+
for (int i = 0; i < listSize; i++) {
293+
pulsar_message_t *rawMessage = pulsar_messages_get(rawMessages, i);
294+
pulsar_message_t *message = pulsar_message_create();
295+
pulsar_message_copy(rawMessage, message);
296+
Napi::Object obj =
297+
Message::NewInstance({}, std::shared_ptr<pulsar_message_t>(message, pulsar_message_free));
298+
jsArray.Set(i, obj);
299+
}
300+
pulsar_messages_free(rawMessages);
301+
return jsArray;
302+
});
303+
}
304+
},
305+
ctx);
306+
return deferred->Promise();
307+
}
308+
294309
Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
295310
if (info[0].IsUndefined()) {
296311
auto deferred = ThreadSafeDeferred::New(Env());

src/Consumer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
4444
MessageListenerCallback *listener;
4545

4646
Napi::Value Receive(const Napi::CallbackInfo &info);
47+
Napi::Value BatchReceive(const Napi::CallbackInfo &info);
4748
Napi::Value Acknowledge(const Napi::CallbackInfo &info);
4849
Napi::Value AcknowledgeId(const Napi::CallbackInfo &info);
4950
void NegativeAcknowledge(const Napi::CallbackInfo &info);

src/ConsumerConfig.cc

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ static const std::string CFG_DEAD_LETTER_POLICY = "deadLetterPolicy";
5151
static const std::string CFG_DLQ_POLICY_TOPIC = "deadLetterTopic";
5252
static const std::string CFG_DLQ_POLICY_MAX_REDELIVER_COUNT = "maxRedeliverCount";
5353
static const std::string CFG_DLQ_POLICY_INIT_SUB_NAME = "initialSubscriptionName";
54+
static const std::string CFG_BATCH_RECEIVE_POLICY = "batchReceivePolicy";
55+
static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES = "maxNumMessages";
56+
static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES = "maxNumBytes";
57+
static const std::string CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS = "timeoutMs";
5458

5559
static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
5660
{"Exclusive", pulsar_ConsumerExclusive},
@@ -74,7 +78,7 @@ static const std::map<std::string, pulsar_consumer_crypto_failure_action> CONSUM
7478

7579
void FinalizeListenerCallback(Napi::Env env, MessageListenerCallback *cb, void *) { delete cb; }
7680

77-
ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_message_listener messageListener)
81+
ConsumerConfig::ConsumerConfig()
7882
: topic(""),
7983
topicsPattern(""),
8084
subscription(""),
@@ -83,7 +87,10 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
8387
listener(nullptr) {
8488
this->cConsumerConfig = std::shared_ptr<pulsar_consumer_configuration_t>(
8589
pulsar_consumer_configuration_create(), pulsar_consumer_configuration_free);
90+
}
8691

92+
void ConsumerConfig::InitConfig(std::shared_ptr<ThreadSafeDeferred> deferred,
93+
const Napi::Object &consumerConfig, pulsar_message_listener messageListener) {
8794
if (consumerConfig.Has(CFG_TOPIC) && consumerConfig.Get(CFG_TOPIC).IsString()) {
8895
this->topic = consumerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
8996
}
@@ -101,9 +108,21 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
101108
this->topicsPattern = consumerConfig.Get(CFG_TOPICS_PATTERN).ToString().Utf8Value();
102109
}
103110

111+
if (this->topic.empty() && this->topics.size() == 0 && this->topicsPattern.empty()) {
112+
deferred->Reject(
113+
std::string("Topic, topics or topicsPattern is required and must be specified as a string when "
114+
"creating consumer"));
115+
return;
116+
}
117+
104118
if (consumerConfig.Has(CFG_SUBSCRIPTION) && consumerConfig.Get(CFG_SUBSCRIPTION).IsString()) {
105119
this->subscription = consumerConfig.Get(CFG_SUBSCRIPTION).ToString().Utf8Value();
106120
}
121+
if (subscription.empty()) {
122+
deferred->Reject(
123+
std::string("Subscription is required and must be specified as a string when creating consumer"));
124+
return;
125+
}
107126

108127
if (consumerConfig.Has(CFG_SUBSCRIPTION_TYPE) && consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).IsString()) {
109128
std::string subscriptionType = consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).ToString().Utf8Value();
@@ -139,18 +158,25 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
139158

140159
if (consumerConfig.Has(CFG_ACK_TIMEOUT) && consumerConfig.Get(CFG_ACK_TIMEOUT).IsNumber()) {
141160
this->ackTimeoutMs = consumerConfig.Get(CFG_ACK_TIMEOUT).ToNumber().Int64Value();
142-
if (this->ackTimeoutMs == 0 || this->ackTimeoutMs >= MIN_ACK_TIMEOUT_MILLIS) {
143-
pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig.get(), this->ackTimeoutMs);
161+
if (this->ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) {
162+
std::string msg("Ack timeout should be 0 or greater than or equal to " +
163+
std::to_string(MIN_ACK_TIMEOUT_MILLIS));
164+
deferred->Reject(msg);
165+
return;
144166
}
167+
pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig.get(), this->ackTimeoutMs);
145168
}
146169

147170
if (consumerConfig.Has(CFG_NACK_REDELIVER_TIMEOUT) &&
148171
consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).IsNumber()) {
149172
this->nAckRedeliverTimeoutMs = consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).ToNumber().Int64Value();
150-
if (this->nAckRedeliverTimeoutMs >= 0) {
151-
pulsar_configure_set_negative_ack_redelivery_delay_ms(this->cConsumerConfig.get(),
152-
this->nAckRedeliverTimeoutMs);
173+
if (nAckRedeliverTimeoutMs < 0) {
174+
std::string msg("NAck timeout should be greater than or equal to zero");
175+
deferred->Reject(msg);
176+
return;
153177
}
178+
pulsar_configure_set_negative_ack_redelivery_delay_ms(this->cConsumerConfig.get(),
179+
this->nAckRedeliverTimeoutMs);
154180
}
155181

156182
if (consumerConfig.Has(CFG_RECV_QUEUE) && consumerConfig.Get(CFG_RECV_QUEUE).IsNumber()) {
@@ -265,6 +291,39 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
265291
}
266292
pulsar_consumer_configuration_set_dlq_policy(this->cConsumerConfig.get(), &dlq_policy);
267293
}
294+
295+
if (consumerConfig.Has(CFG_BATCH_RECEIVE_POLICY) &&
296+
consumerConfig.Get(CFG_BATCH_RECEIVE_POLICY).IsObject()) {
297+
Napi::Object propObj = consumerConfig.Get(CFG_BATCH_RECEIVE_POLICY).ToObject();
298+
int maxNumMessages = -1;
299+
if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES) &&
300+
propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES).IsNumber()) {
301+
maxNumMessages = propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES).ToNumber().Int32Value();
302+
}
303+
int maxNumBytes = 10 * 1024 * 1024;
304+
if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES) &&
305+
propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES).IsNumber()) {
306+
maxNumBytes = propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES).ToNumber().Int64Value();
307+
}
308+
int timeoutMs = 100;
309+
if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS) &&
310+
propObj.Get(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS).IsNumber()) {
311+
timeoutMs = propObj.Get(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS).ToNumber().Int64Value();
312+
}
313+
if (maxNumMessages <= 0 && maxNumBytes <= 0 && timeoutMs <= 0) {
314+
std::string msg("At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified.");
315+
deferred->Reject(msg);
316+
return;
317+
}
318+
pulsar_consumer_batch_receive_policy_t batch_receive_policy{maxNumMessages, maxNumBytes, timeoutMs};
319+
int result = pulsar_consumer_configuration_set_batch_receive_policy(this->cConsumerConfig.get(),
320+
&batch_receive_policy);
321+
if (result == -1) {
322+
std::string msg("Set batch receive policy failed: C client returned failure");
323+
deferred->Reject(msg);
324+
return;
325+
}
326+
}
268327
}
269328

270329
ConsumerConfig::~ConsumerConfig() {

src/ConsumerConfig.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@
2121
#define CONSUMER_CONFIG_H
2222

2323
#include <pulsar/c/consumer_configuration.h>
24+
#include "ThreadSafeDeferred.h"
2425
#include "MessageListener.h"
2526

2627
#define MIN_ACK_TIMEOUT_MILLIS 10000
2728

2829
class ConsumerConfig {
2930
public:
30-
ConsumerConfig(const Napi::Object &consumerConfig, pulsar_message_listener messageListener);
31+
ConsumerConfig();
3132
~ConsumerConfig();
33+
void InitConfig(std::shared_ptr<ThreadSafeDeferred> deferred, const Napi::Object &consumerConfig,
34+
pulsar_message_listener messageListener);
3235
std::shared_ptr<pulsar_consumer_configuration_t> GetCConsumerConfig();
3336
std::string GetTopic();
3437
std::vector<std::string> GetTopics();

src/ThreadSafeDeferred.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,7 @@ void ThreadSafeDeferred::Reject(const std::string &errorMsg) {
9696
this->fate = EFate::REJECTED;
9797
this->tsf.Release();
9898
}
99+
100+
bool ThreadSafeDeferred::IsDone() const {
101+
return this->fate == EFate::RESOLVED || this->fate == EFate::REJECTED;
102+
}

src/ThreadSafeDeferred.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ class ThreadSafeDeferred : public Napi::Promise::Deferred {
7373
inline void Reject() { this->Reject(""); }
7474
void Reject(
7575
const std::string &); // <- if only Reject were virtual... But we can live without polymorphism here
76+
bool IsDone() const;
7677

7778
static std::shared_ptr<ThreadSafeDeferred> New(const Napi::Env env);
7879
};

0 commit comments

Comments
 (0)