Commit 968b1ef

Refine namespaces for Kafka clients
1 parent 284529c commit 968b1ef


47 files changed: 1509 additions & 1513 deletions
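
The pattern across the changes is consistent: client classes move from the top-level kafka namespace into kafka::clients, producer- and admin-specific types move into the nested producer:: and admin:: namespaces, and kafka::Utility is renamed to kafka::utility. For orientation, a minimal producer written against the refined namespaces would look roughly like the sketch below. It mirrors the example diffs in this commit; the header path and the Properties setup are assumptions taken from the unchanged parts of those examples, not from this diff.

    // Minimal sketch -- assumes <kafka/KafkaProducer.h> and the "bootstrap.servers"
    // Properties key from the library's unchanged example code (not part of this diff).
    #include <kafka/KafkaProducer.h>

    #include <iostream>
    #include <string>

    int main()
    {
        using namespace kafka::clients;                     // new: clients live in kafka::clients

        const kafka::Properties props({{"bootstrap.servers", "localhost:9092"}});

        KafkaProducer producer(props);                      // was: kafka::KafkaProducer

        const std::string line = "hello";
        auto record = producer::ProducerRecord("test-topic",            // was: kafka::ProducerRecord
                                               kafka::NullKey,
                                               kafka::Value(line.c_str(), line.size()));

        producer.send(record,
                      [](const producer::RecordMetadata& metadata,      // was: kafka::Producer::RecordMetadata
                         const kafka::Error& error) {
                          if (!error) std::cout << "% Delivered: " << metadata.toString() << std::endl;
                          else        std::cerr << "% Failed: "    << error.message()     << std::endl;
                      },
                      KafkaProducer::SendOption::ToCopyRecordValue);    // was: kafka::KafkaProducer::SendOption
        return 0;
    }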

examples/kafka_async_producer_copy_payload.cc

Lines changed: 8 additions & 6 deletions
@@ -6,6 +6,8 @@
 
 int main(int argc, char **argv)
 {
+    using namespace kafka::clients;
+
     if (argc != 3) {
         std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
         exit(1);
@@ -23,28 +25,28 @@ int main(int argc, char **argv)
     });
 
     // Create a producer instance.
-    kafka::KafkaProducer producer(props);
+    KafkaProducer producer(props);
 
     // Read messages from stdin and produce to the broker.
     std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;
 
     for (std::string line; std::getline(std::cin, line);) {
         // The ProducerRecord doesn't own `line`, it is just a thin wrapper
-        auto record = kafka::ProducerRecord(topic,
-                                            kafka::NullKey,
-                                            kafka::Value(line.c_str(), line.size()));
+        auto record = producer::ProducerRecord(topic,
+                                               kafka::NullKey,
+                                               kafka::Value(line.c_str(), line.size()));
         // Send the message.
         producer.send(record,
                       // The delivery report handler
-                      [](const kafka::Producer::RecordMetadata& metadata, const kafka::Error& error) {
+                      [](const producer::RecordMetadata& metadata, const kafka::Error& error) {
                           if (!error) {
                               std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                           } else {
                               std::cerr << "% Message delivery failed: " << error.message() << std::endl;
                           }
                       },
                       // The memory block given by record.value() would be copied
-                      kafka::KafkaProducer::SendOption::ToCopyRecordValue);
+                      KafkaProducer::SendOption::ToCopyRecordValue);
 
         if (line.empty()) break;
     }

examples/kafka_async_producer_not_copy_payload.cc

Lines changed: 7 additions & 5 deletions
@@ -6,6 +6,8 @@
 
 int main(int argc, char **argv)
 {
+    using namespace kafka::clients;
+
     if (argc != 3) {
         std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
         exit(1);
@@ -23,7 +25,7 @@ int main(int argc, char **argv)
     });
 
     // Create a producer instance.
-    kafka::KafkaProducer producer(props);
+    KafkaProducer producer(props);
 
     // Read messages from stdin and produce to the broker.
     std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;
@@ -32,17 +34,17 @@ int main(int argc, char **argv)
          std::getline(std::cin, *line);
          line = std::make_shared<std::string>()) {
         // The ProducerRecord doesn't own `line`, it is just a thin wrapper
-        auto record = kafka::ProducerRecord(topic,
-                                            kafka::NullKey,
-                                            kafka::Value(line->c_str(), line->size()));
+        auto record = producer::ProducerRecord(topic,
+                                               kafka::NullKey,
+                                               kafka::Value(line->c_str(), line->size()));
 
         // Send the message.
         producer.send(record,
                       // The delivery report handler
                       // Note: Here we capture the shared_pointer of `line`,
                       // which holds the content for `record.value()`.
                       // It makes sure the memory block is valid until the lambda finishes.
-                      [line](const kafka::Producer::RecordMetadata& metadata, const kafka::Error& error) {
+                      [line](const producer::RecordMetadata& metadata, const kafka::Error& error) {
                           if (!error) {
                               std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                           } else {

examples/kafka_auto_commit_consumer.cc

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ int main(int argc, char **argv)
     });
 
     // Create a consumer instance.
-    kafka::KafkaConsumer consumer(props);
+    kafka::clients::KafkaConsumer consumer(props);
 
     // Subscribe to topics
     consumer.subscribe({topic});

examples/kafka_manual_commit_consumer.cc

Lines changed: 2 additions & 2 deletions
@@ -21,7 +21,7 @@ int main(int argc, char **argv)
     });
 
     // Create a consumer instance.
-    kafka::KafkaConsumer consumer(props);
+    kafka::clients::KafkaConsumer consumer(props);
 
     // Subscribe to topics
     consumer.subscribe({topic});
@@ -61,7 +61,7 @@ int main(int argc, char **argv)
             auto now = std::chrono::steady_clock::now();
             if (now - lastTimeCommitted > std::chrono::seconds(1)) {
                 // Commit offsets for messages polled
-                std::cout << "% syncCommit offsets: " << kafka::Utility::getCurrentTime() << std::endl;
+                std::cout << "% syncCommit offsets: " << kafka::utility::getCurrentTime() << std::endl;
                 consumer.commitSync(); // or commitAsync()
 
                 lastTimeCommitted = now;
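
On the consumer side the commit only touches the qualifying namespaces: KafkaConsumer is now reached through kafka::clients, and the time helper moves from kafka::Utility to kafka::utility. A condensed sketch of the manual-commit loop above under the new names follows; the header path, the Properties keys and the poll interval are assumptions based on the unchanged parts of this example.

    // Condensed sketch -- <kafka/KafkaConsumer.h> and the Properties keys are assumed
    // from the unchanged example code; only the namespace usage reflects this diff.
    #include <kafka/KafkaConsumer.h>

    #include <chrono>
    #include <iostream>

    int main()
    {
        const kafka::Properties props({{"bootstrap.servers",  "localhost:9092"},
                                       {"enable.auto.commit", "false"}});

        kafka::clients::KafkaConsumer consumer(props);      // was: kafka::KafkaConsumer
        consumer.subscribe({"test-topic"});

        auto lastTimeCommitted = std::chrono::steady_clock::now();
        while (true) {
            auto records = consumer.poll(std::chrono::milliseconds(100));
            for (const auto& record: records) {
                std::cout << "% Got message: " << record.toString() << std::endl;
            }

            auto now = std::chrono::steady_clock::now();
            if (now - lastTimeCommitted > std::chrono::seconds(1)) {
                // was: kafka::Utility::getCurrentTime()
                std::cout << "% syncCommit offsets: " << kafka::utility::getCurrentTime() << std::endl;
                consumer.commitSync();                       // or commitAsync()
                lastTimeCommitted = now;
            }
        }
    }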

examples/kafka_sync_producer.cc

Lines changed: 7 additions & 5 deletions
@@ -5,6 +5,8 @@
 
 int main(int argc, char **argv)
 {
+    using namespace kafka::clients;
+
     if (argc != 3) {
         std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
         exit(1);
@@ -22,20 +24,20 @@ int main(int argc, char **argv)
     });
 
     // Create a producer instance.
-    kafka::KafkaProducer producer(props);
+    KafkaProducer producer(props);
 
     // Read messages from stdin and produce to the broker.
     std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;
 
     for (std::string line; std::getline(std::cin, line);) {
         // The ProducerRecord doesn't own `line`, it is just a thin wrapper
-        auto record = kafka::ProducerRecord(topic,
-                                            kafka::NullKey,
-                                            kafka::Value(line.c_str(), line.size()));
+        auto record = producer::ProducerRecord(topic,
+                                               kafka::NullKey,
+                                               kafka::Value(line.c_str(), line.size()));
 
         // Send the message.
         try {
-            kafka::Producer::RecordMetadata metadata = producer.syncSend(record);
+            producer::RecordMetadata metadata = producer.syncSend(record);
             std::cout << "% Message delivered: " << metadata.toString() << std::endl;
         } catch (const kafka::KafkaException& e) {
             std::cerr << "% Message delivery failed: " << e.error().message() << std::endl;

include/kafka/AdminClient.h

Lines changed: 23 additions & 85 deletions
@@ -3,6 +3,7 @@
 #include "kafka/Project.h"
 
 #include "kafka/AdminClientConfig.h"
+#include "kafka/AdminCommon.h"
 #include "kafka/Error.h"
 #include "kafka/KafkaClient.h"
 #include "kafka/RdKafkaHelper.h"
@@ -17,70 +18,7 @@
 #include <vector>
 
 
-namespace KAFKA_API {
-
-namespace Admin
-{
-/**
- * The result of AdminClient::createTopics().
- */
-struct CreateTopicsResult
-{
-    explicit CreateTopicsResult(Error err): error(std::move(err)) {}
-
-    /**
-     * The result error.
-     */
-    Error error;
-};
-
-/**
- * The result of AdminClient::deleteTopics().
- */
-struct DeleteTopicsResult
-{
-    explicit DeleteTopicsResult(Error err): error(std::move(err)) {}
-
-    /**
-     * The result error.
-     */
-    Error error;
-};
-
-/**
- * The result of AdminClient::deleteRecords().
- */
-struct DeleteRecordsResult
-{
-    explicit DeleteRecordsResult(Error err): error(std::move(err)) {}
-
-    /**
-     * The result error.
-     */
-    Error error;
-};
-
-/**
- * The result of AdminClient::listTopics().
- */
-struct ListTopicsResult
-{
-    explicit ListTopicsResult(Error err): error(std::move(err)) {}
-    explicit ListTopicsResult(Topics names): topics(std::move(names)) {}
-
-    /**
-     * The result error.
-     */
-    Error error;
-
-    /**
-     * The topics fetched.
-     */
-    Topics topics;
-};
-
-} // end of Admin
-
+namespace KAFKA_API::clients {
 
 /**
  * The administrative client for Kafka, which supports managing and inspecting topics, etc.
@@ -97,28 +35,28 @@ class AdminClient: public KafkaClient
     /**
      * Create a batch of new topics.
     */
-    Admin::CreateTopicsResult createTopics(const Topics& topics,
+    admin::CreateTopicsResult createTopics(const Topics& topics,
                                            int numPartitions,
                                            int replicationFactor,
                                            const Properties& topicConfig = Properties(),
                                            std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
    /**
     * Delete a batch of topics.
    */
-    Admin::DeleteTopicsResult deleteTopics(const Topics& topics,
+    admin::DeleteTopicsResult deleteTopics(const Topics& topics,
                                            std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
    /**
     * List the topics available in the cluster.
    */
-    Admin::ListTopicsResult listTopics(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
+    admin::ListTopicsResult listTopics(std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
 
    /**
     * Delete records whose offset is smaller than the given offset of the corresponding partition.
     * @param topicPartitionOffsets a batch of offsets for partitions
     * @param timeout
     * @return
    */
-    Admin::DeleteRecordsResult deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
+    admin::DeleteRecordsResult deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
                                              std::chrono::milliseconds timeout = std::chrono::milliseconds(DEFAULT_COMMAND_TIMEOUT_MS));
 
 private:
@@ -186,7 +124,7 @@ AdminClient::combineErrors(const std::list<Error>& errors)
     return Error{RD_KAFKA_RESP_ERR_NO_ERROR, "Success"};
 }
 
-inline Admin::CreateTopicsResult
+inline admin::CreateTopicsResult
 AdminClient::createTopics(const Topics& topics,
                           int numPartitions,
                           int replicationFactor,
@@ -202,7 +140,7 @@ AdminClient::createTopics(const Topics& topics,
         rkNewTopics.emplace_back(rd_kafka_NewTopic_new(topic.c_str(), numPartitions, replicationFactor, errInfo.str(), errInfo.capacity()));
         if (!rkNewTopics.back())
         {
-            return Admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__INVALID_ARG, rd_kafka_err2str(RD_KAFKA_RESP_ERR__INVALID_ARG)});
+            return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__INVALID_ARG, rd_kafka_err2str(RD_KAFKA_RESP_ERR__INVALID_ARG)});
         }
 
         for (const auto& conf: topicConfig.map())
@@ -212,7 +150,7 @@ AdminClient::createTopics(const Topics& topics,
             {
                 std::string errMsg = "Invalid config[" + conf.first + "=" + conf.second + "]";
                 KAFKA_API_DO_LOG(Log::Level::Err, errMsg.c_str());
-                return Admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg});
+                return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg});
             }
         }
     }
@@ -243,7 +181,7 @@ AdminClient::createTopics(const Topics& topics,
 
     if (!rk_ev)
     {
-        return Admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
+        return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
     }
 
     std::list<Error> errors;
@@ -263,7 +201,7 @@ AdminClient::createTopics(const Topics& topics,
     // Return the error if any
     if (!errors.empty())
     {
-        return Admin::CreateTopicsResult{combineErrors(errors)};
+        return admin::CreateTopicsResult{combineErrors(errors)};
     }
 
     // Update metedata
@@ -272,14 +210,14 @@ AdminClient::createTopics(const Topics& topics,
         auto listResult = listTopics();
         if (!listResult.error)
         {
-            return Admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR_NO_ERROR, "Success"});
+            return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR_NO_ERROR, "Success"});
         }
     } while (std::chrono::steady_clock::now() < end);
 
-    return Admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "Updating metadata timed out"});
+    return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "Updating metadata timed out"});
 }
 
-inline Admin::DeleteTopicsResult
+inline admin::DeleteTopicsResult
 AdminClient::deleteTopics(const Topics& topics, std::chrono::milliseconds timeout)
 {
     std::vector<rd_kafka_DeleteTopic_unique_ptr> rkDeleteTopics;
@@ -316,7 +254,7 @@ AdminClient::deleteTopics(const Topics& topics, std::chrono::milliseconds timeou
 
     if (!rk_ev)
     {
-        return Admin::DeleteTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
+        return admin::DeleteTopicsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
     }
 
     std::list<Error> errors;
@@ -333,10 +271,10 @@ AdminClient::deleteTopics(const Topics& topics, std::chrono::milliseconds timeou
 
     errors.splice(errors.end(), getPerTopicResults(res_topics, res_topic_cnt));
 
-    return Admin::DeleteTopicsResult(combineErrors(errors));
+    return admin::DeleteTopicsResult(combineErrors(errors));
 }
 
-inline Admin::ListTopicsResult
+inline admin::ListTopicsResult
 AdminClient::listTopics(std::chrono::milliseconds timeout)
 {
     const rd_kafka_metadata_t* rk_metadata = nullptr;
@@ -345,18 +283,18 @@ AdminClient::listTopics(std::chrono::milliseconds timeout)
 
     if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
     {
-        return Admin::ListTopicsResult(Error{err, rd_kafka_err2str(err)});
+        return admin::ListTopicsResult(Error{err, rd_kafka_err2str(err)});
     }
 
     Topics names;
     for (int i = 0; i < rk_metadata->topic_cnt; ++i)
     {
        names.insert(rk_metadata->topics[i].topic);
     }
-    return Admin::ListTopicsResult(names);
+    return admin::ListTopicsResult(names);
 }
 
-inline Admin::DeleteRecordsResult
+inline admin::DeleteRecordsResult
 AdminClient::deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
                            std::chrono::milliseconds timeout)
 {
@@ -385,7 +323,7 @@ AdminClient::deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
 
     if (!rk_ev)
     {
-        return Admin::DeleteRecordsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
+        return admin::DeleteRecordsResult(Error{RD_KAFKA_RESP_ERR__TIMED_OUT, "No response within the time limit"});
    }
 
     std::list<Error> errors;
@@ -400,8 +338,8 @@ AdminClient::deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
 
     errors.splice(errors.end(), getPerTopicPartitionResults(res_offsets));
 
-    return Admin::DeleteRecordsResult(combineErrors(errors));
+    return admin::DeleteRecordsResult(combineErrors(errors));
 }
 
-} // end of KAFKA_API
+} // end of KAFKA_API::clients
 
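
The result structs removed above are not deleted outright: the new #include "kafka/AdminCommon.h" together with the admin:: qualifications at the call sites implies they now live in that header, inside an admin namespace nested under kafka::clients. AdminCommon.h itself is not shown in this commit view, so the following is only a hypothetical sketch of what it would declare, reusing the definitions deleted from this file.

    // Hypothetical reconstruction of kafka/AdminCommon.h -- not shown in this commit view;
    // inferred from the removed structs above and the admin:: call sites.
    #include "kafka/Project.h"
    #include "kafka/Error.h"
    #include "kafka/Types.h"        // assumed home of the `Topics` alias

    namespace KAFKA_API::clients::admin {

    /**
     * The result of AdminClient::createTopics().
     */
    struct CreateTopicsResult
    {
        explicit CreateTopicsResult(Error err): error(std::move(err)) {}

        /**
         * The result error.
         */
        Error error;
    };

    // DeleteTopicsResult, DeleteRecordsResult and ListTopicsResult would follow the same
    // shape as the definitions removed from AdminClient.h above.

    } // end of KAFKA_API::clients::admin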
