@@ -2985,82 +2985,169 @@ FetchResult Executor::fetchChunks(
2985
2985
std::vector<std::vector<const int8_t *>> all_frag_col_buffers;
2986
2986
std::vector<std::vector<int64_t >> all_num_rows;
2987
2987
std::vector<std::vector<uint64_t >> all_frag_offsets;
2988
- for (const auto & selected_frag_ids : frag_ids_crossjoin) {
2989
- std::vector<const int8_t *> frag_col_buffers (
2990
- plan_state_->global_to_local_col_ids_ .size ());
2991
- for (const auto & col_id : col_global_ids) {
2992
- if (interrupted_.load ()) {
2993
- throw QueryExecutionError (ERR_INTERRUPTED);
2994
- }
2995
- CHECK (col_id);
2996
- if (col_id->isVirtual ()) {
2997
- continue ;
2998
- }
2999
- const auto fragments_it = all_tables_fragments.find (col_id->getTableRef ());
3000
- CHECK (fragments_it != all_tables_fragments.end ());
3001
- const auto fragments = fragments_it->second ;
3002
- auto it = plan_state_->global_to_local_col_ids_ .find (*col_id);
3003
- CHECK (it != plan_state_->global_to_local_col_ids_ .end ());
3004
- CHECK_LT (static_cast <size_t >(it->second ),
3005
- plan_state_->global_to_local_col_ids_ .size ());
3006
- const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second ]];
3007
- if (!fragments->size ()) {
3008
- return {};
3009
- }
3010
- auto memory_level_for_column = memory_level;
3011
- if (plan_state_->columns_to_fetch_ .find (*col_id) ==
3012
- plan_state_->columns_to_fetch_ .end ()) {
3013
- memory_level_for_column = Data_Namespace::CPU_LEVEL;
3014
- }
3015
- if (needFetchAllFragments (*col_id, ra_exe_unit, selected_fragments)) {
3016
- // determine if we need special treatment to linearlize multi-frag table
3017
- // i.e., a column that is classified as varlen type, i.e., array
3018
- // for now, we can support more types in this way
3019
- if (needLinearizeAllFragments (
3020
- *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3021
- bool for_lazy_fetch = false ;
3022
- if (plan_state_->columns_to_not_fetch_ .find (*col_id) !=
3023
- plan_state_->columns_to_not_fetch_ .end ()) {
3024
- for_lazy_fetch = true ;
3025
- VLOG (2 ) << " Try to linearize lazy fetch column (col_id: "
3026
- << col_id->getColId () << " )" ;
2988
+ if (memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL){
2989
+ std::mutex all_frag;
2990
+ tbb::task_arena limitedArena (16 );
2991
+ limitedArena.execute ([&]() {
2992
+ tbb::parallel_for_each (
2993
+ frag_ids_crossjoin.begin (),
2994
+ frag_ids_crossjoin.end (),
2995
+ [&](const std::vector<size_t >& selected_frag_ids) {
2996
+ // for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2997
+ std::vector<const int8_t *> frag_col_buffers (
2998
+ plan_state_->global_to_local_col_ids_ .size ());
2999
+ for (const auto & col_id : col_global_ids) {
3000
+ if (interrupted_.load ()) {
3001
+ throw QueryExecutionError (ERR_INTERRUPTED);
3002
+ }
3003
+ CHECK (col_id);
3004
+ if (col_id->isVirtual ()) {
3005
+ continue ;
3006
+ }
3007
+ const auto fragments_it = all_tables_fragments.find (col_id->getTableRef ());
3008
+ CHECK (fragments_it != all_tables_fragments.end ());
3009
+ const auto fragments = fragments_it->second ;
3010
+ auto it = plan_state_->global_to_local_col_ids_ .find (*col_id);
3011
+ CHECK (it != plan_state_->global_to_local_col_ids_ .end ());
3012
+ CHECK_LT (static_cast <size_t >(it->second ),
3013
+ plan_state_->global_to_local_col_ids_ .size ());
3014
+ const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second ]];
3015
+ if (!fragments->size ()) {
3016
+ continue ;
3017
+ }
3018
+ auto memory_level_for_column = memory_level;
3019
+ if (plan_state_->columns_to_fetch_ .find (*col_id) ==
3020
+ plan_state_->columns_to_fetch_ .end ()) {
3021
+ memory_level_for_column = Data_Namespace::CPU_LEVEL;
3022
+ }
3023
+ if (needFetchAllFragments (*col_id, ra_exe_unit, selected_fragments)) {
3024
+ // determine if we need special treatment to linearlize multi-frag table
3025
+ // i.e., a column that is classified as varlen type, i.e., array
3026
+ // for now, we can support more types in this way
3027
+ if (needLinearizeAllFragments (
3028
+ *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3029
+ bool for_lazy_fetch = false ;
3030
+ if (plan_state_->columns_to_not_fetch_ .find (*col_id) !=
3031
+ plan_state_->columns_to_not_fetch_ .end ()) {
3032
+ for_lazy_fetch = true ;
3033
+ VLOG (2 ) << " Try to linearize lazy fetch column (col_id: "
3034
+ << col_id->getColId () << " )" ;
3035
+ }
3036
+ frag_col_buffers[it->second ] = column_fetcher.linearizeColumnFragments (
3037
+ col_id->getColInfo (),
3038
+ all_tables_fragments,
3039
+ chunks,
3040
+ chunk_iterators,
3041
+ for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3042
+ for_lazy_fetch ? 0 : device_id,
3043
+ device_allocator,
3044
+ thread_idx);
3045
+ } else {
3046
+ frag_col_buffers[it->second ] =
3047
+ column_fetcher.getAllTableColumnFragments (col_id->getColInfo (),
3048
+ all_tables_fragments,
3049
+ memory_level_for_column,
3050
+ device_id,
3051
+ device_allocator,
3052
+ thread_idx);
3053
+ }
3054
+ } else {
3055
+ frag_col_buffers[it->second ] =
3056
+ column_fetcher.getOneTableColumnFragment (col_id->getColInfo (),
3057
+ frag_id,
3058
+ all_tables_fragments,
3059
+ chunks,
3060
+ chunk_iterators,
3061
+ memory_level_for_column,
3062
+ device_id,
3063
+ device_allocator);
3064
+ }
3065
+ }
3066
+ all_frag.lock ();
3067
+ all_frag_col_buffers.push_back (frag_col_buffers);
3068
+ all_frag.unlock ();
3069
+ });
3070
+ });
3071
+ } else {
3072
+ for (const auto & selected_frag_ids : frag_ids_crossjoin) {
3073
+ std::vector<const int8_t *> frag_col_buffers (
3074
+ plan_state_->global_to_local_col_ids_ .size ());
3075
+ for (const auto & col_id : col_global_ids) {
3076
+ if (interrupted_.load ()) {
3077
+ throw QueryExecutionError (ERR_INTERRUPTED);
3078
+ }
3079
+ CHECK (col_id);
3080
+ if (col_id->isVirtual ()) {
3081
+ continue ;
3082
+ }
3083
+ const auto fragments_it = all_tables_fragments.find (col_id->getTableRef ());
3084
+ CHECK (fragments_it != all_tables_fragments.end ());
3085
+ const auto fragments = fragments_it->second ;
3086
+ auto it = plan_state_->global_to_local_col_ids_ .find (*col_id);
3087
+ CHECK (it != plan_state_->global_to_local_col_ids_ .end ());
3088
+ CHECK_LT (static_cast <size_t >(it->second ),
3089
+ plan_state_->global_to_local_col_ids_ .size ());
3090
+ const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second ]];
3091
+ if (!fragments->size ()) {
3092
+ return {};
3093
+ }
3094
+ auto memory_level_for_column = memory_level;
3095
+ if (plan_state_->columns_to_fetch_ .find (*col_id) ==
3096
+ plan_state_->columns_to_fetch_ .end ()) {
3097
+ memory_level_for_column = Data_Namespace::CPU_LEVEL;
3098
+ }
3099
+ if (needFetchAllFragments (*col_id, ra_exe_unit, selected_fragments)) {
3100
+ // determine if we need special treatment to linearlize multi-frag table
3101
+ // i.e., a column that is classified as varlen type, i.e., array
3102
+ // for now, we can support more types in this way
3103
+ if (needLinearizeAllFragments (
3104
+ *col_id, ra_exe_unit, selected_fragments, memory_level)) {
3105
+ bool for_lazy_fetch = false ;
3106
+ if (plan_state_->columns_to_not_fetch_ .find (*col_id) !=
3107
+ plan_state_->columns_to_not_fetch_ .end ()) {
3108
+ for_lazy_fetch = true ;
3109
+ VLOG (2 ) << " Try to linearize lazy fetch column (col_id: "
3110
+ << col_id->getColId () << " )" ;
3111
+ }
3112
+ frag_col_buffers[it->second ] = column_fetcher.linearizeColumnFragments (
3113
+ col_id->getColInfo (),
3114
+ all_tables_fragments,
3115
+ chunks,
3116
+ chunk_iterators,
3117
+ for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3118
+ for_lazy_fetch ? 0 : device_id,
3119
+ device_allocator,
3120
+ thread_idx);
3121
+ } else {
3122
+ frag_col_buffers[it->second ] =
3123
+ column_fetcher.getAllTableColumnFragments (col_id->getColInfo (),
3124
+ all_tables_fragments,
3125
+ memory_level_for_column,
3126
+ device_id,
3127
+ device_allocator,
3128
+ thread_idx);
3129
+ }
3130
+ } else {
3131
+ auto timer1 = DEBUG_TIMER (" getOneTableColumnFragment" );
3132
+ frag_col_buffers[it->second ] =
3133
+ column_fetcher.getOneTableColumnFragment (col_id->getColInfo (),
3134
+ frag_id,
3135
+ all_tables_fragments,
3136
+ chunks,
3137
+ chunk_iterators,
3138
+ memory_level_for_column,
3139
+ device_id,
3140
+ device_allocator);
3141
+ timer1.stop ();
3142
+ }
3143
+ }
3144
+ all_frag_col_buffers.push_back (frag_col_buffers);
3027
3145
}
3028
- frag_col_buffers[it->second ] = column_fetcher.linearizeColumnFragments (
3029
- col_id->getColInfo (),
3030
- all_tables_fragments,
3031
- chunks,
3032
- chunk_iterators,
3033
- for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3034
- for_lazy_fetch ? 0 : device_id,
3035
- device_allocator,
3036
- thread_idx);
3037
- } else {
3038
- frag_col_buffers[it->second ] =
3039
- column_fetcher.getAllTableColumnFragments (col_id->getColInfo (),
3040
- all_tables_fragments,
3041
- memory_level_for_column,
3042
- device_id,
3043
- device_allocator,
3044
- thread_idx);
3045
- }
3046
- } else {
3047
- auto timer1 = DEBUG_TIMER (" getOneTableColumnFragment" );
3048
- frag_col_buffers[it->second ] =
3049
- column_fetcher.getOneTableColumnFragment (col_id->getColInfo (),
3050
- frag_id,
3051
- all_tables_fragments,
3052
- chunks,
3053
- chunk_iterators,
3054
- memory_level_for_column,
3055
- device_id,
3056
- device_allocator);
3057
- timer1.stop ();
3058
- }
3059
- }
3060
- all_frag_col_buffers.push_back (frag_col_buffers);
3061
3146
}
3062
3147
std::tie (all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags (
3063
3148
ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs , all_tables_fragments);
3149
+ CHECK_EQ (all_num_rows.size (), all_frag_col_buffers.size ());
3150
+ CHECK_EQ (all_frag_offsets.size (), all_frag_col_buffers.size ());
3064
3151
return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3065
3152
}
3066
3153
0 commit comments