Skip to content

Commit 87fe3fd

Browse files
authored
[ICRDMA] Support for RDMA memory region in pdisk, NBYDB-1578 (ydb-platform#27580)
Common RDMA-aware RcBuf allocation infrastructure and PDisk support.
1 parent 58ec6db commit 87fe3fd

26 files changed

+331
-39
lines changed

ydb/core/blobstorage/base/bufferwithgaps.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,16 @@ namespace NKikimr {
4545
, IsCommited(false)
4646
{}
4747

48-
TBufferWithGaps(ui32 offset, ui32 size, ui32 tailroom)
49-
: Data(TRcBuf::UninitializedPageAligned(size, tailroom))
48+
// Builds the buffer around an already-allocated TRcBuf: `data` is moved into
// Data, `offset` is stored as-is, and the buffer starts out not committed.
// Taking the TRcBuf from the caller lets the caller decide how the memory is
// allocated.
TBufferWithGaps(ui32 offset, TRcBuf&& data)
49+
: Data(std::move(data))
5050
, Offset(offset)
5151
, IsCommited(false)
5252
{}
5353

54+
// Overload keeping the previous (offset, size, tailroom) signature: allocates
// the storage with TRcBuf::UninitializedPageAligned(size, tailroom) and
// delegates to the (offset, TRcBuf&&) constructor.
TBufferWithGaps(ui32 offset, ui32 size, ui32 tailroom)
55+
: TBufferWithGaps(offset, TRcBuf::UninitializedPageAligned(size, tailroom))
56+
{}
57+
5458
TBufferWithGaps(TBufferWithGaps &&) = default;
5559
TBufferWithGaps &operator=(TBufferWithGaps &&) = default;
5660

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ void TCompletionChunkReadPart::Release(TActorSystem *actorSystem) {
311311
}
312312

313313
TCompletionChunkRead::TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, std::function<void()> onDestroy,
314-
ui64 chunkNonce)
314+
ui64 chunkNonce, IRcBufAllocator* alloc)
315315
: TCompletionAction()
316316
, PDisk(pDisk)
317317
, Read(read)
@@ -339,7 +339,7 @@ TCompletionChunkRead::TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRe
339339
? read->Size
340340
: read->Size + read->Offset % sectorSize;
341341
size_t tailroom = AlignUp<size_t>(newSize, sectorSize) - newSize;
342-
CommonBuffer = TBufferWithGaps(read->Offset, newSize, tailroom);
342+
CommonBuffer = TBufferWithGaps(read->Offset, alloc->AllocPageAlignedRcBuf(newSize, tailroom));
343343
}
344344

345345
TCompletionChunkRead::~TCompletionChunkRead() {

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ class TCompletionChunkRead : public TCompletionAction {
222222
const ui64 DoubleFreeCanary;
223223
public:
224224
TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, std::function<void()> onDestroy,
225-
ui64 chunkNonce);
225+
ui64 chunkNonce, IRcBufAllocator* alloc);
226226
void Exec(TActorSystem *actorSystem) override;
227227
~TCompletionChunkRead();
228228
void ReplyError(TActorSystem *actorSystem, TString reason);

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3187,7 +3187,7 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
31873187
--state.OperationsInProgress;
31883188
--inFlight->ChunkReads;
31893189
};
3190-
read->FinalCompletion = new TCompletionChunkRead(this, read, std::move(onDestroy), state.Nonce);
3190+
read->FinalCompletion = new TCompletionChunkRead(this, read, std::move(onDestroy), state.Nonce, PCtx->ActorSystem->GetRcBufAllocator());
31913191

31923192
static_cast<TChunkRead*>(request)->SelfPointer = std::move(read);
31933193

ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <ydb/core/driver_lib/version/ut/ut_helpers.h>
1111
#include <ydb/core/testlib/actors/test_runtime.h>
1212

13+
#include <ydb/library/actors/interconnect/rdma/mem_pool.h>
14+
1315
#include <util/system/hp_timer.h>
1416

1517
namespace NKikimr {
@@ -1562,9 +1564,10 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
15621564
return data;
15631565
}
15641566

1565-
void ChunkWriteDifferentOffsetAndSizeImpl(bool plainDataChunks) {
1567+
void ChunkWriteDifferentOffsetAndSizeImpl(bool plainDataChunks, bool rdmaAlloc) {
15661568
TActorTestContext testCtx({
15671569
.PlainDataChunks = plainDataChunks,
1570+
.UseRdmaAllocator = rdmaAlloc,
15681571
});
15691572
Cerr << "plainDataChunks# " << plainDataChunks << Endl;
15701573

@@ -1609,8 +1612,8 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
16091612
}
16101613
}
16111614
// Runs ChunkWriteDifferentOffsetAndSizeImpl for every combination of its two
// boolean flags. In the new loop `i` is used as a 2-bit mask: bit 0 is passed
// as the first argument (plainDataChunks) and bit 1 as the second (rdmaAlloc).
Y_UNIT_TEST(ChunkWriteDifferentOffsetAndSize) {
1612-
for (int i = 0; i <= 1; ++i) {
1613-
ChunkWriteDifferentOffsetAndSizeImpl(i);
1615+
// i = 0..3 covers all four (plainDataChunks, rdmaAlloc) combinations.
for (ui32 i = 0; i <= 3; ++i) {
1616+
// `i & 2` yields 0 or 2; it is implicitly converted to bool at the call.
ChunkWriteDifferentOffsetAndSizeImpl(i & 1, i & 2);
16141617
}
16151618
}
16161619

@@ -3033,4 +3036,86 @@ Y_UNIT_TEST_SUITE(TPDiskPrefailureDiskTest) {
30333036
}
30343037
}
30353038

3039+
Y_UNIT_TEST_SUITE(RDMA) {
3040+
// Shared body for the RDMA chunk-read tests. Creates a test context with
// UseRdmaAllocator = true, writes data of several sizes into one committed
// chunk, reads back sub-ranges at several offsets/sizes, and asserts that the
// buffer of every read result can be converted to a non-empty RDMA memory
// region whose size equals the requested read size.
//
// plainDataChunks is forwarded to the test context's PlainDataChunks setting.
void TestChunkReadWithRdmaAllocator(bool plainDataChunks) {
3041+
TActorTestContext testCtx({
3042+
.PlainDataChunks = plainDataChunks,
3043+
.UseRdmaAllocator = true,
3044+
});
3045+
TVDiskMock vdisk(&testCtx);
3046+
3047+
vdisk.InitFull();
3048+
vdisk.ReserveChunk();
3049+
vdisk.CommitReservedChunks();
3050+
// All writes and reads below target this single committed chunk.
auto chunk = *vdisk.Chunks[EChunkState::COMMITTED].begin();
3051+
3052+
for (i64 writeSize: {1_KB - 123, 64_KB, 128_KB + 8765}) {
3053+
for (i64 readOffset: {0_KB, 0_KB + 123, 4_KB, 8_KB + 123}) {
3054+
for (i64 readSize: {writeSize, writeSize - 567, writeSize - i64(8_KB), writeSize - i64(8_KB) + 987}) {
3055+
// Candidate sizes are derived by subtraction and can go negative for the
// smallest writeSize; skip those and any range that ends past the written data.
if (readOffset < 0 || readSize < 0 || readOffset + readSize > writeSize) {
3056+
continue;
3057+
}
3058+
Cerr << "chunkBufSize: " << writeSize << ", readOffset: " << readOffset << ", readSize: " << readSize << Endl;
3059+
auto parts = MakeIntrusive<NPDisk::TEvChunkWrite::TAlignedParts>(PrepareData(writeSize));
3060+
testCtx.Send(new NPDisk::TEvChunkWrite(
3061+
vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound,
3062+
chunk, 0, parts, nullptr, false, 0));
3063+
// NOTE(review): the write result is received but never inspected, so a failed
// write would only surface indirectly through the read below — confirm intent.
auto write = testCtx.Recv<NPDisk::TEvChunkWriteResult>();
3064+
3065+
testCtx.Send(new NPDisk::TEvChunkRead(
3066+
vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound,
3067+
chunk, readOffset, readSize, 0, nullptr));
3068+
auto read = testCtx.Recv<NPDisk::TEvChunkReadResult>();
3069+
TRcBuf readBuf = read->Data.ToString();
3070+
// The assertions below require the returned buffer to map to an RDMA memory
// region of exactly readSize bytes.
NInterconnect::NRdma::TMemRegionSlice memReg = NInterconnect::NRdma::TryExtractFromRcBuf(readBuf);
3071+
UNIT_ASSERT_C(!memReg.Empty(), "Failed to extract RDMA memory region from RcBuf");
3072+
UNIT_ASSERT_VALUES_EQUAL_C(memReg.GetSize(), readSize, "Unexpected size of RDMA memory region");
3073+
}
3074+
}
3075+
}
3076+
}
3077+
3078+
// Runs the shared RDMA chunk-read scenario with plainDataChunks = false.
Y_UNIT_TEST(TestChunkReadWithRdmaAllocatorEncryptedChunks) {
3079+
TestChunkReadWithRdmaAllocator(false);
3080+
}
3081+
// Runs the shared RDMA chunk-read scenario with plainDataChunks = true.
Y_UNIT_TEST(TestChunkReadWithRdmaAllocatorPlainChunks) {
3082+
TestChunkReadWithRdmaAllocator(true);
3083+
}
3084+
3085+
// Builds two TBufferWithGaps over buffers of the same requested geometry, one
// allocated with TRcBuf::UninitializedPageAligned and one taken from a dummy
// RDMA memory pool, then checks that both report the same Size() and that
// RawDataPtr(0, totalSize) can be called on each.
Y_UNIT_TEST(TestRcBuf) {
3086+
ui32 size = 129961;
3087+
ui32 offset = 123;
3088+
ui32 tailRoom = 1111;
3089+
ui32 totalSize = size + tailRoom;
3090+
// 129961 + 1111 == 131072; the assert pins that the two constants add up to it.
UNIT_ASSERT_VALUES_EQUAL(totalSize, 131072);
3091+
3092+
// Allocator #1: plain page-aligned TRcBuf. Note the lambda parameters shadow
// the outer `size` / `tailRoom` locals.
auto alloc1 = [](ui32 size, ui32 headRoom, ui32 tailRoom) {
3093+
TRcBuf buf = TRcBuf::UninitializedPageAligned(size + headRoom + tailRoom);
3094+
buf.TrimFront(size + tailRoom);
3095+
buf.TrimBack(size);
3096+
Cerr << "alloc1: " << buf.Size() << " " << buf.Tailroom() << " " << buf.UnsafeTailroom() << Endl;
3097+
return buf;
3098+
};
3099+
auto memPool = NInterconnect::NRdma::CreateDummyMemPool();
3100+
// Allocator #2: same trimming sequence, but the storage comes from memPool
// (captured by value).
auto alloc2 = [memPool](ui32 size, ui32 headRoom, ui32 tailRoom) -> TRcBuf {
3101+
// NOTE(review): .value() is called without checking that the allocation
// succeeded — presumably acceptable for the dummy pool; confirm.
TRcBuf buf = memPool->AllocRcBuf(size + headRoom + tailRoom, NInterconnect::NRdma::IMemPool::EMPTY).value();
3102+
buf.TrimFront(size + tailRoom);
3103+
buf.TrimBack(size);
3104+
Cerr << "alloc2: " << buf.Size() << " " << buf.Tailroom() << " " << buf.UnsafeTailroom() << Endl;
3105+
return buf;
3106+
};
3107+
3108+
auto buf1 = TBufferWithGaps(offset, alloc1(size, 0, tailRoom));
3109+
auto buf2 = TBufferWithGaps(offset, alloc2(size, 0, tailRoom));
3110+
3111+
Cerr << "buf1: " << buf1.PrintState() << " " << buf1.Size() << " " << buf1.SizeWithTail() << Endl;
3112+
Cerr << "buf2: " << buf2.PrintState() << " " << buf2.Size() << " " << buf2.SizeWithTail() << Endl;
3113+
3114+
UNIT_ASSERT_VALUES_EQUAL_C(buf1.Size(), buf2.Size(), "Buffers should have the same size");
3115+
3116+
// Return values are discarded; presumably these calls are made only to trigger
// RawDataPtr's internal range checks for size + tailroom — TODO confirm.
buf1.RawDataPtr(0, totalSize);
3117+
buf2.RawDataPtr(0, totalSize);
3118+
}
3119+
}
3120+
30363121
} // namespace NKikimr

ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ struct TActorTestContext {
3535
bool ReadOnly = false;
3636
bool InitiallyZeroed = false; // Only for sector map. Zero first 1MiB on start.
3737
bool PlainDataChunks = false;
38+
bool UseRdmaAllocator = false;
3839
};
3940

4041
private:
@@ -94,7 +95,7 @@ struct TActorTestContext {
9495
}
9596

9697
TActorTestContext(TSettings settings)
97-
: Runtime(new TTestActorRuntime(1, true))
98+
: Runtime(new TTestActorRuntime(1, 1, true, settings.UseRdmaAllocator))
9899
, TestCtx(settings.UseSectorMap, settings.DiskMode, settings.DiskSize, settings.UsePath)
99100
, Settings(settings)
100101
{

ydb/core/blobstorage/ut_blobstorage/get.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
22
#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h>
33

4+
#include <ydb/library/actors/interconnect/rdma/mem_pool.h>
5+
46
Y_UNIT_TEST_SUITE(Get) {
57

68
void SendGet(
@@ -108,4 +110,54 @@ Y_UNIT_TEST_SUITE(Get) {
108110
// check that now TEvGet returns BLOCKED for blocked generation with reader params
109111
SendGet(test, originalBlobId, data, NKikimrProto::BLOCKED, TEvBlobStorage::TEvGet::TReaderTabletData(tabletId, tabletGeneration));
110112
}
113+
114+
// Puts a 1000-byte blob, compacts the first VDisk of the group, then issues a
// Get while an event filter (a) replaces the data of every TEvChunkReadResult
// with a copy allocated from a dummy RDMA memory pool and (b) asserts that each
// chunk of the TEvVGetResult payload still resolves to an RDMA memory region of
// the blob size with non-zero lkey/rkey.
// NOTE(review): the test name spells "Regisered" (sic); renaming it would
// change the test's identifier, so it is left untouched here.
Y_UNIT_TEST(TestRdmaRegiseredMemory) {
115+
TEnvironmentSetup env(true);
116+
TTestInfo test = InitTest(env);
117+
118+
auto groups = env.GetGroups();
119+
auto groupInfo = env.GetGroupInfo(groups.front());
120+
121+
ui64 tabletId = 1;
122+
ui32 tabletGeneration = 1;
123+
124+
constexpr ui32 size = 1000;
125+
TString data(size, 'a');
126+
TLogoBlobID originalBlobId(tabletId, tabletGeneration, 0, 0, size, 0);
127+
128+
SendPut(test, originalBlobId, data, NKikimrProto::OK);
129+
env.CompactVDisk(groupInfo->GetActorId(0));
130+
131+
auto memPool = NInterconnect::NRdma::CreateDummyMemPool();
132+
// NOTE(review): the filter captures locals by reference and is stored on
// env.Runtime; memPool is declared after env and is therefore destroyed first.
// Confirm the filter cannot run during env teardown.
env.Runtime->FilterFunction = [&](ui32, std::unique_ptr<IEventHandle>& ev) {
133+
if (ev->GetTypeRewrite() == TEvBlobStorage::EvChunkReadResult) {
134+
Cerr << "TEvChunkReadResult: " << ev->ToString() << Endl;
135+
auto* res = ev->CastAsLocal<NPDisk::TEvChunkReadResult>();
136+
auto buf = res->Data.ToString();
137+
// Re-home the read data into a buffer taken from the dummy RDMA pool.
auto newBuf = memPool->AllocRcBuf(buf.size(), NInterconnect::NRdma::IMemPool::EMPTY);
138+
UNIT_ASSERT(newBuf);
139+
std::memcpy(newBuf->UnsafeGetDataMut(), buf.GetData(), buf.GetSize());
140+
// NOTE(review): the replacement buffer is created with offset 0 rather than
// the original result's offset — presumably fine for this scenario; confirm.
res->Data = TBufferWithGaps(0, std::move(*newBuf));
141+
res->Data.Commit();
142+
} else if (ev->GetTypeRewrite() == TEvBlobStorage::EvVGetResult) {
143+
Cerr << "TEvVGetResult: " << ev->ToString() << Endl;
144+
auto* res = ev->CastAsLocal<TEvBlobStorage::TEvVGetResult>();
145+
auto payload = res->GetPayload();
146+
UNIT_ASSERT_VALUES_EQUAL(payload.size(), 1);
147+
for (auto& rope : payload) {
148+
for (auto it = rope.Begin(); it != rope.End(); ++it) {
149+
const TRcBuf& chunk = it.GetChunk();
150+
auto memReg = NInterconnect::NRdma::TryExtractFromRcBuf(chunk);
151+
UNIT_ASSERT_C(!memReg.Empty(), "unable to extract mem region from chunk");
152+
UNIT_ASSERT_VALUES_EQUAL_C(memReg.GetSize(), size, "invalid size for memReg");
153+
UNIT_ASSERT_C(memReg.GetLKey(0) != 0, "invalid lkey");
154+
UNIT_ASSERT_C(memReg.GetRKey(0) != 0, "invalid rkey");
155+
}
156+
}
157+
}
158+
// The filter always returns true, for every event type.
return true;
159+
};
160+
161+
SendGet(test, originalBlobId, data, NKikimrProto::OK, TEvBlobStorage::TEvGet::TReaderTabletData(tabletId, tabletGeneration));
162+
}
111163
}

ydb/core/testlib/actor_helpers.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22

33
namespace NKikimr {
44

5-
TActorSystemStub::TActorSystemStub()
5+
TActorSystemStub::TActorSystemStub(std::shared_ptr<IRcBufAllocator> alloc)
66
: AppData(0, 0, 0, 0, {}, nullptr, nullptr, nullptr, nullptr)
77
{
88
THolder<NActors::TActorSystemSetup> setup(new NActors::TActorSystemSetup);
9+
if (alloc) {
10+
setup->RcBufAllocator = alloc;
11+
}
912
System.Reset(new NActors::TActorSystem(setup, &AppData));
1013
Mailbox.Reset(new NActors::TMailbox());
1114
ExecutorThread.Reset(new NActors::TExecutorThread(0, System.Get(), nullptr, "thread"));

ydb/core/testlib/actor_helpers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ struct TActorSystemStub {
1616
NActors::TActivationContext* PrevCtx;
1717
TAppData AppData;
1818

19-
TActorSystemStub();
19+
TActorSystemStub(std::shared_ptr<IRcBufAllocator> alloc = {});
2020
~TActorSystemStub();
2121
};
2222

ydb/core/testlib/actors/test_runtime.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ namespace NActors {
6969
Initialize();
7070
}
7171

72-
TTestActorRuntime::TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
72+
// Constructs the test runtime for the given node / data-center counts.
// useRdmaAllocator is forwarded unchanged to TTestActorRuntimeBase (the added
// line below); the constructor body only calls Initialize().
TTestActorRuntime::TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads, bool useRdmaAllocator)
7373
: TPortManager(false)
74-
, TTestActorRuntimeBase{nodeCount, dataCenterCount, useRealThreads}
74+
, TTestActorRuntimeBase{nodeCount, dataCenterCount, useRealThreads, useRdmaAllocator}
7575
{
7676
Initialize();
7777
}

0 commit comments

Comments
 (0)