@@ -2871,7 +2871,7 @@ std::map<size_t, std::vector<uint64_t>> get_table_id_to_frag_offsets(
2871
2871
std::pair<std::vector<std::vector<int64_t >>, std::vector<std::vector<uint64_t >>>
2872
2872
Executor::getRowCountAndOffsetForAllFrags (
2873
2873
const RelAlgExecutionUnit& ra_exe_unit,
2874
- const CartesianProduct< std::vector<std::vector<size_t > >>& frag_ids_crossjoin,
2874
+ const std::vector<std::vector<size_t >>& frag_ids_crossjoin,
2875
2875
const std::vector<InputDescriptor>& input_descs,
2876
2876
const std::map<TableRef, const TableFragments*>& all_tables_fragments) {
2877
2877
std::vector<std::vector<int64_t >> all_num_rows;
@@ -2947,6 +2947,8 @@ bool Executor::needLinearizeAllFragments(
2947
2947
const auto & fragments = selected_fragments[nest_level].fragment_ids ;
2948
2948
auto need_linearize =
2949
2949
inner_col_desc.type ()->isArray () || inner_col_desc.type ()->isString ();
2950
+ LOG (INFO) << inner_col_desc.type ()->isArray () << " || "
2951
+ << inner_col_desc.type ()->isString () << " ) && " << fragments.size () << " > 1" ;
2950
2952
return need_linearize && fragments.size () > 1 ;
2951
2953
}
2952
2954
@@ -2984,6 +2986,9 @@ FetchResult Executor::fetchChunks(
2984
2986
std::vector<std::vector<const int8_t *>> all_frag_col_buffers;
2985
2987
std::vector<std::vector<int64_t >> all_num_rows;
2986
2988
std::vector<std::vector<uint64_t >> all_frag_offsets;
2989
+
2990
+ // in MT case we want to preserve "the order of insertion" into all_frag_col_buffers
2991
+ std::vector<std::vector<size_t >> selected_frag_ids_vec;
2987
2992
if (memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL){
2988
2993
std::mutex all_frag;
2989
2994
std::atomic<bool > empty_frags{false };
@@ -2993,7 +2998,6 @@ FetchResult Executor::fetchChunks(
2993
2998
frag_ids_crossjoin.begin (),
2994
2999
frag_ids_crossjoin.end (),
2995
3000
[&](const std::vector<size_t >& selected_frag_ids) {
2996
- // for (const auto& selected_frag_ids : frag_ids_crossjoin) {
2997
3001
std::vector<const int8_t *> frag_col_buffers (
2998
3002
plan_state_->global_to_local_col_ids_ .size ());
2999
3003
for (const auto & col_id : col_global_ids) {
@@ -3041,16 +3045,15 @@ FetchResult Executor::fetchChunks(
3041
3045
chunk_iterators,
3042
3046
for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3043
3047
for_lazy_fetch ? 0 : device_id,
3044
- device_allocator,
3045
- thread_idx);
3048
+ device_allocator);
3046
3049
} else {
3047
3050
frag_col_buffers[it->second ] =
3048
3051
column_fetcher.getAllTableColumnFragments (col_id->getColInfo (),
3049
3052
all_tables_fragments,
3050
3053
memory_level_for_column,
3051
3054
device_id,
3052
3055
device_allocator,
3053
- thread_idx);
3056
+ /* thread_idx= */ 0 );
3054
3057
}
3055
3058
} else {
3056
3059
frag_col_buffers[it->second ] =
@@ -3065,10 +3068,11 @@ FetchResult Executor::fetchChunks(
3065
3068
}
3066
3069
}
3067
3070
all_frag.lock ();
3071
+ selected_frag_ids_vec.push_back (selected_frag_ids);
3068
3072
all_frag_col_buffers.push_back (frag_col_buffers);
3069
3073
all_frag.unlock ();
3070
- });
3071
- });
3074
+ });
3075
+ });
3072
3076
if (empty_frags) {
3073
3077
return {};
3074
3078
}
@@ -3120,8 +3124,7 @@ FetchResult Executor::fetchChunks(
3120
3124
chunk_iterators,
3121
3125
for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level,
3122
3126
for_lazy_fetch ? 0 : device_id,
3123
- device_allocator,
3124
- thread_idx);
3127
+ device_allocator);
3125
3128
} else {
3126
3129
frag_col_buffers[it->second ] =
3127
3130
column_fetcher.getAllTableColumnFragments (col_id->getColInfo (),
@@ -3143,11 +3146,12 @@ FetchResult Executor::fetchChunks(
3143
3146
device_allocator);
3144
3147
}
3145
3148
}
3149
+ selected_frag_ids_vec.push_back (selected_frag_ids);
3146
3150
all_frag_col_buffers.push_back (frag_col_buffers);
3147
3151
}
3148
3152
}
3149
3153
std::tie (all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags (
3150
- ra_exe_unit, frag_ids_crossjoin , ra_exe_unit.input_descs , all_tables_fragments);
3154
+ ra_exe_unit, selected_frag_ids_vec , ra_exe_unit.input_descs , all_tables_fragments);
3151
3155
return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3152
3156
}
3153
3157
@@ -3171,6 +3175,7 @@ FetchResult Executor::fetchUnionChunks(
3171
3175
std::vector<std::vector<const int8_t *>> all_frag_col_buffers;
3172
3176
std::vector<std::vector<int64_t >> all_num_rows;
3173
3177
std::vector<std::vector<uint64_t >> all_frag_offsets;
3178
+ std::vector<std::vector<size_t >> selected_frag_ids_vec;
3174
3179
3175
3180
CHECK (!selected_fragments.empty ());
3176
3181
CHECK_LE (2u , ra_exe_unit.input_descs .size ());
@@ -3269,12 +3274,16 @@ FetchResult Executor::fetchUnionChunks(
3269
3274
device_allocator);
3270
3275
}
3271
3276
}
3277
+ selected_frag_ids_vec.push_back (selected_frag_ids);
3272
3278
all_frag_col_buffers.push_back (frag_col_buffers);
3273
3279
}
3274
3280
std::vector<std::vector<int64_t >> num_rows;
3275
3281
std::vector<std::vector<uint64_t >> frag_offsets;
3276
- std::tie (num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags (
3277
- ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs , all_tables_fragments);
3282
+ std::tie (num_rows, frag_offsets) =
3283
+ getRowCountAndOffsetForAllFrags (ra_exe_unit,
3284
+ selected_frag_ids_vec,
3285
+ ra_exe_unit.input_descs ,
3286
+ all_tables_fragments);
3278
3287
all_num_rows.insert (all_num_rows.end (), num_rows.begin (), num_rows.end ());
3279
3288
all_frag_offsets.insert (
3280
3289
all_frag_offsets.end (), frag_offsets.begin (), frag_offsets.end ());
0 commit comments