diff --git a/.github/import_generation.txt b/.github/import_generation.txt index 98d9bcb75a6..3c032078a4a 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -17 +18 diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 38a9334ad0c..35b62492122 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -ca39c1dc5e3592adab111f61e4aaec2021bfa95b +cdad0889aa2f47f9cde997deae6f682eced20873 diff --git a/.github/workflows/examples.yaml b/.github/workflows/examples.yaml index 08920ddf7b9..5d0f38651b4 100644 --- a/.github/workflows/examples.yaml +++ b/.github/workflows/examples.yaml @@ -5,6 +5,7 @@ on: branches: - main pull_request: + types: [opened, synchronize, reopened, ready_for_review] branches: - main concurrency: diff --git a/.github/workflows/import.yaml b/.github/workflows/import.yaml index b4fc76c5c87..d1fe9f00746 100644 --- a/.github/workflows/import.yaml +++ b/.github/workflows/import.yaml @@ -80,6 +80,7 @@ jobs: --base ${{ github.ref_name }} \ --head import-pr-$GENERATION \ --title "Import YDB C++ SDK $GENERATION" \ - --body "Automatic import of new commits from ydb repository" + --body "Automatic import of new commits from ydb repository" \ + --draft env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index a8ac3eb3373..c9c7bad9f9a 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -5,6 +5,7 @@ on: branches: - main pull_request: + types: [opened, synchronize, reopened, ready_for_review] branches: - main concurrency: @@ -116,4 +117,4 @@ jobs: - name: Test shell: bash run: | - YDB_VERSION=${{ matrix.ydb-version }} ctest -j$(nproc) --preset integration + YDB_VERSION=${{ matrix.ydb-version }} ctest -j2 --preset integration diff --git a/include/ydb-cpp-sdk/client/proto/accessor.h b/include/ydb-cpp-sdk/client/proto/accessor.h index 313321176b5..bfa6f02e7b9 100644 --- a/include/ydb-cpp-sdk/client/proto/accessor.h +++ b/include/ydb-cpp-sdk/client/proto/accessor.h @@ -1,7 +1,5 @@ #pragma once -#include "private.h" - #include #include #include @@ -52,10 +50,6 @@ class TProtoAccessor { static const Ydb::Monitoring::SelfCheckResult& GetProto(const NYdb::NMonitoring::TSelfCheckResult& selfCheckResult); static const Ydb::Coordination::DescribeNodeResult& GetProto(const NYdb::NCoordination::TNodeDescription& describeNodeResult); static const Ydb::Import::ListObjectsInS3ExportResult& GetProto(const NYdb::NImport::TListObjectsInS3ExportResult& result); -#ifdef YDB_SDK_INTERNAL_CLIENTS - static const Ydb::Replication::DescribeReplicationResult& GetProto(const NYdb::NReplication::TDescribeReplicationResult& desc); - static const Ydb::View::DescribeViewResult& GetProto(const NYdb::NView::TDescribeViewResult& desc); -#endif static NTable::TQueryStats FromProto(const Ydb::TableStats::QueryStats& queryStats); static NTable::TTableDescription FromProto(const Ydb::Table::CreateTableRequest& request); diff --git a/include/ydb-cpp-sdk/client/scheme/scheme.h b/include/ydb-cpp-sdk/client/scheme/scheme.h index 6cb842200f9..f4bede6f1e1 100644 --- a/include/ydb-cpp-sdk/client/scheme/scheme.h +++ b/include/ydb-cpp-sdk/client/scheme/scheme.h @@ -51,6 +51,7 @@ enum class ESchemeEntryType : i32 { View = 20, ResourcePool = 21, SysView = 22, + Transfer = 23, }; struct TVirtualTimestamp { diff --git a/include/ydb-cpp-sdk/client/value/value.h b/include/ydb-cpp-sdk/client/value/value.h index b47748439c7..38f22f1d06a 100644 --- a/include/ydb-cpp-sdk/client/value/value.h +++ b/include/ydb-cpp-sdk/client/value/value.h @@ -4,6 +4,8 @@ #include +#include + #include #include @@ -528,7 +530,7 @@ class TValueBuilderBase : public TMoveOnly { protected: TValueBuilderBase(TValueBuilderBase&&); - TValueBuilderBase(); + TValueBuilderBase(google::protobuf::Arena* arena = nullptr); TValueBuilderBase(const TType& type); @@ -544,7 +546,7 @@ class TValueBuilderBase : public TMoveOnly { class TValueBuilder : public TValueBuilderBase { public: - TValueBuilder(); + TValueBuilder(google::protobuf::Arena* arena = nullptr); TValueBuilder(const TType& type); diff --git a/src/api/grpc/draft/ydb_replication_v1.proto b/src/api/grpc/draft/ydb_replication_v1.proto index f8868727955..ab1209868a4 100644 --- a/src/api/grpc/draft/ydb_replication_v1.proto +++ b/src/api/grpc/draft/ydb_replication_v1.proto @@ -7,4 +7,5 @@ option java_package = "com.yandex.ydb.replication.v1"; service ReplicationService { rpc DescribeReplication(Replication.DescribeReplicationRequest) returns (Replication.DescribeReplicationResponse); + rpc DescribeTransfer(Replication.DescribeTransferRequest) returns (Replication.DescribeTransferResponse); } diff --git a/src/api/protos/draft/ydb_replication.proto b/src/api/protos/draft/ydb_replication.proto index 4ef82408e61..62106c4d027 100644 --- a/src/api/protos/draft/ydb_replication.proto +++ b/src/api/protos/draft/ydb_replication.proto @@ -97,3 +97,52 @@ message DescribeReplicationResult { } } +message DescribeTransferRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Replication path. + string path = 2 [(required) = true]; +} + +message DescribeTransferResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +message DescribeTransferResult { + message RunningState { + } + + message ErrorState { + repeated Ydb.Issue.IssueMessage issues = 1; + } + + message DoneState { + } + + message PausedState { + } + + // Description of scheme object. + Ydb.Scheme.Entry self = 1; + + ConnectionParams connection_params = 2; + + oneof state { + RunningState running = 3; + ErrorState error = 4; + DoneState done = 5; + PausedState paused = 6; + } + + string source_path = 7; + string destination_path = 8; + string transformation_lambda = 9; + string consumer_name = 10; + + message BatchSettings { + optional uint64 size_bytes = 1; + google.protobuf.Duration flush_interval = 2; + } + + optional BatchSettings batch_settings = 11; +} diff --git a/src/client/impl/ydb_internal/retry/retry_async.h b/src/client/impl/ydb_internal/retry/retry_async.h index f19a0bf3371..e4a26f02fb6 100644 --- a/src/client/impl/ydb_internal/retry/retry_async.h +++ b/src/client/impl/ydb_internal/retry/retry_async.h @@ -114,7 +114,6 @@ class TRetryWithoutSession : public TRetryContext { template > class TRetryWithSession : public TRetryContext { using TRetryContextAsync = TRetryContext; - using TPtr = typename TRetryContextAsync::TPtr; using TStatusType = typename TRetryContextAsync::TStatusType; using TSession = typename TClient::TSession; using TCreateSessionSettings = typename TClient::TCreateSessionSettings; @@ -132,7 +131,7 @@ class TRetryWithSession : public TRetryContext { {} void Retry() override { - TPtr self(this); + TIntrusivePtr self(this); if (!Session_) { auto settings = TCreateSessionSettings().ClientTimeout(this->Settings_.GetSessionClientTimeout_); this->Client_.GetSession(settings).Subscribe( @@ -143,9 +142,8 @@ class TRetryWithSession : public TRetryContext { return TRetryContextAsync::HandleStatusAsync(self, TStatusType(TStatus(result))); } - auto* myself = dynamic_cast(self.Get()); - myself->Session_ = result.GetSession(); - myself->DoRunOperation(self); + self->Session_ = result.GetSession(); + self->DoRunOperation(self); } catch (...) { return TRetryContextAsync::HandleExceptionAsync(self, std::current_exception()); } diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index ff7ccee5bd3..4423e2013d1 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -858,8 +858,8 @@ class TTransaction::TImpl : public std::enable_shared_from_this { private: bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet - std::vector PrecommitCallbacks; - std::vector OnFailureCallbacks; + mutable std::vector PrecommitCallbacks; + mutable std::vector OnFailureCallbacks; std::mutex PrecommitCallbacksMutex; std::mutex OnFailureCallbacksMutex; diff --git a/src/client/scheme/scheme.cpp b/src/client/scheme/scheme.cpp index 713db1b33c4..faabbf61f9e 100644 --- a/src/client/scheme/scheme.cpp +++ b/src/client/scheme/scheme.cpp @@ -111,6 +111,8 @@ static ESchemeEntryType ConvertProtoEntryType(::Ydb::Scheme::Entry::Type entry) return ESchemeEntryType::ResourcePool; case ::Ydb::Scheme::Entry::SYS_VIEW: return ESchemeEntryType::SysView; + case ::Ydb::Scheme::Entry::TRANSFER: + return ESchemeEntryType::Transfer; default: return ESchemeEntryType::Unknown; } diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index 24f84147f87..0072d9c88d0 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -2508,7 +2508,7 @@ TIndexDescription TIndexDescription::FromProto(const TProto& proto) { std::vector indexColumns; std::vector dataColumns; std::vector globalIndexSettings; - std::variant specializedIndexSettings; + std::variant specializedIndexSettings = std::monostate{}; indexColumns.assign(proto.index_columns().begin(), proto.index_columns().end()); dataColumns.assign(proto.data_columns().begin(), proto.data_columns().end()); diff --git a/src/client/topic/impl/direct_reader.cpp b/src/client/topic/impl/direct_reader.cpp index ce3b6e9a940..7e84cd0ea50 100644 --- a/src/client/topic/impl/direct_reader.cpp +++ b/src/client/topic/impl/direct_reader.cpp @@ -4,7 +4,7 @@ #include -namespace NYdb::NTopic { +namespace NYdb::inline V3::NTopic { TDirectReadClientMessage TDirectReadPartitionSession::MakeStartRequest() const { TDirectReadClientMessage req; diff --git a/src/client/topic/impl/direct_reader.h b/src/client/topic/impl/direct_reader.h index 2b2058537e6..2a25df56b4c 100644 --- a/src/client/topic/impl/direct_reader.h +++ b/src/client/topic/impl/direct_reader.h @@ -10,7 +10,7 @@ #include -namespace NYdb::NTopic { +namespace NYdb::inline V3::NTopic { template class TDeferredActions; diff --git a/src/client/topic/impl/read_session_impl.h b/src/client/topic/impl/read_session_impl.h index 51531a52072..5855780b6bf 100644 --- a/src/client/topic/impl/read_session_impl.h +++ b/src/client/topic/impl/read_session_impl.h @@ -1111,7 +1111,7 @@ struct THash::TKey> { } }; -namespace NYdb::NTopic { +namespace NYdb::inline V3::NTopic { // Read session for single cluster. // This class holds only read session logic. diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index f4c2f8518fe..72293e5da9b 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -2183,6 +2183,9 @@ void TSingleClusterReadSessionImpl::TrySubscribeOnTransact return; } + txInfo->IsActive = true; + txInfo->Subscribed = true; + auto callback = [cbContext = this->SelfContext, txId, txInfo, consumer = Settings.ConsumerName_, client]() { std::vector offsets; @@ -2205,9 +2208,6 @@ void TSingleClusterReadSessionImpl::TrySubscribeOnTransact }; tx.AddPrecommitCallback(std::move(callback)); - - txInfo->IsActive = true; - txInfo->Subscribed = true; } } diff --git a/src/client/topic/impl/write_session_impl.cpp b/src/client/topic/impl/write_session_impl.cpp index 55bc19178e8..774bf0c1302 100644 --- a/src/client/topic/impl/write_session_impl.cpp +++ b/src/client/topic/impl/write_session_impl.cpp @@ -554,31 +554,31 @@ void TWriteSessionImpl::TrySubscribeOnTransactionCommit(TTransactionBase* tx) txInfo->IsActive = true; txInfo->Subscribed = true; txInfo->AllAcksReceived = NThreading::NewPromise(); - } - auto callback = [cbContext = this->SelfContext, txId, txInfo]() { - with_lock(txInfo->Lock) { - Y_ABORT_UNLESS(!txInfo->CommitCalled); + auto callback = [cbContext = this->SelfContext, txId, txInfo]() { + with_lock(txInfo->Lock) { + Y_ABORT_UNLESS(!txInfo->CommitCalled); - txInfo->CommitCalled = true; + txInfo->CommitCalled = true; - if (txInfo->WriteCount == txInfo->AckCount) { - txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess()); - if (auto self = cbContext->LockShared()) { - self->DeleteTx(txId); + if (txInfo->WriteCount == txInfo->AckCount) { + txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess()); + if (auto self = cbContext->LockShared()) { + self->DeleteTx(txId); + } + return txInfo->AllAcksReceived.GetFuture(); } - return txInfo->AllAcksReceived.GetFuture(); - } - if (txInfo->IsActive) { - return txInfo->AllAcksReceived.GetFuture(); + if (txInfo->IsActive) { + return txInfo->AllAcksReceived.GetFuture(); + } } - } - return NThreading::MakeFuture(MakeSessionExpiredError()); - }; + return NThreading::MakeFuture(MakeSessionExpiredError()); + }; - tx->AddPrecommitCallback(std::move(callback)); + tx->AddPrecommitCallback(std::move(callback)); + } } void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo) diff --git a/src/client/value/value.cpp b/src/client/value/value.cpp index 4041b66609c..c00f007eabc 100644 --- a/src/client/value/value.cpp +++ b/src/client/value/value.cpp @@ -2061,12 +2061,24 @@ class TValueBuilderImpl { public: TValueBuilderImpl() : TypeBuilder_() + , Arena(nullptr) + , ProtoValue_(ProtoValueHeap) + { + PushPath(ProtoValue_); + } + + TValueBuilderImpl(google::protobuf::Arena* arena) + : TypeBuilder_() + , Arena(arena) + , ProtoValue_(*google::protobuf::Arena::CreateMessage(Arena)) { PushPath(ProtoValue_); } TValueBuilderImpl(const TType& type) : TypeBuilder_() + , Arena(nullptr) + , ProtoValue_(ProtoValueHeap) { PushPath(ProtoValue_); GetType().CopyFrom(type.GetProto()); @@ -2074,6 +2086,8 @@ class TValueBuilderImpl { TValueBuilderImpl(Ydb::Type& type, Ydb::Value& value) : TypeBuilder_(type) + , Arena(nullptr) + , ProtoValue_(ProtoValueHeap) { PushPath(value); } @@ -2088,10 +2102,15 @@ class TValueBuilderImpl { TValue BuildValue() { CheckValue(); - Ydb::Value value; - value.Swap(&ProtoValue_); - - return TValue(TypeBuilder_.Build(), std::move(value)); + if (Arena) { + auto* value = google::protobuf::Arena::CreateMessage(Arena); + value->Swap(&ProtoValue_); + return TValue(TypeBuilder_.Build(), value); + } else { + Ydb::Value value; + value.Swap(&ProtoValue_); + return TValue(TypeBuilder_.Build(), std::move(value)); + } } void Bool(bool value) { @@ -2803,7 +2822,12 @@ class TValueBuilderImpl { private: //TTypeBuilder TypeBuilder_; TTypeBuilder::TImpl TypeBuilder_; - Ydb::Value ProtoValue_; + google::protobuf::Arena* Arena; + Ydb::Value ProtoValueHeap; + + // either ProtoValueHeap or a reference to the arena allocated protobuf + Ydb::Value& ProtoValue_; + std::map StructsMap_; TStackVec Path_; @@ -2819,8 +2843,8 @@ template TValueBuilderBase::~TValueBuilderBase() = default; template -TValueBuilderBase::TValueBuilderBase() - : Impl_(new TValueBuilderImpl()) {} +TValueBuilderBase::TValueBuilderBase(google::protobuf::Arena* arena) + : Impl_(new TValueBuilderImpl(arena)) {} template TValueBuilderBase::TValueBuilderBase(const TType& type) @@ -3382,8 +3406,8 @@ template class TValueBuilderBase; //////////////////////////////////////////////////////////////////////////////// -TValueBuilder::TValueBuilder() - : TValueBuilderBase() {} +TValueBuilder::TValueBuilder(google::protobuf::Arena* arena) + : TValueBuilderBase(arena) {} TValueBuilder::TValueBuilder(const TType& type) : TValueBuilderBase(type) {} diff --git a/tests/integration/topic/topic_to_table.cpp b/tests/integration/topic/topic_to_table.cpp index af411e2cb4c..be83486bb16 100644 --- a/tests/integration/topic/topic_to_table.cpp +++ b/tests/integration/topic/topic_to_table.cpp @@ -1335,9 +1335,6 @@ void TxUsage::TestWriteToTopic10() void TxUsage::TestWriteToTopic26() { - // TODO(brgayazov): fix test - GTEST_SKIP() << "Test is flaky"; - // // the test verifies a transaction in which data is read from a partition of one topic and written to // another partition of this topic @@ -1723,9 +1720,6 @@ TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_17)) void TxUsage::TestWriteToTopic25() { - // TODO(brgayazov): fix test - GTEST_SKIP() << "Test is flaky"; - // // the test verifies a transaction in which data is read from one topic and written to another //