Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/common/thread_safe_optional.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#include "thread_safe_optional.h"

namespace NKikimr::NOlap {

}

96 changes: 96 additions & 0 deletions ydb/core/tx/columnshard/common/thread_safe_optional.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#pragma once

#include <atomic>
#include <optional>
#include <utility>

namespace NKikimr::NOlap {

template <class T>
class TThreadSafeOptional {
private:
alignas(T) unsigned char Storage[sizeof(T)];
std::atomic<bool> Defined{ false };

T *Ptr() {
return reinterpret_cast<T *>(&Storage[0]);
}

const T *Ptr() const {
return reinterpret_cast<const T *>(&Storage[0]);
}

public:
TThreadSafeOptional() = default;

~TThreadSafeOptional() {
if (Has()) {
Ptr()->~T();
}
}

TThreadSafeOptional(const TThreadSafeOptional& other) {
const bool has = other.Defined.load(std::memory_order_acquire);
if (has) {
::new (Ptr()) T(*other.Ptr());
Defined.store(true, std::memory_order_release);
}
}

TThreadSafeOptional& operator=(const TThreadSafeOptional& other) = delete;

TThreadSafeOptional(TThreadSafeOptional&& other) noexcept {
const bool has = other.Defined.load(std::memory_order_acquire);
if (has) {
::new (Ptr()) T(std::move(*other.Ptr()));
Defined.store(true, std::memory_order_release);
}
}

TThreadSafeOptional& operator=(TThreadSafeOptional&& other) noexcept {
if (this == &other) {
return *this;
}

AFL_VERIFY(!Has());
const bool has = other.Defined.load(std::memory_order_acquire);
if (has) {
::new (Ptr()) T(std::move(*other.Ptr()));
Defined.store(true, std::memory_order_release);
}

return *this;
}

void Reset() = delete;

void Set(const T& value) {
AFL_VERIFY(!Has());
::new (Ptr()) T(value);
Defined.store(true, std::memory_order_release);
}

void Set(T&& value) {
AFL_VERIFY(!Has());
::new (Ptr()) T(std::move(value));
Defined.store(true, std::memory_order_release);
}

bool Has() const {
return Defined.load(std::memory_order_acquire);
}

const T& Get() const {
return *Ptr();
}

std::optional<T> GetOptional() const {
if (Has()) {
return *Ptr();
}

return {};
}
};

} // namespace NKikimr::NOlap
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SRCS(
snapshot.cpp
portion.cpp
tablet_id.cpp
thread_safe_optional.cpp
blob.cpp
volume.cpp
path_id.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ std::shared_ptr<TPortionInfo> TPortionInfoConstructor::Build() {

if (RemoveSnapshot) {
AFL_VERIFY(RemoveSnapshot->Valid());
result->RemoveSnapshot = *RemoveSnapshot;
result->SetRemoveSnapshot(*RemoveSnapshot);
}

AFL_VERIFY(SchemaVersion && *SchemaVersion);
result->SchemaVersion = *SchemaVersion;
result->ShardingVersion = ShardingVersion;
}

static TAtomicCounter countValues = 0;
static TAtomicCounter sumValues = 0;
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_size", result->GetMemorySize())("data_size", result->GetDataSize())(
Expand Down
14 changes: 9 additions & 5 deletions ydb/core/tx/columnshard/engines/portions/portion_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ TString TPortionInfo::DebugString(const bool withDetails) const {
sb << "column_size:" << GetColumnBlobBytes() << ";"
<< "index_size:" << GetIndexBlobBytes() << ";"
<< "meta:(" << Meta.DebugString() << ");";
if (RemoveSnapshot.Valid()) {
sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");";
if (HasRemoveSnapshot()) {
sb << "remove_snapshot:(" << RemoveSnapshot.Get().DebugString() << ");";
}
return sb << ")";
}
Expand All @@ -56,8 +56,8 @@ void TPortionInfo::SerializeToProto(const std::vector<TUnifiedBlobId>& blobIds,
PathId.ToProto(proto);
proto.SetPortionId(PortionId);
proto.SetSchemaVersion(GetSchemaVersionVerified());
if (!RemoveSnapshot.IsZero()) {
*proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto();
if (HasRemoveSnapshot()) {
*proto.MutableRemoveSnapshot() = RemoveSnapshot.Get().SerializeToProto();
}

*proto.MutableMeta() = Meta.SerializeToProto(blobIds, GetProduced());
Expand All @@ -71,11 +71,15 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat
return TConclusionStatus::Fail("portion's schema version cannot been equals to zero");
}
if (proto.HasRemoveSnapshot()) {
auto parse = RemoveSnapshot.DeserializeFromProto(proto.GetRemoveSnapshot());
TSnapshot tmp = TSnapshot::Zero();
auto parse = tmp.DeserializeFromProto(proto.GetRemoveSnapshot());
if (!parse) {
return parse;
}

RemoveSnapshot.Set(std::move(tmp));
}

return TConclusionStatus::Success();
}

Expand Down
18 changes: 10 additions & 8 deletions ydb/core/tx/columnshard/engines/portions/portion_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <ydb/core/tx/columnshard/common/blob.h>
#include <ydb/core/tx/columnshard/common/path_id.h>
#include <ydb/core/tx/columnshard/common/thread_safe_optional.h>
#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/formats/arrow/replace_key.h>

#include <util/generic/hash_set.h>


namespace NKikimrColumnShardDataSharingProto {
class TPortionInfo;
}
Expand Down Expand Up @@ -89,7 +91,7 @@ class TPortionInfo {

TInternalPathId PathId;
ui64 PortionId = 0; // Id of independent (overlayed by PK) portion of data in pathId
TSnapshot RemoveSnapshot = TSnapshot::Zero();
TThreadSafeOptional<TSnapshot> RemoveSnapshot;
ui64 SchemaVersion = 0;
std::optional<ui64> ShardingVersion;

Expand Down Expand Up @@ -213,8 +215,8 @@ class TPortionInfo {
}

void SetRemoveSnapshot(const TSnapshot& snap) {
AFL_VERIFY(!RemoveSnapshot.Valid());
RemoveSnapshot = snap;
AFL_VERIFY(!HasRemoveSnapshot());
RemoveSnapshot.Set(snap);
}

void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) {
Expand Down Expand Up @@ -339,7 +341,7 @@ class TPortionInfo {
TString DebugString(const bool withDetails = false) const;

bool HasRemoveSnapshot() const {
return RemoveSnapshot.Valid();
return RemoveSnapshot.Has();
}

bool IsRemovedFor(const TSnapshot& snapshot) const {
Expand Down Expand Up @@ -376,12 +378,12 @@ class TPortionInfo {

const TSnapshot& GetRemoveSnapshotVerified() const {
AFL_VERIFY(HasRemoveSnapshot());
return RemoveSnapshot;
return RemoveSnapshot.Get();
}

std::optional<TSnapshot> GetRemoveSnapshotOptional() const {
if (RemoveSnapshot.Valid()) {
return RemoveSnapshot;
if (HasRemoveSnapshot()) {
return RemoveSnapshot.Get();
} else {
return {};
}
Expand All @@ -393,7 +395,7 @@ class TPortionInfo {
}

bool IsVisible(const TSnapshot& snapshot, const bool checkCommitSnapshot = true) const {
const bool visible = (!RemoveSnapshot.Valid() || snapshot < RemoveSnapshot) && DoIsVisible(snapshot, checkCommitSnapshot);
const bool visible = (!HasRemoveSnapshot() || snapshot < GetRemoveSnapshotVerified()) && DoIsVisible(snapshot, checkCommitSnapshot);

AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)(
"snapshot", snapshot.DebugString());
Expand Down
Loading