Skip to content

Commit 4c749f6

Browse files
committed
Clickhouse - add config
1 parent cd0e118 commit 4c749f6

File tree

2 files changed

+332
-0
lines changed

2 files changed

+332
-0
lines changed
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/**
2+
* @file
3+
* @author Michal Sedlak <[email protected]>
4+
* @brief Configuration parsing and representation
5+
* @date 2024
6+
*
7+
* Copyright(c) 2024 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#include "config.h"
12+
13+
#include <libfds.h>
14+
15+
#include <limits>
16+
#include <optional>
17+
#include <stdexcept>
18+
#include <memory>
19+
20+
namespace args {
21+
22+
enum {
23+
CONNECTION,
24+
ENDPOINTS,
25+
ENDPOINT,
26+
HOST,
27+
PORT,
28+
USER,
29+
PASSWORD,
30+
DATABASE,
31+
TABLE,
32+
COLUMNS,
33+
COLUMN,
34+
NAME,
35+
SOURCE,
36+
NULLABLE,
37+
INSERTER_THREADS,
38+
BLOCKS,
39+
BLOCK_INSERT_THRESHOLD,
40+
BLOCK_INSERT_MAX_DELAY_SECS,
41+
SPLIT_BIFLOW,
42+
BIFLOW_EMPTY_AUTOIGNORE,
43+
};
44+
45+
46+
static const struct fds_xml_args column[] = {
47+
FDS_OPTS_ELEM(NAME, "name", FDS_OPTS_T_STRING, 0),
48+
FDS_OPTS_ELEM(SOURCE, "source", FDS_OPTS_T_STRING, FDS_OPTS_P_OPT),
49+
FDS_OPTS_ELEM(NULLABLE, "nullable", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
50+
FDS_OPTS_END,
51+
};
52+
53+
static const struct fds_xml_args columns[] = {
54+
FDS_OPTS_NESTED(COLUMN, "column", column, FDS_OPTS_P_MULTI),
55+
FDS_OPTS_END,
56+
};
57+
58+
static const struct fds_xml_args endpoint[] = {
59+
FDS_OPTS_ELEM (HOST, "host", FDS_OPTS_T_STRING, 0),
60+
FDS_OPTS_ELEM (PORT, "port", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
61+
FDS_OPTS_END,
62+
};
63+
64+
static const struct fds_xml_args endpoints[] = {
65+
FDS_OPTS_NESTED(ENDPOINT, "endpoint", endpoint, FDS_OPTS_P_MULTI),
66+
FDS_OPTS_END,
67+
};
68+
69+
static const struct fds_xml_args connection[] = {
70+
FDS_OPTS_NESTED(ENDPOINTS, "endpoints", endpoints, 0),
71+
FDS_OPTS_ELEM (USER, "user", FDS_OPTS_T_STRING, 0),
72+
FDS_OPTS_ELEM (PASSWORD, "password", FDS_OPTS_T_STRING, 0),
73+
FDS_OPTS_ELEM (DATABASE, "database", FDS_OPTS_T_STRING, 0),
74+
FDS_OPTS_ELEM (TABLE, "table", FDS_OPTS_T_STRING, 0),
75+
FDS_OPTS_END,
76+
};
77+
78+
static const struct fds_xml_args root[] = {
79+
FDS_OPTS_ROOT ("params"),
80+
FDS_OPTS_NESTED(CONNECTION, "connection", connection, 0),
81+
FDS_OPTS_ELEM (INSERTER_THREADS, "inserterThreads", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
82+
FDS_OPTS_ELEM (BLOCKS, "blocks", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
83+
FDS_OPTS_ELEM (BLOCK_INSERT_THRESHOLD, "blockInsertThreshold", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
84+
FDS_OPTS_ELEM (BLOCK_INSERT_MAX_DELAY_SECS, "blockInsertMaxDelaySecs", FDS_OPTS_T_UINT, FDS_OPTS_P_OPT),
85+
FDS_OPTS_ELEM (SPLIT_BIFLOW, "splitBiflow", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
86+
FDS_OPTS_ELEM (BIFLOW_EMPTY_AUTOIGNORE, "biflowEmptyAutoignore", FDS_OPTS_T_BOOL, FDS_OPTS_P_OPT),
87+
FDS_OPTS_NESTED(COLUMNS, "columns", columns, 0),
88+
FDS_OPTS_END,
89+
};
90+
91+
}
92+
93+
static std::optional<SpecialField> parse_special_field(const std::string &name)
94+
{
95+
if (name == "odid") {
96+
return {SpecialField::ODID};
97+
}
98+
return {};
99+
}
100+
101+
/**
 * @brief Build a column description from the contents of one <column> element
 *
 * The source of the column is resolved by name. When no <source> is given
 * (or it is empty), the column name itself is used as the source name.
 * A special field takes precedence over an IPFIX element, which in turn
 * takes precedence over an alias.
 *
 * @param column_ctx  The parser context of the <column> element
 * @param iemgr       The information element manager used for the name lookups
 * @return The parsed column
 * @throws std::runtime_error if the source name matches neither a special
 *         field, an IPFIX element, nor an alias
 */
static Config::Column parse_column(fds_xml_ctx_t *column_ctx, const fds_iemgr_t *iemgr)
{
    Config::Column result;
    std::string source_name;

    const fds_xml_cont *item;
    while (fds_xml_next(column_ctx, &item) == FDS_OK) {
        switch (item->id) {
        case args::NAME:
            result.name = item->ptr_string;
            break;
        case args::NULLABLE:
            result.nullable = item->val_bool;
            break;
        case args::SOURCE:
            source_name = item->ptr_string;
            break;
        default:
            break;
        }
    }

    // Without an explicit source the column name doubles as the source name
    if (source_name.empty()) {
        source_name = result.name;
    }

    const fds_iemgr_elem *elem = fds_iemgr_elem_find_name(iemgr, source_name.c_str());
    const fds_iemgr_alias *alias = fds_iemgr_alias_find(iemgr, source_name.c_str());
    const std::optional<SpecialField> special = parse_special_field(source_name);

    if (special) {
        result.source = *special;
    } else if (elem) {
        result.source = elem;
    } else if (alias) {
        result.source = alias;
    } else {
        throw std::runtime_error("IPFIX element with name \"" + source_name + "\" not found");
    }

    return result;
}
136+
137+
/**
 * @brief Parse every <column> entry found in a <columns> element
 *
 * @param columns_ctx  The parser context of the <columns> element
 * @param iemgr        The information element manager passed on to parse_column()
 * @return The parsed columns in the order the parser reports them
 * @throws std::runtime_error propagated from parse_column()
 */
static std::vector<Config::Column> parse_columns(fds_xml_ctx_t *columns_ctx, const fds_iemgr_t *iemgr)
{
    const fds_xml_cont *content;
    std::vector<Config::Column> columns;

    while (fds_xml_next(columns_ctx, &content) == FDS_OK) {
        // Check the ID before touching ptr_ctx, the same way parse_endpoints()
        // does: the nested context is only meaningful for <column> entries.
        if (content->id == args::COLUMN) {
            columns.push_back(parse_column(content->ptr_ctx, iemgr));
        }
    }

    return columns;
}
148+
149+
/**
 * @brief Parse the contents of one <endpoint> element
 *
 * A field that does not appear in the XML keeps the default value of
 * Config::Endpoint.
 *
 * @param endpoint_ctx  The parser context of the <endpoint> element
 * @return The parsed endpoint
 * @throws std::runtime_error if the port does not fit into 16 bits
 */
static Config::Endpoint parse_endpoint(fds_xml_ctx_t *endpoint_ctx)
{
    constexpr auto max_port = std::numeric_limits<uint16_t>::max();

    Config::Endpoint result;
    const fds_xml_cont *item;

    while (fds_xml_next(endpoint_ctx, &item) == FDS_OK) {
        switch (item->id) {
        case args::HOST:
            result.host = item->ptr_string;
            break;
        case args::PORT: {
            const auto port = item->val_uint;
            if (port > max_port) {
                throw std::runtime_error(std::to_string(port) + " is not a valid port number");
            }
            // The range check above makes the narrowing conversion lossless
            result.port = static_cast<uint16_t>(port);
            break;
        }
        default:
            break;
        }
    }

    return result;
}
169+
170+
/**
 * @brief Parse every <endpoint> entry found in an <endpoints> element
 *
 * @param endpoints_ctx  The parser context of the <endpoints> element
 * @return The parsed endpoints in the order the parser reports them
 * @throws std::runtime_error propagated from parse_endpoint()
 */
static std::vector<Config::Endpoint> parse_endpoints(fds_xml_ctx_t *endpoints_ctx)
{
    std::vector<Config::Endpoint> result;
    const fds_xml_cont *item;

    while (fds_xml_next(endpoints_ctx, &item) == FDS_OK) {
        // Anything other than an <endpoint> entry is skipped
        if (item->id != args::ENDPOINT) {
            continue;
        }

        result.push_back(parse_endpoint(item->ptr_ctx));
    }

    return result;
}
183+
184+
/**
 * @brief Parse the contents of the <connection> element
 *
 * @param connection_ctx  The parser context of the <connection> element
 * @return The parsed connection parameters, including the list of endpoints
 * @throws std::runtime_error propagated from parse_endpoints()
 */
static Config::Connection parse_connection(fds_xml_ctx_t *connection_ctx)
{
    Config::Connection result;
    const fds_xml_cont *item;

    while (fds_xml_next(connection_ctx, &item) == FDS_OK) {
        switch (item->id) {
        case args::USER:
            result.user = item->ptr_string;
            break;
        case args::PASSWORD:
            result.password = item->ptr_string;
            break;
        case args::DATABASE:
            result.database = item->ptr_string;
            break;
        case args::TABLE:
            result.table = item->ptr_string;
            break;
        case args::ENDPOINTS:
            result.endpoints = parse_endpoints(item->ptr_ctx);
            break;
        default:
            break;
        }
    }

    return result;
}
209+
210+
/**
 * @brief Fill a Config from the contents of the root <params> element
 *
 * Only the fields whose elements the parser reports are overwritten; all
 * other fields of @p config are left as they were passed in.
 *
 * @param root_ctx  The parser context of the root element
 * @param iemgr     The information element manager passed on to parse_columns()
 * @param config    The configuration to update
 * @throws std::runtime_error propagated from parse_connection() and parse_columns()
 */
static void parse_root(fds_xml_ctx_t *root_ctx, const fds_iemgr_t *iemgr, Config &config)
{
    const fds_xml_cont *item;

    while (fds_xml_next(root_ctx, &item) == FDS_OK) {
        switch (item->id) {
        // Nested sections
        case args::CONNECTION:
            config.connection = parse_connection(item->ptr_ctx);
            break;
        case args::COLUMNS:
            config.columns = parse_columns(item->ptr_ctx, iemgr);
            break;

        // Unsigned integer options
        case args::BLOCKS:
            config.blocks = item->val_uint;
            break;
        case args::INSERTER_THREADS:
            config.inserter_threads = item->val_uint;
            break;
        case args::BLOCK_INSERT_THRESHOLD:
            config.block_insert_threshold = item->val_uint;
            break;
        case args::BLOCK_INSERT_MAX_DELAY_SECS:
            config.block_insert_max_delay_secs = item->val_uint;
            break;

        // Boolean options
        case args::SPLIT_BIFLOW:
            config.split_biflow = item->val_bool;
            break;
        case args::BIFLOW_EMPTY_AUTOIGNORE:
            config.biflow_empty_autoignore = item->val_bool;
            break;

        default:
            break;
        }
    }
}
242+
243+
/**
 * @brief Parse an XML configuration string into a Config
 *
 * Creates a libfds XML parser, registers the document description from
 * args::root, parses the string and copies the values into a Config that
 * starts out with its default member values.
 *
 * @param xml_string  The configuration as an XML string
 * @param iemgr       The information element manager used to resolve column sources
 * @return The parsed configuration
 * @throws std::runtime_error if the parser cannot be created, the document
 *         description is rejected, the XML cannot be parsed, or a nested
 *         parsing step fails
 */
Config parse_config(const char *xml_string, const fds_iemgr_t *iemgr)
{
    using ParserPtr = std::unique_ptr<fds_xml_t, decltype(&fds_xml_destroy)>;

    // The parser is released through fds_xml_destroy on every exit path
    ParserPtr xml(fds_xml_create(), &fds_xml_destroy);
    if (xml == nullptr) {
        throw std::runtime_error("Failed to create an XML parser!");
    }

    if (fds_xml_set_args(xml.get(), args::root) != FDS_OK) {
        const std::string reason = fds_xml_last_err(xml.get());
        throw std::runtime_error("Failed to parse the description of an XML document: " + reason);
    }

    // NOTE(review): the context is not released here; presumably it is owned
    // by the parser - confirm against the libfds documentation.
    fds_xml_ctx_t *const root_ctx = fds_xml_parse_mem(xml.get(), xml_string, true);
    if (root_ctx == nullptr) {
        const std::string reason = fds_xml_last_err(xml.get());
        throw std::runtime_error("Failed to parse the configuration: " + reason);
    }

    Config result{};
    parse_root(root_ctx, iemgr, result);

    return result;
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* @file
3+
* @author Michal Sedlak <[email protected]>
4+
* @brief Configuration parsing and representation
5+
* @date 2024
6+
*
7+
* Copyright(c) 2024 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#pragma once
12+
13+
#include <libfds.h>
14+
15+
#include <cstdint>
16+
#include <string>
17+
#include <variant>
18+
#include <vector>
19+
20+
/**
 * @brief Column sources that are neither an IPFIX element nor an alias
 */
enum class SpecialField {
    /** No special field; also the value-initialized state */
    NONE = 0,
    /** Selected by the source name "odid" (presumably the Observation Domain ID - confirm) */
    ODID = 1,
};
24+
25+
/**
26+
* @class Config
27+
* @brief A struct containing all the configurable plugin parameters
28+
*/
29+
struct Config {
30+
struct Column {
31+
std::string name;
32+
bool nullable = false;
33+
std::variant<const fds_iemgr_elem *, const fds_iemgr_alias *, SpecialField> source;
34+
};
35+
36+
struct Endpoint {
37+
std::string host;
38+
uint16_t port = 9000;
39+
};
40+
41+
struct Connection {
42+
std::vector<Endpoint> endpoints;
43+
std::string user;
44+
std::string password;
45+
std::string database;
46+
std::string table;
47+
};
48+
49+
Connection connection;
50+
std::vector<Config::Column> columns;
51+
uint64_t inserter_threads = 32;
52+
uint64_t blocks = 256;
53+
uint64_t block_insert_threshold = 100000;
54+
uint64_t block_insert_max_delay_secs = 10;
55+
bool split_biflow = true;
56+
bool biflow_empty_autoignore = true;
57+
};
58+
59+
/**
60+
* @brief Parse an XML config into a structured form
61+
*
62+
* @param xml The config as an XML string
63+
* @param iemgr The iemgr instance
64+
* @return The parsed config
65+
*/
66+
Config parse_config(const char *xml, const fds_iemgr_t *iemgr);

0 commit comments

Comments
 (0)