Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 037e885

Browse files
committed
single threaded simple version
1 parent 99e0098 commit 037e885

File tree

6 files changed

+202
-55
lines changed

6 files changed

+202
-55
lines changed

omniscidb/QueryEngine/ColumnFetcher.cpp

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -284,15 +284,17 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
284284
const auto& type_width = col_info->type->size();
285285
auto write_ptr =
286286
executor_->row_set_mem_owner_->allocate(type_width * total_row_count);
287-
tbb::concurrent_vector<std::pair<int8_t*, size_t>> write_ptrs;
287+
std::vector<std::pair<int8_t*, size_t>> write_ptrs;
288+
std::vector<size_t> valid_fragments;
288289
for (size_t frag_id = 0; frag_id < frag_count; ++frag_id) {
289290
const auto& fragment = (*fragments)[frag_id];
290-
if (!fragment.getNumTuples()) {
291+
if (fragment.isEmptyPhysicalFragment()) {
291292
continue;
292293
}
293294
CHECK_EQ(type_width, fragment.getChunkMetadataMap().at(col_id)->type()->size());
294295
write_ptrs.push_back({write_ptr, fragment.getNumTuples() * type_width});
295296
write_ptr += fragment.getNumTuples() * type_width;
297+
valid_fragments.push_back(frag_id);
296298
}
297299

298300
if (write_ptrs.empty()) {
@@ -306,17 +308,16 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
306308
table_column, 0, memory_level, device_id, device_allocator);
307309
}
308310

309-
CHECK_EQ(frag_count, write_ptrs.size());
311+
size_t valid_frag_count = valid_fragments.size();
310312
tbb::parallel_for(
311-
tbb::blocked_range<size_t>(0, frag_count),
313+
tbb::blocked_range<size_t>(0, valid_frag_count),
312314
[&](const tbb::blocked_range<size_t>& frag_ids) {
313-
for (size_t frag_id = frag_ids.begin(); frag_id < frag_ids.end(); ++frag_id) {
315+
for (size_t v_frag_id = frag_ids.begin(); v_frag_id < frag_ids.end();
316+
++v_frag_id) {
314317
std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
315318
std::list<ChunkIter> chunk_iter_holder;
319+
size_t frag_id = valid_fragments[v_frag_id];
316320
const auto& fragment = (*fragments)[frag_id];
317-
if (fragment.isEmptyPhysicalFragment()) {
318-
continue;
319-
}
320321
auto chunk_meta_it = fragment.getChunkMetadataMap().find(col_id);
321322
CHECK(chunk_meta_it != fragment.getChunkMetadataMap().end());
322323
std::shared_ptr<Chunk_NS::Chunk> chunk;

omniscidb/QueryEngine/JoinHashTable/Builders/PerfectHashTableBuilder.h

Lines changed: 30 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ class PerfectJoinHashTableBuilder {
166166
0);
167167

168168
auto cpu_hash_table_buff = reinterpret_cast<int32_t*>(hash_table_->getCpuBuffer());
169-
const int thread_count = cpu_threads();
170-
std::vector<std::thread> init_cpu_buff_threads;
169+
// const int thread_count = cpu_threads();
170+
// std::vector<std::thread> init_cpu_buff_threads;
171171

172172
{
173173
auto timer_init = DEBUG_TIMER("CPU One-To-One Perfect-Hash: init_hash_join_buff");
@@ -176,54 +176,39 @@ class PerfectJoinHashTableBuilder {
176176
hash_join_invalid_val);
177177
}
178178
const bool for_semi_join = for_semi_anti_join(join_type);
179-
std::atomic<int> err{0};
179+
// std::atomic<int> err{0};
180180
{
181181
auto timer_fill =
182182
DEBUG_TIMER("CPU One-To-One Perfect-Hash: fill_hash_join_buff_bucketized");
183-
for (int thread_idx = 0; thread_idx < thread_count; ++thread_idx) {
184-
init_cpu_buff_threads.emplace_back([hash_join_invalid_val,
185-
&join_column,
186-
str_proxy_translation_map,
187-
thread_idx,
188-
thread_count,
189-
type,
190-
&err,
191-
&col_range,
192-
&is_bitwise_eq,
193-
&for_semi_join,
194-
cpu_hash_table_buff,
195-
hash_entry_info] {
196-
int partial_err = fill_hash_join_buff_bucketized(
197-
cpu_hash_table_buff,
198-
hash_join_invalid_val,
199-
for_semi_join,
200-
join_column,
201-
{static_cast<size_t>(type->size()),
202-
col_range.getIntMin(),
203-
col_range.getIntMax(),
204-
inline_fixed_encoding_null_value(type),
205-
is_bitwise_eq,
206-
col_range.getIntMax() + 1,
207-
get_join_column_type_kind(type)},
208-
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
209-
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
210-
: 0, // 0 is dummy value
211-
thread_idx,
212-
thread_count,
213-
hash_entry_info.bucket_normalization);
214-
int zero{0};
215-
err.compare_exchange_strong(zero, partial_err);
216-
});
217-
}
218-
for (auto& t : init_cpu_buff_threads) {
219-
t.join();
183+
184+
{
185+
// int partial_err = 0;
186+
JoinColumnTypeInfo type_info{static_cast<size_t>(type->size()),
187+
col_range.getIntMin(),
188+
col_range.getIntMax(),
189+
inline_fixed_encoding_null_value(type),
190+
is_bitwise_eq,
191+
col_range.getIntMax() + 1,
192+
get_join_column_type_kind(type)};
193+
194+
int error = fill_hash_join_buff_bucketized_cpu(
195+
cpu_hash_table_buff,
196+
hash_join_invalid_val,
197+
for_semi_join,
198+
join_column,
199+
type_info,
200+
str_proxy_translation_map ? str_proxy_translation_map->data() : nullptr,
201+
str_proxy_translation_map ? str_proxy_translation_map->domainStart()
202+
: 0, // 0 is dummy value
203+
hash_entry_info.bucket_normalization);
204+
if (error) {
205+
// Too many hash entries, need to retry with a 1:many table
206+
hash_table_ = nullptr; // clear the hash table buffer
207+
// LOG(ERROR) << "cpu hash_tabel error: " << error;
208+
throw NeedsOneToManyHash();
209+
}
220210
}
221211
}
222-
if (err) {
223-
// Too many hash entries, need to retry with a 1:many table
224-
hash_table_ = nullptr; // clear the hash table buffer
225-
throw NeedsOneToManyHash();
226-
}
227212
}
228213

229214
void initOneToManyHashTableOnCpu(

omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.cpp

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,14 +261,25 @@ DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
261261
int32_t start = cpu_thread_idx;
262262
int32_t step = cpu_thread_count;
263263
#endif
264+
// LOG(ERROR) << "fill_hash_join_buff_impl func cur th idx: " << cpu_thread_idx
265+
// << " count: " << cpu_thread_count
266+
// << " get th index: " << std::this_thread::get_id()
267+
// << " chunk buff size: " << join_column.col_chunks_buff_sz
268+
// << " num elems: " << join_column.num_elems
269+
// << " num_chunks: " << join_column.num_chunks;
270+
// INJECT_TIMER(fill_hash_join_buff_impl);
264271
JoinColumnTyped col{&join_column, &type_info};
265272
for (auto item : col.slice(start, step)) {
273+
// LOG(ERROR) << "items: " << item.index;
266274
const size_t index = item.index;
267275
int64_t elem = item.element;
268276
if (elem == type_info.null_val) {
277+
// LOG(ERROR) << "null val";
269278
if (type_info.uses_bw_eq) {
270279
elem = type_info.translated_null_val;
271280
} else {
281+
// LOG(ERROR) << "initOneToOneHashTableOnCpu(Threaded) cont elem: " << elem
282+
// << " index: " << index;
272283
continue;
273284
}
274285
}
@@ -286,7 +297,13 @@ DEVICE auto fill_hash_join_buff_impl(int32_t* buff,
286297
elem = outer_id;
287298
}
288299
#endif
300+
// char line[1024];
301+
// snprintf(line, sizeof(line), " entry ptr: %p", buff);
302+
// LOG(ERROR) << "initOneToOneHashTableOnCpu(Threaded) elem: " << elem
303+
// << " index: " << index << " invalid_slot_val: " << invalid_slot_val
304+
// << line;
289305
if (filling_func(elem, index)) {
306+
// LOG(ERROR) << "errr";
290307
return -1;
291308
}
292309
}
@@ -323,6 +340,138 @@ DEVICE int SUFFIX(fill_hash_join_buff_bucketized)(
323340
hashtable_filling_func);
324341
}
325342

343+
#ifndef __CUDACC__
344+
DEVICE int SUFFIX(fill_hash_join_buff_bucketized_cpu)(
345+
int32_t* cpu_hash_table_buff,
346+
const int32_t hash_join_invalid_val,
347+
const bool for_semi_join,
348+
const JoinColumn& join_column,
349+
const JoinColumnTypeInfo& type_info,
350+
const int32_t* sd_inner_to_outer_translation_map,
351+
const int32_t min_inner_elem,
352+
const int64_t bucket_normalization) {
353+
int partial_err = 0;
354+
auto filling_func = for_semi_join ? SUFFIX(fill_hashtable_for_semi_join)
355+
: SUFFIX(fill_one_to_one_hashtable);
356+
auto hashtable_filling_func = [&](int64_t elem, size_t index) {
357+
auto entry_ptr = SUFFIX(get_bucketized_hash_slot)(
358+
cpu_hash_table_buff, elem, type_info.min_val, bucket_normalization);
359+
// LOG(ERROR) << "filling index: " << index << " elem: " << elem
360+
// << " entry_ptr: " << entry_ptr;
361+
return filling_func(index, entry_ptr, hash_join_invalid_val);
362+
};
363+
364+
// for some stupid reason int8* ptr is actually JoinChunk* Why?
365+
auto join_chunk_array =
366+
reinterpret_cast<const struct JoinChunk*>(join_column.col_chunks_buff);
367+
// BTW it's vector with sz:
368+
// join_column.num_chunks
369+
const int8_t* chunk_mem_ptr = join_chunk_array->col_buff;
370+
size_t global_elem_index = 0;
371+
// LOG(ERROR) << "fill_hash_join_buff_cpu chunk buff size: "
372+
// << join_column.col_chunks_buff_sz << " num elems: " <<
373+
// join_column.num_elems
374+
// << " num_chunks: " << join_column.num_chunks;
375+
for (size_t chunk_i = 0; chunk_i < join_column.num_chunks; chunk_i++) {
376+
// wtf 1 chunk, but 0 elements.
377+
if (join_column.num_elems == 0) {
378+
break;
379+
}
380+
auto curr_chunk = join_chunk_array[chunk_i];
381+
for (size_t elem_i = 0; elem_i < curr_chunk.num_elems; elem_i++) {
382+
chunk_mem_ptr = curr_chunk.col_buff;
383+
384+
// char line[1024];
385+
// snprintf(line, sizeof(line), " ptr: %p", chunk_mem_ptr);
386+
// LOG(ERROR) << "initOneToOneHashTableOnCpu " << line
387+
// << " type: " << type_info.column_type << " index in chunk: " << elem_i
388+
// << " elem_sz: " << type_info.elem_sz
389+
// << " invalid_slot_val: " << hash_join_invalid_val;
390+
391+
int64_t elem = 0;
392+
switch (type_info.column_type) {
393+
case SmallDate: {
394+
// LOG(ERROR) << "smallDate";
395+
elem = fixed_width_small_date_decode_noinline(
396+
chunk_mem_ptr,
397+
type_info.elem_sz,
398+
type_info.elem_sz == 4 ? NULL_INT : NULL_SMALLINT,
399+
type_info.elem_sz == 4 ? NULL_INT : NULL_SMALLINT,
400+
elem_i);
401+
break;
402+
}
403+
case Signed: {
404+
// char line[1024];
405+
// snprintf(line, sizeof(line), " ptr: %p", chunk_mem_ptr);
406+
// LOG(ERROR) << "Should call fixed_width_int_decode_noinline: " << line
407+
// << " elem_i: " << elem_i << " without func: ";
408+
409+
// LOG(ERROR) << "int32_cast: "
410+
// << *(reinterpret_cast<const int32_t*>(
411+
// &chunk_mem_ptr[elem_i * type_info.elem_sz]));
412+
elem =
413+
fixed_width_int_decode_noinline(chunk_mem_ptr, type_info.elem_sz, elem_i);
414+
break;
415+
}
416+
case Unsigned: {
417+
// LOG(ERROR) << "unsigned";
418+
elem = fixed_width_unsigned_decode_noinline(
419+
chunk_mem_ptr, type_info.elem_sz, elem_i);
420+
break;
421+
}
422+
case Double: {
423+
// LOG(ERROR) << "double";
424+
elem = fixed_width_double_decode_noinline(chunk_mem_ptr, elem_i);
425+
break;
426+
}
427+
default: {
428+
// LOG(ERROR) << "default";
429+
assert(0);
430+
}
431+
}
432+
433+
if (elem == type_info.null_val) {
434+
// LOG(ERROR) << "null elem";
435+
if (type_info.uses_bw_eq) {
436+
elem = type_info.translated_null_val;
437+
} else {
438+
// LOG(ERROR) << "cont: elem_i - " << elem_i << " chunk_i - " << chunk_i;
439+
break;
440+
}
441+
}
442+
if (sd_inner_to_outer_translation_map &&
443+
(!type_info.uses_bw_eq || elem != type_info.translated_null_val)) {
444+
const auto outer_id = map_str_id_to_outer_dict(elem,
445+
min_inner_elem,
446+
type_info.min_val,
447+
type_info.max_val,
448+
sd_inner_to_outer_translation_map);
449+
if (outer_id == StringDictionary::INVALID_STR_ID) {
450+
break;
451+
}
452+
elem = outer_id;
453+
}
454+
455+
// LOG(ERROR) << "initOneToOneHashTableOnCpu elem: " << elem
456+
// << " index: " << global_elem_index << " chunk idx: " << chunk_i
457+
// << " el_i: " << elem_i;
458+
459+
if (hashtable_filling_func(elem, global_elem_index)) {
460+
partial_err = -1;
461+
}
462+
463+
global_elem_index++;
464+
if (partial_err != 0) {
465+
// LOG(ERROR) << "error here! " << partial_err;
466+
return partial_err;
467+
}
468+
partial_err = 0;
469+
}
470+
}
471+
return 0;
472+
}
473+
#endif
474+
326475
DEVICE int SUFFIX(fill_hash_join_buff)(int32_t* buff,
327476
const int32_t invalid_slot_val,
328477
const bool for_semi_join,

omniscidb/QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,17 @@ int fill_hash_join_buff_bucketized(int32_t* buff,
146146
const int32_t cpu_thread_count,
147147
const int64_t bucket_normalization);
148148

149+
#ifndef __CUDACC_
150+
int fill_hash_join_buff_bucketized_cpu(int32_t* cpu_hash_table_buff,
151+
const int32_t hash_join_invalid_val,
152+
const bool for_semi_join,
153+
const JoinColumn& join_column,
154+
const JoinColumnTypeInfo& type_info,
155+
const int32_t* sd_inner_to_outer_translation_map,
156+
const int32_t min_inner_elem,
157+
const int64_t bucket_normalization);
158+
#endif
159+
149160
int fill_hash_join_buff(int32_t* buff,
150161
const int32_t invalid_slot_val,
151162
const bool for_semi_join,

omniscidb/QueryEngine/JoinHashTable/Runtime/JoinColumnIterator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ struct JoinColumnIterator {
7373
DEVICE FORCE_INLINE JoinColumnIterator& operator++() {
7474
index += step;
7575
index_inside_chunk += step;
76+
// this stuff is made to find index_of_chunk by total index of element
7677
while (chunk_data &&
7778
index_inside_chunk >= join_chunk_array[index_of_chunk].num_elems) {
7879
index_inside_chunk -= join_chunk_array[index_of_chunk].num_elems;

omniscidb/Shared/thread_count.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616

1717
#pragma once
1818

19-
#include "Shared/funcannotations.h"
20-
2119
#include <algorithm>
2220
#include <thread>
21+
#include "Logger/Logger.h"
22+
#include "Shared/funcannotations.h"
2323

2424
#ifndef SHARED_EXPORT
2525
#define SHARED_EXPORT EXTERN

0 commit comments

Comments
 (0)