@@ -350,7 +350,6 @@ DEVICE int SUFFIX(fill_hash_join_buff_bucketized_cpu)(
350
350
const int32_t * sd_inner_to_outer_translation_map,
351
351
const int32_t min_inner_elem,
352
352
const int64_t bucket_normalization) {
353
- int partial_err = 0 ;
354
353
auto filling_func = for_semi_join ? SUFFIX (fill_hashtable_for_semi_join)
355
354
: SUFFIX (fill_one_to_one_hashtable);
356
355
auto hashtable_filling_func = [&](int64_t elem, size_t index) {
@@ -366,13 +365,14 @@ DEVICE int SUFFIX(fill_hash_join_buff_bucketized_cpu)(
366
365
reinterpret_cast <const struct JoinChunk *>(join_column.col_chunks_buff );
367
366
// BTW it's vector with sz:
368
367
// join_column.num_chunks
369
- const int8_t * chunk_mem_ptr = join_chunk_array->col_buff ;
370
- size_t global_elem_index = 0 ;
368
+ // const int8_t* chunk_mem_ptr = join_chunk_array->col_buff;
369
+
371
370
// LOG(ERROR) << "fill_hash_join_buff_cpu chunk buff size: "
372
371
// << join_column.col_chunks_buff_sz << " num elems: " <<
373
372
// join_column.num_elems
374
373
// << " num_chunks: " << join_column.num_chunks;
375
-
374
+ std::atomic<int > err{0 };
375
+ std::atomic<size_t > global_elem_index{0 };
376
376
tbb::parallel_for (
377
377
tbb::blocked_range<size_t >(0 , join_column.num_chunks ),
378
378
[&](const tbb::blocked_range<size_t >& join_chunks_range) {
@@ -395,7 +395,8 @@ DEVICE int SUFFIX(fill_hash_join_buff_bucketized_cpu)(
395
395
for (size_t elem_i = curr_chnunk_elems_range.begin ();
396
396
elem_i != curr_chnunk_elems_range.end ();
397
397
elem_i++) {
398
- chunk_mem_ptr = curr_chunk.col_buff ;
398
+ int partial_err = 0 ;
399
+ const int8_t * chunk_mem_ptr = curr_chunk.col_buff ;
399
400
400
401
// char line[1024];
401
402
// snprintf(line, sizeof(line), " ptr: %p", chunk_mem_ptr);
@@ -477,20 +478,28 @@ DEVICE int SUFFIX(fill_hash_join_buff_bucketized_cpu)(
477
478
// chunk_i
478
479
// << " el_i: " << elem_i;
479
480
480
- if (hashtable_filling_func (elem, global_elem_index)) {
481
+ if (hashtable_filling_func (elem, global_elem_index. load () )) {
481
482
partial_err = -1 ;
482
483
}
483
484
484
485
global_elem_index++;
485
486
if (partial_err != 0 ) {
486
487
// LOG(ERROR) << "error here! " << partial_err;
487
- return partial_err;
488
+ int zero{0 };
489
+ err.compare_exchange_strong (zero, partial_err);
490
+ break ;
488
491
}
489
492
partial_err = 0 ;
490
493
}
491
494
});
495
+ if (err) {
496
+ break ;
497
+ }
492
498
}
493
499
});
500
+ if (err) {
501
+ return -1 ;
502
+ }
494
503
return 0 ;
495
504
}
496
505
#endif
0 commit comments