Skip to content

Commit 12a03ce

Browse files
authored
feat: support producer access mode. (apache#331)
1 parent 1380086 commit 12a03ce

File tree

3 files changed

+83
-0
lines changed

3 files changed

+83
-0
lines changed

index.d.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export interface ProducerConfig {
6363
cryptoFailureAction?: ProducerCryptoFailureAction;
6464
chunkingEnabled?: boolean;
6565
schema?: SchemaInfo;
66+
accessMode?: ProducerAccessMode;
6667
}
6768

6869
export class Producer {
@@ -267,3 +268,9 @@ export type SchemaType =
267268
'Bytes' |
268269
'AutoConsume' |
269270
'AutoPublish';
271+
272+
export type ProducerAccessMode =
273+
'Shared' |
274+
'Exclusive' |
275+
'WaitForExclusive' |
276+
'ExclusiveWithFencing';

src/ProducerConfig.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
3939
static const std::string CFG_ENCRYPTION_KEY = "encryptionKey";
4040
static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
4141
static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
42+
static const std::string CFG_ACCESS_MODE = "accessMode";
4243

4344
static const std::map<std::string, pulsar_partitions_routing_mode> MESSAGE_ROUTING_MODE = {
4445
{"UseSinglePartition", pulsar_UseSinglePartition},
@@ -63,6 +64,13 @@ static std::map<std::string, pulsar_producer_crypto_failure_action> PRODUCER_CRY
6364
{"SEND", pulsar_ProducerSend},
6465
};
6566

67+
static std::map<std::string, pulsar_producer_access_mode> PRODUCER_ACCESS_MODE = {
68+
{"Shared", pulsar_ProducerAccessModeShared},
69+
{"Exclusive", pulsar_ProducerAccessModeExclusive},
70+
{"WaitForExclusive", pulsar_ProducerAccessModeWaitForExclusive},
71+
{"ExclusiveWithFencing", pulsar_ProducerAccessModeExclusiveWithFencing},
72+
};
73+
6674
ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
6775
this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
6876
pulsar_producer_configuration_create(), pulsar_producer_configuration_free);
@@ -194,6 +202,12 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
194202
bool chunkingEnabled = producerConfig.Get(CFG_CHUNK_ENABLED).ToBoolean().Value();
195203
pulsar_producer_configuration_set_chunking_enabled(this->cProducerConfig.get(), chunkingEnabled);
196204
}
205+
206+
std::string accessMode = producerConfig.Get(CFG_ACCESS_MODE).ToString().Utf8Value();
207+
if (PRODUCER_ACCESS_MODE.count(accessMode)) {
208+
pulsar_producer_configuration_set_access_mode(this->cProducerConfig.get(),
209+
PRODUCER_ACCESS_MODE.at(accessMode));
210+
}
197211
}
198212

199213
ProducerConfig::~ProducerConfig() {}

tests/producer.test.js

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,67 @@ const Pulsar = require('../index.js');
9494
await producer.close();
9595
});
9696
});
97+
describe('Access Mode', () => {
98+
test('Exclusive', async () => {
99+
const topicName = 'test-access-mode-exclusive';
100+
const producer1 = await client.createProducer({
101+
topic: topicName,
102+
producerName: 'p-1',
103+
accessMode: 'Exclusive',
104+
});
105+
expect(producer1.getProducerName()).toBe('p-1');
106+
107+
await expect(client.createProducer({
108+
topic: topicName,
109+
producerName: 'p-2',
110+
accessMode: 'Exclusive',
111+
})).rejects.toThrow('Failed to create producer: ResultProducerFenced');
112+
113+
await producer1.close();
114+
});
115+
116+
test('WaitForExclusive', async () => {
117+
const topicName = 'test-access-mode-wait-for-exclusive';
118+
const producer1 = await client.createProducer({
119+
topic: topicName,
120+
producerName: 'p-1',
121+
accessMode: 'Exclusive',
122+
});
123+
expect(producer1.getProducerName()).toBe('p-1');
124+
// async close producer1
125+
producer1.close();
126+
// when p1 close, p2 success created.
127+
const producer2 = await client.createProducer({
128+
topic: topicName,
129+
producerName: 'p-2',
130+
accessMode: 'WaitForExclusive',
131+
});
132+
expect(producer2.getProducerName()).toBe('p-2');
133+
await producer2.close();
134+
});
135+
136+
test('ExclusiveWithFencing', async () => {
137+
const topicName = 'test-access-mode';
138+
const producer1 = await client.createProducer({
139+
topic: topicName,
140+
producerName: 'p-1',
141+
accessMode: 'Exclusive',
142+
});
143+
expect(producer1.getProducerName()).toBe('p-1');
144+
const producer2 = await client.createProducer({
145+
topic: topicName,
146+
producerName: 'p-2',
147+
accessMode: 'ExclusiveWithFencing',
148+
});
149+
expect(producer2.getProducerName()).toBe('p-2');
150+
// producer1 will be fenced.
151+
await expect(
152+
producer1.send({
153+
data: Buffer.from('test-msg'),
154+
}),
155+
).rejects.toThrow('Failed to send message: ResultProducerFenced');
156+
await producer2.close();
157+
});
158+
});
97159
});
98160
})();

0 commit comments

Comments
 (0)