Skip to content

Commit 7ffa264

Browse files
committed
Fix data race on Checksumming field in TGroupSessions (#26096)
1 parent 42bebf4 commit 7ffa264

File tree

2 files changed

+20
-8
lines changed

2 files changed

+20
-8
lines changed

ydb/core/blobstorage/dsproxy/group_sessions.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ TGroupSessions::TGroupSessions(const TIntrusivePtr<TBlobStorageGroupInfo>& info,
8585
auto& q = stateVDisk.Queues.GetQueue(queueId);
8686
q.ActorId = queue;
8787
q.FlowRecord = std::move(flowRecord);
88-
q.ExtraBlockChecksSupport.reset();
88+
q.ExtraBlockChecksSupport.store(false);
8989
}
9090
}
9191
}
@@ -128,17 +128,17 @@ void TGroupSessions::QueueConnectUpdate(ui32 orderNumber, NKikimrBlobStorage::EV
128128

129129
if (connected) {
130130
ConnectedQueuesMask[orderNumber] |= 1 << queueId;
131-
q.ExtraBlockChecksSupport = extraGroupChecksSupport;
132-
q.Checksumming = checksumming;
131+
q.ExtraBlockChecksSupport.store(extraGroupChecksSupport);
132+
q.Checksumming.store(checksumming);
133133
Y_ABORT_UNLESS(costModel);
134134
if (!q.CostModel || *q.CostModel != *costModel) {
135135
updated = true;
136136
q.CostModel = costModel;
137137
}
138138
} else {
139139
ConnectedQueuesMask[orderNumber] &= ~(1 << queueId);
140-
q.ExtraBlockChecksSupport.reset();
141-
q.Checksumming.reset();
140+
q.ExtraBlockChecksSupport.store(false);
141+
q.Checksumming.store(false);
142142
if (q.CostModel) {
143143
updated = true;
144144
q.CostModel = nullptr;

ydb/core/blobstorage/dsproxy/group_sessions.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,20 @@ namespace NKikimr {
2121
struct TQueue {
2222
TActorId ActorId;
2323
TIntrusivePtr<NBackpressure::TFlowRecord> FlowRecord;
24-
std::optional<bool> ExtraBlockChecksSupport;
25-
std::optional<bool> Checksumming;
24+
struct AtomicParameter : public std::atomic<bool> {
25+
AtomicParameter& operator=(const AtomicParameter& other) {
26+
store(other.load());
27+
return *this;
28+
}
29+
30+
AtomicParameter(const AtomicParameter& other) {
31+
store(other.load());
32+
}
33+
34+
AtomicParameter() {
35+
store(false);
36+
}
37+
} ExtraBlockChecksSupport, Checksumming;
2638
std::shared_ptr<const TCostModel> CostModel = nullptr;
2739
volatile bool IsConnected = false;
2840
};
@@ -190,7 +202,7 @@ namespace NKikimr {
190202
.Queues
191203
.GetQueue(queueId)
192204
.Checksumming
193-
.value_or(false);
205+
.load();
194206
}
195207

196208
ui64 GetPredictedDelayNsByOrderNumber(ui32 orderNumber, NKikimrBlobStorage::EVDiskQueueId queueId) {

0 commit comments

Comments
 (0)