@@ -372,102 +372,125 @@ DEVICE int SUFFIX(fill_hash_join_buff_bucketized_cpu)(
372
372
// << join_column.col_chunks_buff_sz << " num elems: " <<
373
373
// join_column.num_elems
374
374
// << " num_chunks: " << join_column.num_chunks;
375
- for (size_t chunk_i = 0 ; chunk_i < join_column.num_chunks ; chunk_i++) {
376
- // wtf 1 chunk, but 0 elements.
377
- if (join_column.num_elems == 0 ) {
378
- break ;
379
- }
380
- auto curr_chunk = join_chunk_array[chunk_i];
381
- for (size_t elem_i = 0 ; elem_i < curr_chunk.num_elems ; elem_i++) {
382
- chunk_mem_ptr = curr_chunk.col_buff ;
383
-
384
- // char line[1024];
385
- // snprintf(line, sizeof(line), " ptr: %p", chunk_mem_ptr);
386
- // LOG(ERROR) << "initOneToOneHashTableOnCpu " << line
387
- // << " type: " << type_info.column_type << " index in chunk: " << elem_i
388
- // << " elem_sz: " << type_info.elem_sz
389
- // << " invalid_slot_val: " << hash_join_invalid_val;
390
-
391
- int64_t elem = 0 ;
392
- switch (type_info.column_type ) {
393
- case SmallDate: {
394
- // LOG(ERROR) << "smallDate";
395
- elem = fixed_width_small_date_decode_noinline (
396
- chunk_mem_ptr,
397
- type_info.elem_sz ,
398
- type_info.elem_sz == 4 ? NULL_INT : NULL_SMALLINT,
399
- type_info.elem_sz == 4 ? NULL_INT : NULL_SMALLINT,
400
- elem_i);
401
- break ;
402
- }
403
- case Signed: {
404
- // char line[1024];
405
- // snprintf(line, sizeof(line), " ptr: %p", chunk_mem_ptr);
406
- // LOG(ERROR) << "Should call fixed_width_int_decode_noinline: " << line
407
- // << " elem_i: " << elem_i << " without func: ";
408
-
409
- // LOG(ERROR) << "int32_cast: "
410
- // << *(reinterpret_cast<const int32_t*>(
411
- // &chunk_mem_ptr[elem_i * type_info.elem_sz]));
412
- elem =
413
- fixed_width_int_decode_noinline (chunk_mem_ptr, type_info.elem_sz , elem_i);
414
- break ;
415
- }
416
- case Unsigned: {
417
- // LOG(ERROR) << "unsigned";
418
- elem = fixed_width_unsigned_decode_noinline (
419
- chunk_mem_ptr, type_info.elem_sz , elem_i);
420
- break ;
421
- }
422
- case Double: {
423
- // LOG(ERROR) << "double";
424
- elem = fixed_width_double_decode_noinline (chunk_mem_ptr, elem_i);
425
- break ;
426
- }
427
- default : {
428
- // LOG(ERROR) << "default";
429
- assert (0 );
430
- }
431
- }
432
375
433
- if (elem == type_info.null_val ) {
434
- // LOG(ERROR) << "null elem";
435
- if (type_info.uses_bw_eq ) {
436
- elem = type_info.translated_null_val ;
437
- } else {
438
- // LOG(ERROR) << "cont: elem_i - " << elem_i << " chunk_i - " << chunk_i;
439
- break ;
440
- }
441
- }
442
- if (sd_inner_to_outer_translation_map &&
443
- (!type_info.uses_bw_eq || elem != type_info.translated_null_val )) {
444
- const auto outer_id = map_str_id_to_outer_dict (elem,
445
- min_inner_elem,
446
- type_info.min_val ,
447
- type_info.max_val ,
448
- sd_inner_to_outer_translation_map);
449
- if (outer_id == StringDictionary::INVALID_STR_ID) {
450
- break ;
376
+ tbb::parallel_for (
377
+ tbb::blocked_range<size_t >(0 , join_column.num_chunks ),
378
+ [&](const tbb::blocked_range<size_t >& join_chunks_range) {
379
+ for (size_t chunk_i = join_chunks_range.begin ();
380
+ chunk_i != join_chunks_range.end ();
381
+ chunk_i++) {
382
+ // wtf 1 chunk, but 0 elements.
383
+ if (join_column.num_elems == 0 ) {
384
+ break ;
385
+ }
386
+ auto curr_chunk = join_chunk_array[chunk_i];
387
+ // granularity be fit in cpu cache
388
+ // should be L2 size, for optimal distribution
389
+ // in general it's from 256Kb to 32 Mb
390
+ size_t cache_size = 512000 ;
391
+ size_t granularity = cache_size / type_info.elem_sz ;
392
+ tbb::parallel_for (
393
+ tbb::blocked_range<size_t >(0 , curr_chunk.num_elems , granularity),
394
+ [&](const tbb::blocked_range<size_t >& curr_chnunk_elems_range) {
395
+ for (size_t elem_i = curr_chnunk_elems_range.begin ();
396
+ elem_i != curr_chnunk_elems_range.end ();
397
+ elem_i++) {
398
+ chunk_mem_ptr = curr_chunk.col_buff ;
399
+
400
+ // char line[1024];
401
+ // snprintf(line, sizeof(line), " ptr: %p", chunk_mem_ptr);
402
+ // LOG(ERROR) << "initOneToOneHashTableOnCpu " << line
403
+ // << " type: " << type_info.column_type << " index in chunk:
404
+ // " << elem_i
405
+ // << " elem_sz: " << type_info.elem_sz
406
+ // << " invalid_slot_val: " << hash_join_invalid_val;
407
+
408
+ int64_t elem = 0 ;
409
+ switch (type_info.column_type ) {
410
+ case SmallDate: {
411
+ // LOG(ERROR) << "smallDate";
412
+ elem = fixed_width_small_date_decode_noinline (
413
+ chunk_mem_ptr,
414
+ type_info.elem_sz ,
415
+ type_info.elem_sz == 4 ? NULL_INT : NULL_SMALLINT,
416
+ type_info.elem_sz == 4 ? NULL_INT : NULL_SMALLINT,
417
+ elem_i);
418
+ break ;
419
+ }
420
+ case Signed: {
421
+ // char line[1024];
422
+ // snprintf(line, sizeof(line), " ptr: %p", chunk_mem_ptr);
423
+ // LOG(ERROR) << "Should call fixed_width_int_decode_noinline: " <<
424
+ // line
425
+ // << " elem_i: " << elem_i << " without func: ";
426
+
427
+ // LOG(ERROR) << "int32_cast: "
428
+ // << *(reinterpret_cast<const int32_t*>(
429
+ // &chunk_mem_ptr[elem_i * type_info.elem_sz]));
430
+ elem = fixed_width_int_decode_noinline (
431
+ chunk_mem_ptr, type_info.elem_sz , elem_i);
432
+ break ;
433
+ }
434
+ case Unsigned: {
435
+ // LOG(ERROR) << "unsigned";
436
+ elem = fixed_width_unsigned_decode_noinline (
437
+ chunk_mem_ptr, type_info.elem_sz , elem_i);
438
+ break ;
439
+ }
440
+ case Double: {
441
+ // LOG(ERROR) << "double";
442
+ elem = fixed_width_double_decode_noinline (chunk_mem_ptr, elem_i);
443
+ break ;
444
+ }
445
+ default : {
446
+ // LOG(ERROR) << "default";
447
+ assert (0 );
448
+ }
449
+ }
450
+
451
+ if (elem == type_info.null_val ) {
452
+ // LOG(ERROR) << "null elem";
453
+ if (type_info.uses_bw_eq ) {
454
+ elem = type_info.translated_null_val ;
455
+ } else {
456
+ // LOG(ERROR) << "cont: elem_i - " << elem_i << " chunk_i - " <<
457
+ // chunk_i;
458
+ break ;
459
+ }
460
+ }
461
+ if (sd_inner_to_outer_translation_map &&
462
+ (!type_info.uses_bw_eq || elem != type_info.translated_null_val )) {
463
+ const auto outer_id =
464
+ map_str_id_to_outer_dict (elem,
465
+ min_inner_elem,
466
+ type_info.min_val ,
467
+ type_info.max_val ,
468
+ sd_inner_to_outer_translation_map);
469
+ if (outer_id == StringDictionary::INVALID_STR_ID) {
470
+ break ;
471
+ }
472
+ elem = outer_id;
473
+ }
474
+
475
+ // LOG(ERROR) << "initOneToOneHashTableOnCpu elem: " << elem
476
+ // << " index: " << global_elem_index << " chunk idx: " <<
477
+ // chunk_i
478
+ // << " el_i: " << elem_i;
479
+
480
+ if (hashtable_filling_func (elem, global_elem_index)) {
481
+ partial_err = -1 ;
482
+ }
483
+
484
+ global_elem_index++;
485
+ if (partial_err != 0 ) {
486
+ // LOG(ERROR) << "error here! " << partial_err;
487
+ return partial_err;
488
+ }
489
+ partial_err = 0 ;
490
+ }
491
+ });
451
492
}
452
- elem = outer_id;
453
- }
454
-
455
- // LOG(ERROR) << "initOneToOneHashTableOnCpu elem: " << elem
456
- // << " index: " << global_elem_index << " chunk idx: " << chunk_i
457
- // << " el_i: " << elem_i;
458
-
459
- if (hashtable_filling_func (elem, global_elem_index)) {
460
- partial_err = -1 ;
461
- }
462
-
463
- global_elem_index++;
464
- if (partial_err != 0 ) {
465
- // LOG(ERROR) << "error here! " << partial_err;
466
- return partial_err;
467
- }
468
- partial_err = 0 ;
469
- }
470
- }
493
+ });
471
494
return 0 ;
472
495
}
473
496
#endif
0 commit comments