Skip to content

Commit ecbabea

Browse files
committed
Merge branch 'fdsdump-fixes' into devel
2 parents f4c2123 + 4bd43d7 commit ecbabea

File tree

10 files changed

+176
-931
lines changed

10 files changed

+176
-931
lines changed

src/tools/fdsdump/src/CMakeLists.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ add_executable(fdsdump
1010
main.cpp
1111
options.cpp
1212
$<TARGET_OBJECTS:common_obj>
13-
$<TARGET_OBJECTS:lister_obj>
13+
$<TARGET_OBJECTS:lister_obj>
1414
$<TARGET_OBJECTS:aggregator_obj>
1515
$<TARGET_OBJECTS:statistics_obj>
1616
)
@@ -21,7 +21,8 @@ target_include_directories(fdsdump
2121

2222
target_link_libraries(fdsdump
2323
PUBLIC
24-
${FDS_LIBRARIES})
24+
${FDS_LIBRARIES}
25+
Threads::Threads)
2526

2627
# Installation targets
2728
install(

src/tools/fdsdump/src/aggregator/aggregator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,13 @@ Aggregator::aggregate(FlowContext &ctx)
136136
void
137137
Aggregator::sort_items()
138138
{
139-
sort_records(m_view, items());
139+
sort_records(m_view, items());
140140
}
141141

142142
void
143143
Aggregator::merge(Aggregator &other)
144144
{
145-
merge_hash_tables(m_view, m_table, other.m_table);
145+
merge_hash_tables(m_view, m_table, other.m_table);
146146
}
147147

148148
} // aggregator

src/tools/fdsdump/src/aggregator/inOutField.hpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,17 @@ class InOutValueField : public Field {
9090
void
9191
init(Value &value) const override;
9292

93-
/**
94-
* @brief Aggregate the value retrieved from the provided flow record into the provided
95-
* aggregated value
96-
*
97-
* @param ctx The flow record to retrieve the value that will be aggregated from
98-
* @param aggregated_value The currently accumulated value that the retrieved value will be
99-
* aggregated towards
100-
*
101-
* @return true if the field was found in the flow record and the aggregated value was updated,
102-
* false otherwise
103-
*/
93+
/**
94+
* @brief Aggregate the value retrieved from the provided flow record into the provided
95+
* aggregated value
96+
*
97+
* @param ctx The flow record to retrieve the value that will be aggregated from
98+
* @param aggregated_value The currently accumulated value that the retrieved value will be
99+
* aggregated towards
100+
*
101+
* @return true if the field was found in the flow record and the aggregated value was updated,
102+
* false otherwise
103+
*/
104104
bool
105105
aggregate(FlowContext &ctx, Value &aggregated_value) const override;
106106

src/tools/fdsdump/src/aggregator/mode.cpp

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -56,23 +56,23 @@ static void print_results(const Options &opts, std::vector<uint8_t *> items)
5656
void
5757
mode_aggregate(const Options &opts)
5858
{
59-
Channel<ThreadedAggregator *> notify_channel;
60-
ThreadedAggregator aggregator(
61-
opts.get_aggregation_keys(),
62-
opts.get_aggregation_values(),
63-
opts.get_input_filter(),
64-
opts.get_input_file_patterns(),
65-
opts.get_order_by(),
66-
opts.get_num_threads(),
67-
opts.get_biflow_autoignore(),
68-
true,
69-
opts.get_output_limit(),
70-
notify_channel);
71-
aggregator.start();
72-
73-
while (true) {
74-
notify_channel.get(std::chrono::milliseconds(1000));
75-
auto state = aggregator.get_aggregator_state();
59+
Channel<ThreadedAggregator *> notify_channel;
60+
ThreadedAggregator aggregator(
61+
opts.get_aggregation_keys(),
62+
opts.get_aggregation_values(),
63+
opts.get_input_filter(),
64+
opts.get_input_file_patterns(),
65+
opts.get_order_by(),
66+
opts.get_num_threads(),
67+
opts.get_biflow_autoignore(),
68+
true,
69+
opts.get_output_limit(),
70+
notify_channel);
71+
aggregator.start();
72+
73+
while (true) {
74+
notify_channel.get(std::chrono::milliseconds(1000));
75+
auto state = aggregator.get_aggregator_state();
7676

7777
auto state_str = aggregator_state_to_str(state);
7878
if (state == AggregatorState::aggregating) {
@@ -82,18 +82,18 @@ mode_aggregate(const Options &opts)
8282
LOG_INFO << "Status: " << state_str;
8383
}
8484

85-
if (state == AggregatorState::errored) {
86-
std::rethrow_exception(aggregator.get_exception());
87-
}
85+
if (state == AggregatorState::errored) {
86+
std::rethrow_exception(aggregator.get_exception());
87+
}
8888

89-
if (state == AggregatorState::finished) {
90-
break;
91-
}
92-
}
89+
if (state == AggregatorState::finished) {
90+
break;
91+
}
92+
}
9393

94-
aggregator.join();
94+
aggregator.join();
9595

96-
print_results(opts, aggregator.get_results());
96+
print_results(opts, aggregator.get_results());
9797
}
9898

9999
} // aggregator

src/tools/fdsdump/src/aggregator/threadedAggregator.hpp

Lines changed: 62 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -43,31 +43,31 @@ const char *aggregator_state_to_str(AggregatorState aggregator_state);
4343
class ThreadedAggregator {
4444
DISABLE_COPY_AND_MOVE(ThreadedAggregator)
4545

46-
struct ThreadInfo {
47-
std::atomic<uint64_t> processed_files{0};
48-
std::atomic<uint64_t> processed_flows{0};
49-
std::atomic<AggregatorState> state{AggregatorState::none};
50-
std::exception_ptr exception = nullptr;
51-
std::atomic<bool> cancelled{false};
52-
};
46+
struct ThreadInfo {
47+
std::atomic<uint64_t> processed_files{0};
48+
std::atomic<uint64_t> processed_flows{0};
49+
std::atomic<AggregatorState> state{AggregatorState::none};
50+
std::exception_ptr exception = nullptr;
51+
std::atomic<bool> cancelled{false};
52+
};
5353

5454
public:
55-
/**
56-
* @brief Create a new threaded aggregator instance
57-
*
58-
* @param aggregation_keys The aggregation keys
59-
* @param aggregation_values The aggregation values
60-
* @param input_filter The input record filter
61-
* @param input_file_patterns The input file patterns (paths w/ glob support)
62-
* @param order_by The fields to order by
63-
* @param num_threads The number of threads
64-
* @param biflow_autoignore Whether the biflow autoignore option should be applied
65-
* @param merge_results Merge output results or not
66-
* @param merge_topk If merge results is enabled, how many top K values to obtain (0 = all
67-
* values, i.e. no top K is performed and everything is merged)
68-
* @param notify_channel The channel to send state notifications through
69-
*/
70-
ThreadedAggregator(
55+
/**
56+
* @brief Create a new threaded aggregator instance
57+
*
58+
* @param aggregation_keys The aggregation keys
59+
* @param aggregation_values The aggregation values
60+
* @param input_filter The input record filter
61+
* @param input_file_patterns The input file patterns (paths w/ glob support)
62+
* @param order_by The fields to order by
63+
* @param num_threads The number of threads
64+
* @param biflow_autoignore Whether the biflow autoignore option should be applied
65+
* @param merge_results Merge output results or not
66+
* @param merge_topk If merge results is enabled, how many top K values to obtain (0 = all
67+
* values, i.e. no top K is performed and everything is merged)
68+
* @param notify_channel The channel to send state notifications through
69+
*/
70+
ThreadedAggregator(
7171
const std::string& aggregation_keys,
7272
const std::string& aggregation_values,
7373
const std::string& input_filter,
@@ -76,64 +76,64 @@ class ThreadedAggregator {
7676
unsigned int num_threads,
7777
bool biflow_autoignore,
7878
bool merge_results,
79-
unsigned int merge_topk,
79+
unsigned int merge_topk,
8080
Channel<ThreadedAggregator *> &notify_channel
8181
);
8282

83-
/**
84-
* @brief Start the aggregation. This is a non-blocking operation that starts the threads that
85-
* will perform the aggregation.
86-
*/
87-
void start();
83+
/**
84+
* @brief Start the aggregation. This is a non-blocking operation that starts the threads that
85+
* will perform the aggregation.
86+
*/
87+
void start();
8888

89-
/**
90-
* @brief Get the exception that occured if the aggregation state is _errored_
91-
*/
89+
/**
90+
* @brief Get the exception that occured if the aggregation state is _errored_
91+
*/
9292
std::exception_ptr get_exception() { return m_exception; }
9393

94-
/**
95-
* @brief Get the final results if merging was enabled
96-
*/
94+
/**
95+
* @brief Get the final results if merging was enabled
96+
*/
9797
std::vector<uint8_t *>& get_results();
9898

99-
/**
100-
* @brief Get the number of processed flows (for progress reporting)
101-
*/
99+
/**
100+
* @brief Get the number of processed flows (for progress reporting)
101+
*/
102102
uint64_t get_processed_flows() const;
103103

104-
/**
105-
* @brief Get the number of processed files (for progress reporting)
106-
*/
104+
/**
105+
* @brief Get the number of processed files (for progress reporting)
106+
*/
107107
uint64_t get_processed_files() const;
108108

109-
/**
110-
* @brief Get the total number of flows that will be processed (for progress reporting)
111-
*/
109+
/**
110+
* @brief Get the total number of flows that will be processed (for progress reporting)
111+
*/
112112
uint64_t get_total_flows() const;
113113

114-
/**
115-
* @brief Get the total number of files that will be processed (for progress reporting)
116-
*/
114+
/**
115+
* @brief Get the total number of files that will be processed (for progress reporting)
116+
*/
117117
uint64_t get_total_files() const;
118118

119-
/**
120-
* @brief Get the current aggregator state
121-
*/
119+
/**
120+
* @brief Get the current aggregator state
121+
*/
122122
AggregatorState get_aggregator_state() const;
123123

124-
/**
125-
* @brief Get the per-thread tables if merging was disabled
126-
*/
124+
/**
125+
* @brief Get the per-thread tables if merging was disabled
126+
*/
127127
std::vector<HashTable *> get_tables();
128128

129-
/**
130-
* @brief Cancel the aggregation
131-
*/
129+
/**
130+
* @brief Cancel the aggregation
131+
*/
132132
void cancel();
133133

134-
/**
135-
* @brief Block until the execution of all aggregator threads finishes
136-
*/
134+
/**
135+
* @brief Block until the execution of all aggregator threads finishes
136+
*/
137137
void join();
138138

139139
private:
@@ -144,7 +144,7 @@ class ThreadedAggregator {
144144
std::string m_order_by;
145145
bool m_biflow_autoignore;
146146
bool m_merge_results;
147-
unsigned int m_merge_topk;
147+
unsigned int m_merge_topk;
148148

149149
std::thread m_main_thread;
150150

@@ -167,15 +167,15 @@ class ThreadedAggregator {
167167

168168
std::exception_ptr m_exception = nullptr;
169169

170-
std::unique_ptr<ThresholdAlgorithm> m_threshold_algorithm;
170+
std::unique_ptr<ThresholdAlgorithm> m_threshold_algorithm;
171171

172172
void thread_worker(unsigned int thread_id);
173173

174174
void run();
175175

176176
void perform_all_merge();
177177

178-
void perform_topk_merge();
178+
void perform_topk_merge();
179179
};
180180

181181
} // aggregator

0 commit comments

Comments
 (0)