Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions biflow_aggregator/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ biflow_aggregator_SOURCES=main.cpp fields.c fields.h configuration.cpp configura
rapidxml.hpp
biflow_aggregator_LDADD=-lunirec -ltrap
include ../aminclude.am

TESTS = tests/test.sh
21 changes: 21 additions & 0 deletions biflow_aggregator/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ bool Configuration::get_eof_termination() noexcept
return _eof_terminate;
}

void Configuration::set_global_flush_configuration(const char *input)
{
std::size_t mode_start_index;
_global_flush_configuration.interval = std::stoul(input, &mode_start_index);
if (std::strcmp(input + mode_start_index, "a") == 0 ||
std::strcmp(input + mode_start_index, "absolute") == 0) {
_global_flush_configuration.type = Global_flush_configuration::Type::ABSOLUTE;
} else if (std::strcmp(input + mode_start_index, "r") == 0 ||
std::strcmp(input + mode_start_index, "relative") == 0 ||
std::strcmp(input + mode_start_index, "") == 0) {
_global_flush_configuration.type = Global_flush_configuration::Type::RELATIVE;
} else {
throw std::invalid_argument("Invalid flush timeout format. Expected: <interval> [a|absolute|r|relative|<empty for relative>].");
}
}

Configuration::Global_flush_configuration Configuration::get_global_flush_configuration() noexcept
{
return _global_flush_configuration;
}

void Configuration::print() noexcept
{
std::cout << "***** Configuration *****" << std::endl;
Expand Down
51 changes: 51 additions & 0 deletions biflow_aggregator/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,35 @@
* @brief Class thas holds module configuration
*/
class Configuration {
public:

/**
* @brief Global flush configuration
*
* Flush interval is used to flush all records in flow cache to output interface once per given amount of seconds.
* If not set, no flush is performed.
*/
struct Global_flush_configuration {
enum class Type {
ABSOLUTE, ///< Flows must be flushed every interval seconds starting from epoch
RELATIVE, ///< Flows must be flushed every interval seconds starting from module start
} type;
time_t interval = 0; ///< Interval in seconds

/**
* @brief Check if flush interval is set
*
* @return true Flush interval is set
* @return false Flush interval is not set
*/
[[nodiscard]] inline
bool is_set() const noexcept
{
return interval > 0;
}
};

private:

/**
* @brief Configuration of fields from config file.
Expand All @@ -45,6 +74,14 @@ class Configuration {
*/
time_t _t_passive;

/**
* @brief Periodic flush configuration
*
* If set, module flush all records in flow cache to output interface once per given amount of seconds.
* If flush interval is set to 0, no flush is performed.
*/
Global_flush_configuration _global_flush_configuration;

/**
* @brief active timeout
*
Expand Down Expand Up @@ -155,6 +192,20 @@ class Configuration {
*/
time_t get_active_timeout() noexcept;

/**
* @brief Set the flush timeout
*
* See _periodic_flush_configuration for more info.
*
* @param input Timeout in text format.
*/
void set_global_flush_configuration(const char *input);

/**
* @brief Get the flush timeout object
*/
Global_flush_configuration get_global_flush_configuration() noexcept;

/**
* @brief Set the passive timeout
*
Expand Down
39 changes: 31 additions & 8 deletions biflow_aggregator/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ UR_FIELDS (
PARAM('e', "eof", "End when receive EOF.", no_argument, "flag") \
PARAM('s', "size", "Max number of elements in flow cache.", required_argument, "number") \
PARAM('a', "active-timeout", "Active timeout.", required_argument, "number") \
PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number")
PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number") \
PARAM('g', "global-timeout", "Global timeout.", required_argument, "number")

trap_module_info_t *module_info = NULL;
static volatile int stop = 0;
Expand Down Expand Up @@ -364,6 +365,20 @@ void update_flow(
if (pt != t_data->value.passive_timeout)
dll.swap(t_data);
}

static void flush_all(agg::Aggregator<agg::FlowKey>& aggregator,
ur_template_t* out_template, void* out_record, Dll<agg::Timeout_data>& dll)
{
for (auto flow_data : aggregator.flow_cache) {
proccess_and_send(aggregator, flow_data.first, flow_data.second, out_template, out_record);
agg::Flow_key_allocator::release_ptr(static_cast<uint8_t *>(flow_data.first.get_key().first));
agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx);
}
dll.clear();
aggregator.flow_cache.clear();
trap_send_flush(0);
}

static int
do_mainloop(Configuration& config)
{
Expand All @@ -383,8 +398,11 @@ do_mainloop(Configuration& config)

time_t time_first;
time_t time_last = 0;
time_t last_flush_time = 0;
time_t t_passive = config.get_passive_timeout() >> 32;
time_t t_active = config.get_active_timeout() >> 32;
const Configuration::Global_flush_configuration& flush_configuration
= config.get_global_flush_configuration();
std::size_t flow_cnt = 0;
Dll<agg::Timeout_data> dll;

Expand Down Expand Up @@ -432,13 +450,7 @@ do_mainloop(Configuration& config)

// clear all memory
// flush all flows
for (auto flow_data : agg.flow_cache) {
proccess_and_send(agg, flow_data.first, flow_data.second, out_tmplt, out_rec);
agg::Flow_key_allocator::release_ptr(static_cast<uint8_t *>(flow_data.first.get_key().first));
agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx);
}

trap_send_flush(0);
flush_all(agg, out_tmplt, out_rec, dll);

// Free previous record and temlate
ur_free_template(out_tmplt);
Expand Down Expand Up @@ -490,6 +502,14 @@ do_mainloop(Configuration& config)
trap_send_flush(0);
timeouted = false;
}

if (unlikely(flush_configuration.is_set() && time_last - last_flush_time >= flush_configuration.interval)) {
last_flush_time = time_last;
if (flush_configuration.type == Configuration::Global_flush_configuration::Type::ABSOLUTE) {
last_flush_time = last_flush_time / flush_configuration.interval * flush_configuration.interval;
}
flush_all(agg, out_tmplt, out_rec, dll);
}

bool is_key_reversed = key.generate(in_data, in_tmplt, config.is_biflow_key());

Expand Down Expand Up @@ -662,6 +682,9 @@ main(int argc, char **argv)
case 's':
config.set_flow_cache_size(optarg);
break;
case 'g':
config.set_global_flush_configuration(optarg);
break;
default:
std::cerr << "Invalid argument " << opt << ", skipped..." << std::endl;
}
Expand Down
85 changes: 85 additions & 0 deletions biflow_aggregator/tests/config.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?xml version="1.0"?>
<aggregator>
<id name="packet_aggregation">
<field>
<type>SUM</type>
<name>PACKETS</name>
</field>
</id>
<id name="generic_flow_key_min_ports">
<field>
<name>FLOW_ID</name>
<type>KEY</type>
</field>

<field>
<name>SRC_PORT</name>
<type>MIN</type>
</field>
<field>
<name>DST_PORT</name>
<type>MIN</type>
</field>
</id>
<id name="src_dst_ip_all_aggregations">
<field>
<name>SRC_IP</name>
<type>KEY</type>
</field>
<field>
<name>DST_IP</name>
<type>KEY</type>
</field>

<field>
<name>SUM</name>
<type>SUM</type>
</field>
<field>
<name>MIN</name>
<type>MIN</type>
</field>
<field>
<name>MAX</name>
<type>MAX</type>
</field>
<field>
<name>FIRST_NON_EMPTY</name>
<type>FIRST_NON_EMPTY</type>
</field>
<field>
<name>LAST_NON_EMPTY</name>
<type>LAST_NON_EMPTY</type>
</field>
<field>
<name>FIRST</name>
<type>FIRST</type>
</field>
<field>
<name>LAST</name>
<type>LAST</type>
</field>
<field>
<name>AVG</name>
<type>AVG</type>
</field>
<field>
<name>BITOR</name>
<type>BITOR</type>
</field>
<field>
<name>STR_APPEND</name>
<type>APPEND</type>
<delimiter>:</delimiter>
<size>10</size>
</field>
<field>
<name>SORTED_MERGE_VALUE</name>
<type>SORTED_MERGE</type>
<delimiter>:</delimiter>
<sort_key>SORTED_MERGE_KEY</sort_key>
<sort_type>ASCENDING</sort_type>
<size>10</size>
</field>
</id>
</aggregator>
3 changes: 3 additions & 0 deletions biflow_aggregator/tests/inputs/input1_packet_aggregation
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST
192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0
192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0
6 changes: 6 additions & 0 deletions biflow_aggregator/tests/inputs/input2_packet_aggregation
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST
192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0
192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0
192.168.1.8,192.168.1.9,10,2016-10-28T17:00:13.0,2016-10-28T17:00:17.0
192.168.1.8,192.168.1.9,17,2016-10-28T17:00:21.0,2016-10-28T17:00:23.0
192.168.1.8,192.168.1.9,53,2016-10-28T17:00:26.0,2016-10-28T17:00:41.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST,uint32 FLOW_ID,uint16 SRC_PORT,uint16 DST_PORT
192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0,1,0,6666
192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0,1,6666,0
192.168.1.8,192.168.1.9,10,2016-10-28T17:00:13.0,2016-10-28T17:00:17.0,2,6666,0
192.168.1.8,192.168.1.9,17,2016-10-28T17:00:21.0,2016-10-28T17:00:23.0,2,3333,3333
192.168.1.8,192.168.1.9,53,2016-10-28T17:00:26.0,2016-10-28T17:00:41.0,2,0,6666
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ipaddr DST_IP,ipaddr SRC_IP,time TIME_FIRST,time TIME_LAST,uint32 SUM,uint32 MIN,uint32 MAX,string FIRST_NON_EMPTY,string LAST_NON_EMPTY,uint32 FIRST,uint32 LAST,double AVG,string STR_APPEND,uint32 BITOR,uint32* SORTED_MERGE_KEY,uint32* SORTED_MERGE_VALUE
192.168.1.1,192.168.1.2,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0,333,5,5,,222,16,32,7,test1,1,[3|2|1],[1|2|3]
192.168.1.1,192.168.1.2,2016-10-28T17:00:1.0,2016-10-28T17:00:11.0,333,1,4,test,,555,33,9,test2,3,[8|7|6],[6|5|4]
1 change: 1 addition & 0 deletions biflow_aggregator/tests/references/reference1_gt0
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,667
2 changes: 2 additions & 0 deletions biflow_aggregator/tests/references/reference1_gt5a
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1
2016-10-28T17:00:03.000000,2016-10-28T17:00:11.000000,1,666
1 change: 1 addition & 0 deletions biflow_aggregator/tests/references/reference1_gt5r
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,667
2 changes: 2 additions & 0 deletions biflow_aggregator/tests/references/reference2_gt0
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
2016-10-28T17:00:01.000000,2016-10-28T17:00:23.000000,4,694
2016-10-28T17:00:26.000000,2016-10-28T17:00:41.000000,1,53
5 changes: 5 additions & 0 deletions biflow_aggregator/tests/references/reference2_gt5a
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1
2016-10-28T17:00:03.000000,2016-10-28T17:00:11.000000,1,666
2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,10
2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,17
2016-10-28T17:00:26.000000,2016-10-28T17:00:41.000000,1,53
4 changes: 4 additions & 0 deletions biflow_aggregator/tests/references/reference2_gt5r
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,667
2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,10
2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,17
2016-10-28T17:00:26.000000,2016-10-28T17:00:41.000000,1,53
2 changes: 2 additions & 0 deletions biflow_aggregator/tests/references/reference3_gt0
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,1,0,0
2016-10-28T17:00:13.000000,2016-10-28T17:00:41.000000,3,2,0,0
5 changes: 5 additions & 0 deletions biflow_aggregator/tests/references/reference3_gt5a
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1,6666,0
2016-10-28T17:00:03.000000,2016-10-28T17:00:11.000000,1,1,0,6666
2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,2,0,6666
2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,2,3333,3333
2016-10-28T17:00:26.000000,2016-10-28T17:00:41.000000,1,2,6666,0
4 changes: 4 additions & 0 deletions biflow_aggregator/tests/references/reference3_gt5r
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,1,0,0
2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,2,0,6666
2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,2,3333,3333
2016-10-28T17:00:26.000000,2016-10-28T17:00:41.000000,1,2,6666,0
1 change: 1 addition & 0 deletions biflow_aggregator/tests/references/reference4_gt0
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
192.168.1.1,192.168.1.2,8.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,3,2,16,33,5,1,666,"test","222","test1:",[3|2|1|4|5|6]
2 changes: 2 additions & 0 deletions biflow_aggregator/tests/references/reference4_gt5a
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
192.168.1.1,192.168.1.2,7.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1,16,32,5,5,333,"","222","test1:",[3|2|1]
192.168.1.1,192.168.1.2,9.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,3,1,555,33,4,1,333,"test","","test2:",[4|5|6]
1 change: 1 addition & 0 deletions biflow_aggregator/tests/references/reference4_gt5r
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
192.168.1.1,192.168.1.2,8.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,3,2,16,33,5,1,666,"test","222","test1:",[3|2|1|4|5|6]
Loading
Loading