|
33 | 33 |
|
34 | 34 | #include <boost/filesystem.hpp> |
35 | 35 | #include <boost/filesystem/fstream.hpp> |
| 36 | +#include <boost/iterator/function_input_iterator.hpp> |
36 | 37 | #include <boost/optional/optional.hpp> |
37 | 38 | #include <boost/scope_exit.hpp> |
38 | 39 |
|
39 | 40 | #include <osmium/io/any_input.hpp> |
40 | 41 |
|
41 | | -#include <tbb/concurrent_vector.h> |
| 42 | +#include <tbb/pipeline.h> |
42 | 43 | #include <tbb/task_scheduler_init.h> |
43 | 44 |
|
44 | 45 | #include <cstdlib> |
@@ -252,58 +253,77 @@ std::vector<TurnRestriction> Extractor::ParseOSMData(ScriptingEnvironment &scrip |
252 | 253 |
|
253 | 254 | timestamp_file.WriteFrom(timestamp.c_str(), timestamp.length()); |
254 | 255 |
|
255 | | - // initialize vectors holding parsed objects |
256 | | - tbb::concurrent_vector<std::pair<std::size_t, ExtractionNode>> resulting_nodes; |
257 | | - tbb::concurrent_vector<std::pair<std::size_t, ExtractionWay>> resulting_ways; |
258 | | - tbb::concurrent_vector<boost::optional<InputRestrictionContainer>> resulting_restrictions; |
259 | | - |
260 | 256 | std::vector<std::string> restrictions = scripting_environment.GetRestrictions(); |
261 | 257 | // setup restriction parser |
262 | 258 | const RestrictionParser restriction_parser( |
263 | 259 | scripting_environment.GetProfileProperties().use_turn_restrictions, |
264 | 260 | config.parse_conditionals, |
265 | 261 | restrictions); |
266 | 262 |
|
267 | | - // create a vector of iterators into the buffer |
268 | | - for (std::vector<osmium::memory::Buffer::const_iterator> osm_elements; |
269 | | - const osmium::memory::Buffer buffer = reader.read(); |
270 | | - osm_elements.clear()) |
271 | | - { |
272 | | - for (auto iter = std::begin(buffer), end = std::end(buffer); iter != end; ++iter) |
273 | | - { |
274 | | - osm_elements.push_back(iter); |
275 | | - } |
| 263 | + std::mutex process_mutex; |
276 | 264 |
|
277 | | - // clear resulting vectors |
278 | | - resulting_nodes.clear(); |
279 | | - resulting_ways.clear(); |
280 | | - resulting_restrictions.clear(); |
| 265 | + using SharedBuffer = std::shared_ptr<const osmium::memory::Buffer>; |
| 266 | + struct ParsedBuffer |
| 267 | + { |
| 268 | + SharedBuffer buffer; |
| 269 | + std::vector<std::pair<const osmium::Node &, ExtractionNode>> resulting_nodes; |
| 270 | + std::vector<std::pair<const osmium::Way &, ExtractionWay>> resulting_ways; |
| 271 | + std::vector<boost::optional<InputRestrictionContainer>> resulting_restrictions; |
| 272 | + }; |
281 | 273 |
|
282 | | - scripting_environment.ProcessElements(osm_elements, |
283 | | - restriction_parser, |
284 | | - resulting_nodes, |
285 | | - resulting_ways, |
286 | | - resulting_restrictions); |
| 274 | + tbb::filter_t<void, SharedBuffer> buffer_reader( |
| 275 | + tbb::filter::serial_in_order, [&](tbb::flow_control &fc) { |
| 276 | + if (auto buffer = reader.read()) |
| 277 | + { |
| 278 | + return std::make_shared<const osmium::memory::Buffer>(std::move(buffer)); |
| 279 | + } |
| 280 | + else |
| 281 | + { |
| 282 | + fc.stop(); |
| 283 | + return SharedBuffer{}; |
| 284 | + } |
| 285 | + }); |
| 286 | + tbb::filter_t<SharedBuffer, std::shared_ptr<ParsedBuffer>> buffer_transform( |
| 287 | + tbb::filter::parallel, [&](const SharedBuffer buffer) { |
| 288 | + if (!buffer) |
| 289 | + return std::shared_ptr<ParsedBuffer>{}; |
| 290 | + |
| 291 | + auto parsed_buffer = std::make_shared<ParsedBuffer>(); |
| 292 | + parsed_buffer->buffer = buffer; |
| 293 | + scripting_environment.ProcessElements(*buffer, |
| 294 | + restriction_parser, |
| 295 | + parsed_buffer->resulting_nodes, |
| 296 | + parsed_buffer->resulting_ways, |
| 297 | + parsed_buffer->resulting_restrictions); |
| 298 | + return parsed_buffer; |
| 299 | + }); |
| 300 | + tbb::filter_t<std::shared_ptr<ParsedBuffer>, void> buffer_storage( |
| 301 | + tbb::filter::serial_in_order, [&](const std::shared_ptr<ParsedBuffer> parsed_buffer) { |
| 302 | + if (!parsed_buffer) |
| 303 | + return; |
| 304 | + |
| 305 | + number_of_nodes += parsed_buffer->resulting_nodes.size(); |
| 306 | + // put parsed objects thru extractor callbacks |
| 307 | + for (const auto &result : parsed_buffer->resulting_nodes) |
| 308 | + { |
| 309 | + extractor_callbacks->ProcessNode(result.first, result.second); |
| 310 | + } |
| 311 | + number_of_ways += parsed_buffer->resulting_ways.size(); |
| 312 | + for (const auto &result : parsed_buffer->resulting_ways) |
| 313 | + { |
| 314 | + extractor_callbacks->ProcessWay(result.first, result.second); |
| 315 | + } |
| 316 | + number_of_relations += parsed_buffer->resulting_restrictions.size(); |
| 317 | + for (const auto &result : parsed_buffer->resulting_restrictions) |
| 318 | + { |
| 319 | + extractor_callbacks->ProcessRestriction(result); |
| 320 | + } |
| 321 | + }); |
| 322 | + |
| 323 | + // Number of pipeline tokens that yielded the best speedup was about 1.5 * num_cores |
| 324 | + tbb::parallel_pipeline(tbb::task_scheduler_init::default_num_threads() * 1.5, |
| 325 | + buffer_reader & buffer_transform & buffer_storage); |
287 | 326 |
|
288 | | - number_of_nodes += resulting_nodes.size(); |
289 | | - // put parsed objects thru extractor callbacks |
290 | | - for (const auto &result : resulting_nodes) |
291 | | - { |
292 | | - extractor_callbacks->ProcessNode( |
293 | | - static_cast<const osmium::Node &>(*(osm_elements[result.first])), result.second); |
294 | | - } |
295 | | - number_of_ways += resulting_ways.size(); |
296 | | - for (const auto &result : resulting_ways) |
297 | | - { |
298 | | - extractor_callbacks->ProcessWay( |
299 | | - static_cast<const osmium::Way &>(*(osm_elements[result.first])), result.second); |
300 | | - } |
301 | | - number_of_relations += resulting_restrictions.size(); |
302 | | - for (const auto &result : resulting_restrictions) |
303 | | - { |
304 | | - extractor_callbacks->ProcessRestriction(result); |
305 | | - } |
306 | | - } |
307 | 327 | TIMER_STOP(parsing); |
308 | 328 | util::Log() << "Parsing finished after " << TIMER_SEC(parsing) << " seconds"; |
309 | 329 |
|
|
0 commit comments