Skip to content

Commit 9e8f903

Browse files
committed
Merge branch 'fdsdump-threading' into devel
2 parents 4b44dac + 9073fb0 commit 9e8f903

17 files changed

+1342
-73
lines changed

src/tools/fdsdump/src/aggregator/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ set(AGGREGATOR_SRC
1717
stdAllocator.cpp
1818
stdHashTable.cpp
1919
tablePrinter.cpp
20+
threadedAggregator.cpp
21+
thresholdAlgorithm.cpp
2022
value.cpp
2123
view.cpp
2224
viewFactory.cpp

src/tools/fdsdump/src/aggregator/aggregator.cpp

Lines changed: 38 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,38 @@
2828
namespace fdsdump {
2929
namespace aggregator {
3030

31+
// Merge two hash tables containing records defined by view
32+
void merge_hash_tables(const View &view, HashTable &dst_table, HashTable &src_table)
33+
{
34+
uint8_t *dst_record = nullptr;
35+
for (uint8_t *src_record : src_table.items()) {
36+
bool found = dst_table.find_or_create(src_record, dst_record);
37+
if (found) {
38+
// If found - merge
39+
for (auto x : view.iter_values(dst_record, src_record)) {
40+
x.field.merge(x.value1, x.value2);
41+
}
42+
} else {
43+
// If not found - copy over
44+
// Key is already copied by find_or_create, so only value needs to be copied
45+
unsigned int key_size = view.key_size(src_record);
46+
unsigned int value_size = view.value_size();
47+
//TODO: Possible optimalization: Can we just move pointers instead of memcpy?
48+
std::memcpy(dst_record + key_size, src_record + key_size, value_size);
49+
}
50+
}
51+
}
52+
53+
// Sort records
54+
void sort_records(const View &view, std::vector<uint8_t *>& records)
55+
{
56+
if (view.order_fields().empty()) {
57+
return;
58+
}
59+
std::sort(records.begin(), records.end(),
60+
[&view](uint8_t *a, uint8_t *b) { return view.ordered_before(a, b); });
61+
}
62+
3163
Aggregator::Aggregator(const View &view) :
3264
m_table(view),
3365
m_key_buffer(65535),
@@ -104,45 +136,14 @@ Aggregator::aggregate(FlowContext &ctx)
104136
void
105137
Aggregator::sort_items()
106138
{
107-
if (!m_view.order_fields().empty()) {
108-
std::sort(items().begin(), items().end(),
109-
[this](uint8_t *a, uint8_t *b) { return m_view.ordered_before(a, b); });
110-
}
139+
sort_records(m_view, items());
111140
}
112141

113-
114-
// uint8_t *record;
115-
116-
// if (!m_table.find_or_create(&m_key_buffer[0], record)) {
117-
// init_values(*m_view_def.get(), record + m_view_def->keys_size);
118-
// }
119-
120-
// for (const auto &iter : m_view_def->iter_values(record + m_view_def->keys_size)) {
121-
// aggregate_value(iter.field, drec, &iter.value, direction, drec_find_flags);
122-
// }
123-
// }
124-
125-
// void
126-
// Aggregator::merge(Aggregator &other, unsigned int max_num_items)
127-
// {
128-
// unsigned int n = 0;
129-
// for (uint8_t *other_record : other.items()) {
130-
// if (max_num_items != 0 && n == max_num_items) {
131-
// break;
132-
// }
133-
134-
// uint8_t *record;
135-
136-
// if (!m_table.find_or_create(other_record, record)) {
137-
// //TODO: this copy is unnecessary, we could just take the already allocated record from the other table instead
138-
// memcpy(record, other_record, m_view_def->keys_size + m_view_def->values_size);
139-
// } else {
140-
// merge_records(*m_view_def.get(), record, other_record);
141-
// }
142-
143-
// n++;
144-
// }
145-
// }
142+
void
143+
Aggregator::merge(Aggregator &other)
144+
{
145+
merge_hash_tables(m_view, m_table, other.m_table);
146+
}
146147

147148
} // aggregator
148149
} // fdsdump

src/tools/fdsdump/src/aggregator/aggregator.hpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
namespace fdsdump {
2222
namespace aggregator {
2323

24+
void merge_hash_tables(const View &view, HashTable &dst_table, HashTable &src_table);
25+
26+
void sort_records(const View &view, std::vector<uint8_t *>& records);
27+
2428
/**
2529
* @brief A class performing aggregation of fds data records based on a view definition.
2630
*/
@@ -43,8 +47,8 @@ class Aggregator {
4347
* @brief Merge other aggregator into this one.
4448
* @param other The other aggregator
4549
*/
46-
// void
47-
// merge(Aggregator &other, unsigned int max_num_items = 0);
50+
void
51+
merge(Aggregator &other);
4852

4953
/**
5054
* @brief The underlying hash table.

src/tools/fdsdump/src/aggregator/mode.cpp

Lines changed: 54 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,59 +7,41 @@
77
* SPDX-License-Identifier: BSD-3-Clause
88
*/
99

10-
#include <common/flowProvider.hpp>
10+
#include <vector>
11+
1112
#include <aggregator/aggregator.hpp>
1213
#include <aggregator/mode.hpp>
14+
#include <aggregator/print.hpp>
1315
#include <aggregator/printer.hpp>
14-
#include <aggregator/view.hpp>
16+
#include <aggregator/threadedAggregator.hpp>
1517
#include <aggregator/viewFactory.hpp>
18+
#include <common/channel.hpp>
19+
#include <common/flowProvider.hpp>
1620

1721
#include <memory>
22+
#include <iomanip>
1823

1924
namespace fdsdump {
2025
namespace aggregator {
2126

22-
void
23-
mode_aggregate(const Options &opts)
27+
static void print_results(const Options &opts, std::vector<uint8_t *> items)
2428
{
2529
View view = ViewFactory::create_view(
2630
opts.get_aggregation_keys(),
2731
opts.get_aggregation_values(),
28-
opts.get_order_by());
32+
opts.get_order_by(),
33+
opts.get_output_limit());
2934

3035
std::unique_ptr<Printer> printer = printer_factory(
3136
view,
3237
opts.get_output_specifier());
33-
FlowProvider flows;
34-
Aggregator aggr(view);
3538

3639
const size_t rec_limit = opts.get_output_limit();
3740
size_t rec_printed = 0;
3841

39-
flows.set_biflow_autoignore(opts.get_biflow_autoignore());
40-
41-
if (!opts.get_input_filter().empty()) {
42-
flows.set_filter(opts.get_input_filter());
43-
}
44-
45-
for (const auto &it : glob_files(opts.get_input_file_patterns())) {
46-
flows.add_file(it);
47-
}
48-
49-
while (true) {
50-
Flow *flow = flows.next_record();
51-
52-
if (!flow) {
53-
break;
54-
}
55-
56-
aggr.process_record(*flow);
57-
}
58-
59-
aggr.sort_items();
6042
printer->print_prologue();
6143

62-
for (uint8_t *record : aggr.items()) {
44+
for (uint8_t *record : items) {
6345
if (rec_limit != 0 && rec_printed >= rec_limit) {
6446
break;
6547
}
@@ -71,5 +53,48 @@ mode_aggregate(const Options &opts)
7153
printer->print_epilogue();
7254
}
7355

56+
void
57+
mode_aggregate(const Options &opts)
58+
{
59+
Channel<ThreadedAggregator *> notify_channel;
60+
ThreadedAggregator aggregator(
61+
opts.get_aggregation_keys(),
62+
opts.get_aggregation_values(),
63+
opts.get_input_filter(),
64+
opts.get_input_file_patterns(),
65+
opts.get_order_by(),
66+
opts.get_num_threads(),
67+
opts.get_biflow_autoignore(),
68+
true,
69+
opts.get_output_limit(),
70+
notify_channel);
71+
aggregator.start();
72+
73+
while (true) {
74+
notify_channel.get(std::chrono::milliseconds(1000));
75+
auto state = aggregator.get_aggregator_state();
76+
77+
auto state_str = aggregator_state_to_str(state);
78+
if (state == AggregatorState::aggregating) {
79+
auto percent = (double(aggregator.get_processed_flows()) / double(aggregator.get_total_flows())) * 100.0;
80+
LOG_INFO << "Status: " << state_str << " (" << std::fixed << std::setprecision(2) << percent << "%)";
81+
} else {
82+
LOG_INFO << "Status: " << state_str;
83+
}
84+
85+
if (state == AggregatorState::errored) {
86+
std::rethrow_exception(aggregator.get_exception());
87+
}
88+
89+
if (state == AggregatorState::finished) {
90+
break;
91+
}
92+
}
93+
94+
aggregator.join();
95+
96+
print_results(opts, aggregator.get_results());
97+
}
98+
7499
} // aggregator
75100
} // fdsdump

0 commit comments

Comments
 (0)