Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 9 additions & 19 deletions cloud/blockstore/apps/client/lib/check_range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <cloud/blockstore/libs/service/context.h>
#include <cloud/blockstore/libs/service/request_helpers.h>
#include <cloud/blockstore/libs/service/service.h>
#include <cloud/storage/core/libs/common/media.h>
#include <cloud/storage/core/libs/common/error.h>
#include <cloud/storage/core/libs/diagnostics/logging.h>

Expand Down Expand Up @@ -83,14 +84,14 @@ class TCheckRangeCommand final: public TCommand
{
private:
TString DiskId;
bool IsMirror = false;
ui64 StartIndex = 0;
ui64 BlocksCount = 0;
ui64 BlocksPerRequest = 0;
bool ShowReadErrorsEnabled = false;
bool SaveResultsEnabled = false;
bool CompareResultsEnabled = false;
TString FolderPostfix;
ui32 ReplicaCount = 0;

public:
TCheckRangeCommand(IBlockStorePtr client)
Expand Down Expand Up @@ -165,7 +166,7 @@ class TCheckRangeCommand final: public TCommand
auto request = std::make_shared<NProto::TExecuteActionRequest>();

request->SetAction("checkrange");
request->SetInput(CreateNextInput(*range, isRetry));
request->SetInput(CreateNextInput(*range));

const auto requestId = GetRequestId(*request);
auto result = WaitFor(ClientEndpoint->ExecuteAction(
Expand All @@ -190,7 +191,7 @@ class TCheckRangeCommand final: public TCommand

if (HasError(status)) {
if (status.GetCode() == E_REJECTED && !isRetry &&
ReplicaCount)
IsMirror)
{
if (ShowReadErrorsEnabled) {
output << "ReadBlocks error while reading all "
Expand Down Expand Up @@ -222,7 +223,7 @@ class TCheckRangeCommand final: public TCommand
}

output << "Total requests sended: " << requestCount << Endl;
if (ReplicaCount && mirrorErrorsCount) {
if (mirrorErrorsCount) {
output << "Errors while reading all mirror disk replicas caught: "
<< mirrorErrorsCount << Endl;
}
Expand Down Expand Up @@ -286,18 +287,10 @@ class TCheckRangeCommand final: public TCommand
return result.GetError();
}

ui64 diskBlockCount = result.GetVolume().GetBlocksCount();
if (result.GetVolume().GetStorageMediaKind() ==
NProto::STORAGE_MEDIA_SSD_MIRROR3)
{
ReplicaCount = 3;
} else if (
result.GetVolume().GetStorageMediaKind() ==
NProto::STORAGE_MEDIA_SSD_MIRROR2)
{
ReplicaCount = 2;
}
IsMirror = NCloud::IsReliableDiskRegistryMediaKind(
result.GetVolume().GetStorageMediaKind());

ui64 diskBlockCount = result.GetVolume().GetBlocksCount();
ui64 remainingBlocks = diskBlockCount;

if (BlocksCount) {
Expand All @@ -311,15 +304,12 @@ class TCheckRangeCommand final: public TCommand
return TRequestBuilder(StartIndex, remainingBlocks, BlocksPerRequest);
}

TString CreateNextInput(TBlockRange64 range, bool isRetry) const
TString CreateNextInput(TBlockRange64 range) const
{
NJson::TJsonValue input;
input["DiskId"] = DiskId;
input["StartIndex"] = range.Start;
input["BlocksCount"] = range.Size();
if (!isRetry && ReplicaCount) {
input["ReplicaCount"] = ReplicaCount;
}

return input.GetStringRobust();
}
Expand Down
11 changes: 1 addition & 10 deletions cloud/blockstore/apps/client/lib/command_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,16 +838,7 @@ Y_UNIT_TEST_SUITE(TCommandTest)
client->ExecuteActionHandler =
[&](std::shared_ptr<NProto::TExecuteActionRequest> request)
{
NJson::TJsonValue json;

UNIT_ASSERT(NJson::ReadJsonTree(request->GetInput(), &json));
ui32 replicaCount = json["ReplicaCount"].GetUIntegerRobust();

if(requestCount == 0){
UNIT_ASSERT_VALUES_EQUAL(3, replicaCount);
} else {
UNIT_ASSERT_VALUES_UNEQUAL(3, replicaCount);
}
Y_UNUSED(request);
requestCount++;

NProto::TExecuteActionResponse response;
Expand Down
8 changes: 4 additions & 4 deletions cloud/blockstore/libs/storage/partition2/part2_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7489,8 +7489,8 @@ Y_UNIT_TEST_SUITE(TPartition2Test)
options.FinalEvents.emplace_back(TEvVolume::EvCheckRangeResponse);
runtime->DispatchEvents(options, TDuration::Seconds(3));

const auto& checksums1 = response1->Record.GetChecksums();
const auto& checksums2 = response2->Record.GetChecksums();
const auto& checksums1 = response1->Record.GetDiskChecksums().GetData();
const auto& checksums2 = response2->Record.GetDiskChecksums().GetData();

ASSERT_VECTORS_EQUAL(
TVector<ui32>(checksums1.begin(), checksums1.end()),
Expand Down Expand Up @@ -7524,8 +7524,8 @@ Y_UNIT_TEST_SUITE(TPartition2Test)
options.FinalEvents.emplace_back(TEvVolume::EvCheckRangeResponse);
runtime->DispatchEvents(options, TDuration::Seconds(3));

const auto& checksums1 = response1->Record.GetChecksums();
const auto& checksums2 = response2->Record.GetChecksums();
const auto& checksums1 = response1->Record.GetDiskChecksums().GetData();
const auto& checksums2 = response2->Record.GetDiskChecksums().GetData();

UNIT_ASSERT_VALUES_EQUAL(
checksums1.size(),
Expand Down
34 changes: 21 additions & 13 deletions cloud/blockstore/libs/storage/partition_common/actor_checkrange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ void TCheckRangeActor::SendReadBlocksRequest(const TActorContext& ctx)
auto sgList = Buffer.GetGuardedSgList();
auto sgListOrError = SgListNormalize(sgList.Acquire().Get(), BlockSize);
if (HasError(sgListOrError)) {
LOG_ERROR_S(
ctx,
TBlockStoreComponents::PARTITION_WORKER,
LogTitle.GetWithTime() << " sgList error occured: "
<< FormatError(sgListOrError.GetError()));
ReplyAndDie(ctx, sgListOrError.GetError());
return;
}
Expand All @@ -97,16 +102,8 @@ void TCheckRangeActor::ReplyAndDie(
const TActorContext& ctx,
const NProto::TError& error)
{
LOG_INFO(
ctx,
TBlockStoreComponents::PARTITION_WORKER,
"%s CheckRangeActor has finished",
LogTitle.GetWithTime().c_str());

auto response = std::make_unique<TEvVolume::TEvCheckRangeResponse>(error);
NCloud::Reply(ctx, *RequestInfo, std::move(response));

Die(ctx);
ReplyAndDie(ctx, std::move(response));
}

void TCheckRangeActor::ReplyAndDie(
Expand All @@ -116,19 +113,31 @@ void TCheckRangeActor::ReplyAndDie(
LOG_INFO(
ctx,
TBlockStoreComponents::PARTITION_WORKER,
"%s CheckRangeActor has finished",
LogTitle.GetWithTime().c_str());
"%s CheckRangeActor has finished with status %d",
LogTitle.GetWithTime().c_str(),
response->GetStatus());

NCloud::Reply(ctx, *RequestInfo, std::move(response));
Die(ctx);
}

////////////////////////////////////////////////////////////////////////////////

bool TCheckRangeActor::OnMessage(TAutoPtr<NActors::IEventHandle>& ev)
{
Y_UNUSED(ev);

return false;
}

STFUNC(TCheckRangeActor::StateWork)
{
TRequestScope timer(*RequestInfo);

if (OnMessage(ev)) {
return;
}

switch (ev->GetTypeRewrite()) {
HFunc(TEvService::TEvReadBlocksLocalResponse, HandleReadBlocksResponse);
HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
Expand All @@ -148,7 +157,6 @@ void TCheckRangeActor::HandlePoisonPill(
Y_UNUSED(ev);

auto error = MakeError(E_REJECTED, "tablet is shutting down");

ReplyAndDie(ctx, error);
}

Expand All @@ -172,7 +180,7 @@ void TCheckRangeActor::HandleReadBlocksResponse(
offset += BlockSize, ++i)
{
const char* data = Buffer.Get().data() + offset;
response->Record.MutableChecksums()->Add(
response->Record.MutableDiskChecksums()->MutableData()->Add(
TBlockChecksum().Extend(data, BlockSize));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ class TCheckRangeActor: public NActors::TActorBootstrapped<TCheckRangeActor>

void ReplyAndDie(
const NActors::TActorContext& ctx,
std::unique_ptr<TEvVolume::TEvCheckRangeResponse>);
std::unique_ptr<TEvVolume::TEvCheckRangeResponse> response);

void HandleReadBlocksResponse(
const TEvService::TEvReadBlocksLocalResponse::TPtr& ev,
const NActors::TActorContext& ctx);

virtual bool OnMessage(TAutoPtr<NActors::IEventHandle>& ev);
virtual void SendReadBlocksRequest(const NActors::TActorContext& ctx);

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,7 @@ STFUNC(TMirrorPartitionActor::StateWork)
HFunc(TEvVolume::TEvScanDiskRequest, HandleScanDisk);
HFunc(TEvVolume::TEvGetScanDiskStatusRequest, HandleGetScanDiskStatus);
HFunc(TEvVolume::TEvCheckRangeRequest, HandleCheckRange);
HFunc(TEvVolume::TEvCheckRangeResponse, HandleCheckRangeResponse);

HFunc(
TEvNonreplPartitionPrivate::TEvWriteOrZeroCompleted,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ TDuration CalculateScrubbingInterval(
ui64 maxBandwidth,
ui64 minBandwidth);

// Pairs the request info of a check-range request with the block range that
// the request covers.
struct TCheckRangeRequestInfo
{
    // Request info handle; moved in by the two-argument constructor.
    TRequestInfoPtr RequestInfo;
    // Block range associated with the request.
    TBlockRange64 Range;

    // Leaves both members default-initialized.
    TCheckRangeRequestInfo() = default;

    // Takes ownership of requestInfo and stores a copy of range.
    TCheckRangeRequestInfo(TRequestInfoPtr requestInfo, TBlockRange64 range)
        : RequestInfo(std::move(requestInfo))
        , Range(range)
    {}
};

////////////////////////////////////////////////////////////////////////////////

class TMirrorPartitionActor final
Expand Down Expand Up @@ -74,6 +87,7 @@ class TMirrorPartitionActor final
TDuration CpuUsage;

THashSet<ui64> DirtyReadRequestIds;
THashMap<NActors::TActorId, TCheckRangeRequestInfo> CheckRangeRequestsInfo;
TRequestsInProgress<
EAllowedRequests::ReadWrite,
ui64, // key
Expand Down Expand Up @@ -150,6 +164,10 @@ class TMirrorPartitionActor final
EWriteRequestType SuggestWriteRequestType(
const NActors::TActorContext& ctx,
TBlockRange64 range);
void ReplyError(
const NActors::TActorContext& ctx,
const TEvVolume::TEvCheckRangeRequest::TPtr& ev,
NProto::TError&& error);

private:
STFUNC(StateWork);
Expand Down Expand Up @@ -228,6 +246,10 @@ class TMirrorPartitionActor final
const NActors::TEvents::TEvPoisonTaken::TPtr& ev,
const NActors::TActorContext& ctx);

void HandleCheckRangeResponse(
const TEvVolume::TEvCheckRangeResponse::TPtr& ev,
const NActors::TActorContext& ctx);

template <typename TMethod>
void MirrorRequest(
const typename TMethod::TRequest::TPtr& ev,
Expand Down
Loading
Loading