
Commit 0762ec2

Set default log_level as NOTICE(5)

1 parent b5f5ba2 commit 0762ec2

File tree

6 files changed: +93 -70 lines changed

README.md

Lines changed: 3 additions & 0 deletions

```diff
@@ -163,6 +163,9 @@ Eventually, we worked out the `modern-cpp-kafka`, -- a header-only library that
 
 * `client.id`
 
+* Log level
+
+    * The default `log_level` is `NOTICE` (`5`) for all these clients
 
 * Test Environment (ZooKeeper/Kafka cluster) Setup
 
```

include/kafka/Consumer.h

Lines changed: 20 additions & 1 deletion

```diff
@@ -23,12 +23,31 @@ namespace Consumer
 /**
  * A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes.
  */
-using RebalanceCallback = std::function<void(RebalanceEventType eventType, const TopicPartitions& topicPartitions)>;
+using RebalanceCallback = std::function<void(RebalanceEventType eventType, const TopicPartitions& topicPartitions)>;
+
+/**
+ * Null RebalanceCallback
+ */
+#if __cplusplus >= 201703L
+const inline RebalanceCallback NullRebalanceCallback = RebalanceCallback{};
+#else
+const static RebalanceCallback NullRebalanceCallback = RebalanceCallback{};
+#endif
+
 /**
  * A callback interface that the user can implement to trigger custom actions when a commit request completes.
  */
 using OffsetCommitCallback = std::function<void(const TopicPartitionOffsets& topicPartitionOffsets, std::error_code ec)>;
 
+/**
+ * Null OffsetCommitCallback
+ */
+#if __cplusplus >= 201703L
+const inline OffsetCommitCallback NullOffsetCommitCallback = OffsetCommitCallback{};
+#else
+const static OffsetCommitCallback NullOffsetCommitCallback = OffsetCommitCallback{};
+#endif
+
 /**
  * A metadata struct containing the consumer group information.
  */
```
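
The null callbacks above are just empty `std::function` objects, so they test as `false` wherever the library checks a callback before invoking it (e.g. `if (_rebalanceCb)` in KafkaConsumer.h). The `#if __cplusplus >= 201703L` split exists because C++17 `inline` variables give this header-only library a single definition across translation units, while the C++14 branch falls back to internal-linkage copies per translation unit. A hypothetical usage sketch, not part of the commit; it only assumes `kafka/Consumer.h` is on the include path and that `KAFKA_API` expands to the library namespace declared in these headers:

```cpp
#include <iostream>

#include "kafka/Consumer.h"

int main()
{
    using namespace KAFKA_API;

    // A user-supplied callback: a non-empty std::function, so it tests as `true`.
    Consumer::RebalanceCallback onRebalance =
        [](Consumer::RebalanceEventType /*eventType*/, const TopicPartitions& /*topicPartitions*/) {
            std::cout << "rebalance event received" << std::endl;
        };

    // The new sentinel: an empty std::function, so it tests as `false` and the
    // consumer simply skips invoking it.
    std::cout << std::boolalpha
              << static_cast<bool>(onRebalance) << ' '                            // true
              << static_cast<bool>(Consumer::NullRebalanceCallback) << std::endl; // false
    return 0;
}
```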

include/kafka/KafkaClient.h

Lines changed: 6 additions & 0 deletions

```diff
@@ -390,6 +390,12 @@ KafkaClient::validateAndReformProperties(const Properties& origProperties)
         }
     }
 
+    // If no "log_level" configured, use LOG_NOTICE as default
+    if (!properties.getProperty(LOG_LEVEL))
+    {
+        properties.put(LOG_LEVEL, std::to_string(LOG_NOTICE));
+    }
+
     return properties;
 }
 
```
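
With this change, any client whose configuration omits `log_level` gets `NOTICE` (`5`) injected during property validation; an explicit value still wins, since the default is only applied when the property is absent. A minimal override sketch, not taken from the repo: the broker list and group id are placeholders, and it assumes `Properties` can be default-constructed and filled with `put()` as the headers above use it, with `KafkaAutoCommitConsumer` coming from `kafka/KafkaConsumer.h` as in the test change below:

```cpp
#include "kafka/KafkaConsumer.h"

int main()
{
    using namespace KAFKA_API;

    Properties props;
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker list
    props.put("group.id",          "my-group");        // placeholder group id
    props.put("log_level",         "7");                // raise logging to DEBUG; omit this line to get the new NOTICE(5) default

    // validateAndReformProperties() only injects LOG_NOTICE when "log_level" is absent,
    // so the explicit value above is kept as-is.
    KafkaAutoCommitConsumer consumer(props);
    return 0;
}
```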

include/kafka/KafkaConsumer.h

Lines changed: 12 additions & 16 deletions

```diff
@@ -24,10 +24,6 @@ namespace KAFKA_API {
 class KafkaConsumer: public KafkaClient
 {
 protected:
-    using RebalanceEventType = Consumer::RebalanceEventType;
-    using RebalanceCallback = Consumer::RebalanceCallback;
-    using OffsetCommitCallback = Consumer::OffsetCommitCallback;
-
     // Default value for property "max.poll.records" (which is same with Java API)
     static const constexpr char* DEFAULT_MAX_POLL_RECORDS_VALUE = "500";
 
@@ -75,7 +71,7 @@ class KafkaConsumer: public KafkaClient
      * An exception would be thrown if assign is called previously (without a subsequent call to unsubscribe())
      */
     void subscribe(const Topics& topics,
-                   Consumer::RebalanceCallback cb = Consumer::RebalanceCallback(),
+                   Consumer::RebalanceCallback cb = Consumer::NullRebalanceCallback,
                    std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_SUBSCRIBE_TIMEOUT_MS));
     /**
      * Get the current subscription.
@@ -293,7 +289,7 @@ class KafkaConsumer: public KafkaClient
     // Rebalance Callback (for class instance)
     void onRebalance(rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* rk_partitions);
 
-    RebalanceCallback _rebalanceCb;
+    Consumer::RebalanceCallback _rebalanceCb;
 };
 
 
@@ -360,7 +356,7 @@ KafkaConsumer::close()
 
 // Subscription
 inline void
-KafkaConsumer::subscribe(const Topics& topics, RebalanceCallback cb, std::chrono::milliseconds timeout)
+KafkaConsumer::subscribe(const Topics& topics, Consumer::RebalanceCallback cb, std::chrono::milliseconds timeout)
 {
     std::string topicsStr = toString(topics);
 
@@ -746,8 +742,8 @@ KafkaConsumer::onRebalance(rd_kafka_resp_err_t err, rd_kafka_topic_partition_lis
 
     if (_rebalanceCb)
     {
-        RebalanceEventType et =
-            (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ? RebalanceEventType::PartitionsAssigned : RebalanceEventType::PartitionsRevoked);
+        Consumer::RebalanceEventType et =
+            (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ? Consumer::RebalanceEventType::PartitionsAssigned : Consumer::RebalanceEventType::PartitionsRevoked);
         _rebalanceCb(et, tps);
     }
 }
@@ -895,17 +891,17 @@ class KafkaManualCommitConsumer: public KafkaConsumer
      * Commit offsets returned on the last poll() for all the subscribed list of topics and partition.
      * Note: If a callback is provided, it's guaranteed to be triggered (before closing the consumer).
      */
-    void commitAsync(const Consumer::OffsetCommitCallback& cb = OffsetCommitCallback());
+    void commitAsync(const Consumer::OffsetCommitCallback& cb = Consumer::NullOffsetCommitCallback);
     /**
      * Commit the specified offsets for the specified records
      * Note: If a callback is provided, it's guaranteed to be triggered (before closing the consumer).
      */
-    void commitAsync(const ConsumerRecord& record, const Consumer::OffsetCommitCallback& cb = OffsetCommitCallback());
+    void commitAsync(const ConsumerRecord& record, const Consumer::OffsetCommitCallback& cb = Consumer::NullOffsetCommitCallback);
     /**
      * Commit the specified offsets for the specified list of topics and partitions to Kafka.
      * Note: If a callback is provided, it's guaranteed to be triggered (before closing the consumer).
      */
-    void commitAsync(const TopicPartitionOffsets& tpos, const Consumer::OffsetCommitCallback& cb = OffsetCommitCallback());
+    void commitAsync(const TopicPartitionOffsets& tpos, const Consumer::OffsetCommitCallback& cb = Consumer::NullOffsetCommitCallback);
 
     /**
      * Call the OffsetCommit callbacks (if any)
@@ -968,16 +964,16 @@ KafkaManualCommitConsumer::commitSync(const TopicPartitionOffsets& tpos)
 }
 
 inline void
-KafkaManualCommitConsumer::commitAsync(const TopicPartitionOffsets& tpos, const OffsetCommitCallback& cb)
+KafkaManualCommitConsumer::commitAsync(const TopicPartitionOffsets& tpos, const Consumer::OffsetCommitCallback& cb)
 {
     auto rk_tpos = rd_kafka_topic_partition_list_unique_ptr(tpos.empty() ? nullptr : createRkTopicPartitionList(tpos));
 
-    rd_kafka_resp_err_t err = rd_kafka_commit_queue(getClientHandle(), rk_tpos.get(), getCommitCbQueue(), &KafkaConsumer::offsetCommitCallback, new OffsetCommitCallback(cb));
+    rd_kafka_resp_err_t err = rd_kafka_commit_queue(getClientHandle(), rk_tpos.get(), getCommitCbQueue(), &KafkaConsumer::offsetCommitCallback, new Consumer::OffsetCommitCallback(cb));
     KAFKA_THROW_IF_WITH_RESP_ERROR(err);
 }
 
 inline void
-KafkaManualCommitConsumer::commitAsync(const ConsumerRecord& record, const OffsetCommitCallback& cb)
+KafkaManualCommitConsumer::commitAsync(const ConsumerRecord& record, const Consumer::OffsetCommitCallback& cb)
 {
     TopicPartitionOffsets tpos;
     // committed offset should be "current received record's offset" + 1
@@ -986,7 +982,7 @@ KafkaManualCommitConsumer::commitAsync(const ConsumerRecord& record, const Offse
 }
 
 inline void
-KafkaManualCommitConsumer::commitAsync(const OffsetCommitCallback& cb)
+KafkaManualCommitConsumer::commitAsync(const Consumer::OffsetCommitCallback& cb)
 {
     commitAsync(TopicPartitionOffsets(), cb);
 }
```
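
Because the defaults now reference `Consumer::NullOffsetCommitCallback`, `commitAsync()` can still be called with no callback at all, while a real callback keeps the signature declared in Consumer.h. A sketch for illustration only: it assumes `poll()` returns the fetched `ConsumerRecord`s as elsewhere in this library, and everything else uses only the signatures visible in this diff:

```cpp
#include <iostream>

#include "kafka/KafkaConsumer.h"

void commitWithAndWithoutCallback(KAFKA_API::KafkaManualCommitConsumer& consumer)
{
    using namespace KAFKA_API;

    for (const auto& record : consumer.poll(std::chrono::milliseconds(100)))
    {
        // Explicit callback, matching Consumer::OffsetCommitCallback's signature.
        consumer.commitAsync(record,
                             [](const TopicPartitionOffsets& /*tpos*/, std::error_code ec) {
                                 if (ec) std::cout << "commit failed: " << ec.message() << std::endl;
                             });
    }

    // No argument: falls back to Consumer::NullOffsetCommitCallback, an empty
    // std::function which the consumer skips rather than invokes.
    consumer.commitAsync();
}
```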

include/kafka/Types.h

Lines changed: 49 additions & 48 deletions

```diff
@@ -14,6 +14,7 @@
 #include <string>
 #include <vector>
 
+
 // Use `boost::optional` for C++14, which doesn't support `std::optional`
 #if __cplusplus >= 201703L
 #include <optional>
@@ -25,9 +26,38 @@ template<class T>
 using Optional = boost::optional<T>;
 #endif
 
+
 namespace KAFKA_API {
 
-class ConstBuffer;
+// Which is similar with `boost::const_buffer` (thus avoid the dependency towards `boost`)
+class ConstBuffer
+{
+public:
+    explicit ConstBuffer(const void* data = nullptr, std::size_t size = 0): _data(data), _size(size) {}
+    const void* data() const { return _data; }
+    std::size_t size() const { return _size; }
+    std::string toString() const
+    {
+        if (_size == 0) return _data ? "[empty]" : "[NULL]";
+
+        std::ostringstream oss;
+
+        auto printChar = [&oss](const unsigned char c) {
+            if (std::isprint(c)) {
+                oss << c;
+            } else {
+                oss << "[0x" << std::hex << std::setfill('0') << std::setw(2) << static_cast<int>(c) << "]";
+            }
+        };
+        const auto* beg = static_cast<const unsigned char*>(_data);
+        std::for_each(beg, beg + _size, printChar);
+
+        return oss.str();
+    }
+private:
+    const void* _data;
+    std::size_t _size;
+};
 
 /**
  * Topic name.
@@ -50,12 +80,30 @@ using Offset = std::int64_t;
 using Key = ConstBuffer;
 using KeySize = std::size_t;
 
+/**
+ * Null Key.
+ */
+#if __cplusplus >= 201703L
+const inline Key NullKey = Key{};
+#else
+const static Key NullKey = Key{};
+#endif
+
 /**
  * Record value.
  */
 using Value = ConstBuffer;
 using ValueSize = std::size_t;
 
+/**
+ * Null Value.
+ */
+#if __cplusplus >= 201703L
+const inline Value NullValue = Value{};
+#else
+const static Value NullValue = Value{};
+#endif
+
 /**
  * Topic set.
 */
@@ -82,53 +130,6 @@ using TopicPartitionOffset = std::tuple<Topic, Partition, Offset>;
 using TopicPartitionOffsets = std::map<TopicPartition, Offset>;
 
 
-class ConstBuffer
-{
-public:
-    explicit ConstBuffer(const void* data = nullptr, std::size_t size = 0): _data(data), _size(size) {}
-    const void* data() const { return _data; }
-    std::size_t size() const { return _size; }
-    std::string toString() const
-    {
-        if (_size == 0) return _data ? "[empty]" : "[NULL]";
-
-        std::ostringstream oss;
-
-        auto printChar = [&oss](const unsigned char c) {
-            if (std::isprint(c)) {
-                oss << c;
-            } else {
-                oss << "[0x" << std::hex << std::setfill('0') << std::setw(2) << static_cast<int>(c) << "]";
-            }
-        };
-        const auto* beg = static_cast<const unsigned char*>(_data);
-        std::for_each(beg, beg + _size, printChar);
-
-        return oss.str();
-    }
-private:
-    const void* _data;
-    std::size_t _size;
-};
-
-
-/**
- * Empty Key.
- */
-#if __cplusplus >= 201703L
-const inline Key NullKey = Key();
-#else
-const static Key NullKey = Key();
-#endif
-/**
- * Empty Value.
- */
-#if __cplusplus >= 201703L
-const inline Value NullValue = Value();
-#else
-const static Value NullValue = Value();
-#endif
-
 /**
  * Obtains explanatory string for Topics.
 
```
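
Moving the full `ConstBuffer` definition ahead of the `Key`/`Value` aliases lets `NullKey`/`NullValue` be defined right next to them; both are zero-sized buffers with a null data pointer, which `toString()` renders as `[NULL]`. A hypothetical snippet, not part of the commit, exercising exactly the class shown above (it only assumes `kafka/Types.h` is on the include path):

```cpp
#include <iostream>

#include "kafka/Types.h"

int main()
{
    using namespace KAFKA_API;

    const char payload[] = "key\x01";               // 3 printable chars + one non-printable byte
    Key key(payload, 4);
    std::cout << key.toString() << std::endl;       // -> key[0x01]

    std::cout << NullKey.toString() << std::endl;   // -> [NULL] (nullptr data, size 0)
    std::cout << NullValue.toString() << std::endl; // -> [NULL]
    return 0;
}
```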
*/

tests/integration/TestKafkaConsumer.cc

Lines changed: 3 additions & 5 deletions

```diff
@@ -1682,22 +1682,20 @@ TEST(KafkaAutoCommitConsumer, AutoCreateTopics)
     const Topic topic = Utility::getRandomString();
 
     KafkaAutoCommitConsumer consumer(KafkaTestUtility::GetKafkaClientCommonConfig()
-                                         .put("allow.auto.create.topics", "true")
-                                         /* .put("debug", "all")
-                                            .put("log_level", "7") */ );
+                                         .put("allow.auto.create.topics", "true"));
 
     constexpr int MAX_RETRIES = 2;
     for (int i = 0; i < MAX_RETRIES; ++i)
    {
         // Subscribe topics
-        consumer.subscribe({topic});
+        consumer.subscribe({topic}, Consumer::NullRebalanceCallback, std::chrono::milliseconds(10));
         std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " subscribed" << std::endl;
         std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " subscription: " << toString(consumer.subscription()) << std::endl;
         std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " assignment: " << toString(consumer.assignment()) << std::endl;
 
         if (!consumer.assignment().empty()) break;
 
-        consumer.unsubscribe();
+        consumer.unsubscribe(std::chrono::milliseconds(5));
     }
 
     // Would never make it!
```
