Skip to content

Commit 01ae8f7

Browse files
author
Matveev Sergei
committed
added thread safe class
1 parent 8e785a7 commit 01ae8f7

File tree

5 files changed

+159
-69
lines changed

5 files changed

+159
-69
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#include "thread_safe_optional.h"
2+
3+
namespace NKikimr::NOlap {
4+
5+
}
6+
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
#pragma once
2+
3+
#include <atomic>
4+
#include <optional>
5+
#include <utility>
6+
7+
namespace NKikimr::NOlap {
8+
9+
template <class T>
10+
class TThreadSafeOptional {
11+
private:
12+
alignas(T) unsigned char Storage[sizeof(T)];
13+
std::atomic<bool> Defined{ false };
14+
15+
T *Ptr() {
16+
return reinterpret_cast<T *>(&Storage[0]);
17+
}
18+
19+
const T *Ptr() const {
20+
return reinterpret_cast<const T *>(&Storage[0]);
21+
}
22+
23+
public:
24+
TThreadSafeOptional() = default;
25+
26+
~TThreadSafeOptional() {
27+
if (Has()) {
28+
Ptr()->~T();
29+
}
30+
}
31+
32+
TThreadSafeOptional(const TThreadSafeOptional& other) {
33+
const bool has = other.Defined.load(std::memory_order_acquire);
34+
if (has) {
35+
::new (Ptr()) T(*other.Ptr());
36+
Defined.store(true, std::memory_order_release);
37+
}
38+
}
39+
40+
TThreadSafeOptional& operator=(const TThreadSafeOptional& other) {
41+
if (this == &other) {
42+
return *this;
43+
}
44+
45+
const bool has = other.Defined.load(std::memory_order_acquire);
46+
if (Has()) {
47+
Ptr()->~T();
48+
Defined.store(false, std::memory_order_release);
49+
}
50+
51+
if (has) {
52+
::new (Ptr()) T(*other.Ptr());
53+
Defined.store(true, std::memory_order_release);
54+
}
55+
56+
return *this;
57+
}
58+
59+
TThreadSafeOptional(TThreadSafeOptional&& other) noexcept {
60+
const bool has = other.Defined.load(std::memory_order_acquire);
61+
if (has) {
62+
::new (Ptr()) T(std::move(*other.Ptr()));
63+
Defined.store(true, std::memory_order_release);
64+
}
65+
}
66+
67+
TThreadSafeOptional& operator=(TThreadSafeOptional&& other) noexcept {
68+
if (this == &other) {
69+
return *this;
70+
}
71+
72+
const bool has = other.Defined.load(std::memory_order_acquire);
73+
if (Has()) {
74+
Ptr()->~T();
75+
Defined.store(false, std::memory_order_release);
76+
}
77+
78+
if (has) {
79+
::new (Ptr()) T(std::move(*other.Ptr()));
80+
Defined.store(true, std::memory_order_release);
81+
}
82+
83+
return *this;
84+
}
85+
86+
void Reset() {
87+
if (Has()) {
88+
Ptr()->~T();
89+
Defined.store(false, std::memory_order_release);
90+
}
91+
}
92+
93+
void Set(const T& value) {
94+
if (Has()) {
95+
Ptr()->~T();
96+
Defined.store(false, std::memory_order_release);
97+
}
98+
99+
::new (Ptr()) T(value);
100+
Defined.store(true, std::memory_order_release);
101+
}
102+
103+
void Set(T&& value) {
104+
if (Has()) {
105+
Ptr()->~T();
106+
Defined.store(false, std::memory_order_release);
107+
}
108+
109+
::new (Ptr()) T(std::move(value));
110+
Defined.store(true, std::memory_order_release);
111+
}
112+
113+
bool Has() const {
114+
return Defined.load(std::memory_order_acquire);
115+
}
116+
117+
const T& Get() const {
118+
return *Ptr();
119+
}
120+
121+
std::optional<T> GetOptional() const {
122+
if (Has()) {
123+
return *Ptr();
124+
}
125+
126+
return {};
127+
}
128+
};
129+
130+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SRCS(
77
snapshot.cpp
88
portion.cpp
99
tablet_id.cpp
10+
thread_safe_optional.cpp
1011
blob.cpp
1112
volume.cpp
1213
path_id.cpp

ydb/core/tx/columnshard/engines/portions/portion_info.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ TString TPortionInfo::DebugString(const bool withDetails) const {
3939
<< "index_size:" << GetIndexBlobBytes() << ";"
4040
<< "meta:(" << Meta.DebugString() << ");";
4141
if (HasRemoveSnapshot()) {
42-
sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");";
42+
sb << "remove_snapshot:(" << RemoveSnapshot.Get().DebugString() << ");";
4343
}
4444
return sb << ")";
4545
}
@@ -57,7 +57,7 @@ void TPortionInfo::SerializeToProto(const std::vector<TUnifiedBlobId>& blobIds,
5757
proto.SetPortionId(PortionId);
5858
proto.SetSchemaVersion(GetSchemaVersionVerified());
5959
if (HasRemoveSnapshot()) {
60-
*proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto();
60+
*proto.MutableRemoveSnapshot() = RemoveSnapshot.Get().SerializeToProto();
6161
}
6262

6363
*proto.MutableMeta() = Meta.SerializeToProto(blobIds, GetProduced());
@@ -71,12 +71,15 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat
7171
return TConclusionStatus::Fail("portion's schema version cannot been equals to zero");
7272
}
7373
if (proto.HasRemoveSnapshot()) {
74-
auto parse = RemoveSnapshot.DeserializeFromProto(proto.GetRemoveSnapshot());
74+
TSnapshot tmp = TSnapshot::Zero();
75+
auto parse = tmp.DeserializeFromProto(proto.GetRemoveSnapshot());
7576
if (!parse) {
7677
return parse;
7778
}
78-
RemoveSnapshotDefined.store(true, std::memory_order_release);
79+
80+
RemoveSnapshot.Set(std::move(tmp));
7981
}
82+
8083
return TConclusionStatus::Success();
8184
}
8285

ydb/core/tx/columnshard/engines/portions/portion_info.h

Lines changed: 15 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
88
#include <ydb/core/tx/columnshard/common/blob.h>
99
#include <ydb/core/tx/columnshard/common/path_id.h>
10+
#include <ydb/core/tx/columnshard/common/thread_safe_optional.h>
1011
#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
1112

1213
#include <ydb/library/accessor/accessor.h>
1314
#include <ydb/library/formats/arrow/replace_key.h>
1415

1516
#include <util/generic/hash_set.h>
16-
#include <atomic>
17+
1718

1819
namespace NKikimrColumnShardDataSharingProto {
1920
class TPortionInfo;
@@ -85,37 +86,12 @@ class TPortionInfo {
8586
friend class TCompactedPortionInfo;
8687
friend class TWrittenPortionInfo;
8788

88-
TPortionInfo(const TPortionInfo& other)
89-
: PathId(other.PathId)
90-
, PortionId(other.PortionId)
91-
, RemoveSnapshot(other.RemoveSnapshot)
92-
, RemoveSnapshotDefined(other.RemoveSnapshotDefined.load(std::memory_order_acquire))
93-
, SchemaVersion(other.SchemaVersion)
94-
, ShardingVersion(other.ShardingVersion)
95-
, Meta(other.Meta)
96-
, RuntimeFeatures(other.RuntimeFeatures) {
97-
}
98-
99-
TPortionInfo& operator=(const TPortionInfo& other) {
100-
if (this == &other) {
101-
return *this;
102-
}
103-
104-
PathId = other.PathId;
105-
PortionId = other.PortionId;
106-
RemoveSnapshot = other.RemoveSnapshot;
107-
RemoveSnapshotDefined.store(other.RemoveSnapshotDefined.load(std::memory_order_acquire), std::memory_order_release);
108-
SchemaVersion = other.SchemaVersion;
109-
ShardingVersion = other.ShardingVersion;
110-
Meta = other.Meta;
111-
RuntimeFeatures = other.RuntimeFeatures;
112-
return *this;
113-
}
89+
TPortionInfo(const TPortionInfo&) = default;
90+
TPortionInfo& operator=(const TPortionInfo&) = default;
11491

11592
TInternalPathId PathId;
11693
ui64 PortionId = 0; // Id of independent (overlayed by PK) portion of data in pathId
117-
TSnapshot RemoveSnapshot = TSnapshot::Zero();
118-
std::atomic<bool> RemoveSnapshotDefined = false;
94+
TThreadSafeOptional<TSnapshot> RemoveSnapshot;
11995
ui64 SchemaVersion = 0;
12096
std::optional<ui64> ShardingVersion;
12197

@@ -193,33 +169,8 @@ class TPortionInfo {
193169
TPortionInfo(TPortionMeta&& meta)
194170
: Meta(std::move(meta)) {
195171
}
196-
197-
TPortionInfo(TPortionInfo&& other) noexcept
198-
: PathId(std::move(other.PathId))
199-
, PortionId(other.PortionId)
200-
, RemoveSnapshot(std::move(other.RemoveSnapshot))
201-
, RemoveSnapshotDefined(other.RemoveSnapshotDefined.load(std::memory_order_acquire))
202-
, SchemaVersion(other.SchemaVersion)
203-
, ShardingVersion(std::move(other.ShardingVersion))
204-
, Meta(std::move(other.Meta))
205-
, RuntimeFeatures(other.RuntimeFeatures) {
206-
}
207-
208-
TPortionInfo& operator=(TPortionInfo&& other) noexcept {
209-
if (this == &other) {
210-
return *this;
211-
}
212-
213-
PathId = std::move(other.PathId);
214-
PortionId = other.PortionId;
215-
RemoveSnapshot = std::move(other.RemoveSnapshot);
216-
RemoveSnapshotDefined.store(other.RemoveSnapshotDefined.load(std::memory_order_acquire), std::memory_order_release);
217-
SchemaVersion = other.SchemaVersion;
218-
ShardingVersion = std::move(other.ShardingVersion);
219-
Meta = std::move(other.Meta);
220-
RuntimeFeatures = other.RuntimeFeatures;
221-
return *this;
222-
}
172+
TPortionInfo(TPortionInfo&&) = default;
173+
TPortionInfo& operator=(TPortionInfo&&) = default;
223174

224175
virtual void FillDefaultColumn(NAssembling::TColumnAssemblingInfo& column, const std::optional<TSnapshot>& defaultSnapshot) const = 0;
225176

@@ -264,9 +215,8 @@ class TPortionInfo {
264215
}
265216

266217
void SetRemoveSnapshot(const TSnapshot& snap) {
267-
AFL_VERIFY(!RemoveSnapshotDefined.load());
268-
RemoveSnapshot = snap;
269-
RemoveSnapshotDefined.store(true, std::memory_order_release);
218+
AFL_VERIFY(!HasRemoveSnapshot());
219+
RemoveSnapshot.Set(snap);
270220
}
271221

272222
void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) {
@@ -391,7 +341,7 @@ class TPortionInfo {
391341
TString DebugString(const bool withDetails = false) const;
392342

393343
bool HasRemoveSnapshot() const {
394-
return RemoveSnapshotDefined.load(std::memory_order_acquire);
344+
return RemoveSnapshot.Has();
395345
}
396346

397347
bool IsRemovedFor(const TSnapshot& snapshot) const {
@@ -427,13 +377,13 @@ class TPortionInfo {
427377
}
428378

429379
const TSnapshot& GetRemoveSnapshotVerified() const {
430-
AFL_VERIFY(RemoveSnapshotDefined.load(std::memory_order_acquire));
431-
return RemoveSnapshot;
380+
AFL_VERIFY(HasRemoveSnapshot());
381+
return RemoveSnapshot.Get();
432382
}
433383

434384
std::optional<TSnapshot> GetRemoveSnapshotOptional() const {
435-
if (RemoveSnapshotDefined.load(std::memory_order_acquire)) {
436-
return RemoveSnapshot;
385+
if (HasRemoveSnapshot()) {
386+
return RemoveSnapshot.Get();
437387
} else {
438388
return {};
439389
}
@@ -445,7 +395,7 @@ class TPortionInfo {
445395
}
446396

447397
bool IsVisible(const TSnapshot& snapshot, const bool checkCommitSnapshot = true) const {
448-
const bool visible = (!RemoveSnapshotDefined.load(std::memory_order_acquire) || snapshot < GetRemoveSnapshotVerified()) && DoIsVisible(snapshot, checkCommitSnapshot);
398+
const bool visible = (!HasRemoveSnapshot() || snapshot < GetRemoveSnapshotVerified()) && DoIsVisible(snapshot, checkCommitSnapshot);
449399

450400
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)(
451401
"snapshot", snapshot.DebugString());

0 commit comments

Comments
 (0)