diff --git a/omniscidb/DataMgr/BufferMgr/Buffer.cpp b/omniscidb/DataMgr/BufferMgr/Buffer.cpp index dc804b69ba..c7e7520d65 100644 --- a/omniscidb/DataMgr/BufferMgr/Buffer.cpp +++ b/omniscidb/DataMgr/BufferMgr/Buffer.cpp @@ -166,4 +166,37 @@ int8_t* Buffer::getMemoryPtr() { void Buffer::setMemoryPtr(int8_t* new_ptr) { mem_ = new_ptr; } + +void Buffer::deleteSelf() { + // ZeroCopy buffers don't have correct iterators by design. + // To delete such a buffer, we detect it and delete it explicitly, without removing + // its segment as is done in deleteBuffer(...). + if (seg_it_ == BufferList::iterator()) { + delete this; + return; + } + + bm_->deleteBuffer(seg_it_->chunk_key); +} + +int Buffer::unPin() { + std::unique_lock pin_lock(pin_mutex_); + int res = (--pin_count_); + if (!res && delete_on_unpin_) { + pin_lock.unlock(); + // deleteSelf + deleteSelf(); + } + return res; +} + +void Buffer::deleteWhenUnpinned() { + std::unique_lock pin_lock(pin_mutex_); + if (pin_count_) { + delete_on_unpin_ = true; + } else { + pin_lock.unlock(); + deleteSelf(); + } +} } // namespace Buffer_Namespace diff --git a/omniscidb/DataMgr/BufferMgr/Buffer.h b/omniscidb/DataMgr/BufferMgr/Buffer.h index 92a880e2d4..ea62d5ec9f 100644 --- a/omniscidb/DataMgr/BufferMgr/Buffer.h +++ b/omniscidb/DataMgr/BufferMgr/Buffer.h @@ -134,28 +134,14 @@ class Buffer : public AbstractBuffer { return (++pin_count_); } - inline int unPin() override { - std::lock_guard pin_lock(pin_mutex_); - int res = (--pin_count_); - if (!res && delete_on_unpin_) { - delete this; - } - return res; - } + int unPin() override; + inline int getPinCount() override { std::lock_guard pin_lock(pin_mutex_); return (pin_count_); } - inline void deleteWhenUnpinned() override { - std::unique_lock pin_lock(pin_mutex_); - if (pin_count_) { - delete_on_unpin_ = true; - } else { - pin_lock.unlock(); - delete this; - } - } + void deleteWhenUnpinned() override; // Added for testing. 
int32_t getSlabNum() const { return seg_it_->slab_num; } @@ -177,6 +163,8 @@ class Buffer : public AbstractBuffer { const MemoryLevel src_buffer_type = CPU_LEVEL, const int src_device_id = -1) = 0; + void deleteSelf(); + BufferMgr* bm_; BufferList::iterator seg_it_; size_t page_size_; /// the size of each page in the buffer diff --git a/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp b/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp index fc710d462e..530c0c7730 100644 --- a/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp +++ b/omniscidb/DataMgr/BufferMgr/BufferMgr.cpp @@ -103,6 +103,12 @@ void BufferMgr::clear() { // for removal to have them deleted when unpinned. for (auto& buf : chunk_index_) { if (buf.second->buffer) { + // WARN !!!!!!!!!!!!!!!!!! + // deleteWhenUnpinned(...) calls deleteSelf(), which calls deleteBuffer(...) when the + // segment iterator is valid. That method will try to acquire chunk_index_mutex_, + // which is already held here. To avoid a deadlock while still removing everything, + // we reset the segment iterator here; the segments themselves are cleaned up later. + buf.second->buffer->seg_it_ = BufferList::iterator(); buf.second->buffer->deleteWhenUnpinned(); buf.second->buffer = nullptr; } @@ -623,6 +629,7 @@ void BufferMgr::deleteBuffer(const ChunkKey& key, const bool) { chunk_index_lock.unlock(); std::lock_guard sized_segs_lock(sized_segs_mutex_); if (seg_it->buffer) { + CHECK_EQ(seg_it->buffer->getPinCount(), 0); delete seg_it->buffer; // Delete Buffer for segment seg_it->buffer = 0; } @@ -826,12 +833,15 @@ AbstractBuffer* BufferMgr::alloc(const size_t num_bytes) { return createBuffer(chunk_key, page_size_, num_bytes); } +// all buffer deletions should be done via free(...) 
void BufferMgr::free(AbstractBuffer* buffer) { Buffer* casted_buffer = dynamic_cast(buffer); if (casted_buffer == 0) { LOG(FATAL) << "Wrong buffer type - expects base class pointer to Buffer type."; } - deleteBuffer(casted_buffer->seg_it_->chunk_key); + CHECK_EQ(casted_buffer->getPinCount(), 1); + casted_buffer->deleteWhenUnpinned(); + casted_buffer->unPin(); } size_t BufferMgr::getNumChunks() { diff --git a/omniscidb/DataMgr/Chunk/Chunk.h b/omniscidb/DataMgr/Chunk/Chunk.h index 30de33d30b..cfcf77959a 100644 --- a/omniscidb/DataMgr/Chunk/Chunk.h +++ b/omniscidb/DataMgr/Chunk/Chunk.h @@ -51,7 +51,14 @@ class Chunk { : buffer_(nullptr), index_buf_(nullptr), column_info_(col_info) {} Chunk(AbstractBuffer* b, AbstractBuffer* ib, ColumnInfoPtr col_info) - : buffer_(b), index_buf_(ib), column_info_(col_info) {} + : buffer_(b), index_buf_(ib), column_info_(col_info) { + if (buffer_) { + buffer_->pin(); + } + if (index_buf_) { + index_buf_->pin(); + } + } ~Chunk() { unpinBuffer(); } diff --git a/omniscidb/QueryEngine/ColumnFetcher.cpp b/omniscidb/QueryEngine/ColumnFetcher.cpp index 62fd71f33a..0b2bc13ede 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.cpp +++ b/omniscidb/QueryEngine/ColumnFetcher.cpp @@ -521,6 +521,10 @@ const int8_t* ColumnFetcher::linearizeColumnFragments( } } CHECK(res.first); // check merged data buffer + // These buffers are associated with Chunks that are created by hand, not via the + // Chunk::getChunk(...) method, so they must be removed explicitly: both buffers are + // marked delete-on-unpin in the ColumnFetcher dtor. NOTE(review): presumably the + // unpin there means that no chunk uses the buffer any more - confirm. 
if (!type->isFixedLenArray()) { CHECK(res.second); // check merged index buffer } @@ -1061,34 +1065,3 @@ ChunkIter ColumnFetcher::prepareChunkIter(AbstractBuffer* merged_data_buf, merged_chunk_iter.elem_type_size = chunk_iter.elem_type_size; return merged_chunk_iter; } - -void ColumnFetcher::freeLinearizedBuf() { - std::lock_guard linearized_col_cache_guard(linearized_col_cache_mutex_); - auto buffer_provider = executor_->getBufferProvider(); - - if (!linearized_data_buf_cache_.empty()) { - for (auto& kv : linearized_data_buf_cache_) { - for (auto& kv2 : kv.second) { - buffer_provider->free(kv2.second); - } - } - } - - if (!linearized_idx_buf_cache_.empty()) { - for (auto& kv : linearized_idx_buf_cache_) { - for (auto& kv2 : kv.second) { - buffer_provider->free(kv2.second); - } - } - } -} - -void ColumnFetcher::freeTemporaryCpuLinearizedIdxBuf() { - std::lock_guard linearized_col_cache_guard(linearized_col_cache_mutex_); - auto buffer_provider = executor_->getBufferProvider(); - if (!linearlized_temporary_cpu_index_buf_cache_.empty()) { - for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) { - buffer_provider->free(kv.second); - } - } -} diff --git a/omniscidb/QueryEngine/ColumnFetcher.h b/omniscidb/QueryEngine/ColumnFetcher.h index 6a43f56e26..6694197067 100644 --- a/omniscidb/QueryEngine/ColumnFetcher.h +++ b/omniscidb/QueryEngine/ColumnFetcher.h @@ -37,6 +37,32 @@ class ColumnFetcher { ColumnFetcher(Executor* executor, DataProvider* data_provider, const ColumnCacheMap& column_cache); + ~ColumnFetcher() { + if (!linearized_data_buf_cache_.empty()) { + for (auto& kv : linearized_data_buf_cache_) { + for (auto& kv2 : kv.second) { + kv2.second->deleteWhenUnpinned(); + kv2.second->unPin(); + } + } + } + + if (!linearized_idx_buf_cache_.empty()) { + for (auto& kv : linearized_idx_buf_cache_) { + for (auto& kv2 : kv.second) { + kv2.second->deleteWhenUnpinned(); + kv2.second->unPin(); + } + } + } + + if (!linearlized_temporary_cpu_index_buf_cache_.empty()) { + 
for (auto& kv : linearlized_temporary_cpu_index_buf_cache_) { + kv.second->deleteWhenUnpinned(); + kv.second->unPin(); + } + } + }; //! Gets one chunk's pointer and element count on either CPU or GPU. static std::pair getOneColumnFragment( @@ -93,9 +119,6 @@ class ColumnFetcher { DeviceAllocator* device_allocator, const size_t thread_idx) const; - void freeTemporaryCpuLinearizedIdxBuf(); - void freeLinearizedBuf(); - DataProvider* getDataProvider() const { return data_provider_; } private: diff --git a/omniscidb/QueryEngine/Execute.cpp b/omniscidb/QueryEngine/Execute.cpp index 3bdbe9abbe..2d3ab83e42 100644 --- a/omniscidb/QueryEngine/Execute.cpp +++ b/omniscidb/QueryEngine/Execute.cpp @@ -2186,10 +2186,6 @@ hdk::ResultSetTable Executor::executeWorkUnitImpl( do { SharedKernelContext shared_context(query_infos); ColumnFetcher column_fetcher(this, data_provider, column_cache); - ScopeGuard scope_guard = [&column_fetcher] { - column_fetcher.freeLinearizedBuf(); - column_fetcher.freeTemporaryCpuLinearizedIdxBuf(); - }; if (ra_exe_unit.isShuffle()) { allocateShuffleBuffers(query_infos, ra_exe_unit, row_set_mem_owner, shared_context); diff --git a/omniscidb/Tests/JoinHashTableTest.cpp b/omniscidb/Tests/JoinHashTableTest.cpp index a1decec97d..7b46788141 100644 --- a/omniscidb/Tests/JoinHashTableTest.cpp +++ b/omniscidb/Tests/JoinHashTableTest.cpp @@ -670,6 +670,85 @@ TEST(Other, Regression) { dropTable("table_b"); } +TEST(Other, FixedLenArr) { + createTable("table_a", + {{"si", ctx().int16()}, {"FixedLen", ctx().arrayFixed(2, ctx().int64())}}, + {2}); + + std::ostringstream oss; + oss << "{\"si\": 1, \"FixedLen\": [" << 7 << ", " << 233 << "]}" << std::endl; + oss << "{\"si\": 1, \"FixedLen\": [" << 5 << ", " << 47 << "]}" << std::endl; + oss << "{\"si\": 2, \"FixedLen\": [" << 6 << ", " << 48 << "]}" << std::endl; + oss << "{\"si\": 3, \"FixedLen\": [" << 7 << ", " << 49 << "]}" << std::endl; + oss << "{\"si\": 4, \"FixedLen\": [" << 8 << ", " << 67 << "]}" << std::endl; 
+ oss << "{\"si\": 5, \"FixedLen\": [" << 9 << ", " << 68 << "]}" << std::endl; + oss << "{\"si\": 6, \"FixedLen\": [" << 10 << ", " << 69 << "]}" << std::endl; + insertJsonValues("table_a", oss.str()); + + createTable("table_b", + {{"si", ctx().int8()}, {"FixedLen", ctx().arrayFixed(2, ctx().int32())}}, + {2}); + + std::ostringstream oss_2; + oss_2 << "{\"si\": 1, \"FixedLen\": [" << 2 << ", " << 33 << "]}" << std::endl; + oss_2 << "{\"si\": 2, \"FixedLen\": [" << 1 << ", " << 17 << "]}" << std::endl; + oss_2 << "{\"si\": 2, \"FixedLen\": [" << 6 << ", " << 48 << "]}" << std::endl; + oss_2 << "{\"si\": 3, \"FixedLen\": [" << 7 << ", " << 49 << "]}" << std::endl; + oss_2 << "{\"si\": 4, \"FixedLen\": [" << 8 << ", " << 67 << "]}" << std::endl; + oss_2 << "{\"si\": 5, \"FixedLen\": [" << 9 << ", " << 68 << "]}" << std::endl; + oss_2 << "{\"si\": 6, \"FixedLen\": [" << 10 << ", " << 69 << "]}" << std::endl; + insertJsonValues("table_b", oss_2.str()); + + EXPECT_NO_THROW(run_multiple_agg(R"( + SELECT * FROM table_a, table_b WHERE table_a.si = table_b.si; + )", + ExecutorDeviceType::CPU)); + + dropTable("table_a"); + dropTable("table_b"); +} + +TEST(Other, FixedLenArr2) { + config().rs.enable_lazy_fetch = false; + + createTable("table_a", + {{"si", ctx().int16()}, {"FixedLen", ctx().arrayFixed(2, ctx().int16())}}, + {1}); + + std::ostringstream oss; + oss << "{\"si\": 1, \"FixedLen\": [" << 7 << ", " << 233 << "]}" << std::endl; + oss << "{\"si\": 1, \"FixedLen\": [" << 5 << ", " << 47 << "]}" << std::endl; + oss << "{\"si\": 2, \"FixedLen\": [" << 6 << ", " << 48 << "]}" << std::endl; + oss << "{\"si\": 3, \"FixedLen\": [" << 7 << ", " << 49 << "]}" << std::endl; + oss << "{\"si\": 4, \"FixedLen\": [" << 8 << ", " << 67 << "]}" << std::endl; + oss << "{\"si\": 5, \"FixedLen\": [" << 9 << ", " << 68 << "]}" << std::endl; + oss << "{\"si\": 6, \"FixedLen\": [" << 10 << ", " << 69 << "]}" << std::endl; + insertJsonValues("table_a", oss.str()); + + createTable("table_b", 
+ {{"si", ctx().int8()}, {"FixedLen", ctx().arrayFixed(2, ctx().int32())}}, + {2}); + + std::ostringstream oss_2; + oss_2 << "{\"si\": 1, \"FixedLen\": [" << 2 << ", " << 33 << "]}" << std::endl; + oss_2 << "{\"si\": 2, \"FixedLen\": [" << 1 << ", " << 17 << "]}" << std::endl; + oss_2 << "{\"si\": 2, \"FixedLen\": [" << 6 << ", " << 48 << "]}" << std::endl; + oss_2 << "{\"si\": 3, \"FixedLen\": [" << 7 << ", " << 49 << "]}" << std::endl; + oss_2 << "{\"si\": 4, \"FixedLen\": [" << 8 << ", " << 67 << "]}" << std::endl; + oss_2 << "{\"si\": 5, \"FixedLen\": [" << 9 << ", " << 68 << "]}" << std::endl; + oss_2 << "{\"si\": 6, \"FixedLen\": [" << 10 << ", " << 69 << "]}" << std::endl; + insertJsonValues("table_b", oss_2.str()); + + EXPECT_NO_THROW(run_multiple_agg(R"( + SELECT * FROM table_a INNER JOIN table_b ON table_a.FixedLen[1] = + table_b.FixedLen[1]; + )", + ExecutorDeviceType::CPU)); + + dropTable("table_a"); + dropTable("table_b"); +} + int main(int argc, char** argv) { TestHelpers::init_logger_stderr_only(argc, argv); testing::InitGoogleTest(&argc, argv);