|
13 | 13 | * See the License for the specific language governing permissions and |
14 | 14 | * limitations under the License. |
15 | 15 | */ |
16 | | -#include "common.h" |
| 16 | +#include "SimpleIni.h" |
17 | 17 | #include "create_topic.h" |
18 | 18 | #include "delete_topic.h" |
19 | 19 | #include "describe_topic.h" |
20 | 20 | #include "list_topics.h" |
21 | 21 | #include <argparse/argparse.hpp> |
22 | 22 | #include <array> |
| 23 | +#include <cstring> |
23 | 24 | #include <librdkafka/rdkafka.h> |
24 | 25 | #include <memory> |
25 | 26 | #include <stdexcept> |
@@ -66,14 +67,62 @@ int main(int argc, char *argv[]) { |
66 | 67 | throw std::runtime_error("Failed to " + action + ": " + errstr.data()); |
67 | 68 | }; |
68 | 69 |
|
69 | | - auto rk_conf_map = load_rdkafka_configs(program); |
| 70 | + const auto config_file = program.get("--config"); |
| 71 | + CSimpleIni ini; |
| 72 | + if (auto rc = ini.LoadFile(config_file.c_str()); rc < 0) { |
| 73 | + throw std::runtime_error("Error loading config file " + config_file); |
| 74 | + } |
| 75 | + auto get_value = [&ini](const auto &key, bool required) { |
| 76 | + auto value = ini.GetValue("kafka", key, ""); |
| 77 | + if (strlen(value) == 0 && required) { |
| 78 | + throw std::runtime_error("Error: " + std::string(key) + |
| 79 | + " not found in kafka section"); |
| 80 | + } |
| 81 | + return std::string(value); |
| 82 | + }; |
| 83 | + |
| 84 | + std::unordered_map<std::string, std::string> rk_conf_map{ |
| 85 | + {"bootstrap.servers", get_value("bootstrap.servers", true)}}; |
| 86 | + if (auto token = get_value("token", false); !token.empty()) { |
| 87 | + rk_conf_map["sasl.mechanism"] = "PLAIN"; |
| 88 | + rk_conf_map["security.protocol"] = "SASL_SSL"; |
| 89 | + rk_conf_map["sasl.username"] = "user"; |
| 90 | + rk_conf_map["sasl.password"] = "token:" + token; |
| 91 | + } |
| 92 | + if (auto client_id = program.present("--client-id")) { |
| 93 | + rk_conf_map["client.id"] = client_id.value(); |
| 94 | + } |
70 | 95 | for (auto &&[key, value] : rk_conf_map) { |
71 | 96 | if (rd_kafka_conf_set(rk_conf, key.c_str(), value.c_str(), errstr.data(), |
72 | 97 | errstr.size()) != RD_KAFKA_CONF_OK) { |
73 | 98 | fail("set " + key + " => " + value); |
74 | 99 | } |
75 | 100 | } |
76 | 101 |
|
| 102 | + std::unique_ptr<FILE, decltype(&fclose)> file{nullptr, &fclose}; |
| 103 | + if (auto log_enabled = ini.GetValue("log", "enabled", "false"); |
| 104 | + std::string(log_enabled) == "false") { |
| 105 | + // Disable logging in rdkafka |
| 106 | + rd_kafka_conf_set_log_cb( |
| 107 | + rk_conf, +[](const rd_kafka_t *rk, int level, const char *fac, |
| 108 | + const char *buf) {}); |
| 109 | + } else { |
| 110 | + if (auto log_file = ini.GetValue("log", "path", ""); |
| 111 | + strlen(log_file) == 0) { |
| 112 | + rd_kafka_conf_set_opaque(rk_conf, stdout); |
| 113 | + } else { |
| 114 | + file.reset(fopen(log_file, "a")); |
| 115 | + std::cout << "Opened log file " << log_file << std::endl; |
| 116 | + rd_kafka_conf_set_opaque(rk_conf, file.get()); |
| 117 | + rd_kafka_conf_set_log_cb( |
| 118 | + rk_conf, +[](const rd_kafka_t *rk, int level, const char *fac, |
| 119 | + const char *buf) { |
| 120 | + auto file = static_cast<FILE *>(rd_kafka_opaque(rk)); |
| 121 | + fprintf(file, "[%d] %s: %s\n", level, fac, buf); |
| 122 | + }); |
| 123 | + } |
| 124 | + } |
| 125 | + |
77 | 126 | auto rk = |
78 | 127 | rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errstr.data(), errstr.size()); |
79 | 128 | if (!rk) { |
|
0 commit comments