Skip to content

Commit 14ae576

Browse files
committed
Clickhouse - add datatype
1 parent 5a597ce commit 14ae576

File tree

2 files changed

+612
-0
lines changed

2 files changed

+612
-0
lines changed
Lines changed: 393 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,393 @@
1+
/**
2+
* @file
3+
* @author Michal Sedlak <[email protected]>
4+
* @brief Functions specific to individual data types
5+
* @date 2024
6+
*
7+
* Copyright(c) 2024 CESNET z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#include <type_traits>
12+
13+
#include "datatype.h"
14+
#include "common.h"
15+
16+
template <unsigned Precision> class ColumnDateTime64 : public clickhouse::ColumnDateTime64 {
17+
public:
18+
ColumnDateTime64() : clickhouse::ColumnDateTime64(Precision) {}
19+
};
20+
21+
DataType type_from_ipfix(fds_iemgr_element_type type)
22+
{
23+
switch (type) {
24+
case FDS_ET_STRING: return DataType::String;
25+
case FDS_ET_SIGNED_8: return DataType::Int8;
26+
case FDS_ET_SIGNED_16: return DataType::Int16;
27+
case FDS_ET_SIGNED_32: return DataType::Int32;
28+
case FDS_ET_SIGNED_64: return DataType::Int64;
29+
case FDS_ET_UNSIGNED_8: return DataType::UInt8;
30+
case FDS_ET_UNSIGNED_16: return DataType::UInt16;
31+
case FDS_ET_UNSIGNED_32: return DataType::UInt32;
32+
case FDS_ET_UNSIGNED_64: return DataType::UInt64;
33+
case FDS_ET_IPV4_ADDRESS: return DataType::IPv4;
34+
case FDS_ET_IPV6_ADDRESS: return DataType::IPv6;
35+
case FDS_ET_DATE_TIME_SECONDS: return DataType::DatetimeSecs;
36+
case FDS_ET_DATE_TIME_MILLISECONDS: return DataType::DatetimeMillisecs;
37+
case FDS_ET_DATE_TIME_MICROSECONDS: return DataType::DatetimeMicrosecs;
38+
case FDS_ET_DATE_TIME_NANOSECONDS: return DataType::DatetimeNanosecs;
39+
default: throw Error("unsupported IPFIX data type {}", type);
40+
}
41+
}
42+
43+
static DataType unify_type(DataType a, DataType b)
44+
{
45+
auto is_int = [](DataType t) {
46+
return t == DataType::Int8
47+
|| t == DataType::Int16
48+
|| t == DataType::Int32
49+
|| t == DataType::Int64;
50+
};
51+
auto is_uint = [](DataType t) {
52+
return t == DataType::UInt8
53+
|| t == DataType::UInt16
54+
|| t == DataType::UInt32
55+
|| t == DataType::UInt64;
56+
};
57+
auto is_ip = [](DataType t) {
58+
return t == DataType::IPv4
59+
|| t == DataType::IPv6
60+
|| t == DataType::IP;
61+
};
62+
auto is_datetime = [](DataType t) {
63+
return t == DataType::DatetimeSecs
64+
|| t == DataType::DatetimeMillisecs
65+
|| t == DataType::DatetimeMicrosecs
66+
|| t == DataType::DatetimeNanosecs;
67+
};
68+
69+
if (a == b) {
70+
return a;
71+
}
72+
if (is_int(a) && is_int(b)) {
73+
return std::max(a, b);
74+
}
75+
if (is_uint(a) && is_uint(b)) {
76+
return std::max(a, b);
77+
}
78+
if (is_datetime(a) && is_datetime(b)) {
79+
return std::max(a, b);
80+
}
81+
if (is_ip(a) && is_ip(b)) {
82+
return DataType::IP;
83+
}
84+
85+
throw Error("cannot unify types {} and {}", a, b);
86+
}
87+
88+
DataType find_common_type(const fds_iemgr_alias &alias)
89+
{
90+
if (alias.sources_cnt == 0) {
91+
throw Error("alias \"{}\" has no sources", alias.name);
92+
}
93+
94+
DataType common_type = type_from_ipfix(alias.sources[0]->data_type);
95+
for (size_t i = 1; i < alias.sources_cnt; i++) {
96+
common_type = unify_type(common_type, type_from_ipfix(alias.sources[i]->data_type));
97+
}
98+
return common_type;
99+
}
100+
101+
namespace getters {
102+
103+
template <typename UIntType>
104+
static UIntType get_uint(fds_drec_field field)
105+
{
106+
uint64_t value = 0;
107+
int ret = fds_get_uint_be(field.data, field.size, &value);
108+
if (ret != FDS_OK) {
109+
throw Error("fds_get_uint_be() has failed: {}", ret);
110+
}
111+
return static_cast<UIntType>(value);
112+
}
113+
114+
template <typename IntType>
115+
static IntType get_int(fds_drec_field field)
116+
{
117+
int64_t value = 0;
118+
int ret = fds_get_int_be(field.data, field.size, &value);
119+
if (ret != FDS_OK) {
120+
throw Error("fds_get_int_be() has failed: {}", ret);
121+
}
122+
return static_cast<IntType>(value);
123+
}
124+
125+
static IP4Addr get_ipv4(fds_drec_field field)
126+
{
127+
IP4Addr value;
128+
int ret = fds_get_ip(field.data, field.size, &value);
129+
if (ret != FDS_OK) {
130+
throw Error("fds_get_ip() has failed: {}", ret);
131+
}
132+
return value;
133+
}
134+
135+
static IP6Addr get_ipv6(fds_drec_field field)
136+
{
137+
IP6Addr value;
138+
int ret = fds_get_ip(field.data, field.size, &value);
139+
if (ret != FDS_OK) {
140+
throw Error("fds_get_ip() has failed: {}", ret);
141+
}
142+
return value;
143+
}
144+
145+
static IP6Addr get_ip(fds_drec_field field)
146+
{
147+
IP6Addr value;
148+
int ret;
149+
if (field.size == 4) {
150+
static constexpr uint8_t IPV4_MAPPED_IPV6_PREFIX[]{
151+
0x00, 0x00, 0x00, 0x00,
152+
0x00, 0x00, 0x00, 0x00,
153+
0x00, 0x00, 0xFF, 0xFF};
154+
std::memcpy(
155+
reinterpret_cast<uint8_t *>(&value),
156+
IPV4_MAPPED_IPV6_PREFIX,
157+
sizeof(IPV4_MAPPED_IPV6_PREFIX));
158+
ret = fds_get_ip(field.data, field.size, &reinterpret_cast<uint8_t *>(&value)[12]);
159+
} else {
160+
ret = fds_get_ip(field.data, field.size, &value);
161+
}
162+
if (ret != FDS_OK) {
163+
throw Error("fds_get_ip() has failed: {}", ret);
164+
}
165+
return value;
166+
}
167+
168+
static std::string get_string(fds_drec_field field)
169+
{
170+
std::string value;
171+
value.resize(field.size);
172+
int ret = fds_get_string(field.data, field.size, &value[0]);
173+
if (ret != FDS_OK) {
174+
throw Error("fds_get_string() has failed: {}", ret);
175+
}
176+
return value;
177+
}
178+
179+
static uint64_t get_datetime(fds_drec_field field)
180+
{
181+
uint64_t value = 0;
182+
int ret = fds_get_datetime_lp_be(field.data, field.size, field.info->def->data_type, &value);
183+
if (ret != FDS_OK) {
184+
throw Error("fds_get_datetime_lp_be() has failed: {}", ret);
185+
}
186+
value /= 1000;
187+
return value;
188+
}
189+
190+
template <int64_t Divisor = 1>
191+
static int64_t get_datetime64(fds_drec_field field)
192+
{
193+
int64_t value = 0;
194+
timespec ts;
195+
int ret = fds_get_datetime_hp_be(field.data, field.size, field.info->def->data_type, &ts);
196+
if (ret != FDS_OK) {
197+
throw Error("fds_get_datetime_hp_be() has failed: {}", ret);
198+
}
199+
value = (static_cast<int64_t>(ts.tv_sec) * 1'000'000'000 + static_cast<int64_t>(ts.tv_nsec)) / Divisor;
200+
return value;
201+
}
202+
}
203+
204+
template<DataType> struct DataTypeTraits {};
205+
206+
template<> struct DataTypeTraits<DataType::UInt8> {
207+
using ColumnType = clickhouse::ColumnUInt8;
208+
static constexpr std::string_view ClickhouseTypeName = "UInt8";
209+
static constexpr auto Getter = &getters::get_uint<uint8_t>;
210+
};
211+
212+
template<> struct DataTypeTraits<DataType::UInt16> {
213+
using ColumnType = clickhouse::ColumnUInt16;
214+
static constexpr std::string_view ClickhouseTypeName = "UInt16";
215+
static constexpr auto Getter = &getters::get_uint<uint16_t>;
216+
};
217+
218+
template<> struct DataTypeTraits<DataType::UInt32> {
219+
using ColumnType = clickhouse::ColumnUInt32;
220+
static constexpr std::string_view ClickhouseTypeName = "UInt32";
221+
static constexpr auto Getter = &getters::get_uint<uint32_t>;
222+
};
223+
224+
template<> struct DataTypeTraits<DataType::UInt64> {
225+
using ColumnType = clickhouse::ColumnUInt64;
226+
static constexpr std::string_view ClickhouseTypeName = "UInt64";
227+
static constexpr auto Getter = &getters::get_uint<uint64_t>;
228+
};
229+
230+
template<> struct DataTypeTraits<DataType::Int8> {
231+
using ColumnType = clickhouse::ColumnInt8;
232+
static constexpr std::string_view ClickhouseTypeName = "Int8";
233+
static constexpr auto Getter = &getters::get_int<int8_t>;
234+
};
235+
236+
template<> struct DataTypeTraits<DataType::Int16> {
237+
using ColumnType = clickhouse::ColumnInt16;
238+
static constexpr std::string_view ClickhouseTypeName = "Int16";
239+
static constexpr auto Getter = &getters::get_int<int16_t>;
240+
};
241+
242+
template<> struct DataTypeTraits<DataType::Int32> {
243+
using ColumnType = clickhouse::ColumnInt32;
244+
static constexpr std::string_view ClickhouseTypeName = "Int32";
245+
static constexpr auto Getter = &getters::get_int<int32_t>;
246+
};
247+
248+
template<> struct DataTypeTraits<DataType::Int64> {
249+
using ColumnType = clickhouse::ColumnInt64;
250+
static constexpr std::string_view ClickhouseTypeName = "Int64";
251+
static constexpr auto Getter = &getters::get_int<int64_t>;
252+
};
253+
254+
template<> struct DataTypeTraits<DataType::IP> {
255+
using ColumnType = clickhouse::ColumnIPv6;
256+
static constexpr std::string_view ClickhouseTypeName = "IPv6";
257+
static constexpr auto Getter = &getters::get_ip;
258+
};
259+
260+
template<> struct DataTypeTraits<DataType::IPv4> {
261+
using ColumnType = clickhouse::ColumnIPv4;
262+
static constexpr std::string_view ClickhouseTypeName = "IPv4";
263+
static constexpr auto Getter = &getters::get_ipv4;
264+
};
265+
266+
template<> struct DataTypeTraits<DataType::IPv6> {
267+
using ColumnType = clickhouse::ColumnIPv6;
268+
static constexpr std::string_view ClickhouseTypeName = "IPv6";
269+
static constexpr auto Getter = &getters::get_ipv6;
270+
};
271+
272+
template<> struct DataTypeTraits<DataType::String> {
273+
using ColumnType = clickhouse::ColumnString;
274+
static constexpr std::string_view ClickhouseTypeName = "String";
275+
static constexpr auto Getter = &getters::get_string;
276+
};
277+
278+
template<> struct DataTypeTraits<DataType::DatetimeSecs> {
279+
using ColumnType = clickhouse::ColumnDateTime;
280+
static constexpr std::string_view ClickhouseTypeName = "DateTime";
281+
static constexpr auto Getter = &getters::get_datetime;
282+
};
283+
284+
template<> struct DataTypeTraits<DataType::DatetimeMillisecs> {
285+
using ColumnType = ColumnDateTime64<3>;
286+
static constexpr std::string_view ClickhouseTypeName = "DateTime64(3)";
287+
static constexpr auto Getter = &getters::get_datetime64<1'000'000>;
288+
};
289+
290+
template<> struct DataTypeTraits<DataType::DatetimeMicrosecs> {
291+
using ColumnType = ColumnDateTime64<6>;
292+
static constexpr std::string_view ClickhouseTypeName = "DateTime64(6)";
293+
static constexpr auto Getter = &getters::get_datetime64<1'000>;
294+
};
295+
296+
template<> struct DataTypeTraits<DataType::DatetimeNanosecs> {
297+
using ColumnType = ColumnDateTime64<9>;
298+
static constexpr std::string_view ClickhouseTypeName = "DateTime64(9)";
299+
static constexpr auto Getter = &getters::get_datetime64<1>;
300+
};
301+
302+
template <typename Func>
303+
static void visit(DataType type, Func func)
304+
{
305+
switch (type) {
306+
case DataType::UInt8: func(DataTypeTraits<DataType::UInt8>{}); break;
307+
case DataType::UInt16: func(DataTypeTraits<DataType::UInt16>{}); break;
308+
case DataType::UInt32: func(DataTypeTraits<DataType::UInt32>{}); break;
309+
case DataType::UInt64: func(DataTypeTraits<DataType::UInt64>{}); break;
310+
case DataType::Int8: func(DataTypeTraits<DataType::Int8>{}); break;
311+
case DataType::Int16: func(DataTypeTraits<DataType::Int16>{}); break;
312+
case DataType::Int32: func(DataTypeTraits<DataType::Int32>{}); break;
313+
case DataType::Int64: func(DataTypeTraits<DataType::Int64>{}); break;
314+
case DataType::String: func(DataTypeTraits<DataType::String>{}); break;
315+
case DataType::DatetimeMillisecs: func(DataTypeTraits<DataType::DatetimeMillisecs>{}); break;
316+
case DataType::DatetimeMicrosecs: func(DataTypeTraits<DataType::DatetimeMicrosecs>{}); break;
317+
case DataType::DatetimeNanosecs: func(DataTypeTraits<DataType::DatetimeNanosecs>{}); break;
318+
case DataType::DatetimeSecs: func(DataTypeTraits<DataType::DatetimeSecs>{}); break;
319+
case DataType::IPv4: func(DataTypeTraits<DataType::IPv4>{}); break;
320+
case DataType::IPv6: func(DataTypeTraits<DataType::IPv6>{}); break;
321+
case DataType::IP: func(DataTypeTraits<DataType::IP>{}); break;
322+
case DataType::Invalid: throw std::runtime_error("invalid data type");
323+
}
324+
}
325+
326+
std::shared_ptr<clickhouse::Column> make_column(DataType type, DataTypeNullable nullable) {
327+
std::shared_ptr<clickhouse::Column> column;
328+
visit(type, [&](auto traits) {
329+
if (nullable == DataTypeNullable::Nullable) {
330+
using ColType = clickhouse::ColumnNullableT<typename decltype(traits)::ColumnType>;
331+
column = std::make_shared<ColType>();
332+
} else /* (nullable == DataTypeNullable::Nonnullable) */ {
333+
using ColType = typename decltype(traits)::ColumnType;
334+
column = std::make_shared<ColType>();
335+
}
336+
});
337+
return column;
338+
}
339+
340+
GetterFn make_getter(DataType type) {
341+
GetterFn getter;
342+
visit(type, [&](auto traits) {
343+
getter = [](fds_drec_field field, ValueVariant &value) { value = decltype(traits)::Getter(field); };
344+
});
345+
return getter;
346+
}
347+
348+
ColumnWriterFn make_columnwriter(DataType type, DataTypeNullable nullable) {
349+
ColumnWriterFn columnwriter;
350+
351+
if (nullable == DataTypeNullable::Nullable) {
352+
visit(type, [&](auto traits) {
353+
columnwriter = [](ValueVariant *value, clickhouse::Column &column) {
354+
using ColumnType = clickhouse::ColumnNullableT<typename decltype(traits)::ColumnType>;
355+
using ValueType = std::invoke_result_t<decltype(decltype(traits)::Getter), fds_drec_field>;
356+
auto *col = dynamic_cast<ColumnType*>(&column);
357+
if (!value) {
358+
col->Append(std::nullopt);
359+
} else {
360+
col->Append(std::get<ValueType>(*value));
361+
}
362+
};
363+
});
364+
365+
} else {
366+
visit(type, [&](auto traits) {
367+
columnwriter = [](ValueVariant *value, clickhouse::Column &column) {
368+
using ColumnType = typename decltype(traits)::ColumnType;
369+
using ValueType = std::invoke_result_t<decltype(decltype(traits)::Getter), fds_drec_field>;
370+
static const auto ZeroValue = ValueType{};
371+
auto *col = dynamic_cast<ColumnType*>(&column);
372+
if (!value) {
373+
col->Append(ZeroValue);
374+
} else {
375+
col->Append(std::get<ValueType>(*value));
376+
}
377+
};
378+
});
379+
}
380+
381+
return columnwriter;
382+
}
383+
384+
std::string type_to_clickhouse(DataType type, DataTypeNullable nullable) {
385+
std::string result;
386+
visit(type, [&](auto traits) {
387+
result = traits.ClickhouseTypeName;
388+
if (nullable == DataTypeNullable::Nullable) {
389+
result = "Nullable(" + result + ")";
390+
}
391+
});
392+
return result;
393+
}

0 commit comments

Comments
 (0)