80 changes: 37 additions & 43 deletions omniscidb/ResultSetRegistry/ColumnarResults.cpp
@@ -623,14 +623,15 @@ void ColumnarResults::materializeAllLazyColumns(
     const ResultSet& rows,
     const size_t num_columns) {
   CHECK(isDirectColumnarConversionPossible());
-  const auto do_work_just_lazy_columns = [num_columns, &rows, this](
-                                             const size_t row_idx,
-                                             const std::vector<bool>& targets_to_skip) {
+  const auto do_write_only_lazy_columns = [num_columns, &rows, this](
+                                              const size_t row_idx,
+                                              const std::vector<bool>& targets_to_skip) {
     const auto crt_row = rows.getRowAtNoTranslations(row_idx, targets_to_skip);
     for (size_t i = 0; i < num_columns; ++i) {
-      if (!targets_to_skip.empty() && !targets_to_skip[i]) {
-        writeBackCell(crt_row[i], row_idx - rows.getOffset(), i);
+      if (targets_to_skip.empty() || targets_to_skip[i]) {
+        continue;
       }
+      writeBackCell(crt_row[i], row_idx - rows.getOffset(), i);
     }
   };

@@ -639,47 +640,40 @@
   bool has_array = std::any_of(target_types_.begin(),
                                target_types_.end(),
                                [](const hdk::ir::Type* type) { return type->isArray(); });
-  if (rows.areAnyColumnsLazyFetched() || !offset_buffers_.empty() || has_array) {
-    const size_t worker_count =
-        result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1;
-    std::vector<std::future<void>> conversion_threads;
-    std::vector<bool> targets_to_skip;
-    if (skip_non_lazy_columns) {
-      CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
-      targets_to_skip.reserve(num_columns);
-      for (size_t i = 0; i < num_columns; i++) {
-        // we process lazy and varlen columns (i.e., skip non-lazy and non-varlen columns)
-        targets_to_skip.push_back(
-            (lazy_fetch_info.empty() || !lazy_fetch_info[i].is_lazily_fetched) &&
-            !target_types_[i]->isVarLen() && !target_types_[i]->isArray());
-      }
-    }
-    size_t first = rows.getOffset();
-    size_t last = rows.entryCount();
-    if (rows.isTruncated()) {
-      last = std::min(last, first + rows.getLimit());
-    }
-    for (auto interval : makeIntervals(first, last, worker_count)) {
-      conversion_threads.push_back(std::async(
-          std::launch::async,
-          [&do_work_just_lazy_columns, &targets_to_skip, first, this](const size_t start,
-                                                                      const size_t end) {
-            for (size_t i = start; i < end; ++i) {
-              do_work_just_lazy_columns(i, targets_to_skip);
-            }
-          },
-          interval.begin,
-          interval.end));
-    }
+  if (!rows.areAnyColumnsLazyFetched() && offset_buffers_.empty() && !has_array) {
+    return;
+  }

-    try {
-      for (auto& child : conversion_threads) {
-        child.wait();
-      }
-    } catch (...) {
-      throw;
-    }
-  }
+  std::vector<bool> targets_to_skip{};
+  if (skip_non_lazy_columns) {
+    CHECK_EQ(lazy_fetch_info.size(), size_t(num_columns));
+    targets_to_skip.reserve(num_columns);
+    for (size_t i = 0; i < num_columns; i++) {
+      // we process lazy and varlen columns (i.e., skip non-lazy and non-varlen columns)
+      bool skip_column =
+          (lazy_fetch_info.empty() || !lazy_fetch_info[i].is_lazily_fetched) &&
+          !target_types_[i]->isVarLen() && !target_types_[i]->isArray();
+      targets_to_skip.push_back(skip_column);
+    }
+  }
+  size_t first_row = rows.getOffset();
+  size_t last_row = rows.entryCount();
+  if (rows.isTruncated()) {
+    last_row = std::min(last_row, first_row + rows.getLimit());
+  }
+  const size_t worker_count =
+      result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1;
+  // Heuristics, should be tuned somehow
+  size_t granularity = (last_row - first_row) / (worker_count * 3);
+  granularity = std::max(granularity, static_cast<size_t>(10));
Contributor:
When we work with TBB algorithms, we should ignore the number of actual cores. Granularity should give us tasks of a proper size: not so long that load balance suffers (in this case execution time shouldn't exceed several ms) and not so short that scheduler overhead dominates (execution time shouldn't be less than 5 µs). E.g., we can pick a small batch size (like 64-256 KB) and divide it by the number of bytes fetched per row, or use another similar heuristic.
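(For scale: assuming memcpy-bound work at roughly 10 GB/s per core, a 128 KB task runs for about 13 µs, comfortably inside the 5 µs to several-ms window described above; the bandwidth figure here is an illustrative assumption, not from the review.)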

Contributor (Author):

I just don't know how to estimate the data size: the code may memcpy some data or may just pass it through. I think it's better to have some granularity than none, but I'm not sure which estimate to use here: a pessimistic one that assumes everything is memcpy'd, or something else? Also, since this is a new heuristic, if the approach is fine I will need some statistics on data access.

Contributor:

You don't have to develop the perfect heuristic from the beginning; almost any would be better than the current code. What I suggest for now is to compute the number of bytes we copy for a single row and then set granularity to (128 << 10) / bytes_per_row. This would roughly produce 128 KB tasks.
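A minimal sketch of that suggestion, assuming a per-row byte estimate is available (`computeGranularity` and `bytes_per_row` are illustrative names, not code from this PR):

```cpp
#include <algorithm>
#include <cstddef>

// Illustrative sketch of the reviewer's suggestion, not code from this PR:
// pick a TBB grain size so that each task copies roughly 128 KB.
// bytes_per_row is an assumed estimate of the data copied per row.
std::size_t computeGranularity(std::size_t bytes_per_row) {
  constexpr std::size_t kTargetTaskBytes = std::size_t{128} << 10;  // ~128 KB
  bytes_per_row = std::max<std::size_t>(bytes_per_row, 1);  // guard div-by-zero
  // Rows per task such that one task moves about kTargetTaskBytes of data.
  return std::max<std::size_t>(kTargetTaskBytes / bytes_per_row, 1);
}
```

For example, at 64 bytes per row this yields a grain size of 2048 rows; the two `std::max` floors keep the result sane for very wide rows or rows with nothing to copy.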


+  tbb::parallel_for(tbb::blocked_range<size_t>(first_row, last_row, granularity),
+                    [&do_write_only_lazy_columns, targets_to_skip, first_row, this](
+                        const tbb::blocked_range<size_t>& interval) {
+                      for (size_t i = interval.begin(); i < interval.end(); ++i) {
+                        do_write_only_lazy_columns(i, targets_to_skip);
+                      }
+                    });
 }

/**