Skip to content

Commit a05120e

Browse files
xyliganSerejaMatveev Sergei
andauthored
remove race #26224 (#26562)
Co-authored-by: Matveev Sergei <[email protected]>
1 parent 68ade36 commit a05120e

File tree

6 files changed

+125
-14
lines changed

6 files changed

+125
-14
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#include "thread_safe_optional.h"
2+
3+
namespace NKikimr::NOlap {
4+
5+
}
6+
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
#pragma once
2+
3+
#include <atomic>
4+
#include <optional>
5+
#include <utility>
6+
7+
namespace NKikimr::NOlap {
8+
9+
template <class T>
10+
class TThreadSafeOptional {
11+
private:
12+
alignas(T) unsigned char Storage[sizeof(T)];
13+
std::atomic<bool> Defined{ false };
14+
15+
T *Ptr() {
16+
return reinterpret_cast<T *>(&Storage[0]);
17+
}
18+
19+
const T *Ptr() const {
20+
return reinterpret_cast<const T *>(&Storage[0]);
21+
}
22+
23+
public:
24+
TThreadSafeOptional() = default;
25+
26+
~TThreadSafeOptional() {
27+
if (Has()) {
28+
Ptr()->~T();
29+
}
30+
}
31+
32+
TThreadSafeOptional(const TThreadSafeOptional& other) {
33+
const bool has = other.Defined.load(std::memory_order_acquire);
34+
if (has) {
35+
::new (Ptr()) T(*other.Ptr());
36+
Defined.store(true, std::memory_order_release);
37+
}
38+
}
39+
40+
TThreadSafeOptional& operator=(const TThreadSafeOptional& other) = delete;
41+
42+
TThreadSafeOptional(TThreadSafeOptional&& other) noexcept {
43+
const bool has = other.Defined.load(std::memory_order_acquire);
44+
if (has) {
45+
::new (Ptr()) T(std::move(*other.Ptr()));
46+
Defined.store(true, std::memory_order_release);
47+
}
48+
}
49+
50+
TThreadSafeOptional& operator=(TThreadSafeOptional&& other) noexcept {
51+
if (this == &other) {
52+
return *this;
53+
}
54+
55+
AFL_VERIFY(!Has());
56+
const bool has = other.Defined.load(std::memory_order_acquire);
57+
if (has) {
58+
::new (Ptr()) T(std::move(*other.Ptr()));
59+
Defined.store(true, std::memory_order_release);
60+
}
61+
62+
return *this;
63+
}
64+
65+
void Reset() = delete;
66+
67+
void Set(const T& value) {
68+
AFL_VERIFY(!Has());
69+
::new (Ptr()) T(value);
70+
Defined.store(true, std::memory_order_release);
71+
}
72+
73+
void Set(T&& value) {
74+
AFL_VERIFY(!Has());
75+
::new (Ptr()) T(std::move(value));
76+
Defined.store(true, std::memory_order_release);
77+
}
78+
79+
bool Has() const {
80+
return Defined.load(std::memory_order_acquire);
81+
}
82+
83+
const T& Get() const {
84+
return *Ptr();
85+
}
86+
87+
std::optional<T> GetOptional() const {
88+
if (Has()) {
89+
return *Ptr();
90+
}
91+
92+
return {};
93+
}
94+
};
95+
96+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SRCS(
77
snapshot.cpp
88
portion.cpp
99
tablet_id.cpp
10+
thread_safe_optional.cpp
1011
blob.cpp
1112
volume.cpp
1213
path_id.cpp

ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@ std::shared_ptr<TPortionInfo> TPortionInfoConstructor::Build() {
2929

3030
if (RemoveSnapshot) {
3131
AFL_VERIFY(RemoveSnapshot->Valid());
32-
result->RemoveSnapshot = *RemoveSnapshot;
32+
result->SetRemoveSnapshot(*RemoveSnapshot);
3333
}
34+
3435
AFL_VERIFY(SchemaVersion && *SchemaVersion);
3536
result->SchemaVersion = *SchemaVersion;
3637
result->ShardingVersion = ShardingVersion;
3738
}
39+
3840
static TAtomicCounter countValues = 0;
3941
static TAtomicCounter sumValues = 0;
4042
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_size", result->GetMemorySize())("data_size", result->GetDataSize())(

ydb/core/tx/columnshard/engines/portions/portion_info.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ TString TPortionInfo::DebugString(const bool withDetails) const {
3838
sb << "column_size:" << GetColumnBlobBytes() << ";"
3939
<< "index_size:" << GetIndexBlobBytes() << ";"
4040
<< "meta:(" << Meta.DebugString() << ");";
41-
if (RemoveSnapshot.Valid()) {
42-
sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");";
41+
if (HasRemoveSnapshot()) {
42+
sb << "remove_snapshot:(" << RemoveSnapshot.Get().DebugString() << ");";
4343
}
4444
return sb << ")";
4545
}
@@ -56,8 +56,8 @@ void TPortionInfo::SerializeToProto(const std::vector<TUnifiedBlobId>& blobIds,
5656
PathId.ToProto(proto);
5757
proto.SetPortionId(PortionId);
5858
proto.SetSchemaVersion(GetSchemaVersionVerified());
59-
if (!RemoveSnapshot.IsZero()) {
60-
*proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto();
59+
if (HasRemoveSnapshot()) {
60+
*proto.MutableRemoveSnapshot() = RemoveSnapshot.Get().SerializeToProto();
6161
}
6262

6363
*proto.MutableMeta() = Meta.SerializeToProto(blobIds, GetProduced());
@@ -71,11 +71,15 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat
7171
return TConclusionStatus::Fail("portion's schema version cannot been equals to zero");
7272
}
7373
if (proto.HasRemoveSnapshot()) {
74-
auto parse = RemoveSnapshot.DeserializeFromProto(proto.GetRemoveSnapshot());
74+
TSnapshot tmp = TSnapshot::Zero();
75+
auto parse = tmp.DeserializeFromProto(proto.GetRemoveSnapshot());
7576
if (!parse) {
7677
return parse;
7778
}
79+
80+
RemoveSnapshot.Set(std::move(tmp));
7881
}
82+
7983
return TConclusionStatus::Success();
8084
}
8185

ydb/core/tx/columnshard/engines/portions/portion_info.h

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
88
#include <ydb/core/tx/columnshard/common/blob.h>
99
#include <ydb/core/tx/columnshard/common/path_id.h>
10+
#include <ydb/core/tx/columnshard/common/thread_safe_optional.h>
1011
#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
1112

1213
#include <ydb/library/accessor/accessor.h>
1314
#include <ydb/library/formats/arrow/replace_key.h>
1415

1516
#include <util/generic/hash_set.h>
1617

18+
1719
namespace NKikimrColumnShardDataSharingProto {
1820
class TPortionInfo;
1921
}
@@ -89,7 +91,7 @@ class TPortionInfo {
8991

9092
TInternalPathId PathId;
9193
ui64 PortionId = 0; // Id of independent (overlayed by PK) portion of data in pathId
92-
TSnapshot RemoveSnapshot = TSnapshot::Zero();
94+
TThreadSafeOptional<TSnapshot> RemoveSnapshot;
9395
ui64 SchemaVersion = 0;
9496
std::optional<ui64> ShardingVersion;
9597

@@ -213,8 +215,8 @@ class TPortionInfo {
213215
}
214216

215217
void SetRemoveSnapshot(const TSnapshot& snap) {
216-
AFL_VERIFY(!RemoveSnapshot.Valid());
217-
RemoveSnapshot = snap;
218+
AFL_VERIFY(!HasRemoveSnapshot());
219+
RemoveSnapshot.Set(snap);
218220
}
219221

220222
void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) {
@@ -339,7 +341,7 @@ class TPortionInfo {
339341
TString DebugString(const bool withDetails = false) const;
340342

341343
bool HasRemoveSnapshot() const {
342-
return RemoveSnapshot.Valid();
344+
return RemoveSnapshot.Has();
343345
}
344346

345347
bool IsRemovedFor(const TSnapshot& snapshot) const {
@@ -376,12 +378,12 @@ class TPortionInfo {
376378

377379
const TSnapshot& GetRemoveSnapshotVerified() const {
378380
AFL_VERIFY(HasRemoveSnapshot());
379-
return RemoveSnapshot;
381+
return RemoveSnapshot.Get();
380382
}
381383

382384
std::optional<TSnapshot> GetRemoveSnapshotOptional() const {
383-
if (RemoveSnapshot.Valid()) {
384-
return RemoveSnapshot;
385+
if (HasRemoveSnapshot()) {
386+
return RemoveSnapshot.Get();
385387
} else {
386388
return {};
387389
}
@@ -393,7 +395,7 @@ class TPortionInfo {
393395
}
394396

395397
bool IsVisible(const TSnapshot& snapshot, const bool checkCommitSnapshot = true) const {
396-
const bool visible = (!RemoveSnapshot.Valid() || snapshot < RemoveSnapshot) && DoIsVisible(snapshot, checkCommitSnapshot);
398+
const bool visible = (!HasRemoveSnapshot() || snapshot < GetRemoveSnapshotVerified()) && DoIsVisible(snapshot, checkCommitSnapshot);
397399

398400
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)(
399401
"snapshot", snapshot.DebugString());

0 commit comments

Comments
 (0)