Skip to content

Commit 009fbb3

Browse files
committed
Initial commit of shibd#44
1 parent 05ce5a1 commit 009fbb3

File tree

6 files changed

+187
-1
lines changed

6 files changed

+187
-1
lines changed

index.d.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export interface ProducerConfig {
6868
schema?: SchemaInfo;
6969
accessMode?: ProducerAccessMode;
7070
batchingType?: ProducerBatchType;
71+
messageRouter?: MessageRouter;
7172
}
7273

7374
export class Producer {
@@ -176,6 +177,26 @@ export class MessageId {
176177
toString(): string;
177178
}
178179

180+
/**
181+
* Metadata for a topic that the MessageRouter can use.
182+
*/
183+
export interface TopicMetadata {
184+
numPartitions: number;
185+
}
186+
187+
/**
188+
* A custom message router interface that can be implemented by the user.
189+
*/
190+
export interface MessageRouter {
191+
/**
192+
* Choose a partition for the given message.
193+
* @param message The message to be routed.
194+
* @param topicMetadata Metadata for the topic.
195+
* @returns The partition index to send the message to.
196+
*/
197+
getPartition(message: ProducerMessage, topicMetadata: TopicMetadata): number;
198+
}
199+
179200
export interface SchemaInfo {
180201
schemaType: SchemaType;
181202
name?: string;

src/Producer.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
7373
auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
7474
auto deferred = instanceContext->deferred;
7575
auto cClient = instanceContext->cClient;
76+
auto producerConfig = instanceContext->producerConfig;
7677
delete instanceContext;
7778

7879
if (result != pulsar_result_Ok) {
@@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
8182

8283
std::shared_ptr<pulsar_producer_t> cProducer(rawProducer, pulsar_producer_free);
8384

84-
deferred->Resolve([cProducer](const Napi::Env env) {
85+
deferred->Resolve([cProducer, producerConfig](const Napi::Env env) {
8586
Napi::Object obj = Producer::constructor.New({});
8687
Producer *producer = Producer::Unwrap(obj);
8788
producer->SetCProducer(cProducer);
89+
producer->producerConfig = producerConfig;
8890
return obj;
8991
});
9092
},
@@ -107,6 +109,9 @@ Napi::Value Producer::Send(const Napi::CallbackInfo &info) {
107109
auto cMessage = Message::BuildMessage(info[0].As<Napi::Object>());
108110
auto deferred = ThreadSafeDeferred::New(Env());
109111
auto ctx = new ProducerSendContext(deferred, cMessage);
112+
113+
pulsar_message_set_property()
114+
110115

111116
pulsar_producer_send_async(
112117
this->cProducer.get(), cMessage.get(),

src/Producer.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <napi.h>
2424
#include <pulsar/c/client.h>
2525
#include <pulsar/c/producer.h>
26+
#include <memory>
27+
#include "ProducerConfig.h"
2628

2729
class Producer : public Napi::ObjectWrap<Producer> {
2830
public:
@@ -35,6 +37,7 @@ class Producer : public Napi::ObjectWrap<Producer> {
3537

3638
private:
3739
std::shared_ptr<pulsar_producer_t> cProducer;
40+
std::shared_ptr<ProducerConfig> producerConfig;
3841
Napi::Value Send(const Napi::CallbackInfo &info);
3942
Napi::Value Flush(const Napi::CallbackInfo &info);
4043
Napi::Value Close(const Napi::CallbackInfo &info);

src/ProducerConfig.cc

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
#include "SchemaInfo.h"
2020
#include "ProducerConfig.h"
21+
#include "Message.h"
22+
#include <future>
2123
#include <map>
2224
#include "pulsar/ProducerConfiguration.h"
2325

@@ -42,6 +44,8 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
4244
static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
4345
static const std::string CFG_ACCESS_MODE = "accessMode";
4446
static const std::string CFG_BATCHING_TYPE = "batchingType";
47+
static const std::string CFG_MESSAGE_ROUTER = "messageRouter";
48+
static const std::string CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD = "getPartition";
4549

4650
struct _pulsar_producer_configuration {
4751
pulsar::ProducerConfiguration conf;
@@ -82,6 +86,46 @@ static std::map<std::string, pulsar::ProducerConfiguration::BatchingType> PRODUC
8286
{"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching},
8387
};
8488

89+
struct MessageRouterContext {
90+
Napi::ThreadSafeFunction jsRouterFunction;
91+
};
92+
93+
static int messageRouterTrampoline(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata,
94+
void *ctx) {
95+
printf("test1");
96+
MessageRouterContext *context = static_cast<MessageRouterContext *>(ctx);
97+
int numPartitions = pulsar_topic_metadata_get_num_partitions(topicMetadata);
98+
std::promise<int> promise;
99+
std::future<int> future = promise.get_future();
100+
auto callback = [msg, numPartitions, &promise](Napi::Env env, Napi::Function jsCallback) {
101+
printf("test2");
102+
Napi::Object jsMessage = Message::NewInstance(Napi::Object::New(env),
103+
std::shared_ptr<pulsar_message_t>(msg, [](pulsar_message_t*){}));
104+
Napi::Object jsTopicMetadata = Napi::Object::New(env);
105+
jsTopicMetadata.Set("numPartitions", Napi::Number::New(env, numPartitions));
106+
try {
107+
printf("test3");
108+
Napi::Value result = jsCallback.Call({jsMessage, jsTopicMetadata});
109+
if (result.IsNumber()) {
110+
promise.set_value(result.As<Napi::Number>().Int32Value());
111+
} else {
112+
promise.set_value(numPartitions);
113+
}
114+
} catch (const Napi::Error& e) {
115+
fprintf(stderr, "Error in custom message router: %s\n", e.what());
116+
promise.set_value(numPartitions);
117+
}
118+
};
119+
120+
printf("test3 %d", numPartitions);
121+
napi_status status = context->jsRouterFunction.BlockingCall(callback);
122+
context->jsRouterFunction.Release();
123+
if (status != napi_ok) {
124+
return numPartitions;
125+
}
126+
return future.get();
127+
}
128+
85129
ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
86130
this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
87131
pulsar_producer_configuration_create(), pulsar_producer_configuration_free);
@@ -224,6 +268,39 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
224268
if (PRODUCER_BATCHING_TYPE.count(batchingType)) {
225269
this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType));
226270
}
271+
272+
if (producerConfig.Has(CFG_MESSAGE_ROUTER)) {
273+
Napi::Value routerValue = producerConfig.Get(CFG_MESSAGE_ROUTER);
274+
Napi::Function routerFunc;
275+
276+
// Case 1: User passed a function directly, e.g., messageRouter: (msg, meta) => 0
277+
if (routerValue.IsFunction()) {
278+
routerFunc = routerValue.As<Napi::Function>();
279+
}
280+
// Case 2: User passed an object, e.g., messageRouter: { getPartition: (msg, meta) => 0 }
281+
else if (routerValue.IsObject()) {
282+
Napi::Object jsRouter = routerValue.As<Napi::Object>();
283+
if (jsRouter.Has(CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD) &&
284+
jsRouter.Get(CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD).IsFunction()) {
285+
routerFunc = jsRouter.Get(CFG_MESSAGE_ROUTER_GET_PARTITION_METHOD).As<Napi::Function>();
286+
}
287+
}
288+
289+
// If we found a valid function from either case, set it up.
290+
if (routerFunc) {
291+
auto context = new MessageRouterContext();
292+
context->jsRouterFunction =
293+
Napi::ThreadSafeFunction::New(producerConfig.Env(), routerFunc, "MessageRouterCallback", 0, 1);
294+
this->routerContext.reset(context);
295+
pulsar_producer_configuration_set_message_router(
296+
this->cProducerConfig.get(), messageRouterTrampoline, this->routerContext.get());
297+
} else {
298+
Napi::TypeError::New(producerConfig.Env(), "The 'messageRouter' option must be a function, or an "
299+
"object with a 'getPartition' method.")
300+
.ThrowAsJavaScriptException();
301+
return;
302+
}
303+
}
227304
}
228305

229306
ProducerConfig::~ProducerConfig() {}

src/ProducerConfig.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <napi.h>
2424
#include <pulsar/c/producer_configuration.h>
2525

26+
struct MessageRouterContext;
27+
2628
class ProducerConfig {
2729
public:
2830
ProducerConfig(const Napi::Object &producerConfig);
@@ -33,6 +35,7 @@ class ProducerConfig {
3335
private:
3436
std::shared_ptr<pulsar_producer_configuration_t> cProducerConfig;
3537
std::string topic;
38+
std::unique_ptr<MessageRouterContext> routerContext;
3639
};
3740

3841
#endif

tests/producer.test.js

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919

2020
const Pulsar = require('../index');
21+
const httpRequest = require('./http_utils');
22+
23+
const adminUrl = 'http://localhost:8080';
2124

2225
(() => {
2326
describe('Producer', () => {
@@ -27,6 +30,9 @@ const Pulsar = require('../index');
2730
client = new Pulsar.Client({
2831
serviceUrl: 'pulsar://localhost:6650',
2932
operationTimeoutSeconds: 30,
33+
log: (level, file, line, message) => {
34+
console.log('[%s][%s:%d] %s', Pulsar.LogLevel.toString(level), file, line, message);
35+
},
3036
});
3137
});
3238

@@ -156,5 +162,76 @@ const Pulsar = require('../index');
156162
await producer2.close();
157163
});
158164
});
165+
describe('Message Routing', () => {
166+
test('Custom Message Router', async () => {
167+
// 1. Define a partitioned topic and a custom router
168+
const targetPartition = 1;
169+
const partitionedTopicName = `test-custom-router-${Date.now()}`;
170+
const partitionedTopic = `persistent://public/default/${partitionedTopicName}`;
171+
const numPartitions = 10;
172+
173+
// Use admin client to create a partitioned topic. This is more robust.
174+
// Assuming 'adminUrl' and 'httpRequest' are available from your test setup.
175+
const partitionedTopicAdminURL = `${adminUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
176+
const createPartitionedTopicRes = await httpRequest(
177+
partitionedTopicAdminURL, {
178+
headers: {
179+
'Content-Type': 'application/json', // Use application/json for REST API
180+
},
181+
data: numPartitions,
182+
method: 'PUT',
183+
},
184+
);
185+
// 204 No Content is success for PUT create
186+
expect(createPartitionedTopicRes.statusCode).toBe(204);
187+
188+
// 2. Create a producer with the custom message router
189+
const producer = await client.createProducer({
190+
topic: partitionedTopic, // Note: For producer, use the base topic name
191+
messageRouter: (message, topicMetadata) => {
192+
console.log(`Custom router called. Total partitions: ${topicMetadata.numPartitions}, return partition: ${targetPartition}`);
193+
// Always route to the target partition for this test
194+
return targetPartition;
195+
},
196+
messageRoutingMode: 'CustomPartition',
197+
});
198+
199+
// 3. Create a single consumer for the entire partitioned topic
200+
const consumer = await client.subscribe({
201+
topic: partitionedTopic,
202+
subscription: 'test-sub',
203+
subscriptionInitialPosition: 'Earliest',
204+
});
205+
206+
// 4. Send 1000 messages in parallel for efficiency
207+
console.log(`Sending messages to partitioned topic ${partitionedTopic}...`);
208+
const numMessages = 1000;
209+
for (let i = 0; i < numMessages; i += 1) {
210+
console.log('before send message');
211+
const msg = `message-${i}`;
212+
producer.send({
213+
data: Buffer.from(msg),
214+
});
215+
console.log('send message');
216+
}
217+
console.log('Send done.');
218+
await producer.flush();
219+
console.log(`Sent ${numMessages} messages.`);
220+
221+
// 5. Receive messages and assert they all come from the target partition
222+
const receivedMessages = new Set();
223+
const expectedPartitionName = `${partitionedTopic}-partition-${targetPartition}`;
224+
225+
for (let i = 0; i < numMessages; i += 1) {
226+
const msg = await consumer.receive(10000);
227+
expect(msg.getTopicName()).toBe(expectedPartitionName);
228+
receivedMessages.add(msg.getData().toString());
229+
await consumer.acknowledge(msg);
230+
}
231+
// Final assertion to ensure all unique messages were received
232+
expect(receivedMessages.size).toBe(numMessages);
233+
console.log(`Successfully received and verified ${receivedMessages.size} messages from ${expectedPartitionName}.`);
234+
}, 30000);
235+
});
159236
});
160237
})();

0 commit comments

Comments
 (0)