Skip to content

Commit 92639c1

Browse files
authored
Merge pull request #1827 from lonvia/avoid-parallel-execution-for-few-items
Avoid parallel execution for pending items when there are only a few of them
2 parents 0569cdf + 33a5c86 commit 92639c1

File tree

1 file changed

+37
-25
lines changed

1 file changed

+37
-25
lines changed

src/osmdata.cpp

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -298,38 +298,50 @@ class multithreaded_processor
298298
{
299299
auto const ids_queued = list.size();
300300

301-
log_info("Going over {} pending {}s (using {} threads)"_format(
302-
ids_queued, type, m_clones.size()));
303-
304301
util::timer_t timer;
305-
std::vector<std::future<void>> workers;
306302

307-
for (auto const &clone : m_clones) {
308-
workers.push_back(std::async(std::launch::async, run,
309-
std::cref(clone), &list, &m_mutex,
310-
function));
311-
}
312-
workers.push_back(
313-
std::async(std::launch::async, print_stats, &list, &m_mutex));
314-
315-
for (auto &worker : workers) {
316-
try {
317-
worker.get();
318-
} catch (...) {
319-
// Drain the queue, so that the other workers finish early.
320-
m_mutex.lock();
321-
list.clear();
322-
m_mutex.unlock();
323-
throw;
303+
if (ids_queued < 100) {
304+
// Worker startup is quite expensive. Run the processing directly
305+
// when only few items need to be processed.
306+
log_info("Going over {} pending {}s"_format(ids_queued, type));
307+
308+
for (auto const oid : list) {
309+
(m_clones[0].get()->*function)(oid);
324310
}
325-
}
311+
m_clones[0]->sync();
312+
} else {
313+
log_info("Going over {} pending {}s (using {} threads)"_format(
314+
ids_queued, type, m_clones.size()));
326315

327-
timer.stop();
316+
std::vector<std::future<void>> workers;
328317

329-
if (get_logger().show_progress()) {
330-
fmt::print(stderr, "\rLeft to process: 0.\n");
318+
for (auto const &clone : m_clones) {
319+
workers.push_back(std::async(std::launch::async, run,
320+
std::cref(clone), &list, &m_mutex,
321+
function));
322+
}
323+
workers.push_back(
324+
std::async(std::launch::async, print_stats, &list, &m_mutex));
325+
326+
for (auto &worker : workers) {
327+
try {
328+
worker.get();
329+
} catch (...) {
330+
// Drain the queue, so that the other workers finish early.
331+
m_mutex.lock();
332+
list.clear();
333+
m_mutex.unlock();
334+
throw;
335+
}
336+
}
337+
338+
if (get_logger().show_progress()) {
339+
fmt::print(stderr, "\rLeft to process: 0.\n");
340+
}
331341
}
332342

343+
timer.stop();
344+
333345
log_info("Processing {} pending {}s took {} at a rate of {:.2f}/s",
334346
ids_queued, type,
335347
util::human_readable_duration(timer.elapsed()),

0 commit comments

Comments
 (0)