Skip to content

Commit dcdb681

Browse files
committed
Handle erros in threads correctly
Generalization processing can run in multiple threads. If a thread throws an error that error was not caught correctly. The new code catches the error and reports it and sets a flag that the main thread knows an error happened, so it can terminate the program correctly.
1 parent c282643 commit dcdb681

File tree

1 file changed

+42
-26
lines changed

1 file changed

+42
-26
lines changed

src/gen/osm2pgsql-gen.cpp

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646

4747
#include <algorithm>
4848
#include <array>
49+
#include <atomic>
4950
#include <cassert>
5051
#include <chrono>
5152
#include <cstdint>
@@ -195,39 +196,49 @@ class tile_processor_t
195196
std::size_t m_num_tiles;
196197
};
197198

198-
void run_tile_gen(connection_params_t const &connection_params,
199+
void run_tile_gen(std::atomic_flag *error_flag,
200+
connection_params_t const &connection_params,
199201
gen_base_t *master_generalizer, params_t params,
200202
uint32_t zoom,
201203
std::vector<std::pair<uint32_t, uint32_t>> *queue,
202204
std::mutex *mut, unsigned int n)
203205
{
204-
logger::init_thread(n);
206+
try {
207+
logger::init_thread(n);
205208

206-
log_debug("Started generalizer thread for '{}'.",
207-
master_generalizer->strategy());
208-
pg_conn_t db_connection{connection_params, "gen.tile"};
209-
std::string const strategy{master_generalizer->strategy()};
210-
auto generalizer = create_generalizer(
211-
strategy, &db_connection, master_generalizer->append_mode(), &params);
209+
log_debug("Started generalizer thread for '{}'.",
210+
master_generalizer->strategy());
211+
pg_conn_t db_connection{connection_params, "gen.tile"};
212+
std::string const strategy{master_generalizer->strategy()};
213+
auto generalizer =
214+
create_generalizer(strategy, &db_connection,
215+
master_generalizer->append_mode(), &params);
212216

213-
while (true) {
214-
std::pair<uint32_t, uint32_t> p;
215-
{
216-
std::lock_guard<std::mutex> const guard{*mut};
217-
if (queue->empty()) {
218-
master_generalizer->merge_timers(*generalizer);
219-
break;
217+
while (true) {
218+
std::pair<uint32_t, uint32_t> p;
219+
{
220+
std::lock_guard<std::mutex> const guard{*mut};
221+
if (queue->empty()) {
222+
master_generalizer->merge_timers(*generalizer);
223+
break;
224+
}
225+
p = queue->back();
226+
queue->pop_back();
220227
}
221-
p = queue->back();
222-
queue->pop_back();
223-
}
224228

225-
tile_t const tile{zoom, p.first, p.second};
226-
log_debug("Processing tile {}/{}/{}...", tile.zoom(), tile.x(),
227-
tile.y());
228-
generalizer->process(tile);
229+
tile_t const tile{zoom, p.first, p.second};
230+
log_debug("Processing tile {}/{}/{}...", tile.zoom(), tile.x(),
231+
tile.y());
232+
generalizer->process(tile);
233+
}
234+
log_debug("Shutting down generalizer thread.");
235+
} catch (std::exception const &e) {
236+
log_error("{}", e.what());
237+
error_flag->test_and_set();
238+
} catch (...) {
239+
log_error("Unknown exception in generalizer thread.");
240+
error_flag->test_and_set();
229241
}
230-
log_debug("Shutting down generalizer thread.");
231242
}
232243

233244
class genproc_t
@@ -490,16 +501,21 @@ class genproc_t
490501
log_debug("Running in multi-threaded mode.");
491502
std::mutex mut;
492503
std::vector<std::thread> threads;
504+
std::atomic_flag error_flag = ATOMIC_FLAG_INIT;
493505
for (unsigned int n = 1;
494506
n <= std::min(m_jobs, static_cast<uint32_t>(tile_list.size()));
495507
++n) {
496-
threads.emplace_back(run_tile_gen, m_connection_params,
497-
generalizer, params, zoom, &tile_list,
498-
&mut, n);
508+
threads.emplace_back(run_tile_gen, &error_flag,
509+
m_connection_params, generalizer, params,
510+
zoom, &tile_list, &mut, n);
499511
}
500512
for (auto &t : threads) {
501513
t.join();
502514
}
515+
if (error_flag.test_and_set()) {
516+
throw std::runtime_error{
517+
"Error in generalizer thread. Stopping."};
518+
}
503519
}
504520
}
505521

0 commit comments

Comments
 (0)