diff --git a/index.d.ts b/index.d.ts index 270c5e6..32d1230 100644 --- a/index.d.ts +++ b/index.d.ts @@ -68,6 +68,7 @@ export interface ProducerConfig { schema?: SchemaInfo; accessMode?: ProducerAccessMode; batchingType?: ProducerBatchType; + messageRouter?: MessageRouter; } export class Producer { @@ -176,6 +177,26 @@ export class MessageId { toString(): string; } +/** + * Metadata for a topic that the MessageRouter can use. + */ +export interface TopicMetadata { + numPartitions: number; +} + +/** + * A custom message router interface that can be implemented by the user. + */ +export interface MessageRouter { + /** + * Choose a partition for the given message. + * @param message The message to be routed. + * @param topicMetadata Metadata for the topic. + * @returns The partition index to send the message to. + */ + getPartition(message: ProducerMessage, topicMetadata: TopicMetadata): number; +} + export interface SchemaInfo { schemaType: SchemaType; name?: string; diff --git a/src/Producer.cc b/src/Producer.cc index c827f9f..4382ed0 100644 --- a/src/Producer.cc +++ b/src/Producer.cc @@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt auto instanceContext = static_cast(ctx); auto deferred = instanceContext->deferred; auto cClient = instanceContext->cClient; + auto producerConfig = instanceContext->producerConfig; delete instanceContext; if (result != pulsar_result_Ok) { @@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt std::shared_ptr cProducer(rawProducer, pulsar_producer_free); - deferred->Resolve([cProducer](const Napi::Env env) { + deferred->Resolve([cProducer, producerConfig](const Napi::Env env) { Napi::Object obj = Producer::constructor.New({}); Producer *producer = Producer::Unwrap(obj); producer->SetCProducer(cProducer); + producer->producerConfig = producerConfig; return obj; }); }, @@ -107,6 +109,9 @@ Napi::Value Producer::Send(const Napi::CallbackInfo &info) { auto cMessage = Message::BuildMessage(info[0].As()); auto deferred = ThreadSafeDeferred::New(Env()); auto ctx = new ProducerSendContext(deferred, cMessage); + + pulsar_message_set_property() + pulsar_producer_send_async( this->cProducer.get(), cMessage.get(), diff --git a/src/Producer.h b/src/Producer.h index 70c2342..c6abfda 100644 --- a/src/Producer.h +++ b/src/Producer.h @@ -23,6 +23,8 @@ #include #include #include +#include +#include "ProducerConfig.h" class Producer : public Napi::ObjectWrap { public: @@ -35,6 +37,7 @@ class Producer : public Napi::ObjectWrap { private: std::shared_ptr cProducer; + std::shared_ptr producerConfig; Napi::Value Send(const Napi::CallbackInfo &info); Napi::Value Flush(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 2c704bf..ea3d618 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -18,6 +18,8 @@ */ #include "SchemaInfo.h" #include "ProducerConfig.h" +#include "Message.h" +#include #include #include "pulsar/ProducerConfiguration.h" @@ -42,6 +44,8 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction"; static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled"; static const std::string CFG_ACCESS_MODE = "accessMode"; static const std::string CFG_BATCHING_TYPE = "batchingType"; +static const std::string CFG_MESSAGE_ROUTER = "messageRouter"; +static const std::string CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD = "getPartition"; struct _pulsar_producer_configuration { pulsar::ProducerConfiguration conf; @@ -82,6 +86,46 @@ static std::map PRODUC {"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching}, }; +struct MessageRouterContext { + Napi::ThreadSafeFunction jsRouterFunction; +}; + +static int messageRouterTrampoline(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata, + void *ctx) { + printf("test1"); + MessageRouterContext *context = static_cast(ctx); + int numPartitions = pulsar_topic_metadata_get_num_partitions(topicMetadata); + std::promise promise; + std::future future = promise.get_future(); + auto callback = [msg, numPartitions, &promise](Napi::Env env, Napi::Function jsCallback) { + printf("test2"); + Napi::Object jsMessage = Message::NewInstance(Napi::Object::New(env), + std::shared_ptr(msg, [](pulsar_message_t*){})); + Napi::Object jsTopicMetadata = Napi::Object::New(env); + jsTopicMetadata.Set("numPartitions", Napi::Number::New(env, numPartitions)); + try { + printf("test3"); + Napi::Value result = jsCallback.Call({jsMessage, jsTopicMetadata}); + if (result.IsNumber()) { + promise.set_value(result.As().Int32Value()); + } else { + promise.set_value(numPartitions); + } + } catch (const Napi::Error& e) { + fprintf(stderr, "Error in custom message router: %s\n", e.what()); + promise.set_value(numPartitions); + } + }; + + printf("test3 %d", numPartitions); + napi_status status = context->jsRouterFunction.BlockingCall(callback); + context->jsRouterFunction.Release(); + if (status != napi_ok) { + return numPartitions; + } + return future.get(); +} + ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { this->cProducerConfig = std::shared_ptr( pulsar_producer_configuration_create(), pulsar_producer_configuration_free); @@ -224,6 +268,39 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { if (PRODUCER_BATCHING_TYPE.count(batchingType)) { this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType)); } + + if (producerConfig.Has(CFG_MESSAGE_ROUTER)) { + Napi::Value routerValue = producerConfig.Get(CFG_MESSAGE_ROUTER); + Napi::Function routerFunc; + + // Case 1: User passed a function directly, e.g., messageRouter: (msg, meta) => 0 + if (routerValue.IsFunction()) { + routerFunc = routerValue.As(); + } + // Case 2: User passed an object, e.g., messageRouter: { getPartition: (msg, meta) => 0 } + else if (routerValue.IsObject()) { + Napi::Object jsRouter = routerValue.As(); + if (jsRouter.Has(CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD) && + jsRouter.Get(CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD).IsFunction()) { + routerFunc = jsRouter.Get(CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD).As(); + } + } + + // If we found a valid function from either case, set it up. + if (routerFunc) { + auto context = new MessageRouterContext(); + context->jsRouterFunction = + Napi::ThreadSafeFunction::New(producerConfig.Env(), routerFunc, "MessageRouterCallback", 0, 1); + this->routerContext.reset(context); + pulsar_producer_configuration_set_message_router( + this->cProducerConfig.get(), messageRouterTrampoline, this->routerContext.get()); + } else { + Napi::TypeError::New(producerConfig.Env(), "The 'messageRouter' option must be a function, or an " + "object with a 'getPartition' method.") + .ThrowAsJavaScriptException(); + return; + } + } } ProducerConfig::~ProducerConfig() {} diff --git a/src/ProducerConfig.h b/src/ProducerConfig.h index 3d49557..8dd2125 100644 --- a/src/ProducerConfig.h +++ b/src/ProducerConfig.h @@ -23,6 +23,8 @@ #include #include +struct MessageRouterContext; + class ProducerConfig { public: ProducerConfig(const Napi::Object &producerConfig); @@ -33,6 +35,7 @@ class ProducerConfig { private: std::shared_ptr cProducerConfig; std::string topic; + std::unique_ptr routerContext; }; #endif diff --git a/tests/producer.test.js b/tests/producer.test.js index e6908cb..9edc866 100644 --- a/tests/producer.test.js +++ b/tests/producer.test.js @@ -18,6 +18,9 @@ */ const Pulsar = require('../index'); +const httpRequest = require('./http_utils'); + +const adminUrl = 'http://localhost:8080'; (() => { describe('Producer', () => { @@ -27,6 +30,9 @@ const Pulsar = require('../index'); client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650', operationTimeoutSeconds: 30, + log: (level, file, line, message) => { + console.log('[%s][%s:%d] %s', Pulsar.LogLevel.toString(level), file, line, message); + }, }); }); @@ -156,5 +162,76 @@ const Pulsar = require('../index'); await producer2.close(); }); }); + describe('Message Routing', () => { + test('Custom Message Router', async () => { + // 1. Define a partitioned topic and a custom router + const targetPartition = 1; + const partitionedTopicName = `test-custom-router-${Date.now()}`; + const partitionedTopic = `persistent://public/default/${partitionedTopicName}`; + const numPartitions = 10; + + // Use admin client to create a partitioned topic. This is more robust. + // Assuming 'adminUrl' and 'httpRequest' are available from your test setup. + const partitionedTopicAdminURL = `${adminUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`; + const createPartitionedTopicRes = await httpRequest( + partitionedTopicAdminURL, { + headers: { + 'Content-Type': 'application/json', // Use application/json for REST API + }, + data: numPartitions, + method: 'PUT', + }, + ); + // 204 No Content is success for PUT create + expect(createPartitionedTopicRes.statusCode).toBe(204); + + // 2. Create a producer with the custom message router + const producer = await client.createProducer({ + topic: partitionedTopic, // Note: For producer, use the base topic name + messageRouter: (message, topicMetadata) => { + console.log(`Custom router called. Total partitions: ${topicMetadata.numPartitions}, return partition: ${targetPartition}`); + // Always route to the target partition for this test + return targetPartition; + }, + messageRoutingMode: 'CustomPartition', + }); + + // 3. Create a single consumer for the entire partitioned topic + const consumer = await client.subscribe({ + topic: partitionedTopic, + subscription: 'test-sub', + subscriptionInitialPosition: 'Earliest', + }); + + // 4. Send 1000 messages in parallel for efficiency + console.log(`Sending messages to partitioned topic ${partitionedTopic}...`); + const numMessages = 1000; + for (let i = 0; i < numMessages; i += 1) { + console.log('before send message'); + const msg = `message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + console.log('send message'); + } + console.log('Send done.'); + await producer.flush(); + console.log(`Sent ${numMessages} messages.`); + + // 5. Receive messages and assert they all come from the target partition + const receivedMessages = new Set(); + const expectedPartitionName = `${partitionedTopic}-partition-${targetPartition}`; + + for (let i = 0; i < numMessages; i += 1) { + const msg = await consumer.receive(10000); + expect(msg.getTopicName()).toBe(expectedPartitionName); + receivedMessages.add(msg.getData().toString()); + await consumer.acknowledge(msg); + } + // Final assertion to ensure all unique messages were received + expect(receivedMessages.size).toBe(numMessages); + console.log(`Successfully received and verified ${receivedMessages.size} messages from ${expectedPartitionName}.`); + }, 30000); + }); }); })();