Skip to content

Commit 9073fb0

Browse files
committed
fdsdump: make aggregate mode use threaded aggregator
1 parent 6ca3afb commit 9073fb0

File tree

2 files changed

+54
-30
lines changed

2 files changed

+54
-30
lines changed

src/tools/fdsdump/src/aggregator/mode.cpp

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,60 +7,41 @@
77
* SPDX-License-Identifier: BSD-3-Clause
88
*/
99

10-
#include <common/flowProvider.hpp>
10+
#include <vector>
11+
1112
#include <aggregator/aggregator.hpp>
1213
#include <aggregator/mode.hpp>
14+
#include <aggregator/print.hpp>
1315
#include <aggregator/printer.hpp>
14-
#include <aggregator/view.hpp>
16+
#include <aggregator/threadedAggregator.hpp>
1517
#include <aggregator/viewFactory.hpp>
18+
#include <common/channel.hpp>
19+
#include <common/flowProvider.hpp>
1620

1721
#include <memory>
1822
#include <iomanip>
1923

2024
namespace fdsdump {
2125
namespace aggregator {
2226

23-
void
24-
mode_aggregate(const Options &opts)
27+
static void print_results(const Options &opts, std::vector<uint8_t *> items)
2528
{
2629
View view = ViewFactory::create_view(
2730
opts.get_aggregation_keys(),
2831
opts.get_aggregation_values(),
29-
opts.get_order_by());
32+
opts.get_order_by(),
33+
opts.get_output_limit());
3034

3135
std::unique_ptr<Printer> printer = printer_factory(
3236
view,
3337
opts.get_output_specifier());
34-
FlowProvider flows;
35-
Aggregator aggr(view);
3638

3739
const size_t rec_limit = opts.get_output_limit();
3840
size_t rec_printed = 0;
3941

40-
flows.set_biflow_autoignore(opts.get_biflow_autoignore());
41-
42-
if (!opts.get_input_filter().empty()) {
43-
flows.set_filter(opts.get_input_filter());
44-
}
45-
46-
for (const auto &it : glob_files(opts.get_input_file_patterns())) {
47-
flows.add_file(it);
48-
}
49-
50-
while (true) {
51-
Flow *flow = flows.next_record();
52-
53-
if (!flow) {
54-
break;
55-
}
56-
57-
aggr.process_record(*flow);
58-
}
59-
60-
aggr.sort_items();
6142
printer->print_prologue();
6243

63-
for (uint8_t *record : aggr.items()) {
44+
for (uint8_t *record : items) {
6445
if (rec_limit != 0 && rec_printed >= rec_limit) {
6546
break;
6647
}
@@ -72,5 +53,48 @@ mode_aggregate(const Options &opts)
7253
printer->print_epilogue();
7354
}
7455

56+
void
57+
mode_aggregate(const Options &opts)
58+
{
59+
Channel<ThreadedAggregator *> notify_channel;
60+
ThreadedAggregator aggregator(
61+
opts.get_aggregation_keys(),
62+
opts.get_aggregation_values(),
63+
opts.get_input_filter(),
64+
opts.get_input_file_patterns(),
65+
opts.get_order_by(),
66+
opts.get_num_threads(),
67+
opts.get_biflow_autoignore(),
68+
true,
69+
opts.get_output_limit(),
70+
notify_channel);
71+
aggregator.start();
72+
73+
while (true) {
74+
notify_channel.get(std::chrono::milliseconds(1000));
75+
auto state = aggregator.get_aggregator_state();
76+
77+
auto state_str = aggregator_state_to_str(state);
78+
if (state == AggregatorState::aggregating) {
79+
auto percent = (double(aggregator.get_processed_flows()) / double(aggregator.get_total_flows())) * 100.0;
80+
LOG_INFO << "Status: " << state_str << " (" << std::fixed << std::setprecision(2) << percent << "%)";
81+
} else {
82+
LOG_INFO << "Status: " << state_str;
83+
}
84+
85+
if (state == AggregatorState::errored) {
86+
std::rethrow_exception(aggregator.get_exception());
87+
}
88+
89+
if (state == AggregatorState::finished) {
90+
break;
91+
}
92+
}
93+
94+
aggregator.join();
95+
96+
print_results(opts, aggregator.get_results());
97+
}
98+
7599
} // aggregator
76100
} // fdsdump

src/tools/fdsdump/src/aggregator/threadedAggregator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ void ThreadedAggregator::thread_worker(unsigned int thread_id)
168168
continue;
169169
}
170170

171-
info.processed_flows.fetch_add(1, std::memory_order_relaxed);
171+
info.processed_flows.store(flows.get_processed_flow_count(), std::memory_order_relaxed);
172172
aggregator.process_record(*flow);
173173
}
174174

0 commit comments

Comments
 (0)