Skip to content

Commit 8c820fd

Browse files
authored
Add delivery tracking to many of ut_vdisk helper actors (#26602)
1 parent c21f894 commit 8c820fd

File tree

2 files changed

+118
-51
lines changed

2 files changed

+118
-51
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,6 @@ ydb/core/blobstorage/ut_blobstorage BlobPatching.PatchBlock42
33
ydb/core/blobstorage/ut_blobstorage unittest.[*/*] chunk
44
ydb/core/blobstorage/ut_blobstorage/ut_huge HugeBlobOnlineSizeChange.Compaction
55
ydb/core/blobstorage/ut_blobstorage/ut_huge unittest.[*/*] chunk
6-
ydb/core/blobstorage/ut_vdisk TBsLocalRecovery.WriteRestartReadHugeDecreased
7-
ydb/core/blobstorage/ut_vdisk TBsVDiskGC.GCPutKeepBarrierSync
8-
ydb/core/blobstorage/ut_vdisk TBsVDiskManyPutGet.ManyPutRangeGetCompactionIndexOnly
9-
ydb/core/blobstorage/ut_vdisk unittest.[*/*] chunk
106
ydb/core/client/ut TObjectStorageListingTest.TestSkipShards
117
ydb/core/cms/ut_sentinel_unstable TSentinelUnstableTests.BSControllerCantChangeStatus
128
ydb/core/fq/libs/row_dispatcher/format_handler/ut TestFormatHandler.ManyRawClients

ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp

Lines changed: 118 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ class TRangeGet : public TActorBootstrapped<TRangeGet> {
113113
LOG_NOTICE(ctx, NActorsServices::TEST,
114114
" RANGE READ: from=%s to=%s\n", ReadFrom.ToString().data(), ReadTo.ToString().data());
115115

116+
SendRequest();
117+
}
118+
119+
void SendRequest() {
116120
auto req = TEvBlobStorage::TEvVGet::CreateRangeIndexQuery(VDiskInfo.VDiskID,
117121
TInstant::Max(),
118122
NKikimrBlobStorage::EGetHandleClass::FastRead,
@@ -122,7 +126,7 @@ class TRangeGet : public TActorBootstrapped<TRangeGet> {
122126
ReadTo,
123127
MaxResults);
124128

125-
ctx.Send(VDiskInfo.ActorID, req.release());
129+
Send(VDiskInfo.ActorID, req.release(), IEventHandle::FlagTrackDelivery);
126130
}
127131

128132
void CheckOrder(const NKikimrBlobStorage::TEvVGetResult &rec,
@@ -163,6 +167,7 @@ class TRangeGet : public TActorBootstrapped<TRangeGet> {
163167
STRICT_STFUNC(StateFunc,
164168
HFunc(TEvBlobStorage::TEvVGetResult, Handle);
165169
IgnoreFunc(TEvBlobStorage::TEvVWindowChange);
170+
cFunc(TEvents::TEvUndelivered::EventType, SendRequest);
166171
)
167172

168173
public:
@@ -581,14 +586,18 @@ class TManyGets : public TActorBootstrapped<TManyGets> {
581586
// get logo blob
582587
if (Step % 100 == 0)
583588
LOG_NOTICE(ctx, NActorsServices::TEST, "GET Step=%u", Step);
589+
SendRequest();
590+
}
591+
592+
void SendRequest() {
584593
TLogoBlobID logoBlobID(TabletId, Gen, Step, Channel, MsgData.size(), 0, 1);
585594
auto req = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(VDiskInfo.VDiskID,
586595
TInstant::Max(),
587596
NKikimrBlobStorage::EGetHandleClass::AsyncRead,
588597
TEvBlobStorage::TEvVGet::EFlags::None,
589598
{},
590599
{logoBlobID});
591-
ctx.Send(VDiskInfo.ActorID, req.release());
600+
Send(VDiskInfo.ActorID, req.release(), IEventHandle::FlagTrackDelivery);
592601
}
593602

594603
void Check(const TActorContext &ctx, const NKikimrBlobStorage::TEvVGetResult &rec, const TEvBlobStorage::TEvVGetResult& ev) {
@@ -626,6 +635,7 @@ class TManyGets : public TActorBootstrapped<TManyGets> {
626635
STRICT_STFUNC(StateReadFunc,
627636
HFunc(TEvBlobStorage::TEvVGetResult, Handle);
628637
IgnoreFunc(TEvBlobStorage::TEvVWindowChange);
638+
cFunc(TEvents::TEvUndelivered::EventType, SendRequest);
629639
)
630640

631641
public:
@@ -816,12 +826,9 @@ class TPutGC : public TActorBootstrapped<TPutGC> {
816826

817827
friend class TActorBootstrapped<TPutGC>;
818828

819-
void Bootstrap(const TActorContext &ctx) {
829+
void Bootstrap(const TActorContext&) {
820830
TThis::Become(&TThis::StateFunc);
821-
ctx.Send(VDiskInfo.ActorID,
822-
new TEvBlobStorage::TEvVCollectGarbage(TabletID, RecGen, RecGenCounter, Channel, Collect, CollectGen,
823-
CollectStep, false, Keep.Get(), DoNotKeep.Get(), VDiskInfo.VDiskID,
824-
TInstant::Max()));
831+
SendRequest();
825832
}
826833

827834
void Handle(TEvBlobStorage::TEvVCollectGarbageResult::TPtr &ev, const TActorContext &ctx) {
@@ -831,9 +838,18 @@ class TPutGC : public TActorBootstrapped<TPutGC> {
831838
TThis::Die(ctx);
832839
}
833840

841+
void SendRequest() {
842+
Send(VDiskInfo.ActorID,
843+
new TEvBlobStorage::TEvVCollectGarbage(TabletID, RecGen, RecGenCounter, Channel, Collect, CollectGen,
844+
CollectStep, false, Keep.Get(), DoNotKeep.Get(), VDiskInfo.VDiskID,
845+
TInstant::Max()),
846+
IEventHandle::FlagTrackDelivery);
847+
}
848+
834849
STRICT_STFUNC(StateFunc,
835850
HFunc(TEvBlobStorage::TEvVCollectGarbageResult, Handle);
836851
IgnoreFunc(TEvBlobStorage::TEvVWindowChange);
852+
cFunc(TEvents::TEvUndelivered::EventType, SendRequest);
837853
)
838854

839855
public:
@@ -872,23 +888,30 @@ class TWaitForCompactionOneDisk : public TActorBootstrapped<TWaitForCompactionOn
872888

873889
friend class TActorBootstrapped<TWaitForCompactionOneDisk>;
874890

875-
void Bootstrap(const TActorContext &ctx) {
891+
void Bootstrap(const TActorContext&) {
876892
Become(&TThis::StateSchedule);
877-
ctx.Send(VDiskInfo.ActorID,
878-
new TEvBlobStorage::TEvVCompact(VDiskInfo.VDiskID, NKikimrBlobStorage::TEvVCompact::ASYNC));
893+
SendVCompact();
879894
}
880895

881-
void Handle(TEvBlobStorage::TEvVCompactResult::TPtr &ev, const TActorContext &ctx) {
896+
void Handle(TEvBlobStorage::TEvVCompactResult::TPtr &ev) {
882897
if (ev->Get()->Record.GetStatus() == NKikimrProto::OK) {
883898
Become(&TThis::StateWait);
884-
ctx.Send(VDiskInfo.ActorID, new TEvBlobStorage::TEvVStatus(VDiskInfo.VDiskID));
899+
SendVStatus();
885900
} else {
886-
ctx.Send(VDiskInfo.ActorID,
887-
new TEvBlobStorage::TEvVCompact(VDiskInfo.VDiskID, NKikimrBlobStorage::TEvVCompact::ASYNC));
901+
SendVCompact();
888902
}
889903
}
890904

891-
void Handle(TEvBlobStorage::TEvVStatusResult::TPtr &ev, const TActorContext &ctx) {
905+
void SendVCompact() {
906+
Send(VDiskInfo.ActorID, new TEvBlobStorage::TEvVCompact(VDiskInfo.VDiskID,
907+
NKikimrBlobStorage::TEvVCompact::ASYNC), IEventHandle::FlagTrackDelivery);
908+
}
909+
910+
void SendVStatus() {
911+
Send(VDiskInfo.ActorID, new TEvBlobStorage::TEvVStatus(VDiskInfo.VDiskID), IEventHandle::FlagTrackDelivery);
912+
}
913+
914+
void Handle(TEvBlobStorage::TEvVStatusResult::TPtr &ev) {
892915
Y_ABORT_UNLESS(ev->Get()->Record.GetStatus() == NKikimrProto::OK);
893916
const NKikimrBlobStorage::TEvVStatusResult &record = ev->Get()->Record;
894917

@@ -898,21 +921,22 @@ class TWaitForCompactionOneDisk : public TActorBootstrapped<TWaitForCompactionOn
898921

899922
if (logoBlobsCompacted && blocksCompacted && barriersCompacted) {
900923
// Finished
901-
ctx.Send(NotifyID, new TEvents::TEvCompleted());
902-
Die(ctx);
924+
Send(NotifyID, new TEvents::TEvCompleted());
925+
PassAway();
903926
} else {
904927
Become(&TThis::StateSchedule);
905-
ctx.Send(VDiskInfo.ActorID,
906-
new TEvBlobStorage::TEvVCompact(VDiskInfo.VDiskID, NKikimrBlobStorage::TEvVCompact::ASYNC));
928+
SendVCompact();
907929
}
908930
}
909931

910932
STRICT_STFUNC(StateSchedule,
911-
HFunc(TEvBlobStorage::TEvVCompactResult, Handle);
933+
hFunc(TEvBlobStorage::TEvVCompactResult, Handle);
934+
cFunc(TEvents::TEvUndelivered::EventType, SendVCompact)
912935
)
913936

914937
STRICT_STFUNC(StateWait,
915-
HFunc(TEvBlobStorage::TEvVStatusResult, Handle);
938+
hFunc(TEvBlobStorage::TEvVStatusResult, Handle);
939+
cFunc(TEvents::TEvUndelivered::EventType, SendVStatus);
916940
)
917941

918942
public:
@@ -981,9 +1005,14 @@ class TWaitForDefragOneDisk : public TActorBootstrapped<TWaitForDefragOneDisk> {
9811005

9821006
friend class TActorBootstrapped<TWaitForDefragOneDisk>;
9831007

984-
void Bootstrap(const TActorContext &ctx) {
1008+
void Bootstrap(const TActorContext&) {
9851009
Become(&TThis::StateFunc);
986-
ctx.Send(VDiskInfo.ActorID, new TEvBlobStorage::TEvVDefrag(VDiskInfo.VDiskID, Full));
1010+
SendRequest();
1011+
}
1012+
1013+
void SendRequest() {
1014+
Send(VDiskInfo.ActorID, new TEvBlobStorage::TEvVDefrag(VDiskInfo.VDiskID, Full),
1015+
IEventHandle::FlagTrackDelivery);
9871016
}
9881017

9891018
void Handle(TEvBlobStorage::TEvVDefragResult::TPtr &ev, const TActorContext &ctx) {
@@ -993,12 +1022,13 @@ class TWaitForDefragOneDisk : public TActorBootstrapped<TWaitForDefragOneDisk> {
9931022
ctx.Send(NotifyID, new TEvents::TEvCompleted());
9941023
Die(ctx);
9951024
} else {
996-
ctx.Send(VDiskInfo.ActorID, new TEvBlobStorage::TEvVDefrag(VDiskInfo.VDiskID, Full));
1025+
SendRequest();
9971026
}
9981027
}
9991028

10001029
STRICT_STFUNC(StateFunc,
10011030
HFunc(TEvBlobStorage::TEvVDefragResult, Handle);
1031+
cFunc(TEvents::TEvUndelivered::EventType, SendRequest);
10021032
)
10031033

10041034
public:
@@ -1198,23 +1228,32 @@ class TWaitForSync : public TActorBootstrapped<TWaitForSync> {
11981228
TActorId NotifyID;
11991229
ui32 Counter = 0;
12001230
TMutualSyncState MutualSyncState;
1231+
std::vector<std::pair<TActorId, TVDiskID>> VStatusSent;
12011232

12021233
friend class TActorBootstrapped<TWaitForSync>;
12031234

1204-
void Bootstrap(const TActorContext &ctx) {
1235+
void Bootstrap(const TActorContext&) {
12051236
Become(&TThis::StateFunc);
1206-
GetStatuses(ctx);
1237+
GetStatuses();
12071238
}
12081239

1209-
void GetStatuses(const TActorContext &ctx) {
1240+
void GetStatuses() {
12101241
// send TEvVStatus to all working VDisks
1211-
auto sendFunc = [&ctx] (const TInfo &info) {
1212-
ctx.Send(info.ActorId, new TEvBlobStorage::TEvVStatus(info.VDiskId));
1242+
VStatusSent.clear();
1243+
auto sendFunc = [this] (const TInfo &info) {
1244+
ui32 idx = VStatusSent.size();
1245+
VStatusSent.emplace_back(info.ActorId, info.VDiskId);
1246+
SendVStatus(idx);
12131247
};
12141248
MutualSyncState.ForEach(sendFunc);
12151249
Counter = MutualSyncState.InfoVecSize();
12161250
}
12171251

1252+
void SendVStatus(ui32 idx) {
1253+
const auto& [actorId, vdiskId] = VStatusSent[idx];
1254+
Send(actorId, new TEvBlobStorage::TEvVStatus(vdiskId), IEventHandle::FlagTrackDelivery, idx);
1255+
}
1256+
12181257
void Handle(TEvBlobStorage::TEvVStatusResult::TPtr &ev, const TActorContext &ctx) {
12191258
Y_ABORT_UNLESS(ev->Get()->Record.GetStatus() == NKikimrProto::OK);
12201259
const NKikimrBlobStorage::TEvVStatusResult &record = ev->Get()->Record;
@@ -1230,13 +1269,18 @@ class TWaitForSync : public TActorBootstrapped<TWaitForSync> {
12301269
} else {
12311270
// Retry
12321271
MutualSyncState.Clear();
1233-
GetStatuses(ctx);
1272+
GetStatuses();
12341273
}
12351274
}
12361275
}
12371276

1277+
void Handle(const TEvents::TEvUndelivered::TPtr& ev) {
1278+
SendVStatus(ev->Cookie);
1279+
}
1280+
12381281
STRICT_STFUNC(StateFunc,
12391282
HFunc(TEvBlobStorage::TEvVStatusResult, Handle);
1283+
hFunc(TEvents::TEvUndelivered, Handle);
12401284
)
12411285

12421286
public:
@@ -1262,10 +1306,9 @@ class TCheckDbEmptynessActor : public TActorBootstrapped<TCheckDbEmptynessActor>
12621306

12631307
friend class TActorBootstrapped<TCheckDbEmptynessActor>;
12641308

1265-
void Bootstrap(const TActorContext &ctx) {
1309+
void Bootstrap(const TActorContext&) {
12661310
Become(&TThis::StateFunc);
1267-
ctx.Send(VDiskInfo.ActorID,
1268-
new TEvBlobStorage::TEvVStatus(VDiskInfo.VDiskID));
1311+
SendVStatus();
12691312
}
12701313

12711314
void Handle(TEvBlobStorage::TEvVStatusResult::TPtr &ev, const TActorContext &ctx) {
@@ -1280,8 +1323,14 @@ class TCheckDbEmptynessActor : public TActorBootstrapped<TCheckDbEmptynessActor>
12801323
TThis::Die(ctx);
12811324
}
12821325

1326+
void SendVStatus() {
1327+
Send(VDiskInfo.ActorID, new TEvBlobStorage::TEvVStatus(VDiskInfo.VDiskID),
1328+
IEventHandle::FlagTrackDelivery);
1329+
}
1330+
12831331
STRICT_STFUNC(StateFunc,
12841332
HFunc(TEvBlobStorage::TEvVStatusResult, Handle);
1333+
cFunc(TEvents::TEvUndelivered::EventType, SendVStatus);
12851334
)
12861335

12871336
public:
@@ -1318,22 +1367,29 @@ class TPutGCToCorrespondingVDisksActor : public TActorBootstrapped<TPutGCToCorre
13181367

13191368
friend class TActorBootstrapped<TPutGCToCorrespondingVDisksActor>;
13201369

1321-
void Bootstrap(const TActorContext &ctx) {
1370+
void Bootstrap(const TActorContext&) {
13221371
TThis::Become(&TThis::StateFunc);
13231372

13241373
ui32 total = Conf->GroupInfo->GetTotalVDisksNum();
13251374
for (ui32 i = 0; i < total; i++) {
13261375
TAllVDisks::TVDiskInstance &instance = Conf->VDisks->Get(i);
13271376
if (instance.Initialized) {
1328-
ctx.Send(instance.ActorID,
1329-
new TEvBlobStorage::TEvVCollectGarbage(TabletID, RecGen, RecGenCounter, Channel, Collect,
1330-
CollectGen, CollectStep, false, Keep.Get(),
1331-
DoNotKeep.Get(), instance.VDiskID, TInstant::Max()));
1377+
SendRequest(i);
13321378
Counter++;
13331379
}
13341380
}
13351381
}
13361382

1383+
void SendRequest(ui32 idx) {
1384+
TAllVDisks::TVDiskInstance &instance = Conf->VDisks->Get(idx);
1385+
Send(instance.ActorID,
1386+
new TEvBlobStorage::TEvVCollectGarbage(TabletID, RecGen, RecGenCounter, Channel, Collect,
1387+
CollectGen, CollectStep, false, Keep.Get(),
1388+
DoNotKeep.Get(), instance.VDiskID, TInstant::Max()),
1389+
IEventHandle::FlagTrackDelivery,
1390+
/*cookie=*/idx);
1391+
}
1392+
13371393
void Handle(TEvBlobStorage::TEvVCollectGarbageResult::TPtr &ev, const TActorContext &ctx) {
13381394
Y_ABORT_UNLESS(ev->Get()->Record.GetStatus() == NKikimrProto::OK, "Status=%d", ev->Get()->Record.GetStatus());
13391395
LOG_NOTICE(ctx, NActorsServices::TEST, " TEvVCollectGarbageResult succeded");
@@ -1346,9 +1402,14 @@ class TPutGCToCorrespondingVDisksActor : public TActorBootstrapped<TPutGCToCorre
13461402
}
13471403
}
13481404

1405+
void Handle(const TEvents::TEvUndelivered::TPtr& ev) {
1406+
SendRequest(ev->Cookie);
1407+
}
1408+
13491409
STRICT_STFUNC(StateFunc,
13501410
HFunc(TEvBlobStorage::TEvVCollectGarbageResult, Handle);
13511411
IgnoreFunc(TEvBlobStorage::TEvVWindowChange);
1412+
hFunc(TEvents::TEvUndelivered, Handle);
13521413
)
13531414

13541415
public:
@@ -1605,16 +1666,20 @@ class TCheckDataSnapshotActor : public TActorBootstrapped<TCheckDataSnapshotActo
16051666
TThis::Die(ctx);
16061667
} else {
16071668
// send read
1608-
auto req = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(Cur->VDiskID,
1609-
TInstant::Max(),
1610-
NKikimrBlobStorage::EGetHandleClass::AsyncRead,
1611-
TEvBlobStorage::TEvVGet::EFlags::ShowInternals,
1612-
{},
1613-
{TLogoBlobID(Cur->Id, 0)});
1614-
ctx.Send(Cur->ServiceID, req.release());
1669+
SendRequest();
16151670
}
16161671
}
16171672

1673+
void SendRequest() {
1674+
auto req = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(Cur->VDiskID,
1675+
TInstant::Max(),
1676+
NKikimrBlobStorage::EGetHandleClass::AsyncRead,
1677+
TEvBlobStorage::TEvVGet::EFlags::ShowInternals,
1678+
{},
1679+
{TLogoBlobID(Cur->Id, 0)});
1680+
Send(Cur->ServiceID, req.release(), IEventHandle::FlagTrackDelivery);
1681+
}
1682+
16181683
void Handle(TEvBlobStorage::TEvVGetResult::TPtr &ev, const TActorContext &ctx) {
16191684
if ((*PDiskGetStatusHandler)(ctx, NotifyID, ev)) {
16201685
TThis::Die(ctx);
@@ -1665,6 +1730,7 @@ class TCheckDataSnapshotActor : public TActorBootstrapped<TCheckDataSnapshotActo
16651730
STRICT_STFUNC(StateFunc,
16661731
HFunc(TEvBlobStorage::TEvVGetResult, Handle);
16671732
IgnoreFunc(TEvBlobStorage::TEvVWindowChange);
1733+
cFunc(TEvents::TEvUndelivered::EventType, SendRequest);
16681734
)
16691735

16701736
public:
@@ -1925,12 +1991,16 @@ class TManyPutsToOneVDiskActor : public TActorBootstrapped<TManyPutsToOneVDiskAc
19251991

19261992
void SendRequest(const TActorContext &ctx) {
19271993
Y_ASSERT(Counter == 0);
1928-
PutLogoBlobToVDisk(ctx, VDiskInfo.ActorID, VDiskInfo.VDiskID, TLogoBlobID(Cur->Get()->Id, PartId),
1929-
Cur->Get()->Data, HandleClass);
1994+
SendPut(ctx);
19301995
Counter = 1;
19311996
Cur->Next();
19321997
}
19331998

1999+
void SendPut(const TActorContext& ctx) {
2000+
PutLogoBlobToVDisk(ctx, VDiskInfo.ActorID, VDiskInfo.VDiskID, TLogoBlobID(Cur->Get()->Id, PartId),
2001+
Cur->Get()->Data, HandleClass);
2002+
}
2003+
19342004
void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev, const TActorContext &ctx) {
19352005
Y_ABORT_UNLESS(ev->Get()->Record.GetStatus() == NKikimrProto::OK, "Status=%d", ev->Get()->Record.GetStatus());
19362006
LOG_DEBUG(ctx, NActorsServices::TEST, " TEvVPutResult succeded");
@@ -1950,6 +2020,7 @@ class TManyPutsToOneVDiskActor : public TActorBootstrapped<TManyPutsToOneVDiskAc
19502020
STRICT_STFUNC(StateFunc,
19512021
HFunc(TEvBlobStorage::TEvVPutResult, Handle);
19522022
IgnoreFunc(TEvBlobStorage::TEvVWindowChange);
2023+
CFunc(TEvents::TEvUndelivered::EventType, SendPut);
19532024
)
19542025

19552026
public:

0 commit comments

Comments
 (0)