diff --git a/.github/import_generation.txt b/.github/import_generation.txt index f5c89552bd..bb95160cb6 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -32 +33 diff --git a/.github/last_commit.txt b/.github/last_commit.txt index fed5bffc63..3522bfc6d0 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -8d2f0915190e981b53b115ea4b755e8dc0a9df5e +303019a794dd98ca44a77440af08bbeecf56d727 diff --git a/include/ydb-cpp-sdk/client/export/export.h b/include/ydb-cpp-sdk/client/export/export.h index 8369fc0e95..a2ada6ed70 100644 --- a/include/ydb-cpp-sdk/client/export/export.h +++ b/include/ydb-cpp-sdk/client/export/export.h @@ -101,7 +101,7 @@ struct TExportToS3Settings : public TOperationRequestSettings::max(), }; -enum class EIndexFillingMode { +enum class EIndexPopulationMode { Build = 0, Import = 1, Auto = 2, @@ -71,7 +71,7 @@ struct TImportFromS3Settings : public TOperationRequestSettings class TRetryContext; } // namespace NRetry::Sync + namespace NRetry { + template + class TRetryDeadlineHelper; + } // namespace NRetry } namespace NYdb::inline V3::NQuery { @@ -129,9 +133,12 @@ class TSession { friend class TQueryClient; friend class TTransaction; friend class TExecuteQueryIterator; + friend class NRetry::TRetryDeadlineHelper; public: const std::string& GetId() const; + const std::optional& GetPropagatedDeadline() const; + TAsyncExecuteQueryResult ExecuteQuery(const std::string& query, const TTxControl& txControl, const TExecuteQuerySettings& settings = TExecuteQuerySettings()); @@ -153,6 +160,8 @@ class TSession { TSession(std::shared_ptr client); // Create broken session TSession(std::shared_ptr client, TSession::TImpl* sessionImpl); + void SetPropagatedDeadline(const TDeadline& deadline); + std::shared_ptr Client_; std::shared_ptr SessionImpl_; }; diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index 9ae0114e55..f405fc654a 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -57,6 +57,11 @@ template class TRetryContext; } // namespace NRetry::Sync +namespace NRetry { +template +class TRetryDeadlineHelper; +} // namespace NRetry + namespace NScheme { struct TPermissions; } // namespace NScheme @@ -1857,6 +1862,7 @@ class TSession { friend class TDataQuery; friend class TTransaction; friend class TSessionPool; + friend class NRetry::TRetryDeadlineHelper; public: //! The following methods perform corresponding calls. @@ -1931,11 +1937,15 @@ class TSession { //! Returns session id const std::string& GetId() const; + const std::optional& GetPropagatedDeadline() const; + class TImpl; private: TSession(std::shared_ptr client, const std::string& sessionId, const std::string& endpointId, bool isOwnedBySessionPool); TSession(std::shared_ptr client, std::shared_ptr SessionImpl_); + void SetPropagatedDeadline(const TDeadline& deadline); + std::shared_ptr Client_; std::shared_ptr SessionImpl_; }; diff --git a/include/ydb-cpp-sdk/client/types/request_settings.h b/include/ydb-cpp-sdk/client/types/request_settings.h index 51f3b40307..2067694fd7 100644 --- a/include/ydb-cpp-sdk/client/types/request_settings.h +++ b/include/ydb-cpp-sdk/client/types/request_settings.h @@ -4,6 +4,8 @@ #include "fluent_settings_helpers.h" +#include + #include #include @@ -17,20 +19,23 @@ struct TRequestSettings { using TSelf = TDerived; using THeader = std::vector>; + FLUENT_SETTING_DEFAULT(TDuration, ClientTimeout, TDuration::Max()); + FLUENT_SETTING_DEFAULT(TDeadline, Deadline, TDeadline::Max()); + FLUENT_SETTING(std::string, TraceId); FLUENT_SETTING(std::string, RequestType); FLUENT_SETTING(THeader, Header); - FLUENT_SETTING_DEFAULT(TDuration, ClientTimeout, TDuration::Max()); FLUENT_SETTING(std::string, TraceParent); TRequestSettings() = default; template explicit TRequestSettings(const TRequestSettings& other) - : TraceId_(other.TraceId_) + : ClientTimeout_(other.ClientTimeout_) + , Deadline_(other.Deadline_) + , TraceId_(other.TraceId_) , RequestType_(other.RequestType_) , Header_(other.Header_) - , ClientTimeout_(other.ClientTimeout_) , TraceParent_(other.TraceParent_) {} }; diff --git a/src/library/time/time.h b/include/ydb-cpp-sdk/library/time/time.h similarity index 100% rename from src/library/time/time.h rename to include/ydb-cpp-sdk/library/time/time.h diff --git a/src/api/grpc/ydb_keyvalue_v2.proto b/src/api/grpc/ydb_keyvalue_v2.proto new file mode 100644 index 0000000000..73d9f14852 --- /dev/null +++ b/src/api/grpc/ydb_keyvalue_v2.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; + +package Ydb.KeyValue.V2; + +option java_package = "com.yandex.ydb.keyvalue.v2"; +option java_outer_classname = "KeyValueGrpc"; +option java_multiple_files = true; + +import "src/api/protos/ydb_keyvalue.proto"; + +// KeyValue tablets provide a simple key-value storage in a low-overhead and easy-to-shoot-your-leg manner. +// To use KeyValue tablets in an efficient way one must be familiar with the design of both the KeyValue tablet +// and the Distributed Storage underneath it. + +service KeyValueService { + + // Acquire an exclusive lock for the partition. + rpc AcquireLock(KeyValue.AcquireLockRequest) returns (KeyValue.AcquireLockResult); + + // Perform list of commands to modify the state of the partition as an atomic transaction. + rpc ExecuteTransaction(KeyValue.ExecuteTransactionRequest) returns (KeyValue.ExecuteTransactionResult); + + // Read the value stored in the item with the key specified. + rpc Read(KeyValue.ReadRequest) returns (KeyValue.ReadResult); + + // Read items with keys in the specified range. + rpc ReadRange(KeyValue.ReadRangeRequest) returns (KeyValue.ReadRangeResult); + + // List keys and metadata of items with keys in the specified range. + rpc ListRange(KeyValue.ListRangeRequest) returns (KeyValue.ListRangeResult); + + // Get storage channel status of the partition. + rpc GetStorageChannelStatus(KeyValue.GetStorageChannelStatusRequest) returns (KeyValue.GetStorageChannelStatusResult); + +} diff --git a/src/api/protos/ydb_export.proto b/src/api/protos/ydb_export.proto index 522f87593c..c334944f1c 100644 --- a/src/api/protos/ydb_export.proto +++ b/src/api/protos/ydb_export.proto @@ -145,11 +145,11 @@ message ExportToS3Settings { // the resulting data will not be encrypted. EncryptionSettings encryption_settings = 15; - // Materialization of index table data. + // Include index data or not. // By default, only index metadata is uploaded and indexes are built during import — it saves space // and reduces export time, but it can potentially increase the import time. - // Indexes can be materialized, then their data will be uploaded during export and downloaded during import. - bool materialize_indexes = 16; + // Index data can be uploaded and downloaded back during import. + bool include_index_data = 16; // Patterns (PCRE) for paths excluded from export operation. // - Patterns are matched against the object path relative to the export's source_path. diff --git a/src/api/protos/ydb_import.proto b/src/api/protos/ydb_import.proto index f30f428923..ac7fe56605 100644 --- a/src/api/protos/ydb_import.proto +++ b/src/api/protos/ydb_import.proto @@ -62,15 +62,15 @@ message ImportFromS3Settings { string destination_path = 2; } - enum IndexFillingMode { + enum IndexPopulationMode { // If unspecified, use default - Build - INDEX_FILLING_MODE_UNSPECIFIED = 0; + INDEX_POPULATION_MODE_UNSPECIFIED = 0; // Build index - INDEX_FILLING_MODE_BUILD = 1; + INDEX_POPULATION_MODE_BUILD = 1; // Import materialized index - INDEX_FILLING_MODE_IMPORT = 2; + INDEX_POPULATION_MODE_IMPORT = 2; // Try to import materialized index, build otherwise - INDEX_FILLING_MODE_AUTO = 3; + INDEX_POPULATION_MODE_AUTO = 3; } string endpoint = 1 [(required) = true]; @@ -112,9 +112,9 @@ message ImportFromS3Settings { // the resulting data is considered not encrypted. Ydb.Export.EncryptionSettings encryption_settings = 15; - // Index filling mode. + // Index population mode. // If not specified, indexes will be built. - IndexFillingMode index_filling_mode = 16; + IndexPopulationMode index_population_mode = 16; // Patterns (PCRE) for paths excluded from import operation. // - Patterns are matched against the database object names stored in the backup listing. diff --git a/src/api/protos/ydb_keyvalue.proto b/src/api/protos/ydb_keyvalue.proto index 53e0703c79..21e1cc0708 100644 --- a/src/api/protos/ydb_keyvalue.proto +++ b/src/api/protos/ydb_keyvalue.proto @@ -8,6 +8,8 @@ option java_outer_classname = "KeyValueProtos"; option java_multiple_files = true; import "src/api/protos/ydb_operation.proto"; +import "src/api/protos/ydb_status_codes.proto"; +import "src/api/protos/ydb_issue_message.proto"; // // KeyValue API. @@ -109,6 +111,10 @@ message AcquireLockResult { // Contains 0 if the request was sent to the node of the partition, node ID of the partition otherwise. uint32 node_id = 2; + + // Response status, use in V2. + StatusIds.StatusCode status = 16; + repeated Ydb.Issue.IssueMessage issues = 17; } message ExecuteTransactionRequest { @@ -234,6 +240,10 @@ message ExecuteTransactionResult { // Contains 0 if the request was sent to the node of the partition, node ID of the partition otherwise. uint32 node_id = 2; + + // Response status, use in V2. + StatusIds.StatusCode status = 16; + repeated Ydb.Issue.IssueMessage issues = 17; } message ReadRequest { @@ -290,6 +300,10 @@ message ReadResult { // Contains 0 if the request was sent to the node of the partition, node ID of the partition otherwise. uint32 node_id = 6; + + // Response status, use in V2. + StatusIds.StatusCode status = 16; + repeated Ydb.Issue.IssueMessage issues = 17; } message ReadRangeRequest { @@ -354,6 +368,10 @@ message ReadRangeResult { // Contains 0 if the request was sent to the node of the partition, node ID of the partition otherwise. uint32 node_id = 3; + + // Response status, use in V2. + StatusIds.StatusCode status = 16; + repeated Ydb.Issue.IssueMessage issues = 17; } message ListRangeRequest { @@ -407,6 +425,10 @@ message ListRangeResult { // Contains 0 if the request was sent to the node of the partition, node ID of the partition otherwise. uint32 node_id = 3; + + // Response status, use in V2. + StatusIds.StatusCode status = 16; + repeated Ydb.Issue.IssueMessage issues = 17; } message GetStorageChannelStatusRequest { @@ -435,6 +457,10 @@ message GetStorageChannelStatusResult { // Contains 0 if the request was sent to the node of the partition, node ID of the partition otherwise. uint32 node_id = 2; + + // Response status, use in V2. + StatusIds.StatusCode status = 16; + repeated Ydb.Issue.IssueMessage issues = 17; } message CreateVolumeRequest { diff --git a/src/client/common_client/impl/iface.h b/src/client/common_client/impl/iface.h index 5fa14e50f1..cd7bea247b 100644 --- a/src/client/common_client/impl/iface.h +++ b/src/client/common_client/impl/iface.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include diff --git a/src/client/export/export.cpp b/src/client/export/export.cpp index 53c35b1ee6..3e637b8f0e 100644 --- a/src/client/export/export.cpp +++ b/src/client/export/export.cpp @@ -95,7 +95,7 @@ TExportToS3Response::TExportToS3Response(TStatus&& status, Ydb::Operations::Oper Metadata_.Settings.Description(metadata.settings().description()); Metadata_.Settings.NumberOfRetries(metadata.settings().number_of_retries()); - Metadata_.Settings.MaterializeIndexes(metadata.settings().materialize_indexes()); + Metadata_.Settings.IncludeIndexData(metadata.settings().include_index_data()); if (!metadata.settings().compression().empty()) { Metadata_.Settings.Compression(metadata.settings().compression()); @@ -252,7 +252,7 @@ TFuture TExportClient::ExportToS3(const TExportToS3Settings } request.mutable_settings()->set_disable_virtual_addressing(!settings.UseVirtualAddressing_); - request.mutable_settings()->set_materialize_indexes(settings.MaterializeIndexes_); + request.mutable_settings()->set_include_index_data(settings.IncludeIndexData_); if (settings.EncryptionAlgorithm_.empty() != settings.SymmetricKey_.empty()) { throw TContractViolation("Encryption algorithm and symmetric key must be set together"); diff --git a/src/client/impl/internal/retry/retry.h b/src/client/impl/internal/retry/retry.h index 1228796793..6fe090409c 100644 --- a/src/client/impl/internal/retry/retry.h +++ b/src/client/impl/internal/retry/retry.h @@ -4,6 +4,8 @@ #include #include +#include + #include #include #include @@ -111,4 +113,12 @@ class TRetryContextBase : TNonCopyable { } }; +template +class TRetryDeadlineHelper { +public: + static void SetDeadline(TClient::TSession& session, const TDeadline& deadline) { + session.SetPropagatedDeadline(deadline); + } +}; + } // namespace NYdb::NRetry diff --git a/src/client/impl/internal/retry/retry_async.h b/src/client/impl/internal/retry/retry_async.h index 3610ade8b2..fbaa06df32 100644 --- a/src/client/impl/internal/retry/retry_async.h +++ b/src/client/impl/internal/retry/retry_async.h @@ -112,7 +112,7 @@ class TRetryWithoutSession : public TRetryContext { }; template > -class TRetryWithSession : public TRetryContext { +class TRetryWithSession : public TRetryContext, public TRetryDeadlineHelper { using TRetryContextAsync = TRetryContext; using TStatusType = typename TRetryContextAsync::TStatusType; using TSession = typename TClient::TSession; @@ -120,20 +120,25 @@ class TRetryWithSession : public TRetryContext { using TAsyncCreateSessionResult = typename TClient::TAsyncCreateSessionResult; private: - TOperation Operation_; + const TOperation Operation_; + const TDeadline Deadline_; std::optional Session_; public: explicit TRetryWithSession( const TClient& client, TOperation&& operation, const TRetryOperationSettings& settings) : TRetryContextAsync(client, settings) - , Operation_(operation) + , Operation_(std::move(operation)) + , Deadline_(TDeadline::AfterDuration(this->Settings_.MaxTimeout_)) {} void Retry() override { TIntrusivePtr self(this); if (!Session_) { - auto settings = TCreateSessionSettings().ClientTimeout(this->Settings_.GetSessionClientTimeout_); + auto settings = TCreateSessionSettings() + .ClientTimeout(this->Settings_.GetSessionClientTimeout_) + .Deadline(Deadline_); + this->Client_.GetSession(settings).Subscribe( [self](const TAsyncCreateSessionResult& resultFuture) { try { @@ -143,6 +148,7 @@ class TRetryWithSession : public TRetryContext { } self->Session_ = result.GetSession(); + TRetryDeadlineHelper::SetDeadline(*self->Session_, self->Deadline_); self->DoRunOperation(self); } catch (...) { return TRetryContextAsync::HandleExceptionAsync(self, std::current_exception()); diff --git a/src/client/impl/internal/retry/retry_sync.h b/src/client/impl/internal/retry/retry_sync.h index 01d5fc80f0..beefcb2771 100644 --- a/src/client/impl/internal/retry/retry_sync.h +++ b/src/client/impl/internal/retry/retry_sync.h @@ -1,9 +1,10 @@ #pragma once -#include #include #include +#include + namespace NYdb::inline V3::NRetry::Sync { template @@ -81,18 +82,20 @@ class TRetryWithoutSession : public TRetryContext { }; template> -class TRetryWithSession : public TRetryContext { +class TRetryWithSession : public TRetryContext, public TRetryDeadlineHelper { using TSession = typename TClient::TSession; using TCreateSessionSettings = typename TClient::TCreateSessionSettings; private: const TOperation& Operation_; + const TDeadline Deadline_; std::optional Session_; public: TRetryWithSession(TClient& client, const TOperation& operation, const TRetryOperationSettings& settings) : TRetryContext(client, settings) , Operation_(operation) + , Deadline_(TDeadline::AfterDuration(this->Settings_.MaxTimeout_)) {} protected: @@ -100,10 +103,14 @@ class TRetryWithSession : public TRetryContext { std::optional status; if (!Session_) { - auto settings = TCreateSessionSettings().ClientTimeout(this->Settings_.GetSessionClientTimeout_); + auto settings = TCreateSessionSettings() + .ClientTimeout(this->Settings_.GetSessionClientTimeout_) + .Deadline(Deadline_); + auto sessionResult = this->Client_.GetSession(settings).GetValueSync(); if (sessionResult.IsSuccess()) { Session_ = sessionResult.GetSession(); + TRetryDeadlineHelper::SetDeadline(*Session_, Deadline_); } status = TStatusType(TStatus(sessionResult)); } diff --git a/src/client/impl/internal/rpc_request_settings/settings.h b/src/client/impl/internal/rpc_request_settings/settings.h index 18269f18d6..a63ddf6532 100644 --- a/src/client/impl/internal/rpc_request_settings/settings.h +++ b/src/client/impl/internal/rpc_request_settings/settings.h @@ -3,7 +3,7 @@ #include #include -#include +#include namespace NYdb::inline V3 { @@ -22,7 +22,9 @@ struct TRpcRequestSettings { std::string TraceParent; template - static TRpcRequestSettings Make(const TRequestSettings& settings, const TEndpointKey& preferredEndpoint = {}, TEndpointPolicy endpointPolicy = TEndpointPolicy::UsePreferredEndpointOptionally) { + static TRpcRequestSettings Make(const TRequestSettings& settings, + const TEndpointKey& preferredEndpoint = {}, + TEndpointPolicy endpointPolicy = TEndpointPolicy::UsePreferredEndpointOptionally) { TRpcRequestSettings rpcSettings; rpcSettings.TraceId = settings.TraceId_; rpcSettings.RequestType = settings.RequestType_; @@ -31,9 +33,16 @@ struct TRpcRequestSettings { rpcSettings.PreferredEndpoint = preferredEndpoint; rpcSettings.EndpointPolicy = endpointPolicy; rpcSettings.UseAuth = true; - rpcSettings.Deadline = NYdb::TDeadline::AfterDuration(settings.ClientTimeout_); + rpcSettings.Deadline = std::min(settings.Deadline_, NYdb::TDeadline::AfterDuration(settings.ClientTimeout_)); return rpcSettings; } + + TRpcRequestSettings& TryUpdateDeadline(const std::optional& deadline) { + if (deadline) { + Deadline = std::min(Deadline, *deadline); + } + return *this; + } }; } // namespace NYdb diff --git a/src/client/impl/session/kqp_session_common.cpp b/src/client/impl/session/kqp_session_common.cpp index fc0fe64ef3..fedc32358f 100644 --- a/src/client/impl/session/kqp_session_common.cpp +++ b/src/client/impl/session/kqp_session_common.cpp @@ -168,6 +168,8 @@ void TKqpSessionCommon::CloseFromServer(std::weak_ptr client) no std::function TKqpSessionCommon::GetSmartDeleter(std::shared_ptr client) { return [client](TKqpSessionCommon* sessionImpl) { + sessionImpl->PropagatedDeadline_ = std::nullopt; + switch (sessionImpl->GetState()) { case TKqpSessionCommon::S_STANDALONE: case TKqpSessionCommon::S_BROKEN: diff --git a/src/client/impl/session/kqp_session_common.h b/src/client/impl/session/kqp_session_common.h index cfac2b6e04..225d8646df 100644 --- a/src/client/impl/session/kqp_session_common.h +++ b/src/client/impl/session/kqp_session_common.h @@ -69,6 +69,9 @@ class TKqpSessionCommon : public TEndpointObj { // Called asynchronously from grpc thread. void CloseFromServer(std::weak_ptr client) noexcept; +public: + std::optional PropagatedDeadline_; + protected: TAdaptiveLock Lock_; diff --git a/src/client/import/import.cpp b/src/client/import/import.cpp index 99936a8176..9fcc478a29 100644 --- a/src/client/import/import.cpp +++ b/src/client/import/import.cpp @@ -77,7 +77,7 @@ TImportFromS3Response::TImportFromS3Response(TStatus&& status, Ydb::Operations:: Metadata_.Settings.Description(metadata.settings().description()); Metadata_.Settings.NumberOfRetries(metadata.settings().number_of_retries()); - Metadata_.Settings.IndexFillingMode(TProtoAccessor::FromProto(metadata.settings().index_filling_mode())); + Metadata_.Settings.IndexPopulationMode(TProtoAccessor::FromProto(metadata.settings().index_population_mode())); // progress Metadata_.Progress = TProtoAccessor::FromProto(metadata.progress()); @@ -316,7 +316,7 @@ TAsyncImportFromS3Response TImportClient::ImportFromS3(const TImportFromS3Settin settingsProto.set_destination_path(settings.DestinationPath_.value()); } - settingsProto.set_index_filling_mode(TProtoAccessor::GetProto(settings.IndexFillingMode_)); + settingsProto.set_index_population_mode(TProtoAccessor::GetProto(settings.IndexPopulationMode_)); for (const std::string& excludeRegexp : settings.ExcludeRegexp_) { settingsProto.add_exclude_regexps(excludeRegexp); diff --git a/src/client/proto/accessor.cpp b/src/client/proto/accessor.cpp index ed7dfbd535..8a5cbd79c7 100644 --- a/src/client/proto/accessor.cpp +++ b/src/client/proto/accessor.cpp @@ -137,30 +137,30 @@ NImport::EImportProgress TProtoAccessor::FromProto(Ydb::Import::ImportProgress:: } } -Ydb::Import::ImportFromS3Settings::IndexFillingMode TProtoAccessor::GetProto(NImport::EIndexFillingMode value) { +Ydb::Import::ImportFromS3Settings::IndexPopulationMode TProtoAccessor::GetProto(NImport::EIndexPopulationMode value) { switch (value) { - case NImport::EIndexFillingMode::Build: - return Ydb::Import::ImportFromS3Settings::INDEX_FILLING_MODE_BUILD; - case NImport::EIndexFillingMode::Import: - return Ydb::Import::ImportFromS3Settings::INDEX_FILLING_MODE_IMPORT; - case NImport::EIndexFillingMode::Auto: - return Ydb::Import::ImportFromS3Settings::INDEX_FILLING_MODE_AUTO; + case NImport::EIndexPopulationMode::Build: + return Ydb::Import::ImportFromS3Settings::INDEX_POPULATION_MODE_BUILD; + case NImport::EIndexPopulationMode::Import: + return Ydb::Import::ImportFromS3Settings::INDEX_POPULATION_MODE_IMPORT; + case NImport::EIndexPopulationMode::Auto: + return Ydb::Import::ImportFromS3Settings::INDEX_POPULATION_MODE_AUTO; default: - return Ydb::Import::ImportFromS3Settings::INDEX_FILLING_MODE_UNSPECIFIED; + return Ydb::Import::ImportFromS3Settings::INDEX_POPULATION_MODE_UNSPECIFIED; } } -NImport::EIndexFillingMode TProtoAccessor::FromProto(Ydb::Import::ImportFromS3Settings::IndexFillingMode value) { +NImport::EIndexPopulationMode TProtoAccessor::FromProto(Ydb::Import::ImportFromS3Settings::IndexPopulationMode value) { switch (value) { - case Ydb::Import::ImportFromS3Settings::INDEX_FILLING_MODE_UNSPECIFIED: - case Ydb::Import::ImportFromS3Settings::INDEX_FILLING_MODE_BUILD: - return NImport::EIndexFillingMode::Build; - case Ydb::Import::ImportFromS3Settings::INDEX_FILLING_MODE_IMPORT: - return NImport::EIndexFillingMode::Import; - case Ydb::Import::ImportFromS3Settings::INDEX_FILLING_MODE_AUTO: - return NImport::EIndexFillingMode::Auto; + case Ydb::Import::ImportFromS3Settings::INDEX_POPULATION_MODE_UNSPECIFIED: + case Ydb::Import::ImportFromS3Settings::INDEX_POPULATION_MODE_BUILD: + return NImport::EIndexPopulationMode::Build; + case Ydb::Import::ImportFromS3Settings::INDEX_POPULATION_MODE_IMPORT: + return NImport::EIndexPopulationMode::Import; + case Ydb::Import::ImportFromS3Settings::INDEX_POPULATION_MODE_AUTO: + return NImport::EIndexPopulationMode::Auto; default: - return NImport::EIndexFillingMode::Unknown; + return NImport::EIndexPopulationMode::Unknown; } } diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index ed5122c327..9ce7b4343f 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -99,8 +99,9 @@ class TQueryClient::TImpl: public TClientImplCommon, public } NThreading::TFuture ExecuteScript(const std::string& script, const std::optional& params, const TExecuteScriptSettings& settings) { - using namespace Ydb::Query; - auto request = MakeOperationRequest(settings); + auto rpcSettings = TRpcRequestSettings::Make(settings); + + auto request = MakeOperationRequest(settings); request.set_exec_mode(::Ydb::Query::ExecMode(settings.ExecMode_)); request.set_stats_mode(::Ydb::Query::StatsMode(settings.StatsMode_)); request.set_pool_id(TStringType{settings.ResourcePool_}); @@ -131,12 +132,12 @@ class TQueryClient::TImpl: public TClientImplCommon, public } }; - Connections_->Run( + Connections_->Run( std::move(request), responseCb, - &V1::QueryService::Stub::AsyncExecuteScript, + &Ydb::Query::V1::QueryService::Stub::AsyncExecuteScript, DbDriverState_, - TRpcRequestSettings::Make(settings)); + rpcSettings); return promise.GetFuture(); } @@ -148,8 +149,13 @@ class TQueryClient::TImpl: public TClientImplCommon, public return FetchScriptResultsImpl(std::move(request), settings); } - TAsyncStatus RollbackTransaction(const std::string& txId, const NYdb::NQuery::TRollbackTxSettings& settings, const TSession& session) { - using namespace Ydb::Query; + TAsyncStatus RollbackTransaction(const std::string& txId, + const NYdb::NQuery::TRollbackTxSettings& settings, + const TSession& session) + { + auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeRequest(); request.set_session_id(TStringType{session.GetId()}); request.set_tx_id(TStringType{txId}); @@ -174,19 +180,23 @@ class TQueryClient::TImpl: public TClientImplCommon, public } }; - Connections_->Run( + Connections_->Run( std::move(request), responseCb, - &V1::QueryService::Stub::AsyncRollbackTransaction, + &Ydb::Query::V1::QueryService::Stub::AsyncRollbackTransaction, DbDriverState_, - TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey())); + rpcSettings); return promise.GetFuture(); } - TAsyncCommitTransactionResult CommitTransaction(const std::string& txId, const NYdb::NQuery::TCommitTxSettings& settings, const TSession& session) { - using namespace Ydb::Query; - auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()); + TAsyncCommitTransactionResult CommitTransaction(const std::string& txId, + const NYdb::NQuery::TCommitTxSettings& settings, + const TSession& session) + { + auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeRequest(); request.set_session_id(TStringType{session.GetId()}); request.set_tx_id(TStringType{txId}); @@ -212,10 +222,10 @@ class TQueryClient::TImpl: public TClientImplCommon, public } }; - Connections_->Run( + Connections_->Run( std::move(request), responseCb, - &V1::QueryService::Stub::AsyncCommitTransaction, + &Ydb::Query::V1::QueryService::Stub::AsyncCommitTransaction, DbDriverState_, rpcSettings); @@ -223,10 +233,12 @@ class TQueryClient::TImpl: public TClientImplCommon, public } TAsyncBeginTransactionResult BeginTransaction(const TTxSettings& txSettings, - const TBeginTxSettings& settings, const TSession& session) + const TBeginTxSettings& settings, + const TSession& session) { - using namespace Ydb::Query; - auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()); + auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeRequest(); request.set_session_id(TStringType{session.GetId()}); SetTxSettings(txSettings, request.mutable_tx_settings()); @@ -254,10 +266,10 @@ class TQueryClient::TImpl: public TClientImplCommon, public } }; - Connections_->Run( + Connections_->Run( std::move(request), responseCb, - &V1::QueryService::Stub::AsyncBeginTransaction, + &Ydb::Query::V1::QueryService::Stub::AsyncBeginTransaction, DbDriverState_, rpcSettings); @@ -349,7 +361,6 @@ class TQueryClient::TImpl: public TClientImplCommon, public NThreading::TPromise promise, const std::string& endpoint, std::shared_ptr client) { - using TStreamProcessorPtr = TSession::TImpl::TStreamProcessorPtr; Ydb::Query::AttachSessionRequest request; const auto sessionId = resp->session_id(); request.set_session_id(sessionId); @@ -366,7 +377,7 @@ class TQueryClient::TImpl: public TClientImplCommon, public Ydb::Query::SessionState> ( std::move(request), - [args] (TPlainStatus status, TStreamProcessorPtr processor) mutable { + [args] (TPlainStatus status, TSession::TImpl::TStreamProcessorPtr processor) mutable { if (processor) { TSession::TImpl::MakeImplAsync(processor, args); } else { @@ -380,8 +391,6 @@ class TQueryClient::TImpl: public TClientImplCommon, public } TAsyncCreateSessionResult CreateAttachedSession(const TRpcRequestSettings& rpcSettings) { - using namespace Ydb::Query; - Ydb::Query::CreateSessionRequest request; auto promise = NThreading::NewPromise(); @@ -404,10 +413,10 @@ class TQueryClient::TImpl: public TClientImplCommon, public } }; - Connections_->Run( + Connections_->Run( std::move(request), extractor, - &V1::QueryService::Stub::AsyncCreateSession, + &Ydb::Query::V1::QueryService::Stub::AsyncCreateSession, DbDriverState_, rpcSettings); @@ -415,14 +424,12 @@ class TQueryClient::TImpl: public TClientImplCommon, public } TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings) { - auto rpcSettings = TRpcRequestSettings::Make(settings); - class TQueryClientGetSessionCtx : public NSessionPool::IGetSessionCtx { public: - TQueryClientGetSessionCtx(std::shared_ptr client, const TRpcRequestSettings& settings) + TQueryClientGetSessionCtx(std::shared_ptr client, const TCreateSessionSettings& settings) : Promise(NThreading::NewPromise()) , Client(client) - , RpcSettings(settings) + , RpcSettings(TRpcRequestSettings::Make(settings)) {} TAsyncCreateSessionResult GetFuture() { @@ -477,9 +484,10 @@ class TQueryClient::TImpl: public TClientImplCommon, public const TRpcRequestSettings RpcSettings; }; - auto ctx = std::make_unique(shared_from_this(), rpcSettings); + auto ctx = std::make_unique(shared_from_this(), settings); auto future = ctx->GetFuture(); SessionPool_.GetSession(std::move(ctx)); + return future; } @@ -685,6 +693,14 @@ TSession::TSession(std::shared_ptr client, TSession::TImpl* , SessionImpl_(session, TKqpSessionCommon::GetSmartDeleter(client)) {} +const std::optional& TSession::GetPropagatedDeadline() const { + return SessionImpl_->PropagatedDeadline_; +} + +void TSession::SetPropagatedDeadline(const TDeadline& deadline) { + SessionImpl_->PropagatedDeadline_ = deadline; +} + const std::string& TSession::GetId() const { return SessionImpl_->GetId(); } diff --git a/src/client/query/impl/client_session.cpp b/src/client/query/impl/client_session.cpp index abc6b04a12..c905ad82fc 100644 --- a/src/client/query/impl/client_session.cpp +++ b/src/client/query/impl/client_session.cpp @@ -105,17 +105,14 @@ TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const std::string& sessionId, co } } -TSession::TImpl::~TImpl() -{ +TSession::TImpl::~TImpl() { if (StreamProcessor_) { StreamProcessor_->Cancel(); } SessionHolder->WaitAndLock(); } -void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr, - std::shared_ptr args) -{ +void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr, std::shared_ptr args) { auto resp = std::make_shared(); ptr->Read(resp.get(), [args, resp, ptr](NYdbGrpc::TGrpcStatus grpcStatus) mutable { if (grpcStatus.GRpcStatusCode != grpc::StatusCode::OK) { @@ -137,9 +134,7 @@ void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr, }); } -void TSession::TImpl::NewSmartShared(TStreamProcessorPtr ptr, - std::shared_ptr args, NYdb::TStatus st) -{ +void TSession::TImpl::NewSmartShared(TStreamProcessorPtr ptr, std::shared_ptr args, NYdb::TStatus st) { args->Promise.SetValue( TCreateSessionResult( std::move(st), diff --git a/src/client/query/impl/client_session.h b/src/client/query/impl/client_session.h index 2a5c680cf1..d36d75c8a6 100644 --- a/src/client/query/impl/client_session.h +++ b/src/client/query/impl/client_session.h @@ -32,6 +32,7 @@ class TSession::TImpl : public TKqpSessionCommon { using TResponse = Ydb::Query::SessionState; using TStreamProcessorPtr = NYdbGrpc::IStreamRequestReadProcessor::TPtr; + TImpl(TStreamProcessorPtr ptr, const std::string& id, const std::string& endpoint, std::weak_ptr client); ~TImpl(); diff --git a/src/client/query/impl/exec_query.cpp b/src/client/query/impl/exec_query.cpp index f6707fce1c..2f2f69467f 100644 --- a/src/client/query/impl/exec_query.cpp +++ b/src/client/query/impl/exec_query.cpp @@ -287,6 +287,7 @@ class TExecQueryInternal { { auto rpcSettings = TRpcRequestSettings::Make(settings); if (session.has_value()) { + rpcSettings.TryUpdateDeadline(session->GetPropagatedDeadline()); rpcSettings.PreferredEndpoint = TEndpointKey(GetNodeIdFromSession(session->GetId())); } diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index f07e93ebe7..18c536aff9 100644 --- a/src/client/table/impl/table_client.cpp +++ b/src/client/table/impl/table_client.cpp @@ -286,19 +286,17 @@ void TTableClient::TImpl::StartPeriodicHostScanTask() { } TAsyncCreateSessionResult TTableClient::TImpl::GetSession(const TCreateSessionSettings& settings) { - auto rpcSettings = TRpcRequestSettings::Make(settings); - rpcSettings.Header.push_back({NYdb::YDB_CLIENT_CAPABILITIES, NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER}); - class TTableClientGetSessionCtx : public NSessionPool::IGetSessionCtx { public: TTableClientGetSessionCtx(std::shared_ptr client, - const TCreateSessionSettings& createSessionSettings, - const TRpcRequestSettings& rpcSettings) + const TCreateSessionSettings& createSessionSettings) : Promise(NewPromise()) , Client(client) , CreateSessionSettings(createSessionSettings) - , RpcSettings(rpcSettings) - {} + , RpcSettings(TRpcRequestSettings::Make(createSessionSettings)) + { + RpcSettings.Header.push_back({NYdb::YDB_CLIENT_CAPABILITIES, NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER}); + } TAsyncCreateSessionResult GetFuture() { return Promise.GetFuture(); @@ -353,10 +351,10 @@ TAsyncCreateSessionResult TTableClient::TImpl::GetSession(const TCreateSessionSe NThreading::TPromise Promise; std::shared_ptr Client; const TCreateSessionSettings CreateSessionSettings; - const TRpcRequestSettings RpcSettings; + TRpcRequestSettings RpcSettings; }; - auto ctx = std::make_unique(shared_from_this(), settings, rpcSettings); + auto ctx = std::make_unique(shared_from_this(), settings); auto future = ctx->GetFuture(); SessionPool_.GetSession(std::move(ctx)); return future; @@ -415,6 +413,9 @@ TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessio } TAsyncKeepAliveResult TTableClient::TImpl::KeepAlive(const TSession::TImpl* session, const TKeepAliveSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings, session->GetEndpointKey()) + .TryUpdateDeadline(session->PropagatedDeadline_); + auto request = MakeOperationRequest(settings); request.set_session_id(TStringType{session->GetId()}); @@ -449,29 +450,29 @@ TAsyncKeepAliveResult TTableClient::TImpl::KeepAlive(const TSession::TImpl* sess &Ydb::Table::V1::TableService::Stub::AsyncKeepAlive, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings, session->GetEndpointKey()) - ); + rpcSettings + ); return keepAliveResultPromise.GetFuture(); } -TFuture TTableClient::TImpl::CreateTable(Ydb::Table::CreateTableRequest&& request, const TCreateTableSettings& settings) +TFuture TTableClient::TImpl::CreateTable(Ydb::Table::CreateTableRequest&& request, const TRpcRequestSettings& rpcSettings) { return RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncCreateTable, - TRpcRequestSettings::Make(settings)); + rpcSettings); } -TFuture TTableClient::TImpl::AlterTable(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings) +TFuture TTableClient::TImpl::AlterTable(Ydb::Table::AlterTableRequest&& request, const TRpcRequestSettings& rpcSettings) { return RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncAlterTable, - TRpcRequestSettings::Make(settings)); + rpcSettings); } -TAsyncOperation TTableClient::TImpl::AlterTableLong(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings) +TAsyncOperation TTableClient::TImpl::AlterTableLong(Ydb::Table::AlterTableRequest&& request, const TRpcRequestSettings& rpcSettings) { using Ydb::Table::V1::TableService; using Ydb::Table::AlterTableRequest; @@ -479,53 +480,87 @@ TAsyncOperation TTableClient::TImpl::AlterTableLong(Ydb::Table::AlterTableReques return RunOperation( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncAlterTable, - TRpcRequestSettings::Make(settings)); + rpcSettings); } -TFuture TTableClient::TImpl::CopyTable(const std::string& sessionId, const std::string& src, const std::string& dst, - const TCopyTableSettings& settings) +TFuture TTableClient::TImpl::CopyTable(const TSession& session, const std::string& src, const std::string& dst, const TCopyTableSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); - request.set_session_id(TStringType{sessionId}); + request.set_session_id(TStringType{session.GetId()}); request.set_source_path(TStringType{src}); request.set_destination_path(TStringType{dst}); return RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncCopyTable, - TRpcRequestSettings::Make(settings)); + rpcSettings); } -TFuture TTableClient::TImpl::CopyTables(Ydb::Table::CopyTablesRequest&& request, const TCopyTablesSettings& settings) +TFuture TTableClient::TImpl::CopyTables(const TSession& session, const std::vector& copyItems, const TCopyTablesSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + + auto request = MakeOperationRequest(settings); + request.set_session_id(TStringType{session.GetId()}); + + for (const auto& item: copyItems) { + auto add = request.add_tables(); + add->set_source_path(TStringType{item.SourcePath()}); + add->set_destination_path(TStringType{item.DestinationPath()}); + add->set_omit_indexes(item.OmitIndexes()); + } + return RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncCopyTables, - TRpcRequestSettings::Make(settings)); + rpcSettings); } -TFuture TTableClient::TImpl::RenameTables(Ydb::Table::RenameTablesRequest&& request, const TRenameTablesSettings& settings) +TFuture TTableClient::TImpl::RenameTables(const TSession& session, const std::vector& renameItems, const TRenameTablesSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + + auto request = MakeOperationRequest(settings); + request.set_session_id(TStringType{session.GetId()}); + + for (const auto& item: renameItems) { + auto add = request.add_tables(); + add->set_source_path(TStringType{item.SourcePath()}); + add->set_destination_path(TStringType{item.DestinationPath()}); + add->set_replace_destination(item.ReplaceDestination()); + } + return RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncRenameTables, - TRpcRequestSettings::Make(settings)); + rpcSettings); } -TFuture TTableClient::TImpl::DropTable(const std::string& sessionId, const std::string& path, const TDropTableSettings& settings) { +TFuture TTableClient::TImpl::DropTable(const TSession& session, const std::string& path, const TDropTableSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); - request.set_session_id(TStringType{sessionId}); + request.set_session_id(TStringType{session.GetId()}); request.set_path(TStringType{path}); return RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncDropTable, - TRpcRequestSettings::Make(settings)); + rpcSettings); } -TAsyncDescribeTableResult TTableClient::TImpl::DescribeTable(const std::string& sessionId, const std::string& path, const TDescribeTableSettings& settings) { +TAsyncDescribeTableResult TTableClient::TImpl::DescribeTable(const TSession& session, const std::string& path, const TDescribeTableSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); - request.set_session_id(TStringType{sessionId}); + request.set_session_id(TStringType{session.GetId()}); request.set_path(TStringType{path}); if (settings.WithKeyShardBoundary_) { request.set_include_shard_key_bounds(true); @@ -566,18 +601,23 @@ TAsyncDescribeTableResult TTableClient::TImpl::DescribeTable(const std::string& &Ydb::Table::V1::TableService::Stub::AsyncDescribeTable, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings)); + rpcSettings); return promise.GetFuture(); } -TAsyncDescribeExternalDataSourceResult TTableClient::TImpl::DescribeExternalDataSource(const std::string& path, const TDescribeExternalDataSourceSettings& settings) { +TAsyncDescribeExternalDataSourceResult TTableClient::TImpl::DescribeExternalDataSource(const TSession& session, + const std::string& path, const TDescribeExternalDataSourceSettings& settings) +{ + auto rpcSettings = TRpcRequestSettings::Make(settings) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); request.set_path(path); auto promise = NewPromise(); - auto extractor = [promise, settings](google::protobuf::Any* any, TPlainStatus status) mutable { + auto extractor = [promise](google::protobuf::Any* any, TPlainStatus status) mutable { Ydb::Table::DescribeExternalDataSourceResult proto; if (any) { any->UnpackTo(&proto); @@ -593,19 +633,24 @@ TAsyncDescribeExternalDataSourceResult TTableClient::TImpl::DescribeExternalData &Ydb::Table::V1::TableService::Stub::AsyncDescribeExternalDataSource, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings) + rpcSettings ); return promise.GetFuture(); } -TAsyncDescribeExternalTableResult TTableClient::TImpl::DescribeExternalTable(const std::string& path, const TDescribeExternalTableSettings& settings) { +TAsyncDescribeExternalTableResult TTableClient::TImpl::DescribeExternalTable(const TSession& session, + const std::string& path, const TDescribeExternalTableSettings& settings) +{ + auto rpcSettings = TRpcRequestSettings::Make(settings) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); request.set_path(path); auto promise = NewPromise(); - auto extractor = [promise, settings](google::protobuf::Any* any, TPlainStatus status) mutable { + auto extractor = [promise](google::protobuf::Any* any, TPlainStatus status) mutable { Ydb::Table::DescribeExternalTableResult proto; if (any) { any->UnpackTo(&proto); @@ -621,21 +666,22 @@ TAsyncDescribeExternalTableResult TTableClient::TImpl::DescribeExternalTable(con &Ydb::Table::V1::TableService::Stub::AsyncDescribeExternalTable, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings) + rpcSettings ); return promise.GetFuture(); } -TAsyncDescribeSystemViewResult TTableClient::TImpl::DescribeSystemView(const std::string& path, - const TDescribeSystemViewSettings& settings) -{ +TAsyncDescribeSystemViewResult TTableClient::TImpl::DescribeSystemView(const TSession& session, const std::string& path, const TDescribeSystemViewSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); request.set_path(path); auto promise = NewPromise(); - auto extractor = [promise, settings](google::protobuf::Any* any, TPlainStatus status) mutable { + auto extractor = [promise](google::protobuf::Any* any, TPlainStatus status) mutable { Ydb::Table::DescribeSystemViewResult proto; if (any) { any->UnpackTo(&proto); @@ -651,7 +697,7 @@ TAsyncDescribeSystemViewResult TTableClient::TImpl::DescribeSystemView(const std &Ydb::Table::V1::TableService::Stub::AsyncDescribeSystemView, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings) + rpcSettings ); return promise.GetFuture(); @@ -660,6 +706,9 @@ TAsyncDescribeSystemViewResult TTableClient::TImpl::DescribeSystemView(const std TAsyncPrepareQueryResult TTableClient::TImpl::PrepareDataQuery(const TSession& session, const std::string& query, const TPrepareDataQuerySettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); request.set_session_id(TStringType{session.GetId()}); request.set_yql_text(TStringType{query}); @@ -696,28 +745,35 @@ TAsyncPrepareQueryResult TTableClient::TImpl::PrepareDataQuery(const TSession& s &Ydb::Table::V1::TableService::Stub::AsyncPrepareDataQuery, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) - ); + rpcSettings + ); return promise.GetFuture(); } -TAsyncStatus TTableClient::TImpl::ExecuteSchemeQuery(const std::string& sessionId, const std::string& query, +TAsyncStatus TTableClient::TImpl::ExecuteSchemeQuery(const TSession& session, const std::string& query, const TExecSchemeQuerySettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); - request.set_session_id(TStringType{sessionId}); + request.set_session_id(TStringType{session.GetId()}); request.set_yql_text(TStringType{query}); return RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncExecuteSchemeQuery, - TRpcRequestSettings::Make(settings)); + rpcSettings + ); } TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSession& session, const TTxSettings& txSettings, const TBeginTxSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); request.set_session_id(TStringType{session.GetId()}); SetTxSettings(txSettings, request.mutable_tx_settings()); @@ -744,8 +800,8 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio &Ydb::Table::V1::TableService::Stub::AsyncBeginTransaction, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) - ); + rpcSettings + ); return promise.GetFuture(); } @@ -753,6 +809,9 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSession& session, const std::string& txId, const TCommitTxSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); request.set_session_id(TStringType{session.GetId()}); request.set_tx_id(TStringType{txId}); @@ -782,8 +841,8 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess &Ydb::Table::V1::TableService::Stub::AsyncCommitTransaction, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) - ); + rpcSettings + ); return promise.GetFuture(); } @@ -791,6 +850,9 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, const std::string& txId, const TRollbackTxSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); request.set_session_id(TStringType{session.GetId()}); request.set_tx_id(TStringType{txId}); @@ -798,13 +860,16 @@ TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, c return RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncRollbackTransaction, - TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) - ); + rpcSettings + ); } TAsyncExplainDataQueryResult TTableClient::TImpl::ExplainDataQuery(const TSession& session, const std::string& query, const TExplainDataQuerySettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); request.set_session_id(TStringType{session.GetId()}); request.set_yql_text(TStringType{query}); @@ -835,8 +900,8 @@ TAsyncExplainDataQueryResult TTableClient::TImpl::ExplainDataQuery(const TSessio &Ydb::Table::V1::TableService::Stub::AsyncExplainDataQuery, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) - ); + rpcSettings + ); return promise.GetFuture(); } @@ -847,12 +912,15 @@ void TTableClient::TImpl::SetTypedValue(Ydb::TypedValue* protoValue, const TValu } NThreading::TFuture> TTableClient::TImpl::ReadTable( - const std::string& sessionId, + const TSession& session, const std::string& path, const TReadTableSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + .TryUpdateDeadline(session.GetPropagatedDeadline()); + auto request = MakeRequest(); - request.set_session_id(TStringType{sessionId}); + request.set_session_id(TStringType{session.GetId()}); request.set_path(TStringType{path}); request.set_ordered(settings.Ordered_); if (settings.RowLimit_) { @@ -910,7 +978,7 @@ NThreading::TFutureGetEndpointKey()) + .TryUpdateDeadline(sessionImpl->PropagatedDeadline_); + auto request = MakeOperationRequest(settings); request.set_session_id(TStringType{sessionImpl->GetId()}); return RunSimple( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncDeleteSession, - TRpcRequestSettings::Make(settings, sessionImpl->GetEndpointKey()) - ); + rpcSettings + ); } TAsyncStatus TTableClient::TImpl::CloseInternal(const TKqpSessionCommon* sessionImpl) { diff --git a/src/client/table/impl/table_client.h b/src/client/table/impl/table_client.h index 68a20de3d9..8fe71287f3 100644 --- a/src/client/table/impl/table_client.h +++ b/src/client/table/impl/table_client.h @@ -60,18 +60,18 @@ class TTableClient::TImpl: public TClientImplCommon, public bool standalone); TAsyncKeepAliveResult KeepAlive(const TSession::TImpl* session, const TKeepAliveSettings& settings); - TFuture CreateTable(Ydb::Table::CreateTableRequest&& request, const TCreateTableSettings& settings); - TFuture AlterTable(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings); - TAsyncOperation AlterTableLong(Ydb::Table::AlterTableRequest&& request, const TAlterTableSettings& settings); - TFuture CopyTable(const std::string& sessionId, const std::string& src, const std::string& dst, - const TCopyTableSettings& settings); - TFuture CopyTables(Ydb::Table::CopyTablesRequest&& request, const TCopyTablesSettings& settings); - TFuture RenameTables(Ydb::Table::RenameTablesRequest&& request, const TRenameTablesSettings& settings); - TFuture DropTable(const std::string& sessionId, const std::string& path, const TDropTableSettings& settings); - TAsyncDescribeTableResult DescribeTable(const std::string& sessionId, const std::string& path, const TDescribeTableSettings& settings); - TAsyncDescribeExternalDataSourceResult DescribeExternalDataSource(const std::string& path, const TDescribeExternalDataSourceSettings& settings); - TAsyncDescribeExternalTableResult DescribeExternalTable(const std::string& path, const TDescribeExternalTableSettings& settings); - TAsyncDescribeSystemViewResult DescribeSystemView(const std::string& path, const TDescribeSystemViewSettings& settings); + TFuture CreateTable(Ydb::Table::CreateTableRequest&& request, const TRpcRequestSettings& rpcSettings); + TFuture AlterTable(Ydb::Table::AlterTableRequest&& request, const TRpcRequestSettings& rpcSettings); + TAsyncOperation AlterTableLong(Ydb::Table::AlterTableRequest&& request, const TRpcRequestSettings& rpcSettings); + + TFuture CopyTable(const TSession& session, const std::string& src, const std::string& dst, const TCopyTableSettings& settings); + TFuture CopyTables(const TSession& session, const std::vector& copyItems, const TCopyTablesSettings& settings); + TFuture RenameTables(const TSession& session, const std::vector& renameItems, const TRenameTablesSettings& settings); + TFuture DropTable(const TSession& session, const std::string& path, const TDropTableSettings& settings); + TAsyncDescribeTableResult DescribeTable(const TSession& session, const std::string& path, const TDescribeTableSettings& settings); + TAsyncDescribeExternalDataSourceResult DescribeExternalDataSource(const TSession& session, const std::string& path, const TDescribeExternalDataSourceSettings& settings); + TAsyncDescribeExternalTableResult DescribeExternalTable(const TSession& session, const std::string& path, const TDescribeExternalTableSettings& settings); + TAsyncDescribeSystemViewResult DescribeSystemView(const TSession& session, const std::string& path, const TDescribeSystemViewSettings& settings); template TAsyncDataQueryResult ExecuteDataQuery(TSession& session, const std::string& query, const TTxControl& txControl, @@ -110,7 +110,7 @@ class TTableClient::TImpl: public TClientImplCommon, public TAsyncPrepareQueryResult PrepareDataQuery(const TSession& session, const std::string& query, const TPrepareDataQuerySettings& settings); - TAsyncStatus ExecuteSchemeQuery(const std::string& sessionId, const std::string& query, + TAsyncStatus ExecuteSchemeQuery(const TSession& session, const std::string& query, const TExecSchemeQuerySettings& settings); TAsyncBeginTransactionResult BeginTransaction(const TSession& session, const TTxSettings& txSettings, @@ -126,7 +126,7 @@ class TTableClient::TImpl: public TClientImplCommon, public static void SetTypedValue(Ydb::TypedValue* protoValue, const TValue& value); NThreading::TFuture> ReadTable( - const std::string& sessionId, + const TSession& session, const std::string& path, const TReadTableSettings& settings); TAsyncReadRowsResult ReadRows(const std::string& path, TValue&& keys, const std::vector& columns, const TReadRowsSettings& settings); diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index 0731bf8a98..c67eba0975 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -1677,6 +1677,9 @@ TSession::TSession(std::shared_ptr client, std::shared_ptr< TFuture TSession::CreateTable(const std::string& path, TTableDescription&& tableDesc, const TCreateTableSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings) + .TryUpdateDeadline(GetPropagatedDeadline()); + auto request = MakeOperationRequest(settings); request.set_session_id(TStringType{SessionImpl_->GetId()}); request.set_path(TStringType{path}); @@ -1687,7 +1690,7 @@ TFuture TSession::CreateTable(const std::string& path, TTableDescriptio return InjectSessionStatusInterception( SessionImpl_, - Client_->CreateTable(std::move(request), settings), + Client_->CreateTable(std::move(request), rpcSettings), false, GetMinTimeToTouch(Client_->Settings_.SessionPoolSettings_)); } @@ -1695,7 +1698,7 @@ TFuture TSession::CreateTable(const std::string& path, TTableDescriptio TFuture TSession::DropTable(const std::string& path, const TDropTableSettings& settings) { return InjectSessionStatusInterception( SessionImpl_, - Client_->DropTable(SessionImpl_->GetId(), path, settings), + Client_->DropTable(*this, path, settings), false, GetMinTimeToTouch(Client_->Settings_.SessionPoolSettings_)); } @@ -1793,57 +1796,43 @@ static Ydb::Table::AlterTableRequest MakeAlterTableProtoRequest( } TAsyncStatus TSession::AlterTable(const std::string& path, const TAlterTableSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings) + .TryUpdateDeadline(GetPropagatedDeadline()); + auto request = MakeAlterTableProtoRequest(path, settings, SessionImpl_->GetId()); return InjectSessionStatusInterception( SessionImpl_, - Client_->AlterTable(std::move(request), settings), + Client_->AlterTable(std::move(request), rpcSettings), false, GetMinTimeToTouch(Client_->Settings_.SessionPoolSettings_)); } TAsyncOperation TSession::AlterTableLong(const std::string& path, const TAlterTableSettings& settings) { + auto rpcSettings = TRpcRequestSettings::Make(settings) + .TryUpdateDeadline(GetPropagatedDeadline()); + auto request = MakeAlterTableProtoRequest(path, settings, SessionImpl_->GetId()); return InjectSessionStatusInterception( SessionImpl_, - Client_->AlterTableLong(std::move(request), settings), + Client_->AlterTableLong(std::move(request), rpcSettings), false, GetMinTimeToTouch(Client_->Settings_.SessionPoolSettings_)); } TAsyncStatus TSession::RenameTables(const std::vector& renameItems, const TRenameTablesSettings& settings) { - auto request = MakeOperationRequest(settings); - request.set_session_id(TStringType{SessionImpl_->GetId()}); - - for (const auto& item: renameItems) { - auto add = request.add_tables(); - add->set_source_path(TStringType{item.SourcePath()}); - add->set_destination_path(TStringType{item.DestinationPath()}); - add->set_replace_destination(item.ReplaceDestination()); - } - return InjectSessionStatusInterception( SessionImpl_, - Client_->RenameTables(std::move(request), settings), + Client_->RenameTables(*this, renameItems, settings), false, GetMinTimeToTouch(Client_->Settings_.SessionPoolSettings_)); } TAsyncStatus TSession::CopyTables(const std::vector& copyItems, const TCopyTablesSettings& settings) { - auto request = MakeOperationRequest(settings); - request.set_session_id(TStringType{SessionImpl_->GetId()}); - - for (const auto& item: copyItems) { - auto add = request.add_tables(); - add->set_source_path(TStringType{item.SourcePath()}); - add->set_destination_path(TStringType{item.DestinationPath()}); - add->set_omit_indexes(item.OmitIndexes()); - } - return InjectSessionStatusInterception( SessionImpl_, - Client_->CopyTables(std::move(request), settings), + Client_->CopyTables(*this, copyItems, settings), false, GetMinTimeToTouch(Client_->Settings_.SessionPoolSettings_)); } @@ -1851,27 +1840,27 @@ TAsyncStatus TSession::CopyTables(const std::vector& copyItems, const TFuture TSession::CopyTable(const std::string& src, const std::string& dst, const TCopyTableSettings& settings) { return InjectSessionStatusInterception( SessionImpl_, - Client_->CopyTable(SessionImpl_->GetId(), src, dst, settings), + Client_->CopyTable(*this, src, dst, settings), false, GetMinTimeToTouch(Client_->Settings_.SessionPoolSettings_)); } TAsyncDescribeTableResult TSession::DescribeTable(const std::string& path, const TDescribeTableSettings& settings) { - return Client_->DescribeTable(SessionImpl_->GetId(), path, settings); + return Client_->DescribeTable(*this, path, settings); } TAsyncDescribeExternalDataSourceResult TSession::DescribeExternalDataSource(const std::string& path, const TDescribeExternalDataSourceSettings& settings) { - return Client_->DescribeExternalDataSource(path, settings); + return Client_->DescribeExternalDataSource(*this, path, settings); } TAsyncDescribeExternalTableResult TSession::DescribeExternalTable(const std::string& path, const TDescribeExternalTableSettings& settings) { - return Client_->DescribeExternalTable(path, settings); + return Client_->DescribeExternalTable(*this, path, settings); } TAsyncDescribeSystemViewResult TSession::DescribeSystemView(const std::string& path, const TDescribeSystemViewSettings& settings) { - return Client_->DescribeSystemView(path, settings); + return Client_->DescribeSystemView(*this, path, settings); } TAsyncDataQueryResult TSession::ExecuteDataQuery(const std::string& query, const TTxControl& txControl, @@ -1929,7 +1918,7 @@ TAsyncPrepareQueryResult TSession::PrepareDataQuery(const std::string& query, co TAsyncStatus TSession::ExecuteSchemeQuery(const std::string& query, const TExecSchemeQuerySettings& settings) { return InjectSessionStatusInterception( SessionImpl_, - Client_->ExecuteSchemeQuery(SessionImpl_->GetId(), query, settings), + Client_->ExecuteSchemeQuery(*this, query, settings), true, GetMinTimeToTouch(Client_->Settings_.SessionPoolSettings_)); } @@ -1966,7 +1955,7 @@ TAsyncTablePartIterator TSession::ReadTable(const std::string& path, pair.second, pair.first.Endpoint) : nullptr, std::move(pair.first)) ); }; - Client_->ReadTable(SessionImpl_->GetId(), path, settings).Subscribe(readTableIteratorBuilder); + Client_->ReadTable(*this, path, settings).Subscribe(readTableIteratorBuilder); return InjectSessionStatusInterception( SessionImpl_, promise.GetFuture(), @@ -2006,6 +1995,14 @@ const std::string& TSession::GetId() const { return SessionImpl_->GetId(); } +const std::optional& TSession::GetPropagatedDeadline() const { + return SessionImpl_->PropagatedDeadline_; +} + +void TSession::SetPropagatedDeadline(const TDeadline& deadline) { + SessionImpl_->PropagatedDeadline_ = deadline; +} + //////////////////////////////////////////////////////////////////////////////// TTxControl::TTxControl(const TTransaction& tx) diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index aec1a0e238..9913397563 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -242,6 +242,8 @@ void TSingleClusterReadSessionImpl::TDecompressionQueueIte template TSingleClusterReadSessionImpl::~TSingleClusterReadSessionImpl() { + std::lock_guard guard(Lock); + for (auto&& [_, partitionStream] : PartitionStreams) { partitionStream->ClearQueue(); } @@ -3049,7 +3051,7 @@ void TDataDecompressionInfo::TDecompressionTask::operator( ) { const ICodec* codecImpl = TCodecMap::GetTheCodecMap().GetOrThrow(static_cast(data.codec())); std::string decompressed = codecImpl->Decompress(data.data()); - data.set_data(TStringType{decompressed}); + data.set_data(TStringType{std::move(decompressed)}); data.set_codec(Ydb::PersQueue::V1::CODEC_RAW); } } else { @@ -3059,7 +3061,7 @@ void TDataDecompressionInfo::TDecompressionTask::operator( ) { const ICodec* codecImpl = TCodecMap::GetTheCodecMap().GetOrThrow(static_cast(batch.codec())); std::string decompressed = codecImpl->Decompress(data.data()); - data.set_data(TStringType{decompressed}); + data.set_data(TStringType{std::move(decompressed)}); } } } catch (...) { diff --git a/src/client/types/core_facility/core_facility.h b/src/client/types/core_facility/core_facility.h index 6d3a3035a5..98689e810f 100644 --- a/src/client/types/core_facility/core_facility.h +++ b/src/client/types/core_facility/core_facility.h @@ -3,7 +3,7 @@ #include #include -#include +#include namespace NYdb::inline V3 { using TPeriodicCb = std::function; diff --git a/src/client/types/status/status.cpp b/src/client/types/status/status.cpp index c898a1a4af..137c41bd6c 100644 --- a/src/client/types/status/status.cpp +++ b/src/client/types/status/status.cpp @@ -119,6 +119,6 @@ void ThrowOnErrorOrPrintIssues(TStatus status) { }); } -} +} // namespace NStatusHelpers } // namespace NYdb diff --git a/src/library/grpc/client/grpc_client_low.h b/src/library/grpc/client/grpc_client_low.h index 32fff617ae..37a0999336 100644 --- a/src/library/grpc/client/grpc_client_low.h +++ b/src/library/grpc/client/grpc_client_low.h @@ -4,7 +4,7 @@ #include -#include +#include #include #include diff --git a/src/library/time/time.cpp b/src/library/time/time.cpp index cb43014103..58f3a96617 100644 --- a/src/library/time/time.cpp +++ b/src/library/time/time.cpp @@ -1,4 +1,4 @@ -#include "time.h" +#include namespace NYdb::inline V3 { diff --git a/tests/unit/library/time/time_ut.cpp b/tests/unit/library/time/time_ut.cpp index 5c7890adef..e6d08f1e8b 100644 --- a/tests/unit/library/time/time_ut.cpp +++ b/tests/unit/library/time/time_ut.cpp @@ -1,4 +1,4 @@ -#include +#include #include