diff --git a/ydb/library/actors/interconnect/rdma/mem_pool.cpp b/ydb/library/actors/interconnect/rdma/mem_pool.cpp index 0bc3ab2bd1a0..01952a6cf22a 100644 --- a/ydb/library/actors/interconnect/rdma/mem_pool.cpp +++ b/ydb/library/actors/interconnect/rdma/mem_pool.cpp @@ -33,6 +33,8 @@ struct ibv_mr { #include #include +#include + #include #include #include @@ -49,13 +51,27 @@ static constexpr size_t HPageSz = (1 << 21); using ::NMonitoring::TDynamicCounters; +struct TMemRegCompare { + using is_transparent = void; + bool operator ()(const TIntrusivePtr& a, const TIntrusivePtr& b) const noexcept { + return a->Chunk.Get() < b->Chunk.Get(); + } + bool operator ()(const NInterconnect::NRdma::TChunkPtr& a, const TIntrusivePtr& b) const noexcept { + return a.Get() < b->Chunk.Get(); + } + bool operator ()(const TIntrusivePtr& a, const NInterconnect::NRdma::TChunkPtr& b) const noexcept { + return a->Chunk.Get() < b.Get(); + } +}; + namespace NInterconnect::NRdma { // Cross-platform memory management static void* allocateMemory(size_t size, size_t alignment, bool hp); static void freeMemory(void* ptr) noexcept; - class TChunk: public NNonCopyable::TMoveOnly, public TAtomicRefCount { + class TChunk: public NNonCopyable::TMoveOnly, public TAtomicRefCount, public TIntrusiveListItem { + friend class TMemPoolBase; public: TChunk(std::vector&& mrs, IMemPool* pool) noexcept @@ -65,7 +81,7 @@ namespace NInterconnect::NRdma { } ~TChunk() { - MemPool->DealocateMr(MRs); + MemPool->DealocateMr(this); } ibv_mr* GetMr(size_t deviceIndex) noexcept { @@ -87,9 +103,105 @@ namespace NInterconnect::NRdma { return MemPool->Alloc(size, flags); } + bool TryReclaim() noexcept { + return ReclaimSemaphore.TryReclaim(); + } + + void DoRelease() noexcept { + ReclaimSemaphore.Release(); + } + + bool TryAcquire(ui64 expGeneration) noexcept { + return ReclaimSemaphore.TryAcquire(expGeneration); + } + + ui64 GetGeneration() const noexcept { + return ReclaimSemaphore.GetGeneration(); + } + + size_t Size() const noexcept { + Y_ABORT_UNLESS(!MRs.empty()); + return MRs[0]->length; + } + private: std::vector MRs; IMemPool* MemPool; + class TChunkUseLock { + // Prevent reclaiming just after new chunk allocation + std::atomic Lock = ReclaimingBitMask; + static constexpr ui64 ReclaimingBitMask = 1ul << 63; + static constexpr size_t GenerationBits = 46u; + static constexpr size_t UseCountBits = 17u; + static constexpr ui64 UseCountMask = (1ul << UseCountBits) - 1u; + public: + void Release() noexcept { + //TODO: remove this verify, it is a bit expensive + Y_ABORT_UNLESS(static_cast(Lock.load() & UseCountMask) > 0); + //Just decrease use count, no need to be synchronized with other threads + Lock.fetch_sub(1, std::memory_order_relaxed); + } + + bool TryAcquire(ui64 expectedGeneration) noexcept { + ui64 x = Lock.fetch_add(1, std::memory_order_acq_rel); + Y_ABORT_UNLESS((x & UseCountMask) < UseCountMask); + if (Y_LIKELY(((x & (~ReclaimingBitMask)) >> UseCountBits) == expectedGeneration)) { + // Check if we need to unset the reclaiming bit + if (Y_UNLIKELY(x & ReclaimingBitMask)) { + // In most cases (x & UseCountMask) == 0 here but there is a chance that a parallel TryAcquire + // will increase UseCount for a moment (the time between Lock.fetch_add and Lock.fetch_sub), but + // there is only one thread which calls TryAcquire with the aim to unset the Reclaiming bit. + // All other threads have not seen the updated generation yet, so they can't be in this part of code.
+ // That all means we can set UseCount to 1 and unset the Reclaiming bit + const ui64 newX = (x & (~(ReclaimingBitMask | UseCountMask))) | 1ul; //generation without the reclaiming bit and with use count 1 + x = (x & (~UseCountMask)) | 1ul; // value is already incremented in memory - expect a single use count + while (!Lock.compare_exchange_strong(x, newX, + std::memory_order_release, std::memory_order_acquire)) { + // The only way to fail the compare exchange here is a parallel TryAcquire - see comment above + // So verify everything is as expected and try again + Y_ABORT_UNLESS(x & ReclaimingBitMask); //still reclaiming + Y_ABORT_UNLESS(((x & (~ReclaimingBitMask)) >> UseCountBits) == expectedGeneration); //still expected generation + Y_ABORT_UNLESS((x & UseCountMask) > 1ul); //it is the reason for the CAS failure + + x = (x & (~UseCountMask)) | 1ul; // This is what we expect in memory + } + } + return true; + } else { + Lock.fetch_sub(1, std::memory_order_release); + return false; + } + } + + bool TryReclaim() noexcept { + ui64 oldX = Lock.load(std::memory_order_relaxed); + ui64 newX; + do { + // Check that the chunk is not currently being reclaimed + if (oldX & ReclaimingBitMask) { + return false; + } + // Check use count + if ((oldX & UseCountMask) != 0) { + return false; + } + // Bump generation. We have 46 bits for generation. + // 1 << 46 is 70368744177664, which means even if we have 1000000 reclamations per second + // we need ((1 << 46) / 1000000 ) / 3600 / 24 / 365 = 2.23 years of continuous reclamations + // to roll over this counter. + // Also set the Reclaiming bit to prevent a possible multiple reclaim of the same chunk (in the time + // between Reclaim and first Acquire) + newX = (((oldX >> UseCountBits) + 1ul) << UseCountBits) | ReclaimingBitMask; + } while (!Lock.compare_exchange_weak(oldX, newX, + std::memory_order_release, std::memory_order_relaxed)); + + return true; + } + + ui64 GetGeneration() const noexcept { + return (Lock.load(std::memory_order_relaxed) & ~ReclaimingBitMask) >> UseCountBits; + } + } ReclaimSemaphore; }; TMemRegion::TMemRegion(TChunkPtr chunk, uint32_t offset, uint32_t size) noexcept @@ -103,7 +215,8 @@ namespace NInterconnect::NRdma { } TMemRegion::~TMemRegion() { - Chunk->Free(std::move(*this)); + if (Chunk) + Chunk->Free(std::move(*this)); } void* TMemRegion::GetAddr() const { @@ -286,19 +399,19 @@ namespace NInterconnect::NRdma { } TMemRegionPtr IMemPool::Alloc(int size, ui32 flags) noexcept { - TMemRegion* region = AllocImpl(size, flags); + TMemRegionPtr region = AllocImpl(size, flags); if (!region) { return nullptr; } - return TMemRegionPtr(region); + return region; } std::optional IMemPool::AllocRcBuf(int size, ui32 flags) noexcept { - TMemRegion* region = AllocImpl(size, flags); + TIntrusivePtr region = AllocImpl(size, flags); if (!region) { return {}; } - return TRcBuf(IContiguousChunk::TPtr(region)); + return TRcBuf(region); } static NMonitoring::TDynamicCounterPtr MakeCounters(TDynamicCounters* counters) { @@ -324,20 +437,32 @@ namespace NInterconnect::NRdma { AllocatedCounter = counter->GetCounter("RdmaPoolAllocatedUserBytes", false); AllocatedChunksCounter = counter->GetCounter("RdmaPoolAllocatedChunks", false); MaxAllocated.Counter = counter->GetCounter("RdmaPoolAllocatedUserMaxBytes", false); + ReclaimationRunCounter = counter->GetCounter("RdmaPoolReclaimationRunCounter", true); + ReclaimationFailCounter = counter->GetCounter("RdmaPoolReclaimationFailCounter", true); Y_ABORT_UNLESS((Alignment & Alignment - 1) == 0, "Alignment must be a power of two %zu", Alignment); } + + size_t
GetAllocatedChunksCount() const noexcept { + return AllocatedChunks.load(std::memory_order_relaxed); + } + + size_t GetMaxChunks() const noexcept { + return MaxChunk; + } + protected: TChunkPtr AllocNewChunk(size_t size, bool hp) noexcept { const std::lock_guard lock(Mutex); Y_ABORT_UNLESS(AllocatedChunks <= MaxChunk); - if (AllocatedChunks == MaxChunk) { - return nullptr; - } - size = AlignUp(size, Alignment); + if (Y_UNLIKELY(AllocatedChunks == MaxChunk)) { + //return nullptr; + return TryReclaim(size); + } + void* ptr = allocateMemory(size, Alignment, hp); if (!ptr) { return nullptr; @@ -352,12 +477,16 @@ namespace NInterconnect::NRdma { AllocatedChunksCounter->Inc(); AllocatedChunks++; - return MakeIntrusive(std::move(mrs), this); + auto chunk = MakeIntrusive(std::move(mrs), this); + Chunks.PushBack(chunk.Get()); + return chunk; } - void DealocateMr(std::vector& mrs) noexcept override { + void DealocateMr(TChunk* chunk) noexcept override { + std::vector& mrs = chunk->MRs; { const std::lock_guard lock(Mutex); + chunk->Unlink(); AllocatedChunks--; AllocatedChunksCounter->Dec(); } @@ -388,20 +517,36 @@ namespace NInterconnect::NRdma { } } } + private: + TChunkPtr TryReclaim(size_t size) noexcept { + ReclaimationRunCounter->Inc(); + for (auto it = Chunks.Begin(); it!= Chunks.End(); it++) { + if (it->Size() == size && it->TryReclaim()) { + return &(*it); + } + } + ReclaimationFailCounter->Inc(); + return nullptr; + } + + protected: const NInterconnect::NRdma::NLinkMgr::TCtxsMap Ctxs; const size_t MaxChunk; const size_t Alignment; ::NMonitoring::TDynamicCounters::TCounterPtr AllocatedCounter; ::NMonitoring::TDynamicCounters::TCounterPtr AllocatedChunksCounter; + ::NMonitoring::TDynamicCounters::TCounterPtr ReclaimationRunCounter; + ::NMonitoring::TDynamicCounters::TCounterPtr ReclaimationFailCounter; struct { NMonotonic::TMonotonic Time; std::atomic Val = 0; ::NMonitoring::TDynamicCounters::TCounterPtr Counter; } MaxAllocated; - size_t AllocatedChunks = 0; + std::atomic AllocatedChunks = 0; private: std::mutex Mutex; + TIntrusiveList Chunks; void Tick(NMonotonic::TMonotonic time) noexcept override { constexpr TDuration holdTime = TDuration::Seconds(15); @@ -420,7 +565,7 @@ namespace NInterconnect::NRdma { : TMemPoolBase(-1, MakeCounters(nullptr)) {} - TMemRegion* AllocImpl(int size, ui32) noexcept override { + TMemRegionPtr AllocImpl(int size, ui32) noexcept override { auto chunk = AllocNewChunk(size, false); if (!chunk) { return nullptr; @@ -450,35 +595,73 @@ namespace NInterconnect::NRdma { class TSlotMemPool: public TMemPoolBase { struct TChain { + using TMemRegcontainer = std::multiset, TMemRegCompare>; TChain() = default; void Init(ui32 slotSize) { SlotSize = slotSize; SlotsInBatch = GetSlotsInBatch(slotSize); } - TMemRegion* TryGetSlot() noexcept { + TMemRegionPtr TryGetSlot() noexcept { if (Slots.empty()) { return nullptr; } - // TMemRegion* slot = Slots.front(); - TMemRegion* slot = Slots.front().release(); - Slots.pop_front(); - return slot; + auto it = Slots.begin(); + TIntrusivePtr slot = *it; + const TChunk* const curChunkPtr = slot->Chunk.Get(); + const ui64 curGeneration = slot->Generation; + if (Y_LIKELY(slot->Chunk->TryAcquire(curGeneration))) { + Slots.erase(it); + return slot; + } else { + // Search for slots with same chunk and same (important!) 
generation to remove them from the local cache + // The generation check is mandatory here because the cache can contain slots with the same parent chunk but with a different generation + // we need to delete only the slots with the generation we checked in TryAcquire + while ((it != Slots.end()) && ((*it)->Chunk.Get() == curChunkPtr) && ((*it)->Generation == curGeneration)) { + (*it)->Chunk.Reset(); + Slots.erase(it++); + } + return TryGetSlot(); + } } - void PutSlot(std::unique_ptr&& slot) noexcept { - Slots.push_back(std::move(slot)); + void PutSlot(TIntrusivePtr slot) noexcept { + Slots.insert(slot); } - void PutSlotsBatch(std::list>&& slots) noexcept { - Slots.splice(Slots.end(), slots); + TMemRegcontainer::iterator PutSlot(TIntrusivePtr slot, TMemRegcontainer::iterator hint) noexcept { + return Slots.insert(hint, slot); } - std::list> GetSlotsBatch(ui32 batchSize) { + TMemRegcontainer::iterator FindHint(const TChunkPtr& p) noexcept { + return Slots.find(p); + } + void PutSlotsBatch(std::list>&& slots) noexcept { + for (auto& x : slots) { + PutSlot(x); + } + } + std::list> GetSlotsBatch(ui32 batchSize) { Y_ABORT_UNLESS(Slots.size() >= batchSize, "Not enough slots in chain"); - std::list> res; + std::list> res; auto it = Slots.begin(); - std::advance(it, batchSize); - res.splice(res.end(), Slots, Slots.begin(), it); + for (size_t i = 0; i < batchSize; i++) { + res.insert(res.end(), *it); + it = Slots.erase(it); + } return res; } - std::list> Slots; + //TODO: It should be possible to improve this using a more optimised container and maybe a raw pointer to store TMemRegion + // Requirements: + // * We need to group all regions with same Chunk + // * We need to be able to delete only slots with given chunk + generation + // * We need some way to prevent calling "Free" during deletion + // + // multiset here is a somewhat naive approach but it works + // unordered_multiset works much worse due to div op inside hash calculation.
+ // + // Some ideas to think about: + // - The size of a chunk is relatively large (32Mb+) - so the 25 low bits of the address are the same + // - The user address space is less than 64 bits + // - Probably we can reserve addresses on startup to get a linear space + TMemRegcontainer Slots; + ui32 SlotSize; ui32 SlotsInBatch; }; @@ -489,24 +672,34 @@ namespace NInterconnect::NRdma { SlotSize = slotSize; SlotsInBatch = GetSlotsInBatch(slotSize); } - std::optional>> GetSlotsBatch() { - std::list> res; + std::optional>> GetSlotsBatch(bool allowIncompleted) { + std::list> res; if (FullBatchesSlots.Dequeue(&res)) { return res; } + if (allowIncompleted && IncompleteBatchMutex.try_lock()) { + if (IncompleteBatch.size()) { + res = std::move(IncompleteBatch); + IncompleteBatch.clear(); + } + IncompleteBatchMutex.unlock(); + if (res.size()) { + return res; + } + } return std::nullopt; } - void PutSlotsBatches(std::list>&& slots) { + void PutSlotsBatches(std::list>&& slots) { Y_DEBUG_ABORT_UNLESS(slots.size() == SlotsInBatch, "Invalid slots size: %zu, expected: %u", slots.size(), SlotsInBatch); FullBatchesSlots.Enqueue(std::move(slots)); } - void PutSlotsBatches(std::list>>&& slots) { + void PutSlotsBatches(std::list>>&& slots) { for (auto& batch : slots) { Y_DEBUG_ABORT_UNLESS(batch.size() == SlotsInBatch, "Invalid slots size: %zu, expected: %u", batch.size(), SlotsInBatch); FullBatchesSlots.Enqueue(std::move(batch)); } } - void PutSlot(std::unique_ptr&& slot) noexcept { + void PutSlot(TIntrusivePtr slot) noexcept { std::lock_guard lock(IncompleteBatchMutex); IncompleteBatch.push_back(std::move(slot)); if (IncompleteBatch.size() >= SlotsInBatch) { @@ -515,16 +708,16 @@ namespace NInterconnect::NRdma { } } - TLockFreeStack>> FullBatchesSlots; + TLockFreeStack>> FullBatchesSlots; ui32 SlotSize; ui32 SlotsInBatch; std::mutex IncompleteBatchMutex; - std::list> IncompleteBatch; + std::list> IncompleteBatch; }; static constexpr ui32 MinAllocSz = 512; - static constexpr ui32 MaxAllocSz = 8 * 1024 * 1024; + static constexpr ui32 MaxAllocSz = 32 * 1024 * 1024; static_assert((MinAllocSz & (MinAllocSz - 1)) == 0, "MinAllocSz must be a power of 2"); static_assert((MaxAllocSz & (MaxAllocSz - 1)) == 0, "MaxAllocSz must be a power of 2"); static constexpr ui32 ChainsNum = GetNumChains(MinAllocSz, MaxAllocSz); @@ -547,7 +740,25 @@ namespace NInterconnect::NRdma { ~TSlotMemPoolCache() { Stopped = true; } - TMemRegion* AllocImpl(int size, ui32 flags, TSlotMemPool& pool) noexcept { + + bool AllocAndSplitNewChunk(TChain& chain, TSlotMemPool& pool) { + TChunkPtr chunk = pool.AllocNewChunk(BatchSizeBytes, true); + if (!chunk) { + return false; + } + TChain::TMemRegcontainer::iterator it = chain.FindHint(chunk); + const ui64 generation = chunk->GetGeneration(); + for (ui32 i = 0; i < chain.SlotsInBatch; ++i) { + auto x = MakeIntrusive(chunk, i * chain.SlotSize, chain.SlotSize); + //TODO: Use Y_DEBUG_ABORT_UNLESS instead + Y_ABORT_UNLESS(generation == chunk->GetGeneration()); + x->Generation = generation; + it = chain.PutSlot(x, it); + } + return true; + } + + TMemRegionPtr AllocImpl(int size, ui32 flags, TSlotMemPool& pool) noexcept { if (flags & IMemPool::PAGE_ALIGNED && static_cast(size) < pool.Alignment) { size = pool.Alignment; } @@ -559,39 +770,40 @@ namespace NInterconnect::NRdma { Y_ABORT_UNLESS(chainIndex < ChainsNum, "Invalid chain index: %u", chainIndex); // Try to get slot from local cache auto& localChain = Chains[chainIndex]; - TMemRegion* slot = localChain.TryGetSlot(); + TMemRegionPtr slot = localChain.TryGetSlot(); if (slot) {
return slot; } + + const bool allChunksAllocated = pool.GetAllocatedChunksCount() == pool.GetMaxChunks(); + // If no slot in local cache, try to get from global pool - auto batch = pool.Chains[chainIndex].GetSlotsBatch(); + auto batch = pool.Chains[chainIndex].GetSlotsBatch(allChunksAllocated); if (batch) { localChain.PutSlotsBatch(std::move(*batch)); return localChain.TryGetSlot(); } // If no slots in global pool, allocate new chunk - auto chunk = pool.AllocNewChunk(BatchSizeBytes, true); - if (!chunk) { + if (!AllocAndSplitNewChunk(localChain, pool)) { return nullptr; } - for (size_t i = 0; i < localChain.SlotsInBatch; ++i) { - auto region = std::make_unique(chunk, i * localChain.SlotSize, localChain.SlotSize); - localChain.PutSlot(std::move(region)); - } + return localChain.TryGetSlot(); } void Free(TMemRegion&& mr, TSlotMemPool& pool) noexcept { ui32 chainIndex = GetChainIndex(mr.GetSize()); - if (Stopped) { + if (Y_UNLIKELY(Stopped)) { // current thread is stopped, return mr to global pool - pool.Chains[chainIndex].PutSlot(std::make_unique(std::move(mr))); + pool.Chains[chainIndex].PutSlot(MakeIntrusive(std::move(mr))); return; } + mr.Chunk->DoRelease(); + auto& chain = Chains[chainIndex]; Y_ABORT_UNLESS(chainIndex < ChainsNum, "Invalid chain index: %u", chainIndex); - chain.PutSlot(std::make_unique(std::move(mr))); + chain.PutSlot(MakeIntrusive(std::move(mr))); if (chain.Slots.size() >= 2 * chain.SlotsInBatch) { // TODO: replace constant 2 // If we have too much slots in local cache, put them back to global pool @@ -625,6 +837,23 @@ namespace NInterconnect::NRdma { } } + ~TSlotMemPool() { + for (size_t i = 0; i < Chains.size(); i++) { + TLockFreeChain& chain = Chains[i]; + for (auto& x : chain.IncompleteBatch) { + x->Chunk.Reset(); + } + + std::list> tmp; + while (chain.FullBatchesSlots.Dequeue(&tmp)) { + for (auto& x : tmp) { + x->Chunk.Reset(); + } + tmp.clear(); + } + } + } + int GetMaxAllocSz() const noexcept override { return MaxAllocSz; } @@ -634,7 +863,7 @@ namespace NInterconnect::NRdma { } protected: - TMemRegion* AllocImpl(int size, ui32 flags) noexcept override { + TMemRegionPtr AllocImpl(int size, ui32 flags) noexcept override { if (auto memReg = LocalCache.AllocImpl(size, flags, *this)) { memReg->Resize(size); i64 newVal = AllocatedCounter->Add(size); diff --git a/ydb/library/actors/interconnect/rdma/mem_pool.h b/ydb/library/actors/interconnect/rdma/mem_pool.h index 798fcc882022..badf8d435c0e 100644 --- a/ydb/library/actors/interconnect/rdma/mem_pool.h +++ b/ydb/library/actors/interconnect/rdma/mem_pool.h @@ -14,16 +14,20 @@ namespace NMonitoring { struct TDynamicCounters; } +struct TMemRegCompare; namespace NInterconnect::NRdma { class IMemPool; class TMemRegion; class TChunk; class TMemPoolImpl; + class TSlotMemPool; using TChunkPtr = TIntrusivePtr; class TMemRegion: public NNonCopyable::TMoveOnly, public IContiguousChunk { + friend struct ::TMemRegCompare; + friend class TSlotMemPool; public: TMemRegion(TChunkPtr chunk, uint32_t offset, uint32_t size) noexcept; TMemRegion(TMemRegion&& other) noexcept = default; @@ -35,8 +39,6 @@ namespace NInterconnect::NRdma { uint32_t GetLKey(size_t deviceIndex) const; uint32_t GetRKey(size_t deviceIndex) const; - void Resize(uint32_t newSize) noexcept; - public: // IContiguousChunk TContiguousSpan GetData() const override; TMutableContiguousSpan UnsafeGetDataMut() override; @@ -44,7 +46,9 @@ namespace NInterconnect::NRdma { EInnerType GetInnerType() const noexcept override; IContiguousChunk::TPtr Clone() noexcept override; 
protected: + void Resize(uint32_t newSize) noexcept; TChunkPtr Chunk; + ui64 Generation = 0; const uint32_t Offset; uint32_t Size; const uint32_t OrigSize; @@ -97,9 +101,9 @@ namespace NInterconnect::NRdma { virtual TString GetName() const noexcept = 0; protected: - virtual TMemRegion* AllocImpl(int size, ui32 flags) noexcept = 0; + virtual TMemRegionPtr AllocImpl(int size, ui32 flags) noexcept = 0; virtual void Free(TMemRegion&& mr, TChunk& chunk) noexcept = 0; - virtual void DealocateMr(std::vector& mrs) noexcept = 0; + virtual void DealocateMr(TChunk*) noexcept = 0; private: virtual void Tick(NMonotonic::TMonotonic time) noexcept = 0; }; diff --git a/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit/allocator_ut.cpp b/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit/allocator_ut.cpp deleted file mode 100644 index 21d8a6475229..000000000000 --- a/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit/allocator_ut.cpp +++ /dev/null @@ -1,57 +0,0 @@ -#include -#include - -#include -#include - -namespace NMonitoring { - struct TDynamicCounters; -} - -static void GTestSkip() { - GTEST_SKIP() << "Skipping all rdma tests for suit, set \"" - << NRdmaTest::RdmaTestEnvSwitchName << "\" env if it is RDMA compatible"; -} - -class TAllocatorSuite : public ::testing::Test { -protected: - void SetUp() override { - using namespace NRdmaTest; - if (IsRdmaTestDisabled()) { - GTestSkip(); - } - } -}; - -TEST_F(TAllocatorSuite, SlotPoolLimit) { - const NInterconnect::NRdma::TMemPoolSettings settings { - .SizeLimitMb = 32 - }; - static auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); - - const size_t sz = 4 << 20; - std::vector regions; - regions.reserve(8); - size_t i = 0; - for (;;i++) { - auto reg = pool->Alloc(sz, 0); - if (!reg) { - UNIT_ASSERT(i == 8); // 32 / 4 - break; - } - ASSERT_TRUE(reg->GetAddr()) << "invalid address"; - ASSERT_TRUE(reg->GetSize() == sz) << "invalid size of allocated chunk"; - regions.push_back(reg); - } - - regions.erase(regions.begin()); // free one region - - { - auto reg = pool->Alloc(sz, 0); // allocate one - ASSERT_TRUE(reg->GetAddr()) << "invalid address"; - ASSERT_TRUE(reg->GetSize() == sz) << "invalid size of allocated chunk"; - UNIT_ASSERT(!pool->Alloc(sz, 0)); // pool is full - } - - regions.clear(); -} diff --git a/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_128/allocator_ut.cpp b/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_128/allocator_ut.cpp new file mode 100644 index 000000000000..1187c7c882e4 --- /dev/null +++ b/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_128/allocator_ut.cpp @@ -0,0 +1,526 @@ +#include +#include + +#include +#include + +#include + +#include +#include + +#include + +namespace NMonitoring { + struct TDynamicCounters; +} + +static void GTestSkip() { + GTEST_SKIP() << "Skipping all rdma tests for suit, set \"" + << NRdmaTest::RdmaTestEnvSwitchName << "\" env if it is RDMA compatible"; +} + +class TAllocatorSuite128 : public ::testing::Test { +protected: + void SetUp() override { + using namespace NRdmaTest; + if (IsRdmaTestDisabled()) { + GTestSkip(); + } + } +}; + +TEST_F(TAllocatorSuite128, SlotPoolLimit) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 128 + }; + static auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + const size_t sz = 4 << 20; + std::vector regions; + regions.reserve(32); + size_t i = 0; + for (;;i++) { + auto reg = pool->Alloc(sz, 0); + if (!reg) { + UNIT_ASSERT(i == 32); + break; + } + 
ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + ASSERT_TRUE(reg->GetSize() == sz) << "invalid size of allocated chunk"; + regions.push_back(reg); + } + + regions.erase(regions.begin()); // free one region + + { + auto reg = pool->Alloc(sz, 0); // allocate one + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + ASSERT_TRUE(reg->GetSize() == sz) << "invalid size of allocated chunk"; + UNIT_ASSERT(!pool->Alloc(sz, 0)); // pool is full + } + + regions.clear(); +} + +TEST_F(TAllocatorSuite128, SlotPoolHugeAlloc) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 128 + }; + + static auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + std::vector regions; + const size_t sz = 32 << 20; + for (size_t i = 0; i < 4; i++) { + auto reg = pool->Alloc(sz, 0); + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + regions.push_back(reg); + } + regions.clear(); +} + +TEST_F(TAllocatorSuite128, SlotPoolTinyAlloc) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 128 + }; + + static auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + std::vector regions; + const size_t sz = 512; + regions.reserve(262144); + for (size_t i = 0;; i++) { + auto reg = pool->Alloc(sz, 0); + if (!reg) { + UNIT_ASSERT(i == 262144); + break; + } + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + regions.push_back(reg); + } + regions.clear(); + + { + auto reg = pool->Alloc(sz, 0); // allocate one + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + ASSERT_TRUE(reg->GetSize() == sz) << "invalid size of allocated chunk"; + } +} + +TEST_F(TAllocatorSuite128, SlotPoolHugeAfterTinyAlloc) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 128 + }; + + static auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + std::vector regions; + const size_t sz = 512; + + regions.reserve(262144); + + for (size_t i = 0;; i++) { + auto reg = pool->Alloc(sz, 0); + if (!reg) { + UNIT_ASSERT(i == 262144); + break; + } + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + regions.push_back(reg); + } + + regions.clear(); + + { + for (size_t i = 0; i < 4; i++) { + auto reg = pool->Alloc(sz, 0); + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + regions.push_back(reg); + } + regions.clear(); + } +} + + +TEST_F(TAllocatorSuite128, AllocationFourThreads) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 128 + }; + + auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + auto allocFn = [&](size_t sz, size_t num, float& s, bool holdAllocations) { + size_t j = 0; + + auto now = TInstant::Now(); + std::vector alls; + alls.reserve(num); + while (j < num) { + auto memRegion = pool->Alloc(sz, 0); + if (!memRegion) { + continue; + } + ASSERT_TRUE(memRegion) << "allocation failed"; + ASSERT_TRUE(memRegion->GetAddr()) << "invalid address"; + if (holdAllocations) { + alls.push_back(memRegion); + } + j++; + } + alls.clear(); + + s = (TInstant::Now() - now).MicroSeconds(); + }; + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + float s3 = 0.0; + size_t numAlloc0 = 100000; + size_t numAlloc1 = 100000; + size_t numAlloc2 = 100000; + size_t numAlloc3 = 100000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), false); + std::thread thread1(allocFn, 512, numAlloc1, std::ref(s1), false); + std::thread thread2(allocFn, 512, numAlloc2, std::ref(s2), false); + std::thread thread3(allocFn, 512, 
numAlloc3, std::ref(s3), false); + + thread0.join(); + thread1.join(); + thread2.join(); + thread3.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + s2 = s2 / float(numAlloc2); + s3 = s3 / float(numAlloc3); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us, t3: " << s3 <<" us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + float s3 = 0.0; + size_t numAlloc0 = 10000; + size_t numAlloc1 = 10000; + size_t numAlloc2 = 10000; + size_t numAlloc3 = 10000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), false); + std::thread thread1(allocFn, 4096, numAlloc1, std::ref(s1), false); + std::thread thread2(allocFn, 32768, numAlloc2, std::ref(s2), false); + std::thread thread3(allocFn, 65536, numAlloc3, std::ref(s3), false); + + thread0.join(); + thread1.join(); + thread2.join(); + thread3.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + s2 = s2 / float(numAlloc2); + s3 = s3 / float(numAlloc3); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us, t3: " << s3 <<" us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + float s3 = 0.0; + size_t numAlloc0 = 10000; + size_t numAlloc1 = 10000; + size_t numAlloc2 = 10000; + size_t numAlloc3 = 10000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), true); + std::thread thread1(allocFn, 512, numAlloc1, std::ref(s1), true); + std::thread thread2(allocFn, 512, numAlloc2, std::ref(s2), true); + std::thread thread3(allocFn, 512, numAlloc2, std::ref(s3), true); + + thread0.join(); + thread1.join(); + thread2.join(); + thread3.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + s2 = s2 / float(numAlloc2); + s3 = s3 / float(numAlloc3); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us, t3: " << s3 <<" us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + float s3 = 0.0; + size_t numAlloc0 = 65536; + size_t numAlloc1 = 32768; + size_t numAlloc2 = 8192; + size_t numAlloc3 = 1024; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), true); + std::thread thread1(allocFn, 1024, numAlloc1, std::ref(s1), true); + std::thread thread2(allocFn, 4096, numAlloc2, std::ref(s2), true); + std::thread thread3(allocFn, 32768, numAlloc3, std::ref(s3), true); + + thread0.join(); + thread1.join(); + thread2.join(); + thread3.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + s2 = s2 / float(numAlloc2); + s3 = s3 / float(numAlloc3); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us, t3: " << s3 <<" us" << Endl; + } +} + +TEST_F(TAllocatorSuite128, AllocationWithReclaimSixThreads) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 128 + }; + + auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + auto allocFn = [&](size_t sz, size_t num, float& s, bool holdAllocations) { + size_t j = 0; + + auto now = TInstant::Now(); + std::vector alls; + alls.reserve(num); + while (j < num) { + auto memRegion = pool->Alloc(sz, 0); + if (!memRegion) { + continue; + } + ASSERT_TRUE(memRegion) << "allocation failed"; + ASSERT_TRUE(memRegion->GetAddr()) << "invalid address"; + if (holdAllocations) { 
+ alls.push_back(memRegion); + } + j++; + } + alls.clear(); + + s = (TInstant::Now() - now).MicroSeconds(); + }; + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + float s3 = 0.0; + float s4 = 0.0; + float s5 = 0.0; + size_t numAlloc0 = 100000; + size_t numAlloc1 = 100000; + size_t numAlloc2 = 100000; + size_t numAlloc3 = 100000; + size_t numAlloc4 = 100000; + size_t numAlloc5 = 100000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), false); + std::thread thread1(allocFn, 512, numAlloc1, std::ref(s1), false); + std::thread thread2(allocFn, 512, numAlloc2, std::ref(s2), false); + std::thread thread3(allocFn, 512, numAlloc3, std::ref(s3), false); + std::thread thread4(allocFn, 512, numAlloc4, std::ref(s4), false); + std::thread thread5(allocFn, 512, numAlloc5, std::ref(s5), false); + + thread0.join(); + thread1.join(); + thread2.join(); + thread3.join(); + thread4.join(); + thread5.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + s2 = s2 / float(numAlloc2); + s3 = s3 / float(numAlloc3); + s4 = s4 / float(numAlloc4); + s5 = s5 / float(numAlloc5); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us, t3: " << s3 <<" us, t4: " << s4 << " us, t5: " << s5 << " us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + float s3 = 0.0; + float s4 = 0.0; + float s5 = 0.0; + size_t numAlloc0 = 100000; + size_t numAlloc1 = 100000; + size_t numAlloc2 = 100000; + size_t numAlloc3 = 100000; + size_t numAlloc4 = 100000; + size_t numAlloc5 = 100000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), false); + std::thread thread1(allocFn, 4096, numAlloc1, std::ref(s1), false); + std::thread thread2(allocFn, 32768, numAlloc2, std::ref(s2), false); + std::thread thread3(allocFn, 65536, numAlloc3, std::ref(s3), false); + std::thread thread4(allocFn, 131072, numAlloc4, std::ref(s4), false); + std::thread thread5(allocFn, 262144, numAlloc5, std::ref(s5), false); + + thread0.join(); + thread1.join(); + thread2.join(); + thread3.join(); + thread4.join(); + thread5.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + s2 = s2 / float(numAlloc2); + s3 = s3 / float(numAlloc3); + s4 = s4 / float(numAlloc4); + s5 = s5 / float(numAlloc5); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us, t3: " << s3 <<" us, t4: " << s4 << " us, t5: " << s5 << " us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + float s3 = 0.0; + float s4 = 0.0; + float s5 = 0.0; + size_t numAlloc0 = 43690; // total chunks we have 4 * 65536, so per thread: 4 * 65536 / 6 + size_t numAlloc1 = 43690; + size_t numAlloc2 = 43690; + size_t numAlloc3 = 43690; + size_t numAlloc4 = 43690; + size_t numAlloc5 = 43690; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), true); + std::thread thread1(allocFn, 512, numAlloc1, std::ref(s1), true); + std::thread thread2(allocFn, 512, numAlloc2, std::ref(s2), true); + std::thread thread3(allocFn, 512, numAlloc3, std::ref(s3), true); + std::thread thread4(allocFn, 512, numAlloc4, std::ref(s4), true); + std::thread thread5(allocFn, 512, numAlloc5, std::ref(s5), true); + + thread0.join(); + thread1.join(); + thread2.join(); + thread3.join(); + thread4.join(); + thread5.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / 
float(numAlloc1); + s2 = s2 / float(numAlloc2); + s3 = s3 / float(numAlloc3); + s4 = s4 / float(numAlloc4); + s5 = s5 / float(numAlloc5); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us, t3: " << s3 <<" us, t4: " << s4 << " us, t5: " << s5 << " us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + float s3 = 0.0; + float s4 = 0.0; + float s5 = 0.0; + size_t numAlloc0 = 65536; + size_t numAlloc1 = 32768; + size_t numAlloc2 = 8192; + size_t numAlloc3 = 4096; + size_t numAlloc4 = 1024; + size_t numAlloc5 = 512; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), true); + std::thread thread1(allocFn, 1024, numAlloc1, std::ref(s1), true); + std::thread thread2(allocFn, 4096, numAlloc2, std::ref(s2), true); + std::thread thread3(allocFn, 8192, numAlloc3, std::ref(s3), true); + std::thread thread4(allocFn, 32768, numAlloc4, std::ref(s4), true); + std::thread thread5(allocFn, 65536, numAlloc5, std::ref(s5), true); + + thread0.join(); + thread1.join(); + thread2.join(); + thread3.join(); + thread4.join(); + thread5.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + s2 = s2 / float(numAlloc2); + s3 = s3 / float(numAlloc3); + s4 = s4 / float(numAlloc4); + s5 = s5 / float(numAlloc5); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us, t3: " << s3 <<" us, t4: " << s4 << " us, t5: " << s5 << " us" << Endl; + } +} + +TEST_F(TAllocatorSuite128, AllocationMultipleThreads) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 128 + }; + + auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + std::atomic threadsNum = 100; + + auto allocFn = [&](size_t maxSz, size_t num, NThreading::TFuture barrier) { + size_t j = 0; + + std::vector alls; + alls.reserve(num); + TReallyFastRng32 rng(RandomNumber()); + while (j < num) { + auto memRegion = pool->Alloc((rng() % maxSz) | 1u, 0); + if (!memRegion) { + continue; + } + ASSERT_TRUE(memRegion) << "allocation failed"; + ASSERT_TRUE(memRegion->GetAddr()) << "invalid address"; + alls.push_back(memRegion); + j++; + } + alls.clear(); + + threadsNum.fetch_sub(1); + + barrier.GetValueSync(); + }; + + NThreading::TPromise promise = NThreading::NewPromise(); + + std::vector> threads; + threads.reserve(threadsNum.load()); + + + // Launch threads + for (size_t i = threadsNum.load(); i > 0; i--) { + threads.emplace_back(std::make_unique(allocFn, 32768, 35, promise.GetFuture())); + } + + // whait the work to be done + while (threadsNum.load()) { + Sleep(TDuration::MilliSeconds(100)); + } + + // Signal to finish therads + promise.SetValue(); + + // Join + for (const auto& t : threads) { + t->join(); + } +} diff --git a/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_128/ya.make b/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_128/ya.make new file mode 100644 index 000000000000..03e671f0abca --- /dev/null +++ b/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_128/ya.make @@ -0,0 +1,25 @@ +GTEST() +#TIMEOUT(3600) +IF (OS_LINUX AND SANITIZER_TYPE != "memory") + +IF (SANITIZER_TYPE == "thread") + SIZE(LARGE) + TIMEOUT(3600) + TAG(ya:fat) +ELSE() + SIZE(MEDIUM) +ENDIF() + +SRCS( + allocator_ut.cpp +) + +PEERDIR( + library/cpp/threading/future + ydb/library/actors/interconnect/rdma + ydb/library/actors/interconnect/rdma/ut/utils +) + +ENDIF() + +END() diff --git 
a/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_32/allocator_ut.cpp b/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_32/allocator_ut.cpp new file mode 100644 index 000000000000..3d7e3da09400 --- /dev/null +++ b/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_32/allocator_ut.cpp @@ -0,0 +1,427 @@ +#include +#include + +#include +#include + +#include +#include + +#include + +namespace NMonitoring { + struct TDynamicCounters; +} + +static void GTestSkip() { + GTEST_SKIP() << "Skipping all rdma tests for suit, set \"" + << NRdmaTest::RdmaTestEnvSwitchName << "\" env if it is RDMA compatible"; +} + +class TAllocatorSuite32 : public ::testing::Test { +protected: + void SetUp() override { + using namespace NRdmaTest; + if (IsRdmaTestDisabled()) { + GTestSkip(); + } + } +}; + +TEST_F(TAllocatorSuite32, SlotPoolLimit) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 32 + }; + static auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + const size_t sz = 4 << 20; + std::vector regions; + regions.reserve(8); + size_t i = 0; + for (;;i++) { + auto reg = pool->Alloc(sz, 0); + if (!reg) { + UNIT_ASSERT(i == 8); // 32 / 4 + break; + } + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + ASSERT_TRUE(reg->GetSize() == sz) << "invalid size of allocated chunk"; + regions.push_back(reg); + } + + regions.erase(regions.begin()); // free one region + + { + auto reg = pool->Alloc(sz, 0); // allocate one + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + ASSERT_TRUE(reg->GetSize() == sz) << "invalid size of allocated chunk"; + UNIT_ASSERT(!pool->Alloc(sz, 0)); // pool is full + } + + regions.clear(); +} + +TEST_F(TAllocatorSuite32, SlotPoolHugeAlloc) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 32 + }; + + static auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + std::vector regions; + const size_t sz = 8 << 20; + for (size_t i = 0; i < 4; i++) { + auto reg = pool->Alloc(sz, 0); + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + regions.push_back(reg); + } + regions.clear(); +} + +TEST_F(TAllocatorSuite32, SlotPoolHugeAllocAfterSmall) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 32 + }; + + const size_t smallSz = 1 << 20; + const size_t hugeSz = 4 << 20; + + static auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + std::vector regions; + regions.reserve(32); + for (size_t i = 0; i < 32;i++) { + auto reg = pool->Alloc(smallSz, 0); + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + regions.push_back(reg); + } + regions.clear(); + + auto reg = pool->Alloc(hugeSz, 0); + ASSERT_TRUE(reg) << "allocation failed"; + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; +} + +TEST_F(TAllocatorSuite32, SlotPoolHugeAllocOtherThreadAfterSmall) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 32 + }; + + const size_t smallSz = 1 << 20; + const size_t hugeSz = 4 << 20; + + static auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + std::vector regions; + regions.reserve(32); + for (size_t i = 0; i < 32;i++) { + auto reg = pool->Alloc(smallSz, 0); + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + regions.push_back(reg); + } + regions.clear(); + + auto fn = [&]() { + auto reg = pool->Alloc(hugeSz, 0); + ASSERT_TRUE(reg) << "allocation failed"; + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + }; + + std::thread thread(fn); + thread.join(); + + // And try to alloc small again + for 
(size_t i = 0; i < 32;i++) { + auto reg = pool->Alloc(smallSz, 0); + ASSERT_TRUE(reg->GetAddr()) << "invalid address"; + regions.push_back(reg); + } + regions.clear(); +} + +TEST_F(TAllocatorSuite32, AllocationRandSizeWithReclaimOneThread) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 32 + }; + + static auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + constexpr ui32 NUM_ALLOC = 40000; + constexpr ui32 MAX_REG_SZ = 2u << 20; + + TReallyFastRng32 rng(RandomNumber()); + + auto now = TInstant::Now(); + for (ui32 j = 0; j < NUM_ALLOC; ++j) { + auto memRegion = pool->Alloc(((rng() % MAX_REG_SZ) | 1u), 0); + ASSERT_TRUE(memRegion) << "allocation failed"; + ASSERT_TRUE(memRegion->GetAddr()) << "invalid address"; + } + + float s = (TInstant::Now() - now).MicroSeconds(); + + s = s / float(NUM_ALLOC); + Cerr << "Average time per allocation: " << s << " us" << Endl; +} + +TEST_F(TAllocatorSuite32, AllocationWithReclaimTwoThreads) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 32 + }; + + auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + auto allocFn = [&](size_t sz, size_t num, float& s, bool holdAllocations) { + size_t j = 0; + + auto now = TInstant::Now(); + std::vector alls; + alls.reserve(num); + while (j < num) { + auto memRegion = pool->Alloc(sz, 0); + if (!memRegion) { + continue; + } + ASSERT_TRUE(memRegion) << "allocation failed"; + ASSERT_TRUE(memRegion->GetAddr()) << "invalid address"; + if (holdAllocations) { + alls.push_back(memRegion); + } + j++; + } + alls.clear(); + + s = (TInstant::Now() - now).MicroSeconds(); + }; + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + size_t numAlloc0 = 1000; + size_t numAlloc1 = 1000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), false); + std::thread thread1(allocFn, 512, numAlloc1, std::ref(s1), false); + + thread0.join(); + thread1.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + size_t numAlloc0 = 100000; + size_t numAlloc1 = 100000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), false); + std::thread thread1(allocFn, 32768, numAlloc1, std::ref(s1), false); + + thread0.join(); + thread1.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us" << Endl; + } + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + size_t numAlloc0 = 10000; + size_t numAlloc1 = 10000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), true); + std::thread thread1(allocFn, 512, numAlloc1, std::ref(s1), true); + + thread0.join(); + thread1.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + size_t numAlloc0 = 10000; + size_t numAlloc1 = 10000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), true); + std::thread thread1(allocFn, 1024, numAlloc1, std::ref(s1), true); + + thread0.join(); + thread1.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + Cerr << "Average time per 
allocation t0: " << s0 << " us, t1: " << s1 << " us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + size_t numAlloc0 = 10000; + size_t numAlloc1 = 10000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), true); + std::thread thread1(allocFn, 512, numAlloc1, std::ref(s1), false); + + thread0.join(); + thread1.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + size_t numAlloc0 = 10000; + size_t numAlloc1 = 10000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), true); + std::thread thread1(allocFn, 1024, numAlloc1, std::ref(s1), false); + + thread0.join(); + thread1.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us" << Endl; + } + +} + +TEST_F(TAllocatorSuite32, AllocationWithReclaimThreeThreads) { + const NInterconnect::NRdma::TMemPoolSettings settings { + .SizeLimitMb = 32 + }; + + auto pool = NInterconnect::NRdma::CreateSlotMemPool(nullptr, settings); + + auto allocFn = [&](size_t sz, size_t num, float& s, bool holdAllocations) { + size_t j = 0; + + auto now = TInstant::Now(); + std::vector alls; + alls.reserve(num); + while (j < num) { + auto memRegion = pool->Alloc(sz, 0); + if (!memRegion) { + continue; + } + ASSERT_TRUE(memRegion) << "allocation failed"; + ASSERT_TRUE(memRegion->GetAddr()) << "invalid address"; + if (holdAllocations) { + alls.push_back(memRegion); + } + j++; + } + alls.clear(); + + s = (TInstant::Now() - now).MicroSeconds(); + }; + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + size_t numAlloc0 = 100; + size_t numAlloc1 = 100; + size_t numAlloc2 = 100; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), false); + std::thread thread1(allocFn, 512, numAlloc1, std::ref(s1), false); + std::thread thread2(allocFn, 512, numAlloc2, std::ref(s2), false); + + thread0.join(); + thread1.join(); + thread2.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + s2 = s2 / float(numAlloc2); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + size_t numAlloc0 = 10000; + size_t numAlloc1 = 10000; + size_t numAlloc2 = 10000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), false); + std::thread thread1(allocFn, 4096, numAlloc1, std::ref(s1), false); + std::thread thread2(allocFn, 32768, numAlloc2, std::ref(s2), false); + + thread0.join(); + thread1.join(); + thread2.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + s2 = s2 / float(numAlloc2); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + size_t numAlloc0 = 10000; + size_t numAlloc1 = 10000; + size_t numAlloc2 = 10000; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), true); + std::thread thread1(allocFn, 512, numAlloc1, std::ref(s1), true); + std::thread thread2(allocFn, 512, numAlloc2, std::ref(s2), true); + + thread0.join(); + 
thread1.join(); + thread2.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + s2 = s2 / float(numAlloc2); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us" << Endl; + } + + Cerr << "===" << Endl; + + for (size_t i = 0; i < 10; i++) { + float s0 = 0.0; + float s1 = 0.0; + float s2 = 0.0; + size_t numAlloc0 = 10000; + size_t numAlloc1 = 10000; + size_t numAlloc2 = 8192; + std::thread thread0(allocFn, 512, numAlloc0, std::ref(s0), true); + std::thread thread1(allocFn, 1024, numAlloc1, std::ref(s1), true); + std::thread thread2(allocFn, 4096, numAlloc2, std::ref(s2), true); + + thread0.join(); + thread1.join(); + thread2.join(); + + s0 = s0 / float(numAlloc0); + s1 = s1 / float(numAlloc1); + s2 = s2 / float(numAlloc2); + Cerr << "Average time per allocation t0: " << s0 << " us, t1: " << s1 << " us, t2: " << s2 << " us" << Endl; + } +} diff --git a/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit/ya.make b/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_32/ya.make similarity index 90% rename from ydb/library/actors/interconnect/rdma/ut_mem_pool_limit/ya.make rename to ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_32/ya.make index 40e41e647887..78cfdac613c8 100644 --- a/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit/ya.make +++ b/ydb/library/actors/interconnect/rdma/ut_mem_pool_limit_32/ya.make @@ -1,9 +1,10 @@ GTEST() - +#TIMEOUT(3600) IF (OS_LINUX AND SANITIZER_TYPE != "memory") IF (SANITIZER_TYPE == "thread") SIZE(LARGE) + TIMEOUT(3600) TAG(ya:fat) ELSE() SIZE(MEDIUM) diff --git a/ydb/library/actors/interconnect/rdma/ya.make b/ydb/library/actors/interconnect/rdma/ya.make index 40f5a86a9bb9..a5eb4459af93 100644 --- a/ydb/library/actors/interconnect/rdma/ya.make +++ b/ydb/library/actors/interconnect/rdma/ya.make @@ -43,5 +43,6 @@ RECURSE( RECURSE_FOR_TESTS( ut - ut_mem_pool_limit + ut_mem_pool_limit_32 + ut_mem_pool_limit_128 )
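Reviewer note (not part of the patch): the core of this change is the generation-counting use lock described in the new TChunkUseLock comments. The standalone sketch below, under assumed simplifications, illustrates that scheme in isolation: one 64-bit word packs a reclaiming flag (bit 63), a 46-bit generation and a 17-bit use count. The class name TUseLockSketch is invented for illustration, and the path where the reclaiming bit is cleared is reduced to the single-acquirer case, which the patch instead handles with a CAS loop.

```cpp
// Illustrative sketch only; simplified relative to the patch's TChunkUseLock.
#include <atomic>
#include <cassert>
#include <cstdint>

class TUseLockSketch {
    static constexpr uint64_t ReclaimingBit = 1ull << 63;
    static constexpr unsigned UseCountBits = 17;
    static constexpr uint64_t UseCountMask = (1ull << UseCountBits) - 1;
    // Start in the "reclaiming" state so a freshly created chunk cannot be
    // reclaimed before its first acquire, mirroring the patch.
    std::atomic<uint64_t> Word{ReclaimingBit};

public:
    uint64_t Generation() const {
        return (Word.load(std::memory_order_relaxed) & ~ReclaimingBit) >> UseCountBits;
    }

    // Acquire one reference if the chunk still belongs to the expected generation.
    // Simplified: assumes only one thread performs the first acquire after a
    // reclaim; the patch uses a CAS loop to tolerate concurrent acquirers here.
    bool TryAcquire(uint64_t expectedGeneration) {
        uint64_t x = Word.fetch_add(1, std::memory_order_acq_rel);
        if (((x & ~ReclaimingBit) >> UseCountBits) != expectedGeneration) {
            Word.fetch_sub(1, std::memory_order_release); // stale cached slot, undo
            return false;
        }
        if (x & ReclaimingBit) {
            // First acquire after a reclaim: drop the reclaiming bit, keep use count 1.
            Word.store((x & ~(ReclaimingBit | UseCountMask)) | 1, std::memory_order_release);
        }
        return true;
    }

    void Release() {
        assert((Word.load() & UseCountMask) > 0);
        Word.fetch_sub(1, std::memory_order_relaxed);
    }

    // Succeeds only when the use count is zero and no reclaim is already in flight;
    // bumps the generation so slots cached before this point can no longer acquire.
    bool TryReclaim() {
        uint64_t oldX = Word.load(std::memory_order_relaxed);
        uint64_t newX;
        do {
            if ((oldX & ReclaimingBit) || (oldX & UseCountMask) != 0) {
                return false;
            }
            newX = (((oldX >> UseCountBits) + 1) << UseCountBits) | ReclaimingBit;
        } while (!Word.compare_exchange_weak(oldX, newX,
                                             std::memory_order_release,
                                             std::memory_order_relaxed));
        return true;
    }
};
```

A cached TMemRegion that remembers the generation it was created with would call TryAcquire with that value; after a successful TryReclaim the stored generation no longer matches, so stale cache entries fail to acquire and can be dropped, which is the mechanism the slot-pool cache in this patch relies on.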