Skip to content

Commit 9bfc3df

Browse files
authored
NBS Persistent buffer write to disk (#33263)
1 parent 556ac4a commit 9bfc3df

File tree

6 files changed

+250
-42
lines changed

6 files changed

+250
-42
lines changed

ydb/core/blobstorage/ddisk/ddisk_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ namespace NKikimr::NDDisk {
9898
hFunc(NPDisk::TEvChunkReserveResult, Handle)
9999
hFunc(NPDisk::TEvLogResult, Handle)
100100
hFunc(TEvPrivate::TEvHandleEventForChunk, Handle)
101+
hFunc(TEvPrivate::TEvHandlePersistentBufferEventForChunk, Handle)
101102
hFunc(NPDisk::TEvCutLog, Handle)
102103
hFunc(NPDisk::TEvChunkWriteRawResult, Handle)
103104
hFunc(NPDisk::TEvChunkReadRawResult, Handle)

ydb/core/blobstorage/ddisk/ddisk_actor.h

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "defs.h"
44

55
#include "ddisk.h"
6+
#include "persistent_buffer_space_allocator.h"
67

78
#include <ydb/core/blobstorage/vdisk/common/vdisk_config.h>
89
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk.h>
@@ -115,6 +116,7 @@ namespace NKikimr::NDDisk {
115116
enum {
116117
EvHandleSingleQuery = EventSpaceBegin(TEvents::ES_PRIVATE),
117118
EvHandleEventForChunk,
119+
EvHandlePersistentBufferEventForChunk,
118120
};
119121

120122
struct TEvHandleEventForChunk : TEventLocal<TEvHandleEventForChunk, EvHandleEventForChunk> {
@@ -126,6 +128,14 @@ namespace NKikimr::NDDisk {
126128
, VChunkIndex(vChunkIndex)
127129
{}
128130
};
131+
132+
// Local event the actor sends to itself after a newly reserved chunk has been
// registered with the persistent buffer space allocator. ChunkIndex carries the
// index of that chunk.
struct TEvHandlePersistentBufferEventForChunk : TEventLocal<TEvHandlePersistentBufferEventForChunk, EvHandlePersistentBufferEventForChunk> {
    ui32 ChunkIndex;

    // explicit: a bare integer must never convert into an event implicitly.
    explicit TEvHandlePersistentBufferEventForChunk(ui32 chunkIndex)
        : ChunkIndex(chunkIndex)
    {}
};
129139
};
130140

131141
public:
@@ -200,14 +210,23 @@ namespace NKikimr::NDDisk {
200210
std::queue<std::variant<TChunkForData, TChunkForPersistentBuffer>> ChunkAllocateQueue;
201211
THashMap<ui64, std::function<void()>> LogCallbacks;
202212
ui64 NextCookie = 1;
203-
THashMap<ui64, std::tuple<NWilson::TSpan, std::function<void(NPDisk::TEvChunkWriteRawResult&, NWilson::TSpan&&)>>> WriteCallbacks;
213+
214+
struct TPendingWrite {
215+
NWilson::TSpan Span;
216+
std::function<void(NPDisk::TEvChunkWriteRawResult&, NWilson::TSpan&&)> Callback;
217+
};
218+
219+
using TPersistentBufferPendingWrite = std::function<void(NPDisk::TEvChunkWriteRawResult&)>;
220+
221+
THashMap<ui64, std::variant<TPendingWrite, TPersistentBufferPendingWrite>> WriteCallbacks;
204222
THashMap<ui64, std::tuple<NWilson::TSpan, std::function<void(NPDisk::TEvChunkReadRawResult&, NWilson::TSpan&&)>>> ReadCallbacks;
205223

206224
void IssueChunkAllocation(ui64 tabletId, ui64 vChunkIndex);
207225
void Handle(NPDisk::TEvChunkReserveResult::TPtr ev);
208226
void HandleChunkReserved();
209227
void Handle(NPDisk::TEvLogResult::TPtr ev);
210228
void Handle(TEvPrivate::TEvHandleEventForChunk::TPtr ev);
229+
void Handle(TEvPrivate::TEvHandlePersistentBufferEventForChunk::TPtr ev);
211230

212231
void Handle(NPDisk::TEvCutLog::TPtr ev);
213232

@@ -312,6 +331,7 @@ namespace NKikimr::NDDisk {
312331
struct TRecord {
313332
ui32 OffsetInBytes;
314333
ui32 Size;
334+
std::vector<TPersistentBufferSectorInfo> Sectors;
315335
TRope Data;
316336
};
317337

@@ -320,10 +340,50 @@ namespace NKikimr::NDDisk {
320340

321341
std::map<std::tuple<ui64, ui64>, TPersistentBuffer> PersistentBuffers;
322342

323-
std::set<TChunkIdx> PersistentBufferOwnedChunks;
343+
static constexpr ui32 SectorSize = 4096;
344+
static constexpr ui32 SectorInChunk = 32768;
345+
static constexpr ui32 ChunkSize = SectorSize * SectorInChunk;
346+
static constexpr ui32 MaxChunks = 128;
347+
static constexpr ui32 MaxSectorsPerBuffer = 128;
348+
349+
350+
struct TPersistentBufferHeader {
351+
static constexpr ui8 PersistentBufferHeaderSignature[16] = {249, 173, 163, 160, 196, 193, 69, 133, 83, 38, 34, 104, 170, 146, 237, 156};
352+
static constexpr ui32 HeaderChecksumOffset = 24;
353+
static constexpr ui32 HeaderChecksumSize = 8;
354+
ui8 Signature[16];
355+
ui64 HeaderChecksum;
356+
ui64 TabletId;
357+
ui64 VChunkIndex;
358+
ui32 OffsetInBytes;
359+
ui32 Size;
360+
ui64 Lsn;
361+
TPersistentBufferSectorInfo Locations[MaxSectorsPerBuffer];
362+
};
363+
static_assert(sizeof(TPersistentBufferHeader) <= ChunkSize);
364+
365+
bool IssuePersistentBufferChunkAllocationInflight = false;
366+
struct TPersistentBufferToDiskWriteInFlight {
367+
TActorId Sender;
368+
ui64 Cookie;
369+
TActorId Session;
370+
ui32 OffsetInBytes;
371+
ui32 Size;
372+
std::set<ui64> WriteCookies;
373+
std::vector<TPersistentBufferSectorInfo> Sectors;
374+
TRope Data;
375+
NWilson::TSpan Span;
376+
};
377+
std::map<std::tuple<ui64, ui64, ui64>, TPersistentBufferToDiskWriteInFlight> PersistentBufferWriteInflight;
378+
379+
TPersistentBufferSpaceAllocator PersistentBufferSpaceAllocator;
380+
324381
ui64 PersistentBufferChunkMapSnapshotLsn = Max<ui64>();
382+
std::queue<TPendingEvent> PendingPersistentBufferEvents;
325383

326384
void IssuePersistentBufferChunkAllocation();
385+
void ProcessPersistentBufferQueue();
386+
std::vector<std::tuple<ui32, ui32, TRope>> SlicePersistentBuffer(ui64 tabletId, ui64 vchunkIndex, ui64 lsn, ui32 offsetInBytes, ui32 size, TRope&& data, const std::vector<TPersistentBufferSectorInfo>& sectors);
327387

328388
struct TWriteInFlight {
329389
TActorId Sender;

ydb/core/blobstorage/ddisk/ddisk_actor_boot.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ namespace NKikimr::NDDisk {
4444
const bool success = chunkMap.ParseFromArray(record.Data.data(), record.Data.size());
4545
Y_ABORT_UNLESS(success);
4646
for (auto idx : chunkMap.GetChunkIdxs()) {
47-
PersistentBufferOwnedChunks.insert(idx);
47+
PersistentBufferSpaceAllocator.AddNewChunk(idx);
4848
++*Counters.Chunks.ChunksOwned;
4949
}
5050
}

ydb/core/blobstorage/ddisk/ddisk_actor_chunks.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@ namespace NKikimr::NDDisk {
1111
}
1212

1313
void TDDiskActor::IssuePersistentBufferChunkAllocation() {
14-
ChunkAllocateQueue.emplace(TChunkForPersistentBuffer{});
15-
HandleChunkReserved();
14+
if (!IssuePersistentBufferChunkAllocationInflight) {
15+
IssuePersistentBufferChunkAllocationInflight = true;
16+
ChunkAllocateQueue.emplace(TChunkForPersistentBuffer{});
17+
HandleChunkReserved();
18+
}
1619
}
1720

1821
void TDDiskActor::Handle(NPDisk::TEvChunkReserveResult::TPtr ev) {
@@ -67,11 +70,10 @@ namespace NKikimr::NDDisk {
6770
ChunkMapIncrementsInFlight.emplace(tabletId, vChunkIndex, chunkIdx);
6871
},
6972
[this, chunkIdx](const TChunkForPersistentBuffer&) {
70-
Y_ABORT_UNLESS(!PersistentBufferOwnedChunks.contains(chunkIdx));
7173
IssuePDiskLogRecord(TLogSignature::SignaturePersistentBufferChunkMap, chunkIdx
7274
, CreatePersistentBufferChunkMapSnapshot(), &PersistentBufferChunkMapSnapshotLsn, [this, chunkIdx] {
73-
PersistentBufferOwnedChunks.insert(chunkIdx);
74-
// TODO: Send(SelfId(), new TEvPrivate::TEvHandlePersistentBufferEventForChunk(chunkIdx));
75+
PersistentBufferSpaceAllocator.AddNewChunk(chunkIdx);
76+
Send(SelfId(), new TEvPrivate::TEvHandlePersistentBufferEventForChunk(chunkIdx));
7577
++*Counters.Chunks.ChunksOwned;
7678
});
7779
}
@@ -123,7 +125,7 @@ namespace NKikimr::NDDisk {
123125

124126
NKikimrBlobStorage::NDDisk::NInternal::TPersistentBufferChunkMapLogRecord TDDiskActor::CreatePersistentBufferChunkMapSnapshot() {
125127
NKikimrBlobStorage::NDDisk::NInternal::TPersistentBufferChunkMapLogRecord record;
126-
for (const auto& chunkIdx : PersistentBufferOwnedChunks) {
128+
for (const auto& chunkIdx : PersistentBufferSpaceAllocator.OwnedChunks) {
127129
record.AddChunkIdxs(chunkIdx);
128130
}
129131
return record;

ydb/core/blobstorage/ddisk/ddisk_actor_persistent_buffer.cpp

Lines changed: 166 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,191 @@
11
#include "ddisk_actor.h"
22

33
#include <ydb/core/util/stlog.h>
4-
54
#include <ydb/core/util/pb.h>
65

6+
#define XXH_INLINE_ALL
7+
#include <contrib/libs/xxhash/xxhash.h>
8+
79
namespace NKikimr::NDDisk {
810

9-
void TDDiskActor::Handle(TEvWritePersistentBuffer::TPtr ev) {
10-
if (!CheckQuery(*ev, &Counters.Interface.WritePersistentBuffer)) {
11-
return;
11+
// Invoked after a newly reserved chunk has been added to the persistent buffer
// space allocator: resumes the writes that were waiting for free space.
void TDDiskActor::Handle(TEvPrivate::TEvHandlePersistentBufferEventForChunk::TPtr ev) {
    const auto chunkIdx = ev->Get()->ChunkIndex;
    Y_ABORT_UNLESS(chunkIdx);
    // The allocation started by IssuePersistentBufferChunkAllocation() is over.
    // Without clearing the flag no further chunk could ever be requested, and
    // the queue would stall forever once the first extra chunk fills up.
    IssuePersistentBufferChunkAllocationInflight = false;
    ProcessPersistentBufferQueue();
}
16+
17+
// Computes the XXH3 64-bit digest of at most numBytes bytes of a rope, starting
// at `begin` and walking it one contiguous block at a time. If the rope ends
// before numBytes bytes were consumed, only the available bytes are hashed.
ui64 CalculateChecksum(const TRope::TIterator begin, size_t numBytes) {
    XXH3_state_t hasher;
    XXH3_64bits_reset(&hasher);

    auto cursor = begin;
    while (numBytes && cursor.Valid()) {
        const size_t blockLen = Min(numBytes, cursor.ContiguousSize());
        XXH3_64bits_update(&hasher, cursor.ContiguousData(), blockLen);
        numBytes -= blockLen;
        cursor.AdvanceToNextContiguousBlock();
    }

    return XXH3_64bits_digest(&hasher);
}
29+
30+
// Prepares a persistent buffer record for writing to disk.
//
// sectors[0] receives a header sector (signature, record identity, per-sector
// locations and checksums, header checksum); sectors[1..] receive consecutive
// SectorSize-sized pieces of `payload`, in order.
//
// Returns one (chunkIdx, byteOffsetInChunk, data) tuple per run of physically
// consecutive sectors within the same chunk, so that each run can be written
// with a single raw chunk write. `payload` is consumed.
std::vector<std::tuple<ui32, ui32, TRope>> TDDiskActor::SlicePersistentBuffer(ui64 tabletId, ui64 vchunkIndex,
        ui64 lsn, ui32 offsetInBytes, ui32 sizeInBytes, TRope&& payload, const std::vector<TPersistentBufferSectorInfo>& sectors) {
    // At least the header sector is required, and every payload sector needs a
    // slot in header->Locations.
    Y_ABORT_UNLESS(!sectors.empty());
    Y_ABORT_UNLESS(sectors.size() <= MaxSectorsPerBuffer + 1);

    auto headerData = TRcBuf::Uninitialized(SectorSize);
    // Zero the whole sector so that unused Locations slots and the tail of the
    // sector have deterministic content instead of leftover heap bytes.
    memset(headerData.GetDataMut(), 0, SectorSize);
    TPersistentBufferHeader *header = reinterpret_cast<TPersistentBufferHeader*>(headerData.GetDataMut());
    memcpy(header->Signature, TPersistentBufferHeader::PersistentBufferHeaderSignature, 16);
    header->TabletId = tabletId;
    header->VChunkIndex = vchunkIndex;
    header->OffsetInBytes = offsetInBytes;
    header->Size = sizeInBytes;
    header->Lsn = lsn;

    for (ui32 i = 1; i < sectors.size(); ++i) {
        auto& loc = header->Locations[i - 1];
        loc = sectors[i];
        auto it = payload.Position(SectorSize * (i - 1));
        // A payload sector that happens to start with the header signature would
        // be indistinguishable from a header on disk. Only in that case break
        // the match by zeroing its first byte and remember the correction.
        // (The comparison used to be inverted, which zeroed the first byte of
        // every sector that did NOT start with the signature.)
        // NOTE(review): this compares 16 bytes of the first contiguous block;
        // confirm payload blocks are never shorter than that at a sector start.
        if (memcmp((*it).first, header->Signature, 16) == 0) {
            loc.HasSignatureCorrection = true;
            *payload.Position(SectorSize * (i - 1)).ContiguousDataMut() = 0;
        }
        loc.Checksum = CalculateChecksum(payload.Position(SectorSize * (i - 1)), SectorSize);
    }
    header->HeaderChecksum = 0;

    std::vector<std::tuple<ui32, ui32, TRope>> parts;
    parts.reserve(sectors.size());
    for (ui32 sectorIdx = 0, first = 0; sectorIdx <= sectors.size(); sectorIdx++) {
        // The current run [first, sectorIdx) ends when the input is exhausted,
        // the chunk changes, or the next sector is not the direct successor of
        // the previous one.
        const bool runEnds = sectorIdx == sectors.size()
            || sectors[first].ChunkIdx != sectors[sectorIdx].ChunkIdx
            || sectors[first].SectorIdx + (sectorIdx - first) != sectors[sectorIdx].SectorIdx;
        if (!runEnds) {
            continue;
        }

        TRope data;
        // The first run starts with the header sector, which is not payload.
        const ui32 partSize = (sectorIdx - (first == 0 ? 1 : first)) * SectorSize;
        if (first == 0) {
            data = headerData;
            // The checksum covers everything after the HeaderChecksum field up
            // to the end of the sector and is stored into that field.
            const auto cs = CalculateChecksum(
                data.Position(TPersistentBufferHeader::HeaderChecksumOffset + TPersistentBufferHeader::HeaderChecksumSize),
                SectorSize - TPersistentBufferHeader::HeaderChecksumOffset - TPersistentBufferHeader::HeaderChecksumSize);
            memcpy(data.Position(TPersistentBufferHeader::HeaderChecksumOffset).ContiguousDataMut(), &cs,
                TPersistentBufferHeader::HeaderChecksumSize);
        }
        payload.ExtractFront(partSize, &data);
        parts.emplace_back(sectors[first].ChunkIdx, sectors[first].SectorIdx * SectorSize, std::move(data));
        first = sectorIdx;
    }
    return parts;
}
74+
75+
void TDDiskActor::ProcessPersistentBufferQueue() {
76+
Y_ABORT_UNLESS(!PendingPersistentBufferEvents.empty());
77+
auto& temp = PendingPersistentBufferEvents.front().Ev;
78+
const auto& record = temp->Get<TEvWritePersistentBuffer>()->Record;
1579
const TQueryCredentials creds(record.GetCredentials());
1680
const TBlockSelector selector(record.GetSelector());
1781
const ui64 lsn = record.GetLsn();
82+
ui32 sectorsCnt = selector.Size / SectorSize;
83+
const auto sectors = PersistentBufferSpaceAllocator.Occupy(sectorsCnt);
84+
if (sectors.size() == 0) {
85+
IssuePersistentBufferChunkAllocation();
86+
return;
87+
}
88+
Y_ABORT_UNLESS(sectors.size() == sectorsCnt && sectorsCnt <= MaxSectorsPerBuffer);
1889

19-
Counters.Interface.WritePersistentBuffer.Request(selector.Size);
20-
21-
auto span = std::move(NWilson::TSpan(TWilson::DDiskTopLevel, std::move(ev->TraceId), "DDisk.WritePersistentBuffer",
90+
const TWriteInstruction instr(record.GetInstruction());
91+
TRope payload;
92+
if (instr.PayloadId) {
93+
payload = temp->Get<TEvWritePersistentBuffer>()->GetPayload(*instr.PayloadId);
94+
}
95+
auto span = std::move(NWilson::TSpan(TWilson::DDiskTopLevel, std::move(temp->TraceId), "DDisk.WritePersistentBuffer",
2296
NWilson::EFlags::NONE, TActivationContext::ActorSystem())
2397
.Attribute("tablet_id", static_cast<long>(creds.TabletId))
2498
.Attribute("vchunk_index", static_cast<long>(selector.VChunkIndex))
2599
.Attribute("offset_in_bytes", selector.OffsetInBytes)
26100
.Attribute("size", selector.Size)
27101
.Attribute("lsn", static_cast<long>(lsn)));
102+
Counters.Interface.WritePersistentBuffer.Request(selector.Size);
28103

29-
const TWriteInstruction instr(record.GetInstruction());
30-
TRope data;
31-
if (instr.PayloadId) {
32-
data = ev->Get()->GetPayload(*instr.PayloadId);
104+
auto parts = SlicePersistentBuffer(creds.TabletId,
105+
selector.VChunkIndex, lsn, selector.OffsetInBytes, selector.Size, TRope(payload), sectors);
106+
107+
auto& inflightRecord = PersistentBufferWriteInflight[{creds.TabletId, selector.VChunkIndex, lsn}];
108+
inflightRecord = {
109+
.Sender = temp->Sender,
110+
.Cookie = temp->Cookie,
111+
.Session = temp->InterconnectSession,
112+
.OffsetInBytes = selector.OffsetInBytes,
113+
.Size = selector.Size,
114+
.Sectors = std::move(sectors),
115+
.Data = std::move(payload),
116+
.Span = std::move(span),
117+
};
118+
119+
for(auto& [chunkIdx, offset, data] : parts) {
120+
const ui64 cookie = NextCookie++;
121+
inflightRecord.WriteCookies.insert(cookie);
122+
123+
Send(BaseInfo.PDiskActorID, new NPDisk::TEvChunkWriteRaw(
124+
PDiskParams->Owner,
125+
PDiskParams->OwnerRound,
126+
chunkIdx,
127+
offset,
128+
std::move(data)), 0, cookie);
129+
130+
WriteCallbacks.try_emplace(cookie, [this, writeCookie = cookie, tabletId = creds.TabletId,
131+
vchunkIndex = selector.VChunkIndex, lsn = lsn](NPDisk::TEvChunkWriteRawResult& /*ev*/) {
132+
auto itInflight = PersistentBufferWriteInflight.find({tabletId, vchunkIndex, lsn});
133+
Y_ABORT_UNLESS(itInflight != PersistentBufferWriteInflight.end());
134+
auto& inflight = itInflight->second;
135+
auto eraseCnt = inflight.WriteCookies.erase(writeCookie);
136+
Y_ABORT_UNLESS(eraseCnt == 1);
137+
if (inflight.WriteCookies.empty()) {
138+
Counters.Interface.WritePersistentBuffer.Reply(true);
139+
inflight.Span.End();
140+
auto& buffer = PersistentBuffers[{tabletId, vchunkIndex}];
141+
auto [it, inserted] = buffer.Records.try_emplace(lsn);
142+
TPersistentBuffer::TRecord& pr = it->second;
143+
if (inserted) {
144+
pr = {
145+
.OffsetInBytes = inflight.OffsetInBytes,
146+
.Size = inflight.Size,
147+
.Sectors = std::move(inflight.Sectors),
148+
.Data = std::move(inflight.Data),
149+
};
150+
} else {
151+
Y_ABORT_UNLESS(pr.OffsetInBytes == inflight.OffsetInBytes);
152+
Y_ABORT_UNLESS(pr.Size == inflight.Size);
153+
Y_ABORT_UNLESS(pr.Data == inflight.Data);
154+
}
155+
auto replyEv = std::make_unique<TEvWritePersistentBufferResult>(NKikimrBlobStorage::NDDisk::TReplyStatus::OK);
156+
auto h = std::make_unique<IEventHandle>(inflight.Sender, SelfId(), replyEv.release(), 0, inflight.Cookie);
157+
if (inflight.Session) {
158+
h->Rewrite(TEvInterconnect::EvForward, inflight.Session);
159+
}
160+
TActivationContext::Send(h.release());
161+
PersistentBufferWriteInflight.erase(itInflight);
162+
}
163+
});
33164
}
34165

35-
auto& buffer = PersistentBuffers[{creds.TabletId, selector.VChunkIndex}];
36-
auto [it, inserted] = buffer.Records.try_emplace(lsn);
37-
TPersistentBuffer::TRecord& pr = it->second;
38-
if (inserted) {
39-
pr = {
40-
.OffsetInBytes = selector.OffsetInBytes,
41-
.Size = selector.Size,
42-
.Data = std::move(data),
43-
};
44-
} else {
45-
Y_ABORT_UNLESS(pr.OffsetInBytes == selector.OffsetInBytes);
46-
Y_ABORT_UNLESS(pr.Size == selector.Size);
47-
Y_ABORT_UNLESS(pr.Data == data);
166+
PendingPersistentBufferEvents.pop();
167+
if (!PendingPersistentBufferEvents.empty()) {
168+
ProcessPersistentBufferQueue();
48169
}
170+
}
49171

50-
Counters.Interface.WritePersistentBuffer.Reply(true);
51-
span.End();
52-
SendReply(*ev, std::make_unique<TEvWritePersistentBufferResult>(NKikimrBlobStorage::NDDisk::TReplyStatus::OK));
172+
// Entry point for persistent buffer writes: validates the query, rejects
// requests larger than MaxSectorsPerBuffer sectors with INCORRECT_REQUEST and
// queues everything else for ProcessPersistentBufferQueue().
void TDDiskActor::Handle(TEvWritePersistentBuffer::TPtr ev) {
    if (!CheckQuery(*ev, &Counters.Interface.WritePersistentBuffer)) {
        return;
    }

    const TBlockSelector selector(ev->Get()->Record.GetSelector());
    constexpr ui32 maxWriteBytes = MaxSectorsPerBuffer * SectorSize;

    if (selector.Size <= maxWriteBytes) {
        PendingPersistentBufferEvents.emplace(ev, "WaitingPersistentBufferWrite");
        ProcessPersistentBufferQueue();
        return;
    }

    // Oversized request: account for it and fail it immediately.
    auto& counters = Counters.Interface.WritePersistentBuffer;
    counters.Request(selector.Size);
    counters.Reply(false);
    SendReply(*ev, std::make_unique<TEvWritePersistentBufferResult>(
        NKikimrBlobStorage::NDDisk::TReplyStatus::INCORRECT_REQUEST,
        TStringBuilder() << "persistent buffer write limit "
            << maxWriteBytes << " bytes, received " << selector.Size << " bytes"));
}
54190

55191
void TDDiskActor::Handle(TEvReadPersistentBuffer::TPtr ev) {
@@ -200,6 +336,8 @@ namespace NKikimr::NDDisk {
200336
Y_ABORT_UNLESS(pr.OffsetInBytes == selector.OffsetInBytes);
201337
Y_ABORT_UNLESS(pr.Size == selector.Size);
202338

339+
PersistentBufferSpaceAllocator.Free(pr.Sectors);
340+
203341
buffer.Records.erase(jt);
204342
if (buffer.Records.empty()) {
205343
PersistentBuffers.erase(it);

0 commit comments

Comments
 (0)