diff --git a/omniscidb/ResultSetRegistry/ColumnarResults.cpp b/omniscidb/ResultSetRegistry/ColumnarResults.cpp index a22924e85..6df9a91f2 100644 --- a/omniscidb/ResultSetRegistry/ColumnarResults.cpp +++ b/omniscidb/ResultSetRegistry/ColumnarResults.cpp @@ -623,14 +623,15 @@ void ColumnarResults::materializeAllLazyColumns( const ResultSet& rows, const size_t num_columns) { CHECK(isDirectColumnarConversionPossible()); - const auto do_work_just_lazy_columns = [num_columns, &rows, this]( - const size_t row_idx, - const std::vector& targets_to_skip) { + const auto do_write_only_lazy_columns = [num_columns, &rows, this]( + const size_t row_idx, + const std::vector& targets_to_skip) { const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip); for (size_t i = 0; i < num_columns; ++i) { - if (!targets_to_skip.empty() && !targets_to_skip[i]) { - writeBackCell(crt_row[i], row_idx - rows.getOffset(), i); + if (targets_to_skip.empty() || targets_to_skip[i]) { + continue; } + writeBackCell(crt_row[i], row_idx - rows.getOffset(), i); } }; @@ -639,47 +640,40 @@ void ColumnarResults::materializeAllLazyColumns( bool has_array = std::any_of(target_types_.begin(), target_types_.end(), [](const hdk::ir::Type* type) { return type->isArray(); }); - if (rows.areAnyColumnsLazyFetched() || !offset_buffers_.empty() || has_array) { - const size_t worker_count = - result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1; - std::vector> conversion_threads; - std::vector targets_to_skip; - if (skip_non_lazy_columns) { - CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns)); - targets_to_skip.reserve(num_columns); - for (size_t i = 0; i < num_columns; i++) { - // we process lazy and varlen columns (i.e., skip non-lazy and non-varlen columns) - targets_to_skip.push_back( - (lazy_fetch_info.empty() || !lazy_fetch_info[i].is_lazily_fetched) && - !target_types_[i]->isVarLen() && !target_types_[i]->isArray()); - } - } - size_t first = rows.getOffset(); - size_t last = rows.entryCount(); - if (rows.isTruncated()) { - last = std::min(last, first + rows.getLimit()); - } - for (auto interval : makeIntervals(first, last, worker_count)) { - conversion_threads.push_back(std::async( - std::launch::async, - [&do_work_just_lazy_columns, &targets_to_skip, first, this](const size_t start, - const size_t end) { - for (size_t i = start; i < end; ++i) { - do_work_just_lazy_columns(i, targets_to_skip); - } - }, - interval.begin, - interval.end)); - } + if (!rows.areAnyColumnsLazyFetched() && offset_buffers_.empty() && !has_array) { + return; + } - try { - for (auto& child : conversion_threads) { - child.wait(); - } - } catch (...) { - throw; + std::vector targets_to_skip{}; + if (skip_non_lazy_columns) { + CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns)); + targets_to_skip.reserve(num_columns); + for (size_t i = 0; i < num_columns; i++) { + // we process lazy and varlen columns (i.e., skip non-lazy and non-varlen columns) + bool skip_column = + (lazy_fetch_info.empty() || !lazy_fetch_info[i].is_lazily_fetched) && + !target_types_[i]->isVarLen() && !target_types_[i]->isArray(); + targets_to_skip.push_back(skip_column); } } + size_t first_row = rows.getOffset(); + size_t last_row = rows.entryCount(); + if (rows.isTruncated()) { + last_row = std::min(last_row, first_row + rows.getLimit()); + } + const size_t worker_count = + result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1; + // Heuristics, should be tuned somehow + size_t granularity = (last_row - first_row) / (worker_count * 3); + granularity = std::max(granularity, static_cast(10)); + + tbb::parallel_for(tbb::blocked_range(first_row, last_row, granularity), + [&do_write_only_lazy_columns, targets_to_skip, first_row, this]( + const tbb::blocked_range& interval) { + for (size_t i = interval.begin(); i < interval.end(); ++i) { + do_write_only_lazy_columns(i, targets_to_skip); + } + }); } /**