16
16
17
17
#include " QueryEngine/ColumnFetcher.h"
18
18
19
+ // #include <tbb/parallel_for.h>
19
20
#include < memory>
20
21
21
22
#include " DataMgr/ArrayNoneEncoder.h"
@@ -241,16 +242,17 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
241
242
int db_id = col_info->db_id ;
242
243
int table_id = col_info->table_id ;
243
244
int col_id = col_info->column_id ;
245
+
244
246
const auto fragments_it = all_tables_fragments.find ({db_id, table_id});
245
247
CHECK (fragments_it != all_tables_fragments.end ());
248
+
246
249
const auto fragments = fragments_it->second ;
247
250
const auto frag_count = fragments->size ();
248
251
std::vector<std::unique_ptr<ColumnarResults>> column_frags;
249
252
const ColumnarResults* table_column = nullptr ;
250
253
const InputDescriptor table_desc (db_id, table_id, int (0 ));
251
254
{
252
255
std::lock_guard<std::mutex> columnar_conversion_guard (columnar_fetch_mutex_);
253
-
254
256
auto col_token = data_provider_->getZeroCopyColumnData (*col_info);
255
257
if (col_token != nullptr ) {
256
258
size_t num_rows = col_token->getSize () / col_token->getType ()->size ();
@@ -264,44 +266,104 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
264
266
}
265
267
266
268
auto column_it = columnarized_scan_table_cache_.find ({table_id, col_id});
267
- if (column_it == columnarized_scan_table_cache_.end ()) {
268
- for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
269
- if (executor_->getConfig ()
270
- .exec .interrupt .enable_non_kernel_time_query_interrupt &&
271
- executor_->checkNonKernelTimeInterrupted ()) {
272
- throw QueryExecutionError (Executor::ERR_INTERRUPTED);
273
- }
274
- std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
275
- std::list<ChunkIter> chunk_iter_holder;
276
- const auto & fragment = (*fragments)[frag_id];
277
- if (fragment.isEmptyPhysicalFragment ()) {
278
- continue ;
279
- }
280
- auto chunk_meta_it = fragment.getChunkMetadataMap ().find (col_id);
281
- CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
282
- auto col_buffer = getOneTableColumnFragment (col_info,
283
- static_cast <int >(frag_id),
284
- all_tables_fragments,
285
- chunk_holder,
286
- chunk_iter_holder,
287
- Data_Namespace::CPU_LEVEL,
288
- int (0 ),
289
- device_allocator);
290
- column_frags.push_back (
291
- std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_ ,
292
- col_buffer,
293
- fragment.getNumTuples (),
294
- chunk_meta_it->second ->type (),
295
- thread_idx));
269
+ if (column_it != columnarized_scan_table_cache_.end ()) {
270
+ table_column = column_it->second .get ();
271
+ return ColumnFetcher::transferColumnIfNeeded (
272
+ table_column, 0 , memory_level, device_id, device_allocator);
273
+ }
274
+
275
+ size_t total_row_count = 0 ;
276
+ for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
277
+ if (executor_->getConfig ().exec .interrupt .enable_non_kernel_time_query_interrupt &&
278
+ executor_->checkNonKernelTimeInterrupted ()) {
279
+ throw QueryExecutionError (Executor::ERR_INTERRUPTED);
296
280
}
297
- auto merged_results =
298
- ColumnarResults::mergeResults (executor_->row_set_mem_owner_ , column_frags);
281
+ const auto & fragment = (*fragments)[frag_id];
282
+ const auto & rows_in_frag = fragment.getNumTuples ();
283
+ total_row_count += rows_in_frag;
284
+ }
285
+
286
+ const auto & type_width = col_info->type ->size ();
287
+ auto write_ptr =
288
+ executor_->row_set_mem_owner_ ->allocate (type_width * total_row_count);
289
+ std::vector<std::pair<int8_t *, size_t >> write_ptrs;
290
+ for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
291
+ const auto & fragment = (*fragments)[frag_id];
292
+ if (!fragment.getNumTuples ()) {
293
+ continue ;
294
+ }
295
+ CHECK_EQ (type_width, fragment.getChunkMetadataMap ().at (col_id)->type ()->size ());
296
+ write_ptrs.push_back ({write_ptr, fragment.getNumTuples () * type_width});
297
+ write_ptr += fragment.getNumTuples () * type_width;
298
+ }
299
+
300
+ if (write_ptrs.empty ()) {
301
+ std::unique_ptr<ColumnarResults> merged_results (nullptr );
302
+
299
303
table_column = merged_results.get ();
300
304
columnarized_scan_table_cache_.emplace (std::make_pair (table_id, col_id),
301
305
std::move (merged_results));
302
- } else {
303
- table_column = column_it->second .get ();
306
+
307
+ return ColumnFetcher::transferColumnIfNeeded (
308
+ table_column, 0 , memory_level, device_id, device_allocator);
304
309
}
310
+
311
+ CHECK_EQ (frag_count, write_ptrs.size ());
312
+ for (size_t frag_id = 0 ; frag_id < frag_count; ++frag_id) {
313
+ std::list<std::shared_ptr<Chunk_NS::Chunk>> chunk_holder;
314
+ std::list<ChunkIter> chunk_iter_holder;
315
+ const auto & fragment = (*fragments)[frag_id];
316
+ if (fragment.isEmptyPhysicalFragment ()) {
317
+ continue ;
318
+ }
319
+ auto chunk_meta_it = fragment.getChunkMetadataMap ().find (col_id);
320
+ CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
321
+ std::shared_ptr<Chunk_NS::Chunk> chunk;
322
+ // Fixed length arrays are also included here.
323
+ const bool is_varlen = col_info->type ->isString () || col_info->type ->isArray ();
324
+ int8_t * col_buffer;
325
+ {
326
+ ChunkKey chunk_key{db_id, fragment.physicalTableId , col_id, fragment.fragmentId };
327
+ std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
328
+ if (is_varlen) {
329
+ varlen_chunk_lock.reset (
330
+ new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
331
+ }
332
+ chunk = data_provider_->getChunk (col_info,
333
+ chunk_key,
334
+ Data_Namespace::CPU_LEVEL,
335
+ 0 ,
336
+ chunk_meta_it->second ->numBytes (),
337
+ chunk_meta_it->second ->numElements ());
338
+ std::lock_guard<std::mutex> chunk_list_lock (chunk_list_mutex_);
339
+ chunk_holder.push_back (chunk);
340
+ }
341
+ if (is_varlen) {
342
+ CHECK_GT (table_id, 0 );
343
+ CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
344
+ chunk_iter_holder.push_back (chunk->begin_iterator (chunk_meta_it->second ));
345
+ auto & chunk_iter = chunk_iter_holder.back ();
346
+ col_buffer = reinterpret_cast <int8_t *>(&chunk_iter);
347
+ } else {
348
+ auto ab = chunk->getBuffer ();
349
+ CHECK (ab->getMemoryPtr ());
350
+ col_buffer = ab->getMemoryPtr (); // @TODO(alex) change to use ChunkIter
351
+ }
352
+ memcpy (write_ptrs[frag_id].first , col_buffer, write_ptrs[frag_id].second );
353
+ }
354
+
355
+ std::vector<int8_t *> raw_write_ptrs;
356
+ raw_write_ptrs.reserve (frag_count);
357
+ for (uint i = 0 ; i < frag_count; i++) {
358
+ raw_write_ptrs.emplace_back (write_ptrs[i].first );
359
+ }
360
+
361
+ std::unique_ptr<ColumnarResults> merged_results (
362
+ new ColumnarResults (raw_write_ptrs, total_row_count, col_info->type , thread_idx));
363
+
364
+ table_column = merged_results.get ();
365
+ columnarized_scan_table_cache_.emplace (std::make_pair (table_id, col_id),
366
+ std::move (merged_results));
305
367
}
306
368
return ColumnFetcher::transferColumnIfNeeded (
307
369
table_column, 0 , memory_level, device_id, device_allocator);
0 commit comments