This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 80f139d

[Join] Remove redundant copies.
This commit removes redundant copying (memcpy) in `getAllTableColumnFragments`. It also adds some parallelization.

Resolves: #574
Signed-off-by: Dmitrii Makarenko <[email protected]>
1 parent: 0b46841
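For context, the sketch below is a hypothetical, simplified illustration and not code from this commit (the names Fragment, merge_with_double_copy, and merge_with_single_copy are invented). It contrasts the old path, which copied each fragment into its own buffer and then copied again while merging, with the approach this commit takes in ColumnFetcher.cpp below: size one destination buffer up front and copy every fragment exactly once into its precomputed offset.

// Hypothetical, simplified illustration (not HDK code): why the second copy
// can be dropped once the destination is sized up front.
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

using Fragment = std::vector<int8_t>;  // one already-decoded column fragment

// Old-style: stage a copy of every fragment, then copy the staged buffers
// again while concatenating them (two copies per byte).
std::vector<int8_t> merge_with_double_copy(const std::vector<Fragment>& frags) {
  std::vector<Fragment> staged;
  for (const auto& f : frags) {
    staged.push_back(f);  // copy #1: per-fragment buffer
  }
  std::vector<int8_t> merged;
  for (const auto& s : staged) {
    merged.insert(merged.end(), s.begin(), s.end());  // copy #2: merge
  }
  return merged;
}

// New-style: compute the total size, allocate once, and memcpy each fragment
// straight into its slot (one copy per byte), as the write_ptrs loop in
// ColumnFetcher.cpp below does.
std::vector<int8_t> merge_with_single_copy(const std::vector<Fragment>& frags) {
  size_t total = 0;
  for (const auto& f : frags) {
    total += f.size();
  }
  std::vector<int8_t> merged(total);
  size_t offset = 0;
  for (const auto& f : frags) {
    std::memcpy(merged.data() + offset, f.data(), f.size());
    offset += f.size();
  }
  return merged;
}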

File tree

3 files changed: 96 additions, 62 deletions

omniscidb/QueryEngine/ColumnFetcher.cpp
omniscidb/ResultSetRegistry/ColumnarResults.cpp
omniscidb/ResultSetRegistry/ColumnarResults.h

omniscidb/QueryEngine/ColumnFetcher.cpp

Lines changed: 96 additions & 34 deletions

@@ -16,6 +16,7 @@
 
 #include "QueryEngine/ColumnFetcher.h"
 
+//#include <tbb/parallel_for.h>
 #include <memory>
 
 #include "DataMgr/ArrayNoneEncoder.h"
@@ -239,16 +240,17 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
   int db_id = col_info->db_id;
   int table_id = col_info->table_id;
   int col_id = col_info->column_id;
+
   const auto fragments_it = all_tables_fragments.find({db_id, table_id});
   CHECK(fragments_it != all_tables_fragments.end());
+
   const auto fragments = fragments_it->second;
   const auto frag_count = fragments->size();
   std::vector<std::unique_ptr<ColumnarResults>> column_frags;
   const ColumnarResults* table_column = nullptr;
   const InputDescriptor table_desc(db_id, table_id, int(0));
   {
     std::lock_guard<std::mutex> columnar_conversion_guard(columnar_fetch_mutex_);
-
     auto col_token = data_provider_->getZeroCopyColumnData(*col_info);
     if (col_token != nullptr) {
       size_t num_rows = col_token->getSize() / col_token->getType()->size();
@@ -262,44 +264,104 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
     }
 
     auto column_it = columnarized_scan_table_cache_.find({table_id, col_id});
-    if (column_it == columnarized_scan_table_cache_.end()) {
-      for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
-        if (executor_->getConfig()
-                .exec.interrupt.enable_non_kernel_time_query_interrupt &&
-            executor_->checkNonKernelTimeInterrupted()) {
-          throw QueryExecutionError(Executor::ERR_INTERRUPTED);
-        }
-        std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
-        std::list<ChunkIter> chunk_iter_holder;
-        const auto& fragment = (*fragments)[frag_id];
-        if (fragment.isEmptyPhysicalFragment()) {
-          continue;
-        }
-        auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
-        CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
-        auto col_buffer = getOneTableColumnFragment(col_info,
-                                                    static_cast<int>(frag_id),
-                                                    all_tables_fragments,
-                                                    chunk_holder,
-                                                    chunk_iter_holder,
-                                                    Data_Namespace::CPU_LEVEL,
-                                                    int(0),
-                                                    device_allocator);
-        column_frags.push_back(
-            std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
-                                              col_buffer,
-                                              fragment.getNumTuples(),
-                                              chunk_meta_it->second->type(),
-                                              thread_idx));
+    if (column_it != columnarized_scan_table_cache_.end()) {
+      table_column = column_it->second.get();
+      return ColumnFetcher::transferColumnIfNeeded(
+          table_column, 0, memory_level, device_id, device_allocator);
+    }
+
+    size_t total_row_count = 0;
+    for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
+      if (executor_->getConfig().exec.interrupt.enable_non_kernel_time_query_interrupt &&
+          executor_->checkNonKernelTimeInterrupted()) {
+        throw QueryExecutionError(Executor::ERR_INTERRUPTED);
       }
-      auto merged_results =
-          ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
+      const auto& fragment = (*fragments)[frag_id];
+      const auto& rows_in_frag = fragment.getNumTuples();
+      total_row_count += rows_in_frag;
+    }
+
+    const auto& type_width = col_info->type->size();
+    auto write_ptr =
+        executor_->row_set_mem_owner_->allocate(type_width * total_row_count);
+    std::vector<std::pair<int8_t*, size_t>> write_ptrs;
+    for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
+      const auto& fragment = (*fragments)[frag_id];
+      if (!fragment.getNumTuples()) {
+        continue;
+      }
+      CHECK_EQ(type_width, fragment.getChunkMetadataMap().at(col_id)->type()->size());
+      write_ptrs.push_back({write_ptr, fragment.getNumTuples() * type_width});
+      write_ptr += fragment.getNumTuples() * type_width;
+    }
+
+    if (write_ptrs.empty()) {
+      std::unique_ptr<ColumnarResults> merged_results(nullptr);
+
       table_column = merged_results.get();
       columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
                                              std::move(merged_results));
-    } else {
-      table_column = column_it->second.get();
+
+      return ColumnFetcher::transferColumnIfNeeded(
+          table_column, 0, memory_level, device_id, device_allocator);
     }
+
+    CHECK_EQ(frag_count, write_ptrs.size());
+    for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
+      std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
+      std::list<ChunkIter> chunk_iter_holder;
+      const auto& fragment = (*fragments)[frag_id];
+      if (fragment.isEmptyPhysicalFragment()) {
+        continue;
+      }
+      auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
+      CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
+      std::shared_ptr<Chunk_NS::Chunk> chunk;
+      // Fixed length arrays are also included here.
+      const bool is_varlen = col_info->type->isString() || col_info->type->isArray();
+      int8_t* col_buffer;
+      {
+        ChunkKey chunk_key{db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
+        std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
+        if (is_varlen) {
+          varlen_chunk_lock.reset(
+              new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
+        }
+        chunk = data_provider_->getChunk(col_info,
+                                         chunk_key,
+                                         Data_Namespace::CPU_LEVEL,
+                                         0,
+                                         chunk_meta_it->second->numBytes(),
+                                         chunk_meta_it->second->numElements());
+        std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
+        chunk_holder.push_back(chunk);
+      }
+      if (is_varlen) {
+        CHECK_GT(table_id, 0);
+        CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
+        chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
+        auto& chunk_iter = chunk_iter_holder.back();
+        col_buffer = reinterpret_cast<int8_t*>(&chunk_iter);
+      } else {
+        auto ab = chunk->getBuffer();
+        CHECK(ab->getMemoryPtr());
+        col_buffer = ab->getMemoryPtr();  // @TODO(alex) change to use ChunkIter
+      }
+      memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second);
+    }
+
+    std::vector<int8_t*> raw_write_ptrs;
+    raw_write_ptrs.reserve(frag_count);
+    for (uint i = 0; i < frag_count; i++) {
+      raw_write_ptrs.emplace_back(write_ptrs[i].first);
+    }
+
+    std::unique_ptr<ColumnarResults> merged_results(
+        new ColumnarResults(raw_write_ptrs, total_row_count, col_info->type, thread_idx));
+
+    table_column = merged_results.get();
+    columnarized_scan_table_cache_.emplace(std::make_pair(table_id, col_id),
+                                           std::move(merged_results));
   }
   return ColumnFetcher::transferColumnIfNeeded(
       table_column, 0, memory_level, device_id, device_allocator);
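Note that the commit message mentions parallelization and the hunk above adds a commented-out #include <tbb/parallel_for.h>, yet the per-fragment copy loop it introduces is still sequential. The snippet below is a sketch only, not code from this commit: copy_fragments_parallel and its parameters are invented names. It shows one way the copy step could be parallelized, which is safe in principle because every fragment writes to a disjoint slice of the single allocation.

// Sketch under stated assumptions: each destination slice in write_ptrs is
// disjoint, so the memcpy per fragment is independent and can run in parallel.
// Chunk fetching would still need the locking shown in the diff above.
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <utility>
#include <vector>

#include <tbb/parallel_for.h>

void copy_fragments_parallel(
    const std::vector<std::pair<const int8_t*, size_t>>& src_frags,   // per-fragment source buffer + byte size
    const std::vector<std::pair<int8_t*, size_t>>& write_ptrs) {      // per-fragment destination slice + byte size
  tbb::parallel_for(size_t(0), write_ptrs.size(), [&](size_t frag_id) {
    std::memcpy(write_ptrs[frag_id].first,
                src_frags[frag_id].first,
                write_ptrs[frag_id].second);
  });
}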

omniscidb/ResultSetRegistry/ColumnarResults.cpp

Lines changed: 0 additions & 22 deletions

@@ -122,28 +122,6 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
   }
 }
 
-ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
-                                 const int8_t* one_col_buffer,
-                                 const size_t num_rows,
-                                 const hdk::ir::Type* target_type,
-                                 const size_t thread_idx)
-    : column_buffers_(1)
-    , num_rows_(num_rows)
-    , target_types_{target_type}
-    , parallel_conversion_(false)
-    , direct_columnar_conversion_(false)
-    , thread_idx_(thread_idx) {
-  auto timer = DEBUG_TIMER(__func__);
-
-  if (target_type->isVarLen()) {
-    throw ColumnarConversionNotSupported();
-  }
-  const auto buf_size = num_rows * target_type->size();
-  column_buffers_[0] =
-      reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
-  memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
-}
-
 ColumnarResults::ColumnarResults(const std::vector<int8_t*> one_col_buffer,
                                  const size_t num_rows,
                                  const hdk::ir::Type* target_type,
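The deleted constructor above allocated from the RowSetMemoryOwner and memcpy-ed a single column buffer into it. After this commit, ColumnFetcher.cpp performs that one copy itself and hands the resulting per-fragment pointers (raw_write_ptrs) to the remaining vector-of-pointers constructor. The mock below is a hypothetical class, not the HDK ColumnarResults API; it only illustrates the two ownership strategies in play: copy on construction versus adopting externally owned pointers.

// Hypothetical mock (not the HDK class): copying constructor versus a
// constructor that keeps pointers to memory owned elsewhere, e.g. by a
// RowSetMemoryOwner-style arena that outlives the object.
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

class ColumnBuffersMock {
 public:
  // Removed-style: allocate our own storage and copy the caller's data.
  static ColumnBuffersMock byCopy(const int8_t* src, size_t bytes) {
    ColumnBuffersMock result;
    result.storage_.assign(src, src + bytes);
    result.buffers_.push_back(result.storage_.data());
    return result;
  }

  // Surviving-style: keep the caller's per-fragment pointers; no copy here.
  // The external owner of the pointed-to memory must outlive this object.
  static ColumnBuffersMock byReference(std::vector<int8_t*> bufs) {
    ColumnBuffersMock result;
    result.buffers_ = std::move(bufs);
    return result;
  }

  const std::vector<int8_t*>& buffers() const { return buffers_; }

 private:
  std::vector<int8_t> storage_;   // populated only by byCopy
  std::vector<int8_t*> buffers_;  // views into storage_ or into external memory
};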

omniscidb/ResultSetRegistry/ColumnarResults.h

Lines changed: 0 additions & 6 deletions

@@ -67,12 +67,6 @@ class ColumnarResults {
                   const Config& config,
                   const bool is_parallel_execution_enforced = false);
 
-  ColumnarResults(const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
-                  const int8_t* one_col_buffer,
-                  const size_t num_rows,
-                  const hdk::ir::Type* target_type,
-                  const size_t thread_idx);
-
   ColumnarResults(const std::vector<int8_t*> one_col_buffer,
                   const size_t num_rows,
                   const hdk::ir::Type* target_type,
