Skip to content

Commit ffee4a0

Browse files
authored
[fix] callback of send batch message is error when flush (apache#303)
### Motivation When sendAsync batch messages and then flush() and close(), found that the callback of message is ResultAlreadyClosed but not Ok. The root is if there is no batch messages remain in BatchMessageContainer when do flush, the flush callback directly return Ok. Instead, it should return Ok until the lastSendOpFuture complete. ### Modifications 1. fix the error code. 2. test two cases of flush batch messages.
1 parent 0e63af9 commit ffee4a0

File tree

3 files changed

+73
-3
lines changed

3 files changed

+73
-3
lines changed

lib/BatchMessageContainerBase.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ void BatchMessageContainerBase::processAndClear(
9292
std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) {
9393
if (isEmpty()) {
9494
if (flushCallback) {
95-
flushCallback(ResultOk);
95+
// do nothing, flushCallback complete until the lastOpSend complete
9696
}
9797
} else {
9898
const auto numBatches = getNumBatches();

lib/ProducerImpl.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,8 +374,16 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
374374
if (batchMessageContainer_) {
375375
Lock lock(mutex_);
376376
auto failures = batchMessageAndSend(callback);
377-
lock.unlock();
378-
failures.complete();
377+
if (!pendingMessagesQueue_.empty()) {
378+
auto& opSendMsg = pendingMessagesQueue_.back();
379+
lock.unlock();
380+
failures.complete();
381+
opSendMsg.addTrackerCallback(callback);
382+
} else {
383+
lock.unlock();
384+
failures.complete();
385+
callback(ResultOk);
386+
}
379387
} else {
380388
Lock lock(mutex_);
381389
if (!pendingMessagesQueue_.empty()) {

tests/ProducerTest.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,68 @@ TEST_P(ProducerTest, testFlushNoBatch) {
439439
client.close();
440440
}
441441

442+
TEST_P(ProducerTest, testFlushBatch) {
443+
Client client(serviceUrl);
444+
445+
auto partitioned = GetParam();
446+
const auto topicName = std::string("testFlushNoBatch") +
447+
(partitioned ? "partitioned-" : "-no-partitioned-") +
448+
std::to_string(time(nullptr));
449+
450+
if (partitioned) {
451+
// call admin api to make it partitioned
452+
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
453+
int res = makePutRequest(url, "5");
454+
LOG_INFO("res = " << res);
455+
ASSERT_FALSE(res != 204 && res != 409);
456+
}
457+
458+
ProducerConfiguration producerConfiguration;
459+
producerConfiguration.setBatchingEnabled(true);
460+
producerConfiguration.setBatchingMaxMessages(10);
461+
producerConfiguration.setBatchingMaxPublishDelayMs(1000);
462+
producerConfiguration.setBatchingMaxAllowedSizeInBytes(4 * 1024 * 1024);
463+
464+
// test all messages in batch has been sent
465+
Producer producer;
466+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
467+
468+
std::atomic_int needCallBack(100);
469+
auto cb = [&needCallBack](Result code, const MessageId& msgId) {
470+
ASSERT_EQ(code, ResultOk);
471+
needCallBack.fetch_sub(1);
472+
};
473+
474+
for (int i = 0; i < 100; ++i) {
475+
Message msg = MessageBuilder().setContent("content").build();
476+
producer.sendAsync(msg, cb);
477+
}
478+
479+
producer.flush();
480+
ASSERT_EQ(needCallBack.load(), 0);
481+
producer.close();
482+
483+
// test remain messages in batch not send
484+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
485+
486+
std::atomic_int needCallBack2(105);
487+
auto cb2 = [&needCallBack2](Result code, const MessageId& msgId) {
488+
ASSERT_EQ(code, ResultOk);
489+
needCallBack2.fetch_sub(1);
490+
};
491+
492+
for (int i = 0; i < 105; ++i) {
493+
Message msg = MessageBuilder().setContent("content").build();
494+
producer.sendAsync(msg, cb2);
495+
}
496+
497+
producer.flush();
498+
ASSERT_EQ(needCallBack2.load(), 0);
499+
producer.close();
500+
501+
client.close();
502+
}
503+
442504
TEST(ProducerTest, testCloseSubProducerWhenFail) {
443505
Client client(serviceUrl);
444506

0 commit comments

Comments
 (0)