
Commit 6ca3afb

fdsdump: add threaded aggregator component
1 parent 3dd2954 commit 6ca3afb

4 files changed: +503 -0 lines changed

src/tools/fdsdump/src/aggregator/CMakeLists.txt

Lines changed: 1 addition & 0 deletions

@@ -17,6 +17,7 @@ set(AGGREGATOR_SRC
     stdAllocator.cpp
     stdHashTable.cpp
     tablePrinter.cpp
+    threadedAggregator.cpp
     thresholdAlgorithm.cpp
     value.cpp
     view.cpp

src/tools/fdsdump/src/aggregator/mode.cpp

Lines changed: 1 addition & 0 deletions

@@ -15,6 +15,7 @@
 #include <aggregator/viewFactory.hpp>
 
 #include <memory>
+#include <iomanip>
 
 namespace fdsdump {
 namespace aggregator {

src/tools/fdsdump/src/aggregator/threadedAggregator.cpp

Lines changed: 319 additions & 0 deletions

@@ -0,0 +1,319 @@
+/**
+ * @file
+ * @author Michal Sedlak <[email protected]>
+ * @brief Multi-threaded runner
+ *
+ * Copyright: (C) 2024 CESNET, z.s.p.o.
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include <aggregator/threadedAggregator.hpp>
+
+#include <aggregator/print.hpp>
+#include <aggregator/viewFactory.hpp>
+#include <common/common.hpp>
+#include <common/ieMgr.hpp>
+
+#include <algorithm>
+
+namespace fdsdump {
+namespace aggregator {
+
+ThreadedAggregator::ThreadedAggregator(
+    const std::string &aggregation_keys,
+    const std::string &aggregation_values,
+    const std::string &input_filter,
+    const std::vector<std::string> &input_file_patterns,
+    const std::string &order_by,
+    unsigned int num_threads,
+    bool biflow_autoignore,
+    bool merge_results,
+    unsigned int merge_topk,
+    Channel<ThreadedAggregator *> &notify_channel
+) :
+    m_num_threads(num_threads),
+    m_input_filter(input_filter),
+    m_order_by(order_by),
+    m_biflow_autoignore(biflow_autoignore),
+    m_merge_results(merge_results),
+    m_merge_topk(merge_topk),
+    m_aggregators(num_threads),
+    m_threadinfo(num_threads),
+    m_notify_channel(notify_channel)
+{
+    m_view = ViewFactory::create_unique_view(aggregation_keys, aggregation_values, order_by, 0);
+
+    for (const auto& pattern : input_file_patterns) {
+        for (const auto& file : glob_files(pattern)) {
+            FlowProvider provider;
+            provider.add_file(file);
+            m_total_flows += provider.get_total_flow_count();
+            m_total_files++;
+            m_files.push(file);
+        }
+    }
+}
+
+void ThreadedAggregator::start()
+{
+    m_main_thread = std::thread([this]() {
+        run();
+    });
+}
+
+void ThreadedAggregator::run()
+{
+    m_aggregator_state = AggregatorState::started;
+
+    m_threads.clear();
+    for (unsigned int i = 0; i < m_num_threads; i++) {
+        std::thread thread([this, i]() {
+            thread_worker(i);
+        });
+        m_threads.push_back(std::move(thread));
+    }
+    m_aggregator_state = AggregatorState::aggregating;
+
+    while (true) {
+        ThreadInfo *info = nullptr;
+        m_worker_notify_channel >> info;
+
+        if (info->state == AggregatorState::errored) {
+            m_aggregator_state = AggregatorState::errored;
+            m_exception = info->exception;
+            m_notify_channel << this;
+            for (auto &info : m_threadinfo) {
+                // Cancel all worker threads
+                info.cancelled = true;
+            }
+            return;
+
+        } else if (info->state == AggregatorState::sorting) {
+            bool all_atleast_sorting = std::all_of(m_threadinfo.begin(), m_threadinfo.end(), [](const ThreadInfo &info)
+                { return info.state == AggregatorState::sorting || info.state == AggregatorState::finished; });
+            if (all_atleast_sorting) {
+                m_aggregator_state = AggregatorState::sorting;
+            }
+        } else if (info->state == AggregatorState::finished) {
+            bool all_finished = std::all_of(m_threadinfo.begin(), m_threadinfo.end(), [](const ThreadInfo &info)
+                { return info.state == AggregatorState::finished; });
+            if (all_finished) {
+                break;
+            }
+        }
+    }
+
+    if (m_merge_results) {
+        if (m_merge_topk == 0) {
+            perform_all_merge();
+        } else {
+            perform_topk_merge();
+        }
+    }
+
+    m_aggregator_state = AggregatorState::finished;
+    m_notify_channel << this;
+
+    // Wait for the worker threads to gracefully finish
+    int i = 0;
+    for (auto &thread : m_threads) {
+        thread.join();
+        i++;
+        LOG_DEBUG << "Waiting for worker thread to finish (" << i << "/" << m_threads.size() << ")";
+    }
+}
+
+std::vector<uint8_t *>& ThreadedAggregator::get_results()
+{
+    assert(m_merge_results);
+    return *m_items;
+}
+
+void ThreadedAggregator::thread_worker(unsigned int thread_id)
+{
+    ThreadInfo &info = m_threadinfo[thread_id];
+
+    try {
+        const View& view = *m_view.get();
+
+        FlowProvider flows;
+        flows.set_biflow_autoignore(m_biflow_autoignore);
+        if (!m_input_filter.empty()) {
+            flows.set_filter(m_input_filter);
+        }
+
+        m_aggregators[thread_id] = std::unique_ptr<Aggregator>(new Aggregator(view));
+        Aggregator &aggregator = *m_aggregators[thread_id].get();
+
+        info.state = AggregatorState::aggregating;
+        m_worker_notify_channel << &info;
+
+        while (true) {
+            Flow *flow = flows.next_record();
+
+            if (info.cancelled.load(std::memory_order_relaxed)) { // Doesn't matter if we process a few extra values
+                info.state = AggregatorState::finished;
+                m_worker_notify_channel << &info;
+                return;
+            }
+
+            if (!flow) {
+                std::lock_guard<std::mutex> guard(m_files_mutex);
+                if (m_files.empty()) {
+                    break;
+                }
+                flows.add_file(m_files.front());
+                m_files.pop();
+                info.processed_files.fetch_add(1, std::memory_order_relaxed);
+                continue;
+            }
+
+            info.processed_flows.fetch_add(1, std::memory_order_relaxed);
+            aggregator.process_record(*flow);
+        }
+
+
+        // Figure out whether the per-thread tables should be sorted, which is if
+        // there is something to sort by, and we are not merging or we are merging by top K
+        if (!m_order_by.empty() && (!m_merge_results || m_merge_topk > 0)) {
+            // Check for cancellation first
+            if (info.cancelled) {
+                info.state = AggregatorState::finished;
+                m_worker_notify_channel << &info;
+                return;
+            }
+
+            info.state = AggregatorState::sorting;
+            m_worker_notify_channel << &info;
+            aggregator.sort_items();
+        }
+
+        info.state = AggregatorState::finished;
+        m_worker_notify_channel << &info;
+
+    } catch (...) {
+        info.state = AggregatorState::errored;
+        info.exception = std::current_exception();
+        m_worker_notify_channel << &info;
+    }
+}
+
+void
+ThreadedAggregator::perform_all_merge()
+{
+    assert(m_merge_results);
+    assert(m_merge_topk == 0);
+
+    m_aggregator_state = AggregatorState::merging;
+    Aggregator& main = *m_aggregators[0].get();
+    for (size_t i = 1; i < m_aggregators.size(); i++) {
+        Aggregator& other = *m_aggregators[i].get();
+        main.merge(other);
+    }
+
+    m_aggregator_state = AggregatorState::sorting;
+    main.sort_items();
+    m_items = &main.items();
+}
+
+void
+ThreadedAggregator::perform_topk_merge()
+{
+    assert(m_merge_results);
+    assert(m_merge_topk > 0);
+
+    m_aggregator_state = AggregatorState::merging;
+    auto tables = get_tables();
+    m_threshold_algorithm.reset(new ThresholdAlgorithm(
+        tables,
+        *m_view.get(),
+        m_merge_topk));
+    while (!m_threshold_algorithm->check_finish_condition() && !m_threshold_algorithm->out_of_items()) {
+        m_threshold_algorithm->process_row();
+    }
+
+    m_aggregator_state = AggregatorState::sorting;
+    auto &items = m_threshold_algorithm->m_result_table->items();
+    sort_records(*m_view.get(), items);
+    m_items = &items;
+}
+
+uint64_t ThreadedAggregator::get_processed_flows() const
+{
+    uint64_t processed_flows = 0;
+    for (const auto& info : m_threadinfo) {
+        processed_flows += info.processed_flows;
+    }
+    return processed_flows;
+}
+
+uint64_t ThreadedAggregator::get_processed_files() const
+{
+    uint64_t processed_files = 0;
+    for (const auto& info : m_threadinfo) {
+        processed_files += info.processed_files;
+    }
+    return processed_files;
+}
+
+uint64_t ThreadedAggregator::get_total_flows() const
+{
+    return m_total_flows;
+}
+
+uint64_t ThreadedAggregator::get_total_files() const
+{
+    return m_total_files;
+}
+
+AggregatorState ThreadedAggregator::get_aggregator_state() const
+{
+    return m_aggregator_state;
+}
+
+std::vector<HashTable *> ThreadedAggregator::get_tables()
+{
+    std::vector<HashTable *> tables;
+    for (auto &aggregator : m_aggregators) {
+        tables.emplace_back(&aggregator->m_table);
+    }
+    return tables;
+}
+
+void ThreadedAggregator::cancel()
+{
+    for (auto &info : m_threadinfo) {
+        // Cancel all worker threads
+        info.cancelled = true;
+    }
+}
+
+void ThreadedAggregator::join()
+{
+    m_main_thread.join();
+}
+
+const char *aggregator_state_to_str(AggregatorState aggregator_state)
+{
+    switch (aggregator_state) {
+    case AggregatorState::none:
+        return "none";
+    case AggregatorState::errored:
+        return "errored";
+    case AggregatorState::started:
+        return "started";
+    case AggregatorState::aggregating:
+        return "aggregating";
+    case AggregatorState::sorting:
+        return "sorting";
+    case AggregatorState::merging:
+        return "merging";
+    case AggregatorState::finished:
+        return "finished";
+    }
+
+    return "<invalid>";
+}
+
+} // aggregator
+} // fdsdump
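
The control flow in run() and thread_worker() hinges on Channel<T>, which this commit uses (operator<< to send, operator>> to receive blockingly) but does not define; its definition lives elsewhere in the tree. A minimal sketch of a channel consistent with that usage, assuming blocking receive semantics and built on standard mutex/condition-variable machinery (this is illustrative, not the project's actual implementation):

#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical sketch of a blocking channel matching how Channel<T>
// is used above. The real fdsdump Channel may differ.
template <typename T>
class Channel {
public:
    // Producer side: enqueue a value and wake one waiting consumer.
    Channel &operator<<(const T &value)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(value);
        }
        m_cv.notify_one();
        return *this;
    }

    // Consumer side: block until a value is available, then dequeue it.
    Channel &operator>>(T &out)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this]() { return !m_queue.empty(); });
        out = m_queue.front();
        m_queue.pop();
        return *this;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<T> m_queue;
};

Workers push their ThreadInfo pointer into m_worker_notify_channel on every state change, so run() can act as a single consumer that advances the overall AggregatorState (started, aggregating, sorting, merging, finished, or errored) once all workers have reported.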

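Putting the pieces together, a caller such as the aggregate mode in mode.cpp would construct the aggregator, start it, and wait on the notify channel for completion. The following sketch is based solely on the public API visible in this diff; the argument values ("srcip", "packets,bytes", the file pattern, and the thread/top-K counts) are illustrative placeholders, not the actual call site:

// Illustrative driver based on the API shown in this commit; the real
// call site in fdsdump may differ.
Channel<ThreadedAggregator *> notify_channel;

ThreadedAggregator aggregator(
    "srcip",              // aggregation_keys
    "packets,bytes",      // aggregation_values
    "",                   // input_filter (empty = no filter)
    {"flows/*.fds"},      // input_file_patterns
    "bytes",              // order_by
    4,                    // num_threads
    true,                 // biflow_autoignore
    true,                 // merge_results
    10,                   // merge_topk (0 = full merge instead of top-K)
    notify_channel);

aggregator.start();

// Block until the aggregator reports back, then inspect its state.
// Progress could be polled meanwhile via get_processed_flows()/files().
ThreadedAggregator *done = nullptr;
notify_channel >> done;

if (done->get_aggregator_state() == AggregatorState::errored) {
    // A worker's exception is stored internally by run(); clean up.
    done->join();
} else {
    for (uint8_t *record : done->get_results()) {
        // Each record points into the merged hash table; rendering it
        // requires the View, which is omitted here.
        (void) record;
    }
    done->join();
}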