Skip to content

Commit f314ce1

Browse files
author
Damir Zainullin
committed
Biflow aggregator - Add UNIQUE_COUNT aggregation function
1 parent 6b89b96 commit f314ce1

File tree

5 files changed

+144
-8
lines changed

5 files changed

+144
-8
lines changed

biflow_aggregator/aggregator.cpp

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,18 @@ int Field_template::assign_append() noexcept
184184
return 0;
185185
}
186186

187+
template<typename T>
188+
int Field_template::assign_unique_count() noexcept
189+
{
190+
typename_size = sizeof(T);
191+
ag_fnc = unique_count<T>;
192+
post_proc_fnc = Unique_count_data<T>::postprocessing;
193+
init_fnc = Unique_count_data<T>::init;
194+
deinit_fnc = Unique_count_data<T>::deinit;
195+
ag_data_size = sizeof(Unique_count_data<T>);
196+
return 0;
197+
}
198+
187199
template<Field_type ag_type, typename T>
188200
int Field_template::assign_min_max() noexcept
189201
{
@@ -431,6 +443,23 @@ int Field_template::set_templates(const Field_type ag_type, const ur_field_type_
431443
std::cerr << "Only string and int, uint, float, double, mac, time, and IP array can be used to APPEND function." << std::endl;
432444
return 1;
433445
}
446+
case UNIQUE_COUNT:
    // UNIQUE_COUNT is wired up only for integer, MAC, IP and string fields;
    // every other UniRec type falls through to the error below.
    switch (ur_f_type) {
    case UR_TYPE_UINT8: return assign_unique_count<uint8_t>();
    case UR_TYPE_INT8: return assign_unique_count<int8_t>();
    case UR_TYPE_UINT16: return assign_unique_count<uint16_t>();
    case UR_TYPE_INT16: return assign_unique_count<int16_t>();
    case UR_TYPE_UINT32: return assign_unique_count<uint32_t>();
    case UR_TYPE_INT32: return assign_unique_count<int32_t>();
    case UR_TYPE_UINT64: return assign_unique_count<uint64_t>();
    case UR_TYPE_INT64: return assign_unique_count<int64_t>();
    case UR_TYPE_MAC: return assign_unique_count<Mac_addr>();
    case UR_TYPE_IP: return assign_unique_count<uint128_t>();
    case UR_TYPE_STRING: return assign_unique_count<char>();
    default:
        // The message lists exactly the types handled above (the previous text
        // was copied from APPEND and advertised float, double, time and arrays).
        std::cerr << "Only string, int, uint, mac, and IP can be used to UNIQUE_COUNT function." << std::endl;
        return 1;
    }
434463
default:
435464
assert("Invalid case option.\n");
436465
return 1;
@@ -732,8 +761,10 @@ int Field_template::set_templates_dir(const ur_field_type_t ur_f_type, const ur_
732761
}
733762
}
734763

735-
Field::Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid) :
736-
ur_fid(ur_fid), ur_r_fid(ur_r_fid), ur_sort_key_id(0), ur_sort_key_type()
764+
Field::Field(const Field_config cfg, const ur_field_id_t ur_fid_in, const ur_field_id_t ur_r_fid_in,
765+
const ur_field_id_t ur_fid_out, const ur_field_id_t ur_r_fid_out)
766+
: ur_fid(ur_fid_in), ur_r_fid(ur_r_fid_in), ur_fid_out(ur_fid_out),
767+
ur_r_fid_out(ur_r_fid_out), ur_sort_key_id(0), ur_sort_key_type()
737768
{
738769
ur_field_type_t ur_field_type = ur_get_type(ur_fid);
739770

@@ -813,6 +844,11 @@ void Fields::init(uint8_t *memory)
813844
data.first.init(memory, &cfg);
814845
break;
815846
}
847+
case UNIQUE_COUNT: {
848+
struct Config_unique_count cfg = {data.first.limit};
849+
data.first.init(memory, &cfg);
850+
break;
851+
}
816852
case SORTED_MERGE:
817853
case SORTED_MERGE_DIR: {
818854
struct Config_sorted_merge cfg = {data.first.limit, data.first.delimiter, data.first.sort_type};

biflow_aggregator/aggregator.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ enum Field_type {
4040
LAST,
4141
LAST_NON_EMPTY,
4242
APPEND,
43+
UNIQUE_COUNT,
4344
SORTED_MERGE,
4445
SORTED_MERGE_DIR,
4546
INVALID_TYPE,
@@ -95,6 +96,9 @@ class Field_template {
9596
template<typename T>
9697
int assign_append() noexcept;
9798

99+
template<typename T>
100+
int assign_unique_count() noexcept;
101+
98102
template<Field_type ag_type, typename T>
99103
int assign_min_max() noexcept;
100104

@@ -275,7 +279,7 @@ struct Field_config {
275279
char delimiter;
276280

277281
/**
278-
* @brief Max size of append and sortd merge data
282+
* @brief Max size of data
279283
*/
280284
std::size_t limit;
281285

@@ -301,6 +305,16 @@ class Field : public Field_config, public Field_template {
301305
*/
302306
ur_field_id_t ur_r_fid;
303307

308+
/**
309+
* @brief ID of output unirec field
310+
*/
311+
ur_field_id_t ur_fid_out;
312+
313+
/**
314+
* @brief Reverse ID of output unirec field
315+
*/
316+
ur_field_id_t ur_r_fid_out;
317+
304318
/**
305319
* @brief ID of sort key unirec field
306320
*
@@ -321,8 +335,11 @@ class Field : public Field_config, public Field_template {
321335
* @param cfg Field configuration
322336
* @param ur_fid Field ID
323337
* @param ur_r_fid Reverse field ID
338+
* @param ur_fid_out Output field ID
339+
* @param ur_r_fid_out Reverse output field ID
324340
*/
325-
Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid);
341+
Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid,
342+
const ur_field_id_t ur_fid_out, const ur_field_id_t ur_r_fid_out);
326343

327344
/**
328345
* @brief Call field init function.

biflow_aggregator/aggregator_functions.h

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include <iostream>
1717
#include <limits>
18+
#include <nemea-common/BloomFilter.hpp>
1819

1920
#include <unirec/unirec.h>
2021

@@ -174,6 +175,50 @@ struct Append_data : Config_append {
174175
}
175176
};
176177

178+
/**
 * @brief Configuration for unique count function
 *
 * Passed to Unique_count_data<T>::init(), which uses filter_size as the
 * projected element count of the bloom filter.
 */
struct Config_unique_count {
    // Expected number of elements the bloom filter is dimensioned for.
    std::size_t filter_size;
};
184+
185+
/**
186+
* @brief Structure used to store data for unique count function.
187+
*/
188+
template<typename T>
189+
struct Unique_count_data : Config_unique_count {
190+
bloom_filter filter;
191+
std::size_t unique_count;
192+
193+
Unique_count_data(const bloom_parameters& parameters)
194+
: filter(parameters)
195+
{}
196+
197+
static inline void init(void* memory, const void* config)
198+
{
199+
const auto* config_unique_count = static_cast<const Config_unique_count*>(config);
200+
bloom_parameters parameters;
201+
parameters.projected_element_count = config_unique_count->filter_size;
202+
parameters.false_positive_probability = 0.01;
203+
parameters.compute_optimal_parameters();
204+
new(memory) Unique_count_data<T>(parameters);
205+
}
206+
207+
static inline void deinit(void* memory)
208+
{
209+
Unique_count_data<T>* unique_count_data = static_cast<Unique_count_data<T>*>(memory);
210+
unique_count_data->unique_count = 0;
211+
unique_count_data->~Unique_count_data<T>();
212+
}
213+
214+
static inline const void* postprocessing(void* memory, std::size_t& elem_cnt) noexcept
215+
{
216+
Unique_count_data<T>* unique_count_data = static_cast<Unique_count_data<T>*>(memory);
217+
elem_cnt = 1;
218+
return static_cast<void*>(&unique_count_data->unique_count);
219+
}
220+
};
221+
177222
/**
178223
* @brief Configuration to sorted merge function
179224
*/
@@ -519,6 +564,32 @@ inline void append(const void *src, void *dst) noexcept
519564
static_cast<const T*>(src_data->ptr_first) + src_data->cnt_elements);
520565
}
521566

567+
/**
568+
* @brief Inserts element from src pointer into bloom filter.
569+
* @tparam T template type variable.
570+
* @param [in] src pointer to source of new data.
571+
* @param [in,out] dst pointer to already stored data which will be updated (modified).
572+
*/
573+
template<typename T>
574+
inline void unique_count(const void* src, void* dst) noexcept
575+
{
576+
Unique_count_data<T>* unique_count_data = static_cast<Unique_count_data<T>*>(dst);
577+
578+
if (std::is_same<T, char>::value) {
579+
const ur_array_data* src_data = (static_cast<const ur_array_data*>(src));
580+
if (src_data->cnt_elements != 0 && !unique_count_data->filter.contains(
581+
static_cast<const unsigned char*>(src_data->ptr_first), src_data->cnt_elements)) {
582+
unique_count_data->filter.insert(static_cast<const unsigned char*>(src_data->ptr_first), src_data->cnt_elements);
583+
unique_count_data->unique_count++;
584+
}
585+
return;
586+
}
587+
if (!unique_count_data->filter.contains(static_cast<const unsigned char*>(src), sizeof(T))) {
588+
unique_count_data->filter.insert(static_cast<const unsigned char*>(src), sizeof(T));
589+
unique_count_data->unique_count++;
590+
}
591+
}
592+
522593
template <typename T, typename K>
523594
inline void sorted_merge(const void *src, void *dst) noexcept
524595
{

biflow_aggregator/configuration.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ agg::Field_type Configuration::get_field_type(const char *input)
128128
if (!std::strcmp(input, "BITAND")) return agg::BIT_AND;
129129
if (!std::strcmp(input, "BITOR")) return agg::BIT_OR;
130130
if (!std::strcmp(input, "APPEND")) return agg::APPEND;
131+
if (!std::strcmp(input, "UNIQUE_COUNT")) return agg::UNIQUE_COUNT;
131132
if (!std::strcmp(input, "SORTED_MERGE")) return agg::SORTED_MERGE;
132133
if (!std::strcmp(input, "SORTED_MERGE_DIR")) return agg::SORTED_MERGE_DIR;
133134
// The expected list mirrors the names accepted above, including UNIQUE_COUNT.
std::cerr << "Invalid type field. Given: " << input << ", Expected: KEY|SUM|MIN|MAX|AVG|FIRST|FIRST_NON_EMPTY|LAST|LAST_NON_EMPTY|BITAND|BITOR|APPEND|UNIQUE_COUNT|SORTED_MERGE|SORTED_MERGE_DIR." << std::endl;

biflow_aggregator/main.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,10 @@ proccess_and_send(agg::Aggregator<agg::FlowKey>& agg, const agg::FlowKey& key, c
198198
agg_data = field->post_processing(&flow_data.ctx->data[agg_field.second], size, elem_cnt);
199199
}
200200
if (ur_is_array(field->ur_fid)) {
201-
ur_array_allocate(out_tmplt, out_rec, field->ur_fid, elem_cnt);
202-
std::memcpy(ur_get_ptr_by_id(out_tmplt, out_rec, field->ur_fid), agg_data, size * elem_cnt);
201+
ur_array_allocate(out_tmplt, out_rec, field->ur_fid_out, elem_cnt);
202+
std::memcpy(ur_get_ptr_by_id(out_tmplt, out_rec, field->ur_fid_out), agg_data, size * elem_cnt);
203203
} else {
204-
field_id = flow_data.reverse ? field->ur_r_fid : field->ur_fid;
204+
field_id = flow_data.reverse ? field->ur_r_fid_out : field->ur_fid_out;
205205
std::memcpy(ur_get_ptr_by_id(out_tmplt, out_rec, field_id), agg_data, size);
206206
}
207207
}
@@ -249,8 +249,19 @@ static int process_format_change(
249249
if (ur_get_type(ur_fid) == UR_TYPE_STRING)
250250
is_string_key = true;
251251
agg::Key_template::add(ur_fid, ur_r_fid);
252+
} else if (field_cfg.type == agg::UNIQUE_COUNT) {
253+
const int ur_fid_out = ur_define_field((field_cfg.name + "_UNIQUE_COUNT").c_str(), UR_TYPE_UINT32);
254+
out_template.append("," + field_cfg.name + "_UNIQUE_COUNT");
255+
int ur_r_fid_out = ur_fid_out;
256+
if (ur_fid != ur_r_fid) {
257+
ur_define_field((field_cfg.reverse_name + "_UNIQUE_COUNT").c_str(), UR_TYPE_UINT32);
258+
out_template.append("," + field_cfg.reverse_name + "_UNIQUE_COUNT");
259+
}
260+
field_cfg.to_output = false;
261+
agg::Field field(field_cfg, ur_fid, ur_r_fid, ur_fid_out, ur_r_fid_out);
262+
agg.fields.add_field(field);
252263
} else {
253-
agg::Field field(field_cfg, ur_fid, ur_r_fid);
264+
agg::Field field(field_cfg, ur_fid, ur_r_fid, ur_fid, ur_r_fid);
254265
agg.fields.add_field(field);
255266
}
256267
if (field_cfg.to_output)

0 commit comments

Comments
 (0)