@@ -215,65 +215,68 @@ struct pending_threaded_processor : public pending_processor
215215 } while (queue_size > 0 );
216216 }
217217
218- // starts up count threads and works on the queue
219218 pending_threaded_processor (std::shared_ptr<middle_t > mid,
220219 output_vec_t const &outs, size_t thread_count)
221220 // note that we cant hint to the stack how large it should be ahead of time
222221 // we could use a different datastructure like a deque or vector but then
223222 // the outputs the enqueue jobs would need the version check for the push(_back) method
224- : outs (outs), queue( )
223+ : m_outputs (outs)
225224 {
225+ assert (!outs.empty ());
226+
227+ // The database connection info should be the same for all outputs,
228+ // we take it arbitrarily from the first.
229+ std::string const &conninfo =
230+ m_outputs[0 ]->get_options ()->database_options .conninfo ();
226231
227- // clone all the things we need
228232 m_clones.resize (thread_count);
229233 for (size_t i = 0 ; i < thread_count; ++i) {
230234 auto const midq = mid->get_query_instance ();
231- auto copy_thread = std::make_shared<db_copy_thread_t >(
232- outs[0 ]->get_options ()->database_options .conninfo ());
235+ auto copy_thread = std::make_shared<db_copy_thread_t >(conninfo);
233236
234- for (auto const &out : outs ) {
237+ for (auto const &out : m_outputs ) {
235238 if (out->need_forward_dependencies ()) {
236239 m_clones[i].push_back (out->clone (midq, copy_thread));
237240 }
238241 }
239242 }
240243 }
241244
242- void enqueue_way (osmid_t id) override { queue .emplace (id); }
245+ void enqueue_way (osmid_t id) override { m_queue .emplace (id); }
243246
244- void enqueue_relation (osmid_t id) override { queue .emplace (id); }
247+ void enqueue_relation (osmid_t id) override { m_queue .emplace (id); }
245248
246249 template <typename FUNCTION>
247250 void process_queue (FUNCTION &&function)
248251 {
249252 std::vector<std::future<void >> workers;
250253
251254 for (auto const &clone : m_clones) {
252- workers.push_back (
253- std::async (std:: launch::async, std::forward<FUNCTION>(function),
254- std::cref (clone), std::ref (queue ), std::ref (mutex )));
255+ workers.push_back (std::async (
256+ std::launch::async, std::forward<FUNCTION>(function),
257+ std::cref (clone), std::ref (m_queue ), std::ref (m_mutex )));
255258 }
256259 workers.push_back (std::async (std::launch::async, print_stats,
257- std::ref (queue ), std::ref (mutex )));
260+ std::ref (m_queue ), std::ref (m_mutex )));
258261
259262 for (auto &worker : workers) {
260263 try {
261264 worker.get ();
262265 } catch (...) {
263266 // drain the queue, so that the other workers finish
264- mutex .lock ();
265- while (!queue .empty ()) {
266- queue .pop ();
267+ m_mutex .lock ();
268+ while (!m_queue .empty ()) {
269+ m_queue .pop ();
267270 }
268- mutex .unlock ();
271+ m_mutex .unlock ();
269272 throw ;
270273 }
271274 }
272275 }
273276
274277 void process_ways () override
275278 {
276- auto const ids_queued = queue .size ();
279+ auto const ids_queued = m_queue .size ();
277280
278281 fmt::print (stderr, " \n Going over pending ways...\n " );
279282 fmt::print (stderr, " \t {} ways are pending\n " , ids_queued);
@@ -300,7 +303,7 @@ struct pending_threaded_processor : public pending_processor
300303
301304 void process_relations () override
302305 {
303- auto const ids_queued = queue .size ();
306+ auto const ids_queued = m_queue .size ();
304307
305308 fmt::print (stderr, " \n Going over pending relations...\n " );
306309 fmt::print (stderr, " \t {} relations are pending\n " , ids_queued);
@@ -320,10 +323,11 @@ struct pending_threaded_processor : public pending_processor
320323
321324 // collect all expiry tree informations together into one
322325 for (auto const &clone : m_clones) {
323- // for each clone/original output
324- for (output_vec_t ::const_iterator original_output = outs.begin (),
325- clone_output = clone.begin ();
326- original_output != outs.end () && clone_output != clone.end ();
326+ for (output_vec_t ::const_iterator
327+ original_output = m_outputs.begin (),
328+ clone_output = clone.begin ();
329+ original_output != m_outputs.end () &&
330+ clone_output != clone.end ();
327331 ++original_output, ++clone_output) {
328332 // done copying rels for now
329333 clone_output->get ()->commit ();
@@ -334,15 +338,17 @@ struct pending_threaded_processor : public pending_processor
334338 }
335339
336340private:
337- // output copies , one vector per thread
341+ // / Clones of all outputs , one vector of clones per thread.
338342 std::vector<output_vec_t > m_clones;
339- output_vec_t
340- outs; // would like to move ownership of outs to osmdata_t and middle passed to output_t instead of owned by it
341- // job queue
342- pending_queue_t queue;
343343
344- // so the threads can manage some of the shared state
345- std::mutex mutex;
344+ // / All outputs.
345+ output_vec_t m_outputs;
346+
347+ // / The queue with ids that the worker threads work on.
348+ pending_queue_t m_queue;
349+
350+ // / Mutex to make sure worker threads coordinate access to queue.
351+ std::mutex m_mutex;
346352};
347353
348354} // anonymous namespace
0 commit comments