From 27bea10f0909b9294e9b041122e7c9fc6b5e8910 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Mon, 2 Feb 2026 13:36:02 +0300 Subject: [PATCH 01/15] DRAFT --- .../libs/storage/partition/part_actor.cpp | 2 + .../libs/storage/partition/part_actor.h | 9 ++ .../partition/part_actor_compaction.cpp | 119 +++++------------- .../libs/storage/partition/part_compaction.h | 107 ++++++++++++++++ .../storage/partition/part_events_private.h | 25 ++++ 5 files changed, 171 insertions(+), 91 deletions(-) create mode 100644 cloud/blockstore/libs/storage/partition/part_compaction.h diff --git a/cloud/blockstore/libs/storage/partition/part_actor.cpp b/cloud/blockstore/libs/storage/partition/part_actor.cpp index d0853230137..00076ac5a4a 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor.cpp @@ -1043,6 +1043,7 @@ STFUNC(TPartitionActor::StateWork) IgnoreFunc(TEvPartitionPrivate::TEvCleanupResponse); IgnoreFunc(TEvPartitionPrivate::TEvCollectGarbageResponse); IgnoreFunc(TEvPartitionPrivate::TEvCompactionResponse); + IgnoreFunc(TEvPartitionPrivate::TEvCompactionTxResponse); IgnoreFunc(TEvPartitionPrivate::TEvMetadataRebuildUsedBlocksResponse); IgnoreFunc(TEvPartitionPrivate::TEvMetadataRebuildBlockCountResponse); IgnoreFunc(TEvPartitionPrivate::TEvFlushResponse); @@ -1106,6 +1107,7 @@ STFUNC(TPartitionActor::StateZombie) IgnoreFunc(TEvPartitionPrivate::TEvCleanupResponse); IgnoreFunc(TEvPartitionPrivate::TEvCollectGarbageResponse); IgnoreFunc(TEvPartitionPrivate::TEvCompactionResponse); + IgnoreFunc(TEvPartitionPrivate::TEvCompactionTxResponse); IgnoreFunc(TEvPartitionPrivate::TEvMetadataRebuildUsedBlocksResponse); IgnoreFunc(TEvPartitionPrivate::TEvMetadataRebuildBlockCountResponse); IgnoreFunc(TEvPartitionPrivate::TEvFlushResponse); diff --git a/cloud/blockstore/libs/storage/partition/part_actor.h b/cloud/blockstore/libs/storage/partition/part_actor.h index 10de02782af..132f31e7e4c 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor.h +++ b/cloud/blockstore/libs/storage/partition/part_actor.h @@ -2,6 +2,7 @@ #include "public.h" +#include "part_compaction.h" #include "part_counters.h" #include "part_events_private.h" #include "part_state.h" @@ -159,6 +160,14 @@ class TPartitionActor final TDeque PendingForcedCompactionRequests; THashMap CompletedForcedCompactionRequests; + // TODO:_ move it to separate actor! + TRequestInfoPtr CompactionRequestInfo; + ui32 AwaitedCompactionTxResponses = 0; + // Compaction infos and requests, waiting until all transactions of the + // current batch are completed + TVector PendingRangeCompactionInfos; + TVector PendingCompactionRequests; + NBlobMetrics::TBlobLoadMetrics PrevMetrics; NBlobMetrics::TBlobLoadMetrics OverlayMetrics; diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index f2676a22549..674a8d012cf 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -1,4 +1,5 @@ #include "part_actor.h" +#include "part_compaction.h" #include #include @@ -30,89 +31,11 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -struct TRangeCompactionInfo -{ - const TBlockRange32 BlockRange; - const TPartialBlobId OriginalBlobId; - const TPartialBlobId DataBlobId; - const TBlockMask DataBlobSkipMask; - const TPartialBlobId ZeroBlobId; - const TBlockMask ZeroBlobSkipMask; - const ui32 BlobsSkippedByCompaction; - const ui32 BlocksSkippedByCompaction; - const TVector BlockChecksums; - const EChannelDataKind ChannelDataKind; - - TGuardedBuffer BlobContent; - TVector ZeroBlocks; - TAffectedBlobs AffectedBlobs; - TAffectedBlocks AffectedBlocks; - TVector UnchangedBlobOffsets; - TArrayHolder Diffs; - ui32 DiffCount = 0; - - TRangeCompactionInfo( - TBlockRange32 blockRange, - TPartialBlobId originalBlobId, - TPartialBlobId dataBlobId, - TBlockMask dataBlobSkipMask, - TPartialBlobId zeroBlobId, - TBlockMask zeroBlobSkipMask, - ui32 blobsSkippedByCompaction, - ui32 blocksSkippedByCompaction, - TVector blockChecksums, - EChannelDataKind channelDataKind, - TBlockBuffer blobContent, - TVector zeroBlocks, - TAffectedBlobs affectedBlobs, - TAffectedBlocks affectedBlocks) - : BlockRange(blockRange) - , OriginalBlobId(originalBlobId) - , DataBlobId(dataBlobId) - , DataBlobSkipMask(dataBlobSkipMask) - , ZeroBlobId(zeroBlobId) - , ZeroBlobSkipMask(zeroBlobSkipMask) - , BlobsSkippedByCompaction(blobsSkippedByCompaction) - , BlocksSkippedByCompaction(blocksSkippedByCompaction) - , BlockChecksums(std::move(blockChecksums)) - , ChannelDataKind(channelDataKind) - , BlobContent(std::move(blobContent)) - , ZeroBlocks(std::move(zeroBlocks)) - , AffectedBlobs(std::move(affectedBlobs)) - , AffectedBlocks(std::move(affectedBlocks)) - {} -}; - class TCompactionActor final : public TActorBootstrapped { public: - struct TRequest - { - TPartialBlobId BlobId; - TActorId Proxy; - ui16 BlobOffset; - ui32 BlockIndex; - size_t IndexInBlobContent; - ui32 GroupId; - ui32 RangeCompactionIndex; - - TRequest(const TPartialBlobId& blobId, - const TActorId& proxy, - ui16 blobOffset, - ui32 blockIndex, - size_t indexInBlobContent, - ui32 groupId, - ui32 rangeCompactionIndex) - : BlobId(blobId) - , Proxy(proxy) - , BlobOffset(blobOffset) - , BlockIndex(blockIndex) - , IndexInBlobContent(indexInBlobContent) - , GroupId(groupId) - , RangeCompactionIndex(rangeCompactionIndex) - {} - }; + using TRequest = TCompactionRequest; struct TBatchRequest { @@ -1975,6 +1898,15 @@ void CompleteRangeCompaction( //////////////////////////////////////////////////////////////////////////////// +void TPartitionActor::HandleCompactionTx( + const TEvPartitionPrivate::TEvCompactionTxRequest::TPtr& ev, + const TActorContext& ctx) +{ + // TODO:_ ??? +} + +//////////////////////////////////////////////////////////////////////////////// + bool TPartitionActor::PrepareCompaction( const TActorContext& ctx, TTransactionContext& tx, @@ -2042,9 +1974,6 @@ void TPartitionActor::CompleteCompaction( const bool blobPatchingEnabled = Config->GetBlobPatchingEnabled() || blobPatchingEnabledForCloud; - TVector rangeCompactionInfos; - TVector requests; - const auto mergedBlobThreshold = PartitionConfig.GetStorageMediaKind() == NCloud::NProto::STORAGE_MEDIA_SSD @@ -2058,21 +1987,25 @@ void TPartitionActor::CompleteCompaction( *Info(), *State, rangeCompaction, - requests, - rangeCompactionInfos, + PendingCompactionRequests, + PendingRangeCompactionInfos, Config->GetMaxDiffPercentageForBlobPatching()); - if (rangeCompactionInfos.back().OriginalBlobId) { + if (PendingRangeCompactionInfos.back().OriginalBlobId) { LOG_DEBUG( ctx, TBlockStoreComponents::PARTITION, "%s Selected patching candidate: %s, data blob: %s", LogTitle.GetWithTime().c_str(), - ToString(rangeCompactionInfos.back().OriginalBlobId).c_str(), - ToString(rangeCompactionInfos.back().DataBlobId).c_str()); + ToString(PendingRangeCompactionInfos.back().OriginalBlobId).c_str(), + ToString(PendingRangeCompactionInfos.back().DataBlobId).c_str()); } } + if (--AwaitedCompactionTxResponses) { + return; + } + const auto compactionType = args.CompactionOptions.test(ToBit(ECompactionOption::Forced)) ? ECompactionType::Forced: @@ -2092,15 +2025,19 @@ void TPartitionActor::CompleteCompaction( GetBlobStorageAsyncRequestTimeout(), compactionType, args.CommitId, - std::move(rangeCompactionInfos), - std::move(requests), + std::move(PendingRangeCompactionInfos), + std::move(PendingCompactionRequests), LogTitle.GetChild(GetCycleCount())); LOG_DEBUG( ctx, TBlockStoreComponents::PARTITION, - "%s Partition registered TCompactionActor with id [%lu]", + "%s Partition registered TCompactionActor with id [%lu]; commit id %lu", LogTitle.GetWithTime().c_str(), - actor.ToString().c_str()); + actor.ToString().c_str(), + args.CommitId); + + PendingRangeCompactionInfos.clear(); + PendingCompactionRequests.clear(); Actors.Insert(actor); } diff --git a/cloud/blockstore/libs/storage/partition/part_compaction.h b/cloud/blockstore/libs/storage/partition/part_compaction.h new file mode 100644 index 00000000000..50bb2e6d5ae --- /dev/null +++ b/cloud/blockstore/libs/storage/partition/part_compaction.h @@ -0,0 +1,107 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage::NPartition { + +//////////////////////////////////////////////////////////////////////////////// + +struct TRangeCompactionInfo +{ + const TBlockRange32 BlockRange; + const NCloud::TPartialBlobId OriginalBlobId; + const NCloud::TPartialBlobId DataBlobId; + const TBlockMask DataBlobSkipMask; + const NCloud::TPartialBlobId ZeroBlobId; + const TBlockMask ZeroBlobSkipMask; + const ui32 BlobsSkippedByCompaction; + const ui32 BlocksSkippedByCompaction; + const TVector BlockChecksums; + const EChannelDataKind ChannelDataKind; + + TGuardedBuffer BlobContent; + TVector ZeroBlocks; + TAffectedBlobs AffectedBlobs; + TAffectedBlocks AffectedBlocks; + TVector UnchangedBlobOffsets; + TArrayHolder Diffs; + ui32 DiffCount = 0; + + TRangeCompactionInfo( + TBlockRange32 blockRange, + NCloud::TPartialBlobId originalBlobId, + NCloud::TPartialBlobId dataBlobId, + TBlockMask dataBlobSkipMask, + NCloud::TPartialBlobId zeroBlobId, + TBlockMask zeroBlobSkipMask, + ui32 blobsSkippedByCompaction, + ui32 blocksSkippedByCompaction, + TVector blockChecksums, + EChannelDataKind channelDataKind, + TBlockBuffer blobContent, + TVector zeroBlocks, + TAffectedBlobs affectedBlobs, + TAffectedBlocks affectedBlocks) + : BlockRange(blockRange) + , OriginalBlobId(originalBlobId) + , DataBlobId(dataBlobId) + , DataBlobSkipMask(dataBlobSkipMask) + , ZeroBlobId(zeroBlobId) + , ZeroBlobSkipMask(zeroBlobSkipMask) + , BlobsSkippedByCompaction(blobsSkippedByCompaction) + , BlocksSkippedByCompaction(blocksSkippedByCompaction) + , BlockChecksums(std::move(blockChecksums)) + , ChannelDataKind(channelDataKind) + , BlobContent(std::move(blobContent)) + , ZeroBlocks(std::move(zeroBlocks)) + , AffectedBlobs(std::move(affectedBlobs)) + , AffectedBlocks(std::move(affectedBlocks)) + {} +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TCompactionRequest +{ + NCloud::TPartialBlobId BlobId; + NActors::TActorId Proxy; + ui16 BlobOffset; + ui32 BlockIndex; + size_t IndexInBlobContent; + ui32 GroupId; + ui32 RangeCompactionIndex; + + TCompactionRequest( + const NCloud::TPartialBlobId& blobId, + const NActors::TActorId& proxy, + ui16 blobOffset, + ui32 blockIndex, + size_t indexInBlobContent, + ui32 groupId, + ui32 rangeCompactionIndex) + : BlobId(blobId) + , Proxy(proxy) + , BlobOffset(blobOffset) + , BlockIndex(blockIndex) + , IndexInBlobContent(indexInBlobContent) + , GroupId(groupId) + , RangeCompactionIndex(rangeCompactionIndex) + {} +}; + +} // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h index 46db9325fee..0a69faafa66 100644 --- a/cloud/blockstore/libs/storage/partition/part_events_private.h +++ b/cloud/blockstore/libs/storage/partition/part_events_private.h @@ -169,6 +169,7 @@ using TFlushedCommitIds = TVector; xxx(AddFreshBlocks, __VA_ARGS__) \ xxx(Flush, __VA_ARGS__) \ xxx(Compaction, __VA_ARGS__) \ + xxx(CompactionTx, __VA_ARGS__) \ xxx(MetadataRebuildUsedBlocks, __VA_ARGS__) \ xxx(MetadataRebuildBlockCount, __VA_ARGS__) \ xxx(ScanDiskBatch, __VA_ARGS__) \ @@ -483,6 +484,30 @@ struct TEvPartitionPrivate { }; + // + // CompactionTx + // + + struct TCompactionTxRequest + { + const ui64 CommitId; + const TCompactionOptions CompactionOptions; + TVector> Ranges; + + TCompactionTxRequest( + ui64 commitId, + TCompactionOptions compactionOptions, + TVector> ranges) + : CommitId(commitId) + , CompactionOptions(compactionOptions) + , Ranges(std::move(ranges)) + {} + }; + + struct TCompactionTxResponse + { + }; + // // MetadataRebuildUsedBlocks // From 998f07ee6bf7cff4d8dd700508b12875770096ca Mon Sep 17 00:00:00 2001 From: gayurgin Date: Thu, 5 Feb 2026 20:16:56 +0300 Subject: [PATCH 02/15] DRAFT 2 add part_actor_compactiontx.cpp --- .../partition/part_actor_compaction.cpp | 597 ----------------- .../partition/part_actor_compactiontx.cpp | 623 ++++++++++++++++++ .../blockstore/libs/storage/partition/ya.make | 1 + 3 files changed, 624 insertions(+), 597 deletions(-) create mode 100644 cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index 674a8d012cf..a676aedb6e9 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -940,55 +940,6 @@ STFUNC(TCompactionActor::StateWork) //////////////////////////////////////////////////////////////////////////////// -class TCompactionBlockVisitor final - : public IFreshBlocksIndexVisitor - , public IBlocksIndexVisitor -{ -private: - TTxPartition::TRangeCompaction& Args; - const ui64 MaxCommitId; - -public: - TCompactionBlockVisitor( - TTxPartition::TRangeCompaction& args, - ui64 maxCommitId) - : Args(args) - , MaxCommitId(maxCommitId) - {} - - bool Visit(const TFreshBlock& block) override - { - Args.MarkBlock( - block.Meta.BlockIndex, - block.Meta.CommitId, - block.Content); - return true; - } - - bool KeepTrackOfAffectedBlocks = false; - - bool Visit( - ui32 blockIndex, - ui64 commitId, - const TPartialBlobId& blobId, - ui16 blobOffset) override - { - if (commitId > MaxCommitId) { - return true; - } - - Args.MarkBlock( - blockIndex, - commitId, - blobId, - blobOffset, - KeepTrackOfAffectedBlocks); - return true; - } -}; - -//////////////////////////////////////////////////////////////////////////////// - ui32 GetPercentage(ui64 total, ui64 real) { const double p = (real - total) * 100. / Max(total, 1UL); @@ -1494,552 +1445,4 @@ void TPartitionActor::HandleCompactionCompleted( ProcessCommitQueue(ctx); } -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -void PrepareRangeCompaction( - const TStorageConfig& config, - const bool incrementalCompactionEnabled, - const ui64 commitId, - const bool fullCompaction, - const TActorContext& ctx, - const ui64 tabletId, - bool& ready, - TPartitionDatabase& db, - TPartitionState& state, - TTxPartition::TRangeCompaction& args, - const TString& logTitle) -{ - TCompactionBlockVisitor visitor(args, commitId); - state.FindFreshBlocks(visitor, args.BlockRange, commitId); - visitor.KeepTrackOfAffectedBlocks = true; - ready &= state.FindMixedBlocksForCompaction(db, visitor, args.RangeIdx); - visitor.KeepTrackOfAffectedBlocks = false; - ready &= db.FindMergedBlocks( - visitor, - args.BlockRange, - true, // precharge - state.GetMaxBlocksInBlob(), - commitId); - - if (ready && incrementalCompactionEnabled && !fullCompaction) { - THashMap liveBlocks; - for (const auto& m: args.BlockMarks) { - if (m.CommitId && m.BlobId) { - ++liveBlocks[m.BlobId]; - } - } - - TVector blobIds; - blobIds.reserve(liveBlocks.size()); - for (const auto& x: liveBlocks) { - blobIds.push_back(x.first); - } - - Sort( - blobIds, - [&](const TPartialBlobId& l, const TPartialBlobId& r) - { return liveBlocks[l] < liveBlocks[r]; }); - - auto it = blobIds.begin(); - args.BlobsSkipped = blobIds.size(); - ui32 blocks = 0; - - while (it != blobIds.end()) { - const auto bytes = blocks * state.GetBlockSize(); - const auto blobCountOk = - args.BlobsSkipped <= - config.GetMaxSkippedBlobsDuringCompaction(); - const auto byteCountOk = - bytes >= config.GetTargetCompactionBytesPerOp(); - - if (blobCountOk && byteCountOk) { - break; - } - - blocks += liveBlocks[*it]; - --args.BlobsSkipped; - ++it; - } - - // liveBlocks will contain only skipped blobs after this - for (auto it2 = blobIds.begin(); it2 != it; ++it2) { - liveBlocks.erase(*it2); - } - - while (it != blobIds.end()) { - args.BlocksSkipped += liveBlocks[*it]; - ++it; - } - - LOG_DEBUG( - ctx, - TBlockStoreComponents::PARTITION, - "%s Dropping last %u blobs, %u blocks, remaining blobs: %u, " - "blocks: %u", - logTitle.c_str(), - args.BlobsSkipped, - args.BlocksSkipped, - liveBlocks.size(), - blocks); - - THashSet skippedBlockIndices; - - for (const auto& x: liveBlocks) { - auto ab = args.AffectedBlobs.find(x.first); - Y_ABORT_UNLESS(ab != args.AffectedBlobs.end()); - for (const auto blockIndex: ab->second.AffectedBlockIndices) { - // we can actually add extra indices to skippedBlockIndices, - // but it does not cause data corruption - the important thing - // is to ensure that all skipped indices are added, not that - // all non-skipped are preserved - skippedBlockIndices.insert(blockIndex); - } - args.AffectedBlobs.erase(ab); - } - - if (liveBlocks.size()) { - TAffectedBlocks affectedBlocks; - for (const auto& b: args.AffectedBlocks) { - if (!skippedBlockIndices.contains(b.BlockIndex)) { - affectedBlocks.push_back(b); - } - } - args.AffectedBlocks = std::move(affectedBlocks); - - for (auto& m: args.BlockMarks) { - if (liveBlocks.contains(m.BlobId)) { - m = {}; - } - } - } - } - - const ui32 checksumBoundary = - config.GetDiskPrefixLengthWithBlockChecksumsInBlobs() - / state.GetBlockSize(); - args.ChecksumsEnabled = args.BlockRange.Start < checksumBoundary; - - for (auto& kv: args.AffectedBlobs) { - if (db.ReadBlockMask(kv.first, kv.second.BlockMask)) { - Y_ABORT_UNLESS(kv.second.BlockMask.Defined(), - "Could not read block mask for blob: %s", - ToString(MakeBlobId(tabletId, kv.first)).data()); - } else { - ready = false; - } - - if (args.ChecksumsEnabled) { - if (db.ReadBlobMeta(kv.first, kv.second.BlobMeta)) { - Y_ABORT_UNLESS(kv.second.BlobMeta.Defined(), - "Could not read blob meta for blob: %s", - ToString(MakeBlobId(tabletId, kv.first)).data()); - } else { - ready = false; - } - } - } -} - -void CompleteRangeCompaction( - const bool blobPatchingEnabled, - const ui32 mergedBlobThreshold, - const ui64 commitId, - TTabletStorageInfo& tabletStorageInfo, - TPartitionState& state, - TTxPartition::TRangeCompaction& args, - TVector& requests, - TVector& rangeCompactionInfos, - ui32 maxDiffPercentageForBlobPatching) -{ - const EChannelPermissions compactionPermissions = - EChannelPermission::SystemWritesAllowed; - const auto initialRequestsSize = requests.size(); - - // at first we count number of data blocks - size_t dataBlocksCount = 0, zeroBlocksCount = 0; - - for (const auto& mark: args.BlockMarks) { - if (mark.CommitId) { - const bool isFresh = !mark.BlockContent.empty(); - const bool isMixedOrMerged = !IsDeletionMarker(mark.BlobId); - // there could be fresh block OR merged/mixed block - Y_ABORT_UNLESS(!(isFresh && isMixedOrMerged)); - if (isFresh || isMixedOrMerged) { - ++dataBlocksCount; - } else { - ++zeroBlocksCount; - } - } - } - - // determine the results kind - TPartialBlobId dataBlobId, zeroBlobId; - TBlockMask dataBlobSkipMask, zeroBlobSkipMask; - - auto channelDataKind = EChannelDataKind::Merged; - if (dataBlocksCount) { - ui32 skipped = 0; - for (const auto& mark: args.BlockMarks) { - const bool isFresh = !mark.BlockContent.empty(); - const bool isMixedOrMerged = !IsDeletionMarker(mark.BlobId); - if (!isFresh && !isMixedOrMerged) { - ++skipped; - } - } - - const auto blobSize = (args.BlockRange.Size() - skipped) * state.GetBlockSize(); - if (blobSize < mergedBlobThreshold) { - channelDataKind = EChannelDataKind::Mixed; - } - dataBlobId = state.GenerateBlobId( - channelDataKind, - compactionPermissions, - commitId, - blobSize, - rangeCompactionInfos.size()); - } - - if (zeroBlocksCount) { - // for zeroed region we will write blob without any data - // XXX same commitId used for 2 blobs: data blob and zero blob - // we differentiate between them by storing the last block index in - // MergedBlocksIndex::RangeEnd not for the last block of the processed - // compaction range but for the last actual block that's referenced by - // the corresponding blob - zeroBlobId = state.GenerateBlobId( - channelDataKind, - compactionPermissions, - commitId, - 0, - rangeCompactionInfos.size()); - } - - // now build the blob content for all blocks to be written - TBlockBuffer blobContent(TProfilingAllocator::Instance()); - TVector blockChecksums; - TVector zeroBlocks; - - ui32 blockIndex = args.BlockRange.Start; - TPartialBlobId patchingCandidate; - ui32 patchingCandidateChangedBlockCount = 0; - for (auto& mark: args.BlockMarks) { - if (mark.CommitId) { - if (mark.BlockContent) { - Y_ABORT_UNLESS(IsDeletionMarker(mark.BlobId)); - requests.emplace_back( - mark.BlobId, - TActorId(), - mark.BlobOffset, - blockIndex, - blobContent.GetBlocksCount(), - 0, - rangeCompactionInfos.size()); - - // fresh block will be written - blobContent.AddBlock({ - mark.BlockContent.data(), - mark.BlockContent.size() - }); - - if (args.ChecksumsEnabled) { - blockChecksums.push_back( - ComputeDefaultDigest(blobContent.GetBlocks().back())); - } - - if (zeroBlobId) { - zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); - } - } else if (!IsDeletionMarker(mark.BlobId)) { - const auto proxy = tabletStorageInfo.BSProxyIDForChannel( - mark.BlobId.Channel(), - mark.BlobId.Generation()); - - requests.emplace_back( - mark.BlobId, - proxy, - mark.BlobOffset, - blockIndex, - blobContent.GetBlocksCount(), - tabletStorageInfo.GroupFor( - mark.BlobId.Channel(), - mark.BlobId.Generation()), - rangeCompactionInfos.size()); - - // we will read this block later - blobContent.AddBlock(state.GetBlockSize(), char(0)); - - // block checksum is simply moved from the affected blob's meta - if (args.ChecksumsEnabled) { - ui32 blockChecksum = 0; - - auto* affectedBlob = args.AffectedBlobs.FindPtr(mark.BlobId); - Y_DEBUG_ABORT_UNLESS(affectedBlob); - if (affectedBlob) { - if (auto* meta = affectedBlob->BlobMeta.Get()) { - if (mark.BlobOffset < meta->BlockChecksumsSize()) { - blockChecksum = - meta->GetBlockChecksums(mark.BlobOffset); - } - } - } - - blockChecksums.push_back(blockChecksum); - } - - if (zeroBlobId) { - zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); - } - - if (blobPatchingEnabled) { - if (!patchingCandidate && - mark.BlobId.BlobSize() == dataBlobId.BlobSize()) - { - patchingCandidate = mark.BlobId; - ++patchingCandidateChangedBlockCount; - } else if (patchingCandidate == mark.BlobId) { - ++patchingCandidateChangedBlockCount; - } - } - } else { - dataBlobSkipMask.Set(blockIndex - args.BlockRange.Start); - zeroBlocks.push_back(blockIndex); - } - } else { - if (dataBlobId) { - dataBlobSkipMask.Set(blockIndex - args.BlockRange.Start); - } - if (zeroBlobId) { - zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); - } - } - - ++blockIndex; - } - - if (patchingCandidate) { - TPartialBlobId targetBlobId( - dataBlobId.Generation(), - dataBlobId.Step(), - patchingCandidate.Channel(), - dataBlobId.BlobSize(), - dataBlobId.Cookie(), - 0); - - TLogoBlobID realTargetBlobId = MakeBlobId( - tabletStorageInfo.TabletID, - targetBlobId); - - ui32 originalChannel = patchingCandidate.Channel(); - ui32 originalGroup = tabletStorageInfo.GroupFor( - originalChannel, - patchingCandidate.Generation()); - Y_ABORT_UNLESS(originalGroup != Max()); - - ui32 patchedChannel = realTargetBlobId.Channel(); - ui32 patchedGroup = tabletStorageInfo.GroupFor( - patchedChannel, - realTargetBlobId.Generation()); - Y_ABORT_UNLESS(patchedGroup != Max()); - - bool found = TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement( - MakeBlobId(tabletStorageInfo.TabletID, patchingCandidate), - &realTargetBlobId, - 0xfe0000, - originalGroup, - patchedGroup); - - ui32 blockCount = patchingCandidate.BlobSize() / state.GetBlockSize(); - ui32 patchingBlockCount = - dataBlocksCount - patchingCandidateChangedBlockCount; - ui32 changedPercentage = 100 * patchingBlockCount / blockCount; - - if (found && - (!maxDiffPercentageForBlobPatching || - changedPercentage <= maxDiffPercentageForBlobPatching)) - { - dataBlobId = TPartialBlobId( - dataBlobId.Generation(), - dataBlobId.Step(), - patchingCandidate.Channel(), - dataBlobId.BlobSize(), - realTargetBlobId.Cookie(), - 0); - } else { - patchingCandidate = {}; - } - } - - rangeCompactionInfos.emplace_back( - args.BlockRange, - patchingCandidate, - dataBlobId, - dataBlobSkipMask, - zeroBlobId, - zeroBlobSkipMask, - args.BlobsSkipped, - args.BlocksSkipped, - std::move(blockChecksums), - channelDataKind, - std::move(blobContent), - std::move(zeroBlocks), - std::move(args.AffectedBlobs), - std::move(args.AffectedBlocks)); - - if (!dataBlobId && !zeroBlobId) { - const auto rangeDescr = DescribeRange(args.BlockRange); - Y_ABORT("No blocks in compacted range: %s", rangeDescr.c_str()); - } - Y_ABORT_UNLESS(requests.size() - initialRequestsSize == dataBlocksCount); -} - -} // namespace - -//////////////////////////////////////////////////////////////////////////////// - -void TPartitionActor::HandleCompactionTx( - const TEvPartitionPrivate::TEvCompactionTxRequest::TPtr& ev, - const TActorContext& ctx) -{ - // TODO:_ ??? -} - -//////////////////////////////////////////////////////////////////////////////// - -bool TPartitionActor::PrepareCompaction( - const TActorContext& ctx, - TTransactionContext& tx, - TTxPartition::TCompaction& args) -{ - TRequestScope timer(*args.RequestInfo); - TPartitionDatabase db(tx.DB); - - const bool incrementalCompactionEnabled = - Config->GetIncrementalCompactionEnabled() || - Config->IsIncrementalCompactionFeatureEnabled( - PartitionConfig.GetCloudId(), - PartitionConfig.GetFolderId(), - PartitionConfig.GetDiskId()); - const bool fullCompaction = - args.CompactionOptions.test(ToBit(ECompactionOption::Full)); - - bool ready = true; - - for (auto& rangeCompaction: args.RangeCompactions) { - PrepareRangeCompaction( - *Config, - incrementalCompactionEnabled, - args.CommitId, - fullCompaction, - ctx, - TabletID(), - ready, - db, - *State, - rangeCompaction, - LogTitle.GetWithTime()); - } - - return ready; -} - -void TPartitionActor::ExecuteCompaction( - const TActorContext& ctx, - TTransactionContext& tx, - TTxPartition::TCompaction& args) -{ - Y_UNUSED(ctx); - Y_UNUSED(tx); - Y_UNUSED(args); -} - -void TPartitionActor::CompleteCompaction( - const TActorContext& ctx, - TTxPartition::TCompaction& args) -{ - TRequestScope timer(*args.RequestInfo); - - RemoveTransaction(*args.RequestInfo); - - for (auto& rangeCompaction: args.RangeCompactions) { - State->RaiseRangeTemperature(rangeCompaction.RangeIdx); - } - - const bool blobPatchingEnabledForCloud = - Config->IsBlobPatchingFeatureEnabled( - PartitionConfig.GetCloudId(), - PartitionConfig.GetFolderId(), - PartitionConfig.GetDiskId()); - const bool blobPatchingEnabled = - Config->GetBlobPatchingEnabled() || blobPatchingEnabledForCloud; - - const auto mergedBlobThreshold = - PartitionConfig.GetStorageMediaKind() == - NCloud::NProto::STORAGE_MEDIA_SSD - ? 0 - : Config->GetCompactionMergedBlobThresholdHDD(); - for (auto& rangeCompaction: args.RangeCompactions) { - CompleteRangeCompaction( - blobPatchingEnabled, - mergedBlobThreshold, - args.CommitId, - *Info(), - *State, - rangeCompaction, - PendingCompactionRequests, - PendingRangeCompactionInfos, - Config->GetMaxDiffPercentageForBlobPatching()); - - if (PendingRangeCompactionInfos.back().OriginalBlobId) { - LOG_DEBUG( - ctx, - TBlockStoreComponents::PARTITION, - "%s Selected patching candidate: %s, data blob: %s", - LogTitle.GetWithTime().c_str(), - ToString(PendingRangeCompactionInfos.back().OriginalBlobId).c_str(), - ToString(PendingRangeCompactionInfos.back().DataBlobId).c_str()); - } - } - - if (--AwaitedCompactionTxResponses) { - return; - } - - const auto compactionType = - args.CompactionOptions.test(ToBit(ECompactionOption::Forced)) ? - ECompactionType::Forced: - ECompactionType::Tablet; - - auto actor = NCloud::Register( - ctx, - args.RequestInfo, - TabletID(), - PartitionConfig.GetDiskId(), - SelfId(), - State->GetBlockSize(), - State->GetMaxBlocksInBlob(), - Config->GetMaxAffectedBlocksPerCompaction(), - Config->GetComputeDigestForEveryBlockOnCompaction(), - BlockDigestGenerator, - GetBlobStorageAsyncRequestTimeout(), - compactionType, - args.CommitId, - std::move(PendingRangeCompactionInfos), - std::move(PendingCompactionRequests), - LogTitle.GetChild(GetCycleCount())); - LOG_DEBUG( - ctx, - TBlockStoreComponents::PARTITION, - "%s Partition registered TCompactionActor with id [%lu]; commit id %lu", - LogTitle.GetWithTime().c_str(), - actor.ToString().c_str(), - args.CommitId); - - PendingRangeCompactionInfos.clear(); - PendingCompactionRequests.clear(); - - Actors.Insert(actor); -} - } // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp new file mode 100644 index 00000000000..8b0f1d8eee7 --- /dev/null +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp @@ -0,0 +1,623 @@ +#include "part_actor.h" +#include "part_compaction.h" + +#include +#include + +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage::NPartition { + +using namespace NActors; + +using namespace NCloud::NStorage; + +using namespace NKikimr; +using namespace NKikimr::NTabletFlatExecutor; + +LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER); + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TCompactionBlockVisitor final + : public IFreshBlocksIndexVisitor + , public IBlocksIndexVisitor +{ +private: + TTxPartition::TRangeCompaction& Args; + const ui64 MaxCommitId; + +public: + TCompactionBlockVisitor( + TTxPartition::TRangeCompaction& args, + ui64 maxCommitId) + : Args(args) + , MaxCommitId(maxCommitId) + {} + + bool Visit(const TFreshBlock& block) override + { + Args.MarkBlock( + block.Meta.BlockIndex, + block.Meta.CommitId, + block.Content); + return true; + } + + bool KeepTrackOfAffectedBlocks = false; + + bool Visit( + ui32 blockIndex, + ui64 commitId, + const TPartialBlobId& blobId, + ui16 blobOffset) override + { + if (commitId > MaxCommitId) { + return true; + } + + Args.MarkBlock( + blockIndex, + commitId, + blobId, + blobOffset, + KeepTrackOfAffectedBlocks); + return true; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +void PrepareRangeCompaction( + const TStorageConfig& config, + const bool incrementalCompactionEnabled, + const ui64 commitId, + const bool fullCompaction, + const TActorContext& ctx, + const ui64 tabletId, + bool& ready, + TPartitionDatabase& db, + TPartitionState& state, + TTxPartition::TRangeCompaction& args, + const TString& logTitle) +{ + TCompactionBlockVisitor visitor(args, commitId); + state.FindFreshBlocks(visitor, args.BlockRange, commitId); + visitor.KeepTrackOfAffectedBlocks = true; + ready &= state.FindMixedBlocksForCompaction(db, visitor, args.RangeIdx); + visitor.KeepTrackOfAffectedBlocks = false; + ready &= db.FindMergedBlocks( + visitor, + args.BlockRange, + true, // precharge + state.GetMaxBlocksInBlob(), + commitId); + + if (ready && incrementalCompactionEnabled && !fullCompaction) { + THashMap liveBlocks; + for (const auto& m: args.BlockMarks) { + if (m.CommitId && m.BlobId) { + ++liveBlocks[m.BlobId]; + } + } + + TVector blobIds; + blobIds.reserve(liveBlocks.size()); + for (const auto& x: liveBlocks) { + blobIds.push_back(x.first); + } + + Sort( + blobIds, + [&](const TPartialBlobId& l, const TPartialBlobId& r) + { return liveBlocks[l] < liveBlocks[r]; }); + + auto it = blobIds.begin(); + args.BlobsSkipped = blobIds.size(); + ui32 blocks = 0; + + while (it != blobIds.end()) { + const auto bytes = blocks * state.GetBlockSize(); + const auto blobCountOk = + args.BlobsSkipped <= + config.GetMaxSkippedBlobsDuringCompaction(); + const auto byteCountOk = + bytes >= config.GetTargetCompactionBytesPerOp(); + + if (blobCountOk && byteCountOk) { + break; + } + + blocks += liveBlocks[*it]; + --args.BlobsSkipped; + ++it; + } + + // liveBlocks will contain only skipped blobs after this + for (auto it2 = blobIds.begin(); it2 != it; ++it2) { + liveBlocks.erase(*it2); + } + + while (it != blobIds.end()) { + args.BlocksSkipped += liveBlocks[*it]; + ++it; + } + + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "%s Dropping last %u blobs, %u blocks, remaining blobs: %u, " + "blocks: %u", + logTitle.c_str(), + args.BlobsSkipped, + args.BlocksSkipped, + liveBlocks.size(), + blocks); + + THashSet skippedBlockIndices; + + for (const auto& x: liveBlocks) { + auto ab = args.AffectedBlobs.find(x.first); + Y_ABORT_UNLESS(ab != args.AffectedBlobs.end()); + for (const auto blockIndex: ab->second.AffectedBlockIndices) { + // we can actually add extra indices to skippedBlockIndices, + // but it does not cause data corruption - the important thing + // is to ensure that all skipped indices are added, not that + // all non-skipped are preserved + skippedBlockIndices.insert(blockIndex); + } + args.AffectedBlobs.erase(ab); + } + + if (liveBlocks.size()) { + TAffectedBlocks affectedBlocks; + for (const auto& b: args.AffectedBlocks) { + if (!skippedBlockIndices.contains(b.BlockIndex)) { + affectedBlocks.push_back(b); + } + } + args.AffectedBlocks = std::move(affectedBlocks); + + for (auto& m: args.BlockMarks) { + if (liveBlocks.contains(m.BlobId)) { + m = {}; + } + } + } + } + + const ui32 checksumBoundary = + config.GetDiskPrefixLengthWithBlockChecksumsInBlobs() + / state.GetBlockSize(); + args.ChecksumsEnabled = args.BlockRange.Start < checksumBoundary; + + for (auto& kv: args.AffectedBlobs) { + if (db.ReadBlockMask(kv.first, kv.second.BlockMask)) { + Y_ABORT_UNLESS(kv.second.BlockMask.Defined(), + "Could not read block mask for blob: %s", + ToString(MakeBlobId(tabletId, kv.first)).data()); + } else { + ready = false; + } + + if (args.ChecksumsEnabled) { + if (db.ReadBlobMeta(kv.first, kv.second.BlobMeta)) { + Y_ABORT_UNLESS(kv.second.BlobMeta.Defined(), + "Could not read blob meta for blob: %s", + ToString(MakeBlobId(tabletId, kv.first)).data()); + } else { + ready = false; + } + } + } +} + +void CompleteRangeCompaction( + const bool blobPatchingEnabled, + const ui32 mergedBlobThreshold, + const ui64 commitId, + TTabletStorageInfo& tabletStorageInfo, + TPartitionState& state, + TTxPartition::TRangeCompaction& args, + TVector& requests, + TVector& rangeCompactionInfos, + ui32 maxDiffPercentageForBlobPatching) +{ + const EChannelPermissions compactionPermissions = + EChannelPermission::SystemWritesAllowed; + const auto initialRequestsSize = requests.size(); + + // at first we count number of data blocks + size_t dataBlocksCount = 0, zeroBlocksCount = 0; + + for (const auto& mark: args.BlockMarks) { + if (mark.CommitId) { + const bool isFresh = !mark.BlockContent.empty(); + const bool isMixedOrMerged = !IsDeletionMarker(mark.BlobId); + // there could be fresh block OR merged/mixed block + Y_ABORT_UNLESS(!(isFresh && isMixedOrMerged)); + if (isFresh || isMixedOrMerged) { + ++dataBlocksCount; + } else { + ++zeroBlocksCount; + } + } + } + + // determine the results kind + TPartialBlobId dataBlobId, zeroBlobId; + TBlockMask dataBlobSkipMask, zeroBlobSkipMask; + + auto channelDataKind = EChannelDataKind::Merged; + if (dataBlocksCount) { + ui32 skipped = 0; + for (const auto& mark: args.BlockMarks) { + const bool isFresh = !mark.BlockContent.empty(); + const bool isMixedOrMerged = !IsDeletionMarker(mark.BlobId); + if (!isFresh && !isMixedOrMerged) { + ++skipped; + } + } + + const auto blobSize = (args.BlockRange.Size() - skipped) * state.GetBlockSize(); + if (blobSize < mergedBlobThreshold) { + channelDataKind = EChannelDataKind::Mixed; + } + dataBlobId = state.GenerateBlobId( + channelDataKind, + compactionPermissions, + commitId, + blobSize, + rangeCompactionInfos.size()); + } + + if (zeroBlocksCount) { + // for zeroed region we will write blob without any data + // XXX same commitId used for 2 blobs: data blob and zero blob + // we differentiate between them by storing the last block index in + // MergedBlocksIndex::RangeEnd not for the last block of the processed + // compaction range but for the last actual block that's referenced by + // the corresponding blob + zeroBlobId = state.GenerateBlobId( + channelDataKind, + compactionPermissions, + commitId, + 0, + rangeCompactionInfos.size()); + } + + // now build the blob content for all blocks to be written + TBlockBuffer blobContent(TProfilingAllocator::Instance()); + TVector blockChecksums; + TVector zeroBlocks; + + ui32 blockIndex = args.BlockRange.Start; + TPartialBlobId patchingCandidate; + ui32 patchingCandidateChangedBlockCount = 0; + for (auto& mark: args.BlockMarks) { + if (mark.CommitId) { + if (mark.BlockContent) { + Y_ABORT_UNLESS(IsDeletionMarker(mark.BlobId)); + requests.emplace_back( + mark.BlobId, + TActorId(), + mark.BlobOffset, + blockIndex, + blobContent.GetBlocksCount(), + 0, + rangeCompactionInfos.size()); + + // fresh block will be written + blobContent.AddBlock({ + mark.BlockContent.data(), + mark.BlockContent.size() + }); + + if (args.ChecksumsEnabled) { + blockChecksums.push_back( + ComputeDefaultDigest(blobContent.GetBlocks().back())); + } + + if (zeroBlobId) { + zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); + } + } else if (!IsDeletionMarker(mark.BlobId)) { + const auto proxy = tabletStorageInfo.BSProxyIDForChannel( + mark.BlobId.Channel(), + mark.BlobId.Generation()); + + requests.emplace_back( + mark.BlobId, + proxy, + mark.BlobOffset, + blockIndex, + blobContent.GetBlocksCount(), + tabletStorageInfo.GroupFor( + mark.BlobId.Channel(), + mark.BlobId.Generation()), + rangeCompactionInfos.size()); + + // we will read this block later + blobContent.AddBlock(state.GetBlockSize(), char(0)); + + // block checksum is simply moved from the affected blob's meta + if (args.ChecksumsEnabled) { + ui32 blockChecksum = 0; + + auto* affectedBlob = args.AffectedBlobs.FindPtr(mark.BlobId); + Y_DEBUG_ABORT_UNLESS(affectedBlob); + if (affectedBlob) { + if (auto* meta = affectedBlob->BlobMeta.Get()) { + if (mark.BlobOffset < meta->BlockChecksumsSize()) { + blockChecksum = + meta->GetBlockChecksums(mark.BlobOffset); + } + } + } + + blockChecksums.push_back(blockChecksum); + } + + if (zeroBlobId) { + zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); + } + + if (blobPatchingEnabled) { + if (!patchingCandidate && + mark.BlobId.BlobSize() == dataBlobId.BlobSize()) + { + patchingCandidate = mark.BlobId; + ++patchingCandidateChangedBlockCount; + } else if (patchingCandidate == mark.BlobId) { + ++patchingCandidateChangedBlockCount; + } + } + } else { + dataBlobSkipMask.Set(blockIndex - args.BlockRange.Start); + zeroBlocks.push_back(blockIndex); + } + } else { + if (dataBlobId) { + dataBlobSkipMask.Set(blockIndex - args.BlockRange.Start); + } + if (zeroBlobId) { + zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); + } + } + + ++blockIndex; + } + + if (patchingCandidate) { + TPartialBlobId targetBlobId( + dataBlobId.Generation(), + dataBlobId.Step(), + patchingCandidate.Channel(), + dataBlobId.BlobSize(), + dataBlobId.Cookie(), + 0); + + TLogoBlobID realTargetBlobId = MakeBlobId( + tabletStorageInfo.TabletID, + targetBlobId); + + ui32 originalChannel = patchingCandidate.Channel(); + ui32 originalGroup = tabletStorageInfo.GroupFor( + originalChannel, + patchingCandidate.Generation()); + Y_ABORT_UNLESS(originalGroup != Max()); + + ui32 patchedChannel = realTargetBlobId.Channel(); + ui32 patchedGroup = tabletStorageInfo.GroupFor( + patchedChannel, + realTargetBlobId.Generation()); + Y_ABORT_UNLESS(patchedGroup != Max()); + + bool found = TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement( + MakeBlobId(tabletStorageInfo.TabletID, patchingCandidate), + &realTargetBlobId, + 0xfe0000, + originalGroup, + patchedGroup); + + ui32 blockCount = patchingCandidate.BlobSize() / state.GetBlockSize(); + ui32 patchingBlockCount = + dataBlocksCount - patchingCandidateChangedBlockCount; + ui32 changedPercentage = 100 * patchingBlockCount / blockCount; + + if (found && + (!maxDiffPercentageForBlobPatching || + changedPercentage <= maxDiffPercentageForBlobPatching)) + { + dataBlobId = TPartialBlobId( + dataBlobId.Generation(), + dataBlobId.Step(), + patchingCandidate.Channel(), + dataBlobId.BlobSize(), + realTargetBlobId.Cookie(), + 0); + } else { + patchingCandidate = {}; + } + } + + rangeCompactionInfos.emplace_back( + args.BlockRange, + patchingCandidate, + dataBlobId, + dataBlobSkipMask, + zeroBlobId, + zeroBlobSkipMask, + args.BlobsSkipped, + args.BlocksSkipped, + std::move(blockChecksums), + channelDataKind, + std::move(blobContent), + std::move(zeroBlocks), + std::move(args.AffectedBlobs), + std::move(args.AffectedBlocks)); + + if (!dataBlobId && !zeroBlobId) { + const auto rangeDescr = DescribeRange(args.BlockRange); + Y_ABORT("No blocks in compacted range: %s", rangeDescr.c_str()); + } + Y_ABORT_UNLESS(requests.size() - initialRequestsSize == dataBlocksCount); +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +void TPartitionActor::HandleCompactionTx( + const TEvPartitionPrivate::TEvCompactionTxRequest::TPtr& ev, + const TActorContext& ctx) +{ + // TODO:_ ??? + Y_UNUSED(ev); + Y_UNUSED(ctx); +} + +bool TPartitionActor::PrepareCompaction( + const TActorContext& ctx, + TTransactionContext& tx, + TTxPartition::TCompaction& args) +{ + TRequestScope timer(*args.RequestInfo); + TPartitionDatabase db(tx.DB); + + const bool incrementalCompactionEnabled = + Config->GetIncrementalCompactionEnabled() || + Config->IsIncrementalCompactionFeatureEnabled( + PartitionConfig.GetCloudId(), + PartitionConfig.GetFolderId(), + PartitionConfig.GetDiskId()); + const bool fullCompaction = + args.CompactionOptions.test(ToBit(ECompactionOption::Full)); + + bool ready = true; + + for (auto& rangeCompaction: args.RangeCompactions) { + PrepareRangeCompaction( + *Config, + incrementalCompactionEnabled, + args.CommitId, + fullCompaction, + ctx, + TabletID(), + ready, + db, + *State, + rangeCompaction, + LogTitle.GetWithTime()); + } + + return ready; +} + +void TPartitionActor::ExecuteCompaction( + const TActorContext& ctx, + TTransactionContext& tx, + TTxPartition::TCompaction& args) +{ + Y_UNUSED(ctx); + Y_UNUSED(tx); + Y_UNUSED(args); +} + +void TPartitionActor::CompleteCompaction( + const TActorContext& ctx, + TTxPartition::TCompaction& args) +{ + TRequestScope timer(*args.RequestInfo); + + RemoveTransaction(*args.RequestInfo); + + for (auto& rangeCompaction: args.RangeCompactions) { + State->RaiseRangeTemperature(rangeCompaction.RangeIdx); + } + + const bool blobPatchingEnabledForCloud = + Config->IsBlobPatchingFeatureEnabled( + PartitionConfig.GetCloudId(), + PartitionConfig.GetFolderId(), + PartitionConfig.GetDiskId()); + const bool blobPatchingEnabled = + Config->GetBlobPatchingEnabled() || blobPatchingEnabledForCloud; + + const auto mergedBlobThreshold = + PartitionConfig.GetStorageMediaKind() == + NCloud::NProto::STORAGE_MEDIA_SSD + ? 0 + : Config->GetCompactionMergedBlobThresholdHDD(); + for (auto& rangeCompaction: args.RangeCompactions) { + CompleteRangeCompaction( + blobPatchingEnabled, + mergedBlobThreshold, + args.CommitId, + *Info(), + *State, + rangeCompaction, + PendingCompactionRequests, + PendingRangeCompactionInfos, + Config->GetMaxDiffPercentageForBlobPatching()); + + if (PendingRangeCompactionInfos.back().OriginalBlobId) { + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "%s Selected patching candidate: %s, data blob: %s", + LogTitle.GetWithTime().c_str(), + ToString(PendingRangeCompactionInfos.back().OriginalBlobId).c_str(), + ToString(PendingRangeCompactionInfos.back().DataBlobId).c_str()); + } + } + + if (--AwaitedCompactionTxResponses) { + return; + } + + // TODO:_ reply + + // TODO:_ where we use compactionType? + // const auto compactionType = + // args.CompactionOptions.test(ToBit(ECompactionOption::Forced)) ? + // ECompactionType::Forced: + // ECompactionType::Tablet; + + // TODO:_ where we resigister TCompactionActor? + // auto actor = NCloud::Register( + // ctx, + // args.RequestInfo, + // TabletID(), + // PartitionConfig.GetDiskId(), + // SelfId(), + // State->GetBlockSize(), + // State->GetMaxBlocksInBlob(), + // Config->GetMaxAffectedBlocksPerCompaction(), + // Config->GetComputeDigestForEveryBlockOnCompaction(), + // BlockDigestGenerator, + // GetBlobStorageAsyncRequestTimeout(), + // compactionType, + // args.CommitId, + // std::move(PendingRangeCompactionInfos), + // std::move(PendingCompactionRequests), + // LogTitle.GetChild(GetCycleCount())); + // LOG_DEBUG( + // ctx, + // TBlockStoreComponents::PARTITION, + // "%s Partition registered TCompactionActor with id [%lu]; commit id %lu", + // LogTitle.GetWithTime().c_str(), + // actor.ToString().c_str(), + // args.CommitId); + + // Actors.Insert(actor); + + PendingRangeCompactionInfos.clear(); + PendingCompactionRequests.clear(); +} + +} // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/ya.make b/cloud/blockstore/libs/storage/partition/ya.make index a983d6e28cc..56668bdd5f7 100644 --- a/cloud/blockstore/libs/storage/partition/ya.make +++ b/cloud/blockstore/libs/storage/partition/ya.make @@ -14,6 +14,7 @@ SRCS( part_actor_cleanup.cpp part_actor_collectgarbage.cpp part_actor_compaction.cpp + part_actor_compactiontx.cpp part_actor_compactrange.cpp part_actor_confirmblobs.cpp part_actor_deletegarbage.cpp From 4036d5a43c6ec72a4bb53e4174729d565aecc448 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Thu, 5 Feb 2026 20:37:55 +0300 Subject: [PATCH 03/15] Implement HandleCompactionTx --- .../partition/part_actor_compaction.cpp | 1 + .../partition/part_actor_compactiontx.cpp | 38 +++++++++++++++++-- .../storage/partition/part_events_private.h | 2 + 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index a676aedb6e9..e3791323b3c 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -1323,6 +1323,7 @@ void TPartitionActor::HandleCompaction( State->GetCleanupQueue().AcquireBarrier(commitId); State->GetGarbageQueue().AcquireBarrier(commitId); + // TODO:_ what we do here? AddTransaction(*requestInfo); auto tx = CreateTx( diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp index 8b0f1d8eee7..98c57eff5e7 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp @@ -476,9 +476,41 @@ void TPartitionActor::HandleCompactionTx( const TEvPartitionPrivate::TEvCompactionTxRequest::TPtr& ev, const TActorContext& ctx) { - // TODO:_ ??? - Y_UNUSED(ev); - Y_UNUSED(ctx); + auto* msg = ev->Get(); + + auto requestInfo = CreateRequestInfo( + ev->Sender, + ev->Cookie, + msg->CallContext); + + TRequestScope timer(*requestInfo); + + LWTRACK( + BackgroundTaskStarted_Partition, + requestInfo->CallContext->LWOrbit, + "CompactionTx", + static_cast(PartitionConfig.GetStorageMediaKind()), + requestInfo->CallContext->RequestId, + PartitionConfig.GetDiskId()); + + AddTransaction(*requestInfo); + + auto tx = CreateTx( + requestInfo, + msg->CommitId, + msg->CompactionOptions, + std::move(msg->Ranges)); + + ui64 minCommitId = State->GetCommitQueue().GetMinCommitId(); + Y_ABORT_UNLESS(minCommitId <= msg->CommitId); + + if (minCommitId == msg->CommitId) { + // start execution + ExecuteTx(ctx, std::move(tx)); + } else { + // delay execution until all previous commits completed + State->GetCommitQueue().Enqueue(std::move(tx), msg->CommitId); + } } bool TPartitionActor::PrepareCompaction( diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h index 0a69faafa66..c0bd9fb55c9 100644 --- a/cloud/blockstore/libs/storage/partition/part_events_private.h +++ b/cloud/blockstore/libs/storage/partition/part_events_private.h @@ -492,6 +492,8 @@ struct TEvPartitionPrivate { const ui64 CommitId; const TCompactionOptions CompactionOptions; + // TODO:_ coulb be just range, not vercor of ranges? + // TODO:_ but if so, then we lose the original approash with single transaction. TVector> Ranges; TCompactionTxRequest( From 836f205d97587e472db67aa8e06dbaba08c38979 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Thu, 5 Feb 2026 20:52:29 +0300 Subject: [PATCH 04/15] Add empty response --- .../partition/part_actor_compactiontx.cpp | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp index 98c57eff5e7..46d263df0ec 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp @@ -486,12 +486,10 @@ void TPartitionActor::HandleCompactionTx( TRequestScope timer(*requestInfo); LWTRACK( - BackgroundTaskStarted_Partition, + RequestReceived_Partition, requestInfo->CallContext->LWOrbit, "CompactionTx", - static_cast(PartitionConfig.GetStorageMediaKind()), - requestInfo->CallContext->RequestId, - PartitionConfig.GetDiskId()); + requestInfo->CallContext->RequestId); AddTransaction(*requestInfo); @@ -564,8 +562,6 @@ void TPartitionActor::CompleteCompaction( const TActorContext& ctx, TTxPartition::TCompaction& args) { - TRequestScope timer(*args.RequestInfo); - RemoveTransaction(*args.RequestInfo); for (auto& rangeCompaction: args.RangeCompactions) { @@ -612,13 +608,20 @@ void TPartitionActor::CompleteCompaction( return; } - // TODO:_ reply + // TODO:_ or to it in the beginning of the Complete... function? + TRequestScope timer(*args.RequestInfo); + + auto response = std::make_unique(); + // TODO:_ ExecCycles? + // response->ExecCycles = args.RequestInfo->GetExecCycles(); + + LWTRACK( + ResponseSent_Partition, + args.RequestInfo->CallContext->LWOrbit, + "CompactionTx", + args.RequestInfo->CallContext->RequestId); - // TODO:_ where we use compactionType? - // const auto compactionType = - // args.CompactionOptions.test(ToBit(ECompactionOption::Forced)) ? - // ECompactionType::Forced: - // ECompactionType::Tablet; + NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); // TODO:_ where we resigister TCompactionActor? // auto actor = NCloud::Register( From e5f15aea205a795784b50c005a17aeaa96800333 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Thu, 5 Feb 2026 21:36:15 +0300 Subject: [PATCH 05/15] move affected blobs and blocks to separate .h file --- .../libs/storage/partition/model/affected.cpp | 1 + .../libs/storage/partition/model/affected.h | 39 +++++++++++++++++++ .../libs/storage/partition/part_compaction.h | 9 +++-- .../storage/partition/part_events_private.h | 29 ++------------ 4 files changed, 49 insertions(+), 29 deletions(-) create mode 100644 cloud/blockstore/libs/storage/partition/model/affected.cpp create mode 100644 cloud/blockstore/libs/storage/partition/model/affected.h diff --git a/cloud/blockstore/libs/storage/partition/model/affected.cpp b/cloud/blockstore/libs/storage/partition/model/affected.cpp new file mode 100644 index 00000000000..51de2c3bdc7 --- /dev/null +++ b/cloud/blockstore/libs/storage/partition/model/affected.cpp @@ -0,0 +1 @@ +#include "affected.h" diff --git a/cloud/blockstore/libs/storage/partition/model/affected.h b/cloud/blockstore/libs/storage/partition/model/affected.h new file mode 100644 index 00000000000..9ed24148145 --- /dev/null +++ b/cloud/blockstore/libs/storage/partition/model/affected.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include + +#include + +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage::NPartition { + +//////////////////////////////////////////////////////////////////////////////// + +struct TAffectedBlob +{ + TVector Offsets; + TMaybe BlockMask; + TVector AffectedBlockIndices; + + // Filled only if a flag is set. BlobMeta is needed only to do some extra + // consistency checks. + TMaybe BlobMeta; +}; + +using TAffectedBlobs = THashMap; + +//////////////////////////////////////////////////////////////////////////////// + +struct TAffectedBlock +{ + ui32 BlockIndex = 0; + ui64 CommitId = 0; +}; + +using TAffectedBlocks = TVector; + +} // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/part_compaction.h b/cloud/blockstore/libs/storage/partition/part_compaction.h index 50bb2e6d5ae..1844b143833 100644 --- a/cloud/blockstore/libs/storage/partition/part_compaction.h +++ b/cloud/blockstore/libs/storage/partition/part_compaction.h @@ -1,16 +1,17 @@ #pragma once #include -#include +#include // TODO:_ ??? +#include #include -#include #include #include #include -#include -#include +#include +#include // TODO:_ ??? +#include // TODO:_ ??? #include #include diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h index c0bd9fb55c9..6973e1a6fc9 100644 --- a/cloud/blockstore/libs/storage/partition/part_events_private.h +++ b/cloud/blockstore/libs/storage/partition/part_events_private.h @@ -1,6 +1,7 @@ #pragma once #include "public.h" +#include "part_compaction.h" // TODO:_ is it ok? Should we move it to model? Or rearrange dependencies? #include #include @@ -11,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -115,31 +117,6 @@ struct TWriteFreshBlocksRequest //////////////////////////////////////////////////////////////////////////////// -struct TAffectedBlob -{ - TVector Offsets; - TMaybe BlockMask; - TVector AffectedBlockIndices; - - // Filled only if a flag is set. BlobMeta is needed only to do some extra - // consistency checks. - TMaybe BlobMeta; -}; - -using TAffectedBlobs = THashMap; - -//////////////////////////////////////////////////////////////////////////////// - -struct TAffectedBlock -{ - ui32 BlockIndex = 0; - ui64 CommitId = 0; -}; - -using TAffectedBlocks = TVector; - -//////////////////////////////////////////////////////////////////////////////// - struct TBlobCompactionInfo { const ui32 BlobsSkippedByCompaction = 0; @@ -508,6 +485,8 @@ struct TEvPartitionPrivate struct TCompactionTxResponse { + TVector RangeCompactionInfos; + TVector CompactionRequests; }; // From 6643a2bdaaa8f55734af08b7c8d1017fdc5edd67 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Thu, 5 Feb 2026 22:09:39 +0300 Subject: [PATCH 06/15] compaction tx done --- .../libs/storage/partition/part_actor.h | 2 +- .../partition/part_actor_compaction.cpp | 2 +- .../partition/part_actor_compactiontx.cpp | 49 ++++++++++++------- .../libs/storage/partition/part_compaction.h | 5 +- .../storage/partition/part_events_private.h | 14 +++++- .../libs/storage/partition/part_tx.h | 20 ++++++++ 6 files changed, 69 insertions(+), 23 deletions(-) diff --git a/cloud/blockstore/libs/storage/partition/part_actor.h b/cloud/blockstore/libs/storage/partition/part_actor.h index 132f31e7e4c..f3b954c48f7 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor.h +++ b/cloud/blockstore/libs/storage/partition/part_actor.h @@ -166,7 +166,7 @@ class TPartitionActor final // Compaction infos and requests, waiting until all transactions of the // current batch are completed TVector PendingRangeCompactionInfos; - TVector PendingCompactionRequests; + TVector PendingCompactionRequests; NBlobMetrics::TBlobLoadMetrics PrevMetrics; NBlobMetrics::TBlobLoadMetrics OverlayMetrics; diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index e3791323b3c..73681449212 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -35,7 +35,7 @@ class TCompactionActor final : public TActorBootstrapped { public: - using TRequest = TCompactionRequest; + using TRequest = TCompactionBlobRequest; struct TBatchRequest { diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp index 46d263df0ec..6e758880c22 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp @@ -223,9 +223,10 @@ void CompleteRangeCompaction( TTabletStorageInfo& tabletStorageInfo, TPartitionState& state, TTxPartition::TRangeCompaction& args, - TVector& requests, + TVector& requests, TVector& rangeCompactionInfos, - ui32 maxDiffPercentageForBlobPatching) + ui32 maxDiffPercentageForBlobPatching, + ui32 rangeCompactionIndex) { const EChannelPermissions compactionPermissions = EChannelPermission::SystemWritesAllowed; @@ -272,7 +273,7 @@ void CompleteRangeCompaction( compactionPermissions, commitId, blobSize, - rangeCompactionInfos.size()); + rangeCompactionIndex); } if (zeroBlocksCount) { @@ -287,7 +288,7 @@ void CompleteRangeCompaction( compactionPermissions, commitId, 0, - rangeCompactionInfos.size()); + rangeCompactionIndex); } // now build the blob content for all blocks to be written @@ -309,7 +310,7 @@ void CompleteRangeCompaction( blockIndex, blobContent.GetBlocksCount(), 0, - rangeCompactionInfos.size()); + rangeCompactionIndex); // fresh block will be written blobContent.AddBlock({ @@ -339,7 +340,7 @@ void CompleteRangeCompaction( tabletStorageInfo.GroupFor( mark.BlobId.Channel(), mark.BlobId.Generation()), - rangeCompactionInfos.size()); + rangeCompactionIndex); // we will read this block later blobContent.AddBlock(state.GetBlockSize(), char(0)); @@ -496,6 +497,7 @@ void TPartitionActor::HandleCompactionTx( auto tx = CreateTx( requestInfo, msg->CommitId, + msg->RangeCompactionIndex, msg->CompactionOptions, std::move(msg->Ranges)); @@ -568,6 +570,7 @@ void TPartitionActor::CompleteCompaction( State->RaiseRangeTemperature(rangeCompaction.RangeIdx); } + // TODO:_ rename ...ForCloud -> ...ForDisk ??? const bool blobPatchingEnabledForCloud = Config->IsBlobPatchingFeatureEnabled( PartitionConfig.GetCloudId(), @@ -581,6 +584,10 @@ void TPartitionActor::CompleteCompaction( NCloud::NProto::STORAGE_MEDIA_SSD ? 0 : Config->GetCompactionMergedBlobThresholdHDD(); + + TVector rangeCompactionInfos; + TVector requests; + for (auto& rangeCompaction: args.RangeCompactions) { CompleteRangeCompaction( blobPatchingEnabled, @@ -589,29 +596,37 @@ void TPartitionActor::CompleteCompaction( *Info(), *State, rangeCompaction, - PendingCompactionRequests, - PendingRangeCompactionInfos, - Config->GetMaxDiffPercentageForBlobPatching()); + requests, + rangeCompactionInfos, + Config->GetMaxDiffPercentageForBlobPatching(), + args.RangeCompactionIndex + static_cast(rangeCompactionInfos.size())); - if (PendingRangeCompactionInfos.back().OriginalBlobId) { + if (rangeCompactionInfos.back().OriginalBlobId) { LOG_DEBUG( ctx, TBlockStoreComponents::PARTITION, "%s Selected patching candidate: %s, data blob: %s", LogTitle.GetWithTime().c_str(), - ToString(PendingRangeCompactionInfos.back().OriginalBlobId).c_str(), - ToString(PendingRangeCompactionInfos.back().DataBlobId).c_str()); + ToString(rangeCompactionInfos.back().OriginalBlobId).c_str(), + ToString(rangeCompactionInfos.back().DataBlobId).c_str()); } } - if (--AwaitedCompactionTxResponses) { - return; - } - // TODO:_ or to it in the beginning of the Complete... function? TRequestScope timer(*args.RequestInfo); - auto response = std::make_unique(); + // auto response = + // std::make_unique(); + // auto response = + // std::make_unique( + // MakeError(S_OK)); + // response->Record.SetRangeCompactionInfos(std::move(rangeCompactionInfos)); + // response->Record.SetCompactionRequests(std::move(requests)); + auto response = + std::make_unique( + std::move(rangeCompactionInfos), + std::move(requests)); + // TODO:_ ExecCycles? // response->ExecCycles = args.RequestInfo->GetExecCycles(); diff --git a/cloud/blockstore/libs/storage/partition/part_compaction.h b/cloud/blockstore/libs/storage/partition/part_compaction.h index 1844b143833..4a11def3a8f 100644 --- a/cloud/blockstore/libs/storage/partition/part_compaction.h +++ b/cloud/blockstore/libs/storage/partition/part_compaction.h @@ -77,7 +77,8 @@ struct TRangeCompactionInfo //////////////////////////////////////////////////////////////////////////////// -struct TCompactionRequest +// TODO:_ naming ??? +struct TCompactionBlobRequest { NCloud::TPartialBlobId BlobId; NActors::TActorId Proxy; @@ -87,7 +88,7 @@ struct TCompactionRequest ui32 GroupId; ui32 RangeCompactionIndex; - TCompactionRequest( + TCompactionBlobRequest( const NCloud::TPartialBlobId& blobId, const NActors::TActorId& proxy, ui16 blobOffset, diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h index 6973e1a6fc9..f57a713d881 100644 --- a/cloud/blockstore/libs/storage/partition/part_events_private.h +++ b/cloud/blockstore/libs/storage/partition/part_events_private.h @@ -1,7 +1,7 @@ #pragma once -#include "public.h" #include "part_compaction.h" // TODO:_ is it ok? Should we move it to model? Or rearrange dependencies? +#include "public.h" #include #include @@ -472,6 +472,7 @@ struct TEvPartitionPrivate // TODO:_ coulb be just range, not vercor of ranges? // TODO:_ but if so, then we lose the original approash with single transaction. TVector> Ranges; + ui32 RangeCompactionIndex; TCompactionTxRequest( ui64 commitId, @@ -486,7 +487,16 @@ struct TEvPartitionPrivate struct TCompactionTxResponse { TVector RangeCompactionInfos; - TVector CompactionRequests; + TVector CompactionRequests; + + TCompactionTxResponse() = default; + + TCompactionTxResponse( + TVector rangeCompactionInfos, + TVector compactionRequests) + : RangeCompactionInfos(std::move(rangeCompactionInfos)) + , CompactionRequests(std::move(compactionRequests)) + {} }; // diff --git a/cloud/blockstore/libs/storage/partition/part_tx.h b/cloud/blockstore/libs/storage/partition/part_tx.h index f72487d64b7..200c2701e4e 100644 --- a/cloud/blockstore/libs/storage/partition/part_tx.h +++ b/cloud/blockstore/libs/storage/partition/part_tx.h @@ -437,6 +437,7 @@ struct TTxPartition { const TRequestInfoPtr RequestInfo; const ui64 CommitId; + const ui32 RangeCompactionIndex; const TCompactionOptions CompactionOptions; TVector RangeCompactions; @@ -444,10 +445,29 @@ struct TTxPartition TCompaction( TRequestInfoPtr requestInfo, ui64 commitId, + ui32 rangeCompactionIndex, TCompactionOptions compactionOptions, const TVector>& ranges) : RequestInfo(std::move(requestInfo)) , CommitId(commitId) + , RangeCompactionIndex(rangeCompactionIndex) + , CompactionOptions(compactionOptions) + { + RangeCompactions.reserve(ranges.size()); + for (const auto& range: ranges) { + RangeCompactions.emplace_back(range.first, range.second); + } + } + + // TODO:_ remove it! + TCompaction( + TRequestInfoPtr requestInfo, + ui64 commitId, + TCompactionOptions compactionOptions, + const TVector>& ranges) + : RequestInfo(std::move(requestInfo)) + , CommitId(commitId) + , RangeCompactionIndex(713) , CompactionOptions(compactionOptions) { RangeCompactions.reserve(ranges.size()); From 1f3026d7d241b6be8387a307965c7e1885ecb46b Mon Sep 17 00:00:00 2001 From: gayurgin Date: Fri, 6 Feb 2026 14:25:31 +0300 Subject: [PATCH 07/15] register compaction actor, add some code there --- .../partition/part_actor_compaction.cpp | 114 ++++++++++++++---- .../partition/part_actor_compactiontx.cpp | 28 ----- .../storage/partition/part_events_private.h | 4 +- 3 files changed, 96 insertions(+), 50 deletions(-) diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index 73681449212..bc33cc3a305 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -82,6 +82,11 @@ class TCompactionActor final TChildLogTitle LogTitle; const ui64 CommitId; + const TCompactionOptions CompactionOptions; + + TVector> Ranges; + TVector ForkedCompactionTxCallContexts; + ui32 AwaitedCompactionTxCall = 0; TVector RangeCompactionInfos; TVector Requests; @@ -116,13 +121,15 @@ class TCompactionActor final TDuration blobStorageAsyncRequestTimeout, ECompactionType compactionType, ui64 commitId, - TVector rangeCompactionInfos, - TVector requests, + TCompactionOptions CompactionOptions, + TVector> ranges, TChildLogTitle logTitle); void Bootstrap(const TActorContext& ctx); private: + void ProcessRequests(const TActorContext& ctx); + void InitBlockDigests(); void ReadBlocks(const TActorContext& ctx); @@ -140,6 +147,10 @@ class TCompactionActor final private: STFUNC(StateWork); + void HandleCompactionTxResponse( + const TEvPartitionPrivate::TEvCompactionTxResponse::TPtr& ev, + const TActorContext& ctx); + template void HandleWriteOrPatchBlobResponse( TEvent& ev, @@ -181,8 +192,8 @@ TCompactionActor::TCompactionActor( TDuration blobStorageAsyncRequestTimeout, ECompactionType compactionType, ui64 commitId, - TVector rangeCompactionInfos, - TVector requests, + TCompactionOptions compactionOptions, + TVector> ranges, TChildLogTitle logTitle) : RequestInfo(std::move(requestInfo)) , TabletId(tabletId) @@ -197,14 +208,15 @@ TCompactionActor::TCompactionActor( , CompactionType(compactionType) , LogTitle(std::move(logTitle)) , CommitId(commitId) - , RangeCompactionInfos(std::move(rangeCompactionInfos)) - , Requests(std::move(requests)) + , CompactionOptions(compactionOptions) + , Ranges(std::move(ranges)) {} void TCompactionActor::Bootstrap(const TActorContext& ctx) { TRequestScope timer(*RequestInfo); + // TODO:_ maybe split into two states? Become(&TThis::StateWork); LWTRACK( @@ -213,6 +225,42 @@ void TCompactionActor::Bootstrap(const TActorContext& ctx) "Compaction", RequestInfo->CallContext->RequestId); + // TODO:_ take desesion according to featureflag + + AwaitedCompactionTxCall = Ranges.size(); + + for (ui32 i = 0; i < Ranges.size(); ++i) { + auto request = std::make_unique( + CommitId, + i, // RangeCompactionIndex + CompactionOptions, + TVector>{std::move(Ranges[i])}); + + if (!RequestInfo->CallContext->LWOrbit.Fork(request->CallContext->LWOrbit)) { + LWTRACK( + ForkFailed, + RequestInfo->CallContext->LWOrbit, + "TEvPartitionPrivate::TEvCompactionTxRequest", + RequestInfo->CallContext->RequestId); + } + request->CallContext->RequestId = RequestInfo->CallContext->RequestId; + + ForkedCompactionTxCallContexts.emplace_back(request->CallContext); + + NCloud::Send( + ctx, + Tablet, + std::move(request)); + } + + Ranges.clear(); +} + +void TCompactionActor::ProcessRequests(const TActorContext& ctx) +{ + // TODO:_ TRequestScope timer(*RequestInfo) here? + // TODO:_ lwtrack here? + if (Requests) { ReadBlocks(ctx); @@ -787,6 +835,23 @@ void TCompactionActor::ReplyAndDie( //////////////////////////////////////////////////////////////////////////////// +// TODO:_ style: rearrange methods? +void TCompactionActor::HandleCompactionTxResponse( + const TEvPartitionPrivate::TEvCompactionTxResponse::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + // TODO:_ exec cycles? + + // TODO:_ ok to hangle error like this? + if (HandleError(ctx, msg->GetError())) { + return; + } + + ProcessRequests(ctx); +} + void TCompactionActor::HandleReadBlobResponse( const TEvPartitionCommonPrivate::TEvReadBlobResponse::TPtr& ev, const TActorContext& ctx) @@ -1323,25 +1388,32 @@ void TPartitionActor::HandleCompaction( State->GetCleanupQueue().AcquireBarrier(commitId); State->GetGarbageQueue().AcquireBarrier(commitId); - // TODO:_ what we do here? - AddTransaction(*requestInfo); - - auto tx = CreateTx( + auto actor = NCloud::Register( + ctx, requestInfo, + TabletID(), + PartitionConfig.GetDiskId(), + SelfId(), + State->GetBlockSize(), + State->GetMaxBlocksInBlob(), + Config->GetMaxAffectedBlocksPerCompaction(), + Config->GetComputeDigestForEveryBlockOnCompaction(), + BlockDigestGenerator, + GetBlobStorageAsyncRequestTimeout(), + compactionType, commitId, msg->CompactionOptions, - std::move(ranges)); - - ui64 minCommitId = State->GetCommitQueue().GetMinCommitId(); - Y_ABORT_UNLESS(minCommitId <= commitId); + std::move(ranges), + LogTitle.GetChild(GetCycleCount())); + LOG_DEBUG( + ctx, + TBlockStoreComponents::PARTITION, + "%s Partition registered TCompactionActor with id [%lu]; commit id %lu", + LogTitle.GetWithTime().c_str(), + actor.ToString().c_str(), + commitId); - if (minCommitId == commitId) { - // start execution - ExecuteTx(ctx, std::move(tx)); - } else { - // delay execution until all previous commits completed - State->AccessCommitQueue().Enqueue(std::move(tx), commitId); - } + Actors.Insert(actor); } void TPartitionActor::ProcessCommitQueue(const TActorContext& ctx) diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp index 6e758880c22..750a80aa218 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp @@ -638,34 +638,6 @@ void TPartitionActor::CompleteCompaction( NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); - // TODO:_ where we resigister TCompactionActor? - // auto actor = NCloud::Register( - // ctx, - // args.RequestInfo, - // TabletID(), - // PartitionConfig.GetDiskId(), - // SelfId(), - // State->GetBlockSize(), - // State->GetMaxBlocksInBlob(), - // Config->GetMaxAffectedBlocksPerCompaction(), - // Config->GetComputeDigestForEveryBlockOnCompaction(), - // BlockDigestGenerator, - // GetBlobStorageAsyncRequestTimeout(), - // compactionType, - // args.CommitId, - // std::move(PendingRangeCompactionInfos), - // std::move(PendingCompactionRequests), - // LogTitle.GetChild(GetCycleCount())); - // LOG_DEBUG( - // ctx, - // TBlockStoreComponents::PARTITION, - // "%s Partition registered TCompactionActor with id [%lu]; commit id %lu", - // LogTitle.GetWithTime().c_str(), - // actor.ToString().c_str(), - // args.CommitId); - - // Actors.Insert(actor); - PendingRangeCompactionInfos.clear(); PendingCompactionRequests.clear(); } diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h index f57a713d881..68d7a5dc915 100644 --- a/cloud/blockstore/libs/storage/partition/part_events_private.h +++ b/cloud/blockstore/libs/storage/partition/part_events_private.h @@ -468,17 +468,19 @@ struct TEvPartitionPrivate struct TCompactionTxRequest { const ui64 CommitId; + const ui32 RangeCompactionIndex; const TCompactionOptions CompactionOptions; // TODO:_ coulb be just range, not vercor of ranges? // TODO:_ but if so, then we lose the original approash with single transaction. TVector> Ranges; - ui32 RangeCompactionIndex; TCompactionTxRequest( ui64 commitId, + ui32 rangeCompactionIndex, TCompactionOptions compactionOptions, TVector> ranges) : CommitId(commitId) + , RangeCompactionIndex(rangeCompactionIndex) , CompactionOptions(compactionOptions) , Ranges(std::move(ranges)) {} From fc1c1d9cd37de402c06e40e5db2d04096f1c3aff Mon Sep 17 00:00:00 2001 From: gayurgin Date: Fri, 6 Feb 2026 15:14:57 +0300 Subject: [PATCH 08/15] implement HandleCompactionTxResponse --- .../partition/part_actor_compaction.cpp | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index bc33cc3a305..9f9ead528ef 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -849,6 +849,25 @@ void TCompactionActor::HandleCompactionTxResponse( return; } + std::move( + msg->RangeCompactionInfos.begin(), + msg->RangeCompactionInfos.end(), + std::back_insert_iterator>(RangeCompactionInfos)); + std::move( + msg->CompactionRequests.begin(), + msg->CompactionRequests.end(), + std::back_insert_iterator>(Requests)); + + if (--AwaitedCompactionTxCall) { + return; + } + + // TODO:_ exec cycles + + for (auto context: ForkedReadCallContexts) { + RequestInfo->CallContext->LWOrbit.Join(context->LWOrbit); + } + ProcessRequests(ctx); } From 141648a5da656052af0dddfc97b5fe7e404efe30 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Fri, 6 Feb 2026 15:35:36 +0300 Subject: [PATCH 09/15] fixes --- .../libs/storage/partition/part_actor.h | 8 -------- .../storage/partition/part_actor_compaction.cpp | 1 + .../partition/part_actor_compactiontx.cpp | 3 --- .../blockstore/libs/storage/partition/part_tx.h | 17 ----------------- .../libs/storage/partition/part_ut.cpp | 4 ++-- 5 files changed, 3 insertions(+), 30 deletions(-) diff --git a/cloud/blockstore/libs/storage/partition/part_actor.h b/cloud/blockstore/libs/storage/partition/part_actor.h index f3b954c48f7..de951efd82e 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor.h +++ b/cloud/blockstore/libs/storage/partition/part_actor.h @@ -160,14 +160,6 @@ class TPartitionActor final TDeque PendingForcedCompactionRequests; THashMap CompletedForcedCompactionRequests; - // TODO:_ move it to separate actor! - TRequestInfoPtr CompactionRequestInfo; - ui32 AwaitedCompactionTxResponses = 0; - // Compaction infos and requests, waiting until all transactions of the - // current batch are completed - TVector PendingRangeCompactionInfos; - TVector PendingCompactionRequests; - NBlobMetrics::TBlobLoadMetrics PrevMetrics; NBlobMetrics::TBlobLoadMetrics OverlayMetrics; diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index 9f9ead528ef..4cb586d9994 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -1008,6 +1008,7 @@ STFUNC(TCompactionActor::StateWork) switch (ev->GetTypeRewrite()) { HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvPartitionPrivate::TEvCompactionTxResponse, HandleCompactionTxResponse); HFunc(TEvPartitionCommonPrivate::TEvReadBlobResponse, HandleReadBlobResponse); HFunc(TEvPartitionPrivate::TEvWriteBlobResponse, HandleWriteBlobResponse); HFunc(TEvPartitionPrivate::TEvPatchBlobResponse, HandlePatchBlobResponse); diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp index 750a80aa218..592a8416540 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp @@ -637,9 +637,6 @@ void TPartitionActor::CompleteCompaction( args.RequestInfo->CallContext->RequestId); NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); - - PendingRangeCompactionInfos.clear(); - PendingCompactionRequests.clear(); } } // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/part_tx.h b/cloud/blockstore/libs/storage/partition/part_tx.h index 200c2701e4e..9c837d84499 100644 --- a/cloud/blockstore/libs/storage/partition/part_tx.h +++ b/cloud/blockstore/libs/storage/partition/part_tx.h @@ -459,23 +459,6 @@ struct TTxPartition } } - // TODO:_ remove it! - TCompaction( - TRequestInfoPtr requestInfo, - ui64 commitId, - TCompactionOptions compactionOptions, - const TVector>& ranges) - : RequestInfo(std::move(requestInfo)) - , CommitId(commitId) - , RangeCompactionIndex(713) - , CompactionOptions(compactionOptions) - { - RangeCompactions.reserve(ranges.size()); - for (const auto& range: ranges) { - RangeCompactions.emplace_back(range.first, range.second); - } - } - void Clear() { for (auto& range: RangeCompactions) { diff --git a/cloud/blockstore/libs/storage/partition/part_ut.cpp b/cloud/blockstore/libs/storage/partition/part_ut.cpp index c45257f4e4a..1743596bff7 100644 --- a/cloud/blockstore/libs/storage/partition/part_ut.cpp +++ b/cloud/blockstore/libs/storage/partition/part_ut.cpp @@ -349,8 +349,8 @@ auto InitTestActorRuntime( void InitLogSettings(TTestActorRuntime& runtime) { for (ui32 i = TBlockStoreComponents::START; i < TBlockStoreComponents::END; ++i) { - runtime.SetLogPriority(i, NLog::PRI_INFO); - // runtime.SetLogPriority(i, NLog::PRI_DEBUG); + // runtime.SetLogPriority(i, NLog::PRI_INFO); + runtime.SetLogPriority(i, NLog::PRI_DEBUG); } // runtime.SetLogPriority(NLog::InvalidComponent, NLog::PRI_DEBUG); runtime.SetLogPriority(NKikimrServices::BS_NODE, NLog::PRI_ERROR); From 98d22acd959896df67fd6e6c230ed25cc71a8737 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Fri, 6 Feb 2026 15:59:23 +0300 Subject: [PATCH 10/15] resolve rebase --- .../libs/storage/partition/part_actor_compactiontx.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp index 592a8416540..36b8f6195f2 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp @@ -509,7 +509,7 @@ void TPartitionActor::HandleCompactionTx( ExecuteTx(ctx, std::move(tx)); } else { // delay execution until all previous commits completed - State->GetCommitQueue().Enqueue(std::move(tx), msg->CommitId); + State->AccessCommitQueue().Enqueue(std::move(tx), msg->CommitId); } } From f894e3bf65c2ad21a86f4217c70cf42cec8148b0 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Tue, 10 Feb 2026 10:05:55 +0300 Subject: [PATCH 11/15] handle order of responses --- .../storage/partition/model/commit_queue.cpp | 3 +- .../partition/part_actor_compaction.cpp | 42 ++++++++++++++++--- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp b/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp index b0c60f6ce22..a40831eda0c 100644 --- a/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp +++ b/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp @@ -7,7 +7,8 @@ namespace NCloud::NBlockStore::NStorage::NPartition { void TCommitQueue::Enqueue(TTxPtr tx, ui64 commitId) { if (Items) { - Y_ABORT_UNLESS(Items.back().CommitId < commitId); + // TODO:_ Is <= instead of < ok? + Y_ABORT_UNLESS(Items.back().CommitId <= commitId); } Items.emplace_back(commitId, std::move(tx)); } diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index 4cb586d9994..9050e8e9149 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -17,6 +17,7 @@ #include #include #include +#include namespace NCloud::NBlockStore::NStorage::NPartition { @@ -87,6 +88,8 @@ class TCompactionActor final TVector> Ranges; TVector ForkedCompactionTxCallContexts; ui32 AwaitedCompactionTxCall = 0; + using InfoWithIndex = std::pair; + TVector RangeCompactionInfosWithIndices; // TODO:_ do it normally TVector RangeCompactionInfos; TVector Requests; @@ -250,7 +253,8 @@ void TCompactionActor::Bootstrap(const TActorContext& ctx) NCloud::Send( ctx, Tablet, - std::move(request)); + std::move(request), + i); } Ranges.clear(); @@ -849,10 +853,16 @@ void TCompactionActor::HandleCompactionTxResponse( return; } - std::move( - msg->RangeCompactionInfos.begin(), - msg->RangeCompactionInfos.end(), - std::back_insert_iterator>(RangeCompactionInfos)); + ui32 batchIndex = ev->Cookie; + for (ui32 i = 0; i < msg->RangeCompactionInfos.size(); ++i) { + RangeCompactionInfosWithIndices.emplace_back( + std::move(msg->RangeCompactionInfos[i]), + batchIndex + i); + } + // std::move( + // msg->RangeCompactionInfos.begin(), + // msg->RangeCompactionInfos.end(), + // std::back_insert_iterator>(RangeCompactionInfos)); std::move( msg->CompactionRequests.begin(), msg->CompactionRequests.end(), @@ -862,6 +872,26 @@ void TCompactionActor::HandleCompactionTxResponse( return; } + // TODO:_ use Sort + // std::sort( + // RangeCompactionInfosWithIndices.begin(), + // RangeCompactionInfosWithIndices.end(), + // [](const InfoWithIndex& l, const InfoWithIndex& r) + // { return l.sedond < r.second; }); + TVector infos; + for (auto& info: RangeCompactionInfosWithIndices) { + infos.push_back(&info); + } + Sort( + infos, + [](const InfoWithIndex* l, const InfoWithIndex* r) + { return l->second < r->second; }); + + for (ui32 i = 0; i < infos.size(); ++i) { + RangeCompactionInfos.push_back( + std::move(infos[i]->first)); + } + // TODO:_ exec cycles for (auto context: ForkedReadCallContexts) { @@ -1539,3 +1569,5 @@ void TPartitionActor::HandleCompactionCompleted( } } // namespace NCloud::NBlockStore::NStorage::NPartition + +// TODO:_ upper bould on message size = ??? There could be many requests! From d4f48a4fad9e71dabecc18a12ecb21cf85d37523 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Fri, 13 Feb 2026 14:00:48 +0300 Subject: [PATCH 12/15] add featureflag --- cloud/blockstore/config/storage.proto | 3 + cloud/blockstore/libs/storage/core/config.cpp | 2 + cloud/blockstore/libs/storage/core/config.h | 5 ++ .../partition/part_actor_compaction.cpp | 60 +++++++++++++------ 4 files changed, 53 insertions(+), 17 deletions(-) diff --git a/cloud/blockstore/config/storage.proto b/cloud/blockstore/config/storage.proto index fd5e8fd181d..571a1c9b904 100644 --- a/cloud/blockstore/config/storage.proto +++ b/cloud/blockstore/config/storage.proto @@ -1419,4 +1419,7 @@ message TStorageServiceConfig // Timeout for attach/detach path requests (in milliseconds). optional uint32 AttachDetachPathRequestTimeout = 480; + + // Execute separate compaction transaction for each range of the batch. + optional bool SplitTxInBatchCompactionEnabled = 481; } diff --git a/cloud/blockstore/libs/storage/core/config.cpp b/cloud/blockstore/libs/storage/core/config.cpp index d00e1a88bcd..16ac03a5bfd 100644 --- a/cloud/blockstore/libs/storage/core/config.cpp +++ b/cloud/blockstore/libs/storage/core/config.cpp @@ -209,6 +209,7 @@ NProto::TLinkedDiskFillBandwidth GetBandwidth( xxx(ForcedCompactionRangeCountPerRun, ui32, 1 )\ xxx(CompactionCountPerRunChangingPeriod, TDuration, Seconds(60) )\ xxx(BatchCompactionEnabled, bool, false )\ + xxx(SplitTxInBatchCompactionEnabled, bool, false )\ xxx(BlobPatchingEnabled, bool, false )\ /* If threshold is not 0, use it */ \ xxx(MaxDiffPercentageForBlobPatching, ui32, 0 )\ @@ -689,6 +690,7 @@ BLOCKSTORE_STORAGE_CONFIG(BLOCKSTORE_STORAGE_DECLARE_CONFIG) xxx(FreshChannelWriteRequests) \ xxx(MixedIndexCacheV1) \ xxx(BatchCompaction) \ + xxx(SplitTxInBatchCompactionEnabled) \ xxx(BlobPatching) \ xxx(UseRdma) \ xxx(ChangeThrottlingPolicy) \ diff --git a/cloud/blockstore/libs/storage/core/config.h b/cloud/blockstore/libs/storage/core/config.h index 68f6a638674..7f3187c8032 100644 --- a/cloud/blockstore/libs/storage/core/config.h +++ b/cloud/blockstore/libs/storage/core/config.h @@ -108,6 +108,7 @@ class TStorageConfig ui32 GetForcedCompactionRangeCountPerRun() const; TDuration GetCompactionCountPerRunChangingPeriod() const; bool GetBatchCompactionEnabled() const; + bool GetSplitTxInBatchCompactionEnabled() const; bool GetBlobPatchingEnabled() const; ui32 GetMaxDiffPercentageForBlobPatching() const; @@ -343,6 +344,10 @@ class TStorageConfig const TString& cloudId, const TString& folderId, const TString& diskId) const; + bool IsSplitTxInBatchCompactionFeatureEnabled( + const TString& cloudId, + const TString& folderId, + const TString& diskId) const; bool IsBlobPatchingFeatureEnabled( const TString& cloudId, const TString& folderId, diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index 9050e8e9149..15810ef415c 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -229,35 +229,58 @@ void TCompactionActor::Bootstrap(const TActorContext& ctx) RequestInfo->CallContext->RequestId); // TODO:_ take desesion according to featureflag + const bool splitTxInBatchCompactionEnabledForDisk = + Config->IsSplitTxInBatchCompactionFeatureEnabled( + PartitionConfig.GetCloudId(), + PartitionConfig.GetFolderId(), + PartitionConfig.GetDiskId()); + const bool splitTxInBatchCompactionEnabled = + Config->GetSplitTxInBatchCompactionEnabled() || + splitTxInBatchCompactionEnabledForDisk; - AwaitedCompactionTxCall = Ranges.size(); - - for (ui32 i = 0; i < Ranges.size(); ++i) { - auto request = std::make_unique( - CommitId, - i, // RangeCompactionIndex - CompactionOptions, - TVector>{std::move(Ranges[i])}); - - if (!RequestInfo->CallContext->LWOrbit.Fork(request->CallContext->LWOrbit)) { + auto forkTraces = [](TEvPartitionPrivate::TEvCompactionTxRequest& request) + { + // TODO:_ is it ok to fork just once? + if (!RequestInfo->CallContext->LWOrbit.Fork( + request->CallContext->LWOrbit)) + { LWTRACK( ForkFailed, RequestInfo->CallContext->LWOrbit, "TEvPartitionPrivate::TEvCompactionTxRequest", RequestInfo->CallContext->RequestId); } - request->CallContext->RequestId = RequestInfo->CallContext->RequestId; + request->CallContext->RequestId = RequestInfo->CallContext->RequestId; ForkedCompactionTxCallContexts.emplace_back(request->CallContext); + }; - NCloud::Send( - ctx, - Tablet, - std::move(request), - i); + if (splitTxInBatchCompactionEnabled) { + AwaitedCompactionTxCall = Ranges.size(); + + for (ui32 i = 0; i < Ranges.size(); ++i) { + auto request = std::make_unique( + CommitId, + i, // RangeCompactionIndex + CompactionOptions, + TVector>{std::move(Ranges[i])}); + + forkTraces(request); + NCloud::Send(ctx, Tablet, std::move(request), i /* Cookie */); + } + } else { + AwaitedCompactionTxCall = 1; + + auto request = std::make_unique( + CommitId, + 0, // RangeCompactionIndex + CompactionOptions, + std::move(Ranges)); + + forkTraces(request); + NCloud::Send(ctx, Tablet, std::move(request)); } - Ranges.clear(); } void TCompactionActor::ProcessRequests(const TActorContext& ctx) @@ -898,6 +921,9 @@ void TCompactionActor::HandleCompactionTxResponse( RequestInfo->CallContext->LWOrbit.Join(context->LWOrbit); } + Ranges.clear(); + ForkedReadCallContexts.Clear(); + ProcessRequests(ctx); } From f04a18669d28d04bcc97f7cf3338749a3836da96 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Fri, 13 Feb 2026 18:12:15 +0300 Subject: [PATCH 13/15] fixes --- cloud/blockstore/libs/storage/core/config.cpp | 2 +- .../storage/partition/model/commit_queue.cpp | 1 - .../partition/part_actor_compaction.cpp | 101 ++++++++++-------- .../partition/part_actor_compactiontx.cpp | 24 ++--- .../libs/storage/partition/part_compaction.h | 14 ++- .../storage/partition/part_events_private.h | 20 ++-- .../libs/storage/partition/part_ut.cpp | 2 + 7 files changed, 89 insertions(+), 75 deletions(-) diff --git a/cloud/blockstore/libs/storage/core/config.cpp b/cloud/blockstore/libs/storage/core/config.cpp index 16ac03a5bfd..c7eb7f31f58 100644 --- a/cloud/blockstore/libs/storage/core/config.cpp +++ b/cloud/blockstore/libs/storage/core/config.cpp @@ -690,7 +690,7 @@ BLOCKSTORE_STORAGE_CONFIG(BLOCKSTORE_STORAGE_DECLARE_CONFIG) xxx(FreshChannelWriteRequests) \ xxx(MixedIndexCacheV1) \ xxx(BatchCompaction) \ - xxx(SplitTxInBatchCompactionEnabled) \ + xxx(SplitTxInBatchCompaction) \ xxx(BlobPatching) \ xxx(UseRdma) \ xxx(ChangeThrottlingPolicy) \ diff --git a/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp b/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp index a40831eda0c..5a350f402fd 100644 --- a/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp +++ b/cloud/blockstore/libs/storage/partition/model/commit_queue.cpp @@ -7,7 +7,6 @@ namespace NCloud::NBlockStore::NStorage::NPartition { void TCommitQueue::Enqueue(TTxPtr tx, ui64 commitId) { if (Items) { - // TODO:_ Is <= instead of < ok? Y_ABORT_UNLESS(Items.back().CommitId <= commitId); } Items.emplace_back(commitId, std::move(tx)); diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index 15810ef415c..a492b419024 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace NCloud::NBlockStore::NStorage::NPartition { @@ -36,7 +37,7 @@ class TCompactionActor final : public TActorBootstrapped { public: - using TRequest = TCompactionBlobRequest; + using TRequest = TCompactionReadRequest; struct TBatchRequest { @@ -85,7 +86,9 @@ class TCompactionActor final const ui64 CommitId; const TCompactionOptions CompactionOptions; + const bool SplitTxInBatchCompaction = false; TVector> Ranges; + TVector ForkedCompactionTxCallContexts; ui32 AwaitedCompactionTxCall = 0; using InfoWithIndex = std::pair; @@ -125,6 +128,7 @@ class TCompactionActor final ECompactionType compactionType, ui64 commitId, TCompactionOptions CompactionOptions, + bool splitTxInBatchCompaction, TVector> ranges, TChildLogTitle logTitle); @@ -196,6 +200,7 @@ TCompactionActor::TCompactionActor( ECompactionType compactionType, ui64 commitId, TCompactionOptions compactionOptions, + bool splitTxInBatchCompaction, TVector> ranges, TChildLogTitle logTitle) : RequestInfo(std::move(requestInfo)) @@ -212,6 +217,7 @@ TCompactionActor::TCompactionActor( , LogTitle(std::move(logTitle)) , CommitId(commitId) , CompactionOptions(compactionOptions) + , SplitTxInBatchCompaction(splitTxInBatchCompaction) , Ranges(std::move(ranges)) {} @@ -219,7 +225,6 @@ void TCompactionActor::Bootstrap(const TActorContext& ctx) { TRequestScope timer(*RequestInfo); - // TODO:_ maybe split into two states? Become(&TThis::StateWork); LWTRACK( @@ -228,18 +233,38 @@ void TCompactionActor::Bootstrap(const TActorContext& ctx) "Compaction", RequestInfo->CallContext->RequestId); - // TODO:_ take desesion according to featureflag - const bool splitTxInBatchCompactionEnabledForDisk = - Config->IsSplitTxInBatchCompactionFeatureEnabled( - PartitionConfig.GetCloudId(), - PartitionConfig.GetFolderId(), - PartitionConfig.GetDiskId()); - const bool splitTxInBatchCompactionEnabled = - Config->GetSplitTxInBatchCompactionEnabled() || - splitTxInBatchCompactionEnabledForDisk; - - auto forkTraces = [](TEvPartitionPrivate::TEvCompactionTxRequest& request) + // TODO:_ remove it + // auto forkTraces = + // [&](std::unique_ptr + // request) + // { + // if (!RequestInfo->CallContext->LWOrbit.Fork( + // request->CallContext->LWOrbit)) + // { + // LWTRACK( + // ForkFailed, + // RequestInfo->CallContext->LWOrbit, + // "TEvPartitionPrivate::TEvCompactionTxRequest", + // RequestInfo->CallContext->RequestId); + // } + + // request->CallContext->RequestId = RequestInfo->CallContext->RequestId; + // ForkedCompactionTxCallContexts.emplace_back(request->CallContext); + // }; + + auto sendRequest = + [&](ui32 rangeCompactionIndex, + TVector> ranges) { + // TODO:_ is it ok? Send does not interrupts execution, doesn't it? + ++AwaitedCompactionTxCall; + + auto request = std::make_unique( + CommitId, + rangeCompactionIndex, + CompactionOptions, + std::move(ranges)); + // TODO:_ is it ok to fork just once? if (!RequestInfo->CallContext->LWOrbit.Fork( request->CallContext->LWOrbit)) @@ -253,34 +278,20 @@ void TCompactionActor::Bootstrap(const TActorContext& ctx) request->CallContext->RequestId = RequestInfo->CallContext->RequestId; ForkedCompactionTxCallContexts.emplace_back(request->CallContext); - }; - if (splitTxInBatchCompactionEnabled) { - AwaitedCompactionTxCall = Ranges.size(); + // TODO:_ seems we don't need cookie here + NCloud::Send(ctx, Tablet, std::move(request)); + }; + if (SplitTxInBatchCompaction) { for (ui32 i = 0; i < Ranges.size(); ++i) { - auto request = std::make_unique( - CommitId, - i, // RangeCompactionIndex - CompactionOptions, - TVector>{std::move(Ranges[i])}); - - forkTraces(request); - NCloud::Send(ctx, Tablet, std::move(request), i /* Cookie */); + sendRequest(i, TVector>{std::move(Ranges[i])}); } } else { - AwaitedCompactionTxCall = 1; - - auto request = std::make_unique( - CommitId, - 0, // RangeCompactionIndex - CompactionOptions, - std::move(Ranges)); - - forkTraces(request); - NCloud::Send(ctx, Tablet, std::move(request)); + sendRequest(0, std::move(Ranges)); } + Ranges.clear(); } void TCompactionActor::ProcessRequests(const TActorContext& ctx) @@ -862,7 +873,6 @@ void TCompactionActor::ReplyAndDie( //////////////////////////////////////////////////////////////////////////////// -// TODO:_ style: rearrange methods? void TCompactionActor::HandleCompactionTxResponse( const TEvPartitionPrivate::TEvCompactionTxResponse::TPtr& ev, const TActorContext& ctx) @@ -871,7 +881,6 @@ void TCompactionActor::HandleCompactionTxResponse( // TODO:_ exec cycles? - // TODO:_ ok to hangle error like this? if (HandleError(ctx, msg->GetError())) { return; } @@ -887,15 +896,14 @@ void TCompactionActor::HandleCompactionTxResponse( // msg->RangeCompactionInfos.end(), // std::back_insert_iterator>(RangeCompactionInfos)); std::move( - msg->CompactionRequests.begin(), - msg->CompactionRequests.end(), + msg->Requests.begin(), + msg->Requests.end(), std::back_insert_iterator>(Requests)); if (--AwaitedCompactionTxCall) { return; } - // TODO:_ use Sort // std::sort( // RangeCompactionInfosWithIndices.begin(), // RangeCompactionInfosWithIndices.end(), @@ -921,8 +929,7 @@ void TCompactionActor::HandleCompactionTxResponse( RequestInfo->CallContext->LWOrbit.Join(context->LWOrbit); } - Ranges.clear(); - ForkedReadCallContexts.Clear(); + ForkedReadCallContexts.clear(); ProcessRequests(ctx); } @@ -1464,6 +1471,15 @@ void TPartitionActor::HandleCompaction( State->GetCleanupQueue().AcquireBarrier(commitId); State->GetGarbageQueue().AcquireBarrier(commitId); + const bool splitTxInBatchCompactionEnabledForDisk = + Config->IsSplitTxInBatchCompactionFeatureEnabled( + PartitionConfig.GetCloudId(), + PartitionConfig.GetFolderId(), + PartitionConfig.GetDiskId()); + const bool splitTxInBatchCompactionEnabled = + Config->GetSplitTxInBatchCompactionEnabled() || + splitTxInBatchCompactionEnabledForDisk; + auto actor = NCloud::Register( ctx, requestInfo, @@ -1479,6 +1495,7 @@ void TPartitionActor::HandleCompaction( compactionType, commitId, msg->CompactionOptions, + splitTxInBatchCompactionEnabled, std::move(ranges), LogTitle.GetChild(GetCycleCount())); LOG_DEBUG( @@ -1595,5 +1612,3 @@ void TPartitionActor::HandleCompactionCompleted( } } // namespace NCloud::NBlockStore::NStorage::NPartition - -// TODO:_ upper bould on message size = ??? There could be many requests! diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp index 36b8f6195f2..1d47da7b83a 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp @@ -223,10 +223,10 @@ void CompleteRangeCompaction( TTabletStorageInfo& tabletStorageInfo, TPartitionState& state, TTxPartition::TRangeCompaction& args, - TVector& requests, + TVector& requests, TVector& rangeCompactionInfos, ui32 maxDiffPercentageForBlobPatching, - ui32 rangeCompactionIndex) + ui32 rangeCompactionIndex) // TODO:_ ? { const EChannelPermissions compactionPermissions = EChannelPermission::SystemWritesAllowed; @@ -570,14 +570,13 @@ void TPartitionActor::CompleteCompaction( State->RaiseRangeTemperature(rangeCompaction.RangeIdx); } - // TODO:_ rename ...ForCloud -> ...ForDisk ??? - const bool blobPatchingEnabledForCloud = + const bool blobPatchingEnabledForDisk = Config->IsBlobPatchingFeatureEnabled( PartitionConfig.GetCloudId(), PartitionConfig.GetFolderId(), PartitionConfig.GetDiskId()); const bool blobPatchingEnabled = - Config->GetBlobPatchingEnabled() || blobPatchingEnabledForCloud; + Config->GetBlobPatchingEnabled() || blobPatchingEnabledForDisk; const auto mergedBlobThreshold = PartitionConfig.GetStorageMediaKind() == @@ -586,20 +585,20 @@ void TPartitionActor::CompleteCompaction( : Config->GetCompactionMergedBlobThresholdHDD(); TVector rangeCompactionInfos; - TVector requests; + TVector requests; - for (auto& rangeCompaction: args.RangeCompactions) { + for (ui32 i = 0; i < args.RangeCompactions.size(); ++i) { CompleteRangeCompaction( blobPatchingEnabled, mergedBlobThreshold, args.CommitId, *Info(), *State, - rangeCompaction, + args.RangeCompactions[i], requests, rangeCompactionInfos, Config->GetMaxDiffPercentageForBlobPatching(), - args.RangeCompactionIndex + static_cast(rangeCompactionInfos.size())); + args.RangeCompactionIndex + i); // TODO:_ ok? if (rangeCompactionInfos.back().OriginalBlobId) { LOG_DEBUG( @@ -615,13 +614,6 @@ void TPartitionActor::CompleteCompaction( // TODO:_ or to it in the beginning of the Complete... function? TRequestScope timer(*args.RequestInfo); - // auto response = - // std::make_unique(); - // auto response = - // std::make_unique( - // MakeError(S_OK)); - // response->Record.SetRangeCompactionInfos(std::move(rangeCompactionInfos)); - // response->Record.SetCompactionRequests(std::move(requests)); auto response = std::make_unique( std::move(rangeCompactionInfos), diff --git a/cloud/blockstore/libs/storage/partition/part_compaction.h b/cloud/blockstore/libs/storage/partition/part_compaction.h index 4a11def3a8f..44560f7375d 100644 --- a/cloud/blockstore/libs/storage/partition/part_compaction.h +++ b/cloud/blockstore/libs/storage/partition/part_compaction.h @@ -75,10 +75,7 @@ struct TRangeCompactionInfo {} }; -//////////////////////////////////////////////////////////////////////////////// - -// TODO:_ naming ??? -struct TCompactionBlobRequest +struct TCompactionReadRequest { NCloud::TPartialBlobId BlobId; NActors::TActorId Proxy; @@ -88,7 +85,7 @@ struct TCompactionBlobRequest ui32 GroupId; ui32 RangeCompactionIndex; - TCompactionBlobRequest( + TCompactionReadRequest( const NCloud::TPartialBlobId& blobId, const NActors::TActorId& proxy, ui16 blobOffset, @@ -106,4 +103,11 @@ struct TCompactionBlobRequest {} }; +struct TCompactionRange { + const ui32 RangeIdx; + const ui32 RangeCompactionIndex; // Index of the range in its batch for + // batch compaction. + TBlockRange32 BlockRange; +}; + } // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h index 68d7a5dc915..64c26fe3f03 100644 --- a/cloud/blockstore/libs/storage/partition/part_events_private.h +++ b/cloud/blockstore/libs/storage/partition/part_events_private.h @@ -470,11 +470,13 @@ struct TEvPartitionPrivate const ui64 CommitId; const ui32 RangeCompactionIndex; const TCompactionOptions CompactionOptions; - // TODO:_ coulb be just range, not vercor of ranges? - // TODO:_ but if so, then we lose the original approash with single transaction. - TVector> Ranges; + TVector> Ranges; // TODO:_ use using? - TCompactionTxRequest( + // TVector Rangesaaa; + + TCompactionTxRequest() = default; // TODO:_ do we need it? + + TCompactionTxRequest( // TODO:_ do we need it? ui64 commitId, ui32 rangeCompactionIndex, TCompactionOptions compactionOptions, @@ -489,15 +491,15 @@ struct TEvPartitionPrivate struct TCompactionTxResponse { TVector RangeCompactionInfos; - TVector CompactionRequests; + TVector Requests; - TCompactionTxResponse() = default; + TCompactionTxResponse() = default; // TODO:_ do we need it? - TCompactionTxResponse( + TCompactionTxResponse( // TODO:_ do we need it? TVector rangeCompactionInfos, - TVector compactionRequests) + TVector requests) : RangeCompactionInfos(std::move(rangeCompactionInfos)) - , CompactionRequests(std::move(compactionRequests)) + , Requests(std::move(requests)) {} }; diff --git a/cloud/blockstore/libs/storage/partition/part_ut.cpp b/cloud/blockstore/libs/storage/partition/part_ut.cpp index 1743596bff7..b9e439dff67 100644 --- a/cloud/blockstore/libs/storage/partition/part_ut.cpp +++ b/cloud/blockstore/libs/storage/partition/part_ut.cpp @@ -13098,3 +13098,5 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } // namespace NCloud::NBlockStore::NStorage::NPartition + +// TODO:_ add tests!!! From 6ec6f2f046a2ea09eeaef88e0e1ff58c428279e0 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Mon, 16 Feb 2026 13:25:27 +0300 Subject: [PATCH 14/15] fixes --- .../partition/part_actor_compaction.cpp | 102 +++++++----------- .../partition/part_actor_compactiontx.cpp | 44 ++++---- .../libs/storage/partition/part_compaction.h | 14 ++- .../storage/partition/part_events_private.h | 7 +- .../libs/storage/partition/part_tx.h | 22 ++-- 5 files changed, 87 insertions(+), 102 deletions(-) diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index a492b419024..a07d64794cc 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -87,12 +87,10 @@ class TCompactionActor final const TCompactionOptions CompactionOptions; const bool SplitTxInBatchCompaction = false; - TVector> Ranges; + TVector Ranges; TVector ForkedCompactionTxCallContexts; ui32 AwaitedCompactionTxCall = 0; - using InfoWithIndex = std::pair; - TVector RangeCompactionInfosWithIndices; // TODO:_ do it normally TVector RangeCompactionInfos; TVector Requests; @@ -129,7 +127,7 @@ class TCompactionActor final ui64 commitId, TCompactionOptions CompactionOptions, bool splitTxInBatchCompaction, - TVector> ranges, + TVector ranges, TChildLogTitle logTitle); void Bootstrap(const TActorContext& ctx); @@ -201,7 +199,7 @@ TCompactionActor::TCompactionActor( ui64 commitId, TCompactionOptions compactionOptions, bool splitTxInBatchCompaction, - TVector> ranges, + TVector ranges, TChildLogTitle logTitle) : RequestInfo(std::move(requestInfo)) , TabletId(tabletId) @@ -233,30 +231,10 @@ void TCompactionActor::Bootstrap(const TActorContext& ctx) "Compaction", RequestInfo->CallContext->RequestId); - // TODO:_ remove it - // auto forkTraces = - // [&](std::unique_ptr - // request) - // { - // if (!RequestInfo->CallContext->LWOrbit.Fork( - // request->CallContext->LWOrbit)) - // { - // LWTRACK( - // ForkFailed, - // RequestInfo->CallContext->LWOrbit, - // "TEvPartitionPrivate::TEvCompactionTxRequest", - // RequestInfo->CallContext->RequestId); - // } - - // request->CallContext->RequestId = RequestInfo->CallContext->RequestId; - // ForkedCompactionTxCallContexts.emplace_back(request->CallContext); - // }; - auto sendRequest = [&](ui32 rangeCompactionIndex, - TVector> ranges) + TVector ranges) { - // TODO:_ is it ok? Send does not interrupts execution, doesn't it? ++AwaitedCompactionTxCall; auto request = std::make_unique( @@ -265,7 +243,6 @@ void TCompactionActor::Bootstrap(const TActorContext& ctx) CompactionOptions, std::move(ranges)); - // TODO:_ is it ok to fork just once? if (!RequestInfo->CallContext->LWOrbit.Fork( request->CallContext->LWOrbit)) { @@ -279,13 +256,12 @@ void TCompactionActor::Bootstrap(const TActorContext& ctx) request->CallContext->RequestId = RequestInfo->CallContext->RequestId; ForkedCompactionTxCallContexts.emplace_back(request->CallContext); - // TODO:_ seems we don't need cookie here NCloud::Send(ctx, Tablet, std::move(request)); }; if (SplitTxInBatchCompaction) { for (ui32 i = 0; i < Ranges.size(); ++i) { - sendRequest(i, TVector>{std::move(Ranges[i])}); + sendRequest(i, TVector{std::move(Ranges[i])}); } } else { sendRequest(0, std::move(Ranges)); @@ -387,6 +363,16 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx) [makeTie](const TRequest* l, const TRequest* r) { return makeTie(l) < makeTie(r); }); + TVector infos(Reserve(RangeCompactionInfos.size())); + for (auto& info: RangeCompactionInfos) { + infos.push_back(&info); + } + + Sort( + infos, + [](const TRangeCompactionInfo* l, const TRangeCompactionInfo* r) + { return l->RangeCompactionIndex < r->RangeCompactionIndex; }); + TBatchRequest current; ui32 currentRangeCompactionIndex = 0; for (auto* r: requests) { @@ -413,7 +399,7 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx) current.Proxy = r->Proxy; currentRangeCompactionIndex = r->RangeCompactionIndex; current.RangeCompactionInfo = - &RangeCompactionInfos[r->RangeCompactionIndex]; + infos[r->RangeCompactionIndex]; } if (current.BlobId == current.RangeCompactionInfo->OriginalBlobId) { @@ -885,16 +871,11 @@ void TCompactionActor::HandleCompactionTxResponse( return; } - ui32 batchIndex = ev->Cookie; - for (ui32 i = 0; i < msg->RangeCompactionInfos.size(); ++i) { - RangeCompactionInfosWithIndices.emplace_back( - std::move(msg->RangeCompactionInfos[i]), - batchIndex + i); - } - // std::move( - // msg->RangeCompactionInfos.begin(), - // msg->RangeCompactionInfos.end(), - // std::back_insert_iterator>(RangeCompactionInfos)); + std::move( + msg->RangeCompactionInfos.begin(), + msg->RangeCompactionInfos.end(), + std::back_insert_iterator>( + RangeCompactionInfos)); std::move( msg->Requests.begin(), msg->Requests.end(), @@ -904,24 +885,22 @@ void TCompactionActor::HandleCompactionTxResponse( return; } - // std::sort( - // RangeCompactionInfosWithIndices.begin(), - // RangeCompactionInfosWithIndices.end(), - // [](const InfoWithIndex& l, const InfoWithIndex& r) - // { return l.sedond < r.second; }); - TVector infos; - for (auto& info: RangeCompactionInfosWithIndices) { - infos.push_back(&info); - } - Sort( - infos, - [](const InfoWithIndex* l, const InfoWithIndex* r) - { return l->second < r->second; }); - - for (ui32 i = 0; i < infos.size(); ++i) { - RangeCompactionInfos.push_back( - std::move(infos[i]->first)); - } + // TODO:_ remove + // TVector infos; + // infos.Reserve(RangeCompactionInfos.size()) + // for (auto& info: RangeCompactionInfos) { + // infos.push_back(&info); + // } + // Sort( + // RangeCompactionInfos, + // [](const TRangeCompactionInfo& l, const TRangeCompactionInfo& r) + // { return l->RangeCompactionIndex < r->RangeCompactionIndex; }); + + // TODO:_ remove + // for (ui32 i = 0; i < infos.size(); ++i) { + // RangeCompactionInfos.push_back( + // std::move(infos[i]->first)); + // } // TODO:_ exec cycles @@ -1437,8 +1416,9 @@ void TPartitionActor::HandleCompaction( return; } - TVector> ranges(Reserve(tops.size())); - for (const auto& x: tops) { + TVector ranges(Reserve(tops.size())); + for (ui32 i = 0; i < tops.size(); ++i) { + const auto& x = tops[i]; const ui32 rangeIdx = cm.GetRangeIndex(x.BlockIndex); const auto blockRange = TBlockRange32::MakeClosedIntervalWithLimit( @@ -1462,7 +1442,7 @@ void TPartitionActor::HandleCompaction( x.Stat.ReadRequestBlockCount, x.Stat.CompactionScore.Score); - ranges.emplace_back(rangeIdx, blockRange); + ranges.emplace_back(rangeIdx, blockRange, i); } State->GetCompactionState(compactionType).SetStatus(EOperationStatus::Started); diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp index 1d47da7b83a..84122d7f883 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp @@ -86,13 +86,13 @@ void PrepareRangeCompaction( const TString& logTitle) { TCompactionBlockVisitor visitor(args, commitId); - state.FindFreshBlocks(visitor, args.BlockRange, commitId); + state.FindFreshBlocks(visitor, args.Range.BlockRange, commitId); visitor.KeepTrackOfAffectedBlocks = true; - ready &= state.FindMixedBlocksForCompaction(db, visitor, args.RangeIdx); + ready &= state.FindMixedBlocksForCompaction(db, visitor, args.Range.RangeIdx); visitor.KeepTrackOfAffectedBlocks = false; ready &= db.FindMergedBlocks( visitor, - args.BlockRange, + args.Range.BlockRange, true, // precharge state.GetMaxBlocksInBlob(), commitId); @@ -193,7 +193,7 @@ void PrepareRangeCompaction( const ui32 checksumBoundary = config.GetDiskPrefixLengthWithBlockChecksumsInBlobs() / state.GetBlockSize(); - args.ChecksumsEnabled = args.BlockRange.Start < checksumBoundary; + args.ChecksumsEnabled = args.Range.BlockRange.Start < checksumBoundary; for (auto& kv: args.AffectedBlobs) { if (db.ReadBlockMask(kv.first, kv.second.BlockMask)) { @@ -225,8 +225,7 @@ void CompleteRangeCompaction( TTxPartition::TRangeCompaction& args, TVector& requests, TVector& rangeCompactionInfos, - ui32 maxDiffPercentageForBlobPatching, - ui32 rangeCompactionIndex) // TODO:_ ? + ui32 maxDiffPercentageForBlobPatching) { const EChannelPermissions compactionPermissions = EChannelPermission::SystemWritesAllowed; @@ -264,7 +263,7 @@ void CompleteRangeCompaction( } } - const auto blobSize = (args.BlockRange.Size() - skipped) * state.GetBlockSize(); + const auto blobSize = (args.Range.BlockRange.Size() - skipped) * state.GetBlockSize(); if (blobSize < mergedBlobThreshold) { channelDataKind = EChannelDataKind::Mixed; } @@ -273,7 +272,7 @@ void CompleteRangeCompaction( compactionPermissions, commitId, blobSize, - rangeCompactionIndex); + args.Range.RangeCompactionIndex); } if (zeroBlocksCount) { @@ -288,7 +287,7 @@ void CompleteRangeCompaction( compactionPermissions, commitId, 0, - rangeCompactionIndex); + args.Range.RangeCompactionIndex); } // now build the blob content for all blocks to be written @@ -296,7 +295,7 @@ void CompleteRangeCompaction( TVector blockChecksums; TVector zeroBlocks; - ui32 blockIndex = args.BlockRange.Start; + ui32 blockIndex = args.Range.BlockRange.Start; TPartialBlobId patchingCandidate; ui32 patchingCandidateChangedBlockCount = 0; for (auto& mark: args.BlockMarks) { @@ -310,7 +309,7 @@ void CompleteRangeCompaction( blockIndex, blobContent.GetBlocksCount(), 0, - rangeCompactionIndex); + args.Range.RangeCompactionIndex); // fresh block will be written blobContent.AddBlock({ @@ -324,7 +323,7 @@ void CompleteRangeCompaction( } if (zeroBlobId) { - zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); + zeroBlobSkipMask.Set(blockIndex - args.Range.BlockRange.Start); } } else if (!IsDeletionMarker(mark.BlobId)) { const auto proxy = tabletStorageInfo.BSProxyIDForChannel( @@ -340,7 +339,7 @@ void CompleteRangeCompaction( tabletStorageInfo.GroupFor( mark.BlobId.Channel(), mark.BlobId.Generation()), - rangeCompactionIndex); + args.Range.RangeCompactionIndex); // we will read this block later blobContent.AddBlock(state.GetBlockSize(), char(0)); @@ -364,7 +363,7 @@ void CompleteRangeCompaction( } if (zeroBlobId) { - zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); + zeroBlobSkipMask.Set(blockIndex - args.Range.BlockRange.Start); } if (blobPatchingEnabled) { @@ -378,15 +377,15 @@ void CompleteRangeCompaction( } } } else { - dataBlobSkipMask.Set(blockIndex - args.BlockRange.Start); + dataBlobSkipMask.Set(blockIndex - args.Range.BlockRange.Start); zeroBlocks.push_back(blockIndex); } } else { if (dataBlobId) { - dataBlobSkipMask.Set(blockIndex - args.BlockRange.Start); + dataBlobSkipMask.Set(blockIndex - args.Range.BlockRange.Start); } if (zeroBlobId) { - zeroBlobSkipMask.Set(blockIndex - args.BlockRange.Start); + zeroBlobSkipMask.Set(blockIndex - args.Range.BlockRange.Start); } } @@ -447,7 +446,7 @@ void CompleteRangeCompaction( } rangeCompactionInfos.emplace_back( - args.BlockRange, + args.Range.BlockRange, patchingCandidate, dataBlobId, dataBlobSkipMask, @@ -457,13 +456,14 @@ void CompleteRangeCompaction( args.BlocksSkipped, std::move(blockChecksums), channelDataKind, + args.Range.RangeCompactionIndex, std::move(blobContent), std::move(zeroBlocks), std::move(args.AffectedBlobs), std::move(args.AffectedBlocks)); if (!dataBlobId && !zeroBlobId) { - const auto rangeDescr = DescribeRange(args.BlockRange); + const auto rangeDescr = DescribeRange(args.Range.BlockRange); Y_ABORT("No blocks in compacted range: %s", rangeDescr.c_str()); } Y_ABORT_UNLESS(requests.size() - initialRequestsSize == dataBlocksCount); @@ -497,7 +497,6 @@ void TPartitionActor::HandleCompactionTx( auto tx = CreateTx( requestInfo, msg->CommitId, - msg->RangeCompactionIndex, msg->CompactionOptions, std::move(msg->Ranges)); @@ -567,7 +566,7 @@ void TPartitionActor::CompleteCompaction( RemoveTransaction(*args.RequestInfo); for (auto& rangeCompaction: args.RangeCompactions) { - State->RaiseRangeTemperature(rangeCompaction.RangeIdx); + State->RaiseRangeTemperature(rangeCompaction.Range.RangeIdx); } const bool blobPatchingEnabledForDisk = @@ -597,8 +596,7 @@ void TPartitionActor::CompleteCompaction( args.RangeCompactions[i], requests, rangeCompactionInfos, - Config->GetMaxDiffPercentageForBlobPatching(), - args.RangeCompactionIndex + i); // TODO:_ ok? + Config->GetMaxDiffPercentageForBlobPatching()); if (rangeCompactionInfos.back().OriginalBlobId) { LOG_DEBUG( diff --git a/cloud/blockstore/libs/storage/partition/part_compaction.h b/cloud/blockstore/libs/storage/partition/part_compaction.h index 44560f7375d..d28450ca91a 100644 --- a/cloud/blockstore/libs/storage/partition/part_compaction.h +++ b/cloud/blockstore/libs/storage/partition/part_compaction.h @@ -34,6 +34,7 @@ struct TRangeCompactionInfo const ui32 BlocksSkippedByCompaction; const TVector BlockChecksums; const EChannelDataKind ChannelDataKind; + const ui32 RangeCompactionIndex; TGuardedBuffer BlobContent; TVector ZeroBlocks; @@ -54,6 +55,7 @@ struct TRangeCompactionInfo ui32 blocksSkippedByCompaction, TVector blockChecksums, EChannelDataKind channelDataKind, + ui32 rangeCompactionIndex, TBlockBuffer blobContent, TVector zeroBlocks, TAffectedBlobs affectedBlobs, @@ -68,6 +70,7 @@ struct TRangeCompactionInfo , BlocksSkippedByCompaction(blocksSkippedByCompaction) , BlockChecksums(std::move(blockChecksums)) , ChannelDataKind(channelDataKind) + , RangeCompactionIndex(rangeCompactionIndex) , BlobContent(std::move(blobContent)) , ZeroBlocks(std::move(zeroBlocks)) , AffectedBlobs(std::move(affectedBlobs)) @@ -105,9 +108,18 @@ struct TCompactionReadRequest struct TCompactionRange { const ui32 RangeIdx; + TBlockRange32 BlockRange; const ui32 RangeCompactionIndex; // Index of the range in its batch for // batch compaction. - TBlockRange32 BlockRange; + + TCompactionRange( + ui32 rangeIdx, + TBlockRange32 blockRange, + ui32 rangeCompactionIndex) + : RangeIdx(rangeIdx) + , BlockRange(blockRange) + , RangeCompactionIndex(rangeCompactionIndex) + {} }; } // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h index 64c26fe3f03..ea978927a64 100644 --- a/cloud/blockstore/libs/storage/partition/part_events_private.h +++ b/cloud/blockstore/libs/storage/partition/part_events_private.h @@ -470,9 +470,8 @@ struct TEvPartitionPrivate const ui64 CommitId; const ui32 RangeCompactionIndex; const TCompactionOptions CompactionOptions; - TVector> Ranges; // TODO:_ use using? - - // TVector Rangesaaa; + TVector Ranges; + // TVector> Ranges; // TODO:_ use using? TCompactionTxRequest() = default; // TODO:_ do we need it? @@ -480,7 +479,7 @@ struct TEvPartitionPrivate ui64 commitId, ui32 rangeCompactionIndex, TCompactionOptions compactionOptions, - TVector> ranges) + TVector ranges) : CommitId(commitId) , RangeCompactionIndex(rangeCompactionIndex) , CompactionOptions(compactionOptions) diff --git a/cloud/blockstore/libs/storage/partition/part_tx.h b/cloud/blockstore/libs/storage/partition/part_tx.h index 9c837d84499..6eef2389ee0 100644 --- a/cloud/blockstore/libs/storage/partition/part_tx.h +++ b/cloud/blockstore/libs/storage/partition/part_tx.h @@ -2,6 +2,7 @@ #include "public.h" +#include "part_compaction.h" #include "part_events_private.h" #include @@ -354,8 +355,7 @@ struct TTxPartition struct TRangeCompaction { - const ui32 RangeIdx; - const TBlockRange32 BlockRange; + const TCompactionRange Range; struct TBlockMark { @@ -372,10 +372,9 @@ struct TTxPartition ui32 BlocksSkipped = 0; bool ChecksumsEnabled = false; - TRangeCompaction(ui32 rangeIdx, const TBlockRange32& blockRange) - : RangeIdx(rangeIdx) - , BlockRange(blockRange) - , BlockMarks(blockRange.Size()) + TRangeCompaction(const TCompactionRange& range) + : Range(range) + , BlockMarks(range.BlockRange.Size()) {} void Clear() @@ -390,8 +389,8 @@ struct TTxPartition TBlockMark& GetBlockMark(ui32 blockIndex) { - Y_DEBUG_ABORT_UNLESS(BlockRange.Contains(blockIndex)); - return BlockMarks[blockIndex - BlockRange.Start]; + Y_DEBUG_ABORT_UNLESS(Range.BlockRange.Contains(blockIndex)); + return BlockMarks[blockIndex - Range.BlockRange.Start]; } void MarkBlock( @@ -437,7 +436,6 @@ struct TTxPartition { const TRequestInfoPtr RequestInfo; const ui64 CommitId; - const ui32 RangeCompactionIndex; const TCompactionOptions CompactionOptions; TVector RangeCompactions; @@ -445,17 +443,15 @@ struct TTxPartition TCompaction( TRequestInfoPtr requestInfo, ui64 commitId, - ui32 rangeCompactionIndex, TCompactionOptions compactionOptions, - const TVector>& ranges) + const TVector& ranges) : RequestInfo(std::move(requestInfo)) , CommitId(commitId) - , RangeCompactionIndex(rangeCompactionIndex) , CompactionOptions(compactionOptions) { RangeCompactions.reserve(ranges.size()); for (const auto& range: ranges) { - RangeCompactions.emplace_back(range.first, range.second); + RangeCompactions.emplace_back(range); } } From 68a7a305f7319c5bbbdd2d7d550809da584e8866 Mon Sep 17 00:00:00 2001 From: gayurgin Date: Mon, 16 Feb 2026 13:44:05 +0300 Subject: [PATCH 15/15] fixes, add tests --- .../partition/part_actor_compaction.cpp | 24 ---- .../partition/part_actor_compactiontx.cpp | 4 - .../libs/storage/partition/part_compaction.h | 6 +- .../storage/partition/part_events_private.h | 11 +- .../libs/storage/partition/part_ut.cpp | 106 ++++++++++++++---- 5 files changed, 94 insertions(+), 57 deletions(-) diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index a07d64794cc..0773f104a6e 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -272,9 +272,6 @@ void TCompactionActor::Bootstrap(const TActorContext& ctx) void TCompactionActor::ProcessRequests(const TActorContext& ctx) { - // TODO:_ TRequestScope timer(*RequestInfo) here? - // TODO:_ lwtrack here? - if (Requests) { ReadBlocks(ctx); @@ -865,8 +862,6 @@ void TCompactionActor::HandleCompactionTxResponse( { auto* msg = ev->Get(); - // TODO:_ exec cycles? - if (HandleError(ctx, msg->GetError())) { return; } @@ -885,25 +880,6 @@ void TCompactionActor::HandleCompactionTxResponse( return; } - // TODO:_ remove - // TVector infos; - // infos.Reserve(RangeCompactionInfos.size()) - // for (auto& info: RangeCompactionInfos) { - // infos.push_back(&info); - // } - // Sort( - // RangeCompactionInfos, - // [](const TRangeCompactionInfo& l, const TRangeCompactionInfo& r) - // { return l->RangeCompactionIndex < r->RangeCompactionIndex; }); - - // TODO:_ remove - // for (ui32 i = 0; i < infos.size(); ++i) { - // RangeCompactionInfos.push_back( - // std::move(infos[i]->first)); - // } - - // TODO:_ exec cycles - for (auto context: ForkedReadCallContexts) { RequestInfo->CallContext->LWOrbit.Join(context->LWOrbit); } diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp index 84122d7f883..a0eb90ffb2f 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactiontx.cpp @@ -609,7 +609,6 @@ void TPartitionActor::CompleteCompaction( } } - // TODO:_ or to it in the beginning of the Complete... function? TRequestScope timer(*args.RequestInfo); auto response = @@ -617,9 +616,6 @@ void TPartitionActor::CompleteCompaction( std::move(rangeCompactionInfos), std::move(requests)); - // TODO:_ ExecCycles? - // response->ExecCycles = args.RequestInfo->GetExecCycles(); - LWTRACK( ResponseSent_Partition, args.RequestInfo->CallContext->LWOrbit, diff --git a/cloud/blockstore/libs/storage/partition/part_compaction.h b/cloud/blockstore/libs/storage/partition/part_compaction.h index d28450ca91a..8ce08c0d30c 100644 --- a/cloud/blockstore/libs/storage/partition/part_compaction.h +++ b/cloud/blockstore/libs/storage/partition/part_compaction.h @@ -1,7 +1,7 @@ #pragma once #include -#include // TODO:_ ??? +#include #include #include @@ -10,8 +10,8 @@ #include #include -#include // TODO:_ ??? -#include // TODO:_ ??? +#include +#include #include #include diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h index ea978927a64..319ef57c388 100644 --- a/cloud/blockstore/libs/storage/partition/part_events_private.h +++ b/cloud/blockstore/libs/storage/partition/part_events_private.h @@ -1,6 +1,6 @@ #pragma once -#include "part_compaction.h" // TODO:_ is it ok? Should we move it to model? Or rearrange dependencies? +#include "part_compaction.h" #include "public.h" #include @@ -471,11 +471,10 @@ struct TEvPartitionPrivate const ui32 RangeCompactionIndex; const TCompactionOptions CompactionOptions; TVector Ranges; - // TVector> Ranges; // TODO:_ use using? - TCompactionTxRequest() = default; // TODO:_ do we need it? + TCompactionTxRequest() = default; - TCompactionTxRequest( // TODO:_ do we need it? + TCompactionTxRequest( ui64 commitId, ui32 rangeCompactionIndex, TCompactionOptions compactionOptions, @@ -492,9 +491,9 @@ struct TEvPartitionPrivate TVector RangeCompactionInfos; TVector Requests; - TCompactionTxResponse() = default; // TODO:_ do we need it? + TCompactionTxResponse() = default; - TCompactionTxResponse( // TODO:_ do we need it? + TCompactionTxResponse( TVector rangeCompactionInfos, TVector requests) : RangeCompactionInfos(std::move(rangeCompactionInfos)) diff --git a/cloud/blockstore/libs/storage/partition/part_ut.cpp b/cloud/blockstore/libs/storage/partition/part_ut.cpp index b9e439dff67..159098d1864 100644 --- a/cloud/blockstore/libs/storage/partition/part_ut.cpp +++ b/cloud/blockstore/libs/storage/partition/part_ut.cpp @@ -349,8 +349,8 @@ auto InitTestActorRuntime( void InitLogSettings(TTestActorRuntime& runtime) { for (ui32 i = TBlockStoreComponents::START; i < TBlockStoreComponents::END; ++i) { - // runtime.SetLogPriority(i, NLog::PRI_INFO); - runtime.SetLogPriority(i, NLog::PRI_DEBUG); + runtime.SetLogPriority(i, NLog::PRI_INFO); + // runtime.SetLogPriority(i, NLog::PRI_DEBUG); } // runtime.SetLogPriority(NLog::InvalidComponent, NLog::PRI_DEBUG); runtime.SetLogPriority(NKikimrServices::BS_NODE, NLog::PRI_ERROR); @@ -8443,11 +8443,14 @@ Y_UNIT_TEST_SUITE(TPartitionTest) UNIT_ASSERT(suicideHappened); } - Y_UNIT_TEST(ShouldProcessMultipleRangesUponCompaction) + void DoShouldProcessMultipleRangesUponCompaction( + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetWriteBlobThreshold(1_MB); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetCompactionRangeCountPerRun(3); config.SetSSDMaxBlobsPerRange(999); config.SetHDDMaxBlobsPerRange(999); @@ -8542,6 +8545,12 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } + Y_UNIT_TEST(ShouldProcessMultipleRangesUponCompaction) + { + DoShouldProcessMultipleRangesUponCompaction(true); + DoShouldProcessMultipleRangesUponCompaction(false); + } + Y_UNIT_TEST(ShouldPatchBlobsDuringCompaction) { auto config = DefaultConfig(); @@ -10006,11 +10015,14 @@ Y_UNIT_TEST_SUITE(TPartitionTest) ui32 increasingPercentageThreshold, ui32 decreasingPercentageThreshold, ui32 compactionRangeCountPerRun, - ui32 maxCompactionRangeCountPerRun = 10) + ui32 maxCompactionRangeCountPerRun = 10, + bool splitTxInBatchCompactionEnabled = true) { auto config = DefaultConfig(); config.SetWriteBlobThreshold(1_MB); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetV1GarbageCompactionEnabled(true); config.SetCompactionGarbageThreshold(diskGarbageThreshold); config.SetCompactionRangeGarbageThreshold(rangeGarbageThreshold); @@ -10107,7 +10119,9 @@ Y_UNIT_TEST_SUITE(TPartitionTest) // blobs percentage: (10 - 9 / 10) * 100 = 10 // 10 < 100, so batch size should decrement CheckIncrementAndDecrementCompactionPerRun( - 2, 9, 999999, 999999, 99999, 7, 200, 100, 1); + 2, 9, 999999, 999999, 99999, 7, 200, 100, 1, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 2, 9, 999999, 999999, 99999, 7, 200, 100, 1, 10, false); } Y_UNIT_TEST(ShouldChangeBatchSizeDueToBlocksPerDisk) @@ -10120,11 +10134,15 @@ Y_UNIT_TEST_SUITE(TPartitionTest) // 9 > 8, so should increment and compact 2 ranges CheckIncrementAndDecrementCompactionPerRun( - 1, 1000, 99999, 203, 99999, 5, 8, 5, 2); + 1, 1000, 99999, 203, 99999, 5, 8, 5, 2, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 1, 1000, 99999, 203, 99999, 5, 8, 5, 2, 10, false); // 9 < 15, so should decrement and compact only 1 range CheckIncrementAndDecrementCompactionPerRun( - 2, 1000, 99999, 203, 99999, 7, 30, 15, 1); + 2, 1000, 99999, 203, 99999, 7, 30, 15, 1, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 2, 1000, 99999, 203, 99999, 7, 30, 15, 1, 10, false); } Y_UNIT_TEST(ShouldChangeBatchSizeDueToBlocksPerRangeCount) @@ -10136,29 +10154,40 @@ Y_UNIT_TEST_SUITE(TPartitionTest) // 7 > 6, so should increment and compact 2 ranges CheckIncrementAndDecrementCompactionPerRun( - 1, 1000, 99999, 9999, 280, 5, 6, 4, 2); + 1, 1000, 99999, 9999, 280, 5, 6, 4, 2, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 1, 1000, 99999, 9999, 280, 5, 6, 4, 2, 10, false); // 7 > 6, but should compact only 1 range due to maxRangeCountPerRun CheckIncrementAndDecrementCompactionPerRun( - 1, 1000, 99999, 9999, 280, 7, 6, 4, 1, 1); + 1, 1000, 99999, 9999, 280, 7, 6, 4, 1, 1, true); + CheckIncrementAndDecrementCompactionPerRun( + 1, 1000, 99999, 9999, 280, 7, 6, 4, 1, 1, false); // 7 < 8, so should decrement and compact only 1 range CheckIncrementAndDecrementCompactionPerRun( - 2, 1000, 99999, 9999, 280, 7, 30, 8, 1); + 2, 1000, 99999, 9999, 280, 7, 30, 8, 1, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 2, 1000, 99999, 9999, 280, 7, 30, 8, 1, 10, false); } Y_UNIT_TEST(ShouldDecrementBatchSizeWhenBlobsPerRangeCountIsSmall) { // CompactionScore in range3: 4 - 4 + eps // 100 * (eps - 0) / 1024 < 3, so should decrement and compact 1 range CheckIncrementAndDecrementCompactionPerRun( - 2, 1000, 4, 9999, 9999, 7, 50, 10, 1); + 2, 1000, 4, 9999, 9999, 7, 50, 10, 1, 10, true); + CheckIncrementAndDecrementCompactionPerRun( + 2, 1000, 4, 9999, 9999, 7, 50, 10, 1, 10, false); } - Y_UNIT_TEST(ShouldRespectCompactionCountPerRunChangingPeriod) + void DoShouldRespectCompactionCountPerRunChangingPeriod( + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetWriteBlobThreshold(1_MB); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetV1GarbageCompactionEnabled(true); config.SetCompactionGarbageThreshold(99999); config.SetCompactionRangeGarbageThreshold(280); @@ -10265,6 +10294,12 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } + Y_UNIT_TEST(ShouldRespectCompactionCountPerRunChangingPeriod) + { + DoShouldRespectCompactionCountPerRunChangingPeriod(true); + DoShouldRespectCompactionCountPerRunChangingPeriod(false); + } + template void DoShouldReportLongRunningBlobOperations( FSend sendRequest, @@ -11237,10 +11272,13 @@ Y_UNIT_TEST_SUITE(TPartitionTest) UNIT_ASSERT_VALUES_EQUAL(E_TRY_AGAIN, response->GetStatus()); } - Y_UNIT_TEST(ShouldProcessMultipleRangesUponGarbageCompaction) + void DoShouldProcessMultipleRangesUponGarbageCompaction( + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetGarbageCompactionRangeCountPerRun(3); config.SetV1GarbageCompactionEnabled(true); config.SetCompactionGarbageThreshold(20); @@ -11336,10 +11374,19 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } - Y_UNIT_TEST(ShouldProcessMultipleRangesUponForceCompaction) + Y_UNIT_TEST(ShouldProcessMultipleRangesUponGarbageCompaction) + { + DoShouldProcessMultipleRangesUponGarbageCompaction(true); + DoShouldProcessMultipleRangesUponGarbageCompaction(false); + } + + void DoShouldProcessMultipleRangesUponForceCompaction( + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetForcedCompactionRangeCountPerRun(3); config.SetV1GarbageCompactionEnabled(false); @@ -11440,10 +11487,19 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } - Y_UNIT_TEST(ShouldSkipEmptyRangesUponForcedCompactionWithMultipleRanges) + Y_UNIT_TEST(ShouldProcessMultipleRangesUponForceCompaction) + { + DoShouldProcessMultipleRangesUponForceCompaction(true); + DoShouldProcessMultipleRangesUponForceCompaction(false); + } + + void DoShouldSkipEmptyRangesUponForcedCompactionWithMultipleRanges( + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetForcedCompactionRangeCountPerRun(3); config.SetV1GarbageCompactionEnabled(false); @@ -11473,6 +11529,12 @@ Y_UNIT_TEST_SUITE(TPartitionTest) UNIT_ASSERT_EQUAL(5, compactionStatus->Record.GetTotal()); } + Y_UNIT_TEST(ShouldSkipEmptyRangesUponForcedCompactionWithMultipleRanges) + { + DoShouldSkipEmptyRangesUponForcedCompactionWithMultipleRanges(true); + DoShouldSkipEmptyRangesUponForcedCompactionWithMultipleRanges(false); + } + Y_UNIT_TEST(ShouldBatchSmallWritesToFreshChannelIfThresholdNotExceeded) { NProto::TStorageServiceConfig config; @@ -11838,10 +11900,14 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } - void TestForcedCompaction(ui32 rangesPerRun) + void TestForcedCompaction( + ui32 rangesPerRun, + bool splitTxInBatchCompactionEnabled) { auto config = DefaultConfig(); config.SetBatchCompactionEnabled(true); + config.SetSplitTxInBatchCompactionEnabled( + splitTxInBatchCompactionEnabled); config.SetForcedCompactionRangeCountPerRun(rangesPerRun); config.SetV1GarbageCompactionEnabled(false); config.SetWriteBlobThreshold(15_KB); @@ -11960,12 +12026,14 @@ Y_UNIT_TEST_SUITE(TPartitionTest) Y_UNIT_TEST(ShouldCompactSeveralBlobsInTheSameRangeWithOneRangePerRun) { - TestForcedCompaction(1); + TestForcedCompaction(1, true); + TestForcedCompaction(1, false); } Y_UNIT_TEST(ShouldCompactSeveralBlobsInTheSameRangeWithSeveralRangesPerRun) { - TestForcedCompaction(10); + TestForcedCompaction(10, true); + TestForcedCompaction(10, false); } Y_UNIT_TEST(ShouldWriteToMixedChannelOnHddIfThresholdExceeded) @@ -13098,5 +13166,3 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } // namespace NCloud::NBlockStore::NStorage::NPartition - -// TODO:_ add tests!!!