@@ -162,12 +162,64 @@ void osmdata_t::flush() const
162162
163163namespace {
164164
165- struct pending_threaded_processor
165+ class pending_threaded_processor
166166{
167+ public:
167168 using output_vec_t = std::vector<std::shared_ptr<output_t >>;
168- using pending_queue_t = idlist_t ;
169169
170- static osmid_t pop_id (pending_queue_t &queue, std::mutex &mutex)
170+ pending_threaded_processor (std::shared_ptr<middle_t > mid,
171+ output_vec_t const &outs, size_t thread_count)
172+ : m_outputs(outs)
173+ {
174+ assert (!outs.empty ());
175+
176+ // The database connection info should be the same for all outputs,
177+ // we take it arbitrarily from the first.
178+ std::string const &conninfo =
179+ m_outputs[0 ]->get_options ()->database_options .conninfo ();
180+
181+ m_clones.resize (thread_count);
182+ for (size_t i = 0 ; i < thread_count; ++i) {
183+ auto const midq = mid->get_query_instance ();
184+ auto copy_thread = std::make_shared<db_copy_thread_t >(conninfo);
185+
186+ for (auto const &out : m_outputs) {
187+ if (out->need_forward_dependencies ()) {
188+ m_clones[i].push_back (out->clone (midq, copy_thread));
189+ } else {
190+ m_clones[i].emplace_back (nullptr );
191+ }
192+ }
193+ }
194+ }
195+
196+ void process_ways (idlist_t &&list)
197+ {
198+ m_queue = std::move (list);
199+ process_queue (" way" , do_ways);
200+ }
201+
202+ void process_relations (idlist_t &&list)
203+ {
204+ m_queue = std::move (list);
205+ process_queue (" relation" , do_rels);
206+
207+ // Collect expiry tree information from all clones and merge it back
208+ // into the original outputs.
209+ for (auto const &clone : m_clones) {
210+ auto it = clone.begin ();
211+ for (auto const &output : m_outputs) {
212+ assert (it != clone.end ());
213+ if (*it) {
214+ output->merge_expire_trees (it->get ());
215+ }
216+ ++it;
217+ }
218+ }
219+ }
220+
221+ private:
222+ static osmid_t pop_id (idlist_t &queue, std::mutex &mutex)
171223 {
172224 osmid_t id = 0 ;
173225
@@ -180,7 +232,7 @@ struct pending_threaded_processor
180232 return id;
181233 }
182234
183- static void do_ways (output_vec_t const &outputs, pending_queue_t &queue,
235+ static void do_ways (output_vec_t const &outputs, idlist_t &queue,
184236 std::mutex &mutex)
185237 {
186238 while (osmid_t const id = pop_id (queue, mutex)) {
@@ -192,7 +244,7 @@ struct pending_threaded_processor
192244 }
193245 }
194246
195- static void do_rels (output_vec_t const &outputs, pending_queue_t &queue,
247+ static void do_rels (output_vec_t const &outputs, idlist_t &queue,
196248 std::mutex &mutex)
197249 {
198250 while (osmid_t const id = pop_id (queue, mutex)) {
@@ -204,7 +256,7 @@ struct pending_threaded_processor
204256 }
205257 }
206258
207- static void print_stats (pending_queue_t &queue, std::mutex &mutex)
259+ static void print_stats (idlist_t &queue, std::mutex &mutex)
208260 {
209261 size_t queue_size;
210262 do {
@@ -218,32 +270,6 @@ struct pending_threaded_processor
218270 } while (queue_size > 0 );
219271 }
220272
221- pending_threaded_processor (std::shared_ptr<middle_t > mid,
222- output_vec_t const &outs, size_t thread_count)
223- : m_outputs(outs)
224- {
225- assert (!outs.empty ());
226-
227- // The database connection info should be the same for all outputs,
228- // we take it arbitrarily from the first.
229- std::string const &conninfo =
230- m_outputs[0 ]->get_options ()->database_options .conninfo ();
231-
232- m_clones.resize (thread_count);
233- for (size_t i = 0 ; i < thread_count; ++i) {
234- auto const midq = mid->get_query_instance ();
235- auto copy_thread = std::make_shared<db_copy_thread_t >(conninfo);
236-
237- for (auto const &out : m_outputs) {
238- if (out->need_forward_dependencies ()) {
239- m_clones[i].push_back (out->clone (midq, copy_thread));
240- } else {
241- m_clones[i].emplace_back (nullptr );
242- }
243- }
244- }
245- }
246-
247273 template <typename FUNCTION>
248274 void process_queue (char const *type, FUNCTION &&function)
249275 {
@@ -297,40 +323,14 @@ struct pending_threaded_processor
297323 }
298324 }
299325
300- void process_ways (idlist_t &&list)
301- {
302- m_queue = std::move (list);
303- process_queue (" way" , do_ways);
304- }
305-
306- void process_relations (idlist_t &&list)
307- {
308- m_queue = std::move (list);
309- process_queue (" relation" , do_rels);
310-
311- // Collect expiry tree information from all clones and merge it back
312- // into the original outputs.
313- for (auto const &clone : m_clones) {
314- auto it = clone.begin ();
315- for (auto const &output : m_outputs) {
316- assert (it != clone.end ());
317- if (*it) {
318- output->merge_expire_trees (it->get ());
319- }
320- ++it;
321- }
322- }
323- }
324-
325- private:
326326 // / Clones of all outputs, one vector of clones per thread.
327327 std::vector<output_vec_t > m_clones;
328328
329329 // / All outputs.
330330 output_vec_t m_outputs;
331331
332332 // / The queue with ids that the worker threads work on.
333- pending_queue_t m_queue;
333+ idlist_t m_queue;
334334
335335 // / Mutex to make sure worker threads coordinate access to queue.
336336 std::mutex m_mutex;
0 commit comments