Skip to content

Commit 4c4cd2c

Browse files
author
Damir Zainullin
committed
Biflow aggregator - update main.cpp
1 parent 00380bf commit 4c4cd2c

File tree

1 file changed

+30
-8
lines changed

1 file changed

+30
-8
lines changed

biflow_aggregator/main.cpp

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ UR_FIELDS (
7979
PARAM('e', "eof", "End when receive EOF.", no_argument, "flag") \
8080
PARAM('s', "size", "Max number of elements in flow cache.", required_argument, "number") \
8181
PARAM('a', "active-timeout", "Active timeout.", required_argument, "number") \
82-
PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number")
82+
PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number") \
83+
PARAM('g', "global-timeout", "Global timeout.", required_argument, "number")
8384

8485
trap_module_info_t *module_info = NULL;
8586
static volatile int stop = 0;
@@ -364,6 +365,19 @@ void update_flow(
364365
if (pt != t_data->value.passive_timeout)
365366
dll.swap(t_data);
366367
}
368+
369+
static void flush_all(agg::Aggregator<agg::FlowKey> aggregator, ur_template_t* out_template, void* out_record)
370+
{
371+
int count = 0;
372+
for (auto flow_data : aggregator.flow_cache) {
373+
proccess_and_send(aggregator, flow_data.first, flow_data.second, out_template, out_record);
374+
agg::Flow_key_allocator::release_ptr(static_cast<uint8_t *>(flow_data.first.get_key().first));
375+
agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx);
376+
count++;
377+
}
378+
trap_send_flush(0);
379+
}
380+
367381
static int
368382
do_mainloop(Configuration& config)
369383
{
@@ -383,8 +397,11 @@ do_mainloop(Configuration& config)
383397

384398
time_t time_first;
385399
time_t time_last = 0;
400+
time_t last_flush_time = 0;
386401
time_t t_passive = config.get_passive_timeout() >> 32;
387402
time_t t_active = config.get_active_timeout() >> 32;
403+
const Configuration::Global_flush_configuration& flush_configuration
404+
= config.get_global_flush_configuration();
388405
std::size_t flow_cnt = 0;
389406
Dll<agg::Timeout_data> dll;
390407

@@ -432,13 +449,7 @@ do_mainloop(Configuration& config)
432449

433450
// clear all memory
434451
// flush all flows
435-
for (auto flow_data : agg.flow_cache) {
436-
proccess_and_send(agg, flow_data.first, flow_data.second, out_tmplt, out_rec);
437-
agg::Flow_key_allocator::release_ptr(static_cast<uint8_t *>(flow_data.first.get_key().first));
438-
agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx);
439-
}
440-
441-
trap_send_flush(0);
452+
flush_all(agg, out_tmplt, out_rec);
442453

443454
// Free previous record and temlate
444455
ur_free_template(out_tmplt);
@@ -490,6 +501,14 @@ do_mainloop(Configuration& config)
490501
trap_send_flush(0);
491502
timeouted = false;
492503
}
504+
505+
if (unlikely(flush_configuration.is_set() && time_last - last_flush_time >= flush_configuration.interval)) {
506+
last_flush_time = time_last;
507+
if (flush_configuration.type == Configuration::Global_flush_configuration::Type::ABSOLUTE) {
508+
last_flush_time = last_flush_time / flush_configuration.interval * flush_configuration.interval;
509+
}
510+
flush_all(agg, out_tmplt, out_rec);
511+
}
493512

494513
bool is_key_reversed = key.generate(in_data, in_tmplt, config.is_biflow_key());
495514

@@ -662,6 +681,9 @@ main(int argc, char **argv)
662681
case 's':
663682
config.set_flow_cache_size(optarg);
664683
break;
684+
case 'g':
685+
config.set_global_flush_configuration(optarg);
686+
break;
665687
default:
666688
std::cerr << "Invalid argument " << opt << ", skipped..." << std::endl;
667689
}

0 commit comments

Comments
 (0)