diff --git a/ydb/core/tx/columnshard/common/thread_safe_optional.cpp b/ydb/core/tx/columnshard/common/thread_safe_optional.cpp new file mode 100644 index 000000000000..15e225e72083 --- /dev/null +++ b/ydb/core/tx/columnshard/common/thread_safe_optional.cpp @@ -0,0 +1,6 @@ +#include "thread_safe_optional.h" + +namespace NKikimr::NOlap { + +} + diff --git a/ydb/core/tx/columnshard/common/thread_safe_optional.h b/ydb/core/tx/columnshard/common/thread_safe_optional.h new file mode 100644 index 000000000000..8625132fa127 --- /dev/null +++ b/ydb/core/tx/columnshard/common/thread_safe_optional.h @@ -0,0 +1,96 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr::NOlap { + +template +class TThreadSafeOptional { +private: + alignas(T) unsigned char Storage[sizeof(T)]; + std::atomic Defined{ false }; + + T *Ptr() { + return reinterpret_cast(&Storage[0]); + } + + const T *Ptr() const { + return reinterpret_cast(&Storage[0]); + } + +public: + TThreadSafeOptional() = default; + + ~TThreadSafeOptional() { + if (Has()) { + Ptr()->~T(); + } + } + + TThreadSafeOptional(const TThreadSafeOptional& other) { + const bool has = other.Defined.load(std::memory_order_acquire); + if (has) { + ::new (Ptr()) T(*other.Ptr()); + Defined.store(true, std::memory_order_release); + } + } + + TThreadSafeOptional& operator=(const TThreadSafeOptional& other) = delete; + + TThreadSafeOptional(TThreadSafeOptional&& other) noexcept { + const bool has = other.Defined.load(std::memory_order_acquire); + if (has) { + ::new (Ptr()) T(std::move(*other.Ptr())); + Defined.store(true, std::memory_order_release); + } + } + + TThreadSafeOptional& operator=(TThreadSafeOptional&& other) noexcept { + if (this == &other) { + return *this; + } + + AFL_VERIFY(!Has()); + const bool has = other.Defined.load(std::memory_order_acquire); + if (has) { + ::new (Ptr()) T(std::move(*other.Ptr())); + Defined.store(true, std::memory_order_release); + } + + return *this; + } + + void Reset() = delete; + + void Set(const T& value) { + AFL_VERIFY(!Has()); + ::new (Ptr()) T(value); + Defined.store(true, std::memory_order_release); + } + + void Set(T&& value) { + AFL_VERIFY(!Has()); + ::new (Ptr()) T(std::move(value)); + Defined.store(true, std::memory_order_release); + } + + bool Has() const { + return Defined.load(std::memory_order_acquire); + } + + const T& Get() const { + return *Ptr(); + } + + std::optional GetOptional() const { + if (Has()) { + return *Ptr(); + } + + return {}; + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make index 86c71afb41ba..8b3f40838e3b 100644 --- a/ydb/core/tx/columnshard/common/ya.make +++ b/ydb/core/tx/columnshard/common/ya.make @@ -7,6 +7,7 @@ SRCS( snapshot.cpp portion.cpp tablet_id.cpp + thread_safe_optional.cpp blob.cpp volume.cpp path_id.cpp diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp index 38c4c1372939..cb9e61cd15fb 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp +++ b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp @@ -29,12 +29,14 @@ std::shared_ptr TPortionInfoConstructor::Build() { if (RemoveSnapshot) { AFL_VERIFY(RemoveSnapshot->Valid()); - result->RemoveSnapshot = *RemoveSnapshot; + result->SetRemoveSnapshot(*RemoveSnapshot); } + AFL_VERIFY(SchemaVersion && *SchemaVersion); result->SchemaVersion = *SchemaVersion; result->ShardingVersion = ShardingVersion; } + static TAtomicCounter countValues = 0; static TAtomicCounter sumValues = 0; AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_size", result->GetMemorySize())("data_size", result->GetDataSize())( diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index b827d6e19472..14f91a801893 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -38,8 +38,8 @@ TString TPortionInfo::DebugString(const bool withDetails) const { sb << "column_size:" << GetColumnBlobBytes() << ";" << "index_size:" << GetIndexBlobBytes() << ";" << "meta:(" << Meta.DebugString() << ");"; - if (RemoveSnapshot.Valid()) { - sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");"; + if (HasRemoveSnapshot()) { + sb << "remove_snapshot:(" << RemoveSnapshot.Get().DebugString() << ");"; } return sb << ")"; } @@ -56,8 +56,8 @@ void TPortionInfo::SerializeToProto(const std::vector& blobIds, PathId.ToProto(proto); proto.SetPortionId(PortionId); proto.SetSchemaVersion(GetSchemaVersionVerified()); - if (!RemoveSnapshot.IsZero()) { - *proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto(); + if (HasRemoveSnapshot()) { + *proto.MutableRemoveSnapshot() = RemoveSnapshot.Get().SerializeToProto(); } *proto.MutableMeta() = Meta.SerializeToProto(blobIds, GetProduced()); @@ -71,11 +71,15 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat return TConclusionStatus::Fail("portion's schema version cannot been equals to zero"); } if (proto.HasRemoveSnapshot()) { - auto parse = RemoveSnapshot.DeserializeFromProto(proto.GetRemoveSnapshot()); + TSnapshot tmp = TSnapshot::Zero(); + auto parse = tmp.DeserializeFromProto(proto.GetRemoveSnapshot()); if (!parse) { return parse; } + + RemoveSnapshot.Set(std::move(tmp)); } + return TConclusionStatus::Success(); } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index c3482196042d..b4ce802d4d08 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -14,6 +15,7 @@ #include + namespace NKikimrColumnShardDataSharingProto { class TPortionInfo; } @@ -89,7 +91,7 @@ class TPortionInfo { TInternalPathId PathId; ui64 PortionId = 0; // Id of independent (overlayed by PK) portion of data in pathId - TSnapshot RemoveSnapshot = TSnapshot::Zero(); + TThreadSafeOptional RemoveSnapshot; ui64 SchemaVersion = 0; std::optional ShardingVersion; @@ -213,8 +215,8 @@ class TPortionInfo { } void SetRemoveSnapshot(const TSnapshot& snap) { - AFL_VERIFY(!RemoveSnapshot.Valid()); - RemoveSnapshot = snap; + AFL_VERIFY(!HasRemoveSnapshot()); + RemoveSnapshot.Set(snap); } void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) { @@ -339,7 +341,7 @@ class TPortionInfo { TString DebugString(const bool withDetails = false) const; bool HasRemoveSnapshot() const { - return RemoveSnapshot.Valid(); + return RemoveSnapshot.Has(); } bool IsRemovedFor(const TSnapshot& snapshot) const { @@ -376,12 +378,12 @@ class TPortionInfo { const TSnapshot& GetRemoveSnapshotVerified() const { AFL_VERIFY(HasRemoveSnapshot()); - return RemoveSnapshot; + return RemoveSnapshot.Get(); } std::optional GetRemoveSnapshotOptional() const { - if (RemoveSnapshot.Valid()) { - return RemoveSnapshot; + if (HasRemoveSnapshot()) { + return RemoveSnapshot.Get(); } else { return {}; } @@ -393,7 +395,7 @@ class TPortionInfo { } bool IsVisible(const TSnapshot& snapshot, const bool checkCommitSnapshot = true) const { - const bool visible = (!RemoveSnapshot.Valid() || snapshot < RemoveSnapshot) && DoIsVisible(snapshot, checkCommitSnapshot); + const bool visible = (!HasRemoveSnapshot() || snapshot < GetRemoveSnapshotVerified()) && DoIsVisible(snapshot, checkCommitSnapshot); AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)( "snapshot", snapshot.DebugString());