This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 99e0098 (1 parent: 80f139d)

[Join] Parallelization.

This commit adds parallelization.

Resolves: #574
Signed-off-by: Dmitrii Makarenko <[email protected]>

File tree: 1 file changed (+50 −44 lines)

omniscidb/QueryEngine/ColumnFetcher.cpp (50 additions, 44 deletions)
@@ -16,7 +16,7 @@
 
 #include "QueryEngine/ColumnFetcher.h"
 
-//#include <tbb/parallel_for.h>
+#include <tbb/parallel_for.h>
 #include <memory>
 
 #include "DataMgr/ArrayNoneEncoder.h"
@@ -284,7 +284,7 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
   const auto& type_width = col_info->type->size();
   auto write_ptr =
       executor_->row_set_mem_owner_->allocate(type_width * total_row_count);
-  std::vector<std::pair<int8_t*, size_t>> write_ptrs;
+  tbb::concurrent_vector<std::pair<int8_t*, size_t>> write_ptrs;
   for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
     const auto& fragment = (*fragments)[frag_id];
     if (!fragment.getNumTuples()) {
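Note on this hunk: tbb::concurrent_vector, unlike std::vector, supports concurrent push_back and never relocates existing elements when it grows, so pointers and indices into it remain valid while worker threads run. A minimal standalone sketch of that property (illustrative only, not code from this commit):

#include <tbb/blocked_range.h>
#include <tbb/concurrent_vector.h>
#include <tbb/parallel_for.h>

int main() {
  tbb::concurrent_vector<int> v;
  // push_back is safe from many threads at once; existing elements are
  // never moved, so references into v stay valid while it grows.
  tbb::parallel_for(tbb::blocked_range<int>(0, 1000),
                    [&](const tbb::blocked_range<int>& r) {
                      for (int i = r.begin(); i < r.end(); ++i) {
                        v.push_back(i);
                      }
                    });
  return v.size() == 1000 ? 0 : 1;
}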
@@ -307,48 +307,54 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
   }
 
   CHECK_EQ(frag_count, write_ptrs.size());
-  for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
-    std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
-    std::list<ChunkIter> chunk_iter_holder;
-    const auto& fragment = (*fragments)[frag_id];
-    if (fragment.isEmptyPhysicalFragment()) {
-      continue;
-    }
-    auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
-    CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
-    std::shared_ptr<Chunk_NS::Chunk> chunk;
-    // Fixed length arrays are also included here.
-    const bool is_varlen = col_info->type->isString() || col_info->type->isArray();
-    int8_t* col_buffer;
-    {
-      ChunkKey chunk_key{db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
-      std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
-      if (is_varlen) {
-        varlen_chunk_lock.reset(
-            new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
-      }
-      chunk = data_provider_->getChunk(col_info,
-                                       chunk_key,
-                                       Data_Namespace::CPU_LEVEL,
-                                       0,
-                                       chunk_meta_it->second->numBytes(),
-                                       chunk_meta_it->second->numElements());
-      std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
-      chunk_holder.push_back(chunk);
-    }
-    if (is_varlen) {
-      CHECK_GT(table_id, 0);
-      CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
-      chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
-      auto& chunk_iter = chunk_iter_holder.back();
-      col_buffer = reinterpret_cast<int8_t*>(&chunk_iter);
-    } else {
-      auto ab = chunk->getBuffer();
-      CHECK(ab->getMemoryPtr());
-      col_buffer = ab->getMemoryPtr(); // @TODO(alex) change to use ChunkIter
-    }
-    memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second);
-  }
+  tbb::parallel_for(
+      tbb::blocked_range<size_t>(0, frag_count),
+      [&](const tbb::blocked_range<size_t>& frag_ids) {
+        for (size_t frag_id = frag_ids.begin(); frag_id < frag_ids.end(); ++frag_id) {
+          std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
+          std::list<ChunkIter> chunk_iter_holder;
+          const auto& fragment = (*fragments)[frag_id];
+          if (fragment.isEmptyPhysicalFragment()) {
+            continue;
+          }
+          auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
+          CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
+          std::shared_ptr<Chunk_NS::Chunk> chunk;
+          // Fixed length arrays are also included here.
+          const bool is_varlen =
+              col_info->type->isString() || col_info->type->isArray();
+          int8_t* col_buffer;
+          {
+            ChunkKey chunk_key{
+                db_id, fragment.physicalTableId, col_id, fragment.fragmentId};
+            std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
+            if (is_varlen) {
+              varlen_chunk_lock.reset(
+                  new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
+            }
+            chunk = data_provider_->getChunk(col_info,
+                                             chunk_key,
+                                             Data_Namespace::CPU_LEVEL,
+                                             0,
+                                             chunk_meta_it->second->numBytes(),
+                                             chunk_meta_it->second->numElements());
+            std::lock_guard<std::mutex> chunk_list_lock(chunk_list_mutex_);
+            chunk_holder.push_back(chunk);
+          }
+          if (is_varlen) {
+            CHECK_GT(table_id, 0);
+            CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
+            chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
+            auto& chunk_iter = chunk_iter_holder.back();
+            col_buffer = reinterpret_cast<int8_t*>(&chunk_iter);
+          } else {
+            auto ab = chunk->getBuffer();
+            CHECK(ab->getMemoryPtr());
+            col_buffer = ab->getMemoryPtr();  // @TODO(alex) change to use ChunkIter
+          }
+          memcpy(write_ptrs[frag_id].first, col_buffer, write_ptrs[frag_id].second);
+        }
+      });
 
   std::vector<int8_t*> raw_write_ptrs;
   raw_write_ptrs.reserve(frag_count);
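The core of the change is the tbb::parallel_for over a tbb::blocked_range of fragment ids: each fragment copies into its own disjoint (pointer, size) slice of the output buffer, so the memcpy calls are independent, while the shared chunk list and varlen fetches stay guarded by mutexes. Below is a minimal, self-contained sketch of the same pattern; the fragment buffers and sizes are hypothetical stand-ins, not the engine's Chunk machinery:

#include <tbb/blocked_range.h>
#include <tbb/concurrent_vector.h>
#include <tbb/parallel_for.h>

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <utility>
#include <vector>

int main() {
  // Hypothetical stand-ins for the commit's fragments and row-set buffer.
  constexpr size_t kFragCount = 8;
  constexpr size_t kFragBytes = 64;
  std::vector<std::vector<int8_t>> fragments(
      kFragCount, std::vector<int8_t>(kFragBytes, 1));
  std::vector<int8_t> out(kFragCount * kFragBytes);

  // Serial setup, as in the commit: each fragment gets a disjoint
  // (destination pointer, byte count) slice of the output buffer.
  tbb::concurrent_vector<std::pair<int8_t*, size_t>> write_ptrs;
  for (size_t i = 0; i < kFragCount; ++i) {
    write_ptrs.push_back({out.data() + i * kFragBytes, kFragBytes});
  }

  // Parallel copy: the destination slices do not overlap, so no locking
  // is needed around the memcpy itself.
  tbb::parallel_for(
      tbb::blocked_range<size_t>(0, kFragCount),
      [&](const tbb::blocked_range<size_t>& frag_ids) {
        for (size_t i = frag_ids.begin(); i < frag_ids.end(); ++i) {
          std::memcpy(write_ptrs[i].first, fragments[i].data(),
                      write_ptrs[i].second);
        }
      });

  std::printf("copied %zu fragments\n", kFragCount);
  return 0;
}

With oneTBB installed, this should build with something like: g++ -std=c++17 sketch.cpp -ltbb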
