Skip to content

Commit 504ce4a

Browse files
serbel324ydbot
authored andcommitted
Add ExplicitMultiPut event to avoid data races in DSProxy UT (#25761)
1 parent b91c2e2 commit 504ce4a

File tree

6 files changed

+58
-25
lines changed

6 files changed

+58
-25
lines changed

ydb/core/base/blobstorage.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,8 @@ struct TEvBlobStorage {
496496
EvGetBlock,
497497
EvCheckIntegrity,
498498

499+
EvExplicitMultiPut, // for debugging purposes
500+
499501
//
500502
EvPutResult = EvPut + 512, /// 268 632 576
501503
EvGetResult,

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
195195
TMaybe<TGroupStat::EKind> LatencyQueueKind = {};
196196

197197
std::optional<ui32> ForceGroupGeneration; // work only with this specific group generation and nothing else
198+
bool DoSendDeathNote = true; // unschedules DSProxy timeout on termination, be careful with disabling
198199
};
199200

200201
struct TTypeSpecificParameters {
@@ -222,6 +223,7 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
222223
, RacingDomains(&Info->GetTopology())
223224
, ExecutionRelay(std::move(params.Common.ExecutionRelay))
224225
, ForceGroupGeneration(params.Common.ForceGroupGeneration)
226+
, DoSendDeathNote(params.Common.DoSendDeathNote)
225227
{
226228
if (ParentSpan) {
227229
const NWilson::TTraceId& parentTraceId = ParentSpan.GetTraceId();
@@ -327,6 +329,7 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
327329
bool FirstResponse = true;
328330
std::optional<ui32> ForceGroupGeneration;
329331
ui32 RacingGeneration = 0;
332+
bool DoSendDeathNote;
330333
};
331334

332335
void Encrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id,

ydb/core/blobstorage/dsproxy/dsproxy_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "defs.h"
44
#include "dsproxy.h"
5+
#include "dsproxy_test_helpers.h"
56

67
#include <ydb/core/blobstorage/base/utility.h>
78

@@ -294,6 +295,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
294295
void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
295296
void Handle(TEvDeathNote::TPtr ev);
296297
void Handle(TEvGetQueuesInfo::TPtr ev);
298+
void Handle(TEvExplicitMultiPut::TPtr ev);
297299

298300
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
299301
// Error state
@@ -388,6 +390,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
388390
fFunc(Ev5min, Handle5min);
389391
cFunc(EvCheckDeadlines, CheckDeadlines);
390392
hFunc(TEvGetQueuesInfo, Handle);
393+
hFunc(TEvExplicitMultiPut, Handle);
391394
)
392395

393396
#define HANDLE_EVENTS(HANDLER) \

ydb/core/blobstorage/dsproxy/dsproxy_request.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,9 @@ namespace NKikimr {
918918
// ensure that we are dying for the first time
919919
Y_ABORT_UNLESS(!std::exchange(Dead, true));
920920
GetActiveCounter()->Dec();
921-
SendToProxy(std::make_unique<TEvDeathNote>(Responsiveness));
921+
if (DoSendDeathNote) {
922+
SendToProxy(std::make_unique<TEvDeathNote>(Responsiveness));
923+
}
922924
TActor::PassAway();
923925
}
924926

@@ -1137,4 +1139,9 @@ namespace NKikimr {
11371139
TActivationContext::Send(ev->Sender, std::move(res));
11381140
}
11391141

1142+
void TBlobStorageGroupProxy::Handle(TEvExplicitMultiPut::TPtr ev) {
1143+
IActor *reqActor = CreateBlobStorageGroupPutRequest(std::move(ev->Get()->Parameters));
1144+
TActivationContext::Register(reqActor);
1145+
}
1146+
11401147
} // NKikimr
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
3+
#include "defs.h"
4+
#include "dsproxy.h"
5+
6+
namespace NKikimr {
7+
8+
struct TEvExplicitMultiPut : public TEventLocal<TEvExplicitMultiPut, TEvBlobStorage::EvExplicitMultiPut> {
9+
public:
10+
TEvExplicitMultiPut(TBlobStorageGroupMultiPutParameters params)
11+
: Parameters(params)
12+
{}
13+
14+
public:
15+
TBlobStorageGroupMultiPutParameters Parameters;
16+
};
17+
18+
} // namespace NKikimr

ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/blobstorage/base/blobstorage_events.h>
44
#include <ydb/core/blobstorage/dsproxy/dsproxy.h>
55
#include <ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h>
6+
#include <ydb/core/blobstorage/dsproxy/dsproxy_test_helpers.h>
67
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
78
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk.h>
89
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h>
@@ -3433,32 +3434,29 @@ class TTestBlobStorageProxyBatchedPutRequestDoesNotContainAHugeBlob : public TTe
34333434

34343435
switch (TestStep) {
34353436
case 0: {
3436-
TBatchedVec<TEvBlobStorage::TEvPut::TPtr> batched(2);
3437-
batched[0] = GetPut(blobIds[0], Data1);
3438-
batched[1] = GetPut(blobIds[1], Data2);
3437+
Batched[0] = GetPut(blobIds[0], Data1);
3438+
Batched[1] = GetPut(blobIds[1], Data2);
34393439

34403440
TMaybe<TGroupStat::EKind> kind = PutHandleClassToGroupStatKind(HandleClass);
3441-
IActor *reqActor = CreateBlobStorageGroupPutRequest(
3442-
TBlobStorageGroupMultiPutParameters{
3443-
.Common = {
3444-
.GroupInfo = BsInfo,
3445-
.GroupQueues = GroupQueues,
3446-
.Mon = Mon,
3447-
.Now = TMonotonic::Now(),
3448-
.StoragePoolCounters = StoragePoolCounters,
3449-
.RestartCounter = TBlobStorageGroupMultiPutParameters::CalculateRestartCounter(batched),
3450-
.LatencyQueueKind = kind,
3451-
},
3452-
.Events = batched,
3453-
.TimeStatsEnabled = false,
3454-
.Stats = PerDiskStatsPtr,
3455-
.HandleClass = HandleClass,
3456-
.Tactic = Tactic,
3457-
.EnableRequestMod3x3ForMinLatency = false,
3458-
.AccelerationParams = TAccelerationParams{},
3459-
});
3460-
3461-
ctx.Register(reqActor);
3441+
ctx.Send(Proxy, new TEvExplicitMultiPut(TBlobStorageGroupMultiPutParameters{
3442+
.Common = {
3443+
.GroupInfo = BsInfo,
3444+
.GroupQueues = GroupQueues,
3445+
.Mon = Mon,
3446+
.Now = TMonotonic::Now(),
3447+
.StoragePoolCounters = StoragePoolCounters,
3448+
.RestartCounter = TBlobStorageGroupMultiPutParameters::CalculateRestartCounter(Batched),
3449+
.LatencyQueueKind = kind,
3450+
.DoSendDeathNote = false,
3451+
},
3452+
.Events = Batched,
3453+
.TimeStatsEnabled = false,
3454+
.Stats = PerDiskStatsPtr,
3455+
.HandleClass = HandleClass,
3456+
.Tactic = Tactic,
3457+
.EnableRequestMod3x3ForMinLatency = false,
3458+
.AccelerationParams = TAccelerationParams{},
3459+
}));
34623460
break;
34633461
}
34643462
case 10:
@@ -3486,10 +3484,12 @@ class TTestBlobStorageProxyBatchedPutRequestDoesNotContainAHugeBlob : public TTe
34863484
NKikimrBlobStorage::EPutHandleClass HandleClass = NKikimrBlobStorage::TabletLog;
34873485
TString Data1;
34883486
TString Data2;
3487+
TBatchedVec<TEvBlobStorage::TEvPut::TPtr> Batched;
34893488
public:
34903489
TTestBlobStorageProxyBatchedPutRequestDoesNotContainAHugeBlob(const TActorId &proxy, const TIntrusivePtr<TBlobStorageGroupInfo> &bsInfo,
34913490
const TIntrusivePtr<TTestEnvironment> &env, const TIntrusivePtr<ITestParametrs> &parametrs)
34923491
: TTestBlobStorageProxyForRequest(proxy, bsInfo, env, parametrs)
3492+
, Batched(2)
34933493
{
34943494
Data1.resize(MaxBatchedPutSize - 1, 'a');
34953495
Data2.resize(1, 'a');

0 commit comments

Comments
 (0)