Skip to content

Commit 753e79e

Browse files
author
Damir Zainullin
committed
Biflow aggregator - Update main.cpp
1 parent 00380bf commit 753e79e

File tree

1 file changed

+31
-8
lines changed

1 file changed

+31
-8
lines changed

biflow_aggregator/main.cpp

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ UR_FIELDS (
7979
PARAM('e', "eof", "End when receive EOF.", no_argument, "flag") \
8080
PARAM('s', "size", "Max number of elements in flow cache.", required_argument, "number") \
8181
PARAM('a', "active-timeout", "Active timeout.", required_argument, "number") \
82-
PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number")
82+
PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number") \
83+
PARAM('g', "global-timeout", "Global timeout.", required_argument, "number")
8384

8485
trap_module_info_t *module_info = NULL;
8586
static volatile int stop = 0;
@@ -364,6 +365,20 @@ void update_flow(
364365
if (pt != t_data->value.passive_timeout)
365366
dll.swap(t_data);
366367
}
368+
369+
static void flush_all(agg::Aggregator<agg::FlowKey>& aggregator,
370+
ur_template_t* out_template, void* out_record, Dll<agg::Timeout_data>& dll)
371+
{
372+
for (auto flow_data : aggregator.flow_cache) {
373+
proccess_and_send(aggregator, flow_data.first, flow_data.second, out_template, out_record);
374+
agg::Flow_key_allocator::release_ptr(static_cast<uint8_t *>(flow_data.first.get_key().first));
375+
agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx);
376+
}
377+
dll.clear();
378+
aggregator.flow_cache.clear();
379+
trap_send_flush(0);
380+
}
381+
367382
static int
368383
do_mainloop(Configuration& config)
369384
{
@@ -383,8 +398,11 @@ do_mainloop(Configuration& config)
383398

384399
time_t time_first;
385400
time_t time_last = 0;
401+
time_t last_flush_time = 0;
386402
time_t t_passive = config.get_passive_timeout() >> 32;
387403
time_t t_active = config.get_active_timeout() >> 32;
404+
const Configuration::Global_flush_configuration& flush_configuration
405+
= config.get_global_flush_configuration();
388406
std::size_t flow_cnt = 0;
389407
Dll<agg::Timeout_data> dll;
390408

@@ -432,13 +450,7 @@ do_mainloop(Configuration& config)
432450

433451
// clear all memory
434452
// flush all flows
435-
for (auto flow_data : agg.flow_cache) {
436-
proccess_and_send(agg, flow_data.first, flow_data.second, out_tmplt, out_rec);
437-
agg::Flow_key_allocator::release_ptr(static_cast<uint8_t *>(flow_data.first.get_key().first));
438-
agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx);
439-
}
440-
441-
trap_send_flush(0);
453+
flush_all(agg, out_tmplt, out_rec, dll);
442454

443455
// Free previous record and temlate
444456
ur_free_template(out_tmplt);
@@ -490,6 +502,14 @@ do_mainloop(Configuration& config)
490502
trap_send_flush(0);
491503
timeouted = false;
492504
}
505+
506+
if (unlikely(flush_configuration.is_set() && time_last - last_flush_time >= flush_configuration.interval)) {
507+
last_flush_time = time_last;
508+
if (flush_configuration.type == Configuration::Global_flush_configuration::Type::ABSOLUTE) {
509+
last_flush_time = last_flush_time / flush_configuration.interval * flush_configuration.interval;
510+
}
511+
flush_all(agg, out_tmplt, out_rec, dll);
512+
}
493513

494514
bool is_key_reversed = key.generate(in_data, in_tmplt, config.is_biflow_key());
495515

@@ -662,6 +682,9 @@ main(int argc, char **argv)
662682
case 's':
663683
config.set_flow_cache_size(optarg);
664684
break;
685+
case 'g':
686+
config.set_global_flush_configuration(optarg);
687+
break;
665688
default:
666689
std::cerr << "Invalid argument " << opt << ", skipped..." << std::endl;
667690
}

0 commit comments

Comments
 (0)