Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit ac76362

Browse files
committed
[Join] InitHashTable optimisation
This commit reworks `fill_hash_join_buff_bucketized_cpu` to use TBB and utilize the CPU properly. Partially resolves: #574. Signed-off-by: Dmitrii Makarenko <[email protected]>
1 parent 6fd1eec commit ac76362

File tree

7 files changed

+381
-91
lines changed

7 files changed

+381
-91
lines changed

omniscidb/QueryEngine/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ set(query_engine_source_files
6161
JoinHashTable/HashTable.cpp
6262
JoinHashTable/PerfectJoinHashTable.cpp
6363
JoinHashTable/Runtime/HashJoinRuntime.cpp
64+
JoinHashTable/Runtime/HashJoinRuntimeCpu.cpp
6465
L0Kernel.cpp
6566
LogicalIR.cpp
6667
LLVMFunctionAttributesUtil.cpp

omniscidb/QueryEngine/ColumnFetcher.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,8 @@ JoinColumn ColumnFetcher::makeJoinColumn(
146146
data_provider,
147147
column_cache);
148148
if (col_buff != nullptr) {
149+
join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count, num_elems};
149150
num_elems += elem_count;
150-
join_chunk_array[num_chunks] = JoinChunk{col_buff, elem_count};
151151
} else {
152152
continue;
153153
}

omniscidb/QueryEngine/JoinHashTable/Builders/PerfectHashTableBuilder.h

Lines changed: 26 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,6 @@ class PerfectJoinHashTableBuilder {
166166
0);
167167

168168
auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
169-
const int thread_count = cpu_threads();
170-
std::vector<std::thread> init_cpu_buff_threads;
171169

172170
{
173171
auto timer_init = DEBUG_TIMER("CPU One-To-One Perfect-Hash: init_hash_join_buff");
@@ -176,54 +174,36 @@ class PerfectJoinHashTableBuilder {
176174
hash_join_invalid_val);
177175
}
178176
const bool for_semi_join = for_semi_anti_join(join_type);
179-
std::atomic<int> err{0};
180177
{
181178
auto timer_fill =
182-
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized");
183-
for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
184-
init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
185-
&join_column,
186-
str_proxy_translation_map,
187-
thread_idx,
188-
thread_count,
189-
type,
190-
&err,
191-
&col_range,
192-
&is_bitwise_eq,
193-
&for_semi_join,
194-
cpu_hash_table_buff,
195-
hash_entry_info] {
196-
int partial_err = fill_hash_join_buff_bucketized(
197-
cpu_hash_table_buff,
198-
hash_join_invalid_val,
199-
for_semi_join,
200-
join_column,
201-
{static_cast<size_t>(type->size()),
202-
col_range.getIntMin(),
203-
col_range.getIntMax(),
204-
inline_fixed_encoding_null_value(type),
205-
is_bitwise_eq,
206-
col_range.getIntMax() + 1,
207-
get_join_column_type_kind(type)},
208-
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
209-
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
210-
: 0, // 0 is dummy value
211-
thread_idx,
212-
thread_count,
213-
hash_entry_info.bucket_normalization);
214-
int zero{0};
215-
err.compare_exchange_strong(zero, partial_err);
216-
});
217-
}
218-
for (auto& t : init_cpu_buff_threads) {
219-
t.join();
179+
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized_cpu");
180+
181+
{
182+
JoinColumnTypeInfo type_info{static_cast<size_t>(type->size()),
183+
col_range.getIntMin(),
184+
col_range.getIntMax(),
185+
inline_fixed_encoding_null_value(type),
186+
is_bitwise_eq,
187+
col_range.getIntMax() + 1,
188+
get_join_column_type_kind(type)};
189+
190+
int error = fill_hash_join_buff_bucketized_cpu(
191+
cpu_hash_table_buff,
192+
hash_join_invalid_val,
193+
for_semi_join,
194+
join_column,
195+
type_info,
196+
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
197+
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
198+
: 0, // 0 is dummy value
199+
hash_entry_info.bucket_normalization);
200+
if (error) {
201+
// Too many hash entries, need to retry with a 1:many table
202+
hash_table_ = nullptr; // clear the hash table buffer
203+
throw NeedsOneToManyHash();
204+
}
220205
}
221206
}
222-
if (err) {
223-
// Too many hash entries, need to retry with a 1:many table
224-
hash_table_ = nullptr; // clear the hash table buffer
225-
throw NeedsOneToManyHash();
226-
}
227207
}
228208

229209
void initOneToManyHashTableOnCpu(

omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.cpp

Lines changed: 19 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#endif
3939

4040
#include <tbb/parallel_for.h>
41+
#include <tbb/parallel_reduce.h>
4142

4243
#include <future>
4344
#endif
@@ -73,35 +74,6 @@ namespace {
7374
* ignore any element ID that is not in the dictionary corresponding to t1_s.x or is
7475
* outside the range of column t1_s.
7576
*/
76-
inline int64_t translate_str_id_to_outer_dict(const int64_t elem,
77-
const int64_t min_elem,
78-
const int64_t max_elem,
79-
const void* sd_inner_proxy,
80-
const void* sd_outer_proxy) {
81-
CHECK(sd_outer_proxy);
82-
const auto sd_inner_dict_proxy =
83-
static_cast<const StringDictionaryProxy*>(sd_inner_proxy);
84-
const auto sd_outer_dict_proxy =
85-
static_cast<const StringDictionaryProxy*>(sd_outer_proxy);
86-
const auto elem_str = sd_inner_dict_proxy->getString(elem);
87-
const auto outer_id = sd_outer_dict_proxy->getIdOfString(elem_str);
88-
if (outer_id > max_elem || outer_id < min_elem) {
89-
return StringDictionary::INVALID_STR_ID;
90-
}
91-
return outer_id;
92-
}
93-
94-
inline int64_t map_str_id_to_outer_dict(const int64_t inner_elem,
95-
const int64_t min_inner_elem,
96-
const int64_t min_outer_elem,
97-
const int64_t max_outer_elem,
98-
const int32_t* inner_to_outer_translation_map) {
99-
const auto outer_id = inner_to_outer_translation_map[inner_elem - min_inner_elem];
100-
if (outer_id > max_outer_elem || outer_id < min_outer_elem) {
101-
return StringDictionary::INVALID_STR_ID;
102-
}
103-
return outer_id;
104-
}
10577

10678
#if defined(_MSC_VER)
10779
#define DEFAULT_TARGET_ATTRIBUTE
@@ -275,11 +247,12 @@ DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
275247
#ifndef __CUDACC__
276248
if (sd_inner_to_outer_translation_map &&
277249
(!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
278-
const auto outer_id = map_str_id_to_outer_dict(elem,
279-
min_inner_elem,
280-
type_info.min_val,
281-
type_info.max_val,
282-
sd_inner_to_outer_translation_map);
250+
const auto outer_id =
251+
cpu_utils::map_str_id_to_outer_dict(elem,
252+
min_inner_elem,
253+
type_info.min_val,
254+
type_info.max_val,
255+
sd_inner_to_outer_translation_map);
283256
if (outer_id == StringDictionary::INVALID_STR_ID) {
284257
continue;
285258
}
@@ -678,11 +651,12 @@ DEVICE void count_matches_impl(int32_t* count_buff,
678651
#ifndef __CUDACC__
679652
if (sd_inner_to_outer_translation_map &&
680653
(!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
681-
const auto outer_id = map_str_id_to_outer_dict(elem,
682-
min_inner_elem,
683-
type_info.min_val,
684-
type_info.max_val,
685-
sd_inner_to_outer_translation_map);
654+
const auto outer_id =
655+
cpu_utils::map_str_id_to_outer_dict(elem,
656+
min_inner_elem,
657+
type_info.min_val,
658+
type_info.max_val,
659+
sd_inner_to_outer_translation_map);
686660
if (outer_id == StringDictionary::INVALID_STR_ID) {
687661
continue;
688662
}
@@ -873,11 +847,12 @@ DEVICE void fill_row_ids_impl(int32_t* buff,
873847
#ifndef __CUDACC__
874848
if (sd_inner_to_outer_translation_map &&
875849
(!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
876-
const auto outer_id = map_str_id_to_outer_dict(elem,
877-
min_inner_elem,
878-
type_info.min_val,
879-
type_info.max_val,
880-
sd_inner_to_outer_translation_map);
850+
const auto outer_id =
851+
cpu_utils::map_str_id_to_outer_dict(elem,
852+
min_inner_elem,
853+
type_info.min_val,
854+
type_info.max_val,
855+
sd_inner_to_outer_translation_map);
881856
if (outer_id == StringDictionary::INVALID_STR_ID) {
882857
continue;
883858
}

omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "../../DecodersImpl.h"
3535
#else
3636
#include "../../RuntimeFunctions.h"
37+
#include "HashJoinRuntimeCpu.h"
3738
#endif
3839
#include "../../../QueryEngine/Compiler/CommonRuntimeDefs.h"
3940
#include "../../../Shared/funcannotations.h"
@@ -101,6 +102,7 @@ struct JoinChunk {
101102
const int8_t*
102103
col_buff; // actually from AbstractBuffer::getMemoryPtr() via Chunk_NS::Chunk
103104
size_t num_elems;
105+
size_t row_id;
104106
};
105107

106108
struct JoinColumn {

0 commit comments

Comments
 (0)