Skip to content

Commit 9731b74

Browse files
committed
Clickhouse - add main plugin class and plugin interface implementation
1 parent 0a8692b commit 9731b74

File tree

3 files changed

+446
-0
lines changed

3 files changed

+446
-0
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* @file
3+
* @author Michal Sedlak <[email protected]>
4+
* @brief Plugin interface implementation
5+
* @date 2025
6+
*
7+
* Copyright(c) 2025 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#include "plugin.h"
12+
13+
#include <ipfixcol2.h>
14+
15+
#include <memory>
16+
17+
/** Plugin description */
18+
IPX_API struct ipx_plugin_info ipx_plugin_info = {
19+
// Plugin identification name
20+
"clickhouse",
21+
// Brief description of plugin
22+
"Output plugin that stores flow records to ClickHouse database.",
23+
// Plugin type
24+
IPX_PT_OUTPUT,
25+
// Configuration flags (reserved for future use)
26+
0,
27+
// Plugin version string (like "1.2.3")
28+
"1.0.0",
29+
// Minimal IPFIXcol version string (like "1.2.3")
30+
"2.8.0"
31+
};
32+
33+
int
34+
ipx_plugin_init(ipx_ctx_t *ctx, const char *xml_config)
35+
{
36+
std::unique_ptr<Plugin> plugin;
37+
try {
38+
plugin = std::make_unique<Plugin>(ctx, xml_config);
39+
} catch (const std::exception &ex) {
40+
IPX_CTX_ERROR(ctx, "An unexpected exception has occured: %s", ex.what());
41+
return IPX_ERR_DENIED;
42+
} catch (...) {
43+
IPX_CTX_ERROR(ctx, "An unexpected exception has occured.");
44+
return IPX_ERR_DENIED;
45+
}
46+
ipx_ctx_private_set(ctx, plugin.release());
47+
return IPX_OK;
48+
}
49+
50+
void
51+
ipx_plugin_destroy(ipx_ctx_t *ctx, void *priv)
52+
{
53+
(void) ctx;
54+
Plugin *plugin = reinterpret_cast<Plugin *>(priv);
55+
try {
56+
plugin->stop();
57+
} catch (const std::exception &ex) {
58+
IPX_CTX_ERROR(ctx, "An unexpected exception has occured: %s", ex.what());
59+
} catch (...) {
60+
IPX_CTX_ERROR(ctx, "An unexpected exception has occured.");
61+
}
62+
delete plugin;
63+
}
64+
65+
int
66+
ipx_plugin_process(ipx_ctx_t *ctx, void *priv, ipx_msg_t *msg)
67+
{
68+
Plugin *plugin = reinterpret_cast<Plugin *>(priv);
69+
try {
70+
plugin->process(msg);
71+
} catch (const std::exception &ex) {
72+
IPX_CTX_ERROR(ctx, "An unexpected exception has occured: %s", ex.what());
73+
return IPX_ERR_DENIED;
74+
} catch (...) {
75+
IPX_CTX_ERROR(ctx, "An unexpected exception has occured.");
76+
return IPX_ERR_DENIED;
77+
}
78+
79+
return IPX_OK;
80+
}
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
/**
2+
* @file
3+
* @author Michal Sedlak <[email protected]>
4+
* @brief Main plugin class
5+
* @date 2025
6+
*
7+
* Copyright(c) 2025 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#include "datatype.h"
12+
#include "plugin.h"
13+
14+
#include <cassert>
15+
#include <ipfixcol2.h>
16+
#include <libfds.h>
17+
18+
static std::vector<Column> prepare_columns(std::vector<Config::Column> &columns_cfg)
19+
{
20+
std::vector<Column> columns;
21+
22+
for (const auto &column_cfg : columns_cfg) {
23+
Column column{};
24+
DataType type;
25+
26+
if (std::holds_alternative<const fds_iemgr_elem *>(column_cfg.source)) {
27+
const fds_iemgr_elem *elem = std::get<const fds_iemgr_elem *>(column_cfg.source);
28+
type = type_from_ipfix(elem->data_type);
29+
column.elem = elem;
30+
31+
} else if (std::holds_alternative<const fds_iemgr_alias *>(column_cfg.source)) {
32+
const fds_iemgr_alias *alias = std::get<const fds_iemgr_alias *>(column_cfg.source);
33+
type = find_common_type(*alias);
34+
column.alias = alias;
35+
36+
} else /* if (std::holds_alternative<SpecialField>(column_cfg.source)) */ {
37+
type = DataType::UInt32;
38+
column.special = std::get<SpecialField>(column_cfg.source);
39+
40+
}
41+
42+
column.name = column_cfg.name;
43+
column.datatype = type;
44+
column.nullable = column_cfg.nullable;
45+
46+
columns.emplace_back(std::move(column));
47+
}
48+
49+
return columns;
50+
}
51+
52+
53+
/**
 * @brief Set up the plugin instance
 *
 * Subscribes to the required message types, parses the configuration, prepares
 * the columns, record parsers and blocks, and finally creates and starts the
 * inserters.
 *
 * @param ctx         Plugin context
 * @param xml_config  Plugin configuration to parse
 * @throw Error if the message subscription fails
 */
Plugin::Plugin(ipx_ctx_t *ctx, const char *xml_config)
    : m_logger(ctx)
    , m_stats(m_logger, *this)
{
    // Periodic messages are requested as well, to ensure data export even when no data is coming
    ipx_msg_mask_t wanted_msgs = IPX_MSG_IPFIX | IPX_MSG_PERIODIC | IPX_MSG_SESSION;
    int status = ipx_ctx_subscribe(ctx, &wanted_msgs, nullptr);
    if (status != IPX_OK) {
        throw Error("ipx_ctx_subscribe() failed with error code {}", status);
    }

    // Configuration
    m_config = parse_config(xml_config, ipx_ctx_iemgr_get(ctx));

    // Endpoints of the database taken over from the configuration
    std::vector<clickhouse::Endpoint> endpoints;
    for (const Config::Endpoint &cfg : m_config.connection.endpoints) {
        clickhouse::Endpoint endpoint{cfg.host, cfg.port};
        endpoints.push_back(endpoint);
    }

    // Columns and record parsers
    m_columns = prepare_columns(m_config.columns);
    m_rec_parsers = std::make_unique<RecParserManager>(m_columns, m_config.biflow_empty_autoignore);

    // Blocks; each one gets its own set of columns and starts out as available
    for (unsigned int block_idx = 0; block_idx < m_config.blocks; ++block_idx) {
        auto new_block = std::make_unique<Block>();
        for (const auto &col : m_columns) {
            new_block->columns.emplace_back(make_column(col.datatype, col.nullable));
            new_block->block.AppendColumn(col.name, new_block->columns.back());
        }

        m_blocks.emplace_back(std::move(new_block));
        m_avail_blocks.put(m_blocks.back().get());
    }

    // Inserters; all of them share the queues of filled and available blocks
    for (unsigned int inserter_id = 0; inserter_id < m_config.inserter_threads; ++inserter_id) {
        clickhouse::ClientOptions options;
        options.SetEndpoints(endpoints);
        options.SetUser(m_config.connection.user);
        options.SetPassword(m_config.connection.password);
        options.SetDefaultDatabase(m_config.connection.database);

        m_inserters.emplace_back(std::make_unique<Inserter>(
            inserter_id,
            m_logger,
            options,
            m_config.connection.table,
            m_columns,
            m_filled_blocks,
            m_avail_blocks));
    }

    m_logger.info("Starting inserters");
    for (auto &inserter : m_inserters) {
        inserter->start();
    }

    m_logger.info("ClickHouse plugin is ready");
}
113+
114+
/**
 * @brief Append one row to the block, using the values of the currently parsed record
 *
 * For every column either the ODID of the message (special ODID column) or the
 * value of the field found by the record parser is written. A column without
 * a value (field not present or its conversion failed) is written with a null
 * value pointer.
 *
 * @param msg     IPFIX message the record comes from (source of the ODID)
 * @param parser  Record parser holding the fields of the parsed record
 * @param block   Block the row is appended to
 * @param rev     Passed on to the parser when looking up the column field;
 *                process_record() passes true for the reverse direction
 */
void Plugin::extract_values(ipx_msg_ipfix_t *msg, RecParser &parser, Block &block, bool rev)
{
    const std::size_t n_columns = m_columns.size();
    ValueVariant value;

    for (std::size_t i = 0; i < n_columns; i++) {
        const auto &column = m_columns[i];
        bool has_value = false;

        if (column.special == SpecialField::ODID) {
            value = ipx_msg_ipfix_get_ctx(msg)->odid;
            has_value = true;

        } else {
            fds_drec_field &field = parser.get_column(i, rev);
            if (field.data != nullptr) {
                try {
                    value = get_value(column.datatype, field);
                    has_value = true;
                } catch (const ConversionError &err) {
                    // The format is printf-like: the index is a size_t (%zu) and the
                    // name has to be passed as a C string (%s)
                    m_logger.error("Field conversion failed (field #%zu, \"%s\"): %s",
                        i, column.name.c_str(), err.what());
                }
            }
        }

        write_to_column(column.datatype, column.nullable, *block.columns[i], has_value ? &value : nullptr);
    }
    block.rows++;
}
143+
144+
int
145+
Plugin::process_record(ipx_msg_ipfix_t *msg, fds_drec &rec, Block &block)
146+
{
147+
int ret = 0;
148+
RecParser &parser = m_rec_parsers->get_parser(rec.tmplt);
149+
parser.parse_record(rec);
150+
151+
if (!parser.skip_fwd()) {
152+
extract_values(msg, parser, block, false);
153+
ret++;
154+
}
155+
156+
if (!parser.skip_rev()) {
157+
extract_values(msg, parser, block, true);
158+
ret++;
159+
}
160+
161+
return ret;
162+
}
163+
164+
void
165+
Plugin::process_session_msg(ipx_msg_session_t *msg)
166+
{
167+
if (ipx_msg_session_get_event(msg) == IPX_MSG_SESSION_CLOSE) {
168+
const ipx_session *sess = ipx_msg_session_get_session(msg);
169+
m_rec_parsers->delete_session(sess);
170+
}
171+
}
172+
173+
void
174+
Plugin::process_ipfix_msg(ipx_msg_ipfix_t *msg)
175+
{
176+
// get new block if we don't have one
177+
if (m_current_block == nullptr) {
178+
if (m_config.nonblocking) {
179+
std::optional<Block *> maybe_block = m_avail_blocks.try_get();
180+
if (maybe_block.has_value()) {
181+
m_current_block = maybe_block.value();
182+
} else {
183+
// no available blocks and we are in a non-blocking mode, drop the message
184+
uint32_t drec_cnt = ipx_msg_ipfix_get_drec_cnt(msg);
185+
m_stats.add_dropped(drec_cnt);
186+
return;
187+
}
188+
} else {
189+
m_current_block = m_avail_blocks.get();
190+
}
191+
}
192+
193+
// setup rec parser
194+
const ipx_msg_ctx *msg_ctx = ipx_msg_ipfix_get_ctx(msg);
195+
if (msg_ctx->session->type == FDS_SESSION_SCTP) {
196+
throw std::runtime_error("SCTP is not supported at this time");
197+
}
198+
m_rec_parsers->select_session(msg_ctx->session);
199+
m_rec_parsers->select_odid(msg_ctx->odid);
200+
201+
// go through all the records
202+
uint32_t drec_cnt = ipx_msg_ipfix_get_drec_cnt(msg);
203+
uint32_t rows_count = 0;
204+
for (uint32_t idx = 0; idx < drec_cnt; idx++) {
205+
ipx_ipfix_record *rec = ipx_msg_ipfix_get_drec(msg, idx);
206+
uint32_t rows_inserted = process_record(msg, rec->rec, *m_current_block);
207+
rows_count += rows_inserted;
208+
}
209+
210+
m_stats.add_recs(drec_cnt);
211+
m_stats.add_rows(rows_count);
212+
}
213+
214+
void
215+
Plugin::process(ipx_msg_t *msg)
216+
{
217+
if (ipx_msg_get_type(msg) == IPX_MSG_SESSION) {
218+
process_session_msg(ipx_msg_base2session(msg));
219+
220+
} else if (ipx_msg_get_type(msg) == IPX_MSG_IPFIX) {
221+
ipx_msg_ipfix_t *ipfix_msg = ipx_msg_base2ipfix(msg);
222+
process_ipfix_msg(ipfix_msg);
223+
}
224+
225+
time_t now = std::time(nullptr);
226+
227+
// Send the block for insertion if it is sufficiently full or a block hasn't been sent in a long enough time
228+
if (m_current_block) {
229+
bool nonempty = m_current_block->rows > 0;
230+
bool thresh_reached = m_current_block->rows >= m_config.block_insert_threshold;
231+
bool timeout_reached = uint64_t(now - m_last_insert_time) >= m_config.block_insert_max_delay_secs;
232+
233+
if (nonempty && (thresh_reached || timeout_reached)) {
234+
m_filled_blocks.put(m_current_block);
235+
m_current_block = nullptr;
236+
m_last_insert_time = now;
237+
}
238+
}
239+
240+
// Print stats
241+
m_stats.print_stats_throttled(now);
242+
243+
// Check for any exceptions thrown by workers
244+
for (auto &ins : m_inserters) {
245+
ins->check_error();
246+
}
247+
248+
}
249+
250+
void Plugin::stop()
251+
{
252+
// Export what's left in the last block
253+
if (m_current_block && m_current_block->rows > 0) {
254+
m_filled_blocks.put(m_current_block);
255+
m_current_block = nullptr;
256+
}
257+
258+
// Stop all the threads and wait for them to finish
259+
m_logger.info("Sending stop signal to inserter threads...");
260+
for (auto &ins : m_inserters) {
261+
ins->request_stop();
262+
}
263+
for (const auto &ins : m_inserters) {
264+
(void) ins;
265+
// Wake up the inserter threads in case they are waiting on a .get()
266+
m_filled_blocks.put(nullptr);
267+
}
268+
269+
m_logger.info("Waiting for inserter threads to finish...");
270+
for (auto &ins : m_inserters) {
271+
ins->join();
272+
}
273+
274+
std::size_t drop_count = 0;
275+
for (const auto& block : m_blocks) {
276+
drop_count += block->rows;
277+
}
278+
m_logger.warning("%zu rows could not have been inserted and have been dropped due to termination timeout",
279+
drop_count);
280+
}

0 commit comments

Comments
 (0)