Skip to content

Commit 5a27ba3

Browse files
authored
[feat] Support WaitForExclusive producer access mode. (#109)
1 parent 1f7fdb8 commit 5a27ba3

File tree

7 files changed

+82
-19
lines changed

7 files changed

+82
-19
lines changed

include/pulsar/ProducerConfiguration.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,12 @@ class PULSAR_PUBLIC ProducerConfiguration {
8989
/**
9090
* Require exclusive access for producer. Fail immediately if there's already a producer connected.
9191
*/
92-
Exclusive = 1
92+
Exclusive = 1,
93+
94+
/**
95+
* Producer creation is pending until it can acquire exclusive access.
96+
*/
97+
WaitForExclusive = 2
9398
};
9499

95100
ProducerConfiguration();

lib/ClientConnection.cc

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,22 +1063,29 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
10631063
PendingRequestsMap::iterator it = pendingRequests_.find(producerSuccess.request_id());
10641064
if (it != pendingRequests_.end()) {
10651065
PendingRequestData requestData = it->second;
1066-
pendingRequests_.erase(it);
1067-
lock.unlock();
1068-
1069-
ResponseData data;
1070-
data.producerName = producerSuccess.producer_name();
1071-
data.lastSequenceId = producerSuccess.last_sequence_id();
1072-
if (producerSuccess.has_schema_version()) {
1073-
data.schemaVersion = producerSuccess.schema_version();
1074-
}
1075-
if (producerSuccess.has_topic_epoch()) {
1076-
data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
1066+
if (!producerSuccess.producer_ready()) {
1067+
LOG_INFO(cnxString_ << " Producer " << producerSuccess.producer_name()
1068+
<< " has been queued up at broker. req_id: "
1069+
<< producerSuccess.request_id());
1070+
requestData.hasGotResponse->store(true);
1071+
lock.unlock();
10771072
} else {
1078-
data.topicEpoch = Optional<uint64_t>::empty();
1073+
pendingRequests_.erase(it);
1074+
lock.unlock();
1075+
ResponseData data;
1076+
data.producerName = producerSuccess.producer_name();
1077+
data.lastSequenceId = producerSuccess.last_sequence_id();
1078+
if (producerSuccess.has_schema_version()) {
1079+
data.schemaVersion = producerSuccess.schema_version();
1080+
}
1081+
if (producerSuccess.has_topic_epoch()) {
1082+
data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
1083+
} else {
1084+
data.topicEpoch = Optional<uint64_t>::empty();
1085+
}
1086+
requestData.promise.setValue(data);
1087+
requestData.timer->cancel();
10791088
}
1080-
requestData.promise.setValue(data);
1081-
requestData.timer->cancel();
10821089
}
10831090
break;
10841091
}
@@ -1481,7 +1488,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cm
14811488

14821489
void ClientConnection::handleRequestTimeout(const boost::system::error_code& ec,
14831490
PendingRequestData pendingRequestData) {
1484-
if (!ec) {
1491+
if (!ec && !pendingRequestData.hasGotResponse->load()) {
14851492
pendingRequestData.promise.setFailed(ResultTimeout);
14861493
}
14871494
}

lib/ClientConnection.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
172172
struct PendingRequestData {
173173
Promise<Result, ResponseData> promise;
174174
DeadlineTimerPtr timer;
175+
std::shared_ptr<std::atomic_bool> hasGotResponse{std::make_shared<std::atomic_bool>(false)};
175176
};
176177

177178
struct LookupRequestData {

lib/HandlerBase.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
130130
case NotStarted:
131131
case Closing:
132132
case Closed:
133+
case Producer_Fenced:
133134
case Failed:
134135
LOG_DEBUG(handler->getName()
135136
<< "Ignoring connection closed event since the handler is not used anymore");

lib/HandlerBase.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ class HandlerBase {
118118
Ready,
119119
Closing,
120120
Closed,
121-
Failed
121+
Failed,
122+
Producer_Fenced
122123
};
123124

124125
std::atomic<State> state_;

lib/ProducerImpl.cc

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,15 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
236236
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
237237
}
238238

239-
if (producerCreatedPromise_.isComplete()) {
239+
if (result == ResultProducerFenced) {
240+
state_ = Producer_Fenced;
241+
failPendingMessages(result, true);
242+
auto client = client_.lock();
243+
if (client) {
244+
client->cleanupProducer(this);
245+
}
246+
producerCreatedPromise_.setFailed(result);
247+
} else if (producerCreatedPromise_.isComplete()) {
240248
if (result == ResultProducerBlockedQuotaExceededException) {
241249
LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
242250
failPendingMessages(ResultProducerBlockedQuotaExceededException, true);
@@ -378,6 +386,9 @@ bool ProducerImpl::isValidProducerState(const SendCallback& callback) const {
378386
case HandlerBase::Closed:
379387
callback(ResultAlreadyClosed, {});
380388
return false;
389+
case HandlerBase::Producer_Fenced:
390+
callback(ResultProducerFenced, {});
391+
return false;
381392
case HandlerBase::NotStarted:
382393
case HandlerBase::Failed:
383394
default:

tests/ProducerTest.cc

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,8 @@ TEST(ProducerTest, testChunkingMaxMessageSize) {
275275
TEST(ProducerTest, testExclusiveProducer) {
276276
Client client(serviceUrl);
277277

278-
std::string topicName = "persistent://public/default/testExclusiveProducer";
278+
std::string topicName =
279+
"persistent://public/default/testExclusiveProducer" + std::to_string(time(nullptr));
279280

280281
Producer producer1;
281282
ProducerConfiguration producerConfiguration1;
@@ -296,6 +297,42 @@ TEST(ProducerTest, testExclusiveProducer) {
296297
ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3));
297298
}
298299

300+
TEST(ProducerTest, testWaitForExclusiveProducer) {
301+
Client client(serviceUrl);
302+
303+
std::string topicName =
304+
"persistent://public/default/testWaitForExclusiveProducer" + std::to_string(time(nullptr));
305+
306+
Producer producer1;
307+
ProducerConfiguration producerConfiguration1;
308+
producerConfiguration1.setProducerName("p-name-1");
309+
producerConfiguration1.setAccessMode(ProducerConfiguration::Exclusive);
310+
311+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration1, producer1));
312+
313+
ASSERT_EQ(ResultOk, producer1.send(MessageBuilder().setContent("content").build()));
314+
315+
Producer producer2;
316+
ProducerConfiguration producerConfiguration2;
317+
producerConfiguration2.setProducerName("p-name-2");
318+
producerConfiguration2.setAccessMode(ProducerConfiguration::WaitForExclusive);
319+
320+
Latch latch(1);
321+
client.createProducerAsync(topicName, producerConfiguration2,
322+
[&latch, &producer2](Result res, Producer producer) {
323+
ASSERT_EQ(ResultOk, res);
324+
latch.countdown();
325+
producer2 = producer;
326+
});
327+
328+
// when p1 close, p2 success created.
329+
producer1.close();
330+
latch.wait();
331+
ASSERT_EQ(ResultOk, producer2.send(MessageBuilder().setContent("content").build()));
332+
333+
producer2.close();
334+
}
335+
299336
TEST_P(ProducerTest, testFlushNoBatch) {
300337
Client client(serviceUrl);
301338

0 commit comments

Comments
 (0)