Skip to content

Commit 82cbd27

Browse files
committed
Fix KafkaRecoverableProducer.close()
1 parent 14ce365 commit 82cbd27

File tree

2 files changed

+49
-45
lines changed

2 files changed

+49
-45
lines changed

include/kafka/addons/KafkaRecoverableProducer.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,7 @@ class KafkaRecoverableProducer
2929

3030
~KafkaRecoverableProducer()
3131
{
32-
std::lock_guard<std::mutex> lock(_producerMutex);
33-
34-
_running = false;
35-
if (_pollThread.joinable()) _pollThread.join();
36-
37-
_producer->close();
32+
if (_running) close();
3833
}
3934

4035
/**
@@ -167,6 +162,9 @@ class KafkaRecoverableProducer
167162
{
168163
std::lock_guard<std::mutex> lock(_producerMutex);
169164

165+
_running = false;
166+
if (_pollThread.joinable()) _pollThread.join();
167+
170168
_producer->close(timeout);
171169
}
172170

tests/integration/TestKafkaRecoverableProducer.cc

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,53 +20,59 @@ TEST(KafkaRecoverableProducer, SendMessages)
2020

2121
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);
2222

23-
// Properties for the producer
24-
const auto props = KafkaTestUtility::GetKafkaClientCommonConfig().put(kafka::clients::producer::Config::ACKS, "all");
25-
26-
// Recoverable producer
27-
kafka::clients::KafkaRecoverableProducer producer(props);
28-
29-
// Send messages
30-
kafka::clients::producer::ProducerRecord::Id id = 0;
31-
for (const auto& msg: messages)
3223
{
33-
auto record = kafka::clients::producer::ProducerRecord(topic, partition,
34-
kafka::Key(msg.first.c_str(), msg.first.size()),
35-
kafka::Value(msg.second.c_str(), msg.second.size()),
36-
id++);
37-
std::cout << "[" <<kafka::utility::getCurrentTime() << "] ProducerRecord: " << record.toString() << std::endl;
24+
// Properties for the producer
25+
const auto props = KafkaTestUtility::GetKafkaClientCommonConfig().put(kafka::clients::producer::Config::ACKS, "all");
3826

39-
// sync-send
40-
{
41-
auto metadata = producer.syncSend(record);
42-
std::cout << "[" <<kafka::utility::getCurrentTime() << "] Message sync-sent. Metadata: " << metadata.toString() << std::endl;
43-
}
27+
// Recoverable producer
28+
kafka::clients::KafkaRecoverableProducer producer(props);
4429

45-
// async-send
30+
// Send messages
31+
kafka::clients::producer::ProducerRecord::Id id = 0;
32+
for (const auto& msg: messages)
4633
{
47-
producer.send(record,
48-
[] (const kafka::clients::producer::RecordMetadata& metadata, const kafka::Error& error) {
49-
EXPECT_FALSE(error);
50-
std::cout << "[" <<kafka::utility::getCurrentTime() << "] Message async-sent. Metadata: " << metadata.toString() << std::endl;
51-
});
52-
}
53-
}
34+
auto record = kafka::clients::producer::ProducerRecord(topic, partition,
35+
kafka::Key(msg.first.c_str(), msg.first.size()),
36+
kafka::Value(msg.second.c_str(), msg.second.size()),
37+
id++);
38+
std::cout << "[" <<kafka::utility::getCurrentTime() << "] ProducerRecord: " << record.toString() << std::endl;
5439

55-
// Prepare a consumer
56-
const auto consumerProps = KafkaTestUtility::GetKafkaClientCommonConfig().put(kafka::clients::consumer::Config::AUTO_OFFSET_RESET, "earliest");
57-
kafka::clients::KafkaConsumer consumer(consumerProps);
58-
consumer.setLogLevel(kafka::Log::Level::Crit);
59-
consumer.subscribe({topic});
40+
// sync-send
41+
{
42+
auto metadata = producer.syncSend(record);
43+
std::cout << "[" <<kafka::utility::getCurrentTime() << "] Message sync-sent. Metadata: " << metadata.toString() << std::endl;
44+
}
6045

61-
// Poll these messages
62-
auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer);
46+
// async-send
47+
{
48+
producer.send(record,
49+
[] (const kafka::clients::producer::RecordMetadata& metadata, const kafka::Error& error) {
50+
EXPECT_FALSE(error);
51+
std::cout << "[" <<kafka::utility::getCurrentTime() << "] Message async-sent. Metadata: " << metadata.toString() << std::endl;
52+
});
53+
}
54+
}
55+
56+
producer.close();
57+
}
6358

64-
// Check the messages
65-
EXPECT_EQ(messages.size() * 2, records.size());
66-
for (std::size_t i = 0; i < records.size(); ++i)
6759
{
68-
EXPECT_EQ(messages[i/2].first, std::string(static_cast<const char*>(records[i].key().data()), records[i].key().size()));
69-
EXPECT_EQ(messages[i/2].second, std::string(static_cast<const char*>(records[i].value().data()), records[i].value().size()));
60+
// Prepare a consumer
61+
const auto consumerProps = KafkaTestUtility::GetKafkaClientCommonConfig().put(kafka::clients::consumer::Config::AUTO_OFFSET_RESET, "earliest");
62+
kafka::clients::KafkaConsumer consumer(consumerProps);
63+
consumer.setLogLevel(kafka::Log::Level::Crit);
64+
consumer.subscribe({topic});
65+
66+
// Poll these messages
67+
auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer);
68+
69+
// Check the messages
70+
EXPECT_EQ(messages.size() * 2, records.size());
71+
for (std::size_t i = 0; i < records.size(); ++i)
72+
{
73+
EXPECT_EQ(messages[i/2].first, std::string(static_cast<const char*>(records[i].key().data()), records[i].key().size()));
74+
EXPECT_EQ(messages[i/2].second, std::string(static_cast<const char*>(records[i].value().data()), records[i].value().size()));
75+
}
7076
}
7177
}
7278

0 commit comments

Comments
 (0)