Skip to content

Commit 0d62e6b

Browse files
erobotRobertIndie
authored andcommitted
[fix] Fix PartitionedProducerImpl::closeAsync to close sub-producers properly (#125)
### Motivation PartitionedProducerImpl do not close sub-producers properly when any sub-producer creation fails. Continuing to retry creating producer will eventually reach the maximum producer limit. It seems a regression caused by #54. When sub-producer creation fails, state_ is set to Failed. PartitionedProducerImpl::closeAsync only do cleanup when state_==Ready and sub-producers do not close when state_==Failed. https://github.com/apache/pulsar-client-cpp/blob/f0268ecd29a6d0030b7d07379ec609884b4c14ff/lib/PartitionedProducerImpl.cc#L273-L276 ### Modifications Close sub-producers when state != Closed.
1 parent 2a0c59a commit 0d62e6b

File tree

2 files changed

+53
-5
lines changed

2 files changed

+53
-5
lines changed

lib/PartitionedProducerImpl.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,14 +266,11 @@ void PartitionedProducerImpl::closeAsync(CloseCallback originalCallback) {
266266
originalCallback(result);
267267
}
268268
};
269-
if (state_ == Closed) {
269+
270+
if (state_ == Closed || state_.exchange(Closing) == Closing) {
270271
closeCallback(ResultAlreadyClosed);
271272
return;
272273
}
273-
State expectedState = Ready;
274-
if (!state_.compare_exchange_strong(expectedState, Closing)) {
275-
return;
276-
}
277274

278275
cancelTimers();
279276

tests/ProducerTest.cc

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,4 +373,55 @@ TEST_P(ProducerTest, testFlushNoBatch) {
373373
client.close();
374374
}
375375

376+
TEST(ProducerTest, testCloseSubProducerWhenFail) {
377+
Client client(serviceUrl);
378+
379+
std::string ns = "test-close-sub-producer-when-fail";
380+
std::string localName = std::string("testCloseSubProducerWhenFail") + std::to_string(time(nullptr));
381+
std::string topicName = "persistent://public/" + ns + '/' + localName;
382+
const int maxProducersPerTopic = 10;
383+
const int partitionNum = 5;
384+
385+
// call admin api to create namespace with max prodcuer limit
386+
std::string url = adminUrl + "admin/v2/namespaces/public/" + ns;
387+
int res =
388+
makePutRequest(url, "{\"max_producers_per_topic\": " + std::to_string(maxProducersPerTopic) + "}");
389+
ASSERT_TRUE(res == 204 || res == 409) << "res:" << res;
390+
391+
// call admin api to create partitioned topic
392+
res = makePutRequest(adminUrl + "admin/v2/persistent/public/" + ns + "/" + localName + "/partitions",
393+
std::to_string(partitionNum));
394+
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
395+
396+
ProducerConfiguration producerConfiguration;
397+
producerConfiguration.setBatchingEnabled(false);
398+
399+
// create producers for partition-0 up to max producer limit
400+
std::vector<Producer> producers;
401+
for (int i = 0; i < maxProducersPerTopic; ++i) {
402+
Producer producer;
403+
ASSERT_EQ(ResultOk,
404+
client.createProducer(topicName + "-partition-0", producerConfiguration, producer));
405+
producers.push_back(producer);
406+
}
407+
408+
// create partitioned producer, should fail because partition-0 already reach max producer limit
409+
for (int i = 0; i < maxProducersPerTopic; ++i) {
410+
Producer producer;
411+
ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producer));
412+
}
413+
414+
std::this_thread::sleep_for(std::chrono::seconds(1));
415+
416+
// create producer for partition-1, should succeed
417+
Producer producer;
418+
ASSERT_EQ(ResultOk, client.createProducer(topicName + "-partition-1", producerConfiguration, producer));
419+
producers.push_back(producer);
420+
421+
for (auto& producer : producers) {
422+
producer.close();
423+
}
424+
client.close();
425+
}
426+
376427
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

0 commit comments

Comments
 (0)