Skip to content

Commit 4121597

Browse files
authored
Merge pull request #243 from Zadamsa/biflow-aggregator-unique-count
Biflow aggregator - unique_count aggregation function
2 parents 74ba22f + ca28697 commit 4121597

36 files changed

+516
-19
lines changed

Makefile.am

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
ACLOCAL_AMFLAGS = -I m4
22

3+
# Biflow aggregator depends on logger and logreplay
34
SUBDIRS= \
45
aggregator \
56
anonymizer \
67
backscatter \
7-
biflow_aggregator \
88
debug_sender \
99
device_classifier \
1010
email_reporter \
@@ -31,6 +31,7 @@ topn \
3131
traffic_repeater \
3232
unirec2json \
3333
endiverter \
34+
biflow_aggregator \
3435
googletest_example
3536

3637
EXTRA_DIST = AUTHORS COPYING ChangeLog INSTALL NEWS README.md nfreader \

biflow_aggregator/Makefile.am

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,10 @@ biflow_aggregator_SOURCES=main.cpp fields.c fields.h configuration.cpp configura
55
rapidxml.hpp
66
biflow_aggregator_LDADD=-lunirec -ltrap
77
include ../aminclude.am
8+
9+
TESTS = tests/test.sh
10+
11+
EXTRA_DIST = tests/test.sh \
12+
tests/references \
13+
tests/inputs \
14+
tests/config.xml

biflow_aggregator/aggregator.cpp

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,18 @@ int Field_template::assign_append() noexcept
184184
return 0;
185185
}
186186

187+
template<typename T>
188+
int Field_template::assign_unique_count() noexcept
189+
{
190+
typename_size = sizeof(T);
191+
ag_fnc = unique_count<T>;
192+
post_proc_fnc = Unique_count_data<T>::postprocessing;
193+
init_fnc = Unique_count_data<T>::init;
194+
deinit_fnc = Unique_count_data<T>::deinit;
195+
ag_data_size = sizeof(Unique_count_data<T>);
196+
return 0;
197+
}
198+
187199
template<Field_type ag_type, typename T>
188200
int Field_template::assign_min_max() noexcept
189201
{
@@ -431,6 +443,23 @@ int Field_template::set_templates(const Field_type ag_type, const ur_field_type_
431443
std::cerr << "Only string and int, uint, float, double, mac, time, and IP array can be used to APPEND function." << std::endl;
432444
return 1;
433445
}
446+
case UNIQUE_COUNT:
447+
switch (ur_f_type) {
448+
case UR_TYPE_UINT8: return assign_unique_count<uint8_t>();
449+
case UR_TYPE_INT8: return assign_unique_count<int8_t>();
450+
case UR_TYPE_UINT16: return assign_unique_count<uint16_t>();
451+
case UR_TYPE_INT16: return assign_unique_count<int16_t>();
452+
case UR_TYPE_UINT32: return assign_unique_count<uint32_t>();
453+
case UR_TYPE_INT32: return assign_unique_count<int32_t>();
454+
case UR_TYPE_UINT64: return assign_unique_count<uint64_t>();
455+
case UR_TYPE_INT64: return assign_unique_count<int64_t>();
456+
case UR_TYPE_MAC: return assign_unique_count<Mac_addr>();
457+
case UR_TYPE_IP: return assign_unique_count<uint128_t>();
458+
case UR_TYPE_STRING: return assign_unique_count<char>();
459+
default:
460+
std::cerr << "Only string and int, uint, float, double, mac, time, and IP array can be used to UNIQUE_COUNT function." << std::endl;
461+
return 1;
462+
}
434463
default:
435464
assert("Invalid case option.\n");
436465
return 1;
@@ -732,8 +761,10 @@ int Field_template::set_templates_dir(const ur_field_type_t ur_f_type, const ur_
732761
}
733762
}
734763

735-
Field::Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid) :
736-
ur_fid(ur_fid), ur_r_fid(ur_r_fid), ur_sort_key_id(0), ur_sort_key_type()
764+
Field::Field(const Field_config cfg, const ur_field_id_t ur_fid_in, const ur_field_id_t ur_r_fid_in,
765+
const ur_field_id_t ur_fid_out, const ur_field_id_t ur_r_fid_out)
766+
: ur_fid(ur_fid_in), ur_r_fid(ur_r_fid_in), ur_fid_out(ur_fid_out),
767+
ur_r_fid_out(ur_r_fid_out), ur_sort_key_id(0), ur_sort_key_type()
737768
{
738769
ur_field_type_t ur_field_type = ur_get_type(ur_fid);
739770

@@ -813,6 +844,11 @@ void Fields::init(uint8_t *memory)
813844
data.first.init(memory, &cfg);
814845
break;
815846
}
847+
case UNIQUE_COUNT: {
848+
struct Config_unique_count cfg = {data.first.limit};
849+
data.first.init(memory, &cfg);
850+
break;
851+
}
816852
case SORTED_MERGE:
817853
case SORTED_MERGE_DIR: {
818854
struct Config_sorted_merge cfg = {data.first.limit, data.first.delimiter, data.first.sort_type};

biflow_aggregator/aggregator.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ enum Field_type {
4040
LAST,
4141
LAST_NON_EMPTY,
4242
APPEND,
43+
UNIQUE_COUNT,
4344
SORTED_MERGE,
4445
SORTED_MERGE_DIR,
4546
INVALID_TYPE,
@@ -95,6 +96,9 @@ class Field_template {
9596
template<typename T>
9697
int assign_append() noexcept;
9798

99+
template<typename T>
100+
int assign_unique_count() noexcept;
101+
98102
template<Field_type ag_type, typename T>
99103
int assign_min_max() noexcept;
100104

@@ -275,7 +279,7 @@ struct Field_config {
275279
char delimiter;
276280

277281
/**
278-
* @brief Max size of append and sortd merge data
282+
* @brief Max size of data
279283
*/
280284
std::size_t limit;
281285

@@ -301,6 +305,16 @@ class Field : public Field_config, public Field_template {
301305
*/
302306
ur_field_id_t ur_r_fid;
303307

308+
/**
309+
* @brief ID of output unirec field
310+
*/
311+
ur_field_id_t ur_fid_out;
312+
313+
/**
314+
* @brief Reverse ID of output unirec field
315+
*/
316+
ur_field_id_t ur_r_fid_out;
317+
304318
/**
305319
* @brief ID of sort key unirec field
306320
*
@@ -321,8 +335,11 @@ class Field : public Field_config, public Field_template {
321335
* @param cfg Field configuration
322336
* @param ur_fid Field ID
323337
* @param ur_r_fid Reverse field ID
338+
* @param ur_fid_out Output field ID
339+
* @param ur_r_fid_out Reverse output field ID
324340
*/
325-
Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid);
341+
Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid,
342+
const ur_field_id_t ur_fid_out, const ur_field_id_t ur_r_fid_out);
326343

327344
/**
328345
* @brief Call field init function.

biflow_aggregator/aggregator_functions.h

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include <iostream>
1717
#include <limits>
18+
#include <nemea-common/BloomFilter.hpp>
1819

1920
#include <unirec/unirec.h>
2021

@@ -174,6 +175,50 @@ struct Append_data : Config_append {
174175
}
175176
};
176177

178+
/**
 * @brief Settings handed to the unique count aggregation function.
 */
struct Config_unique_count {
    /// Number of elements the bloom filter is dimensioned for.
    std::size_t filter_size;
};
184+
185+
/**
186+
* @brief Structure used to store data for unique count function.
187+
*/
188+
template<typename T>
189+
struct Unique_count_data : Config_unique_count {
190+
bloom_filter filter;
191+
std::size_t unique_count;
192+
193+
Unique_count_data(const bloom_parameters& parameters)
194+
: filter(parameters)
195+
{}
196+
197+
static inline void init(void* memory, const void* config)
198+
{
199+
const auto* config_unique_count = static_cast<const Config_unique_count*>(config);
200+
bloom_parameters parameters;
201+
parameters.projected_element_count = config_unique_count->filter_size;
202+
parameters.false_positive_probability = 0.01;
203+
parameters.compute_optimal_parameters();
204+
new(memory) Unique_count_data<T>(parameters);
205+
}
206+
207+
static inline void deinit(void* memory)
208+
{
209+
Unique_count_data<T>* unique_count_data = static_cast<Unique_count_data<T>*>(memory);
210+
unique_count_data->unique_count = 0;
211+
unique_count_data->~Unique_count_data<T>();
212+
}
213+
214+
static inline const void* postprocessing(void* memory, std::size_t& elem_cnt) noexcept
215+
{
216+
Unique_count_data<T>* unique_count_data = static_cast<Unique_count_data<T>*>(memory);
217+
elem_cnt = 1;
218+
return static_cast<void*>(&unique_count_data->unique_count);
219+
}
220+
};
221+
177222
/**
178223
* @brief Configuration to sorted merge function
179224
*/
@@ -519,6 +564,32 @@ inline void append(const void *src, void *dst) noexcept
519564
static_cast<const T*>(src_data->ptr_first) + src_data->cnt_elements);
520565
}
521566

567+
/**
568+
* @brief Inserts element from src pointer into bloom filter.
569+
* @tparam T template type variable.
570+
* @param [in] src pointer to source of new data.
571+
* @param [in,out] dst pointer to already stored data which will be updated (modified).
572+
*/
573+
template<typename T>
574+
inline void unique_count(const void* src, void* dst) noexcept
575+
{
576+
Unique_count_data<T>* unique_count_data = static_cast<Unique_count_data<T>*>(dst);
577+
578+
if (std::is_same<T, char>::value) {
579+
const ur_array_data* src_data = (static_cast<const ur_array_data*>(src));
580+
if (src_data->cnt_elements != 0 && !unique_count_data->filter.contains(
581+
static_cast<const unsigned char*>(src_data->ptr_first), src_data->cnt_elements)) {
582+
unique_count_data->filter.insert(static_cast<const unsigned char*>(src_data->ptr_first), src_data->cnt_elements);
583+
unique_count_data->unique_count++;
584+
}
585+
return;
586+
}
587+
if (!unique_count_data->filter.contains(static_cast<const unsigned char*>(src), sizeof(T))) {
588+
unique_count_data->filter.insert(static_cast<const unsigned char*>(src), sizeof(T));
589+
unique_count_data->unique_count++;
590+
}
591+
}
592+
522593
template <typename T, typename K>
523594
inline void sorted_merge(const void *src, void *dst) noexcept
524595
{

biflow_aggregator/configuration.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,27 @@ bool Configuration::get_eof_termination() noexcept
7171
return _eof_terminate;
7272
}
7373

74+
void Configuration::set_global_flush_configuration(const char *input)
75+
{
76+
std::size_t mode_start_index;
77+
_global_flush_configuration.interval = std::stoul(input, &mode_start_index);
78+
if (std::strcmp(input + mode_start_index, "a") == 0 ||
79+
std::strcmp(input + mode_start_index, "absolute") == 0) {
80+
_global_flush_configuration.type = Global_flush_configuration::Type::ABSOLUTE;
81+
} else if (std::strcmp(input + mode_start_index, "r") == 0 ||
82+
std::strcmp(input + mode_start_index, "relative") == 0 ||
83+
std::strcmp(input + mode_start_index, "") == 0) {
84+
_global_flush_configuration.type = Global_flush_configuration::Type::RELATIVE;
85+
} else {
86+
throw std::invalid_argument("Invalid flush timeout format. Expected: <interval> [a|absolute|r|relative|<empty for relative>].");
87+
}
88+
}
89+
90+
/**
 * @brief Get the global flush configuration.
 *
 * @return Copy of the stored flush interval and flush type.
 */
Configuration::Global_flush_configuration Configuration::get_global_flush_configuration() noexcept
{
    const Global_flush_configuration &stored = this->_global_flush_configuration;
    return stored;
}
94+
7495
void Configuration::print() noexcept
7596
{
7697
std::cout << "***** Configuration *****" << std::endl;
@@ -107,6 +128,7 @@ agg::Field_type Configuration::get_field_type(const char *input)
107128
if (!std::strcmp(input, "BITAND")) return agg::BIT_AND;
108129
if (!std::strcmp(input, "BITOR")) return agg::BIT_OR;
109130
if (!std::strcmp(input, "APPEND")) return agg::APPEND;
131+
if (!std::strcmp(input, "UNIQUE_COUNT")) return agg::UNIQUE_COUNT;
110132
if (!std::strcmp(input, "SORTED_MERGE")) return agg::SORTED_MERGE;
111133
if (!std::strcmp(input, "SORTED_MERGE_DIR")) return agg::SORTED_MERGE_DIR;
112134
std::cerr << "Invalid type field. Given: " << input << ", Expected: KEY|SUM|MIN|MAX|AVG|FIRST|FIRST_NON_EMPTY|LAST|LAST_NON_EMPTY|BITAND|BITOR|APPEND|SORTED_MERGE|SORTED_MERGE_DIR." << std::endl;

biflow_aggregator/configuration.h

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,35 @@
2323
* @brief Class that holds module configuration
2424
*/
2525
class Configuration {
26+
public:
27+
28+
/**
 * @brief Global flush configuration
 *
 * The flush interval is used to flush all records in the flow cache to the
 * output interface once per given number of seconds.
 * If not set (interval is 0), no flush is performed.
 */
struct Global_flush_configuration {
    enum class Type {
        ABSOLUTE, ///< Flows must be flushed every interval seconds starting from epoch
        RELATIVE, ///< Flows must be flushed every interval seconds starting from module start
    };

    /// Flush mode; defaults to RELATIVE so a default-constructed object is fully initialized.
    Type type = Type::RELATIVE;
    time_t interval = 0; ///< Interval in seconds

    /**
     * @brief Check if flush interval is set
     *
     * @return true Flush interval is set (interval is greater than 0)
     * @return false Flush interval is not set
     */
    [[nodiscard]] inline
    bool is_set() const noexcept
    {
        return interval > 0;
    }
};
53+
54+
private:
2655

2756
/**
2857
* @brief Configuration of fields from config file.
@@ -45,6 +74,14 @@ class Configuration {
4574
*/
4675
time_t _t_passive;
4776

77+
/**
78+
* @brief Periodic flush configuration
79+
*
80+
* If set, the module flushes all records in the flow cache to the output interface once per given number of seconds.
81+
* If flush interval is set to 0, no flush is performed.
82+
*/
83+
Global_flush_configuration _global_flush_configuration;
84+
4885
/**
4986
* @brief active timeout
5087
*
@@ -155,6 +192,20 @@ class Configuration {
155192
*/
156193
time_t get_active_timeout() noexcept;
157194

195+
/**
196+
* @brief Set the flush timeout
197+
*
198+
* See _global_flush_configuration for more info.
199+
*
200+
* @param input Timeout in text format.
201+
*/
202+
void set_global_flush_configuration(const char *input);
203+
204+
/**
205+
* @brief Get the flush timeout object
206+
*/
207+
Global_flush_configuration get_global_flush_configuration() noexcept;
208+
158209
/**
159210
* @brief Set the passive timeout
160211
*

0 commit comments

Comments
 (0)