
Commit 5ad3e90 ("Improve doc")
1 parent 02cb4b6

6 files changed: +73 −64 lines

README.md

Lines changed: 25 additions & 19 deletions

@@ -163,44 +163,50 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a header-only library t
 
 * [Kafka Client API](http://opensource.morganstanley.com/modern-cpp-kafka/doxygen/annotated.html)
 
+* `Properties` for Kafka clients
 
-* Kafka Client Properties
+    * `Properties` is a map which contains all configuration info needed to initialize a Kafka client. These configuration items are key-value pairs, -- the "key" is a `std::string`, while the "value" could be a `std::string`, a `std::function<...>`, or an `Interceptors`.
 
-    * In most cases, the `Properties` settings for ***modern-cpp-kafka*** are identical with [librdkafka configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
+    * K-V Types: `std::string` -> `std::string`
 
-    * With following exceptions
+        * Most are identical with [librdkafka configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
 
-        * KafkaConsumer
+        * But with Exceptions
 
-            * Properties with random string as default
+            * Default Value Changes
 
-                * `client.id`
+                * `log_level` (default: `5`): default was `6` from **librdkafka**
 
-                * `group.id`
+                * `client.id` (default: random string): no default string from **librdkafka**
 
-            * More properties than ***librdkafka***
+                * `group.id` (default: random string, for `KafkaConsumer` only): no default string from **librdkafka**
 
-                * `max.poll.records` (default: `500`): The maxmum number of records that a single call to `poll()` would return
+            * Additional Options
 
-            * Property which overrides the one from ***librdkafka***
+                * `enable.manual.events.poll` (default: `false`): To poll the (offset-commit/message-delivery callback) events manually
 
-                * `enable.auto.commit` (default: `false`): To automatically commit the previously polled offsets on each `poll` operation
+                * `max.poll.records` (default: `500`, for `KafkaConsumer` only): The maximum number of records that a single call to `poll()` would return
 
-            * Properties not supposed to be used (internally shadowed by ***modern-cpp-kafka***)
+            * Ignored Options
 
-                * `enable.auto.offset.store`
+                * `enable.auto.offset.store`: ***modern-cpp-kafka*** will save the offsets in its own way
 
-                * `auto.commit.interval.ms`
+                * `auto.commit.interval.ms`: ***modern-cpp-kafka*** will not commit the offsets periodically, instead, it would do it in the next `poll()`.
 
-        * KafkaProducer
 
-            * Properties with random string as default
+    * K-V Types: `std::string` -> `std::function<...>`
 
-                * `client.id`
+        * `log_cb` -> `LogCallback` (`std::function<void(int, const char*, int, const char* msg)>`)
 
-            * Log level
+        * `error_cb` -> `ErrorCallback` (`std::function<void(const Error&)>`)
 
-            * The default `log_level` is `NOTICE` (`5`) for all these clients
+        * `stats_cb` -> `StatsCallback` (`std::function<void(const std::string&)>`)
+
+        * `oauthbearer_token_refresh_cb` -> `OauthbearerTokenRefreshCallback` (`std::function<SaslOauthbearerToken(const std::string&)>`)
+
+    * K-V Types: `std::string` -> `Interceptors`
+
+        * `interceptors`: takes `Interceptors` as the value type
 
 * Test Environment (ZooKeeper/Kafka cluster) Setup

doc/HowToMakeKafkaProducerReliable.md

Lines changed: 7 additions & 4 deletions

@@ -153,13 +153,16 @@ The `msgid` is used, (along with a base `msgid` value stored at the time the `PI
 ### `KafkaProducer` demo
 
 ```cpp
+using namespace kafka::clients;
+using namespace kafka::clients::producer;
+
 std::atomic<bool> running = true;
 
 KafkaProducer producer(
     Properties({
-        { ProducerConfig::BOOTSTRAP_SERVERS, "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092" },
-        { ProducerConfig::ENABLE_IDEMPOTENCE, "true" },
-        { ProducerConfig::MESSAGE_TIMEOUT_MS, "86400000"} // as long as 1 day
+        { Config::BOOTSTRAP_SERVERS, {"192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092"} },
+        { ProducerConfig::ENABLE_IDEMPOTENCE, {"true"} },
+        { ProducerConfig::MESSAGE_TIMEOUT_MS, {"86400000"} } // as long as 1 day
     })
 );

@@ -168,7 +171,7 @@ The `msgid` is used, (along with a base `msgid` value stored at the time the `PI
     auto record = ProducerRecord(topic, msg.key, msg.value, msg.id);
     producer.send(record,
                   // Ack callback
-                  [&msg](const Producer::RecordMetadata& metadata, std::error_code ec) {
+                  [&msg](const RecordMetadata& metadata, std::error_code ec) {
                       // the message could be identified by `metadata.recordId()`
                       auto recordId = metadata.recordId();
                       if (ec) {

doc/KafkaConsumerQuickStart.md

Lines changed: 18 additions & 20 deletions

@@ -4,8 +4,6 @@ Generally speaking, The `Modern C++ Kafka API` is quite similar with [Kafka Java
 
 We'd recommend users to cross-reference them, --especially the examples.
 
-Unlike Java's KafkaConsumer, here we introduced two derived classes, --KafkaAutoCommitConsumer and KafkaManualCommitConsumer, --depending on whether users should call `commit` manually.
-
 ## KafkaConsumer (`enable.auto.commit=true`)
 
 * Automatically commits previously polled offsets on each `poll` (and the final `close`) operations.

@@ -15,13 +13,12 @@ Unlike Java's KafkaConsumer, here we introduced two derived classes, --KafkaAuto
 ### Example
 ```cpp
 // Create configuration object
-kafka::Properties props ({
-    {"bootstrap.servers", brokers},
-    {"enable.auto.commit", "true"}
+const Properties props ({
+    {"bootstrap.servers", {brokers}},
 });
 
 // Create a consumer instance
-kafka::clients::KafkaConsumer consumer(props);
+KafkaConsumer consumer(props);
 
 // Subscribe to topics
 consumer.subscribe({topic});

@@ -40,15 +37,15 @@ Unlike Java's KafkaConsumer, here we introduced two derived classes, --KafkaAuto
             std::cout << " Partition: " << record.partition() << std::endl;
             std::cout << " Offset : " << record.offset() << std::endl;
             std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
-            std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl;
+            std::cout << " Headers : " << toString(record.headers()) << std::endl;
             std::cout << " Key [" << record.key().toString() << "]" << std::endl;
             std::cout << " Value [" << record.value().toString() << "]" << std::endl;
         } else {
             std::cerr << record.toString() << std::endl;
         }
     }
 }
-
+
 // consumer.close(); // No explicit close is needed, RAII will take care of it
 ```

@@ -67,12 +64,13 @@ Unlike Java's KafkaConsumer, here we introduced two derived classes, --KafkaAuto
 ### Example
 ```cpp
 // Create configuration object
-kafka::Properties props ({
-    {"bootstrap.servers", brokers},
+const Properties props ({
+    {"bootstrap.servers", {brokers}},
+    {"enable.auto.commit", {"false" }}
 });
 
 // Create a consumer instance
-kafka::clients::KafkaConsumer consumer(props);
+KafkaConsumer consumer(props);
 
 // Subscribe to topics
 consumer.subscribe({topic});

@@ -98,7 +96,7 @@ Unlike Java's KafkaConsumer, here we introduced two derived classes, --KafkaAuto
             std::cout << " Partition: " << record.partition() << std::endl;
             std::cout << " Offset : " << record.offset() << std::endl;
             std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
-            std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl;
+            std::cout << " Headers : " << toString(record.headers()) << std::endl;
             std::cout << " Key [" << record.key().toString() << "]" << std::endl;
             std::cout << " Value [" << record.value().toString() << "]" << std::endl;

@@ -112,7 +110,7 @@ Unlike Java's KafkaConsumer, here we introduced two derived classes, --KafkaAuto
         auto now = std::chrono::steady_clock::now();
         if (now - lastTimeCommitted > std::chrono::seconds(1)) {
             // Commit offsets for messages polled
-            std::cout << "% syncCommit offsets: " << kafka::utility::getCurrentTime() << std::endl;
+            std::cout << "% syncCommit offsets: " << utility::getCurrentTime() << std::endl;
             consumer.commitSync(); // or commitAsync()
 
             lastTimeCommitted = now;

@@ -124,21 +122,21 @@ Unlike Java's KafkaConsumer, here we introduced two derived classes, --KafkaAuto
 // consumer.close(); // No explicit close is needed, RAII will take care of it
 ```
 
-* The example is quite similar with the KafkaAutoCommitConsumer, with only 1 more line added for manual-commit.
+* The example is quite similar with the previous `enable.auto.commit=true` case, but has to call `commitSync` (or `commitAsync`) manually.
 
-* `commitSync` and `commitAsync` are both available for a KafkaManualConsumer. Normally, use `commitSync` to guarantee the commitment, or use `commitAsync` (with `OffsetCommitCallback`) to get a better performance.
+* `commitSync` and `commitAsync` are both available here. Normally, use `commitSync` to guarantee the commitment, or use `commitAsync` (with `OffsetCommitCallback`) to get a better performance.
 
-## `KafkaConsumer` with `kafka::clients::KafkaClient::EventsPollingOption`
+## Option `enable.manual.events.poll`
 
-While we construct a `KafkaConsumer` with `kafka::clients::KafkaClient::EventsPollingOption::Auto` (i.e. the default option), an internal thread would be created for `OffsetCommit` callbacks handling.
+While we construct a `KafkaConsumer` with `enable.manual.events.poll=false` (i.e. the default option), an internal thread would be created for `OffsetCommit` callbacks handling.
 
 This might not be what you want, since then you have to use 2 different threads to process the messages and handle the `OffsetCommit` responses.
 
-Here we have another choice, -- using `kafka::clients::KafkaClient::EventsPollingOption::Manual`, thus the `OffsetCommit` callbacks would be called within member function `pollEvents()`.
+Here we have another choice, -- using `enable.manual.events.poll=true`, thus the `OffsetCommit` callbacks would be called within member function `pollEvents()`.
 
 ### Example
 ```cpp
-KafkaConsumer consumer(props, kafka::clients::KafkaClient::EventsPollingOption::Manual);
+KafkaConsumer consumer(props.put("enable.manual.events.poll", "true"));
 
 consumer.subscribe({"topic1", "topic2"});

@@ -153,7 +151,7 @@ Here we have another choice, -- using `kafka::clients::KafkaClient::EventsPollin
     }
 
     // Here we call the `OffsetCommit` callbacks
-    // Note, we can only do this while the consumer was constructed with `EventsPollingOption::Manual`.
+    // Note, we can only do this while the consumer was constructed with `enable.manual.events.poll=true`.
     consumer.pollEvents();
 }
 ```

doc/KafkaProducerQuickStart.md

Lines changed: 20 additions & 18 deletions

@@ -10,12 +10,14 @@ We'd recommend users to cross-reference them, --especially the examples.
 ### Example
 ```cpp
+using namespace kafka;
 using namespace kafka::clients;
+using namespace kafka::clients::producer;
 
 // Create configuration object
-kafka::Properties props ({
-    {"bootstrap.servers", brokers},
-    {"enable.idempotence", "true"},
+const Properties props ({
+    {"bootstrap.servers", {brokers}},
+    {"enable.idempotence", {"true" }},
 });
 
 // Create a producer instance

@@ -28,17 +30,17 @@ We'd recommend users to cross-reference them, --especially the examples.
      std::getline(std::cin, *line);
      line = std::make_shared<std::string>()) {
     // The ProducerRecord doesn't own `line`, it is just a thin wrapper
-    auto record = producer::ProducerRecord(topic,
-                                           kafka::NullKey,
-                                           kafka::Value(line->c_str(), line->size()));
+    auto record = ProducerRecord(topic,
+                                 NullKey,
+                                 Value(line->c_str(), line->size()));
 
     // Send the message
     producer.send(record,
                   // The delivery report handler
                   // Note: Here we capture the shared_pointer of `line`,
                   //       which holds the content for `record.value()`.
                   //       It makes sure the memory block is valid until the lambda finishes.
-                  [line](const producer::RecordMetadata& metadata, const kafka::Error& error) {
+                  [line](const RecordMetadata& metadata, const Error& error) {
                       if (!error) {
                           std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                       } else {

@@ -60,32 +62,32 @@ We'd recommend users to cross-reference them, --especially the examples.
 
 ## `KafkaProducer` with `kafka::clients::KafkaClient::EventsPollingOption`
 
-While we construct a `KafkaProducer` with `kafka::clients::KafkaClient::EventsPollingOption::Auto` (the default option), an internal thread would be created for `MessageDelivery` callbacks handling.
+While we construct a `KafkaProducer` with `enable.manual.events.poll=false` (the default option), an internal thread would be created for `MessageDelivery` callbacks handling.
 
 This might not be what you want, since then you have to use 2 different threads to send the messages and handle the `MessageDelivery` responses.
 
-Here we have another choice, -- using `kafka::clients::KafkaClient::EventsPollingOption::Manual`, thus the `MessageDelivery` callbacks would be called within member function `pollEvents()`.
+Here we have another choice, -- using `enable.manual.events.poll=true`, thus the `MessageDelivery` callbacks would be called within member function `pollEvents()`.
 
-* Note, if you constructed the `KafkaProducer` with `EventsPollingOption::Manual`, the `send()` would be an `unblocked` operation.
-  I.e, once the `message buffering queue` becomes full, the `send()` operation would throw an exception (or return an `error code` with the input reference parameter), -- instead of blocking there.
-  This makes sense, since you might want to call `pollEvents()` later, thus delivery-callback could be called for some messages (which could then be removed from the `message buffering queue`).
+* Note, if you constructed the `KafkaProducer` with `enable.manual.events.poll=true`, the `send()` will be an `unblocked` operation even if the `message buffering queue` is full. In that case, the `send()` operation would throw an exception (or return an `error code` with the input reference parameter), -- instead of blocking there. And you might want to call `pollEvents()`, thus delivery-callback could be called for some messages (which could then be removed from the `message buffering queue`).
 
 ### Example
 ```cpp
+using namespace kafka;
 using namespace kafka::clients;
+using namespace kafka::clients::producer;
 
-KafkaProducer producer(props, KafkaClient::EventsPollingOption::Manual);
+KafkaProducer producer(props.put("enable.manual.events.poll", "true"));
 
 // Prepare "msgsToBeSent"
 auto std::map<int, std::pair<Key, Value>> msgsToBeSent = ...;
 
 for (const auto& msg : msgsToBeSent) {
-    auto record = producer::ProducerRecord(topic, partition, msg.second.first, msg.second.second, msg.first);
+    auto record = ProducerRecord(topic, partition, msg.second.first, msg.second.second, msg.first);
     kafka::Error sendError;
     producer.send(sendError,
                   record,
                   // Ack callback
-                  [&msg](const producer::RecordMetadata& metadata, const kafka::Error& deliveryError) {
+                  [&msg](const RecordMetadata& metadata, const Error& deliveryError) {
                       // the message could be identified by `metadata.recordId()`
                       if (deliveryError) {
                           std::cerr << "% Message delivery failed: " << deliveryError.message() << std::endl;

@@ -97,7 +99,7 @@ This makes sense, since you might want to call `pollEvents()` later, thus delive
 }
 
 // Here we call the `MessageDelivery` callbacks
-// Note, we can only do this while the producer was constructed with `EventsPollingOption::MANUAL`.
+// Note, we can only do this while the producer was constructed with `enable.manual.events.poll=true`.
 producer.pollEvents();
 ```

@@ -111,7 +113,7 @@ This makes sense, since you might want to call `pollEvents()` later, thus delive
 ```cpp
 using namespace kafka::clients;
 
-kafak::KafkaProducer producer(props);
+kafak::producer::KafkaProducer producer(props);
 
 auto record = producer::ProducerRecord(topic, partition, Key(), Value());

@@ -129,7 +131,7 @@ This makes sense, since you might want to call `pollEvents()` later, thus delive
 
 producer.send(record,
               // Ack callback
-              [&msg](const kafka::Producer::RecordMetadata& metadata, , const kafka::Error& error) {
+              [&msg](const kafka::producer::RecordMetadata& metadata, , const kafka::Error& error) {
                   if (error) {
                       std::cerr << "% Message delivery failed: " << error.message() << std::endl;
                   }

examples/kafka_auto_commit_consumer.cc

Lines changed: 1 addition & 2 deletions

@@ -21,8 +21,7 @@ int main(int argc, char **argv)
 
     // Create configuration object
     const Properties props ({
-        {"bootstrap.servers", {brokers}},
-        {"enable.auto.commit", {"true" }}
+        {"bootstrap.servers", {brokers}}
     });
 
     // Create a consumer instance

examples/kafka_manual_commit_consumer.cc

Lines changed: 2 additions & 1 deletion

@@ -21,7 +21,8 @@ int main(int argc, char **argv)
 
     // Create configuration object
     const Properties props ({
-        {"bootstrap.servers", {brokers}},
+        {"bootstrap.servers", {brokers}},
+        {"enable.auto.commit", {"false"}}
     });
 
     // Create a consumer instance
