Skip to content

Commit a6599a2

Browse files
Calculate data integrity checksums on migration and resync (#4570)
#2988 For being able to use new policy introduced in #4571 in loadtests, enable the data integrity feature for migration and resync. But, even loadtests aside, it seems useful
1 parent d6c4f3d commit a6599a2

File tree

9 files changed

+288
-15
lines changed

9 files changed

+288
-15
lines changed

cloud/blockstore/libs/storage/disk_agent/actors/direct_copy_actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ void TDirectCopyActor::HandleReadBlocksResponse(
123123
auto writeRequest =
124124
std::make_unique<TEvDiskAgent::TEvWriteDeviceBlocksRequest>();
125125
PrepareRequest(Request, writeRequest.get());
126+
if (msg->Record.HasChecksum()) {
127+
*writeRequest->Record.MutableChecksum() =
128+
std::move(*msg->Record.MutableChecksum());
129+
}
126130
writeRequest->Record.MutableBlocks()->Swap(msg->Record.MutableBlocks());
127131
request = std::move(writeRequest);
128132
}

cloud/blockstore/libs/storage/partition_nonrepl/copy_range.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,11 @@ void TCopyRangeActor::ReadBlocks(const TActorContext& ctx)
107107
ReadStartTs = ctx.Now();
108108
}
109109

110-
void TCopyRangeActor::WriteBlocks(const TActorContext& ctx, NProto::TIOVector blocks)
110+
void TCopyRangeActor::WriteBlocks(
111+
const TActorContext& ctx,
112+
NProto::TReadBlocksResponse readResponse)
111113
{
114+
NProto::TIOVector blocks = std::move(*readResponse.MutableBlocks());
112115
// BlobStorage-based volumes returns empty blocks for zero-blocks.
113116
for (auto& block: *blocks.MutableBuffers()) {
114117
if (block.empty()) {
@@ -118,6 +121,10 @@ void TCopyRangeActor::WriteBlocks(const TActorContext& ctx, NProto::TIOVector bl
118121

119122
auto request = std::make_unique<TEvService::TEvWriteBlocksRequest>();
120123
request->Record.SetStartIndex(Range.Start);
124+
if (readResponse.HasChecksum()) {
125+
request->Record.MutableChecksums()->Add(
126+
std::move(*readResponse.MutableChecksum()));
127+
}
121128
request->Record.MutableBlocks()->Swap(&blocks);
122129
auto clientId =
123130
WriterClientId ? WriterClientId : TString(BackgroundOpsClientId);
@@ -289,7 +296,7 @@ void TCopyRangeActor::HandleReadResponse(
289296
AllZeroes = true;
290297
ZeroBlocks(ctx);
291298
} else {
292-
WriteBlocks(ctx, std::move(*msg->Record.MutableBlocks()));
299+
WriteBlocks(ctx, std::move(msg->Record));
293300
}
294301
}
295302

cloud/blockstore/libs/storage/partition_nonrepl/copy_range.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class TCopyRangeActor final
6262
void ReadBlocks(const NActors::TActorContext& ctx);
6363
void WriteBlocks(
6464
const NActors::TActorContext& ctx,
65-
NProto::TIOVector blocks);
65+
NProto::TReadBlocksResponse readResponse);
6666
void ZeroBlocks(const NActors::TActorContext& ctx);
6767
void Done(const NActors::TActorContext& ctx, NProto::TError error);
6868

cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_resync_ut.cpp

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "part_mirror.h"
2+
23
#include "part_mirror_actor.h"
34
#include "part_mirror_resync_actor.h"
45
#include "part_mirror_resync_util.h"
@@ -26,11 +27,14 @@
2627
#include <util/datetime/base.h>
2728
#include <util/string/builder.h>
2829

30+
#include <google/protobuf/util/message_differencer.h>
31+
2932
namespace NCloud::NBlockStore::NStorage {
3033

3134
using namespace NActors;
3235
using namespace NKikimr;
3336
using namespace std::chrono_literals;
37+
using MessageDifferencer = google::protobuf::util::MessageDifferencer;
3438

3539
namespace {
3640

@@ -256,6 +260,7 @@ struct TTestEnv
256260
}
257261
}
258262

263+
DiskAgentState->EnableDataIntegrityValidation = true;
259264
Runtime.AddLocalService(
260265
MakeDiskAgentServiceId(nodeId),
261266
TActorSetupCmd(
@@ -668,6 +673,119 @@ Y_UNIT_TEST_SUITE(TMirrorPartitionResyncTest)
668673
counters.RequestCounters.ReadBlocks.RequestBytes);
669674
}
670675

676+
void ShouldUseDataIntegrityChecksums(
677+
NProto::EResyncPolicy resyncPolicy,
678+
bool multiBlockCorruption)
679+
{
680+
constexpr ui32 BlockSize = 4_KB;
681+
TTestBasicRuntime runtime;
682+
TTestEnv env(runtime, BlockSize);
683+
684+
const auto range =
685+
TBlockRange64::WithLength(0, 5120 * 4_KB / BlockSize);
686+
687+
env.WriteMirror(range, 'A');
688+
env.WriteReplica(1, range, 'B');
689+
env.WriteReplica(2, range, 'B');
690+
691+
// Corrupt multiple blocks to engage blobk by block resyncing logic.
692+
if (multiBlockCorruption) {
693+
env.WriteReplica(0, TBlockRange64::WithLength(0, 1), '0');
694+
env.WriteReplica(1, TBlockRange64::WithLength(0, 1), '0');
695+
696+
env.WriteReplica(1, TBlockRange64::WithLength(1, 1), '1');
697+
env.WriteReplica(2, TBlockRange64::WithLength(1, 1), '1');
698+
699+
env.WriteReplica(0, TBlockRange64::WithLength(2, 1), '2');
700+
env.WriteReplica(2, TBlockRange64::WithLength(2, 1), '2');
701+
}
702+
703+
bool seenWrites = false;
704+
TVector<NProto::TChecksum> checksums;
705+
runtime.SetEventFilter(
706+
[&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) -> bool
707+
{
708+
switch (event->GetTypeRewrite()) {
709+
case TEvDiskAgent::EvReadDeviceBlocksResponse: {
710+
auto* msg = event->Get<
711+
TEvDiskAgent::TEvReadDeviceBlocksResponse>();
712+
UNIT_ASSERT(msg->Record.HasChecksum());
713+
checksums.push_back(msg->Record.GetChecksum());
714+
break;
715+
}
716+
case TEvDiskAgent::EvWriteDeviceBlocksRequest: {
717+
seenWrites = true;
718+
auto* msg = event->Get<
719+
TEvDiskAgent::TEvWriteDeviceBlocksRequest>();
720+
UNIT_ASSERT(msg->Record.HasChecksum());
721+
UNIT_ASSERT(!checksums.empty());
722+
const auto expectedChecksum =
723+
multiBlockCorruption ? CalculateChecksum(
724+
msg->Record.GetBlocks(),
725+
BlockSize)
726+
: checksums.back();
727+
728+
// The first checksum should be calculated after the
729+
// block by block resync.
730+
if (multiBlockCorruption && checksums.size() == 3) {
731+
UNIT_ASSERT(!MessageDifferencer::Equals(
732+
expectedChecksum,
733+
checksums.back()));
734+
}
735+
UNIT_ASSERT_C(
736+
MessageDifferencer::Equals(
737+
msg->Record.GetChecksum(),
738+
expectedChecksum),
739+
TStringBuilder()
740+
<< "Checksum mismatch: "
741+
<< expectedChecksum.ShortUtf8DebugString()
742+
.Quote()
743+
<< " != "
744+
<< msg->Record.GetChecksum()
745+
.ShortUtf8DebugString()
746+
.Quote());
747+
748+
break;
749+
}
750+
default:
751+
break;
752+
}
753+
return false;
754+
});
755+
756+
env.StartResync(0, resyncPolicy);
757+
env.ResyncController.WaitForResyncedRangeCount(5);
758+
UNIT_ASSERT(env.ResyncController.ResyncFinished);
759+
UNIT_ASSERT(seenWrites);
760+
const ui32 expectedChecksumsCount =
761+
resyncPolicy == NProto::RESYNC_POLICY_MINOR_AND_MAJOR_BLOCK_BY_BLOCK
762+
? 15
763+
: 5;
764+
UNIT_ASSERT_VALUES_EQUAL(expectedChecksumsCount, checksums.size());
765+
}
766+
767+
Y_UNIT_TEST(ShouldUseDataIntegrityChecksums_4MB)
768+
{
769+
ShouldUseDataIntegrityChecksums(
770+
NProto::RESYNC_POLICY_MINOR_AND_MAJOR_4MB,
771+
false);
772+
}
773+
774+
Y_UNIT_TEST(ShouldUseDataIntegrityChecksums_BlockByBlock)
775+
{
776+
ShouldUseDataIntegrityChecksums(
777+
NProto::RESYNC_POLICY_MINOR_AND_MAJOR_BLOCK_BY_BLOCK,
778+
false);
779+
}
780+
781+
Y_UNIT_TEST(
782+
ShouldUseDataIntegrityChecksums_BlockByBlock_MultiBlockCorruption)
783+
{
784+
ShouldUseDataIntegrityChecksums(
785+
NProto::RESYNC_POLICY_MINOR_AND_MAJOR_BLOCK_BY_BLOCK,
786+
true);
787+
}
788+
671789
void DoTestShouldResyncWholeDisk(
672790
ui32 blockSize,
673791
NProto::EResyncPolicy resyncPolicy)

cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_ut.cpp

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@
2323
#include <util/datetime/base.h>
2424
#include <util/generic/size_literals.h>
2525

26+
#include <google/protobuf/util/message_differencer.h>
27+
2628
namespace NCloud::NBlockStore::NStorage {
2729

2830
using namespace NActors;
2931
using namespace NKikimr;
3032
using namespace std::chrono_literals;
33+
using MessageDifferencer = google::protobuf::util::MessageDifferencer;
3134

3235
namespace {
3336

@@ -156,6 +159,7 @@ struct TTestEnv
156159
std::move(record),
157160
1000);
158161
};
162+
DiskAgentState->EnableDataIntegrityValidation = true;
159163

160164
NProto::TStorageServiceConfig storageConfig;
161165
storageConfig.SetMaxTimedOutDeviceStateDuration(20'000);
@@ -1290,6 +1294,119 @@ Y_UNIT_TEST_SUITE(TNonreplicatedPartitionMigrationTest)
12901294
counters.WriteBlocks.RequestBytes);
12911295
}
12921296

1297+
void ShouldUseDataIntegrityChecksums(bool useDirectCopy)
1298+
{
1299+
const size_t migratedRangeCount = 3;
1300+
1301+
TTestBasicRuntime runtime;
1302+
1303+
auto migrationState = std::make_shared<TMigrationState>();
1304+
migrationState->IsMigrationAllowed = false;
1305+
1306+
NProto::TStorageServiceConfig storageConfigPatch;
1307+
storageConfigPatch.SetUseDirectCopyRange(useDirectCopy);
1308+
storageConfigPatch.SetMaxMigrationIoDepth(1);
1309+
TTestEnv env(
1310+
runtime,
1311+
TTestEnv::DefaultDevices(runtime.GetNodeId(0)),
1312+
TTestEnv::DefaultMigrations(runtime.GetNodeId(0)),
1313+
NProto::VOLUME_IO_OK,
1314+
false,
1315+
migrationState,
1316+
storageConfigPatch);
1317+
TPartitionClient client(runtime, env.ActorId);
1318+
1319+
// Initialize the disk with some data to migrate via WriteBlocks instead
1320+
// of ZeroBlocks.
1321+
client.WriteBlocksLocal(
1322+
TBlockRange64::WithLength(
1323+
0,
1324+
migratedRangeCount * ProcessingBlockCount),
1325+
TString(DefaultBlockSize, 'A'));
1326+
runtime.AdvanceCurrentTime(UpdateCountersInterval);
1327+
runtime.DispatchEvents({}, TDuration::Seconds(1));
1328+
1329+
bool seenWrites = false;
1330+
TVector<NProto::TChecksum> checksums;
1331+
runtime.SetEventFilter(
1332+
[&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) -> bool
1333+
{
1334+
switch (event->GetTypeRewrite()) {
1335+
case TEvDiskAgent::EvReadDeviceBlocksResponse: {
1336+
auto* msg = event->Get<
1337+
TEvDiskAgent::TEvReadDeviceBlocksResponse>();
1338+
UNIT_ASSERT(msg->Record.HasChecksum());
1339+
checksums.push_back(msg->Record.GetChecksum());
1340+
break;
1341+
}
1342+
case TEvDiskAgent::EvWriteDeviceBlocksRequest: {
1343+
seenWrites = true;
1344+
auto* msg = event->Get<
1345+
TEvDiskAgent::TEvWriteDeviceBlocksRequest>();
1346+
UNIT_ASSERT(msg->Record.HasChecksum());
1347+
UNIT_ASSERT(!checksums.empty());
1348+
UNIT_ASSERT_C(
1349+
MessageDifferencer::Equals(
1350+
msg->Record.GetChecksum(),
1351+
checksums.back()),
1352+
TStringBuilder() << "Checksum mismatch: "
1353+
<< checksums.back()
1354+
.ShortUtf8DebugString()
1355+
.Quote()
1356+
<< " != "
1357+
<< msg->Record.GetChecksum()
1358+
.ShortUtf8DebugString()
1359+
.Quote());
1360+
break;
1361+
}
1362+
default:
1363+
break;
1364+
}
1365+
return false;
1366+
});
1367+
1368+
migrationState->IsMigrationAllowed = true;
1369+
WaitForMigrations(runtime, migratedRangeCount);
1370+
UNIT_ASSERT_VALUES_EQUAL(migratedRangeCount, checksums.size());
1371+
UNIT_ASSERT(seenWrites);
1372+
1373+
const size_t migrationCopyBlocks =
1374+
useDirectCopy ? migratedRangeCount : 0;
1375+
const size_t migrationWriteBlocks =
1376+
migratedRangeCount - migrationCopyBlocks;
1377+
1378+
runtime.AdvanceCurrentTime(UpdateCountersInterval);
1379+
runtime.DispatchEvents({}, TDuration::Seconds(1));
1380+
1381+
auto& counters = env.StorageStatsServiceState->Counters.RequestCounters;
1382+
UNIT_ASSERT_VALUES_EQUAL(
1383+
migrationCopyBlocks,
1384+
counters.CopyBlocks.Count);
1385+
UNIT_ASSERT_VALUES_EQUAL(
1386+
migrationCopyBlocks * MigrationRangeSize,
1387+
counters.CopyBlocks.RequestBytes);
1388+
1389+
runtime.AdvanceCurrentTime(UpdateCountersInterval);
1390+
runtime.DispatchEvents({}, TDuration::Seconds(1));
1391+
1392+
UNIT_ASSERT_VALUES_EQUAL(
1393+
migrationWriteBlocks,
1394+
counters.WriteBlocks.Count);
1395+
UNIT_ASSERT_VALUES_EQUAL(
1396+
migrationWriteBlocks * MigrationRangeSize,
1397+
counters.WriteBlocks.RequestBytes);
1398+
}
1399+
1400+
Y_UNIT_TEST(ShouldUseDataIntegrityChecksums_DirectCopy)
1401+
{
1402+
ShouldUseDataIntegrityChecksums(true);
1403+
}
1404+
1405+
Y_UNIT_TEST(ShouldUseDataIntegrityChecksums_NoDirectCopy)
1406+
{
1407+
ShouldUseDataIntegrityChecksums(false);
1408+
}
1409+
12931410
Y_UNIT_TEST(ShouldUseRecommendedBandwidth)
12941411
{
12951412
using TEvGetDeviceForRangeRequest =

cloud/blockstore/libs/storage/partition_nonrepl/resync_range.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,17 +207,22 @@ void TResyncRangeActor::ReadBlocks(const TActorContext& ctx)
207207
ReadStartTs = ctx.Now();
208208
}
209209

210-
void TResyncRangeActor::WriteBlocks(const TActorContext& ctx)
210+
void TResyncRangeActor::WriteBlocks(
211+
const TActorContext& ctx,
212+
const NProto::TReadBlocksResponse& readResponse)
211213
{
212214
for (int idx: ActorsToResync) {
213215
WriteRangeInfo.ReplicaChecksums.push_back(MakeChecksums(idx, SgList));
214-
WriteReplicaBlocks(ctx, idx);
216+
WriteReplicaBlocks(ctx, readResponse, idx);
215217
}
216218

217219
WriteStartTs = ctx.Now();
218220
}
219221

220-
void TResyncRangeActor::WriteReplicaBlocks(const TActorContext& ctx, int idx)
222+
void TResyncRangeActor::WriteReplicaBlocks(
223+
const TActorContext& ctx,
224+
const NProto::TReadBlocksResponse& readResponse,
225+
int idx)
221226
{
222227
auto request = std::make_unique<TEvService::TEvWriteBlocksLocalRequest>();
223228
request->Record.MutableHeaders()->SetVolumeRequestId(VolumeRequestId);
@@ -227,6 +232,9 @@ void TResyncRangeActor::WriteReplicaBlocks(const TActorContext& ctx, int idx)
227232
request->Record.BlocksCount = Range.Size();
228233
request->Record.BlockSize = BlockSize;
229234
request->Record.Sglist = SgList;
235+
if (readResponse.HasChecksum()) {
236+
*request->Record.MutableChecksums()->Add() = readResponse.GetChecksum();
237+
}
230238

231239
auto* headers = request->Record.MutableHeaders();
232240
headers->SetIsBackgroundRequest(true);
@@ -371,7 +379,7 @@ void TResyncRangeActor::HandleReadResponse(
371379
ReadRangeInfo.ReplicaChecksums.push_back(
372380
MakeChecksums(ReplicaIndexToReadFrom, SgList));
373381

374-
WriteBlocks(ctx);
382+
WriteBlocks(ctx, msg->Record);
375383
}
376384

377385
void TResyncRangeActor::HandleWriteUndelivery(

0 commit comments

Comments
 (0)