@@ -162,13 +162,19 @@ void osmdata_t::flush() const
162162
163163namespace {
164164
165- class pending_threaded_processor
165+ /* *
166+ * After all objects in a change file have been processed, all objects
167+ * depending on the changed objects must also be processed. This class
168+ * handles this extra processing by starting a number of threads and doing
169+ * the processing in them.
170+ */
171+ class multithreaded_processor
166172{
167173public:
168174 using output_vec_t = std::vector<std::shared_ptr<output_t >>;
169175
170- pending_threaded_processor (std::shared_ptr<middle_t > mid,
171- output_vec_t const &outs, size_t thread_count)
176+ multithreaded_processor (std::shared_ptr<middle_t > mid,
177+ output_vec_t const &outs, size_t thread_count)
172178 : m_outputs(outs)
173179 {
174180 assert (!outs.empty ());
@@ -178,6 +184,7 @@ class pending_threaded_processor
178184 std::string const &conninfo =
179185 m_outputs[0 ]->get_options ()->database_options .conninfo ();
180186
187+ // For each thread we create clones of all the outputs.
181188 m_clones.resize (thread_count);
182189 for (size_t i = 0 ; i < thread_count; ++i) {
183190 auto const midq = mid->get_query_instance ();
@@ -193,12 +200,24 @@ class pending_threaded_processor
193200 }
194201 }
195202
203+ /* *
204+ * Process all ways in the list.
205+ *
206+ * \param list List of way ids to work on. The list is moved into the
207+ * function.
208+ */
196209 void process_ways (idlist_t &&list)
197210 {
198211 m_queue = std::move (list);
199212 process_queue (" way" , do_ways);
200213 }
201214
215+ /* *
216+ * Process all relations in the list.
217+ *
218+ * \param list List of relation ids to work on. The list is moved into the
219+ * function.
220+ */
202221 void process_relations (idlist_t &&list)
203222 {
204223 m_queue = std::move (list);
@@ -219,6 +238,7 @@ class pending_threaded_processor
219238 }
220239
221240private:
241+ // / Get the next id from the queue.
222242 static osmid_t pop_id (idlist_t &queue, std::mutex &mutex)
223243 {
224244 osmid_t id = 0 ;
@@ -232,6 +252,10 @@ class pending_threaded_processor
232252 return id;
233253 }
234254
255+ /* *
256+ * Runs in the worker threads: As long as there are any, get ids from
257+ * the queue and let the outputs process the ways.
258+ */
235259 static void do_ways (output_vec_t const &outputs, idlist_t &queue,
236260 std::mutex &mutex)
237261 {
@@ -244,6 +268,10 @@ class pending_threaded_processor
244268 }
245269 }
246270
271+ /* *
272+ * Runs in the worker threads: As long as there are any, get ids from
273+ * the queue and let the outputs process the relations.
274+ */
247275 static void do_rels (output_vec_t const &outputs, idlist_t &queue,
248276 std::mutex &mutex)
249277 {
@@ -256,6 +284,7 @@ class pending_threaded_processor
256284 }
257285 }
258286
287+ // / Runs in a worker thread: Update progress display once per second.
259288 static void print_stats (idlist_t &queue, std::mutex &mutex)
260289 {
261290 size_t queue_size;
@@ -294,7 +323,7 @@ class pending_threaded_processor
294323 try {
295324 worker.get ();
296325 } catch (...) {
297- // drain the queue, so that the other workers finish
326+ // Drain the queue, so that the other workers finish early.
298327 m_mutex.lock ();
299328 m_queue.clear ();
300329 m_mutex.unlock ();
@@ -356,10 +385,11 @@ void osmdata_t::stop() const
356385 // In append mode there might be dependent objects pending that we
357386 // need to process.
358387 if (opts->append && m_dependency_manager->has_pending ()) {
359- pending_threaded_processor ptp (m_mid, m_outs, opts->num_procs );
388+ multithreaded_processor proc{m_mid, m_outs,
389+ (std::size_t )opts->num_procs };
360390
361- ptp .process_ways (m_dependency_manager->get_pending_way_ids ());
362- ptp .process_relations (m_dependency_manager->get_pending_relation_ids ());
391+ proc .process_ways (m_dependency_manager->get_pending_way_ids ());
392+ proc .process_relations (m_dependency_manager->get_pending_relation_ids ());
363393 }
364394
365395 for (auto &out : m_outs) {
0 commit comments