@@ -2990,82 +2990,169 @@ FetchResult Executor::fetchChunks(
2990
2990
std::vector<std::vector<const int8_t *>> all_frag_col_buffers;
2991
2991
std::vector<std::vector<int64_t >> all_num_rows;
2992
2992
std::vector<std::vector<uint64_t >> all_frag_offsets;
2993
- for (const auto & selected_frag_ids : frag_ids_crossjoin) {
2994
- std::vector<const int8_t *> frag_col_buffers (
2995
- plan_state_->global_to_local_col_ids_ .size ());
2996
- for (const auto & col_id : col_global_ids) {
2997
- if (interrupted_.load ()) {
2998
- throw QueryExecutionError (ERR_INTERRUPTED);
2999
- }
3000
- CHECK (col_id);
3001
- if (col_id->isVirtual ()) {
3002
- continue ;
3003
- }
3004
- const auto fragments_it = all_tables_fragments.find (col_id->getTableRef ());
3005
- CHECK (fragments_it != all_tables_fragments.end ());
3006
- const auto fragments = fragments_it->second ;
3007
- auto it = plan_state_->global_to_local_col_ids_ .find (*col_id);
3008
- CHECK (it != plan_state_->global_to_local_col_ids_ .end ());
3009
- CHECK_LT (static_cast <size_t >(it->second ),
3010
- plan_state_->global_to_local_col_ids_ .size ());
3011
- const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second ]];
3012
- if (!fragments->size ()) {
3013
- return {};
3014
- }
3015
- auto memory_level_for_column = memory_level;
3016
- if (plan_state_->columns_to_fetch_ .find (*col_id) ==
3017
- plan_state_->columns_to_fetch_ .end ()) {
3018
- memory_level_for_column = Data_Namespace::CPU_LEVEL;
3019
- }
3020
- if (needFetchAllFragments (*col_id, ra_exe_unit, selected_fragments)) {
3021
- // determine if we need special treatment to linearlize multi-frag table
3022
- // i.e., a column that is classified as varlen type, i.e., array
3023
- // for now, we can support more types in this way
3024
- if (needLinearizeAllFragments (
3025
- *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3026
- bool for_lazy_fetch = false ;
3027
- if (plan_state_->columns_to_not_fetch_ .find (*col_id) !=
3028
- plan_state_->columns_to_not_fetch_ .end ()) {
3029
- for_lazy_fetch = true ;
3030
- VLOG (2 ) << " Try to linearize lazy fetch column (col_id: "
3031
- << col_id->getColId () << " )" ;
2993
+ if (memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL){
2994
+ std::mutex all_frag;
2995
+ tbb::task_arena limitedArena (16 );
2996
+ limitedArena.execute ([&]() {
2997
+ tbb::parallel_for_each (
2998
+ frag_ids_crossjoin.begin (),
2999
+ frag_ids_crossjoin.end (),
3000
+ [&](const std::vector<size_t >& selected_frag_ids) {
3001
+ // for (const auto& selected_frag_ids : frag_ids_crossjoin) {
3002
+ std::vector<const int8_t *> frag_col_buffers (
3003
+ plan_state_->global_to_local_col_ids_ .size ());
3004
+ for (const auto & col_id : col_global_ids) {
3005
+ if (interrupted_.load ()) {
3006
+ throw QueryExecutionError (ERR_INTERRUPTED);
3007
+ }
3008
+ CHECK (col_id);
3009
+ if (col_id->isVirtual ()) {
3010
+ continue ;
3011
+ }
3012
+ const auto fragments_it = all_tables_fragments.find (col_id->getTableRef ());
3013
+ CHECK (fragments_it != all_tables_fragments.end ());
3014
+ const auto fragments = fragments_it->second ;
3015
+ auto it = plan_state_->global_to_local_col_ids_ .find (*col_id);
3016
+ CHECK (it != plan_state_->global_to_local_col_ids_ .end ());
3017
+ CHECK_LT (static_cast <size_t >(it->second ),
3018
+ plan_state_->global_to_local_col_ids_ .size ());
3019
+ const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second ]];
3020
+ if (!fragments->size ()) {
3021
+ continue ;
3022
+ }
3023
+ auto memory_level_for_column = memory_level;
3024
+ if (plan_state_->columns_to_fetch_ .find (*col_id) ==
3025
+ plan_state_->columns_to_fetch_ .end ()) {
3026
+ memory_level_for_column = Data_Namespace::CPU_LEVEL;
3027
+ }
3028
+ if (needFetchAllFragments (*col_id, ra_exe_unit, selected_fragments)) {
3029
+ // determine if we need special treatment to linearlize multi-frag table
3030
+ // i.e., a column that is classified as varlen type, i.e., array
3031
+ // for now, we can support more types in this way
3032
+ if (needLinearizeAllFragments (
3033
+ *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3034
+ bool for_lazy_fetch = false ;
3035
+ if (plan_state_->columns_to_not_fetch_ .find (*col_id) !=
3036
+ plan_state_->columns_to_not_fetch_ .end ()) {
3037
+ for_lazy_fetch = true ;
3038
+ VLOG (2 ) << " Try to linearize lazy fetch column (col_id: "
3039
+ << col_id->getColId () << " )" ;
3040
+ }
3041
+ frag_col_buffers[it->second ] = column_fetcher.linearizeColumnFragments (
3042
+ col_id->getColInfo (),
3043
+ all_tables_fragments,
3044
+ chunks,
3045
+ chunk_iterators,
3046
+ for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3047
+ for_lazy_fetch ? 0 : device_id,
3048
+ device_allocator,
3049
+ thread_idx);
3050
+ } else {
3051
+ frag_col_buffers[it->second ] =
3052
+ column_fetcher.getAllTableColumnFragments (col_id->getColInfo (),
3053
+ all_tables_fragments,
3054
+ memory_level_for_column,
3055
+ device_id,
3056
+ device_allocator,
3057
+ thread_idx);
3058
+ }
3059
+ } else {
3060
+ frag_col_buffers[it->second ] =
3061
+ column_fetcher.getOneTableColumnFragment (col_id->getColInfo (),
3062
+ frag_id,
3063
+ all_tables_fragments,
3064
+ chunks,
3065
+ chunk_iterators,
3066
+ memory_level_for_column,
3067
+ device_id,
3068
+ device_allocator);
3069
+ }
3070
+ }
3071
+ all_frag.lock ();
3072
+ all_frag_col_buffers.push_back (frag_col_buffers);
3073
+ all_frag.unlock ();
3074
+ });
3075
+ });
3076
+ } else {
3077
+ for (const auto & selected_frag_ids : frag_ids_crossjoin) {
3078
+ std::vector<const int8_t *> frag_col_buffers (
3079
+ plan_state_->global_to_local_col_ids_ .size ());
3080
+ for (const auto & col_id : col_global_ids) {
3081
+ if (interrupted_.load ()) {
3082
+ throw QueryExecutionError (ERR_INTERRUPTED);
3083
+ }
3084
+ CHECK (col_id);
3085
+ if (col_id->isVirtual ()) {
3086
+ continue ;
3087
+ }
3088
+ const auto fragments_it = all_tables_fragments.find (col_id->getTableRef ());
3089
+ CHECK (fragments_it != all_tables_fragments.end ());
3090
+ const auto fragments = fragments_it->second ;
3091
+ auto it = plan_state_->global_to_local_col_ids_ .find (*col_id);
3092
+ CHECK (it != plan_state_->global_to_local_col_ids_ .end ());
3093
+ CHECK_LT (static_cast <size_t >(it->second ),
3094
+ plan_state_->global_to_local_col_ids_ .size ());
3095
+ const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second ]];
3096
+ if (!fragments->size ()) {
3097
+ return {};
3098
+ }
3099
+ auto memory_level_for_column = memory_level;
3100
+ if (plan_state_->columns_to_fetch_ .find (*col_id) ==
3101
+ plan_state_->columns_to_fetch_ .end ()) {
3102
+ memory_level_for_column = Data_Namespace::CPU_LEVEL;
3103
+ }
3104
+ if (needFetchAllFragments (*col_id, ra_exe_unit, selected_fragments)) {
3105
+ // determine if we need special treatment to linearlize multi-frag table
3106
+ // i.e., a column that is classified as varlen type, i.e., array
3107
+ // for now, we can support more types in this way
3108
+ if (needLinearizeAllFragments (
3109
+ *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3110
+ bool for_lazy_fetch = false ;
3111
+ if (plan_state_->columns_to_not_fetch_ .find (*col_id) !=
3112
+ plan_state_->columns_to_not_fetch_ .end ()) {
3113
+ for_lazy_fetch = true ;
3114
+ VLOG (2 ) << " Try to linearize lazy fetch column (col_id: "
3115
+ << col_id->getColId () << " )" ;
3116
+ }
3117
+ frag_col_buffers[it->second ] = column_fetcher.linearizeColumnFragments (
3118
+ col_id->getColInfo (),
3119
+ all_tables_fragments,
3120
+ chunks,
3121
+ chunk_iterators,
3122
+ for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3123
+ for_lazy_fetch ? 0 : device_id,
3124
+ device_allocator,
3125
+ thread_idx);
3126
+ } else {
3127
+ frag_col_buffers[it->second ] =
3128
+ column_fetcher.getAllTableColumnFragments (col_id->getColInfo (),
3129
+ all_tables_fragments,
3130
+ memory_level_for_column,
3131
+ device_id,
3132
+ device_allocator,
3133
+ thread_idx);
3134
+ }
3135
+ } else {
3136
+ auto timer1 = DEBUG_TIMER (" getOneTableColumnFragment" );
3137
+ frag_col_buffers[it->second ] =
3138
+ column_fetcher.getOneTableColumnFragment (col_id->getColInfo (),
3139
+ frag_id,
3140
+ all_tables_fragments,
3141
+ chunks,
3142
+ chunk_iterators,
3143
+ memory_level_for_column,
3144
+ device_id,
3145
+ device_allocator);
3146
+ timer1.stop ();
3147
+ }
3148
+ }
3149
+ all_frag_col_buffers.push_back (frag_col_buffers);
3032
3150
}
3033
- frag_col_buffers[it->second ] = column_fetcher.linearizeColumnFragments (
3034
- col_id->getColInfo (),
3035
- all_tables_fragments,
3036
- chunks,
3037
- chunk_iterators,
3038
- for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3039
- for_lazy_fetch ? 0 : device_id,
3040
- device_allocator,
3041
- thread_idx);
3042
- } else {
3043
- frag_col_buffers[it->second ] =
3044
- column_fetcher.getAllTableColumnFragments (col_id->getColInfo (),
3045
- all_tables_fragments,
3046
- memory_level_for_column,
3047
- device_id,
3048
- device_allocator,
3049
- thread_idx);
3050
- }
3051
- } else {
3052
- auto timer1 = DEBUG_TIMER (" getOneTableColumnFragment" );
3053
- frag_col_buffers[it->second ] =
3054
- column_fetcher.getOneTableColumnFragment (col_id->getColInfo (),
3055
- frag_id,
3056
- all_tables_fragments,
3057
- chunks,
3058
- chunk_iterators,
3059
- memory_level_for_column,
3060
- device_id,
3061
- device_allocator);
3062
- timer1.stop ();
3063
- }
3064
- }
3065
- all_frag_col_buffers.push_back (frag_col_buffers);
3066
3151
}
3067
3152
std::tie (all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags (
3068
3153
ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs , all_tables_fragments);
3154
+ CHECK_EQ (all_num_rows.size (), all_frag_col_buffers.size ());
3155
+ CHECK_EQ (all_frag_offsets.size (), all_frag_col_buffers.size ());
3069
3156
return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3070
3157
}
3071
3158
0 commit comments