Skip to content

Commit 718530e

Browse files
committed
Add purge() interface for KafkaProducer, and improve the KafkaProducer::close()
1 parent 968b1ef commit 718530e

File tree

4 files changed

+43
-26
lines changed

4 files changed

+43
-26
lines changed

.github/workflows/kafka_api_ci_tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,8 @@ jobs:
270270
vcpkg install boost-program-options
271271
272272
cp -v "C:\VCPKG\INSTALLED\x86-windows\lib\boost_program_options-vc140-mt.lib" "C:\VCPKG\INSTALLED\x86-windows\lib\boost_program_options.lib"
273-
cp -v "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options-vc142-mt-x32-1_76.dll" "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options.dll"
274-
cp -v "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options-vc142-mt-x32-1_76.pdb" "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options.pdb"
273+
cp -v "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options-vc142-mt-x32-1_77.dll" "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options.dll"
274+
cp -v "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options-vc142-mt-x32-1_77.pdb" "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options.pdb"
275275
276276
vcpkg integrate install
277277

include/kafka/KafkaConsumer.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ KafkaConsumer::KafkaConsumer(const Properties &properties, EventsPollingOption e
419419
startBackgroundPollingIfNecessary([this](int timeoutMs){ pollCallbacks(timeoutMs); });
420420

421421
const auto propsStr = KafkaClient::properties().toString();
422-
KAFKA_API_DO_LOG(Log::Level::Info, "initializes with properties[%s]", propsStr.c_str());
422+
KAFKA_API_DO_LOG(Log::Level::Notice, "initialized with properties[%s]", propsStr.c_str());
423423
}
424424

425425
inline void
@@ -452,7 +452,7 @@ KafkaConsumer::close()
452452
rd_kafka_queue_poll_callback(queue, TIMEOUT_INFINITE);
453453
}
454454

455-
KAFKA_API_DO_LOG(Log::Level::Info, "closed");
455+
KAFKA_API_DO_LOG(Log::Level::Notice, "closed");
456456
}
457457

458458

@@ -492,7 +492,7 @@ KafkaConsumer::subscribe(const Topics& topics, consumer::RebalanceCallback rebal
492492

493493
if (!_pendingEvent)
494494
{
495-
KAFKA_API_DO_LOG(Log::Level::Info, "subscribed, topics[%s]", topicsStr.c_str());
495+
KAFKA_API_DO_LOG(Log::Level::Notice, "subscribed, topics[%s]", topicsStr.c_str());
496496
return;
497497
}
498498
}
@@ -519,7 +519,7 @@ KafkaConsumer::unsubscribe(std::chrono::milliseconds timeout)
519519
_userAssignment);
520520
_userAssignment.clear();
521521

522-
KAFKA_API_DO_LOG(Log::Level::Info, "unsubscribed (the previously assigned partitions)");
522+
KAFKA_API_DO_LOG(Log::Level::Notice, "unsubscribed (the previously assigned partitions)");
523523
return;
524524
}
525525

@@ -537,7 +537,7 @@ KafkaConsumer::unsubscribe(std::chrono::milliseconds timeout)
537537

538538
if (!_pendingEvent)
539539
{
540-
KAFKA_API_DO_LOG(Log::Level::Info, "unsubscribed");
540+
KAFKA_API_DO_LOG(Log::Level::Notice, "unsubscribed");
541541
return;
542542
}
543543
}
@@ -860,7 +860,7 @@ KafkaConsumer::pauseOrResumePartitions(const TopicPartitions& topicPartitions, P
860860
}
861861
else
862862
{
863-
KAFKA_API_DO_LOG(Log::Level::Info, "%sd topic-partition[%s-%d]", opString, rk_tp.topic, rk_tp.partition, rd_kafka_err2str(rk_tp.err));
863+
KAFKA_API_DO_LOG(Log::Level::Notice, "%sd topic-partition[%s-%d]", opString, rk_tp.topic, rk_tp.partition, rd_kafka_err2str(rk_tp.err));
864864
++cnt;
865865
}
866866
}
@@ -918,7 +918,7 @@ KafkaConsumer::onRebalance(rd_kafka_resp_err_t err, rd_kafka_topic_partition_lis
918918
}
919919
}
920920

921-
KAFKA_API_DO_LOG(Log::Level::Info, "re-balance event triggered[%s], cooperative[%s], topic-partitions[%s]",
921+
KAFKA_API_DO_LOG(Log::Level::Notice, "re-balance event triggered[%s], cooperative[%s], topic-partitions[%s]",
922922
err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ? "ASSIGN_PARTITIONS" : "REVOKE_PARTITIONS",
923923
isCooperativeEnabled() ? "enabled" : "disabled",
924924
tpsStr.c_str());

include/kafka/KafkaProducer.h

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,14 @@ class KafkaProducer: public KafkaClient
5555
Error flush(std::chrono::milliseconds timeout = std::chrono::milliseconds::max());
5656

5757
/**
58-
* Close this producer. This method waits up to timeout for the producer to complete the sending of all incomplete requests.
58+
* Purge messages currently handled by the KafkaProducer.
5959
*/
60-
Error close(std::chrono::milliseconds timeout = std::chrono::milliseconds::max());
60+
Error purge();
61+
62+
/**
63+
* Close this producer. This method would wait up to timeout for the producer to complete the sending of all incomplete requests (before purging them).
64+
*/
65+
void close(std::chrono::milliseconds timeout = std::chrono::milliseconds::max());
6166

6267
/**
6368
* Options for sending messages.
@@ -225,7 +230,7 @@ KafkaProducer::KafkaProducer(const Properties& properties, EventsPollingOption e
225230
startBackgroundPollingIfNecessary([this](int timeoutMs){ pollCallbacks(timeoutMs); });
226231

227232
const auto propStr = KafkaClient::properties().toString();
228-
KAFKA_API_DO_LOG(Log::Level::Info, "initializes with properties[%s]", propStr.c_str());
233+
KAFKA_API_DO_LOG(Log::Level::Notice, "initializes with properties[%s]", propStr.c_str());
229234
}
230235

231236
inline void
@@ -430,18 +435,30 @@ KafkaProducer::flush(std::chrono::milliseconds timeout)
430435
}
431436

432437
inline Error
438+
KafkaProducer::purge()
439+
{
440+
return Error{rd_kafka_purge(getClientHandle(),
441+
(static_cast<unsigned>(RD_KAFKA_PURGE_F_QUEUE) | static_cast<unsigned>(RD_KAFKA_PURGE_F_INFLIGHT)))};
442+
}
443+
444+
inline void
433445
KafkaProducer::close(std::chrono::milliseconds timeout)
434446
{
435447
_opened = false;
436448

437449
stopBackgroundPollingIfNecessary();
438450

439451
Error result = flush(timeout);
452+
if (result.value() == RD_KAFKA_RESP_ERR__TIMED_OUT)
453+
{
454+
KAFKA_API_DO_LOG(Log::Level::Notice, "purge messages before close, outQLen[%d]", rd_kafka_outq_len(getClientHandle()));
455+
purge();
456+
}
457+
458+
rd_kafka_poll(getClientHandle(), 0);
440459

441-
std::string resultMsg = result.message();
442-
KAFKA_API_DO_LOG(Log::Level::Info, "closed [%s]", resultMsg.c_str());
460+
KAFKA_API_DO_LOG(Log::Level::Notice, "closed");
443461

444-
return result;
445462
}
446463

447464
inline void

tests/robustness/TestKafkaProducer.cc

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -151,26 +151,27 @@ TEST(KafkaProducer, NoMissedDeliveryCallback)
151151
EXPECT_EQ(0, sizeOfIdsInFlight());
152152
}
153153

154-
TEST(KafkaProducer, MightMissDeliveryCallbackIfCloseWithLimitedTimeout)
154+
TEST(KafkaProducer, DeliveryCallbackTriggeredByPurgeWithinClose)
155155
{
156156
const kafka::Topic topic = kafka::utility::getRandomString();
157157
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);
158158

159-
std::size_t deliveryCount = 0;
159+
constexpr int NUM_OF_MESSAGES = 10;
160+
161+
std::size_t deliveryCbTriggeredCount = 0;
160162
{
161163
kafka::clients::KafkaProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig());
162164
producer.setErrorCallback(KafkaTestUtility::DumpError);
163165

164166
KafkaTestUtility::PauseBrokers();
165167

166-
constexpr int NUM_OF_MESSAGES = 10;
167168
for (std::size_t i = 0; i < NUM_OF_MESSAGES; ++i)
168169
{
169170
auto record = kafka::clients::producer::ProducerRecord(topic, kafka::NullKey, kafka::NullValue, i);
170171
producer.send(record,
171-
[&deliveryCount](const kafka::clients::producer::RecordMetadata& metadata, const kafka::Error& error) {
172+
[&deliveryCbTriggeredCount](const kafka::clients::producer::RecordMetadata& metadata, const kafka::Error& error) {
172173
std::cout << "[" << kafka::utility::getCurrentTime() << "] Delivery callback: metadata[" << metadata.toString() << "], result[" << error.message() << "]" << std::endl;
173-
++deliveryCount;
174+
++deliveryCbTriggeredCount;
174175
});
175176
std::cout << "[" << kafka::utility::getCurrentTime() << "] Message was just sent: " << record.toString() << std::endl;
176177
}
@@ -180,15 +181,14 @@ TEST(KafkaProducer, MightMissDeliveryCallbackIfCloseWithLimitedTimeout)
180181
EXPECT_EQ(RD_KAFKA_RESP_ERR__TIMED_OUT, error.value());
181182
std::cout << "[" << kafka::utility::getCurrentTime() << "] producer flush result[" << error.message() << "]" << std::endl;
182183

183-
// Still fail since no response from brokers
184-
error = producer.close(std::chrono::seconds(1));
185-
EXPECT_EQ(RD_KAFKA_RESP_ERR__TIMED_OUT, error.value());
186-
std::cout << "[" << kafka::utility::getCurrentTime() << "] producer close result[" << error.message() << "]" << std::endl;
184+
// The in-flight messages would be purged within `close()` (thus trigger the delivery callbacks)
185+
producer.close(std::chrono::seconds(1));
186+
std::cout << "[" << kafka::utility::getCurrentTime() << "] producer closed" << std::endl;
187187
}
188188

189-
KafkaTestUtility::ResumeBrokers();
189+
EXPECT_EQ(NUM_OF_MESSAGES, deliveryCbTriggeredCount);
190190

191-
EXPECT_EQ(0, deliveryCount); // No message delivery callback was called
191+
KafkaTestUtility::ResumeBrokers();
192192
}
193193

194194
TEST(KafkaProducer, BrokerStopWhileSendingMessages)

0 commit comments

Comments
 (0)