@@ -266,23 +266,98 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
266
266
}
267
267
auto chunk_meta_it = fragment.getChunkMetadataMap ().find (col_id);
268
268
CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
269
- auto col_buffer = getOneTableColumnFragment (col_info,
270
- static_cast <int >(frag_id),
271
- all_tables_fragments,
272
- chunk_holder,
273
- chunk_iter_holder,
274
- Data_Namespace::CPU_LEVEL,
275
- int (0 ),
276
- device_allocator);
269
+ // auto col_buffer = getOneTableColumnFragment(col_info,
270
+ // static_cast<int>(frag_id),
271
+ // all_tables_fragments,
272
+ // chunk_holder,
273
+ // chunk_iter_holder,
274
+ // Data_Namespace::CPU_LEVEL,
275
+ // int(0),
276
+ // device_allocator);
277
+ // getOneTableColumnFragment(
278
+ // const Data_Namespace::MemoryLevel memory_level,
279
+ // const int device_id,
280
+ // DeviceAllocator* allocator)
281
+ std::shared_ptr<Chunk_NS::Chunk> chunk;
282
+ // Fixed length arrays are also included here.
283
+ const bool is_varlen = col_info->type ->isString () || col_info->type ->isArray ();
284
+ {
285
+ ChunkKey chunk_key{
286
+ db_id, fragment.physicalTableId , col_id, fragment.fragmentId };
287
+ std::unique_ptr<std::lock_guard<std::mutex>> varlen_chunk_lock;
288
+ if (is_varlen) {
289
+ varlen_chunk_lock.reset (
290
+ new std::lock_guard<std::mutex>(varlen_chunk_fetch_mutex_));
291
+ }
292
+ chunk = data_provider_->getChunk (
293
+ col_info,
294
+ chunk_key,
295
+ Data_Namespace::CPU_LEVEL,
296
+ 0 ,
297
+ chunk_meta_it->second ->numBytes (),
298
+ chunk_meta_it->second ->numElements ());
299
+ std::lock_guard<std::mutex> chunk_list_lock (chunk_list_mutex_);
300
+ chunk_holder.push_back (chunk);
301
+ }
302
+ if (is_varlen) {
303
+ CHECK_GT (table_id, 0 );
304
+ CHECK (chunk_meta_it != fragment.getChunkMetadataMap ().end ());
305
+ chunk_iter_holder.push_back (chunk->begin_iterator (chunk_meta_it->second ));
306
+ auto & chunk_iter = chunk_iter_holder.back ();
307
+ return reinterpret_cast <int8_t *>(&chunk_iter);
308
+ } else {
309
+ auto ab = chunk->getBuffer ();
310
+ CHECK (ab->getMemoryPtr ());
311
+ return ab->getMemoryPtr (); // @TODO(alex) change to use ChunkIter
312
+ }
277
313
column_frags.push_back (
278
314
std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_ ,
279
315
col_buffer,
280
316
fragment.getNumTuples (),
281
317
chunk_meta_it->second ->type (),
282
318
thread_idx));
283
319
}
284
- auto merged_results =
285
- ColumnarResults::mergeResults (executor_->row_set_mem_owner_ , column_frags);
320
+ // auto merged_results =
321
+ // ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
322
+ // std::unique_ptr<ColumnarResults> ColumnarResults::mergeResults(
323
+ // std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
324
+ // const std::vector<std::unique_ptr<ColumnarResults>>& sub_results) {
325
+ if (column_frags.empty ()) {
326
+ return nullptr ;
327
+ }
328
+ const auto total_row_count = std::accumulate (
329
+ column_frags.begin (),
330
+ column_frags.end (),
331
+ size_t (0 ),
332
+ [](const size_t init, const std::unique_ptr<ColumnarResults>& result) {
333
+ return init + result->size ();
334
+ });
335
+ std::unique_ptr<ColumnarResults> merged_results (
336
+ new ColumnarResults (total_row_count, column_frags[0 ]->target_types_ ));
337
+ const auto col_count = column_frags[0 ]->column_buffers_ .size ();
338
+ const auto nonempty_it = std::find_if (
339
+ column_frags.begin (),
340
+ column_frags.end (),
341
+ [](const std::unique_ptr<ColumnarResults>& needle) { return needle->size (); });
342
+ if (nonempty_it == column_frags.end ()) {
343
+ return nullptr ;
344
+ }
345
+ for (size_t col_idx = 0 ; col_idx < col_count; ++col_idx) {
346
+ const auto byte_width = (*nonempty_it)->columnType (col_idx)->size ();
347
+ auto write_ptr = row_set_mem_owner->allocate (byte_width * total_row_count);
348
+ merged_results->column_buffers_ .push_back (write_ptr);
349
+ for (auto & rs : column_frags) {
350
+ CHECK_EQ (col_count, rs->column_buffers_ .size ());
351
+ if (!rs->size ()) {
352
+ continue ;
353
+ }
354
+ CHECK_EQ (byte_width, rs->columnType (col_idx)->size ());
355
+ memcpy (write_ptr, rs->column_buffers_ [col_idx], rs->size () * byte_width);
356
+ write_ptr += rs->size () * byte_width;
357
+ }
358
+ }
359
+ return merged_results;
360
+
286
361
table_column = merged_results.get ();
287
362
columnarized_scan_table_cache_.emplace (std::make_pair (table_id, col_id),
288
363
std::move (merged_results));
0 commit comments