Commit 0d6b4fa

Refactor Properties
1 parent b39fd40 commit 0d6b4fa

31 files changed: +896 -888 lines
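
The commit reworks how `Properties` entries are constructed and how the client namespaces are organized: property values are now brace-initialized instead of being plain strings, the examples pull in `kafka`, `kafka::clients`, and the per-client namespaces with using-declarations, and `AdminClient` moves into `KAFKA_API::clients::admin`. A minimal sketch of the construction change, taken from the example diffs below (the exact type behind the braced value is not shown in this commit and is assumed to be a property-value wrapper):

    // Before this commit: each value is a plain string
    const kafka::Properties oldProps({
        {"bootstrap.servers",  brokers},
        {"enable.idempotence", "true"},
    });

    // After this commit: each value is brace-initialized,
    // presumably constructing a property-value wrapper type
    const kafka::Properties newProps({
        {"bootstrap.servers",  {brokers}},
        {"enable.idempotence", {"true"}},
    });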

examples/kafka_async_producer_copy_payload.cc

Lines changed: 9 additions & 9 deletions
@@ -6,22 +6,24 @@
 
 int main(int argc, char **argv)
 {
+    using namespace kafka;
     using namespace kafka::clients;
+    using namespace kafka::clients::producer;
 
     if (argc != 3) {
         std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
         exit(argc == 1 ? 0 : 1); // NOLINT
     }
 
     const std::string brokers = argv[1];
-    const kafka::Topic topic = argv[2];
+    const Topic topic = argv[2];
 
     try {
 
         // Create configuration object
-        const kafka::Properties props ({
-            {"bootstrap.servers", brokers},
-            {"enable.idempotence", "true"},
+        const Properties props ({
+            {"bootstrap.servers", {brokers}},
+            {"enable.idempotence", {"true" }},
         });
 
         // Create a producer instance
@@ -32,13 +34,11 @@ int main(int argc, char **argv)
 
         for (std::string line; std::getline(std::cin, line);) {
             // The ProducerRecord doesn't own `line`, it is just a thin wrapper
-            auto record = producer::ProducerRecord(topic,
-                                                   kafka::NullKey,
-                                                   kafka::Value(line.c_str(), line.size()));
+            auto record = ProducerRecord(topic, NullKey, Value(line.c_str(), line.size()));
             // Send the message
             producer.send(record,
                           // The delivery report handler
-                          [](const producer::RecordMetadata& metadata, const kafka::Error& error) {
+                          [](const RecordMetadata& metadata, const Error& error) {
                               if (!error) {
                                   std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                               } else {
@@ -53,7 +53,7 @@ int main(int argc, char **argv)
 
         // producer.close(); // No explicit close is needed, RAII will take care of it
 
-    } catch (const kafka::KafkaException& e) {
+    } catch (const KafkaException& e) {
         std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
     }
 }

examples/kafka_async_producer_not_copy_payload.cc

Lines changed: 11 additions & 9 deletions
@@ -6,22 +6,24 @@
 
 int main(int argc, char **argv)
 {
+    using namespace kafka;
     using namespace kafka::clients;
+    using namespace kafka::clients::producer;
 
     if (argc != 3) {
         std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
         exit(argc == 1 ? 0 : 1); // NOLINT
     }
 
     const std::string brokers = argv[1];
-    const kafka::Topic topic = argv[2];
+    const Topic topic = argv[2];
 
     try {
 
         // Create configuration object
-        const kafka::Properties props ({
-            {"bootstrap.servers", brokers},
-            {"enable.idempotence", "true"},
+        const Properties props ({
+            {"bootstrap.servers", {brokers}},
+            {"enable.idempotence", {"true" }},
         });
 
         // Create a producer instance
@@ -34,17 +36,17 @@ int main(int argc, char **argv)
              std::getline(std::cin, *line);
             line = std::make_shared<std::string>()) {
             // The ProducerRecord doesn't own `line`, it is just a thin wrapper
-            auto record = producer::ProducerRecord(topic,
-                                                   kafka::NullKey,
-                                                   kafka::Value(line->c_str(), line->size()));
+            auto record = ProducerRecord(topic,
+                                         NullKey,
+                                         Value(line->c_str(), line->size()));
 
             // Send the message
             producer.send(record,
                           // The delivery report handler
                           // Note: Here we capture the shared_pointer of `line`,
                           // which holds the content for `record.value()`.
                           // It makes sure the memory block is valid until the lambda finishes.
-                          [line](const producer::RecordMetadata& metadata, const kafka::Error& error) {
+                          [line](const RecordMetadata& metadata, const Error& error) {
                               if (!error) {
                                   std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                               } else {
@@ -57,7 +59,7 @@ int main(int argc, char **argv)
 
         // producer.close(); // No explicit close is needed, RAII will take care of it
 
-    } catch (const kafka::KafkaException& e) {
+    } catch (const KafkaException& e) {
         std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
     }
 }

examples/kafka_auto_commit_consumer.cc

Lines changed: 11 additions & 7 deletions
@@ -5,24 +5,28 @@
 
 int main(int argc, char **argv)
 {
+    using namespace kafka;
+    using namespace kafka::clients;
+    using namespace kafka::clients::consumer;
+
     if (argc != 3) {
         std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
         exit(argc == 1 ? 0 : 1); // NOLINT
     }
 
     const std::string brokers = argv[1];
-    const kafka::Topic topic = argv[2];
+    const Topic topic = argv[2];
 
     try {
 
         // Create configuration object
-        const kafka::Properties props ({
-            {"bootstrap.servers", brokers},
-            {"enable.auto.commit", "true"}
+        const Properties props ({
+            {"bootstrap.servers", {brokers}},
+            {"enable.auto.commit", {"true" }}
         });
 
         // Create a consumer instance
-        kafka::clients::KafkaConsumer consumer(props);
+        KafkaConsumer consumer(props);
 
         // Subscribe to topics
         consumer.subscribe({topic});
@@ -41,7 +45,7 @@ int main(int argc, char **argv)
                     std::cout << " Partition: " << record.partition() << std::endl;
                     std::cout << " Offset : " << record.offset() << std::endl;
                     std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
-                    std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl;
+                    std::cout << " Headers : " << toString(record.headers()) << std::endl;
                     std::cout << " Key [" << record.key().toString() << "]" << std::endl;
                     std::cout << " Value [" << record.value().toString() << "]" << std::endl;
                 } else {
@@ -52,7 +56,7 @@ int main(int argc, char **argv)
 
        // consumer.close(); // No explicit close is needed, RAII will take care of it
 
-    } catch (const kafka::KafkaException& e) {
+    } catch (const KafkaException& e) {
        std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
    }
}

examples/kafka_manual_commit_consumer.cc

Lines changed: 11 additions & 7 deletions
@@ -5,23 +5,27 @@
 
 int main(int argc, char **argv)
 {
+    using namespace kafka;
+    using namespace kafka::clients;
+    using namespace kafka::clients::consumer;
+
     if (argc != 3) {
         std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
         exit(argc == 1 ? 0 : 1); // NOLINT
     }
 
     const std::string brokers = argv[1];
-    const kafka::Topic topic = argv[2];
+    const Topic topic = argv[2];
 
     try {
 
         // Create configuration object
-        const kafka::Properties props ({
-            {"bootstrap.servers", brokers},
+        const Properties props ({
+            {"bootstrap.servers", {brokers}},
         });
 
         // Create a consumer instance
-        kafka::clients::KafkaConsumer consumer(props);
+        KafkaConsumer consumer(props);
 
         // Subscribe to topics
         consumer.subscribe({topic});
@@ -47,7 +51,7 @@ int main(int argc, char **argv)
                 std::cout << " Partition: " << record.partition() << std::endl;
                 std::cout << " Offset : " << record.offset() << std::endl;
                 std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
-                std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl;
+                std::cout << " Headers : " << toString(record.headers()) << std::endl;
                 std::cout << " Key [" << record.key().toString() << "]" << std::endl;
                 std::cout << " Value [" << record.value().toString() << "]" << std::endl;
 
@@ -61,7 +65,7 @@ int main(int argc, char **argv)
             auto now = std::chrono::steady_clock::now();
             if (now - lastTimeCommitted > std::chrono::seconds(1)) {
                 // Commit offsets for messages polled
-                std::cout << "% syncCommit offsets: " << kafka::utility::getCurrentTime() << std::endl;
+                std::cout << "% syncCommit offsets: " << utility::getCurrentTime() << std::endl;
                 consumer.commitSync(); // or commitAsync()
 
                 lastTimeCommitted = now;
@@ -72,7 +76,7 @@ int main(int argc, char **argv)
 
        // consumer.close(); // No explicit close is needed, RAII will take care of it
 
-    } catch (const kafka::KafkaException& e) {
+    } catch (const KafkaException& e) {
        std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
    }
}

examples/kafka_sync_producer.cc

Lines changed: 12 additions & 10 deletions
@@ -5,22 +5,24 @@
 
 int main(int argc, char **argv)
 {
+    using namespace kafka;
     using namespace kafka::clients;
+    using namespace kafka::clients::producer;
 
     if (argc != 3) {
         std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
         exit(argc == 1 ? 0 : 1); // NOLINT
     }
 
     const std::string brokers = argv[1];
-    const kafka::Topic topic = argv[2];
+    const Topic topic = argv[2];
 
     try {
 
         // Create configuration object
-        const kafka::Properties props({
-            {"bootstrap.servers", brokers},
-            {"enable.idempotence", "true"},
+        const Properties props({
+            {"bootstrap.servers", {brokers}},
+            {"enable.idempotence", {"true" }},
        });
 
         // Create a producer instance.
@@ -31,15 +33,15 @@ int main(int argc, char **argv)
 
         for (std::string line; std::getline(std::cin, line);) {
             // The ProducerRecord doesn't own `line`, it is just a thin wrapper
-            auto record = producer::ProducerRecord(topic,
-                                                   kafka::NullKey,
-                                                   kafka::Value(line.c_str(), line.size()));
+            auto record = ProducerRecord(topic,
+                                         NullKey,
+                                         Value(line.c_str(), line.size()));
 
             // Send the message.
             try {
-                const producer::RecordMetadata metadata = producer.syncSend(record);
+                const RecordMetadata metadata = producer.syncSend(record);
                 std::cout << "% Message delivered: " << metadata.toString() << std::endl;
-            } catch (const kafka::KafkaException& e) {
+            } catch (const KafkaException& e) {
                 std::cerr << "% Message delivery failed: " << e.error().message() << std::endl;
             }
 
@@ -48,7 +50,7 @@ int main(int argc, char **argv)
 
         // producer.close(); // No explicit close is needed, RAII will take care of it
 
-    } catch (const kafka::KafkaException& e) {
+    } catch (const KafkaException& e) {
         std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
     }
 }

include/kafka/AdminClient.h

Lines changed: 9 additions & 9 deletions
@@ -18,7 +18,7 @@
 #include <vector>
 
 
-namespace KAFKA_API { namespace clients {
+namespace KAFKA_API { namespace clients { namespace admin {
 
 /**
  * The administrative client for Kafka, which supports managing and inspecting topics, etc.
@@ -27,11 +27,7 @@ class AdminClient: public KafkaClient
 {
 public:
     explicit AdminClient(const Properties& properties)
-        : KafkaClient(ClientType::AdminClient,
-                      KafkaClient::validateAndReformProperties(properties),
-                      ConfigCallbacksRegister{},
-                      EventsPollingOption::Auto,
-                      Interceptors{})
+        : KafkaClient(ClientType::AdminClient, KafkaClient::validateAndReformProperties(properties))
     {
     }
 
@@ -148,10 +144,14 @@ AdminClient::createTopics(const Topics& topics,
 
         for (const auto& conf: topicConfig.map())
         {
-            const rd_kafka_resp_err_t err = rd_kafka_NewTopic_set_config(rkNewTopics.back().get(), conf.first.c_str(), conf.second.c_str());
+            const auto& k = conf.first;
+            const auto& v = topicConfig.getProperty(k);
+            if (!v) continue;
+
+            const rd_kafka_resp_err_t err = rd_kafka_NewTopic_set_config(rkNewTopics.back().get(), k.c_str(), v->c_str());
             if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
             {
-                const std::string errMsg = "Invalid config[" + conf.first + "=" + conf.second + "]";
+                const std::string errMsg = "Invalid config[" + k + "=" + *v + "]";
                 KAFKA_API_DO_LOG(Log::Level::Err, errMsg.c_str());
                 return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg});
             }
@@ -344,5 +344,5 @@ AdminClient::deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
     return admin::DeleteRecordsResult(combineErrors(errors));
 }
 
-} } // end of KAFKA_API::clients
+} } } // end of KAFKA_API::clients::admin
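
The `createTopics` loop above no longer reads `conf.second` directly; it looks each key up again with `topicConfig.getProperty(k)`, checks the result for emptiness, and dereferences it, which suggests `getProperty` now returns an optional-like value. A hedged sketch of how a caller would consume it under that assumption (the exact return type is not shown in this commit):

    // Assumption: Properties::getProperty() returns an optional-like wrapper
    // that is falsy when the key is missing and dereferences to the value string.
    if (const auto value = props.getProperty("bootstrap.servers")) {
        std::cout << "bootstrap.servers=" << *value << std::endl;
    } else {
        std::cout << "bootstrap.servers is not set" << std::endl;
    }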