diff --git a/cloud/blockstore/libs/storage/partition/part_actor.cpp b/cloud/blockstore/libs/storage/partition/part_actor.cpp index 9f912610fb6..e898a652afd 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor.cpp @@ -1087,7 +1087,13 @@ STFUNC(TPartitionActor::StateWork) IOCompanion->HandlePatchBlobCompleted); HFunc(TEvPartitionPrivate::TEvReadBlocksCompleted, HandleReadBlocksCompleted); HFunc(TEvPartitionPrivate::TEvWriteBlocksCompleted, HandleWriteBlocksCompleted); + HFunc( + TEvPartitionCommonPrivate::TEvWriteFreshBlocksCompleted, + HandleWriteFreshBlocksCompleted); HFunc(TEvPartitionPrivate::TEvZeroBlocksCompleted, HandleZeroBlocksCompleted); + HFunc( + TEvPartitionCommonPrivate::TEvZeroFreshBlocksCompleted, + HandleZeroFreshBlocksCompleted); HFunc(TEvPartitionPrivate::TEvFlushCompleted, HandleFlushCompleted); HFunc(TEvPartitionCommonPrivate::TEvTrimFreshLogCompleted, HandleTrimFreshLogCompleted); HFunc(TEvPartitionPrivate::TEvCompactionCompleted, HandleCompactionCompleted); diff --git a/cloud/blockstore/libs/storage/partition/part_actor.h b/cloud/blockstore/libs/storage/partition/part_actor.h index 58901846cea..067febdcb71 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor.h +++ b/cloud/blockstore/libs/storage/partition/part_actor.h @@ -686,10 +686,39 @@ class TPartitionActor final const TEvPartitionPrivate::TEvWriteBlocksCompleted::TPtr& ev, const NActors::TActorContext& ctx); + void HandleWriteFreshBlocksCompleted( + const TEvPartitionCommonPrivate::TEvWriteFreshBlocksCompleted::TPtr& ev, + const NActors::TActorContext& ctx); + + struct TWriteBlocksCompleted { + bool CollectGarbageBarrierAcquired; + bool AddingUnconfirmedBlobsRequested; + bool FreshBlocksRequest; + TVector BlobsToConfirm; + }; + + void HandleWriteBlocksCompletedImpl( + const NActors::TActorContext& ctx, + NActors::TActorId sender, + NProto::TError error, + const TEvPartitionCommonPrivate::TOperationCompleted& opCompleted, + TWriteBlocksCompleted writeBlocksCompleted); + void HandleZeroBlocksCompleted( const TEvPartitionPrivate::TEvZeroBlocksCompleted::TPtr& ev, const NActors::TActorContext& ctx); + void HandleZeroFreshBlocksCompleted( + const TEvPartitionCommonPrivate::TEvZeroFreshBlocksCompleted::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleZeroBlocksCompletedImpl( + const NActors::TActorContext& ctx, + NActors::TActorId sender, + NProto::TError error, + const TEvPartitionCommonPrivate::TOperationCompleted& opCompleted, + bool freshBlocksRequest); + void HandleFlushCompleted( const TEvPartitionPrivate::TEvFlushCompleted::TPtr& ev, const NActors::TActorContext& ctx); diff --git a/cloud/blockstore/libs/storage/partition/part_actor_loadstate.cpp b/cloud/blockstore/libs/storage/partition/part_actor_loadstate.cpp index c2159864d12..9e1c442dc1f 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_loadstate.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_loadstate.cpp @@ -237,9 +237,11 @@ void TPartitionActor::CompleteLoadState( CreateFreshBlocksCompanionClient(); FreshBlocksCompanion = std::make_unique( + Config, StorageAccessMode, partitionConfig, Info(), + BlockDigestGenerator, *FreshBlocksCompanionClient, *State, // channelsState *State, // freshBlobState diff --git a/cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp b/cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp index 600e770e228..f1044fffe00 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp @@ -311,7 +311,44 @@ void TPartitionActor::HandleWriteBlocksCompleted( { auto* msg = ev->Get(); - ui64 commitId = msg->CommitId; + HandleWriteBlocksCompletedImpl( + ctx, + ev->Sender, + msg->GetError(), + *msg, + { + .CollectGarbageBarrierAcquired = msg->CollectGarbageBarrierAcquired, + .AddingUnconfirmedBlobsRequested = + msg->AddingUnconfirmedBlobsRequested, + .FreshBlocksRequest = false, + .BlobsToConfirm = std::move(msg->BlobsToConfirm), + }); +} + +void TPartitionActor::HandleWriteFreshBlocksCompleted( + const TEvPartitionCommonPrivate::TEvWriteFreshBlocksCompleted::TPtr& ev, + const NActors::TActorContext& ctx) +{ + auto* msg = ev->Get(); + + HandleWriteBlocksCompletedImpl( + ctx, + ev->Sender, + msg->GetError(), + *msg, + { + .FreshBlocksRequest = true, + }); +} + +void TPartitionActor::HandleWriteBlocksCompletedImpl( + const NActors::TActorContext& ctx, + NActors::TActorId sender, + NProto::TError error, + const TEvPartitionCommonPrivate::TOperationCompleted& opCompleted, + TWriteBlocksCompleted writeBlocksCompleted) +{ + ui64 commitId = opCompleted.CommitId; LOG_TRACE( ctx, TBlockStoreComponents::PARTITION, @@ -319,26 +356,26 @@ void TPartitionActor::HandleWriteBlocksCompleted( LogTitle.GetWithTime().c_str(), commitId); - UpdateStats(msg->Stats); + UpdateStats(opCompleted.Stats); - ui64 blocksCount = msg->Stats.GetUserWriteCounters().GetBlocksCount(); + ui64 blocksCount = opCompleted.Stats.GetUserWriteCounters().GetBlocksCount(); ui64 requestBytes = blocksCount * State->GetBlockSize(); - UpdateCPUUsageStat(ctx.Now(), msg->ExecCycles); + UpdateCPUUsageStat(ctx.Now(), opCompleted.ExecCycles); - auto time = CyclesToDurationSafe(msg->TotalCycles).MicroSeconds(); + auto time = CyclesToDurationSafe(opCompleted.TotalCycles).MicroSeconds(); const auto requestCount = - msg->Stats.GetUserWriteCounters().GetRequestsCount(); + opCompleted.Stats.GetUserWriteCounters().GetRequestsCount(); PartCounters->RequestCounters.WriteBlocks.AddRequest( time, requestBytes, requestCount ); - if (msg->AffectedBlockInfos) { + if (opCompleted.AffectedBlockInfos) { IProfileLog::TReadWriteRequestBlockInfos request; request.RequestType = EBlockStoreRequest::WriteBlocks; - request.BlockInfos = std::move(msg->AffectedBlockInfos); + request.BlockInfos = std::move(opCompleted.AffectedBlockInfos); request.CommitId = commitId; IProfileLog::TRecord record; @@ -349,8 +386,8 @@ void TPartitionActor::HandleWriteBlocksCompleted( ProfileLog->Write(std::move(record)); } - if (msg->AddingUnconfirmedBlobsRequested) { - if (HasError(msg->GetError())) { + if (writeBlocksCompleted.AddingUnconfirmedBlobsRequested) { + if (HasError(error)) { // blobs are obsolete, delete them directly auto request = std::make_unique< TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsRequest>( @@ -360,14 +397,14 @@ void TPartitionActor::HandleWriteBlocksCompleted( } else { // blobs are confirmed, but AddBlobs request will be executed // (for this commit) later - State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm)); + State->BlobsConfirmed(commitId, std::move(writeBlocksCompleted.BlobsToConfirm)); } STORAGE_VERIFY( - msg->CollectGarbageBarrierAcquired, + writeBlocksCompleted.CollectGarbageBarrierAcquired, TWellKnownEntityTypes::TABLET, TabletID()); STORAGE_VERIFY( - !msg->TrimFreshLogBarrierAcquired, + !writeBlocksCompleted.FreshBlocksRequest, TWellKnownEntityTypes::TABLET, TabletID()); // commit & garbage queue barriers will be released when confirmed @@ -381,18 +418,24 @@ void TPartitionActor::HandleWriteBlocksCompleted( commitId); State->AccessCommitQueue().ReleaseBarrier(commitId); - if (msg->CollectGarbageBarrierAcquired) { + if (writeBlocksCompleted.CollectGarbageBarrierAcquired) { State->GetGarbageQueue().ReleaseBarrier(commitId); } - if (msg->TrimFreshLogBarrierAcquired && HasError(msg->GetError())) { - State->AccessTrimFreshLogBarriers().ReleaseBarrierN( + + if (writeBlocksCompleted.FreshBlocksRequest) { + FreshBlocksCompanion->WriteFreshBlocksCompleted( + ctx, + error, commitId, - blocksCount); + blocksCount, + sender); } } - Actors.Erase(ev->Sender); + if (!writeBlocksCompleted.FreshBlocksRequest) { + Actors.Erase(sender); + } if (Executor()->GetStats().IsAnyChannelYellowMove) { ScheduleYellowStateUpdate(ctx); diff --git a/cloud/blockstore/libs/storage/partition/part_actor_writefreshblocks.cpp b/cloud/blockstore/libs/storage/partition/part_actor_writefreshblocks.cpp index f949c85c27e..cdb41fb049c 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_writefreshblocks.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_writefreshblocks.cpp @@ -20,13 +20,6 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -enum class ERequestType -{ - WriteBlocks, - WriteBlocksLocal, - ZeroBlocks -}; - template IEventBasePtr CreateWriteBlocksResponse(bool replyLocal, T&& ...args) { @@ -38,422 +31,6 @@ IEventBasePtr CreateWriteBlocksResponse(bool replyLocal, T&& ...args) std::forward(args)...); } -//////////////////////////////////////////////////////////////////////////////// - -class TWriteFreshBlocksActor final - : public TActorBootstrapped -{ -public: - struct TRequest - { - TRequestInfoPtr RequestInfo; - ERequestType RequestType; - - TRequest(TRequestInfoPtr requestInfo, ERequestType requestType) - : RequestInfo(std::move(requestInfo)) - , RequestType(requestType) - {} - }; - -private: - const TActorId PartitionActorId; - const ui64 CommitId; - const ui32 Channel; - const ui32 BlockCount; - const TVector Requests; - TVector BlockRanges; - TVector WriteHandlers; - const IBlockDigestGeneratorPtr BlockDigestGenerator; - const bool IsZeroRequest; - const TString DiskId; - - TString BlobContent; - ui64 BlobSize = 0; - - TVector AffectedBlockInfos; - - TCallContextPtr CombinedContext = MakeIntrusive(); - -public: - TWriteFreshBlocksActor( - const TActorId& partitionActorId, - ui64 commitId, - ui32 channel, - ui32 blockCount, - TVector requests, - TVector blockRanges, - TVector writeHandlers, - IBlockDigestGeneratorPtr blockDigestGenerator, - TString diskId); - - void Bootstrap(const TActorContext& ctx); - -private: - NProto::TError BuildBlobContentAndComputeDigest(); - - void WriteBlob(const TActorContext& ctx); - void AddBlocks(const TActorContext& ctx); - - template - void NotifyCompleted(const TActorContext& ctx, std::unique_ptr ev); - bool HandleError(const TActorContext& ctx, const NProto::TError& error); - - void ReplyWrite(const TActorContext& ctx, const NProto::TError& error); - void ReplyZero(const TActorContext& ctx, const NProto::TError& error); - - void ReplyAllAndDie(const TActorContext& ctx, const NProto::TError& error); - -private: - STFUNC(StateWork); - - void HandleWriteBlobResponse( - const TEvPartitionCommonPrivate::TEvWriteBlobResponse::TPtr& ev, - const TActorContext& ctx); - - void HandleAddFreshBlocksResponse( - const TEvPartitionPrivate::TEvAddFreshBlocksResponse::TPtr& ev, - const TActorContext& ctx); - - void HandlePoisonPill( - const TEvents::TEvPoisonPill::TPtr& ev, - const TActorContext& ctx); -}; - -TWriteFreshBlocksActor::TWriteFreshBlocksActor( - const TActorId& partitionActorId, - ui64 commitId, - ui32 channel, - ui32 blockCount, - TVector requests, - TVector blockRanges, - TVector writeHandlers, - IBlockDigestGeneratorPtr blockDigestGenerator, - TString diskId) - : PartitionActorId(partitionActorId) - , CommitId(commitId) - , Channel(channel) - , BlockCount(blockCount) - , Requests(std::move(requests)) - , BlockRanges(std::move(blockRanges)) - , WriteHandlers(std::move(writeHandlers)) - , BlockDigestGenerator(std::move(blockDigestGenerator)) - , IsZeroRequest( - Requests.size() == 1 && - Requests.front().RequestType == ERequestType::ZeroBlocks) - , DiskId(std::move(diskId)) -{ - if (!IsZeroRequest) { - const bool hasAnyZeroRequest = AnyOf( - Requests, - [](auto r) { return r.RequestType == ERequestType::ZeroBlocks; }); - - STORAGE_VERIFY( - !hasAnyZeroRequest && BlockRanges.size() == WriteHandlers.size(), - TWellKnownEntityTypes::DISK, - DiskId); - } else { - STORAGE_VERIFY( - WriteHandlers.empty() && BlockRanges.size() == 1, - TWellKnownEntityTypes::DISK, - DiskId); - } -} - -void TWriteFreshBlocksActor::Bootstrap(const TActorContext& ctx) -{ - TDeque timers; - - ui64 requestId = 0; - - for (const auto& r: Requests) { - LWTRACK( - RequestReceived_PartitionWorker, - r.RequestInfo->CallContext->LWOrbit, - IsZeroRequest ? "ZeroFreshBlocks" : "WriteFreshBlocks", - r.RequestInfo->CallContext->RequestId); - - timers.emplace_back(*r.RequestInfo); - - if (!r.RequestInfo->CallContext->LWOrbit.Fork(CombinedContext->LWOrbit)) { - LWTRACK( - ForkFailed, - r.RequestInfo->CallContext->LWOrbit, - "TEvPartitionCommonPrivate::TEvWriteBlobRequest", - r.RequestInfo->CallContext->RequestId); - } - - if (r.RequestInfo->CallContext->LWOrbit.HasShuttles()) { - requestId = r.RequestInfo->CallContext->RequestId; - } - } - CombinedContext->RequestId = requestId; - - Become(&TThis::StateWork); - - WriteBlob(ctx); -} - -NProto::TError TWriteFreshBlocksActor::BuildBlobContentAndComputeDigest() -{ - if (IsZeroRequest) { - BlobContent = BuildZeroFreshBlocksBlobContent(BlockRanges.front()); - BlobSize = BlobContent.size(); - - return {}; - } - - TVector holders(Reserve(BlockRanges.size())); - - auto blockRange = BlockRanges.begin(); - auto writeHandler = WriteHandlers.begin(); - - while (blockRange != BlockRanges.end()) { - const auto& holder = holders.emplace_back( - (**writeHandler).GetBlocks(ConvertRangeSafe(*blockRange))); - - if (!holder.Acquired()) { - return MakeError( - E_CANCELLED, - "failed to acquire sglist in WriteFreshBlocksActor"); - } - - const auto& sgList = holder.GetSgList(); - - for (size_t index = 0; index < sgList.size(); ++index) { - const ui32 blockIndex = blockRange->Start + index; - - const auto digest = BlockDigestGenerator->ComputeDigest( - blockIndex, - sgList[index]); - - if (digest.Defined()) { - AffectedBlockInfos.push_back({blockIndex, *digest}); - } - } - - ++blockRange; - ++writeHandler; - } - - BlobContent = BuildWriteFreshBlocksBlobContent(BlockRanges, holders); - BlobSize = BlobContent.size(); - - return {}; -} - -void TWriteFreshBlocksActor::WriteBlob(const TActorContext& ctx) -{ - auto error = BuildBlobContentAndComputeDigest(); - if (HandleError(ctx, error)) { - return; - } - - Y_ABORT_UNLESS(!BlobContent.empty()); - - const auto [generation, step] = ParseCommitId(CommitId); - - TPartialBlobId blobId( - generation, - step, - Channel, - static_cast(BlobContent.size()), - 0, // cookie - 0 // partId - ); - - auto request = std::make_unique( - CombinedContext, - blobId, - std::move(BlobContent), - 0, // blockSizeForChecksums - false); // async - - NCloud::Send( - ctx, - PartitionActorId, - std::move(request)); -} - -void TWriteFreshBlocksActor::AddBlocks(const TActorContext& ctx) -{ - Y_ABORT_UNLESS(BlobSize > 0); - - IEventBasePtr request = - std::make_unique( - CombinedContext, - CommitId, - BlobSize, - std::move(BlockRanges), - std::move(WriteHandlers)); - - NCloud::Send( - ctx, - PartitionActorId, - std::move(request)); -} - -template -void TWriteFreshBlocksActor::NotifyCompleted( - const TActorContext& ctx, - std::unique_ptr ev) -{ - ev->ExecCycles = Requests.front().RequestInfo->GetExecCycles(); - ev->TotalCycles = Requests.front().RequestInfo->GetTotalCycles(); - ev->CommitId = CommitId; - ev->AffectedBlockInfos = std::move(AffectedBlockInfos); - - auto execTime = CyclesToDurationSafe(ev->ExecCycles); - auto waitTime = CyclesToDurationSafe(Requests.front().RequestInfo->GetWaitCycles()); - - auto& counters = *ev->Stats.MutableUserWriteCounters(); - counters.SetRequestsCount(Requests.size()); - counters.SetBatchCount(1); - counters.SetBlocksCount(BlockCount); - counters.SetExecTime(execTime.MicroSeconds()); - counters.SetWaitTime(waitTime.MicroSeconds()); - - NCloud::Send(ctx, PartitionActorId, std::move(ev)); -} - -bool TWriteFreshBlocksActor::HandleError( - const TActorContext& ctx, - const NProto::TError& error) -{ - if (FAILED(error.GetCode())) { - ReplyAllAndDie(ctx, error); - return true; - } - return false; -} - -void TWriteFreshBlocksActor::ReplyWrite( - const TActorContext& ctx, - const NProto::TError& error) -{ - auto completeEvent = - std::make_unique( - error, - TEvPartitionPrivate::TWriteBlocksCompleted:: - CreateFreshBlocksCompleted()); - NotifyCompleted(ctx, std::move(completeEvent)); - - for (const auto& r: Requests) { - IEventBasePtr response = CreateWriteBlocksResponse( - r.RequestType == ERequestType::WriteBlocksLocal, - error); - - LWTRACK( - ResponseSent_Partition, - r.RequestInfo->CallContext->LWOrbit, - "WriteFreshBlocks", - r.RequestInfo->CallContext->RequestId); - - NCloud::Reply(ctx, *r.RequestInfo, std::move(response)); - } -} - -void TWriteFreshBlocksActor::ReplyZero( - const TActorContext& ctx, - const NProto::TError& error) -{ - auto completeEvent = - std::make_unique( - error, - true); // trimFreshLogBarrierAcquired - NotifyCompleted(ctx, std::move(completeEvent)); - - for (const auto& r: Requests) { - IEventBasePtr response = - std::make_unique(error); - - LWTRACK( - ResponseSent_Partition, - r.RequestInfo->CallContext->LWOrbit, - "ZeroFreshBlocks", - r.RequestInfo->CallContext->RequestId); - - NCloud::Reply(ctx, *r.RequestInfo, std::move(response)); - } -} - -void TWriteFreshBlocksActor::ReplyAllAndDie( - const TActorContext& ctx, - const NProto::TError& error) -{ - if (IsZeroRequest) { - ReplyZero(ctx, error); - } else { - ReplyWrite(ctx, error); - } - - Die(ctx); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TWriteFreshBlocksActor::HandleWriteBlobResponse( - const TEvPartitionCommonPrivate::TEvWriteBlobResponse::TPtr& ev, - const TActorContext& ctx) -{ - auto* msg = ev->Get(); - - for (const auto& r: Requests) { - r.RequestInfo->AddExecCycles(msg->ExecCycles); - } - - if (HandleError(ctx, msg->GetError())) { - return; - } - - AddBlocks(ctx); -} - -void TWriteFreshBlocksActor::HandleAddFreshBlocksResponse( - const TEvPartitionPrivate::TEvAddFreshBlocksResponse::TPtr& ev, - const TActorContext& ctx) -{ - Y_UNUSED(ev); - - for (const auto& r: Requests) { - r.RequestInfo->CallContext->LWOrbit.Join(CombinedContext->LWOrbit); - } - - ReplyAllAndDie(ctx, {}); -} - -void TWriteFreshBlocksActor::HandlePoisonPill( - const TEvents::TEvPoisonPill::TPtr& ev, - const TActorContext& ctx) -{ - Y_UNUSED(ev); - - auto error = MakeError(E_REJECTED, "tablet is shutting down"); - - ReplyAllAndDie(ctx, error); -} - -STFUNC(TWriteFreshBlocksActor::StateWork) -{ - TDeque timers; - - for (const auto& r: Requests) { - timers.emplace_back(*r.RequestInfo); - } - - switch (ev->GetTypeRewrite()) { - HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); - HFunc(TEvPartitionCommonPrivate::TEvWriteBlobResponse, HandleWriteBlobResponse); - HFunc(TEvPartitionPrivate::TEvAddFreshBlocksResponse, HandleAddFreshBlocksResponse); - - default: - HandleUnexpectedEvent( - ev, - TBlockStoreComponents::PARTITION_WORKER, - __PRETTY_FUNCTION__); - break; - } -} - } // namespace //////////////////////////////////////////////////////////////////////////////// @@ -520,54 +97,7 @@ void TPartitionActor::WriteFreshBlocks( PartitionConfig.GetDiskId()); if (freshChannelWriteRequestsEnabled && State->GetFreshChannelCount() > 0) { - TVector requests; - requests.reserve(requestsInBuffer.size()); - - TVector blockRanges; - blockRanges.reserve(requestsInBuffer.size()); - - TVector writeHandlers; - writeHandlers.reserve(requestsInBuffer.size()); - - ui32 blockCount = 0; - - for (const auto& r: requestsInBuffer) { - requests.emplace_back( - r.Data.RequestInfo, - r.Data.ReplyLocal ? ERequestType::WriteBlocksLocal - : ERequestType::WriteBlocks); - - if (!r.Weight) { - continue; - } - - blockCount += r.Weight; - - State->IncrementFreshBlocksInFlight(r.Data.Range.Size()); - - blockRanges.push_back(r.Data.Range); - writeHandlers.push_back(r.Data.Handler); - } - - State->AccessTrimFreshLogBarriers().AcquireBarrierN(commitId, blockCount); - - const ui32 channel = State->PickNextChannel( - EChannelDataKind::Fresh, - EChannelPermission::UserWritesAllowed); - - auto actor = NCloud::Register( - ctx, - SelfId(), - commitId, - channel, - blockCount, - std::move(requests), - std::move(blockRanges), - std::move(writeHandlers), - BlockDigestGenerator, - PartitionConfig.GetDiskId()); - - Actors.Insert(actor); + FreshBlocksCompanion->WriteFreshBlocks(ctx, requestsInBuffer, commitId); } else { // write fresh blocks to FreshBlocks table TVector subRequests( @@ -608,56 +138,10 @@ void TPartitionActor::WriteFreshBlocks( //////////////////////////////////////////////////////////////////////////////// void TPartitionActor::HandleAddFreshBlocks( - const TEvPartitionPrivate::TEvAddFreshBlocksRequest::TPtr& ev, + const TEvPartitionCommonPrivate::TEvAddFreshBlocksRequest::TPtr& ev, const TActorContext& ctx) { - auto* msg = ev->Get(); - - STORAGE_VERIFY( - msg->WriteHandlers.size() == msg->BlockRanges.size() || - msg->WriteHandlers.empty(), - TWellKnownEntityTypes::DISK, - PartitionConfig.GetDiskId()); - - for (size_t i = 0; i < msg->BlockRanges.size(); ++i) { - auto& blockRange = msg->BlockRanges[i]; - - if (!msg->WriteHandlers) { - State->ZeroFreshBlocks(blockRange, msg->CommitId); - State->DecrementFreshBlocksInFlight(blockRange.Size()); - - continue; - } - - auto& writeHandler = msg->WriteHandlers[i]; - auto guardedSgList = - (*writeHandler).GetBlocks(ConvertRangeSafe(blockRange)); - - if (auto guard = guardedSgList.Acquire()) { - const auto& sgList = guard.Get(); - State->WriteFreshBlocks(blockRange, msg->CommitId, sgList); - State->DecrementFreshBlocksInFlight(blockRange.Size()); - } else { - LOG_ERROR( - ctx, - TBlockStoreComponents::PARTITION, - "%s Failed to lock a guardedSgList on AddFreshBlocks", - LogTitle.GetWithTime().c_str()); - Suicide(ctx); - return; - } - } - - State->AddFreshBlob({msg->CommitId, msg->BlobSize}); - State->IncrementUnflushedFreshBlobCount(1); - State->IncrementUnflushedFreshBlobByteCount(msg->BlobSize); - - // TODO(NBS-1976): update used blocks map - - using TResponse = TEvPartitionPrivate::TEvAddFreshBlocksResponse; - auto response = std::make_unique(); - - NCloud::Reply(ctx, *ev, std::move(response)); + FreshBlocksCompanion->HandleAddFreshBlocks(ev, ctx); } //////////////////////////////////////////////////////////////////////////////// @@ -853,31 +337,8 @@ void TPartitionActor::ZeroFreshBlocks( State->IncrementFreshBlocksInFlight(blockCount); if (freshChannelZeroRequestsEnabled && State->GetFreshChannelCount() > 0) { - TVector requests; - TVector blockRanges; - - requests.emplace_back(requestInfo, ERequestType::ZeroBlocks); - blockRanges.emplace_back(writeRange); - - State->AccessTrimFreshLogBarriers().AcquireBarrierN(commitId, blockCount); - - const ui32 channel = State->PickNextChannel( - EChannelDataKind::Fresh, - EChannelPermission::UserWritesAllowed); - - auto actor = NCloud::Register( - ctx, - SelfId(), - commitId, - channel, - blockCount, - std::move(requests), - std::move(blockRanges), - TVector{}, - BlockDigestGenerator, - PartitionConfig.GetDiskId()); - - Actors.Insert(actor); + FreshBlocksCompanion + ->ZeroFreshBlocks(ctx, requestInfo, writeRange, commitId); } else { AddTransaction(*requestInfo); diff --git a/cloud/blockstore/libs/storage/partition/part_actor_zeroblocks.cpp b/cloud/blockstore/libs/storage/partition/part_actor_zeroblocks.cpp index 4ef99235093..10b78908765 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_zeroblocks.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_zeroblocks.cpp @@ -124,10 +124,7 @@ void TZeroBlocksActor::NotifyCompleted( const NProto::TError& error) { auto request = - std::make_unique( - error, - false // trimFreshLogBarrierAcquired - ); + std::make_unique(error); request->ExecCycles = RequestInfo->GetExecCycles(); request->TotalCycles = RequestInfo->GetTotalCycles(); @@ -346,9 +343,36 @@ void TPartitionActor::HandleZeroBlocksCompleted( const TEvPartitionPrivate::TEvZeroBlocksCompleted::TPtr& ev, const TActorContext& ctx) { - const auto* msg = ev->Get(); + auto* msg = ev->Get(); + HandleZeroBlocksCompletedImpl( + ctx, + ev->Sender, + msg->GetError(), + *msg, + false); // freshBlocksRequest +} + +void TPartitionActor::HandleZeroFreshBlocksCompleted( + const TEvPartitionCommonPrivate::TEvZeroFreshBlocksCompleted::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + HandleZeroBlocksCompletedImpl( + ctx, + ev->Sender, + msg->GetError(), + *msg, + true); // freshBlocksRequest +} - ui64 commitId = msg->CommitId; +void TPartitionActor::HandleZeroBlocksCompletedImpl( + const NActors::TActorContext& ctx, + NActors::TActorId sender, + NProto::TError error, + const TEvPartitionCommonPrivate::TOperationCompleted& opCompleted, + bool freshBlocksRequest) +{ + ui64 commitId = opCompleted.CommitId; LOG_TRACE( ctx, TBlockStoreComponents::PARTITION, @@ -356,26 +380,30 @@ void TPartitionActor::HandleZeroBlocksCompleted( LogTitle.GetWithTime().c_str(), commitId); - UpdateStats(msg->Stats); + UpdateStats(opCompleted.Stats); - ui64 blocksCount = msg->Stats.GetUserWriteCounters().GetBlocksCount(); + ui64 blocksCount = + opCompleted.Stats.GetUserWriteCounters().GetBlocksCount(); ui64 requestBytes = blocksCount * State->GetBlockSize(); - UpdateCPUUsageStat(ctx.Now(), msg->ExecCycles); + UpdateCPUUsageStat(ctx.Now(), opCompleted.ExecCycles); - auto time = CyclesToDurationSafe(msg->TotalCycles).MicroSeconds(); + auto time = CyclesToDurationSafe(opCompleted.TotalCycles).MicroSeconds(); PartCounters->RequestCounters.ZeroBlocks.AddRequest(time, requestBytes); State->AccessCommitQueue().ReleaseBarrier(commitId); - if (msg->TrimFreshLogBarrierAcquired && HasError(msg->GetError())) { - State->AccessTrimFreshLogBarriers().ReleaseBarrierN( + if (freshBlocksRequest) { + FreshBlocksCompanion->ZeroFreshBlocksCompleted( + ctx, + error, commitId, - blocksCount); + blocksCount, + sender); + } else { + Actors.Erase(sender); } - Actors.Erase(ev->Sender); - Y_DEBUG_ABORT_UNLESS(WriteAndZeroRequestsInProgress > 0); --WriteAndZeroRequestsInProgress; diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h index f2a3613f8fc..214771a03b2 100644 --- a/cloud/blockstore/libs/storage/partition/part_events_private.h +++ b/cloud/blockstore/libs/storage/partition/part_events_private.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -22,8 +23,8 @@ #include #include - #include + #include #include @@ -165,7 +166,6 @@ using TFlushedCommitIds = TVector; #define BLOCKSTORE_PARTITION_REQUESTS_PRIVATE(xxx, ...) \ xxx(AddBlobs, __VA_ARGS__) \ - xxx(AddFreshBlocks, __VA_ARGS__) \ xxx(Flush, __VA_ARGS__) \ xxx(Compaction, __VA_ARGS__) \ xxx(MetadataRebuildUsedBlocks, __VA_ARGS__) \ @@ -224,6 +224,8 @@ struct TBlockCountRebuildState struct TEvPartitionPrivate { + using TOperationCompleted = TEvPartitionCommonPrivate::TOperationCompleted; + // // AddBlobs // @@ -271,33 +273,6 @@ struct TEvPartitionPrivate ui64 ExecCycles = 0; }; - // - // AddFreshBlocks - // - - struct TAddFreshBlocksRequest - { - ui64 CommitId; - ui64 BlobSize; - TVector BlockRanges; - TVector WriteHandlers; - - TAddFreshBlocksRequest( - ui64 commitId, - ui64 blobSize, - TVector blockRanges, - TVector writeHandlers) - : CommitId(commitId) - , BlobSize(blobSize) - , BlockRanges(std::move(blockRanges)) - , WriteHandlers(std::move(writeHandlers)) - {} - }; - - struct TAddFreshBlocksResponse - { - }; - // // Flush // @@ -594,23 +569,6 @@ struct TEvPartitionPrivate ui64 ExecCycles = 0; }; - // - // OperationCompleted - // - - struct TOperationCompleted - { - NProto::TPartitionStats Stats; - - ui64 TotalCycles = 0; - ui64 ExecCycles = 0; - - ui64 CommitId = 0; - - TVector AffectedRanges; - TVector AffectedBlockInfos; - }; - // // ReadBlocksCompleted // @@ -793,19 +751,6 @@ struct TEvPartitionPrivate TBlockRange32 Range; }; - // - // ZeroBlocksCompleted - // - - struct TZeroBlocksCompleted: TOperationCompleted - { - const bool TrimFreshLogBarrierAcquired; - - explicit TZeroBlocksCompleted(bool trimFreshLogBarrierAcquired) - : TrimFreshLogBarrierAcquired(trimFreshLogBarrierAcquired) - {} - }; - // // UpdateResourceMetrics // @@ -861,7 +806,8 @@ struct TEvPartitionPrivate using TEvReadBlocksCompleted = TResponseEvent; using TEvWriteBlocksCompleted = TResponseEvent; - using TEvZeroBlocksCompleted = TResponseEvent; + using TEvZeroBlocksCompleted = + TResponseEvent; using TEvFlushCompleted = TResponseEvent; using TEvCompactionCompleted = TResponseEvent; using TEvCollectGarbageCompleted = TResponseEvent; diff --git a/cloud/blockstore/libs/storage/partition/part_ut.cpp b/cloud/blockstore/libs/storage/partition/part_ut.cpp index 935509c3746..4d74f455465 100644 --- a/cloud/blockstore/libs/storage/partition/part_ut.cpp +++ b/cloud/blockstore/libs/storage/partition/part_ut.cpp @@ -7261,7 +7261,7 @@ Y_UNIT_TEST_SUITE(TPartitionTest) runtime->SetObserverFunc([&] (TAutoPtr& event) { switch (event->GetTypeRewrite()) { - case TEvPartitionPrivate::EvAddFreshBlocksRequest: { + case TEvPartitionCommonPrivate::EvAddFreshBlocksRequest: { if (!addFreshBlocks) { addFreshBlocks = event.Release(); return TTestActorRuntime::EEventAction::DROP; @@ -7326,7 +7326,7 @@ Y_UNIT_TEST_SUITE(TPartitionTest) runtime->SetObserverFunc([&] (TAutoPtr& event) { switch (event->GetTypeRewrite()) { - case TEvPartitionPrivate::EvAddFreshBlocksRequest: { + case TEvPartitionCommonPrivate::EvAddFreshBlocksRequest: { if (!addFreshBlocks) { addFreshBlocks = event.Release(); return TTestActorRuntime::EEventAction::DROP; @@ -9614,8 +9614,8 @@ Y_UNIT_TEST_SUITE(TPartitionTest) case TEvPartitionCommonPrivate::EvWriteBlobRequest: { if (spoofWriteBlobs) { - auto response = - std::make_unique(); + auto response = std::make_unique< + TEvPartitionCommonPrivate::TEvWriteBlobResponse>(); response->BlockChecksums.resize(1); runtime->Send(new IEventHandle( event->Sender, @@ -12826,6 +12826,9 @@ Y_UNIT_TEST_SUITE(TPartitionTest) auto commitId = MakeCommitId( r.Id.Generation(), r.Id.Step()); + + Cerr << "commitId: {" << r.Id.Generation() + << "," << r.Id.Step() << "}" << Endl; UNIT_ASSERT_VALUES_EQUAL( 1, expectedBlobs.count(commitId)); @@ -12910,9 +12913,9 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } break; } - case TEvPartitionPrivate::EvWriteBlocksCompleted: { - auto* msg = - ev->Get(); + case TEvPartitionCommonPrivate::EvWriteFreshBlocksCompleted: { + auto* msg = ev->Get(); UNIT_ASSERT_EQUAL(shouldRejectWriteBlob, HasError(msg->GetError())); break; } diff --git a/cloud/blockstore/libs/storage/partition2/part2_actor.cpp b/cloud/blockstore/libs/storage/partition2/part2_actor.cpp index a590ea16d89..2a81e6791e1 100644 --- a/cloud/blockstore/libs/storage/partition2/part2_actor.cpp +++ b/cloud/blockstore/libs/storage/partition2/part2_actor.cpp @@ -768,6 +768,15 @@ void TPartitionActor::HandlePatchBlob( Y_ABORT("Unimplemented"); } +void TPartitionActor::HandleAddFreshBlocks( + const TEvPartitionCommonPrivate::TEvAddFreshBlocksRequest::TPtr& ev, + const NActors::TActorContext& ctx) +{ + Y_UNUSED(ev); + Y_UNUSED(ctx); + Y_ABORT("Unimplemented"); +} + //////////////////////////////////////////////////////////////////////////////// #define BLOCKSTORE_HANDLE_UNIMPLEMENTED_REQUEST(name, ns) \ diff --git a/cloud/blockstore/libs/storage/partition_common/events_private.h b/cloud/blockstore/libs/storage/partition_common/events_private.h index 38983018ab8..4baca2a4c07 100644 --- a/cloud/blockstore/libs/storage/partition_common/events_private.h +++ b/cloud/blockstore/libs/storage/partition_common/events_private.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -23,6 +24,7 @@ namespace NCloud::NBlockStore::NStorage { xxx(TrimFreshLog, __VA_ARGS__) \ xxx(WriteBlob, __VA_ARGS__) \ xxx(PatchBlob, __VA_ARGS__) \ + xxx(AddFreshBlocks, __VA_ARGS__) \ // BLOCKSTORE_PARTITION_COMMON_REQUESTS_PRIVATE //////////////////////////////////////////////////////////////////////////////// @@ -47,10 +49,15 @@ struct TEvPartitionCommonPrivate struct TOperationCompleted { + NProto::TPartitionStats Stats; + ui64 TotalCycles = 0; ui64 ExecCycles = 0; ui64 CommitId = 0; + + TVector AffectedRanges; + TVector AffectedBlockInfos; }; // @@ -363,6 +370,33 @@ struct TEvPartitionCommonPrivate {} }; + // + // AddFreshBlocks + // + + struct TAddFreshBlocksRequest + { + ui64 CommitId; + ui64 BlobSize; + TVector BlockRanges; + TVector WriteHandlers; + + TAddFreshBlocksRequest( + ui64 commitId, + ui64 blobSize, + TVector blockRanges, + TVector writeHandlers) + : CommitId(commitId) + , BlobSize(blobSize) + , BlockRanges(std::move(blockRanges)) + , WriteHandlers(std::move(writeHandlers)) + {} + }; + + struct TAddFreshBlocksResponse + { + }; + // // Events declaration // @@ -383,6 +417,8 @@ struct TEvPartitionCommonPrivate EvPartCountersCombined, EvPatchBlobCompleted, EvWriteBlobCompleted, + EvWriteFreshBlocksCompleted, + EvZeroFreshBlocksCompleted, EvEnd }; @@ -408,6 +444,11 @@ struct TEvPartitionCommonPrivate using TEvPatchBlobCompleted = TResponseEvent; + + using TEvWriteFreshBlocksCompleted = + TResponseEvent; + using TEvZeroFreshBlocksCompleted = + TResponseEvent; }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_common/fresh_blocks_companion.cpp b/cloud/blockstore/libs/storage/partition_common/fresh_blocks_companion.cpp index 0497e6edb4f..62e0022fe04 100644 --- a/cloud/blockstore/libs/storage/partition_common/fresh_blocks_companion.cpp +++ b/cloud/blockstore/libs/storage/partition_common/fresh_blocks_companion.cpp @@ -6,9 +6,11 @@ namespace NCloud::NBlockStore::NStorage { //////////////////////////////////////////////////////////////////////////////// TFreshBlocksCompanion::TFreshBlocksCompanion( + TStorageConfigPtr config, EStorageAccessMode storageAccessMode, NProto::TPartitionConfig partitionConfig, NKikimr::TTabletStorageInfo* tabletStorageInfo, + IBlockDigestGeneratorPtr blockDigestGenerator, IFreshBlocksCompanionClient& client, TPartitionChannelsState& channelsState, TPartitionFreshBlobState& freshBlobState, @@ -16,9 +18,11 @@ TFreshBlocksCompanion::TFreshBlocksCompanion( TPartitionTrimFreshLogState& trimFreshLogState, TPartitionFreshBlocksState& freshBlocksState, TLogTitle logTitle) - : StorageAccessMode(storageAccessMode) + : Config(std::move(config)) + , StorageAccessMode(storageAccessMode) , PartitionConfig(std::move(partitionConfig)) , TabletStorageInfo(tabletStorageInfo) + , BlockDigestGenerator(std::move(blockDigestGenerator)) , Client(client) , ChannelsState(channelsState) , FreshBlobState(freshBlobState) @@ -35,4 +39,18 @@ void TFreshBlocksCompanion::KillActors(const NActors::TActorContext& ctx) } } +void TFreshBlocksCompanion::RebootOnCommitIdOverflow( + const NActors::TActorContext& ctx, + const TStringBuf& requestName) +{ + LOG_ERROR( + ctx, + TBlockStoreComponents::PARTITION, + "%s CommitId overflow in %s. Restarting partition", + LogTitle.GetWithTime().c_str(), + ToString(requestName).c_str()); + ReportTabletCommitIdOverflow({{"disk", PartitionConfig.GetDiskId()}}); + Client.Poison(ctx); +} + } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_common/fresh_blocks_companion.h b/cloud/blockstore/libs/storage/partition_common/fresh_blocks_companion.h index 47dbd093fcf..a6b84578622 100644 --- a/cloud/blockstore/libs/storage/partition_common/fresh_blocks_companion.h +++ b/cloud/blockstore/libs/storage/partition_common/fresh_blocks_companion.h @@ -26,9 +26,11 @@ class IFreshBlocksCompanionClient: public IMortalActor class TFreshBlocksCompanion { private: + const TStorageConfigPtr Config; const EStorageAccessMode StorageAccessMode; const NProto::TPartitionConfig PartitionConfig; const NKikimr::TTabletStorageInfoPtr TabletStorageInfo; + const IBlockDigestGeneratorPtr BlockDigestGenerator; IFreshBlocksCompanionClient& Client; @@ -44,9 +46,11 @@ class TFreshBlocksCompanion public: TFreshBlocksCompanion( + TStorageConfigPtr config, EStorageAccessMode storageAccessMode, NProto::TPartitionConfig partitionConfig, NKikimr::TTabletStorageInfo* tabletStorageInfo, + IBlockDigestGeneratorPtr blockDigestGenerator, IFreshBlocksCompanionClient& client, TPartitionChannelsState& channelsState, TPartitionFreshBlobState& freshBlobState, @@ -61,10 +65,44 @@ class TFreshBlocksCompanion const NActors::TActorContext& ctx, ui64 persistedTrimFreshLogToCommitId); + void WriteFreshBlocks( + const NActors::TActorContext& ctx, + TArrayRef> requestsInBuffer, + ui64 commitId); + + void WriteFreshBlocksCompleted( + const NActors::TActorContext& ctx, + const NProto::TError& error, + ui64 commitId, + ui64 blockCount, + NActors::TActorId actorId); + + void ZeroFreshBlocks( + const NActors::TActorContext& ctx, + TRequestInfoPtr requestInfo, + TBlockRange32 writeRange, + ui64 commitId); + + void ZeroFreshBlocksCompleted( + const NActors::TActorContext& ctx, + const NProto::TError& error, + ui64 commitId, + ui64 blockCount, + NActors::TActorId actorId); + + void RebootOnCommitIdOverflow( + const NActors::TActorContext& ctx, + const TStringBuf& requestName); + +public: void HandleLoadFreshBlobsCompleted( const TEvPartitionCommonPrivate::TEvLoadFreshBlobsCompleted::TPtr& ev, const NActors::TActorContext& ctx); + void HandleAddFreshBlocks( + const TEvPartitionCommonPrivate::TEvAddFreshBlocksRequest::TPtr& ev, + const NActors::TActorContext& ctx); + private: auto Info() { diff --git a/cloud/blockstore/libs/storage/partition_common/fresh_blocks_companion_writefreshblocks.cpp b/cloud/blockstore/libs/storage/partition_common/fresh_blocks_companion_writefreshblocks.cpp new file mode 100644 index 00000000000..c768bc665ec --- /dev/null +++ b/cloud/blockstore/libs/storage/partition_common/fresh_blocks_companion_writefreshblocks.cpp @@ -0,0 +1,634 @@ +#include "fresh_blocks_companion.h" + +#include +#include +#include +#include +#include + +#include + +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; + +using namespace NKikimr; +using namespace NKikimr::NTabletFlatExecutor; + +LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER); + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +enum class ERequestType +{ + WriteBlocks, + WriteBlocksLocal, + ZeroBlocks +}; + +template +IEventBasePtr CreateWriteBlocksResponse(bool replyLocal, T&& ...args) +{ + if (replyLocal) { + return std::make_unique( + std::forward(args)...); + } + return std::make_unique( + std::forward(args)...); +} + +//////////////////////////////////////////////////////////////////////////////// + +class TWriteFreshBlocksActor final + : public TActorBootstrapped +{ +public: + struct TRequest + { + TRequestInfoPtr RequestInfo; + ERequestType RequestType; + + TRequest(TRequestInfoPtr requestInfo, ERequestType requestType) + : RequestInfo(std::move(requestInfo)) + , RequestType(requestType) + {} + }; + +private: + const TActorId PartitionActorId; + const ui64 CommitId; + const ui32 Channel; + const ui32 BlockCount; + const TVector Requests; + TVector BlockRanges; + TVector WriteHandlers; + const IBlockDigestGeneratorPtr BlockDigestGenerator; + const bool IsZeroRequest; + const TString DiskId; + + TString BlobContent; + ui64 BlobSize = 0; + + TVector AffectedBlockInfos; + + TCallContextPtr CombinedContext = MakeIntrusive(); + +public: + TWriteFreshBlocksActor( + const TActorId& partitionActorId, + ui64 commitId, + ui32 channel, + ui32 blockCount, + TVector requests, + TVector blockRanges, + TVector writeHandlers, + IBlockDigestGeneratorPtr blockDigestGenerator, + TString diskId); + + void Bootstrap(const TActorContext& ctx); + +private: + NProto::TError BuildBlobContentAndComputeDigest(); + + void WriteBlob(const TActorContext& ctx); + void AddBlocks(const TActorContext& ctx); + + template + void NotifyCompleted(const TActorContext& ctx, std::unique_ptr ev); + bool HandleError(const TActorContext& ctx, const NProto::TError& error); + + void ReplyWrite(const TActorContext& ctx, const NProto::TError& error); + void ReplyZero(const TActorContext& ctx, const NProto::TError& error); + + void ReplyAllAndDie(const TActorContext& ctx, const NProto::TError& error); + +private: + STFUNC(StateWork); + + void HandleWriteBlobResponse( + const TEvPartitionCommonPrivate::TEvWriteBlobResponse::TPtr& ev, + const TActorContext& ctx); + + void HandleAddFreshBlocksResponse( + const TEvPartitionCommonPrivate::TEvAddFreshBlocksResponse::TPtr& ev, + const TActorContext& ctx); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); +}; + +TWriteFreshBlocksActor::TWriteFreshBlocksActor( + const TActorId& partitionActorId, + ui64 commitId, + ui32 channel, + ui32 blockCount, + TVector requests, + TVector blockRanges, + TVector writeHandlers, + IBlockDigestGeneratorPtr blockDigestGenerator, + TString diskId) + : PartitionActorId(partitionActorId) + , CommitId(commitId) + , Channel(channel) + , BlockCount(blockCount) + , Requests(std::move(requests)) + , BlockRanges(std::move(blockRanges)) + , WriteHandlers(std::move(writeHandlers)) + , BlockDigestGenerator(std::move(blockDigestGenerator)) + , IsZeroRequest( + Requests.size() == 1 && + Requests.front().RequestType == ERequestType::ZeroBlocks) + , DiskId(std::move(diskId)) +{ + if (!IsZeroRequest) { + const bool hasAnyZeroRequest = AnyOf( + Requests, + [](auto r) { return r.RequestType == ERequestType::ZeroBlocks; }); + + STORAGE_VERIFY( + !hasAnyZeroRequest && BlockRanges.size() == WriteHandlers.size(), + TWellKnownEntityTypes::DISK, + DiskId); + } else { + STORAGE_VERIFY( + WriteHandlers.empty() && BlockRanges.size() == 1, + TWellKnownEntityTypes::DISK, + DiskId); + } +} + +void TWriteFreshBlocksActor::Bootstrap(const TActorContext& ctx) +{ + TDeque timers; + + ui64 requestId = 0; + + for (const auto& r: Requests) { + LWTRACK( + RequestReceived_PartitionWorker, + r.RequestInfo->CallContext->LWOrbit, + IsZeroRequest ? "ZeroFreshBlocks" : "WriteFreshBlocks", + r.RequestInfo->CallContext->RequestId); + + timers.emplace_back(*r.RequestInfo); + + if (!r.RequestInfo->CallContext->LWOrbit.Fork(CombinedContext->LWOrbit)) { + LWTRACK( + ForkFailed, + r.RequestInfo->CallContext->LWOrbit, + "TEvPartitionCommonPrivate::TEvWriteBlobRequest", + r.RequestInfo->CallContext->RequestId); + } + + if (r.RequestInfo->CallContext->LWOrbit.HasShuttles()) { + requestId = r.RequestInfo->CallContext->RequestId; + } + } + CombinedContext->RequestId = requestId; + + Become(&TThis::StateWork); + + WriteBlob(ctx); +} + +NProto::TError TWriteFreshBlocksActor::BuildBlobContentAndComputeDigest() +{ + if (IsZeroRequest) { + BlobContent = NPartition::BuildZeroFreshBlocksBlobContent(BlockRanges.front()); + BlobSize = BlobContent.size(); + + return {}; + } + + TVector holders(Reserve(BlockRanges.size())); + + auto blockRange = BlockRanges.begin(); + auto writeHandler = WriteHandlers.begin(); + + while (blockRange != BlockRanges.end()) { + const auto& holder = holders.emplace_back( + (**writeHandler).GetBlocks(ConvertRangeSafe(*blockRange))); + + if (!holder.Acquired()) { + return MakeError( + E_CANCELLED, + "failed to acquire sglist in WriteFreshBlocksActor"); + } + + const auto& sgList = holder.GetSgList(); + + for (size_t index = 0; index < sgList.size(); ++index) { + const ui32 blockIndex = blockRange->Start + index; + + const auto digest = BlockDigestGenerator->ComputeDigest( + blockIndex, + sgList[index]); + + if (digest.Defined()) { + AffectedBlockInfos.push_back({blockIndex, *digest}); + } + } + + ++blockRange; + ++writeHandler; + } + + BlobContent = BuildWriteFreshBlocksBlobContent(BlockRanges, holders); + BlobSize = BlobContent.size(); + + return {}; +} + +void TWriteFreshBlocksActor::WriteBlob(const TActorContext& ctx) +{ + auto error = BuildBlobContentAndComputeDigest(); + if (HandleError(ctx, error)) { + return; + } + + Y_ABORT_UNLESS(!BlobContent.empty()); + + const auto [generation, step] = ParseCommitId(CommitId); + + TPartialBlobId blobId( + generation, + step, + Channel, + static_cast(BlobContent.size()), + 0, // cookie + 0 // partId + ); + + auto request = std::make_unique( + CombinedContext, + blobId, + std::move(BlobContent), + 0, // blockSizeForChecksums + false); // async + + NCloud::Send( + ctx, + PartitionActorId, + std::move(request)); +} + +void TWriteFreshBlocksActor::AddBlocks(const TActorContext& ctx) +{ + Y_ABORT_UNLESS(BlobSize > 0); + + IEventBasePtr request = + std::make_unique( + CombinedContext, + CommitId, + BlobSize, + std::move(BlockRanges), + std::move(WriteHandlers)); + + NCloud::Send( + ctx, + PartitionActorId, + std::move(request)); +} + +template +void TWriteFreshBlocksActor::NotifyCompleted( + const TActorContext& ctx, + std::unique_ptr ev) +{ + ev->ExecCycles = Requests.front().RequestInfo->GetExecCycles(); + ev->TotalCycles = Requests.front().RequestInfo->GetTotalCycles(); + ev->CommitId = CommitId; + ev->AffectedBlockInfos = std::move(AffectedBlockInfos); + + auto execTime = CyclesToDurationSafe(ev->ExecCycles); + auto waitTime = CyclesToDurationSafe(Requests.front().RequestInfo->GetWaitCycles()); + + auto& counters = *ev->Stats.MutableUserWriteCounters(); + counters.SetRequestsCount(Requests.size()); + counters.SetBatchCount(1); + counters.SetBlocksCount(BlockCount); + counters.SetExecTime(execTime.MicroSeconds()); + counters.SetWaitTime(waitTime.MicroSeconds()); + + NCloud::Send(ctx, PartitionActorId, std::move(ev)); +} + +bool TWriteFreshBlocksActor::HandleError( + const TActorContext& ctx, + const NProto::TError& error) +{ + if (FAILED(error.GetCode())) { + ReplyAllAndDie(ctx, error); + return true; + } + return false; +} + +void TWriteFreshBlocksActor::ReplyWrite( + const TActorContext& ctx, + const NProto::TError& error) +{ + auto completeEvent = std::make_unique< + TEvPartitionCommonPrivate::TEvWriteFreshBlocksCompleted>(error); + NotifyCompleted(ctx, std::move(completeEvent)); + + for (const auto& r: Requests) { + IEventBasePtr response = CreateWriteBlocksResponse( + r.RequestType == ERequestType::WriteBlocksLocal, + error); + + LWTRACK( + ResponseSent_Partition, + r.RequestInfo->CallContext->LWOrbit, + "WriteFreshBlocks", + r.RequestInfo->CallContext->RequestId); + + NCloud::Reply(ctx, *r.RequestInfo, std::move(response)); + } +} + +void TWriteFreshBlocksActor::ReplyZero( + const TActorContext& ctx, + const NProto::TError& error) +{ + auto completeEvent = std::make_unique< + TEvPartitionCommonPrivate::TEvZeroFreshBlocksCompleted>(error); + NotifyCompleted(ctx, std::move(completeEvent)); + + for (const auto& r: Requests) { + IEventBasePtr response = + std::make_unique(error); + + LWTRACK( + ResponseSent_Partition, + r.RequestInfo->CallContext->LWOrbit, + "ZeroFreshBlocks", + r.RequestInfo->CallContext->RequestId); + + NCloud::Reply(ctx, *r.RequestInfo, std::move(response)); + } +} + +void TWriteFreshBlocksActor::ReplyAllAndDie( + const TActorContext& ctx, + const NProto::TError& error) +{ + if (IsZeroRequest) { + ReplyZero(ctx, error); + } else { + ReplyWrite(ctx, error); + } + + Die(ctx); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TWriteFreshBlocksActor::HandleWriteBlobResponse( + const TEvPartitionCommonPrivate::TEvWriteBlobResponse::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + for (const auto& r: Requests) { + r.RequestInfo->AddExecCycles(msg->ExecCycles); + } + + if (HandleError(ctx, msg->GetError())) { + return; + } + + AddBlocks(ctx); +} + +void TWriteFreshBlocksActor::HandleAddFreshBlocksResponse( + const TEvPartitionCommonPrivate::TEvAddFreshBlocksResponse::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + for (const auto& r: Requests) { + r.RequestInfo->CallContext->LWOrbit.Join(CombinedContext->LWOrbit); + } + + ReplyAllAndDie(ctx, {}); +} + +void TWriteFreshBlocksActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + + auto error = MakeError(E_REJECTED, "tablet is shutting down"); + + ReplyAllAndDie(ctx, error); +} + +STFUNC(TWriteFreshBlocksActor::StateWork) +{ + TDeque timers; + + for (const auto& r: Requests) { + timers.emplace_back(*r.RequestInfo); + } + + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvPartitionCommonPrivate::TEvWriteBlobResponse, HandleWriteBlobResponse); + HFunc(TEvPartitionCommonPrivate::TEvAddFreshBlocksResponse, HandleAddFreshBlocksResponse); + + default: + HandleUnexpectedEvent( + ev, + TBlockStoreComponents::PARTITION_WORKER, + __PRETTY_FUNCTION__); + break; + } +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +void TFreshBlocksCompanion::WriteFreshBlocks( + const TActorContext& ctx, + TArrayRef> requestsInBuffer, + ui64 commitId) +{ + TVector requests; + requests.reserve(requestsInBuffer.size()); + + TVector blockRanges; + blockRanges.reserve(requestsInBuffer.size()); + + TVector writeHandlers; + writeHandlers.reserve(requestsInBuffer.size()); + + ui32 blockCount = 0; + + for (const auto& r: requestsInBuffer) { + requests.emplace_back( + r.Data.RequestInfo, + r.Data.ReplyLocal ? ERequestType::WriteBlocksLocal + : ERequestType::WriteBlocks); + + if (!r.Weight) { + continue; + } + + blockCount += r.Weight; + + FlushState.IncrementFreshBlocksInFlight(r.Data.Range.Size()); + + blockRanges.push_back(r.Data.Range); + writeHandlers.push_back(r.Data.Handler); + } + + TrimFreshLogState.AccessTrimFreshLogBarriers().AcquireBarrierN( + commitId, + blockCount); + + const ui32 channel = ChannelsState.PickNextChannel( + EChannelDataKind::Fresh, + EChannelPermission::UserWritesAllowed); + + auto actor = NCloud::Register( + ctx, + ctx.SelfID, + commitId, + channel, + blockCount, + std::move(requests), + std::move(blockRanges), + std::move(writeHandlers), + BlockDigestGenerator, + PartitionConfig.GetDiskId()); + + Actors.Insert(actor); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TFreshBlocksCompanion::HandleAddFreshBlocks( + const TEvPartitionCommonPrivate::TEvAddFreshBlocksRequest::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + STORAGE_VERIFY( + msg->WriteHandlers.size() == msg->BlockRanges.size() || + msg->WriteHandlers.empty(), + TWellKnownEntityTypes::DISK, + PartitionConfig.GetDiskId()); + + for (size_t i = 0; i < msg->BlockRanges.size(); ++i) { + auto& blockRange = msg->BlockRanges[i]; + + if (!msg->WriteHandlers) { + FreshBlocksState.ZeroFreshBlocks(blockRange, msg->CommitId); + FlushState.DecrementFreshBlocksInFlight(blockRange.Size()); + + continue; + } + + auto& writeHandler = msg->WriteHandlers[i]; + auto guardedSgList = + (*writeHandler).GetBlocks(ConvertRangeSafe(blockRange)); + + if (auto guard = guardedSgList.Acquire()) { + const auto& sgList = guard.Get(); + FreshBlocksState.WriteFreshBlocks(blockRange, msg->CommitId, sgList); + FlushState.DecrementFreshBlocksInFlight(blockRange.Size()); + } else { + LOG_ERROR( + ctx, + TBlockStoreComponents::PARTITION, + "%s Failed to lock a guardedSgList on AddFreshBlocks", + LogTitle.GetWithTime().c_str()); + Client.Poison(ctx); + return; + } + } + + FreshBlobState.AddFreshBlob({msg->CommitId, msg->BlobSize}); + FlushState.IncrementUnflushedFreshBlobCount(1); + FlushState.IncrementUnflushedFreshBlobByteCount(msg->BlobSize); + + // TODO(NBS-1976): update used blocks map + + using TResponse = TEvPartitionCommonPrivate::TEvAddFreshBlocksResponse; + auto response = std::make_unique(); + + NCloud::Reply(ctx, *ev, std::move(response)); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TFreshBlocksCompanion::ZeroFreshBlocks( + const NActors::TActorContext& ctx, + TRequestInfoPtr requestInfo, + TBlockRange32 writeRange, + ui64 commitId) +{ + TVector requests; + TVector blockRanges; + const ui64 blockCount = writeRange.Size(); + + requests.emplace_back(requestInfo, ERequestType::ZeroBlocks); + blockRanges.emplace_back(writeRange); + + TrimFreshLogState.AccessTrimFreshLogBarriers().AcquireBarrierN(commitId, blockCount); + + const ui32 channel = ChannelsState.PickNextChannel( + EChannelDataKind::Fresh, + EChannelPermission::UserWritesAllowed); + + auto actor = NCloud::Register( + ctx, + ctx.SelfID, + commitId, + channel, + blockCount, + std::move(requests), + std::move(blockRanges), + TVector{}, + BlockDigestGenerator, + PartitionConfig.GetDiskId()); + + Actors.Insert(actor); +} + +void TFreshBlocksCompanion::WriteFreshBlocksCompleted( + const NActors::TActorContext& ctx, + const NProto::TError& error, + ui64 commitId, + ui64 blockCount, + NActors::TActorId actorId) +{ + Y_UNUSED(ctx); + if (HasError(error)) { + TrimFreshLogState.AccessTrimFreshLogBarriers().ReleaseBarrierN( + commitId, + blockCount); + } + + Actors.Erase(actorId); +} + +void TFreshBlocksCompanion::ZeroFreshBlocksCompleted( + const NActors::TActorContext& ctx, + const NProto::TError& error, + ui64 commitId, + ui64 blockCount, + NActors::TActorId actorId) +{ + WriteFreshBlocksCompleted(ctx, error, commitId, blockCount, actorId); +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/partition_common/ya.make b/cloud/blockstore/libs/storage/partition_common/ya.make index f65ad725d22..bde240ba536 100644 --- a/cloud/blockstore/libs/storage/partition_common/ya.make +++ b/cloud/blockstore/libs/storage/partition_common/ya.make @@ -11,6 +11,7 @@ SRCS( commit_ids_state.cpp drain_actor_companion.cpp fresh_blocks_companion_initfreshblocks.cpp + fresh_blocks_companion_writefreshblocks.cpp fresh_blocks_companion.cpp io_companion_patchblob.cpp io_companion_readblob.cpp