77 * SPDX-License-Identifier: BSD-3-Clause
88 */
99
10- #include < common/flowProvider.hpp>
10+ #include < vector>
11+
1112#include < aggregator/aggregator.hpp>
1213#include < aggregator/mode.hpp>
14+ #include < aggregator/print.hpp>
1315#include < aggregator/printer.hpp>
14- #include < aggregator/view .hpp>
16+ #include < aggregator/threadedAggregator .hpp>
1517#include < aggregator/viewFactory.hpp>
18+ #include < common/channel.hpp>
19+ #include < common/flowProvider.hpp>
1620
1721#include < memory>
1822#include < iomanip>
1923
2024namespace fdsdump {
2125namespace aggregator {
2226
23- void
24- mode_aggregate (const Options &opts)
27+ static void print_results (const Options &opts, std::vector<uint8_t *> items)
2528{
2629 View view = ViewFactory::create_view (
2730 opts.get_aggregation_keys (),
2831 opts.get_aggregation_values (),
29- opts.get_order_by ());
32+ opts.get_order_by (),
33+ opts.get_output_limit ());
3034
3135 std::unique_ptr<Printer> printer = printer_factory (
3236 view,
3337 opts.get_output_specifier ());
34- FlowProvider flows;
35- Aggregator aggr (view);
3638
3739 const size_t rec_limit = opts.get_output_limit ();
3840 size_t rec_printed = 0 ;
3941
40- flows.set_biflow_autoignore (opts.get_biflow_autoignore ());
41-
42- if (!opts.get_input_filter ().empty ()) {
43- flows.set_filter (opts.get_input_filter ());
44- }
45-
46- for (const auto &it : glob_files (opts.get_input_file_patterns ())) {
47- flows.add_file (it);
48- }
49-
50- while (true ) {
51- Flow *flow = flows.next_record ();
52-
53- if (!flow) {
54- break ;
55- }
56-
57- aggr.process_record (*flow);
58- }
59-
60- aggr.sort_items ();
6142 printer->print_prologue ();
6243
63- for (uint8_t *record : aggr. items () ) {
44+ for (uint8_t *record : items) {
6445 if (rec_limit != 0 && rec_printed >= rec_limit) {
6546 break ;
6647 }
@@ -72,5 +53,48 @@ mode_aggregate(const Options &opts)
7253 printer->print_epilogue ();
7354}
7455
56+ void
57+ mode_aggregate (const Options &opts)
58+ {
59+ Channel<ThreadedAggregator *> notify_channel;
60+ ThreadedAggregator aggregator (
61+ opts.get_aggregation_keys (),
62+ opts.get_aggregation_values (),
63+ opts.get_input_filter (),
64+ opts.get_input_file_patterns (),
65+ opts.get_order_by (),
66+ opts.get_num_threads (),
67+ opts.get_biflow_autoignore (),
68+ true ,
69+ opts.get_output_limit (),
70+ notify_channel);
71+ aggregator.start ();
72+
73+ while (true ) {
74+ notify_channel.get (std::chrono::milliseconds (1000 ));
75+ auto state = aggregator.get_aggregator_state ();
76+
77+ auto state_str = aggregator_state_to_str (state);
78+ if (state == AggregatorState::aggregating) {
79+ auto percent = (double (aggregator.get_processed_flows ()) / double (aggregator.get_total_flows ())) * 100.0 ;
80+ LOG_INFO << " Status: " << state_str << " (" << std::fixed << std::setprecision (2 ) << percent << " %)" ;
81+ } else {
82+ LOG_INFO << " Status: " << state_str;
83+ }
84+
85+ if (state == AggregatorState::errored) {
86+ std::rethrow_exception (aggregator.get_exception ());
87+ }
88+
89+ if (state == AggregatorState::finished) {
90+ break ;
91+ }
92+ }
93+
94+ aggregator.join ();
95+
96+ print_results (opts, aggregator.get_results ());
97+ }
98+
7599} // aggregator
76100} // fdsdump
0 commit comments