Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,13 @@ STFUNC(TPartitionActor::StateWork)
IOCompanion->HandlePatchBlobCompleted);
HFunc(TEvPartitionPrivate::TEvReadBlocksCompleted, HandleReadBlocksCompleted);
HFunc(TEvPartitionPrivate::TEvWriteBlocksCompleted, HandleWriteBlocksCompleted);
HFunc(
TEvPartitionCommonPrivate::TEvWriteFreshBlocksCompleted,
HandleWriteFreshBlocksCompleted);
HFunc(TEvPartitionPrivate::TEvZeroBlocksCompleted, HandleZeroBlocksCompleted);
HFunc(
TEvPartitionCommonPrivate::TEvZeroFreshBlocksCompleted,
HandleZeroFreshBlocksCompleted);
HFunc(TEvPartitionPrivate::TEvFlushCompleted, HandleFlushCompleted);
HFunc(TEvPartitionCommonPrivate::TEvTrimFreshLogCompleted, HandleTrimFreshLogCompleted);
HFunc(TEvPartitionPrivate::TEvCompactionCompleted, HandleCompactionCompleted);
Expand Down
29 changes: 29 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -686,10 +686,39 @@ class TPartitionActor final
const TEvPartitionPrivate::TEvWriteBlocksCompleted::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleWriteFreshBlocksCompleted(
const TEvPartitionCommonPrivate::TEvWriteFreshBlocksCompleted::TPtr& ev,
const NActors::TActorContext& ctx);

struct TWriteBlocksCompleted {
bool CollectGarbageBarrierAcquired;
bool AddingUnconfirmedBlobsRequested;
bool FreshBlocksRequest;
TVector<TBlobToConfirm> BlobsToConfirm;
};

void HandleWriteBlocksCompletedImpl(
const NActors::TActorContext& ctx,
NActors::TActorId sender,
NProto::TError error,
const TEvPartitionCommonPrivate::TOperationCompleted& opCompleted,
TWriteBlocksCompleted writeBlocksCompleted);

void HandleZeroBlocksCompleted(
const TEvPartitionPrivate::TEvZeroBlocksCompleted::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleZeroFreshBlocksCompleted(
const TEvPartitionCommonPrivate::TEvZeroFreshBlocksCompleted::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleZeroBlocksCompletedImpl(
const NActors::TActorContext& ctx,
NActors::TActorId sender,
NProto::TError error,
const TEvPartitionCommonPrivate::TOperationCompleted& opCompleted,
bool freshBlocksRequest);

void HandleFlushCompleted(
const TEvPartitionPrivate::TEvFlushCompleted::TPtr& ev,
const NActors::TActorContext& ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,11 @@ void TPartitionActor::CompleteLoadState(
CreateFreshBlocksCompanionClient();

FreshBlocksCompanion = std::make_unique<TFreshBlocksCompanion>(
Config,
StorageAccessMode,
partitionConfig,
Info(),
BlockDigestGenerator,
*FreshBlocksCompanionClient,
*State, // channelsState
*State, // freshBlobState
Expand Down
79 changes: 61 additions & 18 deletions cloud/blockstore/libs/storage/partition/part_actor_writeblocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,34 +311,71 @@ void TPartitionActor::HandleWriteBlocksCompleted(
{
auto* msg = ev->Get();

ui64 commitId = msg->CommitId;
HandleWriteBlocksCompletedImpl(
ctx,
ev->Sender,
msg->GetError(),
*msg,
{
.CollectGarbageBarrierAcquired = msg->CollectGarbageBarrierAcquired,
.AddingUnconfirmedBlobsRequested =
msg->AddingUnconfirmedBlobsRequested,
.FreshBlocksRequest = false,
.BlobsToConfirm = std::move(msg->BlobsToConfirm),
});
}

void TPartitionActor::HandleWriteFreshBlocksCompleted(
const TEvPartitionCommonPrivate::TEvWriteFreshBlocksCompleted::TPtr& ev,
const NActors::TActorContext& ctx)
{
auto* msg = ev->Get();

HandleWriteBlocksCompletedImpl(
ctx,
ev->Sender,
msg->GetError(),
*msg,
{
.FreshBlocksRequest = true,
});
}

void TPartitionActor::HandleWriteBlocksCompletedImpl(
const NActors::TActorContext& ctx,
NActors::TActorId sender,
NProto::TError error,
const TEvPartitionCommonPrivate::TOperationCompleted& opCompleted,
TWriteBlocksCompleted writeBlocksCompleted)
{
ui64 commitId = opCompleted.CommitId;
LOG_TRACE(
ctx,
TBlockStoreComponents::PARTITION,
"%s Complete write blocks @%lu",
LogTitle.GetWithTime().c_str(),
commitId);

UpdateStats(msg->Stats);
UpdateStats(opCompleted.Stats);

ui64 blocksCount = msg->Stats.GetUserWriteCounters().GetBlocksCount();
ui64 blocksCount = opCompleted.Stats.GetUserWriteCounters().GetBlocksCount();
ui64 requestBytes = blocksCount * State->GetBlockSize();

UpdateCPUUsageStat(ctx.Now(), msg->ExecCycles);
UpdateCPUUsageStat(ctx.Now(), opCompleted.ExecCycles);

auto time = CyclesToDurationSafe(msg->TotalCycles).MicroSeconds();
auto time = CyclesToDurationSafe(opCompleted.TotalCycles).MicroSeconds();
const auto requestCount =
msg->Stats.GetUserWriteCounters().GetRequestsCount();
opCompleted.Stats.GetUserWriteCounters().GetRequestsCount();
PartCounters->RequestCounters.WriteBlocks.AddRequest(
time,
requestBytes,
requestCount
);

if (msg->AffectedBlockInfos) {
if (opCompleted.AffectedBlockInfos) {
IProfileLog::TReadWriteRequestBlockInfos request;
request.RequestType = EBlockStoreRequest::WriteBlocks;
request.BlockInfos = std::move(msg->AffectedBlockInfos);
request.BlockInfos = std::move(opCompleted.AffectedBlockInfos);
request.CommitId = commitId;

IProfileLog::TRecord record;
Expand All @@ -349,8 +386,8 @@ void TPartitionActor::HandleWriteBlocksCompleted(
ProfileLog->Write(std::move(record));
}

if (msg->AddingUnconfirmedBlobsRequested) {
if (HasError(msg->GetError())) {
if (writeBlocksCompleted.AddingUnconfirmedBlobsRequested) {
if (HasError(error)) {
// blobs are obsolete, delete them directly
auto request = std::make_unique<
TEvPartitionPrivate::TEvDeleteUnconfirmedBlobsRequest>(
Expand All @@ -360,14 +397,14 @@ void TPartitionActor::HandleWriteBlocksCompleted(
} else {
// blobs are confirmed, but AddBlobs request will be executed
// (for this commit) later
State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm));
State->BlobsConfirmed(commitId, std::move(writeBlocksCompleted.BlobsToConfirm));
}
STORAGE_VERIFY(
msg->CollectGarbageBarrierAcquired,
writeBlocksCompleted.CollectGarbageBarrierAcquired,
TWellKnownEntityTypes::TABLET,
TabletID());
STORAGE_VERIFY(
!msg->TrimFreshLogBarrierAcquired,
!writeBlocksCompleted.FreshBlocksRequest,
TWellKnownEntityTypes::TABLET,
TabletID());
// commit & garbage queue barriers will be released when confirmed
Expand All @@ -381,18 +418,24 @@ void TPartitionActor::HandleWriteBlocksCompleted(
commitId);

State->AccessCommitQueue().ReleaseBarrier(commitId);
if (msg->CollectGarbageBarrierAcquired) {
if (writeBlocksCompleted.CollectGarbageBarrierAcquired) {
State->GetGarbageQueue().ReleaseBarrier(commitId);
}

if (msg->TrimFreshLogBarrierAcquired && HasError(msg->GetError())) {
State->AccessTrimFreshLogBarriers().ReleaseBarrierN(

if (writeBlocksCompleted.FreshBlocksRequest) {
FreshBlocksCompanion->WriteFreshBlocksCompleted(
ctx,
error,
commitId,
blocksCount);
blocksCount,
sender);
}
}

Actors.Erase(ev->Sender);
if (!writeBlocksCompleted.FreshBlocksRequest) {
Actors.Erase(sender);
}

if (Executor()->GetStats().IsAnyChannelYellowMove) {
ScheduleYellowStateUpdate(ctx);
Expand Down
Loading
Loading