Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions ydb/core/blobstorage/base/bufferwithgaps.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ namespace NKikimr {
, IsCommited(false)
{}

TBufferWithGaps(ui32 offset, ui32 size, ui32 tailroom)
: Data(TRcBuf::UninitializedPageAligned(size, tailroom))
// Wraps an already allocated buffer: takes over `data`, stores `offset`
// and starts in the not-committed state.
TBufferWithGaps(ui32 offset, TRcBuf&& data)
    : Data(std::move(data)) // `data` is a named rvalue reference, i.e. an lvalue here; without std::move it would be copied
    , Offset(offset)
    , IsCommited(false)
{}

// Convenience overload: obtains the storage from
// TRcBuf::UninitializedPageAligned(size, tailroom) and delegates to the
// (offset, TRcBuf&&) constructor.
TBufferWithGaps(ui32 offset, ui32 size, ui32 tailroom)
: TBufferWithGaps(offset, TRcBuf::UninitializedPageAligned(size, tailroom))
{}

TBufferWithGaps(TBufferWithGaps &&) = default;
TBufferWithGaps &operator=(TBufferWithGaps &&) = default;

Expand Down Expand Up @@ -98,7 +102,7 @@ namespace NKikimr {
}

ui8 *RawDataPtr(ui32 offset, ui32 len) {
Y_ABORT_UNLESS(offset + len <= Data.size() + Data.Tailroom(), "Buffer has size# %zu less then requested offset# %" PRIu32
Y_ABORT_UNLESS(offset + len <= Data.size() + Data.UnsafeTailroom(), "Buffer has size# %zu less then requested offset# %" PRIu32
" len# %" PRIu32, Data.size(), offset, len);
IsCommited = false;
return reinterpret_cast<ui8 *>(Data.GetDataMut() + offset);
Expand Down Expand Up @@ -146,7 +150,7 @@ namespace NKikimr {
}

ui32 SizeWithTail() const {
return Data.size() + Data.Tailroom();
return Data.size() + Data.UnsafeTailroom();
}

void Swap(TBufferWithGaps& other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ void TCompletionChunkReadPart::Release(TActorSystem *actorSystem) {
}

TCompletionChunkRead::TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, std::function<void()> onDestroy,
ui64 chunkNonce)
ui64 chunkNonce, IRcBufAllocator* alloc)
: TCompletionAction()
, PDisk(pDisk)
, Read(read)
Expand Down Expand Up @@ -338,7 +338,7 @@ TCompletionChunkRead::TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRe
? read->Size
: read->Size + read->Offset % sectorSize;
size_t tailroom = AlignUp<size_t>(newSize, sectorSize) - newSize;
CommonBuffer = TBufferWithGaps(read->Offset, newSize, tailroom);
CommonBuffer = TBufferWithGaps(read->Offset, alloc->AllocPageAlignedRcBuf(newSize, tailroom));
}

TCompletionChunkRead::~TCompletionChunkRead() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class TCompletionChunkRead : public TCompletionAction {
const ui64 DoubleFreeCanary;
public:
TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, std::function<void()> onDestroy,
ui64 chunkNonce);
ui64 chunkNonce, IRcBufAllocator* alloc);
void Exec(TActorSystem *actorSystem) override;
~TCompletionChunkRead();
void ReplyError(TActorSystem *actorSystem, TString reason);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3174,7 +3174,7 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
--state.OperationsInProgress;
--inFlight->ChunkReads;
};
read->FinalCompletion = new TCompletionChunkRead(this, read, std::move(onDestroy), state.Nonce);
read->FinalCompletion = new TCompletionChunkRead(this, read, std::move(onDestroy), state.Nonce, PCtx->ActorSystem->GetRcBufAllocator());

static_cast<TChunkRead*>(request)->SelfPointer = std::move(read);

Expand Down
92 changes: 89 additions & 3 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <ydb/core/driver_lib/version/ut/ut_helpers.h>
#include <ydb/core/testlib/actors/test_runtime.h>

#include <ydb/library/actors/interconnect/rdma/mem_pool.h>

#include <util/system/hp_timer.h>

namespace NKikimr {
Expand Down Expand Up @@ -1560,9 +1562,10 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
return data;
}

void ChunkWriteDifferentOffsetAndSizeImpl(bool plainDataChunks) {
void ChunkWriteDifferentOffsetAndSizeImpl(bool plainDataChunks, bool rdmaAlloc) {
TActorTestContext testCtx({
.PlainDataChunks = plainDataChunks,
.UseRdmaAllocator = rdmaAlloc,
});
Cerr << "plainDataChunks# " << plainDataChunks << Endl;

Expand Down Expand Up @@ -1607,8 +1610,8 @@ Y_UNIT_TEST_SUITE(TPDiskTest) {
}
}
Y_UNIT_TEST(ChunkWriteDifferentOffsetAndSize) {
for (int i = 0; i <= 1; ++i) {
ChunkWriteDifferentOffsetAndSizeImpl(i);
for (ui32 i = 0; i <= 3; ++i) {
ChunkWriteDifferentOffsetAndSizeImpl(i & 1, i & 2);
}
}

Expand Down Expand Up @@ -2481,4 +2484,87 @@ Y_UNIT_TEST_SUITE(ShredPDisk) {
#endif
}


Y_UNIT_TEST_SUITE(RDMA) {
void TestChunkReadWithRdmaAllocator(bool plainDataChunks) {
TActorTestContext testCtx({
.PlainDataChunks = plainDataChunks,
.UseRdmaAllocator = true,
});
TVDiskMock vdisk(&testCtx);

vdisk.InitFull();
vdisk.ReserveChunk();
vdisk.CommitReservedChunks();
auto chunk = *vdisk.Chunks[EChunkState::COMMITTED].begin();

for (i64 writeSize: {1_KB - 123, 64_KB, 128_KB + 8765}) {
for (i64 readOffset: {0_KB, 0_KB + 123, 4_KB, 8_KB + 123}) {
for (i64 readSize: {writeSize, writeSize - 567, writeSize - i64(8_KB), writeSize - i64(8_KB) + 987}) {
if (readOffset < 0 || readSize < 0 || readOffset + readSize > writeSize) {
continue;
}
Cerr << "chunkBufSize: " << writeSize << ", readOffset: " << readOffset << ", readSize: " << readSize << Endl;
auto parts = MakeIntrusive<NPDisk::TEvChunkWrite::TAlignedParts>(PrepareData(writeSize));
testCtx.Send(new NPDisk::TEvChunkWrite(
vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound,
chunk, 0, parts, nullptr, false, 0));
auto write = testCtx.Recv<NPDisk::TEvChunkWriteResult>();

testCtx.Send(new NPDisk::TEvChunkRead(
vdisk.PDiskParams->Owner, vdisk.PDiskParams->OwnerRound,
chunk, readOffset, readSize, 0, nullptr));
auto read = testCtx.Recv<NPDisk::TEvChunkReadResult>();
TRcBuf readBuf = read->Data.ToString();
NInterconnect::NRdma::TMemRegionSlice memReg = NInterconnect::NRdma::TryExtractFromRcBuf(readBuf);
UNIT_ASSERT_C(!memReg.Empty(), "Failed to extract RDMA memory region from RcBuf");
UNIT_ASSERT_VALUES_EQUAL_C(memReg.GetSize(), readSize, "Unexpected size of RDMA memory region");
}
}
}
}

// Runs the RDMA-allocator chunk read scenario with PlainDataChunks = false.
Y_UNIT_TEST(TestChunkReadWithRdmaAllocatorEncryptedChunks) {
TestChunkReadWithRdmaAllocator(false);
}
// Runs the RDMA-allocator chunk read scenario with PlainDataChunks = true.
Y_UNIT_TEST(TestChunkReadWithRdmaAllocatorPlainChunks) {
TestChunkReadWithRdmaAllocator(true);
}

// Builds two TBufferWithGaps objects over identically trimmed buffers: one from
// TRcBuf::UninitializedPageAligned and one from a dummy RDMA mem pool. Checks
// that both report the same size and that RawDataPtr accepts a request
// covering the payload plus the trimmed-off tail for both of them.
Y_UNIT_TEST(TestRcBuf) {
    ui32 size = 129961;
    ui32 offset = 123;
    ui32 tailRoom = 1111;
    ui32 totalSize = size + tailRoom;
    UNIT_ASSERT_VALUES_EQUAL(totalSize, 131072);

    auto alloc1 = [](ui32 size, ui32 headRoom, ui32 tailRoom) {
        TRcBuf buf = TRcBuf::UninitializedPageAligned(size + headRoom + tailRoom);
        buf.TrimFront(size + tailRoom);
        buf.TrimBack(size);
        Cerr << "alloc1: " << buf.Size() << " " << buf.Tailroom() << " " << buf.UnsafeTailroom() << Endl;
        return buf;
    };
    auto memPool = NInterconnect::NRdma::CreateDummyMemPool();
    auto alloc2 = [memPool](ui32 size, ui32 headRoom, ui32 tailRoom) -> TRcBuf {
        auto allocated = memPool->AllocRcBuf(size + headRoom + tailRoom, NInterconnect::NRdma::IMemPool::EMPTY);
        // Fail with a readable message instead of std::bad_optional_access
        // when the pool cannot serve the request.
        UNIT_ASSERT_C(allocated, "Failed to allocate RcBuf from the RDMA mem pool");
        TRcBuf buf = std::move(*allocated);
        buf.TrimFront(size + tailRoom);
        buf.TrimBack(size);
        Cerr << "alloc2: " << buf.Size() << " " << buf.Tailroom() << " " << buf.UnsafeTailroom() << Endl;
        return buf;
    };

    auto buf1 = TBufferWithGaps(offset, alloc1(size, 0, tailRoom));
    auto buf2 = TBufferWithGaps(offset, alloc2(size, 0, tailRoom));

    Cerr << "buf1: " << buf1.PrintState() << " " << buf1.Size() << " " << buf1.SizeWithTail() << Endl;
    Cerr << "buf2: " << buf2.PrintState() << " " << buf2.Size() << " " << buf2.SizeWithTail() << Endl;

    UNIT_ASSERT_VALUES_EQUAL_C(buf1.Size(), buf2.Size(), "Buffers should have the same size");

    // The returned pointers are not used: the calls are made for the bounds
    // check inside RawDataPtr, which must accept size + tailRoom bytes.
    buf1.RawDataPtr(0, totalSize);
    buf2.RawDataPtr(0, totalSize);
}
}

} // namespace NKikimr
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct TActorTestContext {
bool ReadOnly = false;
bool InitiallyZeroed = false; // Only for sector map. Zero first 1MiB on start.
bool PlainDataChunks = false;
bool UseRdmaAllocator = false;
};

private:
Expand Down Expand Up @@ -92,7 +93,7 @@ struct TActorTestContext {
}

TActorTestContext(TSettings settings)
: Runtime(new TTestActorRuntime(1, true))
: Runtime(new TTestActorRuntime(1, true, true, settings.UseRdmaAllocator))
, TestCtx(settings.UseSectorMap, settings.DiskMode, settings.DiskSize)
, Settings(settings)
{
Expand Down
52 changes: 52 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/get.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h>

#include <ydb/library/actors/interconnect/rdma/mem_pool.h>

Y_UNIT_TEST_SUITE(Get) {

void SendGet(
Expand Down Expand Up @@ -108,4 +110,54 @@ Y_UNIT_TEST_SUITE(Get) {
// check that now TEvGet returns BLOCKED for blocked generation with reader params
SendGet(test, originalBlobId, data, NKikimrProto::BLOCKED, TEvBlobStorage::TEvGet::TReaderTabletData(tabletId, tabletGeneration));
}

// Puts a 1000-byte blob, compacts the first VDisk of the group and then reads
// the blob back through TEvGet while an event filter is installed that
//  * replaces the payload of every TEvChunkReadResult with a copy allocated
//    from a dummy RDMA mem pool, and
//  * checks that every rope chunk of the TEvVGetResult payload exposes a
//    memory region of the blob size with non-zero lkey/rkey.
Y_UNIT_TEST(TestRdmaRegiseredMemory) {
TEnvironmentSetup env(true);
TTestInfo test = InitTest(env);

auto groups = env.GetGroups();
auto groupInfo = env.GetGroupInfo(groups.front());

ui64 tabletId = 1;
ui32 tabletGeneration = 1;

constexpr ui32 size = 1000;
TString data(size, 'a');
TLogoBlobID originalBlobId(tabletId, tabletGeneration, 0, 0, size, 0);

SendPut(test, originalBlobId, data, NKikimrProto::OK);
env.CompactVDisk(groupInfo->GetActorId(0));

auto memPool = NInterconnect::NRdma::CreateDummyMemPool();
// NOTE(review): the filter captures memPool by reference but is stored in
// env.Runtime; env is declared before memPool and is therefore destroyed after
// it. Confirm the filter cannot be invoked during environment teardown.
env.Runtime->FilterFunction = [&](ui32, std::unique_ptr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvBlobStorage::EvChunkReadResult) {
Cerr << "TEvChunkReadResult: " << ev->ToString() << Endl;
auto* res = ev->CastAsLocal<NPDisk::TEvChunkReadResult>();
// Copy the data that was read into a buffer allocated from the mem pool and
// substitute it into the event as a committed buffer at offset 0.
auto buf = res->Data.ToString();
auto newBuf = memPool->AllocRcBuf(buf.size(), NInterconnect::NRdma::IMemPool::EMPTY);
UNIT_ASSERT(newBuf);
std::memcpy(newBuf->UnsafeGetDataMut(), buf.GetData(), buf.GetSize());
res->Data = TBufferWithGaps(0, std::move(*newBuf));
res->Data.Commit();
} else if (ev->GetTypeRewrite() == TEvBlobStorage::EvVGetResult) {
Cerr << "TEvVGetResult: " << ev->ToString() << Endl;
auto* res = ev->CastAsLocal<TEvBlobStorage::TEvVGetResult>();
auto payload = res->GetPayload();
// Exactly one payload rope is expected; each of its chunks must be extractable
// as a memory region of the full blob size.
UNIT_ASSERT_VALUES_EQUAL(payload.size(), 1);
for (auto& rope : payload) {
for (auto it = rope.Begin(); it != rope.End(); ++it) {
const TRcBuf& chunk = it.GetChunk();
auto memReg = NInterconnect::NRdma::TryExtractFromRcBuf(chunk);
UNIT_ASSERT_C(!memReg.Empty(), "unable to extract mem region from chunk");
UNIT_ASSERT_VALUES_EQUAL_C(memReg.GetSize(), size, "invalid size for memReg");
UNIT_ASSERT_C(memReg.GetLKey(0) != 0, "invalid lkey");
UNIT_ASSERT_C(memReg.GetRKey(0) != 0, "invalid rkey");
}
}
}
// The filter returns true for every event, whether or not it was inspected.
return true;
};

SendGet(test, originalBlobId, data, NKikimrProto::OK, TEvBlobStorage::TEvGet::TReaderTabletData(tabletId, tabletGeneration));
}
}
5 changes: 4 additions & 1 deletion ydb/core/testlib/actor_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

namespace NKikimr {

TActorSystemStub::TActorSystemStub()
TActorSystemStub::TActorSystemStub(std::shared_ptr<IRcBufAllocator> alloc)
: AppData(0, 0, 0, 0, {}, nullptr, nullptr, nullptr, nullptr)
{
THolder<NActors::TActorSystemSetup> setup(new NActors::TActorSystemSetup);
if (alloc) {
setup->RcBufAllocator = alloc;
}
System.Reset(new NActors::TActorSystem(setup, &AppData));
Mailbox.Reset(new NActors::TMailbox());
ExecutorThread.Reset(new NActors::TExecutorThread(0, System.Get(), nullptr, "thread"));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/testlib/actor_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ struct TActorSystemStub {
NActors::TActivationContext* PrevCtx;
TAppData AppData;

TActorSystemStub();
TActorSystemStub(std::shared_ptr<IRcBufAllocator> alloc = {});
~TActorSystemStub();
};

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/testlib/actors/test_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ namespace NActors {
Initialize();
}

TTestActorRuntime::TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads)
TTestActorRuntime::TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads, bool useRdmaAllocator)
: TPortManager(false)
, TTestActorRuntimeBase{nodeCount, dataCenterCount, useRealThreads}
, TTestActorRuntimeBase{nodeCount, dataCenterCount, useRealThreads, useRdmaAllocator}
{
Initialize();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/testlib/actors/test_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace NActors {
};

TTestActorRuntime(THeSingleSystemEnv d);
TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads);
TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads, bool useRdmaAllocator=false);
TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount);
TTestActorRuntime(ui32 nodeCount = 1, bool useRealThreads = false);
TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool useRealThreads, NKikimr::NAudit::TAuditLogBackends&& auditLogBackends);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/mind/dynamic_nameserver.h>
#include <ydb/library/actors/dnsresolver/dnsresolver.h>
#include <ydb/library/actors/interconnect/rdma/mem_pool.h>
#include <ydb/library/actors/interconnect/interconnect.h>
#include <ydb/library/actors/interconnect/interconnect_tcp_server.h>
#include <util/generic/xrange.h>
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/util/actorsys_test/testactorsys.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/library/actors/core/scheduler_queue.h>
#include <ydb/library/actors/core/executor_thread.h>
#include <ydb/library/actors/interconnect/interconnect_common.h>
#include <ydb/library/actors/interconnect/rdma/mem_pool.h>
#include <ydb/library/actors/util/should_continue.h>
#include <ydb/library/actors/core/monotonic_provider.h>
#include <ydb/core/base/appdata.h>
Expand Down Expand Up @@ -289,6 +290,8 @@ class TTestActorSystem {
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
IExecutorPool *pool = CreateTestExecutorPool(nodeId);
setup->Executors[0].Reset(pool);
auto memPool = NInterconnect::NRdma::CreateDummyMemPool();
setup->RcBufAllocator = std::make_shared<TRdmaAllocatorWithFallback>(memPool);

// we create this actor for correct service lookup through ActorSystem
setup->LocalServices.emplace_back(LoggerSettings_->LoggerActorId, TActorSetupCmd(
Expand Down
35 changes: 35 additions & 0 deletions ydb/library/actors/core/actorsystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <util/generic/hash.h>
#include <util/system/rwlock.h>
#include <util/random/random.h>
#include <ydb/library/actors/interconnect/rdma/mem_pool.h>
#include <ydb/library/actors/util/rc_buf.h>

#include <utility>

namespace NActors {

Expand Down Expand Up @@ -70,6 +72,38 @@ namespace NActors {
}
};

// Stores the memory pool that TryAllocRdmaRcBuf allocates from.
TRdmaAllocatorWithFallback::TRdmaAllocatorWithFallback(std::shared_ptr<NInterconnect::NRdma::IMemPool> memPool) noexcept
    : RdmaMemPool(std::move(memPool)) // by-value sink parameter: move it instead of copying the shared_ptr again
{}

// Allocates a buffer for `size` bytes with the given head room and tail room.
// The RDMA mem pool is tried first; if it returns nothing, the request is
// forwarded unchanged to the default RcBuf allocator.
TRcBuf TRdmaAllocatorWithFallback::AllocRcBuf(size_t size, size_t headRoom, size_t tailRoom) noexcept {
    std::optional<TRcBuf> buf = TryAllocRdmaRcBuf<false>(size, headRoom, tailRoom);
    if (!buf) {
        return GetDefaultRcBufAllocator()->AllocRcBuf(size, headRoom, tailRoom);
    }
    // Presence was checked above: move the buffer out instead of copying it
    // through the throwing value() accessor (this function is noexcept).
    return std::move(*buf);
}

// Page-aligned variant of AllocRcBuf without head room. The RDMA mem pool is
// tried first with the PAGE_ALIGNED flag; if it returns nothing, the request
// is forwarded to the default RcBuf allocator.
TRcBuf TRdmaAllocatorWithFallback::AllocPageAlignedRcBuf(size_t size, size_t tailRoom) noexcept {
    std::optional<TRcBuf> buf = TryAllocRdmaRcBuf<true>(size, 0, tailRoom);
    if (!buf) {
        return GetDefaultRcBufAllocator()->AllocPageAlignedRcBuf(size, tailRoom);
    }
    // Presence was checked above: move the buffer out instead of copying it
    // through the throwing value() accessor (this function is noexcept).
    return std::move(*buf);
}

// Requests size + headRoom + tailRoom bytes from the RDMA mem pool (with the
// PAGE_ALIGNED flag when `pageAligned` is set, EMPTY otherwise) and trims the
// result with TrimFront(size + tailRoom) followed by TrimBack(size).
// Presumably this leaves a `size`-byte view with the requested head and tail
// room around it; confirm against the TRcBuf trimming contract.
// Returns an empty optional when the pool does not provide a buffer.
template<bool pageAligned>
std::optional<TRcBuf> TRdmaAllocatorWithFallback::TryAllocRdmaRcBuf(size_t size, size_t headRoom, size_t tailRoom) noexcept {
    const size_t payloadWithTail = size + tailRoom;
    const auto poolFlags = pageAligned
        ? NInterconnect::NRdma::IMemPool::PAGE_ALIGNED
        : NInterconnect::NRdma::IMemPool::EMPTY;

    std::optional<TRcBuf> region = RdmaMemPool->AllocRcBuf(headRoom + payloadWithTail, poolFlags);
    if (region) {
        region->TrimFront(payloadWithTail);
        region->TrimBack(size);
    }
    return region;
}

TActorSystem::TActorSystem(THolder<TActorSystemSetup>& setup, void* appData,
TIntrusivePtr<NLog::TSettings> loggerSettings)
: NodeId(setup->NodeId)
Expand All @@ -80,6 +114,7 @@ namespace NActors {
, CurrentTimestamp(0)
, CurrentMonotonic(0)
, CurrentIDCounter(RandomNumber<ui64>())
, RcBufAllocator(setup->RcBufAllocator ? setup->RcBufAllocator.get() : GetDefaultRcBufAllocator())
, SystemSetup(setup.Release())
, DefSelfID(NodeId, "actorsystem")
, AppData0(appData)
Expand Down
Loading
Loading