Skip to content

Commit 5cbafaa

Browse files
authored
Introduce plain data chunks in PDisk (#17508)
1 parent 9a7b42a commit 5cbafaa

22 files changed

+574
-159
lines changed

ydb/core/blobstorage/base/bufferwithgaps.h

Lines changed: 17 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,13 @@ namespace NKikimr {
4040
{}
4141

4242
TBufferWithGaps(ui32 offset, ui32 size)
43-
: Data(TRcBuf::Uninitialized(size))
43+
: Data(TRcBuf::UninitializedPageAligned(size))
44+
, Offset(offset)
45+
, IsCommited(false)
46+
{}
47+
48+
TBufferWithGaps(ui32 offset, ui32 size, ui32 tailroom)
49+
: Data(TRcBuf::UninitializedPageAligned(size, tailroom))
4450
, Offset(offset)
4551
, IsCommited(false)
4652
{}
@@ -74,10 +80,14 @@ namespace NKikimr {
7480
return Data;
7581
}
7682

83+
TRcBuf SubstrRaw(ui32 offset, ui32 len) const {
84+
return TRcBuf(TRcBuf::Piece, Data.data() + offset, len, Data);
85+
}
86+
7787
TRcBuf Substr(ui32 offset, ui32 len) const {
7888
Y_VERIFY_S(IsReadable(offset, len), "returned data is corrupt (or was never written) at offset# %" << offset
7989
<< " len# " << len << " and therefore could not be used safely, State# " << PrintState());
80-
return TRcBuf(TRcBuf::Piece, Data.data() + offset, len, Data);
90+
return SubstrRaw(offset, len);
8191
}
8292

8393
template<typename T>
@@ -88,7 +98,7 @@ namespace NKikimr {
8898
}
8999

90100
ui8 *RawDataPtr(ui32 offset, ui32 len) {
91-
Y_ABORT_UNLESS(offset + len <= Data.size(), "Buffer has size# %zu less then requested offset# %" PRIu32
101+
Y_ABORT_UNLESS(offset + len <= Data.size() + Data.Tailroom(), "Buffer has size# %zu less then requested offset# %" PRIu32
92102
" len# %" PRIu32, Data.size(), offset, len);
93103
IsCommited = false;
94104
return reinterpret_cast<ui8 *>(Data.GetDataMut() + offset);
@@ -135,6 +145,10 @@ namespace NKikimr {
135145
return Data.size();
136146
}
137147

148+
ui32 SizeWithTail() const {
149+
return Data.size() + Data.Tailroom();
150+
}
151+
138152
void Swap(TBufferWithGaps& other) {
139153
std::swap(Data, other.Data);
140154
Gaps.swap(other.Gaps);

ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -450,12 +450,12 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
450450
FormatPDisk(cfg->GetDevicePath(), 0, cfg->SectorSize, cfg->ChunkSize,
451451
cfg->PDiskGuid, chunkKey, logKey, sysLogKey, actor->MainKey.Keys.back(), TString(), false,
452452
cfg->FeatureFlags.GetTrimEntireDeviceOnStartup(), cfg->SectorMap,
453-
cfg->FeatureFlags.GetEnableSmallDiskOptimization(), metadata);
453+
cfg->FeatureFlags.GetEnableSmallDiskOptimization(), metadata, cfg->PlainDataChunks);
454454
} catch (NPDisk::TPDiskFormatBigChunkException) {
455455
FormatPDisk(cfg->GetDevicePath(), 0, cfg->SectorSize, NPDisk::SmallDiskMaximumChunkSize,
456456
cfg->PDiskGuid, chunkKey, logKey, sysLogKey, actor->MainKey.Keys.back(), TString(), false,
457457
cfg->FeatureFlags.GetTrimEntireDeviceOnStartup(), cfg->SectorMap,
458-
cfg->FeatureFlags.GetEnableSmallDiskOptimization(), metadata);
458+
cfg->FeatureFlags.GetEnableSmallDiskOptimization(), metadata, cfg->PlainDataChunks);
459459
}
460460
actorSystem->Send(pDiskActor, new TEvPDiskFormattingFinished(true, ""));
461461
} catch (yexception ex) {
@@ -668,7 +668,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
668668
evSlay.VDiskId, evSlay.SlayOwnerRound, evSlay.PDiskId, evSlay.VSlotId, str.Str()));
669669
PDisk->Mon.YardSlay.CountResponse();
670670
}
671-
671+
672672
void InitHandle(NPDisk::TEvShredPDisk::TPtr &ev) {
673673
const NPDisk::TEvShredPDisk &evShredPDisk = *ev->Get();
674674
InitQueue.emplace_back(ev->Sender, evShredPDisk.ShredGeneration, ev->Cookie);

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -150,15 +150,8 @@ TBuffer *TCompletionChunkReadPart::GetBuffer() {
150150
return Buffer.Get();
151151
}
152152

153-
void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
154-
auto execSpan = Span.CreateChild(TWilson::PDiskDetailed, "PDisk.CompletionChunkReadPart.Exec");
155-
Y_VERIFY(actorSystem);
156-
Y_VERIFY(CumulativeCompletion);
157-
if (TCompletionAction::Result != EIoResult::Ok) {
158-
Release(actorSystem);
159-
return;
160-
}
161153

154+
void TCompletionChunkReadPart::UnencryptData(TActorSystem *actorSystem) {
162155
const TDiskFormat &format = PDisk->Format;
163156

164157
ui64 firstSector;
@@ -169,7 +162,6 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
169162
PDisk->PCtx->PDiskLogPrefix);
170163
Y_VERIFY(isOk);
171164

172-
173165
ui8* source = Buffer->Data();
174166

175167
TPDiskStreamCypher cypher(PDisk->Cfg->EnableSectorEncryption);
@@ -276,6 +268,20 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
276268
beginBadUserOffset = 0xffffffff;
277269
endBadUserOffset = 0xffffffff;
278270
}
271+
}
272+
273+
void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
274+
auto execSpan = Span.CreateChild(TWilson::PDiskDetailed, "PDisk.CompletionChunkReadPart.Exec");
275+
Y_VERIFY(actorSystem);
276+
Y_VERIFY(CumulativeCompletion);
277+
if (TCompletionAction::Result != EIoResult::Ok) {
278+
Release(actorSystem);
279+
return;
280+
}
281+
282+
if (Read->ChunkEncrypted) {
283+
UnencryptData(actorSystem);
284+
}
279285

280286
double deviceTimeMs = HPMilliSecondsFloat(GetTime - SubmitTime);
281287
LWTRACK(PDiskChunkReadPieceComplete, Orbit, PDisk->PCtx->PDiskId, RawReadSize, CommonBufferOffset, deviceTimeMs);
@@ -301,6 +307,38 @@ void TCompletionChunkReadPart::Release(TActorSystem *actorSystem) {
301307
delete this;
302308
}
303309

310+
TCompletionChunkRead::TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, std::function<void()> onDestroy,
311+
ui64 chunkNonce, NWilson::TSpan&& span)
312+
: TCompletionAction()
313+
, PDisk(pDisk)
314+
, Read(read)
315+
// 1 in PartsPending stands for the last part, so the completion of any non-last part will not lead to a call of Exec()
316+
, PartsPending(1)
317+
, Deletes(0)
318+
, OnDestroy(std::move(onDestroy))
319+
, ChunkNonce(chunkNonce)
320+
, Span(std::move(span))
321+
, DoubleFreeCanary(ReferenceCanary)
322+
{
323+
// in case of plain chunks we can have unaligned offset and unaligned size of the read
324+
// so we have to calculate headroom and tailroom as follows:
325+
// ^ requested data
326+
// * not interesting data
327+
// | sector border (sector size equals to 4 in the example)
328+
//
329+
// | * * ^ ^ | ^ ^ ^ ^ | ^ ^ ^ * |
330+
// headroom = 2, size = 9, tailroom = 1; the new size and tailroom are derived as follows:
331+
// - increase size by offset unalignment (2 in this case), newSize is 11 now
332+
// - align the size up to sector border, we get 12
333+
// - calculate tailroom as follows: 12 - 11 = 1
334+
size_t sectorSize = PDisk->Format.SectorSize;
335+
auto newSize = read->ChunkEncrypted
336+
? read->Size
337+
: read->Size + read->Offset % sectorSize;
338+
size_t tailroom = AlignUp<size_t>(newSize, sectorSize) - newSize;
339+
CommonBuffer = TBufferWithGaps(read->Offset, newSize, tailroom);
340+
}
341+
304342
TCompletionChunkRead::~TCompletionChunkRead() {
305343
OnDestroy();
306344
Y_VERIFY(CommonBuffer.Empty());
@@ -313,6 +351,12 @@ void TCompletionChunkRead::Exec(TActorSystem *actorSystem) {
313351
auto execSpan = Span.CreateChild(TWilson::PDiskDetailed, "PDisk.CompletionChunkRead.Exec");
314352
THolder<TEvChunkReadResult> result = MakeHolder<TEvChunkReadResult>(NKikimrProto::OK,
315353
Read->ChunkIdx, Read->Offset, Read->Cookie, PDisk->GetStatusFlags(Read->Owner, Read->OwnerGroupType), "");
354+
355+
if (!Read->ChunkEncrypted) {
356+
size_t headroom = Read->Offset % PDisk->Format.SectorSize;
357+
auto newBuf = CommonBuffer.SubstrRaw(headroom, CommonBuffer.Size() - headroom);
358+
CommonBuffer.SetData(std::move(newBuf));
359+
}
316360
result->Data = std::move(CommonBuffer);
317361
CommonBuffer.Clear();
318362
//Y_VERIFY(result->Data.IsDetached());

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ class TCompletionChunkWrite : public TCompletionAction {
7878
NWilson::TSpan Span;
7979

8080
public:
81+
TEvChunkWrite::TPartsPtr Parts;
82+
std::optional<TAlignedData> Buffer;
83+
8184
TCompletionChunkWrite(const TActorId &recipient, TEvChunkWriteResult *event,
8285
TPDiskMon *mon, ui32 pdiskId, NHPTimer::STime startTime, size_t sizeBytes,
8386
ui8 priorityClass, std::function<void()> onDestroy, TReqId reqId,
@@ -96,6 +99,49 @@ class TCompletionChunkWrite : public TCompletionAction {
9699
TCompletionAction::ShouldBeExecutedInCompletionThread = false;
97100
}
98101

102+
const void *GetBuffer() {
103+
if (Buffer) {
104+
return Buffer->Get();
105+
} else if (Parts->Size() == 1) {
106+
return (*Parts)[0].first;
107+
} else {
108+
return nullptr;
109+
}
110+
}
111+
112+
size_t CompactBuffer(size_t tailroom, size_t sectorSize) {
113+
size_t totalSize = 0;
114+
for (size_t i = 0; i < Parts->Size(); ++i) {
115+
totalSize += (*Parts)[i].second;
116+
}
117+
totalSize += tailroom;
118+
119+
totalSize = AlignUp<ui64>(totalSize, sectorSize);
120+
Buffer.emplace(totalSize);
121+
122+
size_t written = 0;
123+
124+
// body
125+
for (size_t i = 0; i < Parts->Size(); ++i) {
126+
auto [ptr, size] = (*Parts)[i];
127+
if (ptr) {
128+
memcpy(Buffer->Get() + written, ptr, size);
129+
} else {
130+
memset(Buffer->Get() + written, 0, size);
131+
}
132+
written += size;
133+
}
134+
135+
// tail
136+
if (written < totalSize) {
137+
auto size = totalSize - written;
138+
memset(Buffer->Get() + written, 0, size);
139+
written += size;
140+
}
141+
Y_VERIFY(written == totalSize);
142+
return totalSize;
143+
}
144+
99145
~TCompletionChunkWrite() {
100146
OnDestroy();
101147
}
@@ -179,20 +225,7 @@ class TCompletionChunkRead : public TCompletionAction {
179225
const ui64 DoubleFreeCanary;
180226
public:
181227
TCompletionChunkRead(TPDisk *pDisk, TIntrusivePtr<TChunkRead> &read, std::function<void()> onDestroy,
182-
ui64 chunkNonce, NWilson::TSpan&& span)
183-
: TCompletionAction()
184-
, PDisk(pDisk)
185-
, Read(read)
186-
, CommonBuffer(read->Offset, read->Size)
187-
// 1 in PartsPending stands for the last part, so if any non-last part completes it will not lead to call of Exec()
188-
, PartsPending(1)
189-
, Deletes(0)
190-
, OnDestroy(std::move(onDestroy))
191-
, ChunkNonce(chunkNonce)
192-
, Span(std::move(span))
193-
, DoubleFreeCanary(ReferenceCanary)
194-
{}
195-
228+
ui64 chunkNonce, NWilson::TSpan&& span);
196229
void Exec(TActorSystem *actorSystem) override;
197230
~TCompletionChunkRead();
198231
void ReplyError(TActorSystem *actorSystem, TString reason);
@@ -250,6 +283,8 @@ class TCompletionChunkReadPart : public TCompletionAction {
250283
}
251284

252285
TBuffer *GetBuffer();
286+
void UnencryptData(TActorSystem *actorSystem);
287+
253288
void Exec(TActorSystem *actorSystem) override;
254289
void Release(TActorSystem *actorSystem) override;
255290
virtual ~TCompletionChunkReadPart();

ydb/core/blobstorage/pdisk/blobstorage_pdisk_config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ struct TPDiskConfig : public TThrRefBase {
167167
ui32 CompletionThreadsCount = 1;
168168
bool UseNoopScheduler = false;
169169

170+
bool PlainDataChunks = false;
171+
170172
bool MetadataOnly = false;
171173

172174
bool ReadOnly = false;
@@ -325,6 +327,7 @@ struct TPDiskConfig : public TThrRefBase {
325327
str << " SpaceColorBorder# " << SpaceColorBorder << x;
326328
str << " CompletionThreadsCount# " << CompletionThreadsCount << x;
327329
str << " UseNoopScheduler# " << (UseNoopScheduler ? "true" : "false") << x;
330+
str << " PlainDataChunks# " << PlainDataChunks << x;
328331
str << "}";
329332
return str.Str();
330333
}
@@ -417,6 +420,9 @@ struct TPDiskConfig : public TThrRefBase {
417420
if (cfg->HasUseNoopScheduler()) {
418421
UseNoopScheduler = cfg->GetUseNoopScheduler();
419422
}
423+
if (cfg->HasPlainDataChunks()) {
424+
PlainDataChunks = cfg->GetPlainDataChunks();
425+
}
420426
}
421427
};
422428

ydb/core/blobstorage/pdisk/blobstorage_pdisk_data.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,8 @@ enum EFormatFlags {
579579
FormatFlagEncryptFormat = 1 << 5, // Always on, flag is useless
580580
FormatFlagEncryptData = 1 << 6, // Always on, flag is useless
581581
FormatFlagFormatInProgress = 1 << 7, // Not implemented (Must be OFF for a formatted disk)
582+
583+
FormatFlagPlainDataChunks = 1 << 8, // Default is off, means "encrypted", for backward compatibility
582584
};
583585

584586
struct TDiskFormat {
@@ -625,6 +627,7 @@ struct TDiskFormat {
625627
isFirst = NText::OutFlag(isFirst, flags & FormatFlagEncryptFormat, "EncryptFormat", str);
626628
isFirst = NText::OutFlag(isFirst, flags & FormatFlagEncryptData, "EncryptData", str);
627629
isFirst = NText::OutFlag(isFirst, flags & FormatFlagFormatInProgress, "FormatFlagFormatInProgress", str);
630+
isFirst = NText::OutFlag(isFirst, flags & FormatFlagPlainDataChunks, "FormatFlagPlainDataChunks", str);
628631
NText::OutFlag(isFirst, isFirst, "Unknown", str);
629632
return str.Str();
630633
}
@@ -694,13 +697,29 @@ struct TDiskFormat {
694697
return FormatFlags & FormatFlagFormatInProgress;
695698
}
696699

700+
bool IsPlainDataChunks() const {
701+
return FormatFlags & FormatFlagPlainDataChunks;
702+
}
703+
704+
size_t GetAppendBlockSize() const {
705+
return IsPlainDataChunks() ? SectorSize : SectorPayloadSize();
706+
}
707+
697708
void SetFormatInProgress(bool isInProgress) {
698709
FormatFlags &= ~FormatFlagFormatInProgress;
699710
if (isInProgress) {
700711
FormatFlags |= FormatFlagFormatInProgress;
701712
}
702713
}
703714

715+
void SetPlainDataChunks(bool plain) {
716+
if (plain) {
717+
FormatFlags |= FormatFlagPlainDataChunks;
718+
} else {
719+
FormatFlags &= ~FormatFlagPlainDataChunks;
720+
}
721+
}
722+
704723
ui64 Offset(TChunkIdx chunkIdx, ui32 sectorIdx, ui64 offset) const {
705724
return (ui64)ChunkSize * chunkIdx + (ui64)SectorSize * sectorIdx + offset;
706725
}
@@ -848,6 +867,7 @@ struct TDiskFormat {
848867
FormatFlagErasureEncodeNextChunkReference |
849868
FormatFlagEncryptFormat |
850869
FormatFlagEncryptData;
870+
851871
Hash = 0;
852872

853873
memset(FormatText, 0, sizeof(FormatText));
@@ -867,6 +887,8 @@ struct TDiskFormat {
867887
FormatFlagErasureEncodeNextChunkReference |
868888
FormatFlagEncryptFormat |
869889
FormatFlagEncryptData;
890+
SetPlainDataChunks(format.IsPlainDataChunks());
891+
870892
Y_VERIFY(format.Version <= Version);
871893
Y_VERIFY(format.GetUsedSize() <= sizeof(TDiskFormat));
872894
memcpy(this, &format, format.GetUsedSize());

0 commit comments

Comments
 (0)