diff --git a/omniscidb/BufferProvider/BufferProvider.h b/omniscidb/BufferProvider/BufferProvider.h index 15027b48e..374e40496 100644 --- a/omniscidb/BufferProvider/BufferProvider.h +++ b/omniscidb/BufferProvider/BufferProvider.h @@ -44,7 +44,7 @@ class BufferProvider { const int8_t* host_ptr, const size_t num_bytes, const int device_id) const = 0; - virtual void synchronizeStream(const int device_id) const = 0; + virtual void synchronizeDeviceDataStream(const int device_id) const = 0; virtual void copyFromDevice(int8_t* host_ptr, const int8_t* device_ptr, const size_t num_bytes, diff --git a/omniscidb/CudaMgr/CudaMgr.cpp b/omniscidb/CudaMgr/CudaMgr.cpp index b3ec362bb..6ea0a9a9f 100644 --- a/omniscidb/CudaMgr/CudaMgr.cpp +++ b/omniscidb/CudaMgr/CudaMgr.cpp @@ -112,6 +112,17 @@ void CudaMgr::copyHostToDevice(int8_t* device_ptr, cuMemcpyHtoD(reinterpret_cast(device_ptr), host_ptr, num_bytes)); } +void CudaMgr::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + if constexpr (async_data_load_available) { + copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_num); + } else { + copyHostToDevice(device_ptr, host_ptr, num_bytes, device_num); + } +} + void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr, const int8_t* host_ptr, const size_t num_bytes, @@ -120,7 +131,7 @@ void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr, checkError(cuMemcpyHtoDAsync( reinterpret_cast(device_ptr), host_ptr, num_bytes, stream_)); } -void CudaMgr::synchronizeStream(const int device_num) { +void CudaMgr::synchronizeDeviceDataStream(const int device_num) { setContext(device_num); checkError(cuStreamSynchronize(stream_)); } diff --git a/omniscidb/CudaMgr/CudaMgr.h b/omniscidb/CudaMgr/CudaMgr.h index 17ba40c28..794c01820 100644 --- a/omniscidb/CudaMgr/CudaMgr.h +++ b/omniscidb/CudaMgr/CudaMgr.h @@ -96,12 +96,17 @@ class CudaMgr : public GpuMgr { const size_t num_bytes, const int device_num) override; + void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) override; + void copyHostToDeviceAsync(int8_t* device_ptr, const int8_t* host_ptr, const size_t num_bytes, const int device_num) override; - void synchronizeStream(const int device_num) override; + void synchronizeDeviceDataStream(const int device_num) override; void copyDeviceToHost(int8_t* host_ptr, const int8_t* device_ptr, @@ -289,7 +294,7 @@ class CudaMgr : public GpuMgr { omnisci::DeviceGroup device_group_; std::vector device_contexts_; mutable std::mutex device_cleanup_mutex_; - static constexpr bool async_data_load_available{true}; + static constexpr bool async_data_load_available{false}; }; } // Namespace CudaMgr_Namespace diff --git a/omniscidb/CudaMgr/CudaMgrNoCuda.cpp b/omniscidb/CudaMgr/CudaMgrNoCuda.cpp index 4a9969e36..342ead554 100644 --- a/omniscidb/CudaMgr/CudaMgrNoCuda.cpp +++ b/omniscidb/CudaMgr/CudaMgrNoCuda.cpp @@ -43,7 +43,14 @@ void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr, CHECK(false); } -void CudaMgr::synchronizeStream(const int device_num) { +void CudaMgr::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + CHECK(false); +} + +void CudaMgr::synchronizeDeviceDataStream(const int device_num) { CHECK(false); } diff --git a/omniscidb/DataMgr/Allocators/DeviceAllocator.h b/omniscidb/DataMgr/Allocators/DeviceAllocator.h index 162deb644..4d53ee161 100644 --- 
a/omniscidb/DataMgr/Allocators/DeviceAllocator.h +++ b/omniscidb/DataMgr/Allocators/DeviceAllocator.h @@ -53,4 +53,5 @@ class DeviceAllocator : public Allocator { virtual void setDeviceMem(int8_t* device_ptr, unsigned char uc, const size_t num_bytes) const = 0; + virtual void sync() = 0; }; diff --git a/omniscidb/DataMgr/Allocators/GpuAllocator.cpp b/omniscidb/DataMgr/Allocators/GpuAllocator.cpp index fb3208f36..1f9f49b4d 100644 --- a/omniscidb/DataMgr/Allocators/GpuAllocator.cpp +++ b/omniscidb/DataMgr/Allocators/GpuAllocator.cpp @@ -84,3 +84,7 @@ void GpuAllocator::setDeviceMem(int8_t* device_ptr, const size_t num_bytes) const { buffer_provider_->setDeviceMem(device_ptr, uc, num_bytes, device_id_); } + +void GpuAllocator::sync() { + buffer_provider_->synchronizeDeviceDataStream(device_id_); +} \ No newline at end of file diff --git a/omniscidb/DataMgr/Allocators/GpuAllocator.h b/omniscidb/DataMgr/Allocators/GpuAllocator.h index c9757f80f..7e6040157 100644 --- a/omniscidb/DataMgr/Allocators/GpuAllocator.h +++ b/omniscidb/DataMgr/Allocators/GpuAllocator.h @@ -58,6 +58,7 @@ class GpuAllocator : public DeviceAllocator { void setDeviceMem(int8_t* device_ptr, unsigned char uc, const size_t num_bytes) const override; + void sync() override; private: std::vector owned_buffers_; diff --git a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp index 396daddb0..44128b3bc 100644 --- a/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp +++ b/omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp @@ -48,7 +48,8 @@ void CpuBuffer::readData(int8_t* const dst, memcpy(dst, mem_ + offset, num_bytes); } else if (dst_memory_level == GPU_LEVEL) { CHECK_GE(dst_device_id, 0); - gpu_mgr_->copyHostToDevice(dst, mem_ + offset, num_bytes, dst_device_id); + gpu_mgr_->copyHostToDeviceAsyncIfPossible( + dst, mem_ + offset, num_bytes, dst_device_id); } else { LOG(FATAL) << "Unsupported buffer type"; } diff --git a/omniscidb/DataMgr/DataMgrBufferProvider.cpp b/omniscidb/DataMgr/DataMgrBufferProvider.cpp index 2080a4476..319741007 100644 --- a/omniscidb/DataMgr/DataMgrBufferProvider.cpp +++ b/omniscidb/DataMgr/DataMgrBufferProvider.cpp @@ -57,18 +57,14 @@ void DataMgrBufferProvider::copyToDeviceAsyncIfPossible(int8_t* device_ptr, CHECK(data_mgr_); const auto gpu_mgr = data_mgr_->getGpuMgr(); CHECK(gpu_mgr); - if (gpu_mgr->canLoadAsync()) { - gpu_mgr->copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_id); - } else { - gpu_mgr->copyHostToDevice(device_ptr, host_ptr, num_bytes, device_id); - } + gpu_mgr->copyHostToDeviceAsyncIfPossible(device_ptr, host_ptr, num_bytes, device_id); } -void DataMgrBufferProvider::synchronizeStream(const int device_num) const { +void DataMgrBufferProvider::synchronizeDeviceDataStream(const int device_num) const { CHECK(data_mgr_); const auto gpu_mgr = data_mgr_->getGpuMgr(); CHECK(gpu_mgr); - gpu_mgr->synchronizeStream(device_num); + gpu_mgr->synchronizeDeviceDataStream(device_num); } void DataMgrBufferProvider::copyFromDevice(int8_t* host_ptr, diff --git a/omniscidb/DataMgr/DataMgrBufferProvider.h b/omniscidb/DataMgr/DataMgrBufferProvider.h index dc1dc9a2f..459e18627 100644 --- a/omniscidb/DataMgr/DataMgrBufferProvider.h +++ b/omniscidb/DataMgr/DataMgrBufferProvider.h @@ -45,7 +45,7 @@ class DataMgrBufferProvider : public BufferProvider { const int8_t* host_ptr, const size_t num_bytes, const int device_id) const override; - void synchronizeStream(const int device_id) const override; + void 
synchronizeDeviceDataStream(const int device_id) const override; void copyFromDevice(int8_t* host_ptr, const int8_t* device_ptr, const size_t num_bytes, diff --git a/omniscidb/DataMgr/GpuMgr.h b/omniscidb/DataMgr/GpuMgr.h index e31c65ecf..ddf473ead 100644 --- a/omniscidb/DataMgr/GpuMgr.h +++ b/omniscidb/DataMgr/GpuMgr.h @@ -38,7 +38,12 @@ struct GpuMgr { const size_t num_bytes, const int device_num) = 0; - virtual void synchronizeStream(const int device_num) = 0; + virtual void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) = 0; + + virtual void synchronizeDeviceDataStream(const int device_num) = 0; virtual void copyDeviceToHost(int8_t* host_ptr, const int8_t* device_ptr, diff --git a/omniscidb/L0Mgr/L0Mgr.cpp b/omniscidb/L0Mgr/L0Mgr.cpp index 091d9d679..6210fffe7 100644 --- a/omniscidb/L0Mgr/L0Mgr.cpp +++ b/omniscidb/L0Mgr/L0Mgr.cpp @@ -139,8 +139,96 @@ void* allocate_device_mem(const size_t num_bytes, L0Device& device) { return mem; } +L0Device::L0DataFetcher::L0DataFetcher(L0Device& device) : my_device_(device) { + ze_command_queue_desc_t command_queue_fetch_desc = { + ZE_STRUCTURE_TYPE_COMMAND_QUEUE_DESC, + nullptr, + 0, + 0, + 0, + ZE_COMMAND_QUEUE_MODE_ASYNCHRONOUS, + ZE_COMMAND_QUEUE_PRIORITY_NORMAL}; + L0_SAFE_CALL(zeCommandQueueCreate(my_device_.driver_.ctx(), + my_device_.device_, + &command_queue_fetch_desc, + &queue_handle_)); + cur_cl_bytes_ = {{}, 0}; + L0_SAFE_CALL(zeCommandListCreate(my_device_.driver_.ctx(), + my_device_.device_, + &cl_desc_, + &cur_cl_bytes_.cl_handle_)); +} + +L0Device::L0DataFetcher::~L0DataFetcher() { + zeCommandQueueDestroy(queue_handle_); + zeCommandListDestroy(cur_cl_bytes_.cl_handle_); + for (auto& dead_handle : graveyard_) { + zeCommandListDestroy(dead_handle); + } + for (auto& cl_handle : recycled_) { + zeCommandListDestroy(cl_handle); + } +} + +void L0Device::L0DataFetcher::recycleGraveyard() { + while (recycled_.size() < GRAVEYARD_LIMIT && graveyard_.size()) { + recycled_.push_back(graveyard_.front()); + graveyard_.pop_front(); + L0_SAFE_CALL(zeCommandListReset(recycled_.back())); + } + for (auto& dead_handle : graveyard_) { + L0_SAFE_CALL(zeCommandListDestroy(dead_handle)); + } + graveyard_.clear(); +} + +void L0Device::L0DataFetcher::setCLRecycledOrNew() { + cur_cl_bytes_ = {{}, 0}; + if (recycled_.size()) { + cur_cl_bytes_.cl_handle_ = recycled_.front(); + recycled_.pop_front(); + } else { + L0_SAFE_CALL(zeCommandListCreate(my_device_.driver_.ctx(), + my_device_.device_, + &cl_desc_, + &cur_cl_bytes_.cl_handle_)); + } +} + +void L0Device::L0DataFetcher::appendCopyCommand(void* dst, + const void* src, + const size_t num_bytes) { + std::unique_lock cl_lock(cur_cl_lock_); + L0_SAFE_CALL(zeCommandListAppendMemoryCopy( + cur_cl_bytes_.cl_handle_, dst, src, num_bytes, nullptr, 0, nullptr)); + cur_cl_bytes_.bytes_ += num_bytes; + if (cur_cl_bytes_.bytes_ >= CL_BYTES_LIMIT) { + ze_command_list_handle_t cl_h_copy = cur_cl_bytes_.cl_handle_; + graveyard_.push_back(cur_cl_bytes_.cl_handle_); + setCLRecycledOrNew(); + cl_lock.unlock(); + L0_SAFE_CALL(zeCommandListClose(cl_h_copy)); + L0_SAFE_CALL( + zeCommandQueueExecuteCommandLists(queue_handle_, 1, &cl_h_copy, nullptr)); + } +} + +void L0Device::L0DataFetcher::sync() { + if (cur_cl_bytes_.bytes_) { + L0_SAFE_CALL(zeCommandListClose(cur_cl_bytes_.cl_handle_)); + L0_SAFE_CALL(zeCommandQueueExecuteCommandLists( + queue_handle_, 1, &cur_cl_bytes_.cl_handle_, nullptr)); + } + L0_SAFE_CALL( + 
zeCommandQueueSynchronize(queue_handle_, std::numeric_limits::max())); + L0_SAFE_CALL(zeCommandListReset(cur_cl_bytes_.cl_handle_)); + if (graveyard_.size() > GRAVEYARD_LIMIT) { + recycleGraveyard(); + } +} + L0Device::L0Device(const L0Driver& driver, ze_device_handle_t device) - : device_(device), driver_(driver) { + : device_(device), driver_(driver), data_fetcher_(*this) { ze_command_queue_handle_t queue_handle; ze_command_queue_desc_t command_queue_desc = {ZE_STRUCTURE_TYPE_COMMAND_QUEUE_DESC, nullptr, @@ -192,6 +280,14 @@ unsigned L0Device::maxSharedLocalMemory() const { return compute_props_.maxSharedLocalMemory; } +void L0Device::transferToDevice(void* dst, const void* src, const size_t num_bytes) { + data_fetcher_.appendCopyCommand(dst, src, num_bytes); +} + +void L0Device::syncDataTransfers() { + data_fetcher_.sync(); +} + L0CommandQueue::L0CommandQueue(ze_command_queue_handle_t handle) : handle_(handle) {} ze_command_queue_handle_t L0CommandQueue::handle() const { @@ -219,7 +315,6 @@ std::shared_ptr L0Device::create_module(uint8_t* code, }; ze_module_handle_t handle; ze_module_build_log_handle_t buildlog = nullptr; - auto status = zeModuleCreate(ctx(), device_, &desc, &handle, &buildlog); if (log) { size_t logSize = 0; @@ -329,6 +424,40 @@ void L0Manager::copyHostToDevice(int8_t* device_ptr, cl->submit(*queue); } +void L0Manager::copyHostToDeviceAsync(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + if (!num_bytes) + return; + CHECK(host_ptr); + CHECK(device_ptr); + CHECK_GT(num_bytes, 0); + CHECK_GE(device_num, 0); + CHECK_LT(device_num, drivers_[0]->devices().size()); + + auto& device = drivers()[0]->devices()[device_num]; + device->transferToDevice(device_ptr, host_ptr, num_bytes); +} + +void L0Manager::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + if constexpr (async_data_load_available) { + copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_num); + } else { + copyHostToDevice(device_ptr, host_ptr, num_bytes, device_num); + } +} + +void L0Manager::synchronizeDeviceDataStream(const int device_num) { + CHECK_GE(device_num, 0); + CHECK_LT(device_num, drivers_[0]->devices().size()); + auto& device = drivers()[0]->devices()[device_num]; + device->syncDataTransfers(); +} + void L0Manager::copyDeviceToHost(int8_t* host_ptr, const int8_t* device_ptr, const size_t num_bytes, diff --git a/omniscidb/L0Mgr/L0Mgr.h b/omniscidb/L0Mgr/L0Mgr.h index 6e0e687c4..4cd01cbb1 100644 --- a/omniscidb/L0Mgr/L0Mgr.h +++ b/omniscidb/L0Mgr/L0Mgr.h @@ -16,7 +16,9 @@ #pragma once #include +#include #include +#include #include #include "DataMgr/GpuMgr.h" @@ -57,20 +59,64 @@ class L0CommandList; class L0CommandQueue; class L0Device { - private: + protected: + const L0Driver& driver_; + std::shared_ptr command_queue_; #ifdef HAVE_L0 ze_device_handle_t device_; ze_device_properties_t props_; ze_device_compute_properties_t compute_props_; #endif + private: + /* + This component for data fetching to L0 devices is used for + more efficient asynchronous data transfers. It allows to amortize + the costs of data transfers which is especially useful in case of + many relatively small data transfers. 
+ */ + class L0DataFetcher { +#ifdef HAVE_L0 + static constexpr size_t GRAVEYARD_LIMIT{500}; + static constexpr size_t CL_BYTES_LIMIT{128 * 1024 * 1024}; + static constexpr ze_command_list_desc_t cl_desc_ = { + ZE_STRUCTURE_TYPE_COMMAND_LIST_DESC, + nullptr, + 0, + ZE_COMMAND_LIST_FLAG_MAXIMIZE_THROUGHPUT}; + struct CLBytesTracker { + ze_command_list_handle_t cl_handle_; + uint64_t bytes_; + }; + CLBytesTracker cur_cl_bytes_; + ze_command_queue_handle_t queue_handle_; + std::list graveyard_; + std::list recycled_; + std::mutex cur_cl_lock_; +#endif + L0Device& my_device_; + void recycleGraveyard(); + void setCLRecycledOrNew(); - const L0Driver& driver_; - std::shared_ptr command_queue_; + public: + void appendCopyCommand(void* dst, const void* src, const size_t num_bytes); + void sync(); + +#ifdef HAVE_L0 + L0DataFetcher(L0Device& device); + ~L0DataFetcher(); +#else + L0DataFetcher() = default; +#endif + }; + L0DataFetcher data_fetcher_; public: std::shared_ptr command_queue() const; std::unique_ptr create_command_list() const; + void transferToDevice(void* dst, const void* src, const size_t num_bytes); + void syncDataTransfers(); + std::shared_ptr create_module(uint8_t* code, size_t len, bool log = false) const; @@ -169,7 +215,6 @@ class L0CommandList { public: void copy(void* dst, const void* src, const size_t num_bytes); - template void launch(L0Kernel& kernel, const GroupCount& gc, Args&&... args) { #ifdef HAVE_L0 @@ -202,13 +247,15 @@ class L0Manager : public GpuMgr { void copyHostToDeviceAsync(int8_t* device_ptr, const int8_t* host_ptr, const size_t num_bytes, - const int device_num) override { - CHECK(false); - } - void synchronizeStream(const int device_num) override { - LOG(WARNING) - << "L0 has no async data transfer enabled, synchronizeStream() has no effect"; - } + const int device_num) override; + + void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) override; + + void synchronizeDeviceDataStream(const int device_num) override; + void copyHostToDevice(int8_t* device_ptr, const int8_t* host_ptr, const size_t num_bytes, @@ -261,7 +308,7 @@ class L0Manager : public GpuMgr { private: std::vector> drivers_; - static constexpr bool async_data_load_available{false}; + static constexpr bool async_data_load_available{true}; }; } // namespace l0 diff --git a/omniscidb/L0Mgr/L0MgrNoL0.cpp b/omniscidb/L0Mgr/L0MgrNoL0.cpp index f544d4062..8b14c3c09 100644 --- a/omniscidb/L0Mgr/L0MgrNoL0.cpp +++ b/omniscidb/L0Mgr/L0MgrNoL0.cpp @@ -69,6 +69,25 @@ void L0Manager::copyHostToDevice(int8_t* device_ptr, const int device_num) { CHECK(false); } + +void L0Manager::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + CHECK(false); +} + +void L0Manager::copyHostToDeviceAsync(int8_t* device_ptr, + const int8_t* host_ptr, + const size_t num_bytes, + const int device_num) { + CHECK(false); +} + +void L0Manager::synchronizeDeviceDataStream(const int device_num) { + CHECK(false); +} + void L0Manager::copyDeviceToHost(int8_t* host_ptr, const int8_t* device_ptr, const size_t num_bytes, diff --git a/omniscidb/QueryEngine/ColumnFetcher.cpp b/omniscidb/QueryEngine/ColumnFetcher.cpp index 62fd71f33..ba897d90f 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.cpp +++ b/omniscidb/QueryEngine/ColumnFetcher.cpp @@ -59,7 +59,6 @@ std::pair ColumnFetcher::getOneColumnFragment( const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, 
DeviceAllocator* device_allocator, - const size_t thread_idx, std::vector>& chunks_owner, DataProvider* data_provider, ColumnCacheMap& column_cache) { @@ -115,7 +114,6 @@ JoinColumn ColumnFetcher::makeJoinColumn( const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator* device_allocator, - const size_t thread_idx, std::vector>& chunks_owner, std::vector>& malloc_owner, DataProvider* data_provider, @@ -142,7 +140,6 @@ JoinColumn ColumnFetcher::makeJoinColumn( effective_mem_lvl, effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id, device_allocator, - thread_idx, chunks_owner, data_provider, column_cache); @@ -364,8 +361,7 @@ const int8_t* ColumnFetcher::linearizeColumnFragments( std::list& chunk_iter_holder, const Data_Namespace::MemoryLevel memory_level, const int device_id, - DeviceAllocator* device_allocator, - const size_t thread_idx) const { + DeviceAllocator* device_allocator) const { auto timer = DEBUG_TIMER(__func__); int db_id = col_info->db_id; int table_id = col_info->table_id; @@ -477,8 +473,7 @@ const int8_t* ColumnFetcher::linearizeColumnFragments( total_data_buf_size, total_idx_buf_size, total_num_tuples, - device_allocator, - thread_idx); + device_allocator); } else { CHECK(type->isVarLenArray()); VLOG(2) << "Linearize variable-length multi-frag array column (col_id: " << col_id @@ -496,8 +491,7 @@ const int8_t* ColumnFetcher::linearizeColumnFragments( total_data_buf_size, total_idx_buf_size, total_num_tuples, - device_allocator, - thread_idx); + device_allocator); } } if (type->isString()) { @@ -516,8 +510,7 @@ const int8_t* ColumnFetcher::linearizeColumnFragments( total_data_buf_size, total_idx_buf_size, total_num_tuples, - device_allocator, - thread_idx); + device_allocator); } } CHECK(res.first); // check merged data buffer @@ -573,8 +566,7 @@ MergedChunk ColumnFetcher::linearizeVarLenArrayColFrags( const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, - DeviceAllocator* device_allocator, - const size_t thread_idx) const { + DeviceAllocator* device_allocator) const { // for linearization of varlen col we have to deal with not only data buffer // but also its underlying index buffer which is responsible for offset of varlen value // basically we maintain per-device linearized (data/index) buffer @@ -902,8 +894,7 @@ MergedChunk ColumnFetcher::linearizeFixedLenArrayColFrags( const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, - DeviceAllocator* device_allocator, - const size_t thread_idx) const { + DeviceAllocator* device_allocator) const { int64_t linearization_time_ms = 0; auto clock_begin = timer_start(); // linearize collected fragments diff --git a/omniscidb/QueryEngine/ColumnFetcher.h b/omniscidb/QueryEngine/ColumnFetcher.h index 6a43f56e2..f0a27473a 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.h +++ b/omniscidb/QueryEngine/ColumnFetcher.h @@ -46,7 +46,6 @@ class ColumnFetcher { const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator* device_allocator, - const size_t thread_idx, std::vector>& chunks_owner, DataProvider* data_provider, ColumnCacheMap& column_cache); @@ -59,7 +58,6 @@ class ColumnFetcher { const Data_Namespace::MemoryLevel effective_mem_lvl, const int device_id, DeviceAllocator* device_allocator, - const size_t thread_idx, std::vector>& chunks_owner, std::vector>& malloc_owner, DataProvider* data_provider, @@ -90,8 +88,7 @@ class ColumnFetcher { std::list& chunk_iter_holder, const 
Data_Namespace::MemoryLevel memory_level, const int device_id, - DeviceAllocator* device_allocator, - const size_t thread_idx) const; + DeviceAllocator* device_allocator) const; void freeTemporaryCpuLinearizedIdxBuf(); void freeLinearizedBuf(); @@ -118,8 +115,7 @@ class ColumnFetcher { const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, - DeviceAllocator* device_allocator, - const size_t thread_idx) const; + DeviceAllocator* device_allocator) const; MergedChunk linearizeFixedLenArrayColFrags( std::list>& chunk_holder, @@ -133,8 +129,7 @@ class ColumnFetcher { const size_t total_data_buf_size, const size_t total_idx_buf_size, const size_t total_num_tuples, - DeviceAllocator* device_allocator, - const size_t thread_idx) const; + DeviceAllocator* device_allocator) const; void addMergedChunkIter(const int table_id, const int col_id, diff --git a/omniscidb/QueryEngine/Compiler/Backend.cpp b/omniscidb/QueryEngine/Compiler/Backend.cpp index d51143d84..00b5ccf2a 100644 --- a/omniscidb/QueryEngine/Compiler/Backend.cpp +++ b/omniscidb/QueryEngine/Compiler/Backend.cpp @@ -865,7 +865,7 @@ std::shared_ptr L0Backend::generateNativeGPUCode( CHECK(module); CHECK(wrapper_func); - DUMP_MODULE(module, "before.linking.spirv.ll") + // DUMP_MODULE(module, "before.linking.spirv.ll") CHECK(exts.find(ExtModuleKinds::spirv_helper_funcs_module) != exts.end()); @@ -884,7 +884,7 @@ std::shared_ptr L0Backend::generateNativeGPUCode( } } - DUMP_MODULE(module, "after.linking.spirv.ll") + // DUMP_MODULE(module, "after.linking.spirv.ll") // set proper calling conv & mangle spirv built-ins for (auto& Fn : *module) { diff --git a/omniscidb/QueryEngine/Execute.cpp b/omniscidb/QueryEngine/Execute.cpp index 156ca0750..5500df868 100644 --- a/omniscidb/QueryEngine/Execute.cpp +++ b/omniscidb/QueryEngine/Execute.cpp @@ -21,6 +21,7 @@ #include "QueryEngine/Execute.h" #include +#include #include #include @@ -2877,7 +2878,7 @@ std::map> get_table_id_to_frag_offsets( std::pair>, std::vector>> Executor::getRowCountAndOffsetForAllFrags( const RelAlgExecutionUnit& ra_exe_unit, - const CartesianProduct>>& frag_ids_crossjoin, + const std::vector>& frag_ids_crossjoin, const std::vector& input_descs, const std::map& all_tables_fragments) { std::vector> all_num_rows; @@ -2990,80 +2991,161 @@ FetchResult Executor::fetchChunks( std::vector> all_frag_col_buffers; std::vector> all_num_rows; std::vector> all_frag_offsets; - for (const auto& selected_frag_ids : frag_ids_crossjoin) { - std::vector frag_col_buffers( - plan_state_->global_to_local_col_ids_.size()); - for (const auto& col_id : col_global_ids) { - if (interrupted_.load()) { - throw QueryExecutionError(ERR_INTERRUPTED); - } - CHECK(col_id); - if (col_id->isVirtual()) { - continue; + + auto fetch_column_callback = [&](std::shared_ptr col_id, + const std::vector& selected_frag_ids, + std::vector& frag_col_buffers, + const bool parallelized = + false) -> bool /*empty_frag*/ { + if (interrupted_.load()) { + throw QueryExecutionError(ERR_INTERRUPTED); + } + const auto fragments_it = all_tables_fragments.find(col_id->getTableRef()); + CHECK(fragments_it != all_tables_fragments.end()); + const auto fragments = fragments_it->second; + auto it = plan_state_->global_to_local_col_ids_.find(*col_id); + CHECK(it != plan_state_->global_to_local_col_ids_.end()); + CHECK_LT(static_cast(it->second), + plan_state_->global_to_local_col_ids_.size()); + const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]]; + if (!fragments->size()) { + 
return true; + } + auto memory_level_for_column = memory_level; + if (plan_state_->columns_to_fetch_.find(*col_id) == + plan_state_->columns_to_fetch_.end()) { + memory_level_for_column = Data_Namespace::CPU_LEVEL; + } + if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) { + // determine if we need special treatment to linearlize multi-frag table + // i.e., a column that is classified as varlen type, i.e., array + // for now, we can support more types in this way + + // If fetch_column_callback is called from parallel code region, we get deadlocks + // due to tbb nested parallel_for, the calls below are internally parallelized. We + // should land here when called from a sequential code region + CHECK(!parallelized); + if (needLinearizeAllFragments( + *col_id, ra_exe_unit, selected_fragments, memory_level)) { + bool for_lazy_fetch = false; + if (plan_state_->columns_to_not_fetch_.find(*col_id) != + plan_state_->columns_to_not_fetch_.end()) { + for_lazy_fetch = true; + VLOG(2) << "Try to linearize lazy fetch column (col_id: " << col_id->getColId() + << ")"; + } + frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments( + col_id->getColInfo(), + all_tables_fragments, + chunks, + chunk_iterators, + for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level, + for_lazy_fetch ? 0 : device_id, + device_allocator); + } else { + frag_col_buffers[it->second] = + column_fetcher.getAllTableColumnFragments(col_id->getColInfo(), + all_tables_fragments, + memory_level_for_column, + device_id, + device_allocator, + thread_idx); } - const auto fragments_it = all_tables_fragments.find(col_id->getTableRef()); - CHECK(fragments_it != all_tables_fragments.end()); - const auto fragments = fragments_it->second; - auto it = plan_state_->global_to_local_col_ids_.find(*col_id); - CHECK(it != plan_state_->global_to_local_col_ids_.end()); - CHECK_LT(static_cast(it->second), - plan_state_->global_to_local_col_ids_.size()); - const size_t frag_id = selected_frag_ids[local_col_to_frag_pos[it->second]]; - if (!fragments->size()) { - return {}; + } else { + frag_col_buffers[it->second] = + column_fetcher.getOneTableColumnFragment(col_id->getColInfo(), + frag_id, + all_tables_fragments, + chunks, + chunk_iterators, + memory_level_for_column, + device_id, + device_allocator); + } + return false; + }; + + // in MT fetching for GPU, we want to preserve "the order of insertion" into + // all_frag_col_buffers + std::vector> selected_frag_ids_vec; + if (memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL) { + std::atomic empty_frags{false}; + tbb::task_arena limitedArena(8); + std::vector idx_frags_to_inearize; + for (const auto& selected_frag_ids : frag_ids_crossjoin) { + selected_frag_ids_vec.push_back(selected_frag_ids); + for (const auto& col_id : col_global_ids) { + CHECK(col_id); + if (!col_id->isVirtual() && + needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) { + idx_frags_to_inearize.push_back(selected_frag_ids_vec.size() - 1); + } } - auto memory_level_for_column = memory_level; - if (plan_state_->columns_to_fetch_.find(*col_id) == - plan_state_->columns_to_fetch_.end()) { - memory_level_for_column = Data_Namespace::CPU_LEVEL; + } + all_frag_col_buffers.resize(selected_frag_ids_vec.size()); + + // Try MT fetching for frags that do not need linearization + limitedArena.execute([&]() { + tbb::parallel_for( + tbb::blocked_range(0, selected_frag_ids_vec.size()), [&](auto r) { + for (size_t idx = r.begin(); idx != r.end(); ++idx) { + if 
(std::find(idx_frags_to_inearize.begin(), + idx_frags_to_inearize.end(), + idx) == idx_frags_to_inearize.end()) { + const auto& selected_frag_ids = selected_frag_ids_vec[idx]; + std::vector frag_col_buffers( + plan_state_->global_to_local_col_ids_.size()); + for (const auto& col_id : col_global_ids) { + CHECK(col_id); + if (!col_id->isVirtual() && + fetch_column_callback( + col_id, selected_frag_ids, frag_col_buffers, true)) { + empty_frags = true; // not virtual, but empty frags + tbb::task::current_context()->cancel_group_execution(); + } + } + all_frag_col_buffers[idx] = frag_col_buffers; + } + } + }); + }); + if (empty_frags) { + return {}; + } + for (const size_t idx : + idx_frags_to_inearize) { // linear frags materialization is already + // parallelized, avoid nested tbb + const auto& selected_frag_ids = selected_frag_ids_vec[idx]; + std::vector frag_col_buffers( + plan_state_->global_to_local_col_ids_.size()); + for (const auto& col_id : col_global_ids) { + CHECK(col_id); + if (!col_id->isVirtual() && + fetch_column_callback(col_id, selected_frag_ids, frag_col_buffers)) { + return {}; // not virtual, but empty frags + } } - if (needFetchAllFragments(*col_id, ra_exe_unit, selected_fragments)) { - // determine if we need special treatment to linearlize multi-frag table - // i.e., a column that is classified as varlen type, i.e., array - // for now, we can support more types in this way - if (needLinearizeAllFragments( - *col_id, ra_exe_unit, selected_fragments, memory_level)) { - bool for_lazy_fetch = false; - if (plan_state_->columns_to_not_fetch_.find(*col_id) != - plan_state_->columns_to_not_fetch_.end()) { - for_lazy_fetch = true; - VLOG(2) << "Try to linearize lazy fetch column (col_id: " - << col_id->getColId() << ")"; - } - frag_col_buffers[it->second] = column_fetcher.linearizeColumnFragments( - col_id->getColInfo(), - all_tables_fragments, - chunks, - chunk_iterators, - for_lazy_fetch ? Data_Namespace::CPU_LEVEL : memory_level, - for_lazy_fetch ? 
0 : device_id, - device_allocator, - thread_idx); - } else { - frag_col_buffers[it->second] = - column_fetcher.getAllTableColumnFragments(col_id->getColInfo(), - all_tables_fragments, - memory_level_for_column, - device_id, - device_allocator, - thread_idx); + all_frag_col_buffers[idx] = frag_col_buffers; + } + } else { + for (const auto& selected_frag_ids : frag_ids_crossjoin) { + std::vector frag_col_buffers( + plan_state_->global_to_local_col_ids_.size()); + for (const auto& col_id : col_global_ids) { + CHECK(col_id); + if (!col_id->isVirtual() && + fetch_column_callback(col_id, selected_frag_ids, frag_col_buffers)) { + return {}; // not virtual, but empty frags } - } else { - frag_col_buffers[it->second] = - column_fetcher.getOneTableColumnFragment(col_id->getColInfo(), - frag_id, - all_tables_fragments, - chunks, - chunk_iterators, - memory_level_for_column, - device_id, - device_allocator); } + selected_frag_ids_vec.push_back(selected_frag_ids); + all_frag_col_buffers.push_back(frag_col_buffers); } - all_frag_col_buffers.push_back(frag_col_buffers); } + // selected_frag_ids_vec here is just a vector representation of frag_ids_crossjoin + // we need it in this form to have a proper fragment order when doing MT fetching to GPU std::tie(all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags( - ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments); + ra_exe_unit, selected_frag_ids_vec, ra_exe_unit.input_descs, all_tables_fragments); return {all_frag_col_buffers, all_num_rows, all_frag_offsets}; } @@ -3087,6 +3169,7 @@ FetchResult Executor::fetchUnionChunks( std::vector> all_frag_col_buffers; std::vector> all_num_rows; std::vector> all_frag_offsets; + std::vector> selected_frag_ids_vec; CHECK(!selected_fragments.empty()); CHECK_LE(2u, ra_exe_unit.input_descs.size()); @@ -3185,12 +3268,16 @@ FetchResult Executor::fetchUnionChunks( device_allocator); } } + selected_frag_ids_vec.push_back(selected_frag_ids); all_frag_col_buffers.push_back(frag_col_buffers); } std::vector> num_rows; std::vector> frag_offsets; - std::tie(num_rows, frag_offsets) = getRowCountAndOffsetForAllFrags( - ra_exe_unit, frag_ids_crossjoin, ra_exe_unit.input_descs, all_tables_fragments); + std::tie(num_rows, frag_offsets) = + getRowCountAndOffsetForAllFrags(ra_exe_unit, + selected_frag_ids_vec, + ra_exe_unit.input_descs, + all_tables_fragments); all_num_rows.insert(all_num_rows.end(), num_rows.begin(), num_rows.end()); all_frag_offsets.insert( all_frag_offsets.end(), frag_offsets.begin(), frag_offsets.end()); diff --git a/omniscidb/QueryEngine/Execute.h b/omniscidb/QueryEngine/Execute.h index 1ba1ef637..e99c54bc6 100644 --- a/omniscidb/QueryEngine/Execute.h +++ b/omniscidb/QueryEngine/Execute.h @@ -572,7 +572,7 @@ class Executor : public StringDictionaryProvider { std::pair>, std::vector>> getRowCountAndOffsetForAllFrags( const RelAlgExecutionUnit& ra_exe_unit, - const CartesianProduct>>& frag_ids_crossjoin, + const std::vector>& frag_ids_crossjoin, const std::vector& input_descs, const std::map& all_tables_fragments); diff --git a/omniscidb/QueryEngine/JoinHashTable/HashJoin.cpp b/omniscidb/QueryEngine/JoinHashTable/HashJoin.cpp index 74f4f8198..aeb9adb79 100644 --- a/omniscidb/QueryEngine/JoinHashTable/HashJoin.cpp +++ b/omniscidb/QueryEngine/JoinHashTable/HashJoin.cpp @@ -52,7 +52,6 @@ JoinColumn HashJoin::fetchJoinColumn( effective_memory_level, device_id, dev_buff_owner, - /*thread_idx=*/0, chunks_owner, malloc_owner, data_provider_, diff --git 
a/omniscidb/QueryEngine/QueryExecutionContext.cpp b/omniscidb/QueryEngine/QueryExecutionContext.cpp index aa9cbe444..8a1f45d13 100644 --- a/omniscidb/QueryEngine/QueryExecutionContext.cpp +++ b/omniscidb/QueryEngine/QueryExecutionContext.cpp @@ -352,6 +352,7 @@ std::vector QueryExecutionContext::launchGpuCode( << " ms"; launchClock->start(); } + auto timer_kernel = DEBUG_TIMER("Actual kernel"); kernel->launch(ko, kernel_params); if (executor_->getConfig().exec.watchdog.enable_dynamic || allow_runtime_interrupt) { @@ -365,6 +366,7 @@ std::vector QueryExecutionContext::launchGpuCode( gpu_allocator_->copyFromDevice(reinterpret_cast(error_codes.data()), reinterpret_cast(err_desc), error_codes.size() * sizeof(error_codes[0])); + timer_kernel.stop(); *error_code = aggregate_error_codes(error_codes); if (*error_code > 0) { return {}; @@ -858,9 +860,9 @@ QueryExecutionContext::prepareKernelParams( const uint64_t num_fragments = static_cast(col_buffers.size()); const size_t col_count{num_fragments > 0 ? col_buffers.front().size() : 0}; + std::vector multifrag_col_dev_buffers; + std::vector flatened_col_buffers; if (col_count) { - std::vector multifrag_col_dev_buffers; - std::vector flatened_col_buffers; std::vector col_buffs_offsets; for (auto& buffers : col_buffers) { flatened_col_buffers.insert( @@ -959,6 +961,7 @@ QueryExecutionContext::prepareKernelParams( reinterpret_cast(kernel_metadata_gpu_buf->getMemoryPtr() + alloc_size)); CHECK_EQ(nullptr, params[GROUPBY_BUF]); - buffer_provider->synchronizeStream(device_id); + buffer_provider->synchronizeDeviceDataStream(device_id); + return {params, kernel_metadata_gpu_buf}; } diff --git a/omniscidb/QueryEngine/RelAlgExecutor.cpp b/omniscidb/QueryEngine/RelAlgExecutor.cpp index 2421fbb85..64288098c 100644 --- a/omniscidb/QueryEngine/RelAlgExecutor.cpp +++ b/omniscidb/QueryEngine/RelAlgExecutor.cpp @@ -1154,7 +1154,6 @@ std::unique_ptr RelAlgExecutor::createWindowFunctionConte memory_level, 0, nullptr, - /*thread_idx=*/0, chunks_owner, data_provider_, column_cache_map);
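A note on the dispatch pattern above: every host-to-device column copy now goes through `copyHostToDeviceAsyncIfPossible()`, which selects the asynchronous or the blocking path at compile time from the backend's `async_data_load_available` flag (flipped to `false` for `CudaMgr` and to `true` for `L0Manager` in this diff), and callers pair it with an explicit `synchronizeDeviceDataStream()` before the copied buffers are consumed, e.g. at the end of `prepareKernelParams()`. The following is a minimal, self-contained sketch of that pattern; `MiniGpuMgr` and its `memcpy`-backed copy methods are hypothetical stand-ins, not the real `GpuMgr` interface:

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical stand-in for a GpuMgr backend; the copy bodies are memcpy
// placeholders for the real driver calls.
class MiniGpuMgr {
 public:
  void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
                                       const int8_t* host_ptr,
                                       const size_t num_bytes,
                                       const int device_num) {
    // Resolved at compile time: a backend that cannot load asynchronously
    // keeps the blocking path with no runtime check.
    if constexpr (async_data_load_available) {
      copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_num);
    } else {
      copyHostToDevice(device_ptr, host_ptr, num_bytes, device_num);
    }
  }

  // Must be called before kernels read the copied buffers when the async path
  // was taken (mirrors synchronizeDeviceDataStream() in the diff).
  void synchronizeDeviceDataStream(const int /*device_num*/) {}

 private:
  static constexpr bool async_data_load_available{true};

  void copyHostToDevice(int8_t* dst, const int8_t* src, size_t n, int) {
    std::memcpy(dst, src, n);  // placeholder for a blocking copy
  }
  void copyHostToDeviceAsync(int8_t* dst, const int8_t* src, size_t n, int) {
    std::memcpy(dst, src, n);  // placeholder for an enqueued copy
  }
};
```

Because the branch is `if constexpr`, flipping the flag requires a rebuild rather than a runtime switch; with the values set in this diff only the Level Zero backend takes the new batched asynchronous route, while CUDA keeps its synchronous copies.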
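The new `L0Device::L0DataFetcher` amortizes per-transfer submission overhead: copy commands are appended to the currently open command list, the list is closed and executed asynchronously once the accumulated size crosses `CL_BYTES_LIMIT` (128 MB), and `sync()` submits whatever is still pending, waits on the dedicated queue, and recycles retired command lists from a bounded graveyard instead of recreating them. Below is a backend-agnostic sketch of that control flow; the `Copy`/`flushAsync()` names are illustrative stand-ins (here backed by `std::async`), not the Level Zero command-list and queue calls used in the real implementation:

```cpp
#include <cstddef>
#include <cstring>
#include <future>
#include <utility>
#include <vector>

// Accumulates copy requests and submits them in large batches so that many
// small transfers share one submission, mirroring L0DataFetcher's flow.
class BatchingCopier {
 public:
  void appendCopy(void* dst, const void* src, size_t num_bytes) {
    cur_batch_.push_back({dst, src, num_bytes});
    cur_bytes_ += num_bytes;
    if (cur_bytes_ >= kBytesLimit) {
      flushAsync();  // analogous to closing + executing the full command list
    }
  }

  void sync() {
    if (cur_bytes_) {
      flushAsync();  // submit the partially filled tail batch
    }
    for (auto& f : in_flight_) {
      f.get();  // analogous to synchronizing the fetch queue
    }
    in_flight_.clear();
  }

 private:
  struct Copy {
    void* dst;
    const void* src;
    size_t bytes;
  };

  void flushAsync() {
    in_flight_.push_back(
        std::async(std::launch::async, [batch = std::move(cur_batch_)] {
          for (const auto& c : batch) {
            std::memcpy(c.dst, c.src, c.bytes);  // placeholder for the device copy
          }
        }));
    cur_batch_ = {};
    cur_bytes_ = 0;
  }

  static constexpr size_t kBytesLimit{128 * 1024 * 1024};  // mirrors CL_BYTES_LIMIT
  std::vector<Copy> cur_batch_;
  size_t cur_bytes_{0};
  std::vector<std::future<void>> in_flight_;
};
```

The graveyard/recycling logic is omitted in this sketch; in the diff it caps how many Level Zero command lists stay alive at once (`GRAVEYARD_LIMIT`) while still reusing them across batches.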
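`Executor::fetchChunks()` is restructured around a `fetch_column_callback` lambda and, for GPU execution, fetches the fragment combinations in parallel inside a `tbb::task_arena` capped at 8 threads; each combination writes into its own pre-sized slot of `all_frag_col_buffers`, so concurrency cannot perturb the buffer order, and combinations that need column linearization are processed afterwards on the sequential path because linearization is itself parallelized internally (nested `parallel_for` would risk the deadlocks called out in the comment). A minimal sketch of the order-preserving pattern, where `fetch_one()` is a hypothetical stand-in for the per-fragment column fetch:

```cpp
#include <cstddef>
#include <string>
#include <vector>

#include <tbb/blocked_range.h>
#include <tbb/parallel_for.h>
#include <tbb/task_arena.h>

std::vector<std::string> fetch_all(const std::vector<int>& frag_ids) {
  // Assumed per-fragment work; stands in for fetch_column_callback applied to
  // one selected fragment combination.
  auto fetch_one = [](int frag_id) {
    return "buffers_for_frag_" + std::to_string(frag_id);
  };

  std::vector<std::string> results(frag_ids.size());  // one fixed slot per fragment
  tbb::task_arena limited_arena(8);  // cap fetch concurrency, as the diff does
  limited_arena.execute([&] {
    tbb::parallel_for(tbb::blocked_range<size_t>(0, frag_ids.size()),
                      [&](const tbb::blocked_range<size_t>& r) {
                        for (size_t idx = r.begin(); idx != r.end(); ++idx) {
                          results[idx] = fetch_one(frag_ids[idx]);
                        }
                      });
  });
  return results;  // same order as a sequential loop over frag_ids
}
```

If any fragment list turns out to be empty, the parallel loop in the diff cancels the remaining work and `fetchChunks()` returns an empty result, matching the early return of the previous sequential implementation; `getRowCountAndOffsetForAllFrags()` is likewise fed the materialized `selected_frag_ids_vec` instead of the lazy `CartesianProduct`, so row counts and offsets line up with the fetched buffers.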