Skip to content

Commit 46fb933

Browse files
committed
Clickhouse - introduce config
1 parent 52d09ea commit 46fb933

File tree

2 files changed

+338
-0
lines changed

2 files changed

+338
-0
lines changed
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
/**
2+
* @file
3+
* @author Michal Sedlak <[email protected]>
4+
* @brief Configuration parsing and representation
5+
* @date 2025
6+
*
7+
* Copyright(c) 2025 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#include "config.h"
12+
13+
#include <libfds.h>
14+
15+
#include <limits>
16+
#include <optional>
17+
#include <stdexcept>
18+
#include <memory>
19+
20+
namespace args {
21+
22+
enum {
23+
CONNECTION,
24+
ENDPOINTS,
25+
ENDPOINT,
26+
HOST,
27+
PORT,
28+
USER,
29+
PASSWORD,
30+
DATABASE,
31+
TABLE,
32+
COLUMNS,
33+
COLUMN,
34+
NAME,
35+
SOURCE,
36+
NULLABLE,
37+
INSERTER_THREADS,
38+
BLOCKS,
39+
BLOCK_INSERT_THRESHOLD,
40+
BLOCK_INSERT_MAX_DELAY_SECS,
41+
SPLIT_BIFLOW,
42+
BIFLOW_EMPTY_AUTOIGNORE,
43+
NONBLOCKING,
44+
};
45+
46+
47+
static const struct fds_xml_args column[] = {
48+
FDS_OPTS_ELEM(NAME, "name", FDS_OPTS_T_STRING, 0),
49+
FDS_OPTS_ELEM(SOURCE, "source", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT),
50+
FDS_OPTS_ELEM(NULLABLE, "nullable", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
51+
FDS_OPTS_END,
52+
};
53+
54+
static const struct fds_xml_args columns[] = {
55+
FDS_OPTS_NESTED(COLUMN, "column", column, FDS_OPTS_P_MULTI),
56+
FDS_OPTS_END,
57+
};
58+
59+
static const struct fds_xml_args endpoint[] = {
60+
FDS_OPTS_ELEM (HOST, "host", FDS_OPTS_T_STRING, 0),
61+
FDS_OPTS_ELEM (PORT, "port", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
62+
FDS_OPTS_END,
63+
};
64+
65+
static const struct fds_xml_args endpoints[] = {
66+
FDS_OPTS_NESTED(ENDPOINT, "endpoint", endpoint, FDS_OPTS_P_MULTI),
67+
FDS_OPTS_END,
68+
};
69+
70+
static const struct fds_xml_args connection[] = {
71+
FDS_OPTS_NESTED(ENDPOINTS, "endpoints", endpoints, 0),
72+
FDS_OPTS_ELEM (USER, "user", FDS_OPTS_T_STRING, 0),
73+
FDS_OPTS_ELEM (PASSWORD, "password", FDS_OPTS_T_STRING, 0),
74+
FDS_OPTS_ELEM (DATABASE, "database", FDS_OPTS_T_STRING, 0),
75+
FDS_OPTS_ELEM (TABLE, "table", FDS_OPTS_T_STRING, 0),
76+
FDS_OPTS_END,
77+
};
78+
79+
static const struct fds_xml_args root[] = {
80+
FDS_OPTS_ROOT ("params"),
81+
FDS_OPTS_NESTED(CONNECTION, "connection", connection, 0),
82+
FDS_OPTS_ELEM (INSERTER_THREADS, "inserterThreads", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
83+
FDS_OPTS_ELEM (BLOCKS, "blocks", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
84+
FDS_OPTS_ELEM (BLOCK_INSERT_THRESHOLD, "blockInsertThreshold", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
85+
FDS_OPTS_ELEM (BLOCK_INSERT_MAX_DELAY_SECS, "blockInsertMaxDelaySecs", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
86+
FDS_OPTS_ELEM (SPLIT_BIFLOW, "splitBiflow", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
87+
FDS_OPTS_ELEM (BIFLOW_EMPTY_AUTOIGNORE, "biflowEmptyAutoignore", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
88+
FDS_OPTS_ELEM (NONBLOCKING, "nonblocking", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
89+
FDS_OPTS_NESTED(COLUMNS, "columns", columns, 0),
90+
FDS_OPTS_END,
91+
};
92+
93+
}
94+
95+
static std::optional<SpecialField> parse_special_field(const std::string &name)
96+
{
97+
if (name == "odid") {
98+
return {SpecialField::ODID};
99+
}
100+
return {};
101+
}
102+
103+
static Config::Column parse_column(fds_xml_ctx_t *column_ctx, const fds_iemgr_t *iemgr)
104+
{
105+
const fds_xml_cont *content;
106+
Config::Column column;
107+
std::string source;
108+
109+
while (fds_xml_next(column_ctx, &content) == FDS_OK) {
110+
if (content->id == args::NAME) {
111+
column.name = content->ptr_string;
112+
} else if (content->id == args::NULLABLE) {
113+
column.nullable = content->val_bool;
114+
} else if (content->id == args::SOURCE) {
115+
source = content->ptr_string;
116+
}
117+
}
118+
119+
if (source.empty()) {
120+
source = column.name;
121+
}
122+
123+
const fds_iemgr_elem *elem = fds_iemgr_elem_find_name(iemgr, source.c_str());
124+
const fds_iemgr_alias *alias = fds_iemgr_alias_find(iemgr, source.c_str());
125+
std::optional<SpecialField> special = parse_special_field(source);
126+
if (!special && !elem && !alias) {
127+
throw std::runtime_error("IPFIX element with name \"" + source + "\" not found");
128+
} else if (special) {
129+
column.source = *special;
130+
} else if (alias) {
131+
column.source = alias;
132+
} else if (elem) {
133+
column.source = elem;
134+
}
135+
136+
return column;
137+
}
138+
139+
static std::vector<Config::Column> parse_columns(fds_xml_ctx_t *columns_ctx, const fds_iemgr_t *iemgr)
140+
{
141+
const fds_xml_cont *content;
142+
std::vector<Config::Column> columns;
143+
144+
while (fds_xml_next(columns_ctx, &content) == FDS_OK) {
145+
columns.push_back(parse_column(content->ptr_ctx, iemgr));
146+
}
147+
148+
return columns;
149+
}
150+
151+
static Config::Endpoint parse_endpoint(fds_xml_ctx_t *endpoint_ctx)
152+
{
153+
Config::Endpoint endpoint;
154+
const fds_xml_cont *content;
155+
156+
while (fds_xml_next(endpoint_ctx, &content) == FDS_OK) {
157+
if (content->id == args::HOST) {
158+
endpoint.host = content->ptr_string;
159+
160+
} else if (content->id == args::PORT) {
161+
if (content->val_uint > std::numeric_limits<uint16_t>::max()) {
162+
throw std::runtime_error(std::to_string(content->val_uint) + " is not a valid port number");
163+
}
164+
endpoint.port = content->val_uint;
165+
166+
}
167+
}
168+
169+
return endpoint;
170+
}
171+
172+
static std::vector<Config::Endpoint> parse_endpoints(fds_xml_ctx_t *endpoints_ctx)
173+
{
174+
std::vector<Config::Endpoint> endpoints;
175+
const fds_xml_cont *content;
176+
177+
while (fds_xml_next(endpoints_ctx, &content) == FDS_OK) {
178+
if (content->id == args::ENDPOINT) {
179+
endpoints.push_back(parse_endpoint(content->ptr_ctx));
180+
}
181+
}
182+
183+
return endpoints;
184+
}
185+
186+
static Config::Connection parse_connection(fds_xml_ctx_t *connection_ctx)
187+
{
188+
Config::Connection connection;
189+
const fds_xml_cont *content;
190+
191+
while (fds_xml_next(connection_ctx, &content) == FDS_OK) {
192+
if (content->id == args::USER) {
193+
connection.user = content->ptr_string;
194+
195+
} else if (content->id == args::PASSWORD) {
196+
connection.password = content->ptr_string;
197+
198+
} else if (content->id == args::DATABASE) {
199+
connection.database = content->ptr_string;
200+
201+
} else if (content->id == args::TABLE) {
202+
connection.table = content->ptr_string;
203+
204+
} else if (content->id == args::ENDPOINTS) {
205+
connection.endpoints = parse_endpoints(content->ptr_ctx);
206+
}
207+
}
208+
209+
return connection;
210+
}
211+
212+
static void parse_root(fds_xml_ctx_t *root_ctx, const fds_iemgr_t *iemgr, Config &config)
213+
{
214+
const fds_xml_cont *content;
215+
216+
while (fds_xml_next(root_ctx, &content) == FDS_OK) {
217+
if (content->id == args::CONNECTION) {
218+
config.connection = parse_connection(content->ptr_ctx);
219+
220+
} else if (content->id == args::COLUMNS) {
221+
config.columns = parse_columns(content->ptr_ctx, iemgr);
222+
223+
} else if (content->id == args::BLOCKS) {
224+
config.blocks = content->val_uint;
225+
226+
} else if (content->id == args::INSERTER_THREADS) {
227+
config.inserter_threads = content->val_uint;
228+
229+
} else if (content->id == args::BLOCK_INSERT_THRESHOLD) {
230+
config.block_insert_threshold = content->val_uint;
231+
232+
} else if (content->id == args::BLOCK_INSERT_MAX_DELAY_SECS) {
233+
config.block_insert_max_delay_secs = content->val_uint;
234+
235+
} else if (content->id == args::SPLIT_BIFLOW) {
236+
config.split_biflow = content->val_bool;
237+
238+
} else if (content->id == args::BIFLOW_EMPTY_AUTOIGNORE) {
239+
config.biflow_empty_autoignore = content->val_bool;
240+
241+
} else if (content->id == args::NONBLOCKING) {
242+
config.nonblocking = content->val_bool;
243+
244+
}
245+
}
246+
}
247+
248+
Config parse_config(const char *xml_string, const fds_iemgr_t *iemgr)
249+
{
250+
Config config{};
251+
252+
std::unique_ptr<fds_xml_t, decltype(&fds_xml_destroy)> parser(fds_xml_create(), &fds_xml_destroy);
253+
if (!parser) {
254+
throw std::runtime_error("Failed to create an XML parser!");
255+
}
256+
257+
if (fds_xml_set_args(parser.get(), args::root) != FDS_OK) {
258+
std::string err = fds_xml_last_err(parser.get());
259+
throw std::runtime_error("Failed to parse the description of an XML document: " + err);
260+
}
261+
262+
fds_xml_ctx_t *root_ctx = fds_xml_parse_mem(parser.get(), xml_string, true);
263+
if (!root_ctx) {
264+
std::string err = fds_xml_last_err(parser.get());
265+
throw std::runtime_error("Failed to parse the configuration: " + err);
266+
}
267+
268+
parse_root(root_ctx, iemgr, config);
269+
270+
return config;
271+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* @file
3+
* @author Michal Sedlak <[email protected]>
4+
* @brief Configuration parsing and representation
5+
* @date 2025
6+
*
7+
* Copyright(c) 2025 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#pragma once
12+
13+
#include <libfds.h>
14+
15+
#include <cstdint>
16+
#include <string>
17+
#include <variant>
18+
#include <vector>
19+
20+
enum class SpecialField {
21+
NONE = 0,
22+
ODID,
23+
};
24+
25+
/**
26+
* @class Config
27+
* @brief A struct containing all the configurable plugin parameters
28+
*/
29+
struct Config {
30+
struct Column {
31+
std::string name;
32+
bool nullable = false;
33+
std::variant<const fds_iemgr_elem *, const fds_iemgr_alias *, SpecialField> source;
34+
};
35+
36+
struct Endpoint {
37+
std::string host;
38+
uint16_t port = 9000;
39+
};
40+
41+
struct Connection {
42+
std::vector<Endpoint> endpoints;
43+
std::string user;
44+
std::string password;
45+
std::string database;
46+
std::string table;
47+
};
48+
49+
Connection connection;
50+
std::vector<Config::Column> columns;
51+
uint64_t inserter_threads = 8;
52+
uint64_t blocks = 64;
53+
uint64_t block_insert_threshold = 100000;
54+
uint64_t block_insert_max_delay_secs = 10;
55+
bool split_biflow = true;
56+
bool biflow_empty_autoignore = true;
57+
bool nonblocking = true;
58+
};
59+
60+
/**
61+
* @brief Parse a XML config into a structured form
62+
*
63+
* @param xml The config as a XML string
64+
* @param iemgr The iemgr instance
65+
* @return The parsed config
66+
*/
67+
Config parse_config(const char *xml, const fds_iemgr_t *iemgr);

0 commit comments

Comments
 (0)