Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/common/thread_safe_optional.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#include "thread_safe_optional.h"

namespace NKikimr::NOlap {

}

96 changes: 96 additions & 0 deletions ydb/core/tx/columnshard/common/thread_safe_optional.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#pragma once

#include <atomic>
#include <optional>
#include <utility>

namespace NKikimr::NOlap {

template <class T>
class TThreadSafeOptional {
private:
alignas(T) unsigned char Storage[sizeof(T)];
std::atomic<bool> Defined{ false };

T *Ptr() {
return reinterpret_cast<T *>(&Storage[0]);
}

const T *Ptr() const {
return reinterpret_cast<const T *>(&Storage[0]);
}

public:
TThreadSafeOptional() = default;

~TThreadSafeOptional() {
if (Has()) {
Ptr()->~T();
}
}

TThreadSafeOptional(const TThreadSafeOptional& other) {
const bool has = other.Defined.load(std::memory_order_acquire);
if (has) {
::new (Ptr()) T(*other.Ptr());
Defined.store(true, std::memory_order_release);
}
}

TThreadSafeOptional& operator=(const TThreadSafeOptional& other) = delete;

TThreadSafeOptional(TThreadSafeOptional&& other) noexcept {
const bool has = other.Defined.load(std::memory_order_acquire);
if (has) {
::new (Ptr()) T(std::move(*other.Ptr()));
Defined.store(true, std::memory_order_release);
}
}

TThreadSafeOptional& operator=(TThreadSafeOptional&& other) noexcept {
if (this == &other) {
return *this;
}

AFL_VERIFY(!Has());
const bool has = other.Defined.load(std::memory_order_acquire);
if (has) {
::new (Ptr()) T(std::move(*other.Ptr()));
Defined.store(true, std::memory_order_release);
}

return *this;
}

void Reset() = delete;

void Set(const T& value) {
AFL_VERIFY(!Has());
::new (Ptr()) T(value);
Defined.store(true, std::memory_order_release);
}

void Set(T&& value) {
AFL_VERIFY(!Has());
::new (Ptr()) T(std::move(value));
Defined.store(true, std::memory_order_release);
}

bool Has() const {
return Defined.load(std::memory_order_acquire);
}

const T& Get() const {
return *Ptr();
}

std::optional<T> GetOptional() const {
if (Has()) {
return *Ptr();
}

return {};
}
};

} // namespace NKikimr::NOlap
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SRCS(
snapshot.cpp
portion.cpp
tablet_id.cpp
thread_safe_optional.cpp
blob.cpp
volume.cpp
path_id.cpp
Expand Down
14 changes: 9 additions & 5 deletions ydb/core/tx/columnshard/engines/portions/portion_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ TString TPortionInfo::DebugString(const bool withDetails) const {
sb << "column_size:" << GetColumnBlobBytes() << ";"
<< "index_size:" << GetIndexBlobBytes() << ";"
<< "meta:(" << Meta.DebugString() << ");";
if (RemoveSnapshot.Valid()) {
sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");";
if (HasRemoveSnapshot()) {
sb << "remove_snapshot:(" << RemoveSnapshot.Get().DebugString() << ");";
}
return sb << ")";
}
Expand All @@ -56,8 +56,8 @@ void TPortionInfo::SerializeToProto(const std::vector<TUnifiedBlobId>& blobIds,
PathId.ToProto(proto);
proto.SetPortionId(PortionId);
proto.SetSchemaVersion(GetSchemaVersionVerified());
if (!RemoveSnapshot.IsZero()) {
*proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto();
if (HasRemoveSnapshot()) {
*proto.MutableRemoveSnapshot() = RemoveSnapshot.Get().SerializeToProto();
}

*proto.MutableMeta() = Meta.SerializeToProto(blobIds, GetProduced());
Expand All @@ -71,11 +71,15 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat
return TConclusionStatus::Fail("portion's schema version cannot been equals to zero");
}
if (proto.HasRemoveSnapshot()) {
auto parse = RemoveSnapshot.DeserializeFromProto(proto.GetRemoveSnapshot());
TSnapshot tmp = TSnapshot::Zero();
auto parse = tmp.DeserializeFromProto(proto.GetRemoveSnapshot());
if (!parse) {
return parse;
}

RemoveSnapshot.Set(std::move(tmp));
}

return TConclusionStatus::Success();
}

Expand Down
18 changes: 10 additions & 8 deletions ydb/core/tx/columnshard/engines/portions/portion_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <ydb/core/tx/columnshard/common/blob.h>
#include <ydb/core/tx/columnshard/common/path_id.h>
#include <ydb/core/tx/columnshard/common/thread_safe_optional.h>
#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/formats/arrow/replace_key.h>

#include <util/generic/hash_set.h>


namespace NKikimrColumnShardDataSharingProto {
class TPortionInfo;
}
Expand Down Expand Up @@ -89,7 +91,7 @@ class TPortionInfo {

TInternalPathId PathId;
ui64 PortionId = 0; // Id of independent (overlayed by PK) portion of data in pathId
TSnapshot RemoveSnapshot = TSnapshot::Zero();
TThreadSafeOptional<TSnapshot> RemoveSnapshot;
ui64 SchemaVersion = 0;
std::optional<ui64> ShardingVersion;

Expand Down Expand Up @@ -213,8 +215,8 @@ class TPortionInfo {
}

void SetRemoveSnapshot(const TSnapshot& snap) {
AFL_VERIFY(!RemoveSnapshot.Valid());
RemoveSnapshot = snap;
AFL_VERIFY(!HasRemoveSnapshot());
RemoveSnapshot.Set(snap);
}

void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) {
Expand Down Expand Up @@ -339,7 +341,7 @@ class TPortionInfo {
TString DebugString(const bool withDetails = false) const;

bool HasRemoveSnapshot() const {
return RemoveSnapshot.Valid();
return RemoveSnapshot.Has();
}

bool IsRemovedFor(const TSnapshot& snapshot) const {
Expand Down Expand Up @@ -376,12 +378,12 @@ class TPortionInfo {

const TSnapshot& GetRemoveSnapshotVerified() const {
AFL_VERIFY(HasRemoveSnapshot());
return RemoveSnapshot;
return RemoveSnapshot.Get();
}

std::optional<TSnapshot> GetRemoveSnapshotOptional() const {
if (RemoveSnapshot.Valid()) {
return RemoveSnapshot;
if (HasRemoveSnapshot()) {
return RemoveSnapshot.Get();
} else {
return {};
}
Expand All @@ -393,7 +395,7 @@ class TPortionInfo {
}

bool IsVisible(const TSnapshot& snapshot, const bool checkCommitSnapshot = true) const {
const bool visible = (!RemoveSnapshot.Valid() || snapshot < RemoveSnapshot) && DoIsVisible(snapshot, checkCommitSnapshot);
const bool visible = (!HasRemoveSnapshot() || snapshot < GetRemoveSnapshotVerified()) && DoIsVisible(snapshot, checkCommitSnapshot);

AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)(
"snapshot", snapshot.DebugString());
Expand Down
Loading