Skip to content

Commit 6158d93

Browse files
authored
Example fixes (#96)
* Add example for kafka buffered producer * Add notes regarding bool returned in produce failure callback * Fix example names
1 parent 5c72f3f commit 6158d93

File tree

8 files changed

+107
-6
lines changed

8 files changed

+107
-6
lines changed

examples/CMakeLists.txt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR})
44

55
add_custom_target(examples)
66
macro(create_example example_name)
7-
add_executable(${example_name} EXCLUDE_FROM_ALL "${example_name}.cpp")
8-
add_dependencies(examples ${example_name})
7+
string(REPLACE "_" "-" sanitized_name ${example_name})
8+
add_executable(${sanitized_name} EXCLUDE_FROM_ALL "${example_name}_example.cpp")
9+
add_dependencies(examples ${sanitized_name})
910
endmacro()
1011

11-
create_example(kafka_producer)
12-
create_example(kafka_consumer)
13-
create_example(kafka_consumer_dispatcher)
12+
create_example(producer)
13+
create_example(buffered_producer)
14+
create_example(consumer)
15+
create_example(consumer_dispatcher)
1416
create_example(metadata)
1517
create_example(consumers_information)
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
#include <stdexcept>
2+
#include <iostream>
3+
#include <boost/program_options.hpp>
4+
#include "cppkafka/utils/buffered_producer.h"
5+
#include "cppkafka/configuration.h"
6+
7+
using std::string;
8+
using std::exception;
9+
using std::getline;
10+
using std::cin;
11+
using std::cout;
12+
using std::endl;
13+
14+
using cppkafka::BufferedProducer;
15+
using cppkafka::Configuration;
16+
using cppkafka::Topic;
17+
using cppkafka::MessageBuilder;
18+
using cppkafka::Message;
19+
20+
namespace po = boost::program_options;
21+
22+
int main(int argc, char* argv[]) {
23+
string brokers;
24+
string topic_name;
25+
int partition_value = -1;
26+
27+
po::options_description options("Options");
28+
options.add_options()
29+
("help,h", "produce this help message")
30+
("brokers,b", po::value<string>(&brokers)->required(),
31+
"the kafka broker list")
32+
("topic,t", po::value<string>(&topic_name)->required(),
33+
"the topic in which to write to")
34+
("partition,p", po::value<int>(&partition_value),
35+
"the partition to write into (unassigned if not provided)")
36+
;
37+
38+
po::variables_map vm;
39+
40+
try {
41+
po::store(po::command_line_parser(argc, argv).options(options).run(), vm);
42+
po::notify(vm);
43+
}
44+
catch (exception& ex) {
45+
cout << "Error parsing options: " << ex.what() << endl;
46+
cout << endl;
47+
cout << options << endl;
48+
return 1;
49+
}
50+
51+
// Create a message builder for this topic
52+
MessageBuilder builder(topic_name);
53+
54+
// Get the partition we want to write to. If no partition is provided, this will be
55+
// an unassigned one
56+
if (partition_value != -1) {
57+
builder.partition(partition_value);
58+
}
59+
60+
// Construct the configuration
61+
Configuration config = {
62+
{ "metadata.broker.list", brokers }
63+
};
64+
65+
// Create the producer
66+
BufferedProducer<string> producer(config);
67+
68+
// Set a produce success callback
69+
producer.set_produce_success_callback([](const Message& msg) {
70+
cout << "Successfully produced message with payload " << msg.get_payload() << endl;
71+
});
72+
// Set a produce failure callback
73+
producer.set_produce_failure_callback([](const Message& msg) {
74+
cout << "Failed to produce message with payload " << msg.get_payload() << endl;
75+
// Return false so we stop trying to produce this message
76+
return false;
77+
});
78+
79+
cout << "Producing messages into topic " << topic_name << endl;
80+
81+
// Now read lines and write them into kafka
82+
string line;
83+
while (getline(cin, line)) {
84+
// Set the payload on this builder
85+
builder.payload(line);
86+
87+
// Add the message we've built to the buffered producer
88+
producer.add_message(builder);
89+
90+
// Now flush so we:
91+
// * emit the buffered message
92+
// * poll the producer so we dispatch on delivery report callbacks and
93+
// therefore get the produce failure/success callbacks
94+
producer.flush();
95+
}
96+
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

include/cppkafka/utils/buffered_producer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ class CPPKAFKA_API BufferedProducer {
9999
using ProduceSuccessCallback = std::function<void(const Message&)>;
100100

101101
/**
102-
* Callback to indicate a message failed to be produced by the broker
102+
* Callback to indicate a message failed to be produced by the broker.
103+
*
104+
* The returned bool indicates whether the BufferedProducer should try to produce
105+
* the message again after each failure.
103106
*/
104107
using ProduceFailureCallback = std::function<bool(const Message&)>;
105108

0 commit comments

Comments
 (0)