From 45ea4c6cb8eceff166ac9e31b809b3d3383d2109 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Fri, 3 Oct 2025 17:03:59 +0200 Subject: [PATCH] [ICRDMA] Support for RDMA memory region in pdisk --- ydb/core/blobstorage/base/bufferwithgaps.h | 12 ++- .../blobstorage_pdisk_completion_impl.cpp | 4 +- .../pdisk/blobstorage_pdisk_completion_impl.h | 2 +- .../pdisk/blobstorage_pdisk_impl.cpp | 2 +- .../pdisk/blobstorage_pdisk_ut.cpp | 92 ++++++++++++++++++- .../pdisk/blobstorage_pdisk_ut_env.h | 3 +- ydb/core/blobstorage/ut_blobstorage/get.cpp | 52 +++++++++++ ydb/core/testlib/actor_helpers.cpp | 5 +- ydb/core/testlib/actor_helpers.h | 2 +- ydb/core/testlib/actors/test_runtime.cpp | 4 +- ydb/core/testlib/actors/test_runtime.h | 2 +- ydb/core/testlib/basics/runtime.cpp | 1 + ydb/core/util/actorsys_test/testactorsys.h | 3 + ydb/library/actors/core/actorsystem.cpp | 35 +++++++ ydb/library/actors/core/actorsystem.h | 23 +++++ ydb/library/actors/core/event.h | 3 + ydb/library/actors/core/event_load.h | 2 + ydb/library/actors/core/ya.make | 1 + ydb/library/actors/testlib/test_runtime.cpp | 9 +- ydb/library/actors/testlib/test_runtime.h | 3 +- ydb/library/actors/util/rc_buf.cpp | 14 +++ ydb/library/actors/util/rc_buf.h | 10 +- ydb/library/actors/util/rope.h | 12 +-- 23 files changed, 270 insertions(+), 26 deletions(-) diff --git a/ydb/core/blobstorage/base/bufferwithgaps.h b/ydb/core/blobstorage/base/bufferwithgaps.h index 47f5f016aeeb..5fda36082cdf 100644 --- a/ydb/core/blobstorage/base/bufferwithgaps.h +++ b/ydb/core/blobstorage/base/bufferwithgaps.h @@ -45,12 +45,16 @@ namespace NKikimr { , IsCommited(false) {} - TBufferWithGaps(ui32 offset, ui32 size, ui32 tailroom) - : Data(TRcBuf::UninitializedPageAligned(size, tailroom)) + TBufferWithGaps(ui32 offset, TRcBuf&& data) + : Data(data) , Offset(offset) , IsCommited(false) {} + TBufferWithGaps(ui32 offset, ui32 size, ui32 tailroom) + : TBufferWithGaps(offset, TRcBuf::UninitializedPageAligned(size, tailroom)) + {} + TBufferWithGaps(TBufferWithGaps &&) = default; TBufferWithGaps &operator=(TBufferWithGaps &&) = default; @@ -98,7 +102,7 @@ namespace NKikimr { } ui8 *RawDataPtr(ui32 offset, ui32 len) { - Y_ABORT_UNLESS(offset + len <= Data.size() + Data.Tailroom(), "Buffer has size# %zu less then requested offset# %" PRIu32 + Y_ABORT_UNLESS(offset + len <= Data.size() + Data.UnsafeTailroom(), "Buffer has size# %zu less then requested offset# %" PRIu32 " len# %" PRIu32, Data.size(), offset, len); IsCommited = false; return reinterpret_cast(Data.GetDataMut() + offset); @@ -146,7 +150,7 @@ namespace NKikimr { } ui32 SizeWithTail() const { - return Data.size() + Data.Tailroom(); + return Data.size() + Data.UnsafeTailroom(); } void Swap(TBufferWithGaps& other) { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp index 3a25a7be113c..fc240d5854f5 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp @@ -310,7 +310,7 @@ void TCompletionChunkReadPart::Release(TActorSystem *actorSystem) { } TCompletionChunkRead::TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr &read, std::function onDestroy, - ui64 chunkNonce) + ui64 chunkNonce, IRcBufAllocator* alloc) : TCompletionAction() , PDisk(pDisk) , Read(read) @@ -338,7 +338,7 @@ TCompletionChunkRead::TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtrSize : read->Size + read->Offset % sectorSize; size_t tailroom = AlignUp(newSize, sectorSize) - newSize; - CommonBuffer = TBufferWithGaps(read->Offset, newSize, tailroom); + CommonBuffer = TBufferWithGaps(read->Offset, alloc->AllocPageAlignedRcBuf(newSize, tailroom)); } TCompletionChunkRead::~TCompletionChunkRead() { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h index b8ece04d09f9..cc06f5a45da9 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h @@ -222,7 +222,7 @@ class TCompletionChunkRead : public TCompletionAction { const ui64 DoubleFreeCanary; public: TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr &read, std::function onDestroy, - ui64 chunkNonce); + ui64 chunkNonce, IRcBufAllocator* alloc); void Exec(TActorSystem *actorSystem) override; ~TCompletionChunkRead(); void ReplyError(TActorSystem *actorSystem, TString reason); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index 76f89d0fa045..67261245aaea 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -3174,7 +3174,7 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) { --state.OperationsInProgress; --inFlight->ChunkReads; }; - read->FinalCompletion = new TCompletionChunkRead(this, read, std::move(onDestroy), state.Nonce); + read->FinalCompletion = new TCompletionChunkRead(this, read, std::move(onDestroy), state.Nonce, PCtx->ActorSystem->GetRcBufAllocator()); static_cast(request)->SelfPointer = std::move(read); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp index 47d7a15ed6e4..182aa10ccd3b 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp @@ -8,6 +8,8 @@ #include #include +#include + #include namespace NKikimr { @@ -1560,9 +1562,10 @@ Y_UNIT_TEST_SUITE(TPDiskTest) { return data; } - void ChunkWriteDifferentOffsetAndSizeImpl(bool plainDataChunks) { + void ChunkWriteDifferentOffsetAndSizeImpl(bool plainDataChunks, bool rdmaAlloc) { TActorTestContext testCtx({ .PlainDataChunks = plainDataChunks, + .UseRdmaAllocator = rdmaAlloc, }); Cerr << "plainDataChunks# " << plainDataChunks << Endl; @@ -1607,8 +1610,8 @@ Y_UNIT_TEST_SUITE(TPDiskTest) { } } Y_UNIT_TEST(ChunkWriteDifferentOffsetAndSize) { - for (int i = 0; i <= 1; ++i) { - ChunkWriteDifferentOffsetAndSizeImpl(i); + for (ui32 i = 0; i <= 3; ++i) { + ChunkWriteDifferentOffsetAndSizeImpl(i & 1, i & 2); } } @@ -2481,4 +2484,87 @@ Y_UNIT_TEST_SUITE(ShredPDisk) { #endif } + +Y_UNIT_TEST_SUITE(RDMA) { + void TestChunkReadWithRdmaAllocator(bool plainDataChunks) { + TActorTestContext testCtx({ + .PlainDataChunks = plainDataChunks, + .UseRdmaAllocator = true, + }); + TVDiskMock vdisk(&testCtx); + + vdisk.InitFull(); + vdisk.ReserveChunk(); + vdisk.CommitReservedChunks(); + auto chunk = *vdisk.Chunks[EChunkState::COMMITTED].begin(); + + for (i64 writeSize: {1_KB - 123, 64_KB, 128_KB + 8765}) { + for (i64 readOffset: {0_KB, 0_KB + 123, 4_KB, 8_KB + 123}) { + for (i64 readSize: {writeSize, writeSize - 567, writeSize - i64(8_KB), writeSize - i64(8_KB) + 987}) { + if (readOffset < 0 || readSize < 0 || readOffset + readSize > writeSize) { + continue; + } + Cerr << "chunkBufSize: " << writeSize << ", readOffset: " << readOffset << ", readSize: " << readSize << Endl; + auto parts = MakeIntrusive(PrepareData(writeSize)); + testCtx.Send(new NPDisk::TEvChunkWrite( + vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound, + chunk, 0, parts, nullptr, false, 0)); + auto write = testCtx.Recv(); + + testCtx.Send(new NPDisk::TEvChunkRead( + vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound, + chunk, readOffset, readSize, 0, nullptr)); + auto read = testCtx.Recv(); + TRcBuf readBuf = read->Data.ToString(); + NInterconnect::NRdma::TMemRegionSlice memReg = NInterconnect::NRdma::TryExtractFromRcBuf(readBuf); + UNIT_ASSERT_C(!memReg.Empty(), "Failed to extract RDMA memory region from RcBuf"); + UNIT_ASSERT_VALUES_EQUAL_C(memReg.GetSize(), readSize, "Unexpected size of RDMA memory region"); + } + } + } + } + + Y_UNIT_TEST(TestChunkReadWithRdmaAllocatorEncryptedChunks) { + TestChunkReadWithRdmaAllocator(false); + } + Y_UNIT_TEST(TestChunkReadWithRdmaAllocatorPlainChunks) { + TestChunkReadWithRdmaAllocator(true); + } + + Y_UNIT_TEST(TestRcBuf) { + ui32 size = 129961; + ui32 offset = 123; + ui32 tailRoom = 1111; + ui32 totalSize = size + tailRoom; + UNIT_ASSERT_VALUES_EQUAL(totalSize, 131072); + + auto alloc1 = [](ui32 size, ui32 headRoom, ui32 tailRoom) { + TRcBuf buf = TRcBuf::UninitializedPageAligned(size + headRoom + tailRoom); + buf.TrimFront(size + tailRoom); + buf.TrimBack(size); + Cerr << "alloc1: " << buf.Size() << " " << buf.Tailroom() << " " << buf.UnsafeTailroom() << Endl; + return buf; + }; + auto memPool = NInterconnect::NRdma::CreateDummyMemPool(); + auto alloc2 = [memPool](ui32 size, ui32 headRoom, ui32 tailRoom) -> TRcBuf { + TRcBuf buf = memPool->AllocRcBuf(size + headRoom + tailRoom, NInterconnect::NRdma::IMemPool::EMPTY).value(); + buf.TrimFront(size + tailRoom); + buf.TrimBack(size); + Cerr << "alloc2: " << buf.Size() << " " << buf.Tailroom() << " " << buf.UnsafeTailroom() << Endl; + return buf; + }; + + auto buf1 = TBufferWithGaps(offset, alloc1(size, 0, tailRoom)); + auto buf2 = TBufferWithGaps(offset, alloc2(size, 0, tailRoom)); + + Cerr << "buf1: " << buf1.PrintState() << " " << buf1.Size() << " " << buf1.SizeWithTail() << Endl; + Cerr << "buf2: " << buf2.PrintState() << " " << buf2.Size() << " " << buf2.SizeWithTail() << Endl; + + UNIT_ASSERT_VALUES_EQUAL_C(buf1.Size(), buf2.Size(), "Buffers should have the same size"); + + buf1.RawDataPtr(0, totalSize); + buf2.RawDataPtr(0, totalSize); + } +} + } // namespace NKikimr diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h index 96bfe18ae7a1..1ba784213219 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h @@ -34,6 +34,7 @@ struct TActorTestContext { bool ReadOnly = false; bool InitiallyZeroed = false; // Only for sector map. Zero first 1MiB on start. bool PlainDataChunks = false; + bool UseRdmaAllocator = false; }; private: @@ -92,7 +93,7 @@ struct TActorTestContext { } TActorTestContext(TSettings settings) - : Runtime(new TTestActorRuntime(1, true)) + : Runtime(new TTestActorRuntime(1, true, true, settings.UseRdmaAllocator)) , TestCtx(settings.UseSectorMap, settings.DiskMode, settings.DiskSize) , Settings(settings) { diff --git a/ydb/core/blobstorage/ut_blobstorage/get.cpp b/ydb/core/blobstorage/ut_blobstorage/get.cpp index c0ebf5da0a79..659a90c25c43 100644 --- a/ydb/core/blobstorage/ut_blobstorage/get.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/get.cpp @@ -1,6 +1,8 @@ #include #include +#include + Y_UNIT_TEST_SUITE(Get) { void SendGet( @@ -108,4 +110,54 @@ Y_UNIT_TEST_SUITE(Get) { // check that now TEvGet returns BLOCKED for blocked generation with reader params SendGet(test, originalBlobId, data, NKikimrProto::BLOCKED, TEvBlobStorage::TEvGet::TReaderTabletData(tabletId, tabletGeneration)); } + + Y_UNIT_TEST(TestRdmaRegiseredMemory) { + TEnvironmentSetup env(true); + TTestInfo test = InitTest(env); + + auto groups = env.GetGroups(); + auto groupInfo = env.GetGroupInfo(groups.front()); + + ui64 tabletId = 1; + ui32 tabletGeneration = 1; + + constexpr ui32 size = 1000; + TString data(size, 'a'); + TLogoBlobID originalBlobId(tabletId, tabletGeneration, 0, 0, size, 0); + + SendPut(test, originalBlobId, data, NKikimrProto::OK); + env.CompactVDisk(groupInfo->GetActorId(0)); + + auto memPool = NInterconnect::NRdma::CreateDummyMemPool(); + env.Runtime->FilterFunction = [&](ui32, std::unique_ptr& ev) { + if (ev->GetTypeRewrite() == TEvBlobStorage::EvChunkReadResult) { + Cerr << "TEvChunkReadResult: " << ev->ToString() << Endl; + auto* res = ev->CastAsLocal(); + auto buf = res->Data.ToString(); + auto newBuf = memPool->AllocRcBuf(buf.size(), NInterconnect::NRdma::IMemPool::EMPTY); + UNIT_ASSERT(newBuf); + std::memcpy(newBuf->UnsafeGetDataMut(), buf.GetData(), buf.GetSize()); + res->Data = TBufferWithGaps(0, std::move(*newBuf)); + res->Data.Commit(); + } else if (ev->GetTypeRewrite() == TEvBlobStorage::EvVGetResult) { + Cerr << "TEvVGetResult: " << ev->ToString() << Endl; + auto* res = ev->CastAsLocal(); + auto payload = res->GetPayload(); + UNIT_ASSERT_VALUES_EQUAL(payload.size(), 1); + for (auto& rope : payload) { + for (auto it = rope.Begin(); it != rope.End(); ++it) { + const TRcBuf& chunk = it.GetChunk(); + auto memReg = NInterconnect::NRdma::TryExtractFromRcBuf(chunk); + UNIT_ASSERT_C(!memReg.Empty(), "unable to extract mem region from chunk"); + UNIT_ASSERT_VALUES_EQUAL_C(memReg.GetSize(), size, "invalid size for memReg"); + UNIT_ASSERT_C(memReg.GetLKey(0) != 0, "invalid lkey"); + UNIT_ASSERT_C(memReg.GetRKey(0) != 0, "invalid rkey"); + } + } + } + return true; + }; + + SendGet(test, originalBlobId, data, NKikimrProto::OK, TEvBlobStorage::TEvGet::TReaderTabletData(tabletId, tabletGeneration)); + } } diff --git a/ydb/core/testlib/actor_helpers.cpp b/ydb/core/testlib/actor_helpers.cpp index 73b963d5b5d6..7efaefabd323 100644 --- a/ydb/core/testlib/actor_helpers.cpp +++ b/ydb/core/testlib/actor_helpers.cpp @@ -2,10 +2,13 @@ namespace NKikimr { -TActorSystemStub::TActorSystemStub() +TActorSystemStub::TActorSystemStub(std::shared_ptr alloc) : AppData(0, 0, 0, 0, {}, nullptr, nullptr, nullptr, nullptr) { THolder setup(new NActors::TActorSystemSetup); + if (alloc) { + setup->RcBufAllocator = alloc; + } System.Reset(new NActors::TActorSystem(setup, &AppData)); Mailbox.Reset(new NActors::TMailbox()); ExecutorThread.Reset(new NActors::TExecutorThread(0, System.Get(), nullptr, "thread")); diff --git a/ydb/core/testlib/actor_helpers.h b/ydb/core/testlib/actor_helpers.h index 263166e929f8..0256bd955e36 100644 --- a/ydb/core/testlib/actor_helpers.h +++ b/ydb/core/testlib/actor_helpers.h @@ -16,7 +16,7 @@ struct TActorSystemStub { NActors::TActivationContext* PrevCtx; TAppData AppData; - TActorSystemStub(); + TActorSystemStub(std::shared_ptr alloc = {}); ~TActorSystemStub(); }; diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index 02a4a37ab126..fe4f9523db34 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -69,9 +69,9 @@ namespace NActors { Initialize(); } - TTestActorRuntime::TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) + TTestActorRuntime::TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads, bool useRdmaAllocator) : TPortManager(false) - , TTestActorRuntimeBase{nodeCount, dataCenterCount, useRealThreads} + , TTestActorRuntimeBase{nodeCount, dataCenterCount, useRealThreads, useRdmaAllocator} { Initialize(); } diff --git a/ydb/core/testlib/actors/test_runtime.h b/ydb/core/testlib/actors/test_runtime.h index 9c2e22574a31..916864282c28 100644 --- a/ydb/core/testlib/actors/test_runtime.h +++ b/ydb/core/testlib/actors/test_runtime.h @@ -72,7 +72,7 @@ namespace NActors { }; TTestActorRuntime(THeSingleSystemEnv d); - TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); + TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads, bool useRdmaAllocator=false); TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount); TTestActorRuntime(ui32 nodeCount = 1, bool useRealThreads = false); TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads, NKikimr::NAudit::TAuditLogBackends&& auditLogBackends); diff --git a/ydb/core/testlib/basics/runtime.cpp b/ydb/core/testlib/basics/runtime.cpp index 2f9653183fdc..d96e3cd7d2f9 100644 --- a/ydb/core/testlib/basics/runtime.cpp +++ b/ydb/core/testlib/basics/runtime.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include diff --git a/ydb/core/util/actorsys_test/testactorsys.h b/ydb/core/util/actorsys_test/testactorsys.h index bd7efdf07a36..0d9421bbdeb4 100644 --- a/ydb/core/util/actorsys_test/testactorsys.h +++ b/ydb/core/util/actorsys_test/testactorsys.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -289,6 +290,8 @@ class TTestActorSystem { setup->Executors.Reset(new TAutoPtr[setup->ExecutorsCount]); IExecutorPool *pool = CreateTestExecutorPool(nodeId); setup->Executors[0].Reset(pool); + auto memPool = NInterconnect::NRdma::CreateDummyMemPool(); + setup->RcBufAllocator = std::make_shared(memPool); // we create this actor for correct service lookup through ActorSystem setup->LocalServices.emplace_back(LoggerSettings_->LoggerActorId, TActorSetupCmd( diff --git a/ydb/library/actors/core/actorsystem.cpp b/ydb/library/actors/core/actorsystem.cpp index e39449061235..4731bf091451 100644 --- a/ydb/library/actors/core/actorsystem.cpp +++ b/ydb/library/actors/core/actorsystem.cpp @@ -19,6 +19,8 @@ #include #include #include +#include +#include namespace NActors { @@ -70,6 +72,38 @@ namespace NActors { } }; + TRdmaAllocatorWithFallback::TRdmaAllocatorWithFallback(std::shared_ptr memPool) noexcept + : RdmaMemPool(memPool) + {} + + TRcBuf TRdmaAllocatorWithFallback::AllocRcBuf(size_t size, size_t headRoom, size_t tailRoom) noexcept { + std::optional buf = TryAllocRdmaRcBuf(size, headRoom, tailRoom); + if (!buf) { + return GetDefaultRcBufAllocator()->AllocRcBuf(size, headRoom, tailRoom); + } + return buf.value(); + } + + TRcBuf TRdmaAllocatorWithFallback::AllocPageAlignedRcBuf(size_t size, size_t tailRoom) noexcept { + std::optional buf = TryAllocRdmaRcBuf(size, 0, tailRoom); + if (!buf) { + return GetDefaultRcBufAllocator()->AllocPageAlignedRcBuf(size, tailRoom); + } + return buf.value(); + } + + template + std::optional TRdmaAllocatorWithFallback::TryAllocRdmaRcBuf(size_t size, size_t headRoom, size_t tailRoom) noexcept { + std::optional buf = RdmaMemPool->AllocRcBuf(size + headRoom + tailRoom, + pageAligned ? NInterconnect::NRdma::IMemPool::PAGE_ALIGNED : NInterconnect::NRdma::IMemPool::EMPTY); + if (!buf) { + return {}; + } + buf->TrimFront(size + tailRoom); + buf->TrimBack(size); + return buf; + } + TActorSystem::TActorSystem(THolder& setup, void* appData, TIntrusivePtr loggerSettings) : NodeId(setup->NodeId) @@ -80,6 +114,7 @@ namespace NActors { , CurrentTimestamp(0) , CurrentMonotonic(0) , CurrentIDCounter(RandomNumber()) + , RcBufAllocator(setup->RcBufAllocator ? setup->RcBufAllocator.get() : GetDefaultRcBufAllocator()) , SystemSetup(setup.Release()) , DefSelfID(NodeId, "actorsystem") , AppData0(appData) diff --git a/ydb/library/actors/core/actorsystem.h b/ydb/library/actors/core/actorsystem.h index 4f6a82218ada..e916255fa22b 100644 --- a/ydb/library/actors/core/actorsystem.h +++ b/ydb/library/actors/core/actorsystem.h @@ -16,6 +16,10 @@ #include #include +namespace NInterconnect::NRdma { + class IMemPool; +} + namespace NActors { class IActor; class TActorSystem; @@ -84,6 +88,18 @@ namespace NActors { TProxyWrapperFactory ProxyWrapperFactory; }; + using TRcBufAllocator = std::function; + class TRdmaAllocatorWithFallback : public IRcBufAllocator { + public: + TRdmaAllocatorWithFallback(std::shared_ptr memPool) noexcept; + TRcBuf AllocRcBuf(size_t size, size_t headRoom, size_t tailRoom) noexcept override; + TRcBuf AllocPageAlignedRcBuf(size_t size, size_t tailRoom) noexcept override; + private: + template + std::optional TryAllocRdmaRcBuf(size_t size, size_t headRoom, size_t tailRoom) noexcept; + std::shared_ptr RdmaMemPool; + }; + struct TActorSystemSetup { ui32 NodeId = 0; @@ -102,6 +118,8 @@ namespace NActors { using TLocalServices = TVector>; TLocalServices LocalServices; + std::shared_ptr RcBufAllocator; + ui32 GetExecutorsCount() const { return Executors ? ExecutorsCount : CpuManager.GetExecutorsCount(); } @@ -152,6 +170,8 @@ namespace NActors { THolder ScheduleQueue; mutable TTicketLock ScheduleLock; + mutable IRcBufAllocator* RcBufAllocator; + friend class TExecutorThread; THolder SystemSetup; @@ -310,5 +330,8 @@ namespace NActors { void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const; void GetExecutorPoolStates(std::vector &states) const; + IRcBufAllocator* GetRcBufAllocator() const { + return RcBufAllocator; + } }; } diff --git a/ydb/library/actors/core/event.h b/ydb/library/actors/core/event.h index 1a6a0321683f..79a188f32837 100644 --- a/ydb/library/actors/core/event.h +++ b/ydb/library/actors/core/event.h @@ -33,6 +33,9 @@ namespace NActors { } virtual ui32 Type() const = 0; virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0; + virtual std::optional SerializeToRope(std::function) const { + return std::nullopt; + } virtual bool IsSerializable() const = 0; virtual ui32 CalculateSerializedSizeCached() const { return CalculateSerializedSize(); diff --git a/ydb/library/actors/core/event_load.h b/ydb/library/actors/core/event_load.h index f2e5da4ef832..b289706ed9a3 100644 --- a/ydb/library/actors/core/event_load.h +++ b/ydb/library/actors/core/event_load.h @@ -6,6 +6,8 @@ #include #include +#include + namespace NActors { class IEventHandle; diff --git a/ydb/library/actors/core/ya.make b/ydb/library/actors/core/ya.make index 31984280e6e6..72071fc837d3 100644 --- a/ydb/library/actors/core/ya.make +++ b/ydb/library/actors/core/ya.make @@ -103,6 +103,7 @@ GENERATE_ENUM_SERIALIZATION(log_iface.h) PEERDIR( ydb/library/actors/actor_type + ydb/library/actors/interconnect/rdma ydb/library/actors/core/harmonizer ydb/library/actors/memory_log ydb/library/actors/prof diff --git a/ydb/library/actors/testlib/test_runtime.cpp b/ydb/library/actors/testlib/test_runtime.cpp index cff90d149471..79323ec19537 100644 --- a/ydb/library/actors/testlib/test_runtime.cpp +++ b/ydb/library/actors/testlib/test_runtime.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -501,7 +502,7 @@ namespace NActors { SingleSysEnv = true; } - TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads) + TTestActorRuntimeBase::TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads, bool useRdmaAllocator) : ScheduledCount(0) , ScheduledLimit(100000) , MainThreadId(TThread::CurrentThreadId()) @@ -510,6 +511,7 @@ namespace NActors { , NodeCount(nodeCount) , DataCenterCount(dataCenterCount) , UseRealThreads(useRealThreads) + , UseRdmaAllocator(useRdmaAllocator) , LocalId(0) , DispatchCyclesCount(0) , DispatchedEventsCount(0) @@ -1758,6 +1760,11 @@ namespace NActors { setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0)); } + if (UseRdmaAllocator) { + auto memPool = NInterconnect::NRdma::CreateDummyMemPool(); + setup->RcBufAllocator = std::make_shared(memPool); + } + InitActorSystemSetup(*setup, node); return setup; diff --git a/ydb/library/actors/testlib/test_runtime.h b/ydb/library/actors/testlib/test_runtime.h index 0f37b11f96ef..15daa6a41276 100644 --- a/ydb/library/actors/testlib/test_runtime.h +++ b/ydb/library/actors/testlib/test_runtime.h @@ -283,7 +283,7 @@ namespace NActors { TTestActorRuntimeBase(THeSingleSystemEnv); - TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads); + TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads, bool useRdmaAllocator=false); TTestActorRuntimeBase(ui32 nodeCount, ui32 dataCenterCount); TTestActorRuntimeBase(ui32 nodeCount = 1, bool useRealThreads = false); virtual ~TTestActorRuntimeBase(); @@ -755,6 +755,7 @@ namespace NActors { const ui32 NodeCount; const ui32 DataCenterCount; const bool UseRealThreads; + const bool UseRdmaAllocator = false; std::function)> ICCommonSetupper; ui64 LocalId; diff --git a/ydb/library/actors/util/rc_buf.cpp b/ydb/library/actors/util/rc_buf.cpp index 946c9846ee5a..0721eb95592a 100644 --- a/ydb/library/actors/util/rc_buf.cpp +++ b/ydb/library/actors/util/rc_buf.cpp @@ -4,3 +4,17 @@ template<> void Out(IOutputStream& s, const TRcBuf& x) { s.Write(TStringBuf(x)); } + +static class DefaultRcBufAllocator final : public IRcBufAllocator { +public: + TRcBuf AllocRcBuf(size_t size, size_t headRoom, size_t tailRoom) noexcept { + return TRcBuf::Uninitialized(size, headRoom, tailRoom); + } + TRcBuf AllocPageAlignedRcBuf(size_t size, size_t tailRoom) noexcept { + return TRcBuf::UninitializedPageAligned(size, tailRoom); + } +} RcBufAllocator; + +IRcBufAllocator* GetDefaultRcBufAllocator() noexcept { + return &RcBufAllocator; +} diff --git a/ydb/library/actors/util/rc_buf.h b/ydb/library/actors/util/rc_buf.h index 3f3e5be7bac2..ac63d7893627 100644 --- a/ydb/library/actors/util/rc_buf.h +++ b/ydb/library/actors/util/rc_buf.h @@ -269,7 +269,6 @@ class TMutableContiguousSpan struct IContiguousChunk : TThrRefBase { using TPtr = TIntrusivePtr; - enum EInnerType { OTHER=0, RDMA_MEM_REG=1, @@ -1150,3 +1149,12 @@ class TRcBuf { return TMutableContiguousSpan(GetDataMut(), GetSize()); } }; + +class IRcBufAllocator { +public: + virtual ~IRcBufAllocator() = default; + virtual TRcBuf AllocRcBuf(size_t size, size_t headRoom, size_t tailRoom) noexcept = 0; + virtual TRcBuf AllocPageAlignedRcBuf(size_t size, size_t tailRoom) noexcept = 0; +}; + +IRcBufAllocator* GetDefaultRcBufAllocator() noexcept; diff --git a/ydb/library/actors/util/rope.h b/ydb/library/actors/util/rope.h index 94c380e23999..08449f67e6a9 100644 --- a/ydb/library/actors/util/rope.h +++ b/ydb/library/actors/util/rope.h @@ -308,6 +308,12 @@ class TRope { return *Iter; } + template = true> + TRcBuf& GetChunk() { + CheckValid(); + return *Iter; + } + private: friend class TRope; @@ -316,12 +322,6 @@ class TRope { return Iter; } - template = true> - TRcBuf& GetChunk() { - CheckValid(); - return *Iter; - } - typename TTraits::TListIterator GetChainBegin() const { CheckValid(); return Rope->Chain.begin();