From 8e785a76146eaf65337efd4ed8e85e739f827361 Mon Sep 17 00:00:00 2001 From: Matveev Sergei Date: Wed, 8 Oct 2025 13:15:46 +0000 Subject: [PATCH 1/4] remove race --- .../engines/portions/portion_info.cpp | 5 +- .../engines/portions/portion_info.h | 70 ++++++++++++++++--- 2 files changed, 64 insertions(+), 11 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index b827d6e19472..31d2a6810eb6 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -38,7 +38,7 @@ TString TPortionInfo::DebugString(const bool withDetails) const { sb << "column_size:" << GetColumnBlobBytes() << ";" << "index_size:" << GetIndexBlobBytes() << ";" << "meta:(" << Meta.DebugString() << ");"; - if (RemoveSnapshot.Valid()) { + if (HasRemoveSnapshot()) { sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");"; } return sb << ")"; @@ -56,7 +56,7 @@ void TPortionInfo::SerializeToProto(const std::vector& blobIds, PathId.ToProto(proto); proto.SetPortionId(PortionId); proto.SetSchemaVersion(GetSchemaVersionVerified()); - if (!RemoveSnapshot.IsZero()) { + if (HasRemoveSnapshot()) { *proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto(); } @@ -75,6 +75,7 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat if (!parse) { return parse; } + RemoveSnapshotDefined.store(true, std::memory_order_release); } return TConclusionStatus::Success(); } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index c3482196042d..8a1b7236b6b2 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -13,6 +13,7 @@ #include #include +#include namespace NKikimrColumnShardDataSharingProto { class TPortionInfo; @@ -84,12 +85,37 @@ class TPortionInfo { friend class TCompactedPortionInfo; friend class TWrittenPortionInfo; - TPortionInfo(const TPortionInfo&) = default; - TPortionInfo& operator=(const TPortionInfo&) = default; + TPortionInfo(const TPortionInfo& other) + : PathId(other.PathId) + , PortionId(other.PortionId) + , RemoveSnapshot(other.RemoveSnapshot) + , RemoveSnapshotDefined(other.RemoveSnapshotDefined.load(std::memory_order_acquire)) + , SchemaVersion(other.SchemaVersion) + , ShardingVersion(other.ShardingVersion) + , Meta(other.Meta) + , RuntimeFeatures(other.RuntimeFeatures) { + } + + TPortionInfo& operator=(const TPortionInfo& other) { + if (this == &other) { + return *this; + } + + PathId = other.PathId; + PortionId = other.PortionId; + RemoveSnapshot = other.RemoveSnapshot; + RemoveSnapshotDefined.store(other.RemoveSnapshotDefined.load(std::memory_order_acquire), std::memory_order_release); + SchemaVersion = other.SchemaVersion; + ShardingVersion = other.ShardingVersion; + Meta = other.Meta; + RuntimeFeatures = other.RuntimeFeatures; + return *this; + } TInternalPathId PathId; ui64 PortionId = 0; // Id of independent (overlayed by PK) portion of data in pathId TSnapshot RemoveSnapshot = TSnapshot::Zero(); + std::atomic RemoveSnapshotDefined = false; ui64 SchemaVersion = 0; std::optional ShardingVersion; @@ -167,8 +193,33 @@ class TPortionInfo { TPortionInfo(TPortionMeta&& meta) : Meta(std::move(meta)) { } - TPortionInfo(TPortionInfo&&) = default; - TPortionInfo& operator=(TPortionInfo&&) = default; + + TPortionInfo(TPortionInfo&& other) noexcept + : PathId(std::move(other.PathId)) + , PortionId(other.PortionId) + , RemoveSnapshot(std::move(other.RemoveSnapshot)) + , RemoveSnapshotDefined(other.RemoveSnapshotDefined.load(std::memory_order_acquire)) + , SchemaVersion(other.SchemaVersion) + , ShardingVersion(std::move(other.ShardingVersion)) + , Meta(std::move(other.Meta)) + , RuntimeFeatures(other.RuntimeFeatures) { + } + + TPortionInfo& operator=(TPortionInfo&& other) noexcept { + if (this == &other) { + return *this; + } + + PathId = std::move(other.PathId); + PortionId = other.PortionId; + RemoveSnapshot = std::move(other.RemoveSnapshot); + RemoveSnapshotDefined.store(other.RemoveSnapshotDefined.load(std::memory_order_acquire), std::memory_order_release); + SchemaVersion = other.SchemaVersion; + ShardingVersion = std::move(other.ShardingVersion); + Meta = std::move(other.Meta); + RuntimeFeatures = other.RuntimeFeatures; + return *this; + } virtual void FillDefaultColumn(NAssembling::TColumnAssemblingInfo& column, const std::optional& defaultSnapshot) const = 0; @@ -213,8 +264,9 @@ class TPortionInfo { } void SetRemoveSnapshot(const TSnapshot& snap) { - AFL_VERIFY(!RemoveSnapshot.Valid()); + AFL_VERIFY(!RemoveSnapshotDefined.load()); RemoveSnapshot = snap; + RemoveSnapshotDefined.store(true, std::memory_order_release); } void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) { @@ -339,7 +391,7 @@ class TPortionInfo { TString DebugString(const bool withDetails = false) const; bool HasRemoveSnapshot() const { - return RemoveSnapshot.Valid(); + return RemoveSnapshotDefined.load(std::memory_order_acquire); } bool IsRemovedFor(const TSnapshot& snapshot) const { @@ -375,12 +427,12 @@ class TPortionInfo { } const TSnapshot& GetRemoveSnapshotVerified() const { - AFL_VERIFY(HasRemoveSnapshot()); + AFL_VERIFY(RemoveSnapshotDefined.load(std::memory_order_acquire)); return RemoveSnapshot; } std::optional GetRemoveSnapshotOptional() const { - if (RemoveSnapshot.Valid()) { + if (RemoveSnapshotDefined.load(std::memory_order_acquire)) { return RemoveSnapshot; } else { return {}; @@ -393,7 +445,7 @@ class TPortionInfo { } bool IsVisible(const TSnapshot& snapshot, const bool checkCommitSnapshot = true) const { - const bool visible = (!RemoveSnapshot.Valid() || snapshot < RemoveSnapshot) && DoIsVisible(snapshot, checkCommitSnapshot); + const bool visible = (!RemoveSnapshotDefined.load(std::memory_order_acquire) || snapshot < GetRemoveSnapshotVerified()) && DoIsVisible(snapshot, checkCommitSnapshot); AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)( "snapshot", snapshot.DebugString()); From 01ae8f7505de33dbb4b2a9c30d0e7303b57e0d12 Mon Sep 17 00:00:00 2001 From: Matveev Sergei Date: Wed, 8 Oct 2025 15:26:45 +0000 Subject: [PATCH 2/4] added thread safe class --- .../common/thread_safe_optional.cpp | 6 + .../columnshard/common/thread_safe_optional.h | 130 ++++++++++++++++++ ydb/core/tx/columnshard/common/ya.make | 1 + .../engines/portions/portion_info.cpp | 11 +- .../engines/portions/portion_info.h | 80 ++--------- 5 files changed, 159 insertions(+), 69 deletions(-) create mode 100644 ydb/core/tx/columnshard/common/thread_safe_optional.cpp create mode 100644 ydb/core/tx/columnshard/common/thread_safe_optional.h diff --git a/ydb/core/tx/columnshard/common/thread_safe_optional.cpp b/ydb/core/tx/columnshard/common/thread_safe_optional.cpp new file mode 100644 index 000000000000..15e225e72083 --- /dev/null +++ b/ydb/core/tx/columnshard/common/thread_safe_optional.cpp @@ -0,0 +1,6 @@ +#include "thread_safe_optional.h" + +namespace NKikimr::NOlap { + +} + diff --git a/ydb/core/tx/columnshard/common/thread_safe_optional.h b/ydb/core/tx/columnshard/common/thread_safe_optional.h new file mode 100644 index 000000000000..62d9fcc29cd3 --- /dev/null +++ b/ydb/core/tx/columnshard/common/thread_safe_optional.h @@ -0,0 +1,130 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr::NOlap { + +template +class TThreadSafeOptional { +private: + alignas(T) unsigned char Storage[sizeof(T)]; + std::atomic Defined{ false }; + + T *Ptr() { + return reinterpret_cast(&Storage[0]); + } + + const T *Ptr() const { + return reinterpret_cast(&Storage[0]); + } + +public: + TThreadSafeOptional() = default; + + ~TThreadSafeOptional() { + if (Has()) { + Ptr()->~T(); + } + } + + TThreadSafeOptional(const TThreadSafeOptional& other) { + const bool has = other.Defined.load(std::memory_order_acquire); + if (has) { + ::new (Ptr()) T(*other.Ptr()); + Defined.store(true, std::memory_order_release); + } + } + + TThreadSafeOptional& operator=(const TThreadSafeOptional& other) { + if (this == &other) { + return *this; + } + + const bool has = other.Defined.load(std::memory_order_acquire); + if (Has()) { + Ptr()->~T(); + Defined.store(false, std::memory_order_release); + } + + if (has) { + ::new (Ptr()) T(*other.Ptr()); + Defined.store(true, std::memory_order_release); + } + + return *this; + } + + TThreadSafeOptional(TThreadSafeOptional&& other) noexcept { + const bool has = other.Defined.load(std::memory_order_acquire); + if (has) { + ::new (Ptr()) T(std::move(*other.Ptr())); + Defined.store(true, std::memory_order_release); + } + } + + TThreadSafeOptional& operator=(TThreadSafeOptional&& other) noexcept { + if (this == &other) { + return *this; + } + + const bool has = other.Defined.load(std::memory_order_acquire); + if (Has()) { + Ptr()->~T(); + Defined.store(false, std::memory_order_release); + } + + if (has) { + ::new (Ptr()) T(std::move(*other.Ptr())); + Defined.store(true, std::memory_order_release); + } + + return *this; + } + + void Reset() { + if (Has()) { + Ptr()->~T(); + Defined.store(false, std::memory_order_release); + } + } + + void Set(const T& value) { + if (Has()) { + Ptr()->~T(); + Defined.store(false, std::memory_order_release); + } + + ::new (Ptr()) T(value); + Defined.store(true, std::memory_order_release); + } + + void Set(T&& value) { + if (Has()) { + Ptr()->~T(); + Defined.store(false, std::memory_order_release); + } + + ::new (Ptr()) T(std::move(value)); + Defined.store(true, std::memory_order_release); + } + + bool Has() const { + return Defined.load(std::memory_order_acquire); + } + + const T& Get() const { + return *Ptr(); + } + + std::optional GetOptional() const { + if (Has()) { + return *Ptr(); + } + + return {}; + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make index 86c71afb41ba..8b3f40838e3b 100644 --- a/ydb/core/tx/columnshard/common/ya.make +++ b/ydb/core/tx/columnshard/common/ya.make @@ -7,6 +7,7 @@ SRCS( snapshot.cpp portion.cpp tablet_id.cpp + thread_safe_optional.cpp blob.cpp volume.cpp path_id.cpp diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 31d2a6810eb6..14f91a801893 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -39,7 +39,7 @@ TString TPortionInfo::DebugString(const bool withDetails) const { << "index_size:" << GetIndexBlobBytes() << ";" << "meta:(" << Meta.DebugString() << ");"; if (HasRemoveSnapshot()) { - sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");"; + sb << "remove_snapshot:(" << RemoveSnapshot.Get().DebugString() << ");"; } return sb << ")"; } @@ -57,7 +57,7 @@ void TPortionInfo::SerializeToProto(const std::vector& blobIds, proto.SetPortionId(PortionId); proto.SetSchemaVersion(GetSchemaVersionVerified()); if (HasRemoveSnapshot()) { - *proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto(); + *proto.MutableRemoveSnapshot() = RemoveSnapshot.Get().SerializeToProto(); } *proto.MutableMeta() = Meta.SerializeToProto(blobIds, GetProduced()); @@ -71,12 +71,15 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat return TConclusionStatus::Fail("portion's schema version cannot been equals to zero"); } if (proto.HasRemoveSnapshot()) { - auto parse = RemoveSnapshot.DeserializeFromProto(proto.GetRemoveSnapshot()); + TSnapshot tmp = TSnapshot::Zero(); + auto parse = tmp.DeserializeFromProto(proto.GetRemoveSnapshot()); if (!parse) { return parse; } - RemoveSnapshotDefined.store(true, std::memory_order_release); + + RemoveSnapshot.Set(std::move(tmp)); } + return TConclusionStatus::Success(); } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 8a1b7236b6b2..b4ce802d4d08 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -7,13 +7,14 @@ #include #include #include +#include #include #include #include #include -#include + namespace NKikimrColumnShardDataSharingProto { class TPortionInfo; @@ -85,37 +86,12 @@ class TPortionInfo { friend class TCompactedPortionInfo; friend class TWrittenPortionInfo; - TPortionInfo(const TPortionInfo& other) - : PathId(other.PathId) - , PortionId(other.PortionId) - , RemoveSnapshot(other.RemoveSnapshot) - , RemoveSnapshotDefined(other.RemoveSnapshotDefined.load(std::memory_order_acquire)) - , SchemaVersion(other.SchemaVersion) - , ShardingVersion(other.ShardingVersion) - , Meta(other.Meta) - , RuntimeFeatures(other.RuntimeFeatures) { - } - - TPortionInfo& operator=(const TPortionInfo& other) { - if (this == &other) { - return *this; - } - - PathId = other.PathId; - PortionId = other.PortionId; - RemoveSnapshot = other.RemoveSnapshot; - RemoveSnapshotDefined.store(other.RemoveSnapshotDefined.load(std::memory_order_acquire), std::memory_order_release); - SchemaVersion = other.SchemaVersion; - ShardingVersion = other.ShardingVersion; - Meta = other.Meta; - RuntimeFeatures = other.RuntimeFeatures; - return *this; - } + TPortionInfo(const TPortionInfo&) = default; + TPortionInfo& operator=(const TPortionInfo&) = default; TInternalPathId PathId; ui64 PortionId = 0; // Id of independent (overlayed by PK) portion of data in pathId - TSnapshot RemoveSnapshot = TSnapshot::Zero(); - std::atomic RemoveSnapshotDefined = false; + TThreadSafeOptional RemoveSnapshot; ui64 SchemaVersion = 0; std::optional ShardingVersion; @@ -193,33 +169,8 @@ class TPortionInfo { TPortionInfo(TPortionMeta&& meta) : Meta(std::move(meta)) { } - - TPortionInfo(TPortionInfo&& other) noexcept - : PathId(std::move(other.PathId)) - , PortionId(other.PortionId) - , RemoveSnapshot(std::move(other.RemoveSnapshot)) - , RemoveSnapshotDefined(other.RemoveSnapshotDefined.load(std::memory_order_acquire)) - , SchemaVersion(other.SchemaVersion) - , ShardingVersion(std::move(other.ShardingVersion)) - , Meta(std::move(other.Meta)) - , RuntimeFeatures(other.RuntimeFeatures) { - } - - TPortionInfo& operator=(TPortionInfo&& other) noexcept { - if (this == &other) { - return *this; - } - - PathId = std::move(other.PathId); - PortionId = other.PortionId; - RemoveSnapshot = std::move(other.RemoveSnapshot); - RemoveSnapshotDefined.store(other.RemoveSnapshotDefined.load(std::memory_order_acquire), std::memory_order_release); - SchemaVersion = other.SchemaVersion; - ShardingVersion = std::move(other.ShardingVersion); - Meta = std::move(other.Meta); - RuntimeFeatures = other.RuntimeFeatures; - return *this; - } + TPortionInfo(TPortionInfo&&) = default; + TPortionInfo& operator=(TPortionInfo&&) = default; virtual void FillDefaultColumn(NAssembling::TColumnAssemblingInfo& column, const std::optional& defaultSnapshot) const = 0; @@ -264,9 +215,8 @@ class TPortionInfo { } void SetRemoveSnapshot(const TSnapshot& snap) { - AFL_VERIFY(!RemoveSnapshotDefined.load()); - RemoveSnapshot = snap; - RemoveSnapshotDefined.store(true, std::memory_order_release); + AFL_VERIFY(!HasRemoveSnapshot()); + RemoveSnapshot.Set(snap); } void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) { @@ -391,7 +341,7 @@ class TPortionInfo { TString DebugString(const bool withDetails = false) const; bool HasRemoveSnapshot() const { - return RemoveSnapshotDefined.load(std::memory_order_acquire); + return RemoveSnapshot.Has(); } bool IsRemovedFor(const TSnapshot& snapshot) const { @@ -427,13 +377,13 @@ class TPortionInfo { } const TSnapshot& GetRemoveSnapshotVerified() const { - AFL_VERIFY(RemoveSnapshotDefined.load(std::memory_order_acquire)); - return RemoveSnapshot; + AFL_VERIFY(HasRemoveSnapshot()); + return RemoveSnapshot.Get(); } std::optional GetRemoveSnapshotOptional() const { - if (RemoveSnapshotDefined.load(std::memory_order_acquire)) { - return RemoveSnapshot; + if (HasRemoveSnapshot()) { + return RemoveSnapshot.Get(); } else { return {}; } @@ -445,7 +395,7 @@ class TPortionInfo { } bool IsVisible(const TSnapshot& snapshot, const bool checkCommitSnapshot = true) const { - const bool visible = (!RemoveSnapshotDefined.load(std::memory_order_acquire) || snapshot < GetRemoveSnapshotVerified()) && DoIsVisible(snapshot, checkCommitSnapshot); + const bool visible = (!HasRemoveSnapshot() || snapshot < GetRemoveSnapshotVerified()) && DoIsVisible(snapshot, checkCommitSnapshot); AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)( "snapshot", snapshot.DebugString()); From 53ed56625193f5393841174527cf78e2ec5fbf30 Mon Sep 17 00:00:00 2001 From: Matveev Sergei Date: Wed, 8 Oct 2025 15:50:03 +0000 Subject: [PATCH 3/4] fixed notes --- .../columnshard/common/thread_safe_optional.h | 44 +++---------------- 1 file changed, 5 insertions(+), 39 deletions(-) diff --git a/ydb/core/tx/columnshard/common/thread_safe_optional.h b/ydb/core/tx/columnshard/common/thread_safe_optional.h index 62d9fcc29cd3..8625132fa127 100644 --- a/ydb/core/tx/columnshard/common/thread_safe_optional.h +++ b/ydb/core/tx/columnshard/common/thread_safe_optional.h @@ -37,24 +37,7 @@ class TThreadSafeOptional { } } - TThreadSafeOptional& operator=(const TThreadSafeOptional& other) { - if (this == &other) { - return *this; - } - - const bool has = other.Defined.load(std::memory_order_acquire); - if (Has()) { - Ptr()->~T(); - Defined.store(false, std::memory_order_release); - } - - if (has) { - ::new (Ptr()) T(*other.Ptr()); - Defined.store(true, std::memory_order_release); - } - - return *this; - } + TThreadSafeOptional& operator=(const TThreadSafeOptional& other) = delete; TThreadSafeOptional(TThreadSafeOptional&& other) noexcept { const bool has = other.Defined.load(std::memory_order_acquire); @@ -69,12 +52,8 @@ class TThreadSafeOptional { return *this; } + AFL_VERIFY(!Has()); const bool has = other.Defined.load(std::memory_order_acquire); - if (Has()) { - Ptr()->~T(); - Defined.store(false, std::memory_order_release); - } - if (has) { ::new (Ptr()) T(std::move(*other.Ptr())); Defined.store(true, std::memory_order_release); @@ -83,29 +62,16 @@ class TThreadSafeOptional { return *this; } - void Reset() { - if (Has()) { - Ptr()->~T(); - Defined.store(false, std::memory_order_release); - } - } + void Reset() = delete; void Set(const T& value) { - if (Has()) { - Ptr()->~T(); - Defined.store(false, std::memory_order_release); - } - + AFL_VERIFY(!Has()); ::new (Ptr()) T(value); Defined.store(true, std::memory_order_release); } void Set(T&& value) { - if (Has()) { - Ptr()->~T(); - Defined.store(false, std::memory_order_release); - } - + AFL_VERIFY(!Has()); ::new (Ptr()) T(std::move(value)); Defined.store(true, std::memory_order_release); } From bba9d4bcb3bffe14affd1ac9f9b6f5941839a84e Mon Sep 17 00:00:00 2001 From: Matveev Sergei Date: Wed, 8 Oct 2025 16:08:50 +0000 Subject: [PATCH 4/4] fixed --- .../tx/columnshard/engines/portions/constructor_portion.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp index 38c4c1372939..cb9e61cd15fb 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp +++ b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp @@ -29,12 +29,14 @@ std::shared_ptr TPortionInfoConstructor::Build() { if (RemoveSnapshot) { AFL_VERIFY(RemoveSnapshot->Valid()); - result->RemoveSnapshot = *RemoveSnapshot; + result->SetRemoveSnapshot(*RemoveSnapshot); } + AFL_VERIFY(SchemaVersion && *SchemaVersion); result->SchemaVersion = *SchemaVersion; result->ShardingVersion = ShardingVersion; } + static TAtomicCounter countValues = 0; static TAtomicCounter sumValues = 0; AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_size", result->GetMemorySize())("data_size", result->GetDataSize())(