diff --git a/cloud/blockstore/config/storage.proto b/cloud/blockstore/config/storage.proto index fd5e8fd181d..571a1c9b904 100644 --- a/cloud/blockstore/config/storage.proto +++ b/cloud/blockstore/config/storage.proto @@ -1419,4 +1419,7 @@ message TStorageServiceConfig // Timeout for attach/detach path requests (in milliseconds). optional uint32 AttachDetachPathRequestTimeout = 480; + + // Execute separate compaction transaction for each range of the batch. + optional bool SplitTxInBatchCompactionEnabled = 481; } diff --git a/cloud/blockstore/libs/storage/core/config.cpp b/cloud/blockstore/libs/storage/core/config.cpp index d00e1a88bcd..c7eb7f31f58 100644 --- a/cloud/blockstore/libs/storage/core/config.cpp +++ b/cloud/blockstore/libs/storage/core/config.cpp @@ -209,6 +209,7 @@ NProto::TLinkedDiskFillBandwidth GetBandwidth( xxx(ForcedCompactionRangeCountPerRun, ui32, 1 )\ xxx(CompactionCountPerRunChangingPeriod, TDuration, Seconds(60) )\ xxx(BatchCompactionEnabled, bool, false )\ + xxx(SplitTxInBatchCompactionEnabled, bool, false )\ xxx(BlobPatchingEnabled, bool, false )\ /* If threshold is not 0, use it */ \ xxx(MaxDiffPercentageForBlobPatching, ui32, 0 )\ @@ -689,6 +690,7 @@ BLOCKSTORE_STORAGE_CONFIG(BLOCKSTORE_STORAGE_DECLARE_CONFIG) xxx(FreshChannelWriteRequests) \ xxx(MixedIndexCacheV1) \ xxx(BatchCompaction) \ + xxx(SplitTxInBatchCompaction) \ xxx(BlobPatching) \ xxx(UseRdma) \ xxx(ChangeThrottlingPolicy) \ diff --git a/cloud/blockstore/libs/storage/core/config.h b/cloud/blockstore/libs/storage/core/config.h index 68f6a638674..7f3187c8032 100644 --- a/cloud/blockstore/libs/storage/core/config.h +++ b/cloud/blockstore/libs/storage/core/config.h @@ -108,6 +108,7 @@ class TStorageConfig ui32 GetForcedCompactionRangeCountPerRun() const; TDuration GetCompactionCountPerRunChangingPeriod() const; bool GetBatchCompactionEnabled() const; + bool GetSplitTxInBatchCompactionEnabled() const; bool GetBlobPatchingEnabled() const; ui32 GetMaxDiffPercentageForBlobPatching() const; @@ -343,6 +344,10 @@ class TStorageConfig const TString& cloudId, const TString& folderId, const TString& diskId) const; + bool IsSplitTxInBatchCompactionFeatureEnabled( + const TString& cloudId, + const TString& folderId, + const TString& diskId) const; bool IsBlobPatchingFeatureEnabled( const TString& cloudId, const TString& folderId, diff --git a/cloud/blockstore/libs/storage/partition/model/affected.cpp b/cloud/blockstore/libs/storage/partition/model/affected.cpp new file mode 100644 index 00000000000..51de2c3bdc7 --- /dev/null +++ b/cloud/blockstore/libs/storage/partition/model/affected.cpp @@ -0,0 +1 @@ +#include "affected.h" diff --git a/cloud/blockstore/libs/storage/partition/model/affected.h b/cloud/blockstore/libs/storage/partition/model/affected.h new file mode 100644 index 00000000000..9ed24148145 --- /dev/null +++ b/cloud/blockstore/libs/storage/partition/model/affected.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include + +#include + +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage::NPartition { + +//////////////////////////////////////////////////////////////////////////////// + +struct TAffectedBlob +{ + TVector Offsets; + TMaybe BlockMask; + TVector AffectedBlockIndices; + + // Filled only if a flag is set. BlobMeta is needed only to do some extra + // consistency checks. + TMaybe BlobMeta; +}; + +using TAffectedBlobs = THashMap; + +//////////////////////////////////////////////////////////////////////////////// + +struct TAffectedBlock +{ + ui32 BlockIndex = 0; + ui64 CommitId = 0; +}; + +using TAffectedBlocks = TVector; + +} // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp b/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp index b0c60f6ce22..5a350f402fd 100644 --- a/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp +++ b/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp @@ -7,7 +7,7 @@ namespace NCloud::NBlockStore::NStorage::NPartition { void TCommitQueue::Enqueue(TTxPtr tx, ui64 commitId) { if (Items) { - Y_ABORT_UNLESS(Items.back().CommitId < commitId); + Y_ABORT_UNLESS(Items.back().CommitId <= commitId); } Items.emplace_back(commitId, std::move(tx)); } diff --git a/cloud/blockstore/libs/storage/partition/part_actor.cpp b/cloud/blockstore/libs/storage/partition/part_actor.cpp index d0853230137..00076ac5a4a 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor.cpp @@ -1043,6 +1043,7 @@ STFUNC(TPartitionActor::StateWork) IgnoreFunc(TEvPartitionPrivate::TEvCleanupResponse); IgnoreFunc(TEvPartitionPrivate::TEvCollectGarbageResponse); IgnoreFunc(TEvPartitionPrivate::TEvCompactionResponse); + IgnoreFunc(TEvPartitionPrivate::TEvCompactionTxResponse); IgnoreFunc(TEvPartitionPrivate::TEvMetadataRebuildUsedBlocksResponse); IgnoreFunc(TEvPartitionPrivate::TEvMetadataRebuildBlockCountResponse); IgnoreFunc(TEvPartitionPrivate::TEvFlushResponse); @@ -1106,6 +1107,7 @@ STFUNC(TPartitionActor::StateZombie) IgnoreFunc(TEvPartitionPrivate::TEvCleanupResponse); IgnoreFunc(TEvPartitionPrivate::TEvCollectGarbageResponse); IgnoreFunc(TEvPartitionPrivate::TEvCompactionResponse); + IgnoreFunc(TEvPartitionPrivate::TEvCompactionTxResponse); IgnoreFunc(TEvPartitionPrivate::TEvMetadataRebuildUsedBlocksResponse); IgnoreFunc(TEvPartitionPrivate::TEvMetadataRebuildBlockCountResponse); IgnoreFunc(TEvPartitionPrivate::TEvFlushResponse); diff --git a/cloud/blockstore/libs/storage/partition/part_actor.h b/cloud/blockstore/libs/storage/partition/part_actor.h index 10de02782af..de951efd82e 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor.h +++ b/cloud/blockstore/libs/storage/partition/part_actor.h @@ -2,6 +2,7 @@ #include "public.h" +#include "part_compaction.h" #include "part_counters.h" #include "part_events_private.h" #include "part_state.h" diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index f2676a22549..0773f104a6e 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -1,4 +1,5 @@ #include "part_actor.h" +#include "part_compaction.h" #include #include @@ -16,6 +17,8 @@ #include #include #include +#include +#include namespace NCloud::NBlockStore::NStorage::NPartition { @@ -30,89 +33,11 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -struct TRangeCompactionInfo -{ - const TBlockRange32 BlockRange; - const TPartialBlobId OriginalBlobId; - const TPartialBlobId DataBlobId; - const TBlockMask DataBlobSkipMask; - const TPartialBlobId ZeroBlobId; - const TBlockMask ZeroBlobSkipMask; - const ui32 BlobsSkippedByCompaction; - const ui32 BlocksSkippedByCompaction; - const TVector BlockChecksums; - const EChannelDataKind ChannelDataKind; - - TGuardedBuffer BlobContent; - TVector ZeroBlocks; - TAffectedBlobs AffectedBlobs; - TAffectedBlocks AffectedBlocks; - TVector UnchangedBlobOffsets; - TArrayHolder Diffs; - ui32 DiffCount = 0; - - TRangeCompactionInfo( - TBlockRange32 blockRange, - TPartialBlobId originalBlobId, - TPartialBlobId dataBlobId, - TBlockMask dataBlobSkipMask, - TPartialBlobId zeroBlobId, - TBlockMask zeroBlobSkipMask, - ui32 blobsSkippedByCompaction, - ui32 blocksSkippedByCompaction, - TVector blockChecksums, - EChannelDataKind channelDataKind, - TBlockBuffer blobContent, - TVector zeroBlocks, - TAffectedBlobs affectedBlobs, - TAffectedBlocks affectedBlocks) - : BlockRange(blockRange) - , OriginalBlobId(originalBlobId) - , DataBlobId(dataBlobId) - , DataBlobSkipMask(dataBlobSkipMask) - , ZeroBlobId(zeroBlobId) - , ZeroBlobSkipMask(zeroBlobSkipMask) - , BlobsSkippedByCompaction(blobsSkippedByCompaction) - , BlocksSkippedByCompaction(blocksSkippedByCompaction) - , BlockChecksums(std::move(blockChecksums)) - , ChannelDataKind(channelDataKind) - , BlobContent(std::move(blobContent)) - , ZeroBlocks(std::move(zeroBlocks)) - , AffectedBlobs(std::move(affectedBlobs)) - , AffectedBlocks(std::move(affectedBlocks)) - {} -}; - class TCompactionActor final : public TActorBootstrapped { public: - struct TRequest - { - TPartialBlobId BlobId; - TActorId Proxy; - ui16 BlobOffset; - ui32 BlockIndex; - size_t IndexInBlobContent; - ui32 GroupId; - ui32 RangeCompactionIndex; - - TRequest(const TPartialBlobId& blobId, - const TActorId& proxy, - ui16 blobOffset, - ui32 blockIndex, - size_t indexInBlobContent, - ui32 groupId, - ui32 rangeCompactionIndex) - : BlobId(blobId) - , Proxy(proxy) - , BlobOffset(blobOffset) - , BlockIndex(blockIndex) - , IndexInBlobContent(indexInBlobContent) - , GroupId(groupId) - , RangeCompactionIndex(rangeCompactionIndex) - {} - }; + using TRequest = TCompactionReadRequest; struct TBatchRequest { @@ -159,6 +84,13 @@ class TCompactionActor final TChildLogTitle LogTitle; const ui64 CommitId; + const TCompactionOptions CompactionOptions; + + const bool SplitTxInBatchCompaction = false; + TVector Ranges; + + TVector ForkedCompactionTxCallContexts; + ui32 AwaitedCompactionTxCall = 0; TVector RangeCompactionInfos; TVector Requests; @@ -193,13 +125,16 @@ class TCompactionActor final TDuration blobStorageAsyncRequestTimeout, ECompactionType compactionType, ui64 commitId, - TVector rangeCompactionInfos, - TVector requests, + TCompactionOptions CompactionOptions, + bool splitTxInBatchCompaction, + TVector ranges, TChildLogTitle logTitle); void Bootstrap(const TActorContext& ctx); private: + void ProcessRequests(const TActorContext& ctx); + void InitBlockDigests(); void ReadBlocks(const TActorContext& ctx); @@ -217,6 +152,10 @@ class TCompactionActor final private: STFUNC(StateWork); + void HandleCompactionTxResponse( + const TEvPartitionPrivate::TEvCompactionTxResponse::TPtr& ev, + const TActorContext& ctx); + template void HandleWriteOrPatchBlobResponse( TEvent& ev, @@ -258,8 +197,9 @@ TCompactionActor::TCompactionActor( TDuration blobStorageAsyncRequestTimeout, ECompactionType compactionType, ui64 commitId, - TVector rangeCompactionInfos, - TVector requests, + TCompactionOptions compactionOptions, + bool splitTxInBatchCompaction, + TVector ranges, TChildLogTitle logTitle) : RequestInfo(std::move(requestInfo)) , TabletId(tabletId) @@ -274,8 +214,9 @@ TCompactionActor::TCompactionActor( , CompactionType(compactionType) , LogTitle(std::move(logTitle)) , CommitId(commitId) - , RangeCompactionInfos(std::move(rangeCompactionInfos)) - , Requests(std::move(requests)) + , CompactionOptions(compactionOptions) + , SplitTxInBatchCompaction(splitTxInBatchCompaction) + , Ranges(std::move(ranges)) {} void TCompactionActor::Bootstrap(const TActorContext& ctx) @@ -290,6 +231,47 @@ void TCompactionActor::Bootstrap(const TActorContext& ctx) "Compaction", RequestInfo->CallContext->RequestId); + auto sendRequest = + [&](ui32 rangeCompactionIndex, + TVector ranges) + { + ++AwaitedCompactionTxCall; + + auto request = std::make_unique( + CommitId, + rangeCompactionIndex, + CompactionOptions, + std::move(ranges)); + + if (!RequestInfo->CallContext->LWOrbit.Fork( + request->CallContext->LWOrbit)) + { + LWTRACK( + ForkFailed, + RequestInfo->CallContext->LWOrbit, + "TEvPartitionPrivate::TEvCompactionTxRequest", + RequestInfo->CallContext->RequestId); + } + + request->CallContext->RequestId = RequestInfo->CallContext->RequestId; + ForkedCompactionTxCallContexts.emplace_back(request->CallContext); + + NCloud::Send(ctx, Tablet, std::move(request)); + }; + + if (SplitTxInBatchCompaction) { + for (ui32 i = 0; i < Ranges.size(); ++i) { + sendRequest(i, TVector{std::move(Ranges[i])}); + } + } else { + sendRequest(0, std::move(Ranges)); + } + + Ranges.clear(); +} + +void TCompactionActor::ProcessRequests(const TActorContext& ctx) +{ if (Requests) { ReadBlocks(ctx); @@ -378,6 +360,16 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx) [makeTie](const TRequest* l, const TRequest* r) { return makeTie(l) < makeTie(r); }); + TVector infos(Reserve(RangeCompactionInfos.size())); + for (auto& info: RangeCompactionInfos) { + infos.push_back(&info); + } + + Sort( + infos, + [](const TRangeCompactionInfo* l, const TRangeCompactionInfo* r) + { return l->RangeCompactionIndex < r->RangeCompactionIndex; }); + TBatchRequest current; ui32 currentRangeCompactionIndex = 0; for (auto* r: requests) { @@ -404,7 +396,7 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx) current.Proxy = r->Proxy; currentRangeCompactionIndex = r->RangeCompactionIndex; current.RangeCompactionInfo = - &RangeCompactionInfos[r->RangeCompactionIndex]; + infos[r->RangeCompactionIndex]; } if (current.BlobId == current.RangeCompactionInfo->OriginalBlobId) { @@ -864,6 +856,39 @@ void TCompactionActor::ReplyAndDie( //////////////////////////////////////////////////////////////////////////////// +void TCompactionActor::HandleCompactionTxResponse( + const TEvPartitionPrivate::TEvCompactionTxResponse::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + if (HandleError(ctx, msg->GetError())) { + return; + } + + std::move( + msg->RangeCompactionInfos.begin(), + msg->RangeCompactionInfos.end(), + std::back_insert_iterator>( + RangeCompactionInfos)); + std::move( + msg->Requests.begin(), + msg->Requests.end(), + std::back_insert_iterator>(Requests)); + + if (--AwaitedCompactionTxCall) { + return; + } + + for (auto context: ForkedReadCallContexts) { + RequestInfo->CallContext->LWOrbit.Join(context->LWOrbit); + } + + ForkedReadCallContexts.clear(); + + ProcessRequests(ctx); +} + void TCompactionActor::HandleReadBlobResponse( const TEvPartitionCommonPrivate::TEvReadBlobResponse::TPtr& ev, const TActorContext& ctx) @@ -1001,6 +1026,7 @@ STFUNC(TCompactionActor::StateWork) switch (ev->GetTypeRewrite()) { HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvPartitionPrivate::TEvCompactionTxResponse, HandleCompactionTxResponse); HFunc(TEvPartitionCommonPrivate::TEvReadBlobResponse, HandleReadBlobResponse); HFunc(TEvPartitionPrivate::TEvWriteBlobResponse, HandleWriteBlobResponse); HFunc(TEvPartitionPrivate::TEvPatchBlobResponse, HandlePatchBlobResponse); @@ -1017,55 +1043,6 @@ STFUNC(TCompactionActor::StateWork) //////////////////////////////////////////////////////////////////////////////// -class TCompactionBlockVisitor final - : public IFreshBlocksIndexVisitor - , public IBlocksIndexVisitor -{ -private: - TTxPartition::TRangeCompaction& Args; - const ui64 MaxCommitId; - -public: - TCompactionBlockVisitor( - TTxPartition::TRangeCompaction& args, - ui64 maxCommitId) - : Args(args) - , MaxCommitId(maxCommitId) - {} - - bool Visit(const TFreshBlock& block) override - { - Args.MarkBlock( - block.Meta.BlockIndex, - block.Meta.CommitId, - block.Content); - return true; - } - - bool KeepTrackOfAffectedBlocks = false; - - bool Visit( - ui32 blockIndex, - ui64 commitId, - const TPartialBlobId& blobId, - ui16 blobOffset) override - { - if (commitId > MaxCommitId) { - return true; - } - - Args.MarkBlock( - blockIndex, - commitId, - blobId, - blobOffset, - KeepTrackOfAffectedBlocks); - return true; - } -}; - -//////////////////////////////////////////////////////////////////////////////// - ui32 GetPercentage(ui64 total, ui64 real) { const double p = (real - total) * 100. / Max(total, 1UL); @@ -1415,8 +1392,9 @@ void TPartitionActor::HandleCompaction( return; } - TVector> ranges(Reserve(tops.size())); - for (const auto& x: tops) { + TVector ranges(Reserve(tops.size())); + for (ui32 i = 0; i < tops.size(); ++i) { + const auto& x = tops[i]; const ui32 rangeIdx = cm.GetRangeIndex(x.BlockIndex); const auto blockRange = TBlockRange32::MakeClosedIntervalWithLimit( @@ -1440,7 +1418,7 @@ void TPartitionActor::HandleCompaction( x.Stat.ReadRequestBlockCount, x.Stat.CompactionScore.Score); - ranges.emplace_back(rangeIdx, blockRange); + ranges.emplace_back(rangeIdx, blockRange, i); } State->GetCompactionState(compactionType).SetStatus(EOperationStatus::Started); @@ -1449,24 +1427,42 @@ void TPartitionActor::HandleCompaction( State->GetCleanupQueue().AcquireBarrier(commitId); State->GetGarbageQueue().AcquireBarrier(commitId); - AddTransaction(*requestInfo); + const bool splitTxInBatchCompactionEnabledForDisk = + Config->IsSplitTxInBatchCompactionFeatureEnabled( + PartitionConfig.GetCloudId(), + PartitionConfig.GetFolderId(), + PartitionConfig.GetDiskId()); + const bool splitTxInBatchCompactionEnabled = + Config->GetSplitTxInBatchCompactionEnabled() || + splitTxInBatchCompactionEnabledForDisk; - auto tx = CreateTx( + auto actor = NCloud::Register( + ctx, requestInfo, + TabletID(), + PartitionConfig.GetDiskId(), + SelfId(), + State->GetBlockSize(), + State->GetMaxBlocksInBlob(), + Config->GetMaxAffectedBlocksPerCompaction(), + Config->GetComputeDigestForEveryBlockOnCompaction(), + BlockDigestGenerator, + GetBlobStorageAsyncRequestTimeout(), + compactionType, commitId, msg->CompactionOptions, - std::move(ranges)); - - ui64 minCommitId = State->GetCommitQueue().GetMinCommitId(); - Y_ABORT_UNLESS(minCommitId <= commitId); + splitTxInBatchCompactionEnabled, + std::move(ranges), + LogTitle.GetChild(GetCycleCount())); + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "%s Partition registered TCompactionActor with id [%lu]; commit id %lu", + LogTitle.GetWithTime().c_str(), + actor.ToString().c_str(), + commitId); - if (minCommitId == commitId) { - // start execution - ExecuteTx(ctx, std::move(tx)); - } else { - // delay execution until all previous commits completed - State->AccessCommitQueue().Enqueue(std::move(tx), commitId); - } + Actors.Insert(actor); } void TPartitionActor::ProcessCommitQueue(const TActorContext& ctx) @@ -1571,538 +1567,4 @@ void TPartitionActor::HandleCompactionCompleted( ProcessCommitQueue(ctx); } -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -void PrepareRangeCompaction( - const TStorageConfig& config, - const bool incrementalCompactionEnabled, - const ui64 commitId, - const bool fullCompaction, - const TActorContext& ctx, - const ui64 tabletId, - bool& ready, - TPartitionDatabase& db, - TPartitionState& state, - TTxPartition::TRangeCompaction& args, - const TString& logTitle) -{ - TCompactionBlockVisitor visitor(args, commitId); - state.FindFreshBlocks(visitor, args.BlockRange, commitId); - visitor.KeepTrackOfAffectedBlocks = true; - ready &= state.FindMixedBlocksForCompaction(db, visitor, args.RangeIdx); - visitor.KeepTrackOfAffectedBlocks = false; - ready &= db.FindMergedBlocks( - visitor, - args.BlockRange, - true, // precharge - state.GetMaxBlocksInBlob(), - commitId); - - if (ready && incrementalCompactionEnabled && !fullCompaction) { - THashMap liveBlocks; - for (const auto& m: args.BlockMarks) { - if (m.CommitId && m.BlobId) { - ++liveBlocks[m.BlobId]; - } - } - - TVector blobIds; - blobIds.reserve(liveBlocks.size()); - for (const auto& x: liveBlocks) { - blobIds.push_back(x.first); - } - - Sort( - blobIds, - [&](const TPartialBlobId& l, const TPartialBlobId& r) - { return liveBlocks[l] < liveBlocks[r]; }); - - auto it = blobIds.begin(); - args.BlobsSkipped = blobIds.size(); - ui32 blocks = 0; - - while (it != blobIds.end()) { - const auto bytes = blocks * state.GetBlockSize(); - const auto blobCountOk = - args.BlobsSkipped <= - config.GetMaxSkippedBlobsDuringCompaction(); - const auto byteCountOk = - bytes >= config.GetTargetCompactionBytesPerOp(); - - if (blobCountOk && byteCountOk) { - break; - } - - blocks += liveBlocks[*it]; - --args.BlobsSkipped; - ++it; - } - - // liveBlocks will contain only skipped blobs after this - for (auto it2 = blobIds.begin(); it2 != it; ++it2) { - liveBlocks.erase(*it2); - } - - while (it != blobIds.end()) { - args.BlocksSkipped += liveBlocks[*it]; - ++it; - } - - LOG_DEBUG( - ctx, - TBlockStoreComponents::PARTITION, - "%s Dropping last %u blobs, %u blocks, remaining blobs: %u, " - "blocks: %u", - logTitle.c_str(), - args.BlobsSkipped, - args.BlocksSkipped, - liveBlocks.size(), - blocks); - - THashSet skippedBlockIndices; - - for (const auto& x: liveBlocks) { - auto ab = args.AffectedBlobs.find(x.first); - Y_ABORT_UNLESS(ab != args.AffectedBlobs.end()); - for (const auto blockIndex: ab->second.AffectedBlockIndices) { - // we can actually add extra indices to skippedBlockIndices, - // but it does not cause data corruption - the important thing - // is to ensure that all skipped indices are added, not that - // all non-skipped are preserved - skippedBlockIndices.insert(blockIndex); - } - args.AffectedBlobs.erase(ab); - } - - if (liveBlocks.size()) { - TAffectedBlocks affectedBlocks; - for (const auto& b: args.AffectedBlocks) { - if (!skippedBlockIndices.contains(b.BlockIndex)) { - affectedBlocks.push_back(b); - } - } - args.AffectedBlocks = std::move(affectedBlocks); - - for (auto& m: args.BlockMarks) { - if (liveBlocks.contains(m.BlobId)) { - m = {}; - } - } - } - } - - const ui32 checksumBoundary = - config.GetDiskPrefixLengthWithBlockChecksumsInBlobs() - / state.GetBlockSize(); - args.ChecksumsEnabled = args.BlockRange.Start < checksumBoundary; - - for (auto& kv: args.AffectedBlobs) { - if (db.ReadBlockMask(kv.first, kv.second.BlockMask)) { - Y_ABORT_UNLESS(kv.second.BlockMask.Defined(), - "Could not read block mask for blob: %s", - ToString(MakeBlobId(tabletId, kv.first)).data()); - } else { - ready = false; - } - - if (args.ChecksumsEnabled) { - if (db.ReadBlobMeta(kv.first, kv.second.BlobMeta)) { - Y_ABORT_UNLESS(kv.second.BlobMeta.Defined(), - "Could not read blob meta for blob: %s", - ToString(MakeBlobId(tabletId, kv.first)).data()); - } else { - ready = false; - } - } - } -} - -void CompleteRangeCompaction( - const bool blobPatchingEnabled, - const ui32 mergedBlobThreshold, - const ui64 commitId, - TTabletStorageInfo& tabletStorageInfo, - TPartitionState& state, - TTxPartition::TRangeCompaction& args, - TVector& requests, - TVector& rangeCompactionInfos, - ui32 maxDiffPercentageForBlobPatching) -{ - const EChannelPermissions compactionPermissions = - EChannelPermission::SystemWritesAllowed; - const auto initialRequestsSize = requests.size(); - - // at first we count number of data blocks - size_t dataBlocksCount = 0, zeroBlocksCount = 0; - - for (const auto& mark: args.BlockMarks) { - if (mark.CommitId) { - const bool isFresh = !mark.BlockContent.empty(); - const bool isMixedOrMerged = !IsDeletionMarker(mark.BlobId); - // there could be fresh block OR merged/mixed block - Y_ABORT_UNLESS(!(isFresh && isMixedOrMerged)); - if (isFresh || isMixedOrMerged) { - ++dataBlocksCount; - } else { - ++zeroBlocksCount; - } - } - } - - // determine the results kind - TPartialBlobId dataBlobId, zeroBlobId; - TBlockMask dataBlobSkipMask, zeroBlobSkipMask; - - auto channelDataKind = EChannelDataKind::Merged; - if (dataBlocksCount) { - ui32 skipped = 0; - for (const auto& mark: args.BlockMarks) { - const bool isFresh = !mark.BlockContent.empty(); - const bool isMixedOrMerged = !IsDeletionMarker(mark.BlobId); - if (!isFresh && !isMixedOrMerged) { - ++skipped; - } - } - - const auto blobSize = (args.BlockRange.Size() - skipped) * state.GetBlockSize(); - if (blobSize < mergedBlobThreshold) { - channelDataKind = EChannelDataKind::Mixed; - } - dataBlobId = state.GenerateBlobId( - channelDataKind, - compactionPermissions, - commitId, - blobSize, - rangeCompactionInfos.size()); - } - - if (zeroBlocksCount) { - // for zeroed region we will write blob without any data - // XXX same commitId used for 2 blobs: data blob and zero blob - // we differentiate between them by storing the last block index in - // MergedBlocksIndex::RangeEnd not for the last block of the processed - // compaction range but for the last actual block that's referenced by - // the corresponding blob - zeroBlobId = state.GenerateBlobId( - channelDataKind, - compactionPermissions, - commitId, - 0, - rangeCompactionInfos.size()); - } - - // now build the blob content for all blocks to be written - TBlockBuffer blobContent(TProfilingAllocator::Instance()); - TVector blockChecksums; - TVector zeroBlocks; - - ui32 blockIndex = args.BlockRange.Start; - TPartialBlobId patchingCandidate; - ui32 patchingCandidateChangedBlockCount = 0; - for (auto& mark: args.BlockMarks) { - if (mark.CommitId) { - if (mark.BlockContent) { - Y_ABORT_UNLESS(IsDeletionMarker(mark.BlobId)); - requests.emplace_back( - mark.BlobId, - TActorId(), - mark.BlobOffset, - blockIndex, - blobContent.GetBlocksCount(), - 0, - rangeCompactionInfos.size()); - - // fresh block will be written - blobContent.AddBlock({ - mark.BlockContent.data(), - mark.BlockContent.size() - }); - - if (args.ChecksumsEnabled) { - blockChecksums.push_back( - ComputeDefaultDigest(blobContent.GetBlocks().back())); - } - - if (zeroBlobId) { - zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); - } - } else if (!IsDeletionMarker(mark.BlobId)) { - const auto proxy = tabletStorageInfo.BSProxyIDForChannel( - mark.BlobId.Channel(), - mark.BlobId.Generation()); - - requests.emplace_back( - mark.BlobId, - proxy, - mark.BlobOffset, - blockIndex, - blobContent.GetBlocksCount(), - tabletStorageInfo.GroupFor( - mark.BlobId.Channel(), - mark.BlobId.Generation()), - rangeCompactionInfos.size()); - - // we will read this block later - blobContent.AddBlock(state.GetBlockSize(), char(0)); - - // block checksum is simply moved from the affected blob's meta - if (args.ChecksumsEnabled) { - ui32 blockChecksum = 0; - - auto* affectedBlob = args.AffectedBlobs.FindPtr(mark.BlobId); - Y_DEBUG_ABORT_UNLESS(affectedBlob); - if (affectedBlob) { - if (auto* meta = affectedBlob->BlobMeta.Get()) { - if (mark.BlobOffset < meta->BlockChecksumsSize()) { - blockChecksum = - meta->GetBlockChecksums(mark.BlobOffset); - } - } - } - - blockChecksums.push_back(blockChecksum); - } - - if (zeroBlobId) { - zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); - } - - if (blobPatchingEnabled) { - if (!patchingCandidate && - mark.BlobId.BlobSize() == dataBlobId.BlobSize()) - { - patchingCandidate = mark.BlobId; - ++patchingCandidateChangedBlockCount; - } else if (patchingCandidate == mark.BlobId) { - ++patchingCandidateChangedBlockCount; - } - } - } else { - dataBlobSkipMask.Set(blockIndex - args.BlockRange.Start); - zeroBlocks.push_back(blockIndex); - } - } else { - if (dataBlobId) { - dataBlobSkipMask.Set(blockIndex - args.BlockRange.Start); - } - if (zeroBlobId) { - zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); - } - } - - ++blockIndex; - } - - if (patchingCandidate) { - TPartialBlobId targetBlobId( - dataBlobId.Generation(), - dataBlobId.Step(), - patchingCandidate.Channel(), - dataBlobId.BlobSize(), - dataBlobId.Cookie(), - 0); - - TLogoBlobID realTargetBlobId = MakeBlobId( - tabletStorageInfo.TabletID, - targetBlobId); - - ui32 originalChannel = patchingCandidate.Channel(); - ui32 originalGroup = tabletStorageInfo.GroupFor( - originalChannel, - patchingCandidate.Generation()); - Y_ABORT_UNLESS(originalGroup != Max()); - - ui32 patchedChannel = realTargetBlobId.Channel(); - ui32 patchedGroup = tabletStorageInfo.GroupFor( - patchedChannel, - realTargetBlobId.Generation()); - Y_ABORT_UNLESS(patchedGroup != Max()); - - bool found = TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement( - MakeBlobId(tabletStorageInfo.TabletID, patchingCandidate), - &realTargetBlobId, - 0xfe0000, - originalGroup, - patchedGroup); - - ui32 blockCount = patchingCandidate.BlobSize() / state.GetBlockSize(); - ui32 patchingBlockCount = - dataBlocksCount - patchingCandidateChangedBlockCount; - ui32 changedPercentage = 100 * patchingBlockCount / blockCount; - - if (found && - (!maxDiffPercentageForBlobPatching || - changedPercentage <= maxDiffPercentageForBlobPatching)) - { - dataBlobId = TPartialBlobId( - dataBlobId.Generation(), - dataBlobId.Step(), - patchingCandidate.Channel(), - dataBlobId.BlobSize(), - realTargetBlobId.Cookie(), - 0); - } else { - patchingCandidate = {}; - } - } - - rangeCompactionInfos.emplace_back( - args.BlockRange, - patchingCandidate, - dataBlobId, - dataBlobSkipMask, - zeroBlobId, - zeroBlobSkipMask, - args.BlobsSkipped, - args.BlocksSkipped, - std::move(blockChecksums), - channelDataKind, - std::move(blobContent), - std::move(zeroBlocks), - std::move(args.AffectedBlobs), - std::move(args.AffectedBlocks)); - - if (!dataBlobId && !zeroBlobId) { - const auto rangeDescr = DescribeRange(args.BlockRange); - Y_ABORT("No blocks in compacted range: %s", rangeDescr.c_str()); - } - Y_ABORT_UNLESS(requests.size() - initialRequestsSize == dataBlocksCount); -} - -} // namespace - -//////////////////////////////////////////////////////////////////////////////// - -bool TPartitionActor::PrepareCompaction( - const TActorContext& ctx, - TTransactionContext& tx, - TTxPartition::TCompaction& args) -{ - TRequestScope timer(*args.RequestInfo); - TPartitionDatabase db(tx.DB); - - const bool incrementalCompactionEnabled = - Config->GetIncrementalCompactionEnabled() || - Config->IsIncrementalCompactionFeatureEnabled( - PartitionConfig.GetCloudId(), - PartitionConfig.GetFolderId(), - PartitionConfig.GetDiskId()); - const bool fullCompaction = - args.CompactionOptions.test(ToBit(ECompactionOption::Full)); - - bool ready = true; - - for (auto& rangeCompaction: args.RangeCompactions) { - PrepareRangeCompaction( - *Config, - incrementalCompactionEnabled, - args.CommitId, - fullCompaction, - ctx, - TabletID(), - ready, - db, - *State, - rangeCompaction, - LogTitle.GetWithTime()); - } - - return ready; -} - -void TPartitionActor::ExecuteCompaction( - const TActorContext& ctx, - TTransactionContext& tx, - TTxPartition::TCompaction& args) -{ - Y_UNUSED(ctx); - Y_UNUSED(tx); - Y_UNUSED(args); -} - -void TPartitionActor::CompleteCompaction( - const TActorContext& ctx, - TTxPartition::TCompaction& args) -{ - TRequestScope timer(*args.RequestInfo); - - RemoveTransaction(*args.RequestInfo); - - for (auto& rangeCompaction: args.RangeCompactions) { - State->RaiseRangeTemperature(rangeCompaction.RangeIdx); - } - - const bool blobPatchingEnabledForCloud = - Config->IsBlobPatchingFeatureEnabled( - PartitionConfig.GetCloudId(), - PartitionConfig.GetFolderId(), - PartitionConfig.GetDiskId()); - const bool blobPatchingEnabled = - Config->GetBlobPatchingEnabled() || blobPatchingEnabledForCloud; - - TVector rangeCompactionInfos; - TVector requests; - - const auto mergedBlobThreshold = - PartitionConfig.GetStorageMediaKind() == - NCloud::NProto::STORAGE_MEDIA_SSD - ? 0 - : Config->GetCompactionMergedBlobThresholdHDD(); - for (auto& rangeCompaction: args.RangeCompactions) { - CompleteRangeCompaction( - blobPatchingEnabled, - mergedBlobThreshold, - args.CommitId, - *Info(), - *State, - rangeCompaction, - requests, - rangeCompactionInfos, - Config->GetMaxDiffPercentageForBlobPatching()); - - if (rangeCompactionInfos.back().OriginalBlobId) { - LOG_DEBUG( - ctx, - TBlockStoreComponents::PARTITION, - "%s Selected patching candidate: %s, data blob: %s", - LogTitle.GetWithTime().c_str(), - ToString(rangeCompactionInfos.back().OriginalBlobId).c_str(), - ToString(rangeCompactionInfos.back().DataBlobId).c_str()); - } - } - - const auto compactionType = - args.CompactionOptions.test(ToBit(ECompactionOption::Forced)) ? - ECompactionType::Forced: - ECompactionType::Tablet; - - auto actor = NCloud::Register( - ctx, - args.RequestInfo, - TabletID(), - PartitionConfig.GetDiskId(), - SelfId(), - State->GetBlockSize(), - State->GetMaxBlocksInBlob(), - Config->GetMaxAffectedBlocksPerCompaction(), - Config->GetComputeDigestForEveryBlockOnCompaction(), - BlockDigestGenerator, - GetBlobStorageAsyncRequestTimeout(), - compactionType, - args.CommitId, - std::move(rangeCompactionInfos), - std::move(requests), - LogTitle.GetChild(GetCycleCount())); - LOG_DEBUG( - ctx, - TBlockStoreComponents::PARTITION, - "%s Partition registered TCompactionActor with id [%lu]", - LogTitle.GetWithTime().c_str(), - actor.ToString().c_str()); - - Actors.Insert(actor); -} - } // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp new file mode 100644 index 00000000000..a0eb90ffb2f --- /dev/null +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp @@ -0,0 +1,628 @@ +#include "part_actor.h" +#include "part_compaction.h" + +#include +#include + +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage::NPartition { + +using namespace NActors; + +using namespace NCloud::NStorage; + +using namespace NKikimr; +using namespace NKikimr::NTabletFlatExecutor; + +LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER); + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TCompactionBlockVisitor final + : public IFreshBlocksIndexVisitor + , public IBlocksIndexVisitor +{ +private: + TTxPartition::TRangeCompaction& Args; + const ui64 MaxCommitId; + +public: + TCompactionBlockVisitor( + TTxPartition::TRangeCompaction& args, + ui64 maxCommitId) + : Args(args) + , MaxCommitId(maxCommitId) + {} + + bool Visit(const TFreshBlock& block) override + { + Args.MarkBlock( + block.Meta.BlockIndex, + block.Meta.CommitId, + block.Content); + return true; + } + + bool KeepTrackOfAffectedBlocks = false; + + bool Visit( + ui32 blockIndex, + ui64 commitId, + const TPartialBlobId& blobId, + ui16 blobOffset) override + { + if (commitId > MaxCommitId) { + return true; + } + + Args.MarkBlock( + blockIndex, + commitId, + blobId, + blobOffset, + KeepTrackOfAffectedBlocks); + return true; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +void PrepareRangeCompaction( + const TStorageConfig& config, + const bool incrementalCompactionEnabled, + const ui64 commitId, + const bool fullCompaction, + const TActorContext& ctx, + const ui64 tabletId, + bool& ready, + TPartitionDatabase& db, + TPartitionState& state, + TTxPartition::TRangeCompaction& args, + const TString& logTitle) +{ + TCompactionBlockVisitor visitor(args, commitId); + state.FindFreshBlocks(visitor, args.Range.BlockRange, commitId); + visitor.KeepTrackOfAffectedBlocks = true; + ready &= state.FindMixedBlocksForCompaction(db, visitor, args.Range.RangeIdx); + visitor.KeepTrackOfAffectedBlocks = false; + ready &= db.FindMergedBlocks( + visitor, + args.Range.BlockRange, + true, // precharge + state.GetMaxBlocksInBlob(), + commitId); + + if (ready && incrementalCompactionEnabled && !fullCompaction) { + THashMap liveBlocks; + for (const auto& m: args.BlockMarks) { + if (m.CommitId && m.BlobId) { + ++liveBlocks[m.BlobId]; + } + } + + TVector blobIds; + blobIds.reserve(liveBlocks.size()); + for (const auto& x: liveBlocks) { + blobIds.push_back(x.first); + } + + Sort( + blobIds, + [&](const TPartialBlobId& l, const TPartialBlobId& r) + { return liveBlocks[l] < liveBlocks[r]; }); + + auto it = blobIds.begin(); + args.BlobsSkipped = blobIds.size(); + ui32 blocks = 0; + + while (it != blobIds.end()) { + const auto bytes = blocks * state.GetBlockSize(); + const auto blobCountOk = + args.BlobsSkipped <= + config.GetMaxSkippedBlobsDuringCompaction(); + const auto byteCountOk = + bytes >= config.GetTargetCompactionBytesPerOp(); + + if (blobCountOk && byteCountOk) { + break; + } + + blocks += liveBlocks[*it]; + --args.BlobsSkipped; + ++it; + } + + // liveBlocks will contain only skipped blobs after this + for (auto it2 = blobIds.begin(); it2 != it; ++it2) { + liveBlocks.erase(*it2); + } + + while (it != blobIds.end()) { + args.BlocksSkipped += liveBlocks[*it]; + ++it; + } + + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "%s Dropping last %u blobs, %u blocks, remaining blobs: %u, " + "blocks: %u", + logTitle.c_str(), + args.BlobsSkipped, + args.BlocksSkipped, + liveBlocks.size(), + blocks); + + THashSet skippedBlockIndices; + + for (const auto& x: liveBlocks) { + auto ab = args.AffectedBlobs.find(x.first); + Y_ABORT_UNLESS(ab != args.AffectedBlobs.end()); + for (const auto blockIndex: ab->second.AffectedBlockIndices) { + // we can actually add extra indices to skippedBlockIndices, + // but it does not cause data corruption - the important thing + // is to ensure that all skipped indices are added, not that + // all non-skipped are preserved + skippedBlockIndices.insert(blockIndex); + } + args.AffectedBlobs.erase(ab); + } + + if (liveBlocks.size()) { + TAffectedBlocks affectedBlocks; + for (const auto& b: args.AffectedBlocks) { + if (!skippedBlockIndices.contains(b.BlockIndex)) { + affectedBlocks.push_back(b); + } + } + args.AffectedBlocks = std::move(affectedBlocks); + + for (auto& m: args.BlockMarks) { + if (liveBlocks.contains(m.BlobId)) { + m = {}; + } + } + } + } + + const ui32 checksumBoundary = + config.GetDiskPrefixLengthWithBlockChecksumsInBlobs() + / state.GetBlockSize(); + args.ChecksumsEnabled = args.Range.BlockRange.Start < checksumBoundary; + + for (auto& kv: args.AffectedBlobs) { + if (db.ReadBlockMask(kv.first, kv.second.BlockMask)) { + Y_ABORT_UNLESS(kv.second.BlockMask.Defined(), + "Could not read block mask for blob: %s", + ToString(MakeBlobId(tabletId, kv.first)).data()); + } else { + ready = false; + } + + if (args.ChecksumsEnabled) { + if (db.ReadBlobMeta(kv.first, kv.second.BlobMeta)) { + Y_ABORT_UNLESS(kv.second.BlobMeta.Defined(), + "Could not read blob meta for blob: %s", + ToString(MakeBlobId(tabletId, kv.first)).data()); + } else { + ready = false; + } + } + } +} + +void CompleteRangeCompaction( + const bool blobPatchingEnabled, + const ui32 mergedBlobThreshold, + const ui64 commitId, + TTabletStorageInfo& tabletStorageInfo, + TPartitionState& state, + TTxPartition::TRangeCompaction& args, + TVector& requests, + TVector& rangeCompactionInfos, + ui32 maxDiffPercentageForBlobPatching) +{ + const EChannelPermissions compactionPermissions = + EChannelPermission::SystemWritesAllowed; + const auto initialRequestsSize = requests.size(); + + // at first we count number of data blocks + size_t dataBlocksCount = 0, zeroBlocksCount = 0; + + for (const auto& mark: args.BlockMarks) { + if (mark.CommitId) { + const bool isFresh = !mark.BlockContent.empty(); + const bool isMixedOrMerged = !IsDeletionMarker(mark.BlobId); + // there could be fresh block OR merged/mixed block + Y_ABORT_UNLESS(!(isFresh && isMixedOrMerged)); + if (isFresh || isMixedOrMerged) { + ++dataBlocksCount; + } else { + ++zeroBlocksCount; + } + } + } + + // determine the results kind + TPartialBlobId dataBlobId, zeroBlobId; + TBlockMask dataBlobSkipMask, zeroBlobSkipMask; + + auto channelDataKind = EChannelDataKind::Merged; + if (dataBlocksCount) { + ui32 skipped = 0; + for (const auto& mark: args.BlockMarks) { + const bool isFresh = !mark.BlockContent.empty(); + const bool isMixedOrMerged = !IsDeletionMarker(mark.BlobId); + if (!isFresh && !isMixedOrMerged) { + ++skipped; + } + } + + const auto blobSize = (args.Range.BlockRange.Size() - skipped) * state.GetBlockSize(); + if (blobSize < mergedBlobThreshold) { + channelDataKind = EChannelDataKind::Mixed; + } + dataBlobId = state.GenerateBlobId( + channelDataKind, + compactionPermissions, + commitId, + blobSize, + args.Range.RangeCompactionIndex); + } + + if (zeroBlocksCount) { + // for zeroed region we will write blob without any data + // XXX same commitId used for 2 blobs: data blob and zero blob + // we differentiate between them by storing the last block index in + // MergedBlocksIndex::RangeEnd not for the last block of the processed + // compaction range but for the last actual block that's referenced by + // the corresponding blob + zeroBlobId = state.GenerateBlobId( + channelDataKind, + compactionPermissions, + commitId, + 0, + args.Range.RangeCompactionIndex); + } + + // now build the blob content for all blocks to be written + TBlockBuffer blobContent(TProfilingAllocator::Instance()); + TVector blockChecksums; + TVector zeroBlocks; + + ui32 blockIndex = args.Range.BlockRange.Start; + TPartialBlobId patchingCandidate; + ui32 patchingCandidateChangedBlockCount = 0; + for (auto& mark: args.BlockMarks) { + if (mark.CommitId) { + if (mark.BlockContent) { + Y_ABORT_UNLESS(IsDeletionMarker(mark.BlobId)); + requests.emplace_back( + mark.BlobId, + TActorId(), + mark.BlobOffset, + blockIndex, + blobContent.GetBlocksCount(), + 0, + args.Range.RangeCompactionIndex); + + // fresh block will be written + blobContent.AddBlock({ + mark.BlockContent.data(), + mark.BlockContent.size() + }); + + if (args.ChecksumsEnabled) { + blockChecksums.push_back( + ComputeDefaultDigest(blobContent.GetBlocks().back())); + } + + if (zeroBlobId) { + zeroBlobSkipMask.Set(blockIndex - args.Range.BlockRange.Start); + } + } else if (!IsDeletionMarker(mark.BlobId)) { + const auto proxy = tabletStorageInfo.BSProxyIDForChannel( + mark.BlobId.Channel(), + mark.BlobId.Generation()); + + requests.emplace_back( + mark.BlobId, + proxy, + mark.BlobOffset, + blockIndex, + blobContent.GetBlocksCount(), + tabletStorageInfo.GroupFor( + mark.BlobId.Channel(), + mark.BlobId.Generation()), + args.Range.RangeCompactionIndex); + + // we will read this block later + blobContent.AddBlock(state.GetBlockSize(), char(0)); + + // block checksum is simply moved from the affected blob's meta + if (args.ChecksumsEnabled) { + ui32 blockChecksum = 0; + + auto* affectedBlob = args.AffectedBlobs.FindPtr(mark.BlobId); + Y_DEBUG_ABORT_UNLESS(affectedBlob); + if (affectedBlob) { + if (auto* meta = affectedBlob->BlobMeta.Get()) { + if (mark.BlobOffset < meta->BlockChecksumsSize()) { + blockChecksum = + meta->GetBlockChecksums(mark.BlobOffset); + } + } + } + + blockChecksums.push_back(blockChecksum); + } + + if (zeroBlobId) { + zeroBlobSkipMask.Set(blockIndex - args.Range.BlockRange.Start); + } + + if (blobPatchingEnabled) { + if (!patchingCandidate && + mark.BlobId.BlobSize() == dataBlobId.BlobSize()) + { + patchingCandidate = mark.BlobId; + ++patchingCandidateChangedBlockCount; + } else if (patchingCandidate == mark.BlobId) { + ++patchingCandidateChangedBlockCount; + } + } + } else { + dataBlobSkipMask.Set(blockIndex - args.Range.BlockRange.Start); + zeroBlocks.push_back(blockIndex); + } + } else { + if (dataBlobId) { + dataBlobSkipMask.Set(blockIndex - args.Range.BlockRange.Start); + } + if (zeroBlobId) { + zeroBlobSkipMask.Set(blockIndex - args.Range.BlockRange.Start); + } + } + + ++blockIndex; + } + + if (patchingCandidate) { + TPartialBlobId targetBlobId( + dataBlobId.Generation(), + dataBlobId.Step(), + patchingCandidate.Channel(), + dataBlobId.BlobSize(), + dataBlobId.Cookie(), + 0); + + TLogoBlobID realTargetBlobId = MakeBlobId( + tabletStorageInfo.TabletID, + targetBlobId); + + ui32 originalChannel = patchingCandidate.Channel(); + ui32 originalGroup = tabletStorageInfo.GroupFor( + originalChannel, + patchingCandidate.Generation()); + Y_ABORT_UNLESS(originalGroup != Max()); + + ui32 patchedChannel = realTargetBlobId.Channel(); + ui32 patchedGroup = tabletStorageInfo.GroupFor( + patchedChannel, + realTargetBlobId.Generation()); + Y_ABORT_UNLESS(patchedGroup != Max()); + + bool found = TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement( + MakeBlobId(tabletStorageInfo.TabletID, patchingCandidate), + &realTargetBlobId, + 0xfe0000, + originalGroup, + patchedGroup); + + ui32 blockCount = patchingCandidate.BlobSize() / state.GetBlockSize(); + ui32 patchingBlockCount = + dataBlocksCount - patchingCandidateChangedBlockCount; + ui32 changedPercentage = 100 * patchingBlockCount / blockCount; + + if (found && + (!maxDiffPercentageForBlobPatching || + changedPercentage <= maxDiffPercentageForBlobPatching)) + { + dataBlobId = TPartialBlobId( + dataBlobId.Generation(), + dataBlobId.Step(), + patchingCandidate.Channel(), + dataBlobId.BlobSize(), + realTargetBlobId.Cookie(), + 0); + } else { + patchingCandidate = {}; + } + } + + rangeCompactionInfos.emplace_back( + args.Range.BlockRange, + patchingCandidate, + dataBlobId, + dataBlobSkipMask, + zeroBlobId, + zeroBlobSkipMask, + args.BlobsSkipped, + args.BlocksSkipped, + std::move(blockChecksums), + channelDataKind, + args.Range.RangeCompactionIndex, + std::move(blobContent), + std::move(zeroBlocks), + std::move(args.AffectedBlobs), + std::move(args.AffectedBlocks)); + + if (!dataBlobId && !zeroBlobId) { + const auto rangeDescr = DescribeRange(args.Range.BlockRange); + Y_ABORT("No blocks in compacted range: %s", rangeDescr.c_str()); + } + Y_ABORT_UNLESS(requests.size() - initialRequestsSize == dataBlocksCount); +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +void TPartitionActor::HandleCompactionTx( + const TEvPartitionPrivate::TEvCompactionTxRequest::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + auto requestInfo = CreateRequestInfo( + ev->Sender, + ev->Cookie, + msg->CallContext); + + TRequestScope timer(*requestInfo); + + LWTRACK( + RequestReceived_Partition, + requestInfo->CallContext->LWOrbit, + "CompactionTx", + requestInfo->CallContext->RequestId); + + AddTransaction(*requestInfo); + + auto tx = CreateTx( + requestInfo, + msg->CommitId, + msg->CompactionOptions, + std::move(msg->Ranges)); + + ui64 minCommitId = State->GetCommitQueue().GetMinCommitId(); + Y_ABORT_UNLESS(minCommitId <= msg->CommitId); + + if (minCommitId == msg->CommitId) { + // start execution + ExecuteTx(ctx, std::move(tx)); + } else { + // delay execution until all previous commits completed + State->AccessCommitQueue().Enqueue(std::move(tx), msg->CommitId); + } +} + +bool TPartitionActor::PrepareCompaction( + const TActorContext& ctx, + TTransactionContext& tx, + TTxPartition::TCompaction& args) +{ + TRequestScope timer(*args.RequestInfo); + TPartitionDatabase db(tx.DB); + + const bool incrementalCompactionEnabled = + Config->GetIncrementalCompactionEnabled() || + Config->IsIncrementalCompactionFeatureEnabled( + PartitionConfig.GetCloudId(), + PartitionConfig.GetFolderId(), + PartitionConfig.GetDiskId()); + const bool fullCompaction = + args.CompactionOptions.test(ToBit(ECompactionOption::Full)); + + bool ready = true; + + for (auto& rangeCompaction: args.RangeCompactions) { + PrepareRangeCompaction( + *Config, + incrementalCompactionEnabled, + args.CommitId, + fullCompaction, + ctx, + TabletID(), + ready, + db, + *State, + rangeCompaction, + LogTitle.GetWithTime()); + } + + return ready; +} + +void TPartitionActor::ExecuteCompaction( + const TActorContext& ctx, + TTransactionContext& tx, + TTxPartition::TCompaction& args) +{ + Y_UNUSED(ctx); + Y_UNUSED(tx); + Y_UNUSED(args); +} + +void TPartitionActor::CompleteCompaction( + const TActorContext& ctx, + TTxPartition::TCompaction& args) +{ + RemoveTransaction(*args.RequestInfo); + + for (auto& rangeCompaction: args.RangeCompactions) { + State->RaiseRangeTemperature(rangeCompaction.Range.RangeIdx); + } + + const bool blobPatchingEnabledForDisk = + Config->IsBlobPatchingFeatureEnabled( + PartitionConfig.GetCloudId(), + PartitionConfig.GetFolderId(), + PartitionConfig.GetDiskId()); + const bool blobPatchingEnabled = + Config->GetBlobPatchingEnabled() || blobPatchingEnabledForDisk; + + const auto mergedBlobThreshold = + PartitionConfig.GetStorageMediaKind() == + NCloud::NProto::STORAGE_MEDIA_SSD + ? 0 + : Config->GetCompactionMergedBlobThresholdHDD(); + + TVector rangeCompactionInfos; + TVector requests; + + for (ui32 i = 0; i < args.RangeCompactions.size(); ++i) { + CompleteRangeCompaction( + blobPatchingEnabled, + mergedBlobThreshold, + args.CommitId, + *Info(), + *State, + args.RangeCompactions[i], + requests, + rangeCompactionInfos, + Config->GetMaxDiffPercentageForBlobPatching()); + + if (rangeCompactionInfos.back().OriginalBlobId) { + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "%s Selected patching candidate: %s, data blob: %s", + LogTitle.GetWithTime().c_str(), + ToString(rangeCompactionInfos.back().OriginalBlobId).c_str(), + ToString(rangeCompactionInfos.back().DataBlobId).c_str()); + } + } + + TRequestScope timer(*args.RequestInfo); + + auto response = + std::make_unique( + std::move(rangeCompactionInfos), + std::move(requests)); + + LWTRACK( + ResponseSent_Partition, + args.RequestInfo->CallContext->LWOrbit, + "CompactionTx", + args.RequestInfo->CallContext->RequestId); + + NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); +} + +} // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/part_compaction.h b/cloud/blockstore/libs/storage/partition/part_compaction.h new file mode 100644 index 00000000000..8ce08c0d30c --- /dev/null +++ b/cloud/blockstore/libs/storage/partition/part_compaction.h @@ -0,0 +1,125 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage::NPartition { + +//////////////////////////////////////////////////////////////////////////////// + +struct TRangeCompactionInfo +{ + const TBlockRange32 BlockRange; + const NCloud::TPartialBlobId OriginalBlobId; + const NCloud::TPartialBlobId DataBlobId; + const TBlockMask DataBlobSkipMask; + const NCloud::TPartialBlobId ZeroBlobId; + const TBlockMask ZeroBlobSkipMask; + const ui32 BlobsSkippedByCompaction; + const ui32 BlocksSkippedByCompaction; + const TVector BlockChecksums; + const EChannelDataKind ChannelDataKind; + const ui32 RangeCompactionIndex; + + TGuardedBuffer BlobContent; + TVector ZeroBlocks; + TAffectedBlobs AffectedBlobs; + TAffectedBlocks AffectedBlocks; + TVector UnchangedBlobOffsets; + TArrayHolder Diffs; + ui32 DiffCount = 0; + + TRangeCompactionInfo( + TBlockRange32 blockRange, + NCloud::TPartialBlobId originalBlobId, + NCloud::TPartialBlobId dataBlobId, + TBlockMask dataBlobSkipMask, + NCloud::TPartialBlobId zeroBlobId, + TBlockMask zeroBlobSkipMask, + ui32 blobsSkippedByCompaction, + ui32 blocksSkippedByCompaction, + TVector blockChecksums, + EChannelDataKind channelDataKind, + ui32 rangeCompactionIndex, + TBlockBuffer blobContent, + TVector zeroBlocks, + TAffectedBlobs affectedBlobs, + TAffectedBlocks affectedBlocks) + : BlockRange(blockRange) + , OriginalBlobId(originalBlobId) + , DataBlobId(dataBlobId) + , DataBlobSkipMask(dataBlobSkipMask) + , ZeroBlobId(zeroBlobId) + , ZeroBlobSkipMask(zeroBlobSkipMask) + , BlobsSkippedByCompaction(blobsSkippedByCompaction) + , BlocksSkippedByCompaction(blocksSkippedByCompaction) + , BlockChecksums(std::move(blockChecksums)) + , ChannelDataKind(channelDataKind) + , RangeCompactionIndex(rangeCompactionIndex) + , BlobContent(std::move(blobContent)) + , ZeroBlocks(std::move(zeroBlocks)) + , AffectedBlobs(std::move(affectedBlobs)) + , AffectedBlocks(std::move(affectedBlocks)) + {} +}; + +struct TCompactionReadRequest +{ + NCloud::TPartialBlobId BlobId; + NActors::TActorId Proxy; + ui16 BlobOffset; + ui32 BlockIndex; + size_t IndexInBlobContent; + ui32 GroupId; + ui32 RangeCompactionIndex; + + TCompactionReadRequest( + const NCloud::TPartialBlobId& blobId, + const NActors::TActorId& proxy, + ui16 blobOffset, + ui32 blockIndex, + size_t indexInBlobContent, + ui32 groupId, + ui32 rangeCompactionIndex) + : BlobId(blobId) + , Proxy(proxy) + , BlobOffset(blobOffset) + , BlockIndex(blockIndex) + , IndexInBlobContent(indexInBlobContent) + , GroupId(groupId) + , RangeCompactionIndex(rangeCompactionIndex) + {} +}; + +struct TCompactionRange { + const ui32 RangeIdx; + TBlockRange32 BlockRange; + const ui32 RangeCompactionIndex; // Index of the range in its batch for + // batch compaction. + + TCompactionRange( + ui32 rangeIdx, + TBlockRange32 blockRange, + ui32 rangeCompactionIndex) + : RangeIdx(rangeIdx) + , BlockRange(blockRange) + , RangeCompactionIndex(rangeCompactionIndex) + {} +}; + +} // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h index 46db9325fee..319ef57c388 100644 --- a/cloud/blockstore/libs/storage/partition/part_events_private.h +++ b/cloud/blockstore/libs/storage/partition/part_events_private.h @@ -1,5 +1,6 @@ #pragma once +#include "part_compaction.h" #include "public.h" #include @@ -11,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -115,31 +117,6 @@ struct TWriteFreshBlocksRequest //////////////////////////////////////////////////////////////////////////////// -struct TAffectedBlob -{ - TVector Offsets; - TMaybe BlockMask; - TVector AffectedBlockIndices; - - // Filled only if a flag is set. BlobMeta is needed only to do some extra - // consistency checks. - TMaybe BlobMeta; -}; - -using TAffectedBlobs = THashMap; - -//////////////////////////////////////////////////////////////////////////////// - -struct TAffectedBlock -{ - ui32 BlockIndex = 0; - ui64 CommitId = 0; -}; - -using TAffectedBlocks = TVector; - -//////////////////////////////////////////////////////////////////////////////// - struct TBlobCompactionInfo { const ui32 BlobsSkippedByCompaction = 0; @@ -169,6 +146,7 @@ using TFlushedCommitIds = TVector; xxx(AddFreshBlocks, __VA_ARGS__) \ xxx(Flush, __VA_ARGS__) \ xxx(Compaction, __VA_ARGS__) \ + xxx(CompactionTx, __VA_ARGS__) \ xxx(MetadataRebuildUsedBlocks, __VA_ARGS__) \ xxx(MetadataRebuildBlockCount, __VA_ARGS__) \ xxx(ScanDiskBatch, __VA_ARGS__) \ @@ -483,6 +461,46 @@ struct TEvPartitionPrivate { }; + // + // CompactionTx + // + + struct TCompactionTxRequest + { + const ui64 CommitId; + const ui32 RangeCompactionIndex; + const TCompactionOptions CompactionOptions; + TVector Ranges; + + TCompactionTxRequest() = default; + + TCompactionTxRequest( + ui64 commitId, + ui32 rangeCompactionIndex, + TCompactionOptions compactionOptions, + TVector ranges) + : CommitId(commitId) + , RangeCompactionIndex(rangeCompactionIndex) + , CompactionOptions(compactionOptions) + , Ranges(std::move(ranges)) + {} + }; + + struct TCompactionTxResponse + { + TVector RangeCompactionInfos; + TVector Requests; + + TCompactionTxResponse() = default; + + TCompactionTxResponse( + TVector rangeCompactionInfos, + TVector requests) + : RangeCompactionInfos(std::move(rangeCompactionInfos)) + , Requests(std::move(requests)) + {} + }; + // // MetadataRebuildUsedBlocks // diff --git a/cloud/blockstore/libs/storage/partition/part_tx.h b/cloud/blockstore/libs/storage/partition/part_tx.h index f72487d64b7..6eef2389ee0 100644 --- a/cloud/blockstore/libs/storage/partition/part_tx.h +++ b/cloud/blockstore/libs/storage/partition/part_tx.h @@ -2,6 +2,7 @@ #include "public.h" +#include "part_compaction.h" #include "part_events_private.h" #include @@ -354,8 +355,7 @@ struct TTxPartition struct TRangeCompaction { - const ui32 RangeIdx; - const TBlockRange32 BlockRange; + const TCompactionRange Range; struct TBlockMark { @@ -372,10 +372,9 @@ struct TTxPartition ui32 BlocksSkipped = 0; bool ChecksumsEnabled = false; - TRangeCompaction(ui32 rangeIdx, const TBlockRange32& blockRange) - : RangeIdx(rangeIdx) - , BlockRange(blockRange) - , BlockMarks(blockRange.Size()) + TRangeCompaction(const TCompactionRange& range) + : Range(range) + , BlockMarks(range.BlockRange.Size()) {} void Clear() @@ -390,8 +389,8 @@ struct TTxPartition TBlockMark& GetBlockMark(ui32 blockIndex) { - Y_DEBUG_ABORT_UNLESS(BlockRange.Contains(blockIndex)); - return BlockMarks[blockIndex - BlockRange.Start]; + Y_DEBUG_ABORT_UNLESS(Range.BlockRange.Contains(blockIndex)); + return BlockMarks[blockIndex - Range.BlockRange.Start]; } void MarkBlock( @@ -445,14 +444,14 @@ struct TTxPartition TRequestInfoPtr requestInfo, ui64 commitId, TCompactionOptions compactionOptions, - const TVector>& ranges) + const TVector& ranges) : RequestInfo(std::move(requestInfo)) , CommitId(commitId) , CompactionOptions(compactionOptions) { RangeCompactions.reserve(ranges.size()); for (const auto& range: ranges) { - RangeCompactions.emplace_back(range.first, range.second); + RangeCompactions.emplace_back(range); } } diff --git a/cloud/blockstore/libs/storage/partition/part_ut.cpp b/cloud/blockstore/libs/storage/partition/part_ut.cpp index c45257f4e4a..159098d1864 100644 --- a/cloud/blockstore/libs/storage/partition/part_ut.cpp +++ b/cloud/blockstore/libs/storage/partition/part_ut.cpp @@ -8443,11 +8443,14 @@ Y_UNIT_TEST_SUITE(TPartitionTest) UNIT_ASSERT(suicideHappened); } - Y_UNIT_TEST(ShouldProcessMultipleRangesUponCompaction) + void DoShouldProcessMultipleRangesUponCompaction( + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetWriteBlobThreshold(1_MB); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetCompactionRangeCountPerRun(3); config.SetSSDMaxBlobsPerRange(999); config.SetHDDMaxBlobsPerRange(999); @@ -8542,6 +8545,12 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } + Y_UNIT_TEST(ShouldProcessMultipleRangesUponCompaction) + { + DoShouldProcessMultipleRangesUponCompaction(true); + DoShouldProcessMultipleRangesUponCompaction(false); + } + Y_UNIT_TEST(ShouldPatchBlobsDuringCompaction) { auto config = DefaultConfig(); @@ -10006,11 +10015,14 @@ Y_UNIT_TEST_SUITE(TPartitionTest) ui32 increasingPercentageThreshold, ui32 decreasingPercentageThreshold, ui32 compactionRangeCountPerRun, - ui32 maxCompactionRangeCountPerRun = 10) + ui32 maxCompactionRangeCountPerRun = 10, + bool splitTxInBatchCompactionEnabled = true) { auto config = DefaultConfig(); config.SetWriteBlobThreshold(1_MB); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetV1GarbageCompactionEnabled(true); config.SetCompactionGarbageThreshold(diskGarbageThreshold); config.SetCompactionRangeGarbageThreshold(rangeGarbageThreshold); @@ -10107,7 +10119,9 @@ Y_UNIT_TEST_SUITE(TPartitionTest) // blobs percentage: (10 - 9 / 10) * 100 = 10 // 10 < 100, so batch size should decrement CheckIncrementAndDecrementCompactionPerRun( - 2, 9, 999999, 999999, 99999, 7, 200, 100, 1); + 2, 9, 999999, 999999, 99999, 7, 200, 100, 1, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 2, 9, 999999, 999999, 99999, 7, 200, 100, 1, 10, false); } Y_UNIT_TEST(ShouldChangeBatchSizeDueToBlocksPerDisk) @@ -10120,11 +10134,15 @@ Y_UNIT_TEST_SUITE(TPartitionTest) // 9 > 8, so should increment and compact 2 ranges CheckIncrementAndDecrementCompactionPerRun( - 1, 1000, 99999, 203, 99999, 5, 8, 5, 2); + 1, 1000, 99999, 203, 99999, 5, 8, 5, 2, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 1, 1000, 99999, 203, 99999, 5, 8, 5, 2, 10, false); // 9 < 15, so should decrement and compact only 1 range CheckIncrementAndDecrementCompactionPerRun( - 2, 1000, 99999, 203, 99999, 7, 30, 15, 1); + 2, 1000, 99999, 203, 99999, 7, 30, 15, 1, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 2, 1000, 99999, 203, 99999, 7, 30, 15, 1, 10, false); } Y_UNIT_TEST(ShouldChangeBatchSizeDueToBlocksPerRangeCount) @@ -10136,29 +10154,40 @@ Y_UNIT_TEST_SUITE(TPartitionTest) // 7 > 6, so should increment and compact 2 ranges CheckIncrementAndDecrementCompactionPerRun( - 1, 1000, 99999, 9999, 280, 5, 6, 4, 2); + 1, 1000, 99999, 9999, 280, 5, 6, 4, 2, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 1, 1000, 99999, 9999, 280, 5, 6, 4, 2, 10, false); // 7 > 6, but should compact only 1 range due to maxRangeCountPerRun CheckIncrementAndDecrementCompactionPerRun( - 1, 1000, 99999, 9999, 280, 7, 6, 4, 1, 1); + 1, 1000, 99999, 9999, 280, 7, 6, 4, 1, 1, true); + CheckIncrementAndDecrementCompactionPerRun( + 1, 1000, 99999, 9999, 280, 7, 6, 4, 1, 1, false); // 7 < 8, so should decrement and compact only 1 range CheckIncrementAndDecrementCompactionPerRun( - 2, 1000, 99999, 9999, 280, 7, 30, 8, 1); + 2, 1000, 99999, 9999, 280, 7, 30, 8, 1, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 2, 1000, 99999, 9999, 280, 7, 30, 8, 1, 10, false); } Y_UNIT_TEST(ShouldDecrementBatchSizeWhenBlobsPerRangeCountIsSmall) { // CompactionScore in range3: 4 - 4 + eps // 100 * (eps - 0) / 1024 < 3, so should decrement and compact 1 range CheckIncrementAndDecrementCompactionPerRun( - 2, 1000, 4, 9999, 9999, 7, 50, 10, 1); + 2, 1000, 4, 9999, 9999, 7, 50, 10, 1, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 2, 1000, 4, 9999, 9999, 7, 50, 10, 1, 10, false); } - Y_UNIT_TEST(ShouldRespectCompactionCountPerRunChangingPeriod) + void DoShouldRespectCompactionCountPerRunChangingPeriod( + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetWriteBlobThreshold(1_MB); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetV1GarbageCompactionEnabled(true); config.SetCompactionGarbageThreshold(99999); config.SetCompactionRangeGarbageThreshold(280); @@ -10265,6 +10294,12 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } + Y_UNIT_TEST(ShouldRespectCompactionCountPerRunChangingPeriod) + { + DoShouldRespectCompactionCountPerRunChangingPeriod(true); + DoShouldRespectCompactionCountPerRunChangingPeriod(false); + } + template void DoShouldReportLongRunningBlobOperations( FSend sendRequest, @@ -11237,10 +11272,13 @@ Y_UNIT_TEST_SUITE(TPartitionTest) UNIT_ASSERT_VALUES_EQUAL(E_TRY_AGAIN, response->GetStatus()); } - Y_UNIT_TEST(ShouldProcessMultipleRangesUponGarbageCompaction) + void DoShouldProcessMultipleRangesUponGarbageCompaction( + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetGarbageCompactionRangeCountPerRun(3); config.SetV1GarbageCompactionEnabled(true); config.SetCompactionGarbageThreshold(20); @@ -11336,10 +11374,19 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } - Y_UNIT_TEST(ShouldProcessMultipleRangesUponForceCompaction) + Y_UNIT_TEST(ShouldProcessMultipleRangesUponGarbageCompaction) + { + DoShouldProcessMultipleRangesUponGarbageCompaction(true); + DoShouldProcessMultipleRangesUponGarbageCompaction(false); + } + + void DoShouldProcessMultipleRangesUponForceCompaction( + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetForcedCompactionRangeCountPerRun(3); config.SetV1GarbageCompactionEnabled(false); @@ -11440,10 +11487,19 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } - Y_UNIT_TEST(ShouldSkipEmptyRangesUponForcedCompactionWithMultipleRanges) + Y_UNIT_TEST(ShouldProcessMultipleRangesUponForceCompaction) + { + DoShouldProcessMultipleRangesUponForceCompaction(true); + DoShouldProcessMultipleRangesUponForceCompaction(false); + } + + void DoShouldSkipEmptyRangesUponForcedCompactionWithMultipleRanges( + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetForcedCompactionRangeCountPerRun(3); config.SetV1GarbageCompactionEnabled(false); @@ -11473,6 +11529,12 @@ Y_UNIT_TEST_SUITE(TPartitionTest) UNIT_ASSERT_EQUAL(5, compactionStatus->Record.GetTotal()); } + Y_UNIT_TEST(ShouldSkipEmptyRangesUponForcedCompactionWithMultipleRanges) + { + DoShouldSkipEmptyRangesUponForcedCompactionWithMultipleRanges(true); + DoShouldSkipEmptyRangesUponForcedCompactionWithMultipleRanges(false); + } + Y_UNIT_TEST(ShouldBatchSmallWritesToFreshChannelIfThresholdNotExceeded) { NProto::TStorageServiceConfig config; @@ -11838,10 +11900,14 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } - void TestForcedCompaction(ui32 rangesPerRun) + void TestForcedCompaction( + ui32 rangesPerRun, + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetForcedCompactionRangeCountPerRun(rangesPerRun); config.SetV1GarbageCompactionEnabled(false); config.SetWriteBlobThreshold(15_KB); @@ -11960,12 +12026,14 @@ Y_UNIT_TEST_SUITE(TPartitionTest) Y_UNIT_TEST(ShouldCompactSeveralBlobsInTheSameRangeWithOneRangePerRun) { - TestForcedCompaction(1); + TestForcedCompaction(1, true); + TestForcedCompaction(1, false); } Y_UNIT_TEST(ShouldCompactSeveralBlobsInTheSameRangeWithSeveralRangesPerRun) { - TestForcedCompaction(10); + TestForcedCompaction(10, true); + TestForcedCompaction(10, false); } Y_UNIT_TEST(ShouldWriteToMixedChannelOnHddIfThresholdExceeded) diff --git a/cloud/blockstore/libs/storage/partition/ya.make b/cloud/blockstore/libs/storage/partition/ya.make index a983d6e28cc..56668bdd5f7 100644 --- a/cloud/blockstore/libs/storage/partition/ya.make +++ b/cloud/blockstore/libs/storage/partition/ya.make @@ -14,6 +14,7 @@ SRCS( part_actor_cleanup.cpp part_actor_collectgarbage.cpp part_actor_compaction.cpp + part_actor_compactiontx.cpp part_actor_compactrange.cpp part_actor_confirmblobs.cpp part_actor_deletegarbage.cpp