16
16
17
17
#include " QueryEngine/ColumnFetcher.h"
18
18
19
- #include < memory>
20
-
21
19
#include " DataMgr/ArrayNoneEncoder.h"
22
20
#include " QueryEngine/ErrorHandling.h"
23
21
#include " QueryEngine/Execute.h"
24
22
#include " Shared/Intervals.h"
25
23
#include " Shared/likely.h"
26
24
#include " Shared/sqltypes.h"
27
25
26
+ #include < tbb/parallel_for.h>
27
+ #include < memory>
28
+
28
29
namespace {
29
30
30
31
std::string getMemoryLevelString (Data_Namespace::MemoryLevel memoryLevel) {
@@ -239,6 +240,11 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
239
240
int db_id = col_info->db_id ;
240
241
int table_id = col_info->table_id ;
241
242
int col_id = col_info->column_id ;
243
+ if (col_info->type ->isString () || col_info->type ->isArray ()) {
244
+ throw std::runtime_error (
245
+ " Array type passed to getAllTableColumnFragments. Should be handled in "
246
+ " linearization." );
247
+ }
242
248
const auto fragments_it = all_tables_fragments.find ({db_id, table_id});
243
249
CHECK (fragments_it != all_tables_fragments.end ());
244
250
const auto fragments = fragments_it->second ;
@@ -248,7 +254,6 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
248
254
const InputDescriptor table_desc (db_id, table_id, int (0 ));
249
255
{
250
256
std::lock_guard<std::mutex> columnar_conversion_guard (columnar_fetch_mutex_);
251
-
252
257
auto col_token = data_provider_->getZeroCopyColumnData (*col_info);
253
258
if (col_token != nullptr ) {
254
259
size_t num_rows = col_token->getSize () / col_token->getType ()->size ();
@@ -262,44 +267,96 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
262
267
}
263
268
264
269
auto column_it = columnarized_scan_table_cache_.find ({table_id, col_id});
265
- if (column_it == columnarized_scan_table_cache_.end ()) {
266
- for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
267
- if (executor_->getConfig ()
268
- .exec .interrupt .enable_non_kernel_time_query_interrupt &&
269
- executor_->checkNonKernelTimeInterrupted ()) {
270
- throw QueryExecutionError (Executor::ERR_INTERRUPTED);
271
- }
272
- std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
273
- std::list<ChunkIter> chunk_iter_holder;
274
- const auto & fragment = (*fragments)[frag_id];
275
- if (fragment.isEmptyPhysicalFragment ()) {
276
- continue ;
277
- }
278
- auto chunk_meta_it = fragment.getChunkMetadataMap ().find (col_id);
279
- CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
280
- auto col_buffer = getOneTableColumnFragment (col_info,
281
- static_cast <int >(frag_id),
282
- all_tables_fragments,
283
- chunk_holder,
284
- chunk_iter_holder,
285
- Data_Namespace::CPU_LEVEL,
286
- int (0 ),
287
- device_allocator);
288
- column_frags.push_back (
289
- std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_ ,
290
- col_buffer,
291
- fragment.getNumTuples (),
292
- chunk_meta_it->second ->type (),
293
- thread_idx));
270
+ if (column_it != columnarized_scan_table_cache_.end ()) {
271
+ table_column = column_it->second .get ();
272
+ return ColumnFetcher::transferColumnIfNeeded (
273
+ table_column, 0 , memory_level, device_id, device_allocator);
274
+ }
275
+
276
+ size_t total_row_count = 0 ;
277
+ for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
278
+ if (executor_->getConfig ().exec .interrupt .enable_non_kernel_time_query_interrupt &&
279
+ executor_->checkNonKernelTimeInterrupted ()) {
280
+ throw QueryExecutionError (Executor::ERR_INTERRUPTED);
294
281
}
295
- auto merged_results =
296
- ColumnarResults::mergeResults (executor_->row_set_mem_owner_ , column_frags);
282
+ const auto & fragment = (*fragments)[frag_id];
283
+ const auto rows_in_frag = fragment.getNumTuples ();
284
+ total_row_count += rows_in_frag;
285
+ }
286
+
287
+ if (total_row_count == 0 ) {
288
+ std::unique_ptr<ColumnarResults> merged_results (nullptr );
289
+
297
290
table_column = merged_results.get ();
298
291
columnarized_scan_table_cache_.emplace (std::make_pair (table_id, col_id),
299
292
std::move (merged_results));
300
- } else {
301
- table_column = column_it->second .get ();
293
+
294
+ return ColumnFetcher::transferColumnIfNeeded (
295
+ table_column, 0 , memory_level, device_id, device_allocator);
296
+ }
297
+
298
+ const auto type_width = col_info->type ->size ();
299
+ auto write_ptr =
300
+ executor_->row_set_mem_owner_ ->allocate (type_width * total_row_count);
301
+ std::vector<std::pair<int8_t *, size_t >> write_ptrs;
302
+ std::vector<size_t > valid_fragments;
303
+ for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
304
+ const auto & fragment = (*fragments)[frag_id];
305
+ if (fragment.isEmptyPhysicalFragment ()) {
306
+ continue ;
307
+ }
308
+ CHECK_EQ (type_width, fragment.getChunkMetadataMap ().at (col_id)->type ()->size ());
309
+ write_ptrs.push_back ({write_ptr, fragment.getNumTuples () * type_width});
310
+ write_ptr += fragment.getNumTuples () * type_width;
311
+ valid_fragments.push_back (frag_id);
302
312
}
313
+
314
+ CHECK (!write_ptrs.empty ());
315
+ size_t valid_frag_count = valid_fragments.size ();
316
+ tbb::parallel_for (
317
+ tbb::blocked_range<size_t >(0 , valid_frag_count),
318
+ [&](const tbb::blocked_range<size_t >& frag_ids) {
319
+ for (size_t v_frag_id = frag_ids.begin (); v_frag_id < frag_ids.end ();
320
+ ++v_frag_id) {
321
+ std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
322
+ std::list<ChunkIter> chunk_iter_holder;
323
+ size_t frag_id = valid_fragments[v_frag_id];
324
+ const auto & fragment = (*fragments)[frag_id];
325
+ auto chunk_meta_it = fragment.getChunkMetadataMap ().find (col_id);
326
+ CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
327
+ std::shared_ptr<Chunk_NS::Chunk> chunk;
328
+ {
329
+ ChunkKey chunk_key{
330
+ db_id, fragment.physicalTableId , col_id, fragment.fragmentId };
331
+ chunk = data_provider_->getChunk (col_info,
332
+ chunk_key,
333
+ Data_Namespace::CPU_LEVEL,
334
+ 0 ,
335
+ chunk_meta_it->second ->numBytes (),
336
+ chunk_meta_it->second ->numElements ());
337
+ std::lock_guard<std::mutex> chunk_list_lock (chunk_list_mutex_);
338
+ chunk_holder.push_back (chunk);
339
+ }
340
+ auto ab = chunk->getBuffer ();
341
+ CHECK (ab->getMemoryPtr ());
342
+ int8_t * col_buffer =
343
+ ab->getMemoryPtr (); // @TODO(alex) change to use ChunkIter
344
+ memcpy (write_ptrs[frag_id].first , col_buffer, write_ptrs[frag_id].second );
345
+ }
346
+ });
347
+
348
+ std::vector<int8_t *> raw_write_ptrs;
349
+ raw_write_ptrs.reserve (frag_count);
350
+ for (size_t i = 0 ; i < frag_count; i++) {
351
+ raw_write_ptrs.emplace_back (write_ptrs[i].first );
352
+ }
353
+
354
+ std::unique_ptr<ColumnarResults> merged_results (new ColumnarResults (
355
+ std::move (raw_write_ptrs), total_row_count, col_info->type , thread_idx));
356
+
357
+ table_column = merged_results.get ();
358
+ columnarized_scan_table_cache_.emplace (std::make_pair (table_id, col_id),
359
+ std::move (merged_results));
303
360
}
304
361
return ColumnFetcher::transferColumnIfNeeded (
305
362
table_column, 0 , memory_level, device_id, device_allocator);
0 commit comments