Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export interface ProducerConfig {
schema?: SchemaInfo;
accessMode?: ProducerAccessMode;
batchingType?: ProducerBatchType;
messageRouter?: MessageRouter;
}

export class Producer {
Expand Down Expand Up @@ -176,6 +177,12 @@ export class MessageId {
toString(): string;
}

export interface TopicMetadata {
numPartitions: number;
}

export type MessageRouter = (message: Message, topicMetadata: TopicMetadata) => number;

export interface SchemaInfo {
schemaType: SchemaType;
name?: string;
Expand Down
4 changes: 3 additions & 1 deletion src/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
auto deferred = instanceContext->deferred;
auto cClient = instanceContext->cClient;
auto producerConfig = instanceContext->producerConfig;
delete instanceContext;

if (result != pulsar_result_Ok) {
Expand All @@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt

std::shared_ptr<pulsar_producer_t> cProducer(rawProducer, pulsar_producer_free);

deferred->Resolve([cProducer](const Napi::Env env) {
deferred->Resolve([cProducer, producerConfig](const Napi::Env env) {
Napi::Object obj = Producer::constructor.New({});
Producer *producer = Producer::Unwrap(obj);
producer->SetCProducer(cProducer);
producer->producerConfig = producerConfig;
return obj;
});
},
Expand Down
5 changes: 5 additions & 0 deletions src/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <napi.h>
#include <pulsar/c/client.h>
#include <pulsar/c/producer.h>
#include <memory>
#include "ProducerConfig.h"

class Producer : public Napi::ObjectWrap<Producer> {
public:
Expand All @@ -35,6 +37,9 @@ class Producer : public Napi::ObjectWrap<Producer> {

private:
std::shared_ptr<pulsar_producer_t> cProducer;
// Extend the lifetime of the producer config since it's env and router function could be used when sending
// messages
std::shared_ptr<ProducerConfig> producerConfig;
Napi::Value Send(const Napi::CallbackInfo &info);
Napi::Value Flush(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
Expand Down
38 changes: 38 additions & 0 deletions src/ProducerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
*/
#include "SchemaInfo.h"
#include "ProducerConfig.h"
#include "Message.h"
#include <cstdio>
#include <map>
#include "napi-inl.h"
#include "napi.h"
#include "pulsar/ProducerConfiguration.h"
#include "pulsar/c/message.h"
#include "pulsar/c/message_router.h"

static const std::string CFG_TOPIC = "topic";
static const std::string CFG_PRODUCER_NAME = "producerName";
Expand All @@ -42,6 +48,7 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
static const std::string CFG_ACCESS_MODE = "accessMode";
static const std::string CFG_BATCHING_TYPE = "batchingType";
static const std::string CFG_MESSAGE_ROUTER = "messageRouter";

struct _pulsar_producer_configuration {
pulsar::ProducerConfiguration conf;
Expand Down Expand Up @@ -82,6 +89,25 @@ static std::map<std::string, pulsar::ProducerConfiguration::BatchingType> PRODUC
{"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching},
};

static int choosePartition(pulsar_message_t* msg, pulsar_topic_metadata_t* metadata, void* ctx) {
auto router = static_cast<Napi::FunctionReference*>(ctx);
const auto& env = router->Env();
auto jsMessage = Message::NewInstance(Napi::Object::New(env),
std::shared_ptr<pulsar_message_t>(msg, [](pulsar_message_t*) {}));
int numPartitions = pulsar_topic_metadata_get_num_partitions(metadata);

Napi::Object jsTopicMetadata = Napi::Object::New(env);
jsTopicMetadata.Set("numPartitions", Napi::Number::New(env, numPartitions));

try {
return router->Call({jsMessage, jsTopicMetadata}).ToNumber().Int32Value();
} catch (const Napi::Error& e) {
// TODO: how to handle the error properly? For now, return an invalid partition to fail the send
fprintf(stderr, "Error when calling messageRouter: %s\n", e.what());
return numPartitions;
}
}

ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
pulsar_producer_configuration_create(), pulsar_producer_configuration_free);
Expand Down Expand Up @@ -224,6 +250,18 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
if (PRODUCER_BATCHING_TYPE.count(batchingType)) {
this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType));
}

if (producerConfig.Has(CFG_MESSAGE_ROUTER)) {
auto value = producerConfig.Get(CFG_MESSAGE_ROUTER);
if (value.IsFunction()) {
messageRouter = Napi::Persistent(value.As<Napi::Function>());
pulsar_producer_configuration_set_message_router(this->cProducerConfig.get(), choosePartition,
&messageRouter);
} else {
Napi::TypeError::New(producerConfig.Env(), "messageRouter should be a function")
.ThrowAsJavaScriptException();
}
}
}

ProducerConfig::~ProducerConfig() {}
Expand Down
7 changes: 7 additions & 0 deletions src/ProducerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

#include <napi.h>
#include <pulsar/c/producer_configuration.h>
#include <memory>

struct MessageRouterContext {
Napi::FunctionReference messageRouter;
};

class ProducerConfig {
public:
Expand All @@ -33,6 +38,8 @@ class ProducerConfig {
private:
std::shared_ptr<pulsar_producer_configuration_t> cProducerConfig;
std::string topic;
std::unique_ptr<MessageRouterContext> routerContext;
Napi::FunctionReference messageRouter;
};

#endif
16 changes: 15 additions & 1 deletion tests/http_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,18 @@ const request = (url, { headers, data = {}, method }) => new Promise((resolve, r
req.end();
});

module.exports = request;
function createPartitionedTopic(topic, numPartitions) {
const url = `http://localhost:8080/admin/v2/persistent/public/default/${topic}/partitions`;
return request(url, {
headers: {
'Content-Type': 'application/json',
},
data: numPartitions,
method: 'PUT',
});
}

module.exports = {
createPartitionedTopic,
request,
};
55 changes: 55 additions & 0 deletions tests/producer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

const Pulsar = require('../index');
const httpUtils = require('./http_utils');

(() => {
describe('Producer', () => {
Expand Down Expand Up @@ -156,5 +157,59 @@
await producer2.close();
});
});
describe('Message Routing', () => {
test('Custom Message Router', async () => {
const topic = `test-custom-router-${Date.now()}`;
const numPartitions = 3;
const response = await httpUtils.createPartitionedTopic(topic, numPartitions);
expect(response.statusCode).toBe(204);

const producer = await client.createProducer({
topic,
batchingMaxMessages: 2,
messageRouter: (message, topicMetadata) => parseInt(message.getPartitionKey(), 10)
% topicMetadata.numPartitions,
messageRoutingMode: 'CustomPartition',
});

const promises = [];
const numMessages = 5;
for (let i = 0; i < numMessages; i += 1) {
const sendPromise = producer.send({
partitionKey: `${i}`,
data: Buffer.from(`msg-${i}`),
});
await sendPromise;
promises.push(sendPromise);
}
try {
const allMsgIds = await Promise.all(promises);
console.log(`All messages have been sent. IDs: ${allMsgIds.join(', ')}`);
for (let i = 0; i < allMsgIds.length; i += 1) {
// The message id string is in the format of "entryId,ledgerId,partition,batchIndex"
const partition = Number(allMsgIds[i].toString().split(',')[2]);
expect(i % numPartitions).toBe(partition);
}
} catch (error) {
console.error('One or more messages failed to send:', error);
}
}, 30000);
test('Exception in router', async () => {
const topic = `test-exception-in-router-${Date.now()}`;
const numPartitions = 2;
const response = await httpUtils.createPartitionedTopic(topic, numPartitions);
expect(response.statusCode).toBe(204);
const producer = await client.createProducer({
topic,
messageRouter: (message, topicMetadata) => {

Check warning on line 204 in tests/producer.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'topicMetadata' is defined but never used

Check warning on line 204 in tests/producer.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'message' is defined but never used
throw new Error('Custom error in message router');
},
messageRoutingMode: 'CustomPartition',
});
await expect(
producer.send({ data: Buffer.from('test') }),
).rejects.toThrow('Failed to send message: UnknownError');
}, 30000);
});
});
})();
4 changes: 2 additions & 2 deletions tests/reader.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

const lodash = require('lodash');
const Pulsar = require('../index');
const httpRequest = require('./http_utils');
const httpUtils = require('./http_utils');

const baseUrl = 'http://localhost:8080';

Expand Down Expand Up @@ -81,7 +81,7 @@ const baseUrl = 'http://localhost:8080';
const partitionedTopicName = 'test-reader-partitioned-topic';
const partitionedTopic = `persistent://public/default/${partitionedTopicName}`;
const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
const createPartitionedTopicRes = await httpRequest(
const createPartitionedTopicRes = await httpUtils.request(
partitionedTopicAdminURL, {
headers: {
'Content-Type': 'text/plain',
Expand Down
Loading