Skip to content

Commit af6610e

Browse files
authored
Merge pull request #624 from astrorama/fix/helgrind
Fix multithreading problems detected by helgrind
2 parents 97528f8 + e9c7541 commit af6610e

File tree

4 files changed

+33
-30
lines changed

4 files changed

+33
-30
lines changed

SEFramework/SEFramework/Output/OutputRegistry.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ class OutputRegistry {
169169
return converter(source.getProperty<PropertyType>(i));
170170
};
171171
}
172-
Euclid::Table::Row::cell_type operator()(const SourceInterface& source) {
172+
Euclid::Table::Row::cell_type operator()(const SourceInterface& source) const {
173173
return m_convert_func(source, index);
174174
}
175175
std::size_t index = 0;

SEFramework/src/lib/Output/OutputRegistry.cpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,26 @@ auto OutputRegistry::getSourceToRowConverter(const std::vector<std::string>& ena
4949
return [this, out_prop_list](const SourceInterface& source) {
5050
std::vector<ColumnInfo::info_type> info_list {};
5151
std::vector<Row::cell_type> cell_values {};
52+
53+
const auto& property_to_names_map = m_property_to_names_map;
54+
const auto& name_to_col_info_map = m_name_to_col_info_map;
55+
const auto& name_to_converter_map = m_name_to_converter_map;
56+
5257
for (const auto& property : out_prop_list) {
53-
if (m_property_to_names_map.count(property) == 0) {
58+
if (property_to_names_map.count(property) == 0) {
5459
throw Elements::Exception() << "Missing column generator for " << property.name();
5560
}
56-
for (const auto& name : m_property_to_names_map.at(property)) {
57-
auto& col_info = m_name_to_col_info_map.at(name);
58-
info_list.emplace_back(name, m_name_to_converter_map.at(name).first,
59-
col_info.unit, col_info.description);
60-
cell_values.emplace_back(m_name_to_converter_map.at(name).second(source));
61+
for (const auto& name : property_to_names_map.at(property)) {
62+
auto& col_info = name_to_col_info_map.at(name);
63+
info_list.push_back(ColumnInfo::info_type {name, name_to_converter_map.at(name).first,
64+
col_info.unit, col_info.description});
65+
cell_values.push_back(name_to_converter_map.at(name).second(source));
6166
}
6267
}
6368
if (info_list.empty()) {
6469
throw Elements::Exception() << "The given configuration would not generate any output";
6570
}
66-
return Row {std::move(cell_values), std::make_shared<ColumnInfo>(move(info_list))};
71+
return Row {std::move(cell_values), std::make_shared<ColumnInfo>(std::move(info_list))};
6772
};
6873
}
6974

SEImplementation/SEImplementation/Measurement/MultithreadedMeasurement.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ class MultithreadedMeasurement : public Measurement {
4040

4141
using SourceToRowConverter = std::function<Euclid::Table::Row(const SourceInterface&)>;
4242
MultithreadedMeasurement(SourceToRowConverter source_to_row, const std::shared_ptr<Euclid::ThreadPool>& thread_pool,
43-
unsigned max_queue_size)
43+
unsigned /*max_queue_size*/)
4444
: m_source_to_row(source_to_row),
4545
m_thread_pool(thread_pool),
4646
m_group_counter(0),
47-
m_input_done(false), m_abort_raised(false), m_semaphore(max_queue_size) {}
47+
m_input_done(false), m_abort_raised(false) {}
4848

4949
~MultithreadedMeasurement() override;
5050

@@ -66,10 +66,8 @@ class MultithreadedMeasurement : public Measurement {
6666
int m_group_counter;
6767
std::atomic_bool m_input_done, m_abort_raised;
6868

69-
std::condition_variable m_new_output;
7069
std::list<std::pair<int, std::unique_ptr<SourceGroupInterface>>> m_output_queue;
71-
std::mutex m_output_queue_mutex;
72-
Euclid::Semaphore m_semaphore;
70+
mutable std::mutex m_output_queue_mutex;
7371
};
7472

7573
}

SEImplementation/src/lib/Measurement/MultithreadedMeasurement.cpp

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ static Elements::Logging logger = Elements::Logging::getLogger("Multithreading")
3434

3535

3636
MultithreadedMeasurement::~MultithreadedMeasurement() {
37-
if (m_output_thread->joinable()) {
37+
if (m_output_thread && m_output_thread->joinable()) {
3838
m_output_thread->join();
3939
}
4040
}
@@ -46,7 +46,9 @@ void MultithreadedMeasurement::startThreads() {
4646
void MultithreadedMeasurement::stopThreads() {
4747
m_input_done = true;
4848
m_thread_pool->block();
49-
m_output_thread->join();
49+
if (m_output_thread && m_output_thread->joinable()) {
50+
m_output_thread->join();
51+
}
5052
logger.debug() << "All worker threads done!";
5153
}
5254

@@ -81,22 +83,23 @@ void MultithreadedMeasurement::receiveSource(std::unique_ptr<SourceGroupInterfac
8183

8284
// Put the new SourceGroup into the input queue
8385
auto order_number = m_group_counter;
86+
8487
auto lambda = [this, order_number, source_group = std::move(source_group)]() mutable {
8588
// Trigger measurements
8689
for (auto& source : *source_group) {
8790
m_source_to_row(source);
8891
}
8992
// Pass to the output thread
9093
{
91-
std::unique_lock<std::mutex> output_lock(m_output_queue_mutex);
94+
std::lock_guard<std::mutex> output_lock(m_output_queue_mutex);
9295
m_output_queue.emplace_back(order_number, std::move(source_group));
9396
}
94-
m_new_output.notify_one();
9597
};
9698
auto lambda_copyable = [lambda = std::make_shared<decltype(lambda)>(std::move(lambda))](){
9799
(*lambda)();
98100
};
99101
m_thread_pool->submit(lambda_copyable);
102+
100103
++m_group_counter;
101104
}
102105

@@ -118,23 +121,20 @@ void MultithreadedMeasurement::outputThreadStatic(MultithreadedMeasurement *meas
118121

119122
void MultithreadedMeasurement::outputThreadLoop() {
120123
while (m_thread_pool->activeThreads() > 0) {
121-
std::unique_lock<std::mutex> output_lock(m_output_queue_mutex);
122-
123-
// Wait for something in the output queue
124-
if (m_output_queue.empty()) {
125-
m_new_output.wait_for(output_lock, std::chrono::milliseconds(100));
126-
}
124+
{
125+
std::lock_guard<std::mutex> output_lock(m_output_queue_mutex);
127126

128-
// Process the output queue
129-
while (!m_output_queue.empty()) {
130-
sendSource(std::move(m_output_queue.front().second));
131-
m_output_queue.pop_front();
132-
}
127+
while (!m_output_queue.empty()) {
128+
sendSource(std::move(m_output_queue.front().second));
129+
m_output_queue.pop_front();
130+
}
133131

134-
if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 &&
132+
if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 &&
135133
m_output_queue.empty()) {
136-
break;
134+
break;
135+
}
137136
}
137+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
138138
}
139139
}
140140

0 commit comments

Comments
 (0)