Commit 13b3f91 ("Update doc", 1 parent 12bae97)

6 files changed: 93 additions, 78 deletions

doc/KafkaConsumerQuickStart.md (34 additions, 31 deletions)
````diff
@@ -6,27 +6,27 @@
 
 Unlike Java's KafkaConsumer, here we introduced two derived classes, --KafkaAutoCommitConsumer and KafkaManualCommitConsumer, --depending on whether users should call `commit` manually.
 
-## KafkaAutoCommitConsumer
+## KafkaConsumer (`enable.auto.commit=true`)
 
-* Friendly for users, --would not care about when to commit the offsets for these received messages.
+* Automatically commits previously polled offsets on each `poll` (and the final `close`) operation.
 
-* Internally, it would commit the offsets (for received records) within the next `poll` and the final `close`.
-Note, each internal `commit` would "try its best", but "not guaranteed to succeed", -- it's supposed to be called periodically, thus occasional failure doesn't matter.
+* Note, the internal `offset commit` is asynchronous and not guaranteed to succeed. It's supposed to be triggered periodically (within each `poll` operation), thus an occasional failure doesn't matter.
 
 ### Example
 ```cpp
 // Create configuration object
 kafka::Properties props ({
-    {"bootstrap.servers", brokers},
+    {"bootstrap.servers",  brokers},
+    {"enable.auto.commit", "true"}
 });
 
-// Create a consumer instance.
-kafka::KafkaAutoCommitConsumer consumer(props);
+// Create a consumer instance
+kafka::clients::KafkaConsumer consumer(props);
 
 // Subscribe to topics
 consumer.subscribe({topic});
 
-// Read messages from the topic.
+// Read messages from the topic
 std::cout << "% Reading messages from topic: " << topic << std::endl;
 while (true) {
     auto records = consumer.poll(std::chrono::milliseconds(100));
````
````diff
@@ -48,17 +48,19 @@
         }
     }
 }
+
+// consumer.close(); // No explicit close is needed, RAII will take care of it
 ```
 
-* `ConsumerConfig::BOOTSTRAP_SERVERS` is mandatory for `ConsumerConfig`.
+* The `bootstrap.servers` property is mandatory for a Kafka client.
 
-* `subscribe` could take a topic list. And it's a blocking operation, -- would return after the rebalance event triggered callback was executed.
+* `subscribe` could take a topic list. It's a blocking operation, which would wait for the consumer to get partitions assigned.
 
-* `poll` must be periodically called, and it would trigger kinds of callback handling internally. As in this example, just put it in a "while loop" would be OK.
+* `poll` must be called periodically, to trigger the callback handling internally. In practice, it could be put in a "while loop".
 
-* At the end, the user could `close` the consumer manually, or just leave it to the destructor (which would `close` anyway).
+* At the end, we could `close` the consumer explicitly, or just leave it to the destructor.
 
-## KafkaManualCommitConsumer
+## KafkaConsumer (`enable.auto.commit=false`)
 
 * Users must commit the offsets for received records manually.
 
````
````diff
@@ -69,15 +71,15 @@
     {"bootstrap.servers", brokers},
 });
 
-// Create a consumer instance.
-kafka::KafkaManualCommitConsumer consumer(props);
+// Create a consumer instance
+kafka::clients::KafkaConsumer consumer(props);
 
 // Subscribe to topics
 consumer.subscribe({topic});
 
 auto lastTimeCommitted = std::chrono::steady_clock::now();
 
-// Read messages from the topic.
+// Read messages from the topic
 std::cout << "% Reading messages from topic: " << topic << std::endl;
 bool allCommitted = true;
 bool running      = true;
````
````diff
@@ -110,31 +112,33 @@
         auto now = std::chrono::steady_clock::now();
         if (now - lastTimeCommitted > std::chrono::seconds(1)) {
             // Commit offsets for messages polled
-            std::cout << "% syncCommit offsets: " << kafka::Utility::getCurrentTime() << std::endl;
+            std::cout << "% syncCommit offsets: " << kafka::utility::getCurrentTime() << std::endl;
             consumer.commitSync(); // or commitAsync()
 
             lastTimeCommitted = now;
             allCommitted      = true;
         }
     }
 }
+
+// consumer.close(); // No explicit close is needed, RAII will take care of it
 ```
 
 * The example is quite similar to the previous one, with only 1 more line added for the manual commit.
 
 * `commitSync` and `commitAsync` are both available for a manual-commit consumer. Normally, use `commitSync` to guarantee the commitment, or use `commitAsync` (with an `OffsetCommitCallback`) for better performance.
 
-## KafkaManualCommitConsumer with `KafkaClient::EventsPollingOption::Manual`
+## `KafkaConsumer` with `kafka::clients::KafkaClient::EventsPollingOption`
 
-While we construct a `KafkaManualCommitConsumer` with option `KafkaClient::EventsPollingOption::AUTO` (default), an internal thread would be created for `OffsetCommit` callbacks handling.
+While we construct a `KafkaConsumer` with `kafka::clients::KafkaClient::EventsPollingOption::Auto` (the default option), an internal thread would be created for `OffsetCommit` callback handling.
 
 This might not be what you want, since then you have to use 2 different threads to process the messages and handle the `OffsetCommit` responses.
 
-Here we have another choice, -- using `KafkaClient::EventsPollingOption::Manual`, thus the `OffsetCommit` callbacks would be called within member function `pollEvents()`.
+Here we have another choice, -- using `kafka::clients::KafkaClient::EventsPollingOption::Manual`, thus the `OffsetCommit` callbacks would be called within the member function `pollEvents()`.
 
 ### Example
 ```cpp
-KafkaManualCommitConsumer consumer(props, KafkaClient::EventsPollingOption::Manual);
+KafkaConsumer consumer(props, kafka::clients::KafkaClient::EventsPollingOption::Manual);
 
 consumer.subscribe({"topic1", "topic2"});
 
````
````diff
@@ -156,17 +160,17 @@
 
 ## Error handling
 
-No exception would be thrown by `KafkaProducer::poll()`.
+No exception would be thrown from a consumer's `poll` operation.
 
-Once an error occurs, the `ErrorCode` would be embedded in the `Consumer::ConsumerRecord`.
+Instead, once an error occurs, the `Error` would be embedded in the `Consumer::ConsumerRecord`.
 
-There're 2 cases,
+About the `Error`'s `value()`, there are 2 cases,
 
 1. Success
 
-    - RD_KAFKA_RESP_ERR__NO_ERROR (0), -- got a message successfully
+    - `RD_KAFKA_RESP_ERR__NO_ERROR` (`0`), -- got a message successfully
 
-    - RD_KAFKA_RESP_ERR__PARTITION_EOF, -- reached the end of a partition (no message got)
+    - `RD_KAFKA_RESP_ERR__PARTITION_EOF`, -- reached the end of a partition (no message received)
 
 2. Failure
 
````
````diff
@@ -187,21 +191,20 @@
 
 * How many threads would be created by a KafkaConsumer?
 
-    Excluding the user's main thread, `KafkaAutoCommitConsumer` would start another (N + 2) threads in the background, while `KafkaManualCommitConsumer` would start (N + 3) background threads. (N means the number of BOOTSTRAP_SERVERS)
+    Excluding the user's main thread, if `enable.auto.commit` is `false`, the `KafkaConsumer` would start another (N + 2) threads in the background; otherwise, it would start (N + 3) background threads. (N is the number of BOOTSTRAP_SERVERS)
 
     1. Each broker (in the list of BOOTSTRAP_SERVERS) would take a separate thread to transmit messages towards a kafka cluster server.
 
     2. Another 3 threads will handle internal operations, consumer group operations, and kinds of timers, etc.
 
-    3. KafkaManualCommitConsumer has one more thread, which keeps polling the offset-commit callback event.
+    3. To enable auto commit, one more thread would be created, which keeps polling/processing the offset-commit callback event.
 
-    E.g, if a KafkaAutoCommitConsumer was created with property of `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890`, it would take 6 threads in total (including the main thread).
+    E.g, if a KafkaConsumer was created with the property `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890`, it would take 6 threads in total (including the main thread).
 
 * Which one of these threads will handle the callbacks?
 
     There are 2 kinds of callbacks for a KafkaConsumer,
 
     1. `RebalanceCallback` will be triggered internally by the user's thread, -- within the `poll` function.
 
-    2. `OffsetCommitCallback` (only available for `KafkaManualCommitConsumer`) will be triggered by a background thread, not by the user's thread.
-
+    2. If `enable.auto.commit=true`, the `OffsetCommitCallback` will be triggered by the user's `poll` thread; otherwise, it would be triggered by a background thread.
````

doc/KafkaProducerQuickStart.md (49 additions, 37 deletions)
````diff
@@ -6,42 +6,50 @@
 
 ## KafkaProducer
 
-* The `send` is an unblocking operation, and the result (including errors) could only be got from the delivery callback.
+* The `send` is a non-blocking operation, and the result (including errors) could only be got from the delivery callback.
 
 ### Example
 ```cpp
+using namespace kafka::clients;
+
 // Create configuration object
 kafka::Properties props ({
    {"bootstrap.servers",  brokers},
    {"enable.idempotence", "true"},
 });
 
-// Create a producer instance.
-kafka::KafkaProducer producer(props);
+// Create a producer instance
+KafkaProducer producer(props);
 
-// Read messages from stdin and produce to the broker.
+// Read messages from stdin and produce to the broker
 std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;
 
-for (std::string line; std::getline(std::cin, line);) {
+for (auto line = std::make_shared<std::string>();
+     std::getline(std::cin, *line);
+     line = std::make_shared<std::string>()) {
     // The ProducerRecord doesn't own `line`, it is just a thin wrapper
-    auto record = kafka::ProducerRecord(topic,
-                                        kafka::NullKey,
-                                        kafka::Value(line.c_str(), line.size()));
-    // Send the message.
+    auto record = producer::ProducerRecord(topic,
+                                           kafka::NullKey,
+                                           kafka::Value(line->c_str(), line->size()));
+
+    // Send the message
     producer.send(record,
                   // The delivery report handler
-                  [](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
-                      if (!ec) {
+                  // Note: Here we capture the shared_ptr of `line`,
+                  //       which holds the content for `record.value()`.
+                  //       It makes sure the memory block is valid until the lambda finishes.
+                  [line](const producer::RecordMetadata& metadata, const kafka::Error& error) {
+                      if (!error) {
                           std::cout << "% Message delivered: " << metadata.toString() << std::endl;
                       } else {
-                          std::cerr << "% Message delivery failed: " << ec.message() << std::endl;
+                          std::cerr << "% Message delivery failed: " << error.message() << std::endl;
                       }
-                  },
-                  // The memory block given by record.value() would be copied
-                  kafka::KafkaProducer::SendOption::ToCopyRecordValue);
+                  });
 
-    if (line.empty()) break;
+    if (line->empty()) break;
 }
+
+// producer.close(); // No explicit close is needed, RAII will take care of it
 ```
 
 * User must guarantee the memory block for `ProducerRecord`'s `key` is valid until `send` is called.
````
````diff
@@ -50,40 +58,42 @@
 
 * It's guaranteed that the delivery callback would be triggered anyway after `send`, -- a producer would even be waiting for it before `close`. So, it's a good way to release these memory resources in the `Producer::Callback` function.
 
-## `KafkaProducer` with `KafkaClient::EventsPollingOption::Manual`
+## `KafkaProducer` with `kafka::clients::KafkaClient::EventsPollingOption`
 
-While we construct a `KafkaProducer` with option `KafkaClient::EventsPollingOption::Auto` (default), an internal thread would be created for `MessageDelivery` callbacks handling.
+While we construct a `KafkaProducer` with `kafka::clients::KafkaClient::EventsPollingOption::Auto` (the default option), an internal thread would be created for `MessageDelivery` callback handling.
 
 This might not be what you want, since then you have to use 2 different threads to send the messages and handle the `MessageDelivery` responses.
 
-Here we have another choice, -- using `KafkaClient::EventsPollingOption::Manual`, thus the `MessageDelivery` callbacks would be called within member function `pollEvents()`.
+Here we have another choice, -- using `kafka::clients::KafkaClient::EventsPollingOption::Manual`, thus the `MessageDelivery` callbacks would be called within the member function `pollEvents()`.
 
 * Note, if you constructed the `KafkaProducer` with `EventsPollingOption::Manual`, the `send()` would be a non-blocking operation.
   I.e, once the `message buffering queue` becomes full, the `send()` operation would throw an exception (or return an `error code` via the input reference parameter), -- instead of blocking there.
   This makes sense, since you might want to call `pollEvents()` later, thus the delivery callbacks could be called for some messages (which could then be removed from the `message buffering queue`).
 
 ### Example
 ```cpp
-kafak::KafkaProducer producer(props, KafkaClient::EventsPollingOption::Manual);
+using namespace kafka::clients;
+
+KafkaProducer producer(props, KafkaClient::EventsPollingOption::Manual);
 
 // Prepare "msgsToBeSent"
 std::map<int, std::pair<Key, Value>> msgsToBeSent = ...;
 
 for (const auto& msg : msgsToBeSent) {
-    auto record = kafak::ProducerRecord(topic, partition, msg.second.first, msg.second.second, msg.first);
-    std::error_code ec;
-    producer.send(ec,
+    auto record = producer::ProducerRecord(topic, partition, msg.second.first, msg.second.second, msg.first);
+    kafka::Error sendError;
+    producer.send(sendError,
                   record,
                   // Ack callback
-                  [&msg](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
+                  [&msg](const producer::RecordMetadata& metadata, const kafka::Error& deliveryError) {
                       // the message could be identified by `metadata.recordId()`
-                      if (ec) {
-                          LOG_ERROR("Cannot send out message with recordId={0}", metadata.recordId());
+                      if (deliveryError) {
+                          std::cerr << "% Message delivery failed: " << deliveryError.message() << std::endl;
                       } else {
                           msgsToBeSent.erase(metadata.recordId()); // Quite safe here
                       }
                   });
-    if (ec) break;
+    if (sendError) break;
 }
 
 // Here we call the `MessageDelivery` callbacks
````
````diff
@@ -99,9 +109,11 @@
 
 ### Example
 ```cpp
+using namespace kafka::clients;
+
 kafka::KafkaProducer producer(props);
 
-auto record = kafka::ProducerRecord(topic, partition, Key(), Value());
+auto record = producer::ProducerRecord(topic, partition, Key(), Value());
 
 for (const auto& msg : msgsToBeSent) {
     // Prepare record headers
@@ -117,9 +129,9 @@
 
     producer.send(record,
                   // Ack callback
-                  [&msg](const kafka::Producer::RecordMetadata& metadata, std::error_code ec) {
-                      if (ec) {
-                          LOG_ERROR("Cannot send out message: {0}, err: {1}", metadata.toString(), ec);
+                  [&msg](const producer::RecordMetadata& metadata, const kafka::Error& error) {
+                      if (error) {
+                          std::cerr << "% Message delivery failed: " << error.message() << std::endl;
                       }
                   });
 }
````
````diff
@@ -133,17 +145,17 @@
 
 2. Delivery `Error` would be passed through the delivery callback.
 
-    There are 2 kinds of possible errors,
+    About the `Error`'s `value()`, there are 2 cases,
 
     1. Local errors,
 
-        - RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC -- The topic doesn't exist
+        - `RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC` -- The topic doesn't exist
 
-        - RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION -- The partition doesn't exist
+        - `RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION` -- The partition doesn't exist
 
-        - RD_KAFKA_RESP_ERR__INVALID_ARG -- Invalid topic (topic is null or the length is too long (>512))
+        - `RD_KAFKA_RESP_ERR__INVALID_ARG` -- Invalid topic (topic is null or the length is too long (>512))
 
-        - RD_KAFKA_RESP_ERR__MSG_TIMED_OUT -- No ack received within the time limit
+        - `RD_KAFKA_RESP_ERR__MSG_TIMED_OUT` -- No ack received within the time limit
 
     2. Broker errors,
 
@@ -205,5 +217,5 @@
 
     It will be handled by a background thread, not by the user's thread.
 
-    Note, should be careful if both the `KafkaProducer::send()` and the `Producer::Callback` might access the same container at the same time.
+    Note, be careful if both the `KafkaProducer::send()` and the `producer::Callback` might access the same container at the same time.
````
examples/kafka_async_producer_copy_payload.cc (3 additions, 3 deletions)

````diff
@@ -24,18 +24,18 @@
         {"enable.idempotence", "true"},
     });
 
-    // Create a producer instance.
+    // Create a producer instance
     KafkaProducer producer(props);
 
-    // Read messages from stdin and produce to the broker.
+    // Read messages from stdin and produce to the broker
     std::cout << "% Type message value and hit enter to produce message. (empty line to quit)" << std::endl;
 
     for (std::string line; std::getline(std::cin, line);) {
         // The ProducerRecord doesn't own `line`, it is just a thin wrapper
         auto record = producer::ProducerRecord(topic,
                                                kafka::NullKey,
                                                kafka::Value(line.c_str(), line.size()));
-        // Send the message.
+        // Send the message
         producer.send(record,
                       // The delivery report handler
                       [](const producer::RecordMetadata& metadata, const kafka::Error& error) {
````
