@@ -243,29 +243,22 @@ struct pending_threaded_processor : public pending_processor
243243
244244 void enqueue_relation (osmid_t id) override { queue.emplace (id); }
245245
246- // waits for the completion of all outstanding jobs
247- void process_ways () override
246+ template < typename FUNCTION>
247+ void process_queue (FUNCTION &&function)
248248 {
249- auto const ids_queued = queue.size ();
250-
251- fmt::print (stderr, " \n Going over pending ways...\n " );
252- fmt::print (stderr, " \t {} ways are pending\n " , ids_queued);
253- fmt::print (stderr, " \n Using {} helper-processes\n " , m_clones.size ());
254- util::timer_t timer;
255-
256- // make the threads and start them
257249 std::vector<std::future<void >> workers;
250+
258251 for (auto const &clone : m_clones) {
259- workers.push_back (std::async (std::launch::async, do_ways,
260- std::cref (clone) , std::ref (queue ),
261- std::ref (mutex)));
252+ workers.push_back (
253+ std::async (std::launch::async , std::forward<FUNCTION>(function ),
254+ std::cref (clone), std::ref (queue), std::ref (mutex)));
262255 }
263256 workers.push_back (std::async (std::launch::async, print_stats,
264257 std::ref (queue), std::ref (mutex)));
265258
266- for (auto &w : workers) {
259+ for (auto &worker : workers) {
267260 try {
268- w .get ();
261+ worker .get ();
269262 } catch (...) {
270263 // drain the queue, so that the other workers finish
271264 mutex.lock ();
@@ -276,8 +269,20 @@ struct pending_threaded_processor : public pending_processor
276269 throw ;
277270 }
278271 }
272+ }
279273
274+ void process_ways () override
275+ {
276+ auto const ids_queued = queue.size ();
277+
278+ fmt::print (stderr, " \n Going over pending ways...\n " );
279+ fmt::print (stderr, " \t {} ways are pending\n " , ids_queued);
280+ fmt::print (stderr, " \n Using {} helper-processes\n " , m_clones.size ());
281+
282+ util::timer_t timer;
283+ process_queue (do_ways);
280284 timer.stop ();
285+
281286 fmt::print (stderr, " \r Finished processing {} ways in {} s\n\n " ,
282287 ids_queued, timer.elapsed ());
283288 if (timer.elapsed () > 0 ) {
@@ -300,33 +305,11 @@ struct pending_threaded_processor : public pending_processor
300305 fmt::print (stderr, " \n Going over pending relations...\n " );
301306 fmt::print (stderr, " \t {} relations are pending\n " , ids_queued);
302307 fmt::print (stderr, " \n Using {} helper-processes\n " , m_clones.size ());
303- util::timer_t timer;
304-
305- // make the threads and start them
306- std::vector<std::future<void >> workers;
307- for (auto const &clone : m_clones) {
308- workers.push_back (std::async (std::launch::async, do_rels,
309- std::cref (clone), std::ref (queue),
310- std::ref (mutex)));
311- }
312- workers.push_back (std::async (std::launch::async, print_stats,
313- std::ref (queue), std::ref (mutex)));
314-
315- for (auto &w : workers) {
316- try {
317- w.get ();
318- } catch (...) {
319- // drain the queue, so the other worker finish immediately
320- mutex.lock ();
321- while (!queue.empty ()) {
322- queue.pop ();
323- }
324- mutex.unlock ();
325- throw ;
326- }
327- }
328308
309+ util::timer_t timer;
310+ process_queue (do_rels);
329311 timer.stop ();
312+
330313 fmt::print (stderr, " \r Finished processing {} relations in {} s\n\n " ,
331314 ids_queued, timer.elapsed ());
332315 if (timer.elapsed () > 0 ) {
0 commit comments