Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions cloud/blockstore/apps/client/lib/check_range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ class TCheckRangeCommand final: public TCommand
{
private:
TString DiskId;
bool IsMirror = false;
ui64 StartIndex = 0;
ui64 BlocksCount = 0;
ui64 BlocksPerRequest = 0;
bool ShowReadErrorsEnabled = false;
bool SaveResultsEnabled = false;
bool CompareResultsEnabled = false;
TString FolderPostfix;
ui32 ReplicaCount = 0;

public:
TCheckRangeCommand(IBlockStorePtr client)
Expand Down Expand Up @@ -165,7 +165,7 @@ class TCheckRangeCommand final: public TCommand
auto request = std::make_shared<NProto::TExecuteActionRequest>();

request->SetAction("checkrange");
request->SetInput(CreateNextInput(*range, isRetry));
request->SetInput(CreateNextInput(*range));

const auto requestId = GetRequestId(*request);
auto result = WaitFor(ClientEndpoint->ExecuteAction(
Expand All @@ -190,7 +190,7 @@ class TCheckRangeCommand final: public TCommand

if (HasError(status)) {
if (status.GetCode() == E_REJECTED && !isRetry &&
ReplicaCount)
IsMirror)
{
if (ShowReadErrorsEnabled) {
output << "ReadBlocks error while reading all "
Expand Down Expand Up @@ -222,7 +222,7 @@ class TCheckRangeCommand final: public TCommand
}

output << "Total requests sended: " << requestCount << Endl;
if (ReplicaCount && mirrorErrorsCount) {
if (mirrorErrorsCount) {
output << "Errors while reading all mirror disk replicas caught: "
<< mirrorErrorsCount << Endl;
}
Expand Down Expand Up @@ -287,16 +287,9 @@ class TCheckRangeCommand final: public TCommand
}

ui64 diskBlockCount = result.GetVolume().GetBlocksCount();
if (result.GetVolume().GetStorageMediaKind() ==
NProto::STORAGE_MEDIA_SSD_MIRROR3)
{
ReplicaCount = 3;
} else if (
result.GetVolume().GetStorageMediaKind() ==
NProto::STORAGE_MEDIA_SSD_MIRROR2)
{
ReplicaCount = 2;
}
auto diskType = result.GetVolume().GetStorageMediaKind();
IsMirror = diskType == NProto::STORAGE_MEDIA_SSD_MIRROR3 ||
diskType == NProto::STORAGE_MEDIA_SSD_MIRROR2;

ui64 remainingBlocks = diskBlockCount;

Expand All @@ -311,15 +304,12 @@ class TCheckRangeCommand final: public TCommand
return TRequestBuilder(StartIndex, remainingBlocks, BlocksPerRequest);
}

TString CreateNextInput(TBlockRange64 range, bool isRetry) const
TString CreateNextInput(TBlockRange64 range) const
{
NJson::TJsonValue input;
input["DiskId"] = DiskId;
input["StartIndex"] = range.Start;
input["BlocksCount"] = range.Size();
if (!isRetry && ReplicaCount) {
input["ReplicaCount"] = ReplicaCount;
}

return input.GetStringRobust();
}
Expand Down
11 changes: 1 addition & 10 deletions cloud/blockstore/apps/client/lib/command_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,16 +838,7 @@ Y_UNIT_TEST_SUITE(TCommandTest)
client->ExecuteActionHandler =
[&](std::shared_ptr<NProto::TExecuteActionRequest> request)
{
NJson::TJsonValue json;

UNIT_ASSERT(NJson::ReadJsonTree(request->GetInput(), &json));
ui32 replicaCount = json["ReplicaCount"].GetUIntegerRobust();

if(requestCount == 0){
UNIT_ASSERT_VALUES_EQUAL(3, replicaCount);
} else {
UNIT_ASSERT_VALUES_UNEQUAL(3, replicaCount);
}
Y_UNUSED(request);
requestCount++;

NProto::TExecuteActionResponse response;
Expand Down
8 changes: 4 additions & 4 deletions cloud/blockstore/libs/storage/partition2/part2_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7489,8 +7489,8 @@ Y_UNIT_TEST_SUITE(TPartition2Test)
options.FinalEvents.emplace_back(TEvVolume::EvCheckRangeResponse);
runtime->DispatchEvents(options, TDuration::Seconds(3));

const auto& checksums1 = response1->Record.GetChecksums();
const auto& checksums2 = response2->Record.GetChecksums();
const auto& checksums1 = response1->Record.GetDiskChecksums().GetData();
const auto& checksums2 = response2->Record.GetDiskChecksums().GetData();

ASSERT_VECTORS_EQUAL(
TVector<ui32>(checksums1.begin(), checksums1.end()),
Expand Down Expand Up @@ -7524,8 +7524,8 @@ Y_UNIT_TEST_SUITE(TPartition2Test)
options.FinalEvents.emplace_back(TEvVolume::EvCheckRangeResponse);
runtime->DispatchEvents(options, TDuration::Seconds(3));

const auto& checksums1 = response1->Record.GetChecksums();
const auto& checksums2 = response2->Record.GetChecksums();
const auto& checksums1 = response1->Record.GetDiskChecksums().GetData();
const auto& checksums2 = response2->Record.GetDiskChecksums().GetData();

UNIT_ASSERT_VALUES_EQUAL(
checksums1.size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,21 @@ void TCheckRangeActor::ReplyAndDie(

////////////////////////////////////////////////////////////////////////////////

// Pre-dispatch hook invoked by StateWork before its own event switch.
// The base implementation consumes nothing and always returns false, so
// StateWork falls through to its regular handlers. Derived actors may override
// it and return true to signal that the event has already been handled.
bool TCheckRangeActor::OnMessage(TAutoPtr<NActors::IEventHandle>& ev)
{
// The base class does not inspect the event at all.
Y_UNUSED(ev);

return false;
}

STFUNC(TCheckRangeActor::StateWork)
{
TRequestScope timer(*RequestInfo);

if (OnMessage(ev)) {
return;
}

switch (ev->GetTypeRewrite()) {
HFunc(TEvService::TEvReadBlocksLocalResponse, HandleReadBlocksResponse);
HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
Expand Down Expand Up @@ -172,7 +183,7 @@ void TCheckRangeActor::HandleReadBlocksResponse(
offset += BlockSize, ++i)
{
const char* data = Buffer.Get().data() + offset;
response->Record.MutableChecksums()->Add(
response->Record.MutableDiskChecksums()->MutableData()->Add(
TBlockChecksum().Extend(data, BlockSize));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class TCheckRangeActor: public NActors::TActorBootstrapped<TCheckRangeActor>
const TEvService::TEvReadBlocksLocalResponse::TPtr& ev,
const NActors::TActorContext& ctx);

virtual bool OnMessage(TAutoPtr<NActors::IEventHandle>& ev);
virtual void SendReadBlocksRequest(const NActors::TActorContext& ctx);

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
#include <cloud/blockstore/libs/storage/core/probes.h>
#include <cloud/blockstore/libs/storage/core/request_info.h>
#include <cloud/blockstore/libs/storage/disk_agent/model/public.h>
#include <cloud/blockstore/libs/storage/model/log_title.h>
#include <cloud/blockstore/libs/storage/partition_common/actor_checkrange.h>

#include <cloud/storage/core/libs/common/helpers.h>
#include <cloud/storage/core/libs/common/verify.h>

#include <util/datetime/base.h>
#include <util/generic/string.h>
#include <util/generic/xrange.h>
Expand All @@ -26,45 +30,154 @@ namespace {

class TMirrorCheckRangeActor final: public TCheckRangeActor
{
private:
const ui32 ReplicasNumber;
bool ErrorOnReplicaReading{false};
ui32 ResponseCount{0};
TVector<ui32> ReplicasSummaryChecksums;
NProto::TCheckRangeResponse Response;

public:
using TCheckRangeActor::TCheckRangeActor;
template <typename... TArgs>
explicit TMirrorCheckRangeActor(
ui32 replicasNumber,
TArgs&&... args);

protected:
bool OnMessage(TAutoPtr<NActors::IEventHandle>& ev) override;
void SendReadBlocksRequest(const TActorContext& ctx) override;
void HandleReadBlocksResponse(
const TEvService::TEvReadBlocksResponse::TPtr& ev,
const NActors::TActorContext& ctx);
void Done(const NActors::TActorContext& ctx);

private:
void CalculateChecksums(const TEvService::TEvReadBlocksResponse::TPtr& ev);
void HandleReadBlocksResponseError(
const TEvService::TEvReadBlocksResponse::TPtr& ev,
const TActorContext& ctx,
const ::NCloud::NProto::TError& error);
};

// Constructs the mirror check-range actor.
// replicasNumber is stored in ReplicasNumber; every remaining argument is
// perfectly forwarded, unchanged and in order, to the TCheckRangeActor
// constructor.
template <typename... TArgs>
TMirrorCheckRangeActor::TMirrorCheckRangeActor(
ui32 replicasNumber,
TArgs&&... args)
: TCheckRangeActor(std::forward<TArgs>(args)...)
, ReplicasNumber{replicasNumber}
{}

// Pre-dispatch hook override: intercepts TEvService::TEvReadBlocksResponse and
// routes it to HandleReadBlocksResponse. Returns true when the event was
// consumed here and false for every other event type, leaving it to the
// caller's own dispatch.
bool TMirrorCheckRangeActor::OnMessage(TAutoPtr<NActors::IEventHandle>& ev)
{
switch (ev->GetTypeRewrite()) {
// NOTE(review): the trailing `return true` relies on HFunc expanding to a
// case that invokes the handler and then breaks out of the switch — confirm
// against the macro definition.
HFunc(TEvService::TEvReadBlocksResponse, HandleReadBlocksResponse);
default:
return false;
}

return true;
}

////////////////////////////////////////////////////////////////////////////////

void TMirrorCheckRangeActor::SendReadBlocksRequest(const TActorContext& ctx)
{
const TString clientId{CheckRangeClientId};
TBlockRange64 range = TBlockRange64::WithLength(
Request.GetStartIndex(),
Request.GetBlocksCount());
for (ui32 i = 1; i <= ReplicasNumber; ++i) {
auto request = std::make_unique<TEvService::TEvReadBlocksRequest>();

Buffer = TGuardedBuffer(TString::Uninitialized(range.Size() * BlockSize));
request->Record.SetStartIndex(Request.GetStartIndex());
request->Record.SetBlocksCount(Request.GetBlocksCount());

auto sgList = Buffer.GetGuardedSgList();
auto sgListOrError = SgListNormalize(sgList.Acquire().Get(), BlockSize);
if (HasError(sgListOrError)) {
ReplyAndDie(ctx, sgListOrError.GetError());
return;
auto* headers = request->Record.MutableHeaders();
headers->SetReplicaIndex(i);
headers->SetClientId(TString(CheckRangeClientId));
headers->SetIsBackgroundRequest(true);

Response.MutableMirrorChecksums()->MutableReplicas()->Add({});
ReplicasSummaryChecksums.push_back(0);
NCloud::Send(ctx, Partition, std::move(request), i);
}
SgList.SetSgList(sgListOrError.ExtractResult());
}

auto request = std::make_unique<TEvService::TEvReadBlocksLocalRequest>();
// Records a failed replica read in the aggregated response.
//
// ev    - the failed read response; its cookie identifies the replica.
// ctx   - actor context used for logging.
// error - the error carried by the response.
//
// The error is logged, the aggregated status code is overwritten with this
// error's code (so the code of the most recently processed failure is the one
// kept), and the replica id is appended to the status message, which therefore
// accumulates the ids of all failed replicas.
void TMirrorCheckRangeActor::HandleReadBlocksResponseError(
    const TEvService::TEvReadBlocksResponse::TPtr& ev,
    const TActorContext& ctx,
    const ::NCloud::NProto::TError& error)
{
    LOG_ERROR_S(
        ctx,
        TBlockStoreComponents::PARTITION_WORKER,
        LogTitle.GetWithTime()
            << " reading error has occurred: " << FormatError(error));

    // The id reported in the message is the request cookie shifted down by one.
    const auto replicaId = ev->Cookie - 1;

    auto* status = Response.MutableStatus();
    status->SetCode(error.GetCode());
    *status->MutableMessage() +=
        "replica id: " + std::to_string(replicaId) + " ";

    // A single sticky flag covers all replicas: once set, Done() reports the
    // accumulated error instead of comparing checksums.
    ErrorOnReplicaReading = true;
}

request->Record.SetStartIndex(Request.GetStartIndex());
request->Record.SetBlocksCount(Request.GetBlocksCount());
request->Record.Sglist = SgList;
request->Record.ShouldReportFailedRangesOnFailure = true;
void TMirrorCheckRangeActor::CalculateChecksums(
const TEvService::TEvReadBlocksResponse::TPtr& ev)
{
const ui32 replicaIdx = ev->Cookie - 1;
auto& replica =
(*Response.MutableMirrorChecksums()->MutableReplicas())[replicaIdx];
auto* replicaChecksums = replica.MutableData();

TBlockChecksum summaryChecksum;
for (const auto& buffer: ev->Get()->Record.GetBlocks().GetBuffers()) {
summaryChecksum.Extend(buffer.data(), buffer.size());
replicaChecksums->Add(
TBlockChecksum().Extend(buffer.data(), buffer.size()));
}

auto* headers = request->Record.MutableHeaders();
headers->SetReplicaCount(Request.headers().GetReplicaCount());
headers->SetClientId(clientId);
headers->SetIsBackgroundRequest(true);
ReplicasSummaryChecksums[replicaIdx] = summaryChecksum.GetValue();
}

NCloud::Send(ctx, Partition, std::move(request));
// Handles the read response of a single replica.
//
// A failed read is recorded through HandleReadBlocksResponseError; a successful
// one has its checksums computed by CalculateChecksums. Each response, failed
// or not, counts towards ResponseCount, and once ReplicasNumber responses have
// been counted the aggregated result is sent via Done().
void TMirrorCheckRangeActor::HandleReadBlocksResponse(
    const TEvService::TEvReadBlocksResponse::TPtr& ev,
    const TActorContext& ctx)
{
    const auto& record = ev->Get()->Record;

    if (!HasError(record)) {
        CalculateChecksums(ev);
    } else {
        HandleReadBlocksResponseError(ev, ctx, record.GetError());
    }

    // Neither branch above reads ResponseCount, so counting afterwards is
    // equivalent to counting on entry.
    if (++ResponseCount != ReplicasNumber) {
        return;
    }

    Done(ctx);
}

// Finalizes the check once every replica has answered and replies to the
// requester.
//
// Outcomes:
//  * a replica read failed      -> the status accumulated in Response is sent
//                                  as is;
//  * summary checksums differ   -> status becomes E_IO with the
//                                  EF_CHECKSUM_MISMATCH flag;
//  * all summary checksums match -> the per-block checksums of replica 0 are
//                                  moved into DiskChecksums.
void TMirrorCheckRangeActor::Done(const NActors::TActorContext& ctx)
{
    // No adjacent pair of differing values means all summaries are identical.
    const bool replicasAgree =
        std::ranges::adjacent_find(
            ReplicasSummaryChecksums,
            std::not_equal_to{}) == std::ranges::end(ReplicasSummaryChecksums);

    if (!ErrorOnReplicaReading) {
        if (replicasAgree) {
            auto* replicas =
                Response.MutableMirrorChecksums()->MutableReplicas();
            auto* firstReplicaData = (*replicas)[0].MutableData();

            // Detach replica 0's checksums into a temporary before touching
            // DiskChecksums.
            // NOTE(review): the temporary suggests that accessing
            // MutableDiskChecksums() discards MirrorChecksums (e.g. both live
            // in one oneof) — confirm against the proto definition.
            std::remove_pointer_t<decltype(firstReplicaData)> detached;
            detached.Swap(firstReplicaData);

            Response.MutableDiskChecksums()->MutableData()->Swap(&detached);
        } else {
            ui32 flags = 0;
            SetProtoFlag(flags, NProto::EF_CHECKSUM_MISMATCH);
            *Response.MutableStatus() =
                MakeError(E_IO, "Replicas checksum mismatch", flags);
        }
    }
    // Otherwise the error recorded while reading the replicas is reported.

    auto response = std::make_unique<TEvVolume::TEvCheckRangeResponse>();
    response->Record = std::move(Response);
    ReplyAndDie(ctx, std::move(response));
}

} // namespace
Expand All @@ -75,15 +188,6 @@ void TMirrorPartitionActor::HandleCheckRange(
const TEvVolume::TEvCheckRangeRequest::TPtr& ev,
const NActors::TActorContext& ctx)
{
auto fatalErr = MakeError(
E_NOT_IMPLEMENTED,
"Checkrange is not supported for mirror disks temporarily");
auto response =
std::make_unique<TEvVolume::TEvCheckRangeResponse>(std::move(fatalErr));
NCloud::Reply(ctx, *ev, std::move(response));
return;
/////

auto& record = ev->Get()->Record;

auto error = ValidateBlocksCount(
Expand All @@ -101,6 +205,7 @@ void TMirrorPartitionActor::HandleCheckRange(

NCloud::Register<TMirrorCheckRangeActor>(
ctx,
State.GetReplicaInfos().size(),
SelfId(),
std::move(record),
CreateRequestInfo(ev->Sender, ev->Cookie, ev->Get()->CallContext),
Expand Down
Loading
Loading