diff --git a/.github/import_generation.txt b/.github/import_generation.txt index 409940768f2..a45fd52cc58 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -23 +24 diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 180c6e5e284..9e9b2a1c928 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -9563c7b171619687436257273b34ceba261903ed +110937321b45fb3be9ca39ff3bb83bcdfcf52c97 diff --git a/include/ydb-cpp-sdk/client/discovery/discovery.h b/include/ydb-cpp-sdk/client/discovery/discovery.h index dcb5ffa778f..91be5831f25 100644 --- a/include/ydb-cpp-sdk/client/discovery/discovery.h +++ b/include/ydb-cpp-sdk/client/discovery/discovery.h @@ -66,15 +66,7 @@ struct TEndpointInfo { }; struct TPileState { - enum EState { - UNSPECIFIED = 0 /* "unspecified" */, - PRIMARY = 1 /* "primary" */, - PROMOTED = 2 /* "promoted" */, - SYNCHRONIZED = 3 /* "synchronized" */, - NOT_SYNCHRONIZED = 4 /* "not_synchronized" */, - SUSPENDED = 5 /* "suspended" */, - DISCONNECTED = 6 /* "disconnected" */ - }; + using EState = NYdb::EPileState; EState State; std::string PileName; diff --git a/include/ydb-cpp-sdk/client/driver/driver.h b/include/ydb-cpp-sdk/client/driver/driver.h index ac07326994c..6cb1a3a42a1 100644 --- a/include/ydb-cpp-sdk/client/driver/driver.h +++ b/include/ydb-cpp-sdk/client/driver/driver.h @@ -26,16 +26,20 @@ class TDriverConfig { //! where "://" can be "grpc://" or "grpcs://" or be absent, "" is endpoint, //! "/?database=" is optional TDriverConfig(const std::string& connectionString = ""); + //! Endpoint to initiate connections with Ydb cluster, //! client will connect to others nodes according to client loadbalancing TDriverConfig& SetEndpoint(const std::string& endpoint); + //! Set number of network threads, default: 2 TDriverConfig& SetNetworkThreadsNum(size_t sz); + //! Set number of client pool threads, if 0 adaptive thread pool will be used. //! NOTE: in case of no zero value it is possible to get deadlock if all threads //! of this pool is blocked somewhere in user code. //! default: 0 TDriverConfig& SetClientThreadsNum(size_t sz); + //! Warning: not recommended to change //! Set max number of queued responses. 0 - no limit //! There is a queue to perform async calls to user code, @@ -46,29 +50,37 @@ class TDriverConfig { //! This value doesn't make sense if SetClientThreadsNum is 0 //! default: 0 TDriverConfig& SetMaxClientQueueSize(size_t sz); + //! Enable Ssl. //! caCerts - The buffer containing the PEM encoded root certificates for SSL/TLS connections. //! If this parameter is empty, the default roots will be used. TDriverConfig& UseSecureConnection(const std::string& caCerts = std::string()); TDriverConfig& SetUsePerChannelTcpConnection(bool usePerChannel); TDriverConfig& UseClientCertificate(const std::string& clientCert, const std::string& clientPrivateKey); + //! Set token, this option can be overridden for client by ClientSettings TDriverConfig& SetAuthToken(const std::string& token); + //! Set database, this option can be overridden for client by ClientSettings TDriverConfig& SetDatabase(const std::string& database); + //! Set credentials data, this option can be overridden for client by ClientSettings TDriverConfig& SetCredentialsProviderFactory(std::shared_ptr credentialsProviderFactory); + //! Set behaviour of discovery routine //! See EDiscoveryMode enum comments //! default: EDiscoveryMode::Sync TDriverConfig& SetDiscoveryMode(EDiscoveryMode discoveryMode); + //! Max number of requests in queue waiting for discovery if "Async" mode chosen //! default: 100 TDriverConfig& SetMaxQueuedRequests(size_t sz); + //! Limit using of memory for grpc buffer pool. 0 means disabled. //! If enabled the size must be greater than size of recieved message. //! default: 0 TDriverConfig& SetGrpcMemoryQuota(uint64_t bytes); + //! Specify tcp keep alive settings //! This option allows to adjust tcp keep alive settings, useful to work //! with balancers or to detect unexpected connectivity problem. @@ -83,12 +95,20 @@ class TDriverConfig { //! NOTE: Please read OS documentation and investigate your network topology before touching this option. //! default: true, 30, 5, 10 for linux, and true and OS default for others POSIX TDriverConfig& SetTcpKeepAliveSettings(bool enable, size_t idle, size_t count, size_t interval); + //! Enable or disable drain of client logic (e.g. session pool drain) during dtor call TDriverConfig& SetDrainOnDtors(bool allowed); + + //! Set policy for balancing + //! default: TBalancingPolicy::UsePreferableLocation() + TDriverConfig& SetBalancingPolicy(TBalancingPolicy&& policy); + + //! DEPRECATED //! Set policy for balancing //! Params is a optionally field to set policy settings //! default: EBalancingPolicy::UsePreferableLocation TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = std::string()); + //! Set grpc level keep alive. If keepalive ping was delayed more than given timeout //! internal grpc routine fails request with TRANSIENT_FAILURE or TRANSPORT_UNAVAILABLE error //! Note: this timeout should not be too small to prevent fail due to @@ -97,6 +117,7 @@ class TDriverConfig { //! default: enabled, 10 seconds TDriverConfig& SetGRpcKeepAliveTimeout(TDuration timeout); TDriverConfig& SetGRpcKeepAlivePermitWithoutCalls(bool permitWithoutCalls); + //! Set inactive socket timeout. //! Used to close connections, that were inactive for given time. //! Closes unused connections every 1/10 of timeout, so deletion time is approximate. @@ -107,11 +128,14 @@ class TDriverConfig { //! Set maximum incoming message size. //! Note: this option overrides MaxMessageSize for incoming messages. //! default: 0 + TDriverConfig& SetMaxInboundMessageSize(uint64_t maxInboundMessageSize); + //! Set maximum outgoing message size. //! Note: this option overrides MaxMessageSize for outgoing messages. //! default: 0 TDriverConfig& SetMaxOutboundMessageSize(uint64_t maxOutboundMessageSize); + //! Note: if this option is unset, default 64_MB message size will be used. //! default: 0 TDriverConfig& SetMaxMessageSize(uint64_t maxMessageSize); diff --git a/include/ydb-cpp-sdk/client/iam/common/generic_provider.h b/include/ydb-cpp-sdk/client/iam/common/generic_provider.h index f8ed9f38611..e7024d72103 100644 --- a/include/ydb-cpp-sdk/client/iam/common/generic_provider.h +++ b/include/ydb-cpp-sdk/client/iam/common/generic_provider.h @@ -74,10 +74,11 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { auto resultPromise = NThreading::NewPromise(); auto response = std::make_shared(); + auto context = std::make_shared(); std::shared_ptr self = TGrpcIamCredentialsProvider::TImpl::shared_from_this(); - auto cb = [self, sync, resultPromise, response] (grpc::Status status) mutable { + auto cb = [self, sync, resultPromise, response, context] (grpc::Status status) mutable { self->ProcessIamResponse(std::move(status), std::move(*response), sync); resultPromise.SetValue(); }; @@ -86,18 +87,16 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { RequestFiller_(req); - Context_ = std::make_unique(); - auto deadline = gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_micros(IamEndpoint_.RequestTimeout.MicroSeconds(), GPR_TIMESPAN)); - Context_->set_deadline(deadline); + context->set_deadline(deadline); if (AuthTokenProvider_) { - Context_->AddMetadata("authorization", "Bearer " + AuthTokenProvider_->GetAuthInfo()); + context->AddMetadata("authorization", "Bearer " + AuthTokenProvider_->GetAuthInfo()); } - (Stub_->async()->*Rpc_)(Context_.get(), &req, response.get(), std::move(cb)); + (Stub_->async()->*Rpc_)(context.get(), &req, response.get(), std::move(cb)); if (sync) { resultPromise.GetFuture().Wait(2 * IamEndpoint_.RequestTimeout); @@ -132,7 +131,6 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { private: void ProcessIamResponse(grpc::Status&& status, TResponse&& result, bool sync) { - Context_.reset(); if (!status.ok()) { TDuration sleepDuration; { @@ -171,7 +169,6 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { std::shared_ptr Channel_; std::shared_ptr Stub_; TAsyncRpc Rpc_; - std::unique_ptr Context_; std::string Ticket_; TInstant NextTicketUpdate_; diff --git a/include/ydb-cpp-sdk/client/operation/operation.h b/include/ydb-cpp-sdk/client/operation/operation.h index 6416f6cc81b..b069d7bd92c 100644 --- a/include/ydb-cpp-sdk/client/operation/operation.h +++ b/include/ydb-cpp-sdk/client/operation/operation.h @@ -56,11 +56,11 @@ class TOperationClient { TAsyncStatus Forget(const TOperation::TOperationId& id); template - NThreading::TFuture> List(ui64 pageSize = 0, const std::string& pageToken = std::string()); + NThreading::TFuture> List(std::uint64_t pageSize = 0, const std::string& pageToken = std::string()); private: template - NThreading::TFuture> List(const std::string& kind, ui64 pageSize, const std::string& pageToken); + NThreading::TFuture> List(const std::string& kind, std::uint64_t pageSize, const std::string& pageToken); private: std::shared_ptr Impl_; diff --git a/include/ydb-cpp-sdk/client/topic/read_events.h b/include/ydb-cpp-sdk/client/topic/read_events.h index 66e8de865b6..7dc550e49c8 100644 --- a/include/ydb-cpp-sdk/client/topic/read_events.h +++ b/include/ydb-cpp-sdk/client/topic/read_events.h @@ -102,6 +102,7 @@ struct TReadSessionEvent { virtual ~TMessageBase() = default; virtual const std::string& GetData() const; + virtual const std::string& GetBrokenData() const; virtual void Commit() = 0; @@ -144,6 +145,8 @@ struct TReadSessionEvent { //! User data. //! Throws decompressor exception if decompression failed. const std::string& GetData() const override; + //! Throws exception if decompression succeeded. + const std::string& GetBrokenData() const override; //! Commits single message. void Commit() override; diff --git a/include/ydb-cpp-sdk/client/types/ydb.h b/include/ydb-cpp-sdk/client/types/ydb.h index 3ae912fadf5..53c767ebc93 100644 --- a/include/ydb-cpp-sdk/client/types/ydb.h +++ b/include/ydb-cpp-sdk/client/types/ydb.h @@ -3,6 +3,10 @@ #include "fwd.h" #include "status_codes.h" +#include +#include + + namespace NYdb::inline V3 { enum class EDiscoveryMode { @@ -22,12 +26,47 @@ enum class EDiscoveryMode { Off }; +//! @deprecated Use TBalancingPolicy instead enum class EBalancingPolicy { //! Use all available cluster nodes regardless datacenter locality UseAllNodes, //! Use preferable location, //! params is a name of location (VLA, MAN), if params is empty local datacenter is used - UsePreferableLocation + UsePreferableLocation, +}; + +enum EPileState { + UNSPECIFIED = 0 /* "unspecified" */, + PRIMARY = 1 /* "primary" */, + PROMOTED = 2 /* "promoted" */, + SYNCHRONIZED = 3 /* "synchronized" */, + NOT_SYNCHRONIZED = 4 /* "not_synchronized" */, + SUSPENDED = 5 /* "suspended" */, + DISCONNECTED = 6 /* "disconnected" */ +}; + +class TBalancingPolicy { + friend class TDriverConfig; + friend class TDriver; +public: + //! Use preferable location, + //! location is a name of datacenter (VLA, MAN), if location is empty local datacenter is used + static TBalancingPolicy UsePreferableLocation(const std::string& location = {}); + + //! Use all available cluster nodes regardless datacenter locality + static TBalancingPolicy UseAllNodes(); + + //! EXPERIMENTAL + //! Use pile with preferable state + static TBalancingPolicy UsePreferablePileState(EPileState pileState = EPileState::PRIMARY); + + class TImpl; +private: + TBalancingPolicy(std::unique_ptr&& impl); + + TBalancingPolicy(EBalancingPolicy policy, const std::string& params); + + std::unique_ptr Impl_; }; } // namespace NYdb diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index 300dd73185d..544f432e1fa 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -79,19 +79,19 @@ message VectorIndexSettings { VECTOR_TYPE_BIT = 4; } - Metric metric = 1; + optional Metric metric = 1; - VectorType vector_type = 2; + optional VectorType vector_type = 2; - uint32 vector_dimension = 3; + optional uint32 vector_dimension = 3; } message KMeansTreeSettings { VectorIndexSettings settings = 1; - // average count of clusters on each level of tree, 0 -- means auto - uint32 clusters = 2; - // average count of levels in the tree, 0 -- means auto - uint32 levels = 3; + + optional uint32 clusters = 2; + + optional uint32 levels = 3; } message GlobalIndex { diff --git a/src/client/common_client/impl/client.h b/src/client/common_client/impl/client.h index a24667204cc..da8c8306dbe 100644 --- a/src/client/common_client/impl/client.h +++ b/src/client/common_client/impl/client.h @@ -3,7 +3,7 @@ #include "iface.h" #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/common_client/settings.cpp b/src/client/common_client/settings.cpp index d8928a566f5..0a6ddc76a4a 100644 --- a/src/client/common_client/settings.cpp +++ b/src/client/common_client/settings.cpp @@ -1,6 +1,6 @@ #include -#include +#include namespace NYdb::inline V3 { diff --git a/src/client/coordination/coordination.cpp b/src/client/coordination/coordination.cpp index 080cd186599..e6dc2a9cc1c 100644 --- a/src/client/coordination/coordination.cpp +++ b/src/client/coordination/coordination.cpp @@ -1,8 +1,8 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include -#include +#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/datastreams/datastreams.cpp b/src/client/datastreams/datastreams.cpp index d7c571ef67c..4bfffad0f4d 100644 --- a/src/client/datastreams/datastreams.cpp +++ b/src/client/datastreams/datastreams.cpp @@ -1,7 +1,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/driver/driver.cpp b/src/client/driver/driver.cpp index 0ce0ef260d8..a6df6e8789b 100644 --- a/src/client/driver/driver.cpp +++ b/src/client/driver/driver.cpp @@ -1,14 +1,14 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include -#include -#include +#include +#include +#include #undef INCLUDE_YDB_INTERNAL_H #include -#include -#include +#include +#include #include #include #include @@ -41,7 +41,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { size_t GetMaxQueuedRequests() const override { return MaxQueuedRequests; } TTcpKeepAliveSettings GetTcpKeepAliveSettings() const override { return TcpKeepAliveSettings; } bool GetDrinOnDtors() const override { return DrainOnDtors; } - TBalancingSettings GetBalancingSettings() const override { return BalancingSettings; } + TBalancingPolicy::TImpl GetBalancingSettings() const override { return BalancingSettings; } TDuration GetGRpcKeepAliveTimeout() const override { return GRpcKeepAliveTimeout; } bool GetGRpcKeepAlivePermitWithoutCalls() const override { return GRpcKeepAlivePermitWithoutCalls; } TDuration GetSocketIdleTimeout() const override { return SocketIdleTimeout; } @@ -69,7 +69,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { TCP_KEEPALIVE_INTERVAL }; bool DrainOnDtors = true; - TBalancingSettings BalancingSettings = TBalancingSettings{EBalancingPolicy::UsePreferableLocation, std::string()}; + TBalancingPolicy::TImpl BalancingSettings = TBalancingPolicy::TImpl(""); TDuration GRpcKeepAliveTimeout = TDuration::Seconds(10); bool GRpcKeepAlivePermitWithoutCalls = true; TDuration SocketIdleTimeout = TDuration::Minutes(6); @@ -170,11 +170,15 @@ TDriverConfig& TDriverConfig::SetDrainOnDtors(bool allowed) { return *this; } -TDriverConfig& TDriverConfig::SetBalancingPolicy(EBalancingPolicy policy, const std::string& params) { - Impl_->BalancingSettings = TBalancingSettings{policy, params}; +TDriverConfig& TDriverConfig::SetBalancingPolicy(TBalancingPolicy&& policy) { + Impl_->BalancingSettings = std::move(*policy.Impl_); return *this; } +TDriverConfig& TDriverConfig::SetBalancingPolicy(EBalancingPolicy policy, const std::string& params) { + return SetBalancingPolicy(TBalancingPolicy(policy, params)); +} + TDriverConfig& TDriverConfig::SetGRpcKeepAliveTimeout(TDuration timeout) { Impl_->GRpcKeepAliveTimeout = timeout; return *this; @@ -253,7 +257,7 @@ TDriverConfig TDriver::GetConfig() const { Impl_->TcpKeepAliveSettings_.Interval ); config.SetDrainOnDtors(Impl_->DrainOnDtors_); - config.SetBalancingPolicy(Impl_->BalancingSettings_.Policy, Impl_->BalancingSettings_.PolicyParams); + config.SetBalancingPolicy(std::make_unique(Impl_->BalancingSettings_)); config.SetGRpcKeepAliveTimeout(Impl_->GRpcKeepAliveTimeout_); config.SetGRpcKeepAlivePermitWithoutCalls(Impl_->GRpcKeepAlivePermitWithoutCalls_); config.SetSocketIdleTimeout(Impl_->SocketIdleTimeout_); diff --git a/src/client/export/export.cpp b/src/client/export/export.cpp index f0bdd85d239..342329018c2 100644 --- a/src/client/export/export.cpp +++ b/src/client/export/export.cpp @@ -1,7 +1,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/extension_common/extension.cpp b/src/client/extension_common/extension.cpp index 5ad050e1d1f..98d3fcfe6df 100644 --- a/src/client/extension_common/extension.cpp +++ b/src/client/extension_common/extension.cpp @@ -1,8 +1,8 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include -#include +#include +#include #undef INCLUDE_YDB_INTERNAL_H namespace NYdb::inline V3 { diff --git a/src/client/federated_topic/impl/federated_read_session.cpp b/src/client/federated_topic/impl/federated_read_session.cpp index ca93e7eb7b1..0c71faf8930 100644 --- a/src/client/federated_topic/impl/federated_read_session.cpp +++ b/src/client/federated_topic/impl/federated_read_session.cpp @@ -5,7 +5,7 @@ #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/federated_topic/impl/federated_topic_impl.h b/src/client/federated_topic/impl/federated_topic_impl.h index 7638904dab0..9ebd774566d 100644 --- a/src/client/federated_topic/impl/federated_topic_impl.h +++ b/src/client/federated_topic/impl/federated_topic_impl.h @@ -1,7 +1,7 @@ #pragma once #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/federated_topic/impl/federated_write_session.cpp b/src/client/federated_topic/impl/federated_write_session.cpp index 217936058d7..b9c6b5f7c13 100644 --- a/src/client/federated_topic/impl/federated_write_session.cpp +++ b/src/client/federated_topic/impl/federated_write_session.cpp @@ -4,7 +4,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/federated_topic/impl/federation_observer.h b/src/client/federated_topic/impl/federation_observer.h index baf9aec4f5f..4499782c6ca 100644 --- a/src/client/federated_topic/impl/federation_observer.h +++ b/src/client/federated_topic/impl/federation_observer.h @@ -1,8 +1,8 @@ #pragma once #define INCLUDE_YDB_INTERNAL_H -#include -#include +#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/helpers/helpers.cpp b/src/client/helpers/helpers.cpp index cd5e8b10c70..6bd4fe8f106 100644 --- a/src/client/helpers/helpers.cpp +++ b/src/client/helpers/helpers.cpp @@ -4,8 +4,8 @@ #include #include -#include -#include +#include +#include #include diff --git a/src/client/impl/CMakeLists.txt b/src/client/impl/CMakeLists.txt index 05e626b4d05..2cb7d4e142e 100644 --- a/src/client/impl/CMakeLists.txt +++ b/src/client/impl/CMakeLists.txt @@ -1,3 +1,4 @@ -add_subdirectory(ydb_endpoints) -add_subdirectory(ydb_internal) -add_subdirectory(ydb_stats) +add_subdirectory(endpoints) +add_subdirectory(internal) +add_subdirectory(session) +add_subdirectory(stats) diff --git a/src/client/impl/ydb_endpoints/CMakeLists.txt b/src/client/impl/endpoints/CMakeLists.txt similarity index 100% rename from src/client/impl/ydb_endpoints/CMakeLists.txt rename to src/client/impl/endpoints/CMakeLists.txt diff --git a/src/client/impl/ydb_endpoints/endpoints.cpp b/src/client/impl/endpoints/endpoints.cpp similarity index 89% rename from src/client/impl/ydb_endpoints/endpoints.cpp rename to src/client/impl/endpoints/endpoints.cpp index 40f9b8402fc..670874a45e3 100644 --- a/src/client/impl/ydb_endpoints/endpoints.cpp +++ b/src/client/impl/endpoints/endpoints.cpp @@ -10,11 +10,9 @@ namespace NYdb::inline V3 { -using std::string; - class TEndpointElectorSafe::TObjRegistry : public IObjRegistryHandle { public: - TObjRegistry(const ui64& nodeId) + TObjRegistry(const std::uint64_t& nodeId) : NodeId_(nodeId) {} @@ -41,13 +39,13 @@ class TEndpointElectorSafe::TObjRegistry : public IObjRegistryHandle { return Objs_.size(); } - ui64 GetNodeId() const { + std::uint64_t GetNodeId() const { return NodeId_; } private: std::set Objs_; - ui64 NodeId_; + std::uint64_t NodeId_; mutable std::shared_mutex Mutex_; }; @@ -55,12 +53,12 @@ class TEndpointElectorSafe::TObjRegistry : public IObjRegistryHandle { //////////////////////////////////////////////////////////////////////////////// // Returns index of last resord with same priority or -1 in case of empty input -static i32 GetBestK(const std::vector& records) { +static std::int32_t GetBestK(const std::vector& records) { if (records.empty()) { return -1; } - const i32 bestPriority = records[0].Priority; + const std::int32_t bestPriority = records[0].Priority; size_t pos = 1; while (pos < records.size()) { @@ -72,8 +70,8 @@ static i32 GetBestK(const std::vector& records) { return pos - 1; } -std::vector TEndpointElectorSafe::SetNewState(std::vector&& records) { - std::unordered_set index; +std::vector TEndpointElectorSafe::SetNewState(std::vector&& records) { + std::unordered_set index; std::vector uniqRec; for (auto&& record : records) { @@ -86,7 +84,7 @@ std::vector TEndpointElectorSafe::SetNewState(std::vector removed; + std::vector removed; std::vector> notifyRemoved; { @@ -161,20 +159,20 @@ TEndpointRecord TEndpointElectorSafe::GetEndpoint(const TEndpointKey& preferredE } // TODO: Suboptimal, but should not be used often -void TEndpointElectorSafe::PessimizeEndpoint(const string& endpoint) { +void TEndpointElectorSafe::PessimizeEndpoint(const std::string& endpoint) { std::unique_lock guard(Mutex_); for (auto& r : Records_) { - if (r.Endpoint == endpoint && r.Priority != Max()) { + if (r.Endpoint == endpoint && r.Priority != std::numeric_limits::max()) { int pessimizationRatio = PessimizationRatio_.load(); auto newRatio = (pessimizationRatio * Records_.size() + 100) / Records_.size(); PessimizationRatio_.store(newRatio); PessimizationRatioGauge_.SetValue(newRatio); EndpointActiveGauge_.Dec(); - r.Priority = Max(); + r.Priority = std::numeric_limits::max(); auto it = KnownEndpoints_.find(endpoint); if (it != KnownEndpoints_.end()) { - it->second.Priority = Max(); + it->second.Priority = std::numeric_limits::max(); } } } @@ -220,10 +218,10 @@ bool TEndpointElectorSafe::LinkObjToEndpoint(const TEndpointKey& endpoint, TEndp } } -void TEndpointElectorSafe::ForEachEndpoint(const THandleCb& cb, i32 minPriority, i32 maxPriority, const void* tag) const { +void TEndpointElectorSafe::ForEachEndpoint(const THandleCb& cb, std::int32_t minPriority, std::int32_t maxPriority, const void* tag) const { std::shared_lock guard(Mutex_); - auto it = std::lower_bound(Records_.begin(), Records_.end(), minPriority, [](const TEndpointRecord& l, i32 r) { + auto it = std::lower_bound(Records_.begin(), Records_.end(), minPriority, [](const TEndpointRecord& l, std::int32_t r) { return l.Priority < r; }); diff --git a/src/client/impl/ydb_endpoints/endpoints.h b/src/client/impl/endpoints/endpoints.h similarity index 84% rename from src/client/impl/ydb_endpoints/endpoints.h rename to src/client/impl/endpoints/endpoints.h index ec6225311e6..21b95d5fa2a 100644 --- a/src/client/impl/ydb_endpoints/endpoints.h +++ b/src/client/impl/endpoints/endpoints.h @@ -5,15 +5,15 @@ #include #include #include -#include +#include namespace NYdb::inline V3 { struct TEndpointRecord { std::string Endpoint; - i32 Priority; + std::int32_t Priority; std::string SslTargetNameOverride; - ui64 NodeId = 0; + std::uint64_t NodeId = 0; TEndpointRecord() : Endpoint() @@ -23,7 +23,7 @@ struct TEndpointRecord { { } - TEndpointRecord(std::string endpoint, i32 priority, std::string sslTargetNameOverride = std::string(), ui64 nodeId = 0) + TEndpointRecord(std::string endpoint, std::int32_t priority, std::string sslTargetNameOverride = std::string(), std::uint64_t nodeId = 0) : Endpoint(std::move(endpoint)) , Priority(priority) , SslTargetNameOverride(std::move(sslTargetNameOverride)) @@ -42,19 +42,19 @@ struct TEndpointRecord { struct TEndpointKey { std::string Endpoint; - ui64 NodeId = 0; + std::uint64_t NodeId = 0; TEndpointKey() : Endpoint() , NodeId(0) {} - TEndpointKey(std::string endpoint, ui64 nodeId) + TEndpointKey(std::string endpoint, std::uint64_t nodeId) : Endpoint(std::move(endpoint)) , NodeId(nodeId) {} - TEndpointKey(ui64 nodeId) + TEndpointKey(std::uint64_t nodeId) : Endpoint() , NodeId(nodeId) {} @@ -63,7 +63,7 @@ struct TEndpointKey { return Endpoint; } - const ui64& GetNodeId() const { + const std::uint64_t& GetNodeId() const { return NodeId; } @@ -105,8 +105,8 @@ class TEndpointElectorSafe { bool LinkObjToEndpoint(const TEndpointKey& endpoint, TEndpointObj* obj, const void* tag); // Perform some action for each object group associated with endpoint - using THandleCb = std::function; - void ForEachEndpoint(const THandleCb& cb, i32 minPriority, i32 maxPriority, const void* tag) const; + using THandleCb = std::function; + void ForEachEndpoint(const THandleCb& cb, std::int32_t minPriority, std::int32_t maxPriority, const void* tag) const; class TObjRegistry; private: @@ -122,7 +122,7 @@ class TEndpointElectorSafe { std::vector Records_; std::unordered_map KnownEndpoints_; std::unordered_map KnownEndpointsByNodeId_; - i32 BestK_ = -1; + std::int32_t BestK_ = -1; std::atomic_int PessimizationRatio_ = 0; NSdkStats::TAtomicCounter<::NMonitoring::TIntGauge> EndpointCountGauge_; NSdkStats::TAtomicCounter<::NMonitoring::TIntGauge> PessimizationRatioGauge_; diff --git a/src/client/impl/ydb_internal/CMakeLists.txt b/src/client/impl/internal/CMakeLists.txt similarity index 79% rename from src/client/impl/ydb_internal/CMakeLists.txt rename to src/client/impl/internal/CMakeLists.txt index 5c9ae68e698..5370c34f570 100644 --- a/src/client/impl/ydb_internal/CMakeLists.txt +++ b/src/client/impl/internal/CMakeLists.txt @@ -1,11 +1,9 @@ add_subdirectory(common) add_subdirectory(db_driver_state) add_subdirectory(grpc_connections) -add_subdirectory(kqp_session_common) add_subdirectory(logger) add_subdirectory(make_request) add_subdirectory(plain_status) add_subdirectory(retry) -add_subdirectory(session_pool) add_subdirectory(thread_pool) add_subdirectory(value_helpers) diff --git a/src/client/impl/ydb_internal/common/CMakeLists.txt b/src/client/impl/internal/common/CMakeLists.txt similarity index 92% rename from src/client/impl/ydb_internal/common/CMakeLists.txt rename to src/client/impl/internal/common/CMakeLists.txt index 0e21f8a082c..c6220cfa832 100644 --- a/src/client/impl/ydb_internal/common/CMakeLists.txt +++ b/src/client/impl/internal/common/CMakeLists.txt @@ -7,6 +7,7 @@ target_link_libraries(impl-ydb_internal-common PUBLIC ) target_sources(impl-ydb_internal-common PRIVATE + balancing_policies.cpp parser.cpp getenv.cpp client_pid.cpp diff --git a/src/client/impl/internal/common/balancing_policies.cpp b/src/client/impl/internal/common/balancing_policies.cpp new file mode 100644 index 00000000000..48431810815 --- /dev/null +++ b/src/client/impl/internal/common/balancing_policies.cpp @@ -0,0 +1,18 @@ +#define INCLUDE_YDB_INTERNAL_H +#include "balancing_policies.h" + +namespace NYdb::inline V3 { + +std::unique_ptr TBalancingPolicy::TImpl::UseAllNodes() { + return std::make_unique(); +} + +std::unique_ptr TBalancingPolicy::TImpl::UsePreferableLocation(const std::string& location) { + return std::make_unique(location); +} + +std::unique_ptr TBalancingPolicy::TImpl::UsePreferablePileState(EPileState pileState) { + return std::make_unique(pileState); +} + +} diff --git a/src/client/impl/internal/common/balancing_policies.h b/src/client/impl/internal/common/balancing_policies.h new file mode 100644 index 00000000000..01efe15c61c --- /dev/null +++ b/src/client/impl/internal/common/balancing_policies.h @@ -0,0 +1,49 @@ +#pragma once + +#include + +#include + +#include +#include + +namespace NYdb::inline V3 { + +class TBalancingPolicy::TImpl { +public: + enum class EPolicyType { + UseAllNodes, + UsePreferableLocation, + UsePreferablePileState + }; + + static std::unique_ptr UseAllNodes(); + + static std::unique_ptr UsePreferableLocation(const std::string& location); + + static std::unique_ptr UsePreferablePileState(EPileState pileState); + + TImpl() + : PolicyType(EPolicyType::UseAllNodes) + {} + + TImpl(const std::string& location) + : PolicyType(EPolicyType::UsePreferableLocation) + , Location(location) + {} + + TImpl(EPileState pileState) + : PolicyType(EPolicyType::UsePreferablePileState) + , PileState(pileState) + {} + + EPolicyType PolicyType; + + // UsePreferableLocation + std::string Location; + + // UsePreferablePileState + EPileState PileState; +}; + +} diff --git a/src/client/impl/ydb_internal/common/client_pid.cpp b/src/client/impl/internal/common/client_pid.cpp similarity index 100% rename from src/client/impl/ydb_internal/common/client_pid.cpp rename to src/client/impl/internal/common/client_pid.cpp diff --git a/src/client/impl/ydb_internal/common/client_pid.h b/src/client/impl/internal/common/client_pid.h similarity index 100% rename from src/client/impl/ydb_internal/common/client_pid.h rename to src/client/impl/internal/common/client_pid.h diff --git a/src/client/impl/ydb_internal/common/getenv.cpp b/src/client/impl/internal/common/getenv.cpp similarity index 100% rename from src/client/impl/ydb_internal/common/getenv.cpp rename to src/client/impl/internal/common/getenv.cpp diff --git a/src/client/impl/ydb_internal/common/getenv.h b/src/client/impl/internal/common/getenv.h similarity index 100% rename from src/client/impl/ydb_internal/common/getenv.h rename to src/client/impl/internal/common/getenv.h diff --git a/src/client/impl/ydb_internal/common/parser.cpp b/src/client/impl/internal/common/parser.cpp similarity index 100% rename from src/client/impl/ydb_internal/common/parser.cpp rename to src/client/impl/internal/common/parser.cpp diff --git a/src/client/impl/ydb_internal/common/parser.h b/src/client/impl/internal/common/parser.h similarity index 100% rename from src/client/impl/ydb_internal/common/parser.h rename to src/client/impl/internal/common/parser.h diff --git a/src/client/impl/ydb_internal/common/types.h b/src/client/impl/internal/common/types.h similarity index 72% rename from src/client/impl/ydb_internal/common/types.h rename to src/client/impl/internal/common/types.h index 7deb54f5118..6c32f5784e7 100644 --- a/src/client/impl/ydb_internal/common/types.h +++ b/src/client/impl/internal/common/types.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -16,9 +16,4 @@ namespace NYdb::inline V3 { using TSimpleCb = std::function; using TErrorCb = std::function; -struct TBalancingSettings { - EBalancingPolicy Policy; - std::string PolicyParams; -}; - } // namespace NYdb diff --git a/src/client/impl/ydb_internal/db_driver_state/CMakeLists.txt b/src/client/impl/internal/db_driver_state/CMakeLists.txt similarity index 100% rename from src/client/impl/ydb_internal/db_driver_state/CMakeLists.txt rename to src/client/impl/internal/db_driver_state/CMakeLists.txt diff --git a/src/client/impl/ydb_internal/db_driver_state/authenticator.cpp b/src/client/impl/internal/db_driver_state/authenticator.cpp similarity index 100% rename from src/client/impl/ydb_internal/db_driver_state/authenticator.cpp rename to src/client/impl/internal/db_driver_state/authenticator.cpp diff --git a/src/client/impl/ydb_internal/db_driver_state/authenticator.h b/src/client/impl/internal/db_driver_state/authenticator.h similarity index 91% rename from src/client/impl/ydb_internal/db_driver_state/authenticator.h rename to src/client/impl/internal/db_driver_state/authenticator.h index ff0ecb0fb26..f2229c235f8 100644 --- a/src/client/impl/ydb_internal/db_driver_state/authenticator.h +++ b/src/client/impl/internal/db_driver_state/authenticator.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include diff --git a/src/client/impl/ydb_internal/db_driver_state/endpoint_pool.cpp b/src/client/impl/internal/db_driver_state/endpoint_pool.cpp similarity index 66% rename from src/client/impl/ydb_internal/db_driver_state/endpoint_pool.cpp rename to src/client/impl/internal/db_driver_state/endpoint_pool.cpp index 50432a49bc3..1895f39069b 100644 --- a/src/client/impl/ydb_internal/db_driver_state/endpoint_pool.cpp +++ b/src/client/impl/internal/db_driver_state/endpoint_pool.cpp @@ -3,13 +3,10 @@ namespace NYdb::inline V3 { -using std::string; -using std::vector; - TEndpointPool::TEndpointPool(TListEndpointsResultProvider&& provider, const IInternalClient* client) : Provider_(provider) , LastUpdateTime_(TInstant::Zero().MicroSeconds()) - , BalancingSettings_(client->GetBalancingSettings()) + , BalancingPolicy_(client->GetBalancingSettings()) {} TEndpointPool::~TEndpointPool() { @@ -42,21 +39,23 @@ std::pair, bool> TEndpointPool::Updat } auto handler = [this](const TAsyncListEndpointsResult& future) { TListEndpointsResult result = future.GetValue(); - vector removed; + std::vector removed; if (result.DiscoveryStatus.Status == EStatus::SUCCESS) { - vector records; + std::vector records; // Is used to convert float to integer load factor // same integer values will be selected randomly. const float multiplicator = 10.0; - const auto& preferredLocation = GetPreferredLocation(result.Result.self_location()); + std::unordered_map pileStates; + for (const auto& pile : result.Result.pile_states()) { + pileStates[pile.pile_name()] = pile; + } + for (const auto& endpoint : result.Result.endpoints()) { - i32 loadFactor = (i32)(multiplicator * Min(LoadMax, Max(LoadMin, endpoint.load_factor()))); - ui64 nodeId = endpoint.node_id(); - if (BalancingSettings_.Policy != EBalancingPolicy::UseAllNodes) { - if (endpoint.location() != preferredLocation) { - // Location missmatch, shift this endpoint - loadFactor += GetLocalityShift(); - } + std::int32_t loadFactor = static_cast(multiplicator * std::min(LoadMax, std::max(LoadMin, endpoint.load_factor()))); + std::uint64_t nodeId = endpoint.node_id(); + if (!IsLocalEndpoint(endpoint, pileStates)) { + // Location mismatch, shift this endpoint + loadFactor += GetLocalityShift(); } std::string sslTargetNameOverride = endpoint.ssl_target_name_override(); @@ -135,7 +134,7 @@ TDuration TEndpointPool::TimeSinceLastUpdate() const { return TDuration::MicroSeconds(now - LastUpdateTime_.load()); } -void TEndpointPool::BanEndpoint(const string& endpoint) { +void TEndpointPool::BanEndpoint(const std::string& endpoint) { Elector_.PessimizeEndpoint(endpoint); } @@ -148,7 +147,7 @@ bool TEndpointPool::LinkObjToEndpoint(const TEndpointKey& endpoint, TEndpointObj } void TEndpointPool::ForEachEndpoint(const TEndpointElectorSafe::THandleCb& cb, const void* tag) const { - return Elector_.ForEachEndpoint(cb, 0, Max(), tag); + return Elector_.ForEachEndpoint(cb, 0, std::numeric_limits::max(), tag); } void TEndpointPool::ForEachLocalEndpoint(const TEndpointElectorSafe::THandleCb& cb, const void* tag) const { @@ -156,11 +155,11 @@ void TEndpointPool::ForEachLocalEndpoint(const TEndpointElectorSafe::THandleCb& } void TEndpointPool::ForEachForeignEndpoint(const TEndpointElectorSafe::THandleCb& cb, const void* tag) const { - return Elector_.ForEachEndpoint(cb, GetLocalityShift(), Max() - 1, tag); + return Elector_.ForEachEndpoint(cb, GetLocalityShift(), std::numeric_limits::max() - 1, tag); } -EBalancingPolicy TEndpointPool::GetBalancingPolicy() const { - return BalancingSettings_.Policy; +TBalancingPolicy::TImpl::EPolicyType TEndpointPool::GetBalancingPolicyType() const { + return BalancingPolicy_.PolicyType; } void TEndpointPool::SetStatCollector(NSdkStats::TStatCollector& statCollector) { @@ -170,22 +169,45 @@ void TEndpointPool::SetStatCollector(NSdkStats::TStatCollector& statCollector) { StatCollector_ = &statCollector; } -constexpr i32 TEndpointPool::GetLocalityShift() { +constexpr std::int32_t TEndpointPool::GetLocalityShift() { return LoadMax * Multiplicator; } -string TEndpointPool::GetPreferredLocation(const string& selfLocation) { - switch (BalancingSettings_.Policy) { - case EBalancingPolicy::UseAllNodes: - return {}; - case EBalancingPolicy::UsePreferableLocation: - if (BalancingSettings_.PolicyParams.empty()) { - return selfLocation; - } else { - return BalancingSettings_.PolicyParams; +bool TEndpointPool::IsLocalEndpoint(const Ydb::Discovery::EndpointInfo& endpoint, + const std::unordered_map& pileStates) const { + switch (BalancingPolicy_.PolicyType) { + case TBalancingPolicy::TImpl::EPolicyType::UseAllNodes: + return true; + case TBalancingPolicy::TImpl::EPolicyType::UsePreferableLocation: + return endpoint.location() == BalancingPolicy_.Location; + case TBalancingPolicy::TImpl::EPolicyType::UsePreferablePileState: + if (auto it = pileStates.find(endpoint.bridge_pile_name()); it != pileStates.end()) { + return GetPileState(it->second.state()) == BalancingPolicy_.PileState; } + return true; + } + return true; +} + +EPileState TEndpointPool::GetPileState(const Ydb::Bridge::PileState::State& state) const { + switch (state) { + case Ydb::Bridge::PileState::PRIMARY: + return EPileState::PRIMARY; + case Ydb::Bridge::PileState::PROMOTED: + return EPileState::PROMOTED; + case Ydb::Bridge::PileState::SYNCHRONIZED: + return EPileState::SYNCHRONIZED; + case Ydb::Bridge::PileState::NOT_SYNCHRONIZED: + return EPileState::NOT_SYNCHRONIZED; + case Ydb::Bridge::PileState::SUSPENDED: + return EPileState::SUSPENDED; + case Ydb::Bridge::PileState::DISCONNECTED: + return EPileState::DISCONNECTED; + case Ydb::Bridge::PileState::UNSPECIFIED: + case Ydb::Bridge::PileState_State_PileState_State_INT_MIN_SENTINEL_DO_NOT_USE_: + case Ydb::Bridge::PileState_State_PileState_State_INT_MAX_SENTINEL_DO_NOT_USE_: + return EPileState::UNSPECIFIED; } - return {}; } } // namespace NYdb diff --git a/src/client/impl/ydb_internal/db_driver_state/endpoint_pool.h b/src/client/impl/internal/db_driver_state/endpoint_pool.h similarity index 74% rename from src/client/impl/ydb_internal/db_driver_state/endpoint_pool.h rename to src/client/impl/internal/db_driver_state/endpoint_pool.h index ac5e57993fb..c7d54ce5106 100644 --- a/src/client/impl/ydb_internal/db_driver_state/endpoint_pool.h +++ b/src/client/impl/internal/db_driver_state/endpoint_pool.h @@ -1,11 +1,12 @@ #pragma once -#include +#include #include -#include -#include -#include +#include +#include +#include +#include #include @@ -39,13 +40,15 @@ class TEndpointPool { void ForEachEndpoint(const TEndpointElectorSafe::THandleCb& cb, const void* tag) const; void ForEachLocalEndpoint(const TEndpointElectorSafe::THandleCb& cb, const void* tag) const; void ForEachForeignEndpoint(const TEndpointElectorSafe::THandleCb& cb, const void* tag) const; - EBalancingPolicy GetBalancingPolicy() const; + TBalancingPolicy::TImpl::EPolicyType GetBalancingPolicyType() const; // TODO: Remove this mess void SetStatCollector(NSdkStats::TStatCollector& statCollector); - static constexpr i32 GetLocalityShift(); + static constexpr std::int32_t GetLocalityShift(); private: - std::string GetPreferredLocation(const std::string& selfLocation); + bool IsLocalEndpoint(const Ydb::Discovery::EndpointInfo& endpoint, + const std::unordered_map& pileStates) const; + EPileState GetPileState(const Ydb::Bridge::PileState::State& state) const; private: TListEndpointsResultProvider Provider_; @@ -53,7 +56,7 @@ class TEndpointPool { TEndpointElectorSafe Elector_; NThreading::TPromise DiscoveryPromise_; std::atomic_uint64_t LastUpdateTime_; - const TBalancingSettings BalancingSettings_; + const TBalancingPolicy::TImpl BalancingPolicy_; NSdkStats::TStatCollector* StatCollector_ = nullptr; diff --git a/src/client/impl/ydb_internal/db_driver_state/state.cpp b/src/client/impl/internal/db_driver_state/state.cpp similarity index 98% rename from src/client/impl/ydb_internal/db_driver_state/state.cpp rename to src/client/impl/internal/db_driver_state/state.cpp index c71349bc008..ffaaf2f0e11 100644 --- a/src/client/impl/ydb_internal/db_driver_state/state.cpp +++ b/src/client/impl/internal/db_driver_state/state.cpp @@ -2,7 +2,7 @@ #include "state.h" #include -#include +#include #include @@ -75,8 +75,8 @@ void TDbDriverState::ForEachForeignEndpoint(const TEndpointElectorSafe::THandleC EndpointPool.ForEachForeignEndpoint(cb, tag); } -EBalancingPolicy TDbDriverState::GetBalancingPolicy() const { - return EndpointPool.GetBalancingPolicy(); +TBalancingPolicy::TImpl::EPolicyType TDbDriverState::GetBalancingPolicyType() const { + return EndpointPool.GetBalancingPolicyType(); } std::string TDbDriverState::GetEndpoint() const { diff --git a/src/client/impl/ydb_internal/db_driver_state/state.h b/src/client/impl/internal/db_driver_state/state.h similarity index 95% rename from src/client/impl/ydb_internal/db_driver_state/state.h rename to src/client/impl/internal/db_driver_state/state.h index ad2ff25b8f1..ce72760bf65 100644 --- a/src/client/impl/ydb_internal/db_driver_state/state.h +++ b/src/client/impl/internal/db_driver_state/state.h @@ -2,9 +2,9 @@ #include "endpoint_pool.h" -#include +#include -#include +#include #include #include @@ -45,7 +45,7 @@ class TDbDriverState void ForEachEndpoint(const TEndpointElectorSafe::THandleCb& cb, const void* tag) const; void ForEachLocalEndpoint(const TEndpointElectorSafe::THandleCb& cb, const void* tag) const; void ForEachForeignEndpoint(const TEndpointElectorSafe::THandleCb& cb, const void* tag) const; - EBalancingPolicy GetBalancingPolicy() const; + TBalancingPolicy::TImpl::EPolicyType GetBalancingPolicyType() const; std::string GetEndpoint() const; void SetCredentialsProvider(std::shared_ptr credentialsProvider); diff --git a/src/client/impl/ydb_internal/driver/constants.h b/src/client/impl/internal/driver/constants.h similarity index 90% rename from src/client/impl/ydb_internal/driver/constants.h rename to src/client/impl/internal/driver/constants.h index 3b51aa92c0b..8d53ab19946 100644 --- a/src/client/impl/ydb_internal/driver/constants.h +++ b/src/client/impl/internal/driver/constants.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include diff --git a/src/client/impl/ydb_internal/grpc_connections/CMakeLists.txt b/src/client/impl/internal/grpc_connections/CMakeLists.txt similarity index 100% rename from src/client/impl/ydb_internal/grpc_connections/CMakeLists.txt rename to src/client/impl/internal/grpc_connections/CMakeLists.txt diff --git a/src/client/impl/ydb_internal/grpc_connections/actions.cpp b/src/client/impl/internal/grpc_connections/actions.cpp similarity index 100% rename from src/client/impl/ydb_internal/grpc_connections/actions.cpp rename to src/client/impl/internal/grpc_connections/actions.cpp diff --git a/src/client/impl/ydb_internal/grpc_connections/actions.h b/src/client/impl/internal/grpc_connections/actions.h similarity index 95% rename from src/client/impl/ydb_internal/grpc_connections/actions.h rename to src/client/impl/internal/grpc_connections/actions.h index b95f78ce623..e29f96fa962 100644 --- a/src/client/impl/ydb_internal/grpc_connections/actions.h +++ b/src/client/impl/internal/grpc_connections/actions.h @@ -1,11 +1,12 @@ #pragma once -#include +#include #include -#include -#include -#include +#include +#include +#include +#include #include diff --git a/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp b/src/client/impl/internal/grpc_connections/grpc_connections.cpp similarity index 99% rename from src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp rename to src/client/impl/internal/grpc_connections/grpc_connections.cpp index 6004bdd5d9e..aa5bfc21b4d 100644 --- a/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp +++ b/src/client/impl/internal/grpc_connections/grpc_connections.cpp @@ -382,7 +382,7 @@ bool TGRpcConnectionsImpl::GetDrainOnDtors() const { return DrainOnDtors_; } -TBalancingSettings TGRpcConnectionsImpl::GetBalancingSettings() const { +TBalancingPolicy::TImpl TGRpcConnectionsImpl::GetBalancingSettings() const { return BalancingSettings_; } diff --git a/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/src/client/impl/internal/grpc_connections/grpc_connections.h similarity index 98% rename from src/client/impl/ydb_internal/grpc_connections/grpc_connections.h rename to src/client/impl/internal/grpc_connections/grpc_connections.h index 1473e757c3b..8008cb7c834 100644 --- a/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h +++ b/src/client/impl/internal/grpc_connections/grpc_connections.h @@ -1,16 +1,16 @@ #pragma once -#include +#include #include #include "actions.h" #include "params.h" #include -#include -#include -#include -#include +#include +#include +#include +#include #include #include @@ -98,6 +98,8 @@ class TGRpcConnectionsImpl clientConfig.MaxOutboundMessageSize = MaxOutboundMessageSize_; } + clientConfig.LoadBalancingPolicy = "round_robin"; + if (dbState->DiscoveryMode != EDiscoveryMode::Off) { if (std::is_same() || dbState->Database.empty() @@ -643,7 +645,7 @@ class TGRpcConnectionsImpl #endif bool GetDrainOnDtors() const; - TBalancingSettings GetBalancingSettings() const override; + TBalancingPolicy::TImpl GetBalancingSettings() const override; bool StartStatCollecting(::NMonitoring::IMetricRegistry* sensorsRegistry) override; ::NMonitoring::TMetricRegistry* GetMetricRegistry() override; void RegisterExtension(IExtension* extension); @@ -760,7 +762,7 @@ class TGRpcConnectionsImpl const i64 MaxQueuedRequests_; const i64 MaxQueuedResponses_; const bool DrainOnDtors_; - const TBalancingSettings BalancingSettings_; + const TBalancingPolicy::TImpl BalancingSettings_; const TDuration GRpcKeepAliveTimeout_; const bool GRpcKeepAlivePermitWithoutCalls_; const ui64 MemoryQuota_; diff --git a/src/client/impl/ydb_internal/grpc_connections/params.h b/src/client/impl/internal/grpc_connections/params.h similarity index 85% rename from src/client/impl/ydb_internal/grpc_connections/params.h rename to src/client/impl/internal/grpc_connections/params.h index d03c7258f95..1267c1fa01c 100644 --- a/src/client/impl/ydb_internal/grpc_connections/params.h +++ b/src/client/impl/internal/grpc_connections/params.h @@ -2,8 +2,9 @@ #include -#include -#include +#include +#include +#include #include #include @@ -24,7 +25,7 @@ class IConnectionsParams { virtual size_t GetMaxQueuedRequests() const = 0; virtual NYdbGrpc::TTcpKeepAliveSettings GetTcpKeepAliveSettings() const = 0; virtual bool GetDrinOnDtors() const = 0; - virtual TBalancingSettings GetBalancingSettings() const = 0; + virtual TBalancingPolicy::TImpl GetBalancingSettings() const = 0; virtual TDuration GetGRpcKeepAliveTimeout() const = 0; virtual bool GetGRpcKeepAlivePermitWithoutCalls() const = 0; virtual TDuration GetSocketIdleTimeout() const = 0; diff --git a/src/client/impl/ydb_internal/internal_client/client.h b/src/client/impl/internal/internal_client/client.h similarity index 84% rename from src/client/impl/ydb_internal/internal_client/client.h rename to src/client/impl/internal/internal_client/client.h index b40003b5077..f53015355aa 100644 --- a/src/client/impl/ydb_internal/internal_client/client.h +++ b/src/client/impl/internal/internal_client/client.h @@ -1,8 +1,8 @@ #pragma once -#include +#include -#include +#include #include #include @@ -26,7 +26,7 @@ class IInternalClient { #ifndef YDB_GRPC_BYPASS_CHANNEL_POOL virtual void DeleteChannels(const std::vector& endpoints) = 0; #endif - virtual TBalancingSettings GetBalancingSettings() const = 0; + virtual TBalancingPolicy::TImpl GetBalancingSettings() const = 0; virtual bool StartStatCollecting(::NMonitoring::IMetricRegistry* sensorsRegistry) = 0; virtual ::NMonitoring::TMetricRegistry* GetMetricRegistry() = 0; virtual const TLog& GetLog() const = 0; diff --git a/src/client/impl/ydb_internal/internal_header.h b/src/client/impl/internal/internal_header.h similarity index 100% rename from src/client/impl/ydb_internal/internal_header.h rename to src/client/impl/internal/internal_header.h diff --git a/src/client/impl/ydb_internal/logger/CMakeLists.txt b/src/client/impl/internal/logger/CMakeLists.txt similarity index 100% rename from src/client/impl/ydb_internal/logger/CMakeLists.txt rename to src/client/impl/internal/logger/CMakeLists.txt diff --git a/src/client/impl/ydb_internal/logger/log.cpp b/src/client/impl/internal/logger/log.cpp similarity index 97% rename from src/client/impl/ydb_internal/logger/log.cpp rename to src/client/impl/internal/logger/log.cpp index f8a8dcbcaf0..33b9e7658d8 100644 --- a/src/client/impl/ydb_internal/logger/log.cpp +++ b/src/client/impl/internal/logger/log.cpp @@ -1,8 +1,6 @@ #define INCLUDE_YDB_INTERNAL_H #include "log.h" -#include - #include #include diff --git a/src/client/impl/ydb_internal/logger/log.h b/src/client/impl/internal/logger/log.h similarity index 79% rename from src/client/impl/ydb_internal/logger/log.h rename to src/client/impl/internal/logger/log.h index 1a4c6e19f43..a5aebfd1bd9 100644 --- a/src/client/impl/ydb_internal/logger/log.h +++ b/src/client/impl/internal/logger/log.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include diff --git a/src/client/impl/ydb_internal/make_request/CMakeLists.txt b/src/client/impl/internal/make_request/CMakeLists.txt similarity index 100% rename from src/client/impl/ydb_internal/make_request/CMakeLists.txt rename to src/client/impl/internal/make_request/CMakeLists.txt diff --git a/src/client/impl/ydb_internal/make_request/make.cpp b/src/client/impl/internal/make_request/make.cpp similarity index 100% rename from src/client/impl/ydb_internal/make_request/make.cpp rename to src/client/impl/internal/make_request/make.cpp diff --git a/src/client/impl/ydb_internal/make_request/make.h b/src/client/impl/internal/make_request/make.h similarity index 97% rename from src/client/impl/ydb_internal/make_request/make.h rename to src/client/impl/internal/make_request/make.h index 486ba9e5c3f..01c8ff3666d 100644 --- a/src/client/impl/ydb_internal/make_request/make.h +++ b/src/client/impl/internal/make_request/make.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include diff --git a/src/client/impl/ydb_internal/plain_status/CMakeLists.txt b/src/client/impl/internal/plain_status/CMakeLists.txt similarity index 100% rename from src/client/impl/ydb_internal/plain_status/CMakeLists.txt rename to src/client/impl/internal/plain_status/CMakeLists.txt diff --git a/src/client/impl/ydb_internal/plain_status/status.cpp b/src/client/impl/internal/plain_status/status.cpp similarity index 100% rename from src/client/impl/ydb_internal/plain_status/status.cpp rename to src/client/impl/internal/plain_status/status.cpp diff --git a/src/client/impl/ydb_internal/plain_status/status.h b/src/client/impl/internal/plain_status/status.h similarity index 97% rename from src/client/impl/ydb_internal/plain_status/status.h rename to src/client/impl/internal/plain_status/status.h index 79222b714b4..86dc31760d5 100644 --- a/src/client/impl/ydb_internal/plain_status/status.h +++ b/src/client/impl/internal/plain_status/status.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include diff --git a/src/client/impl/ydb_internal/retry/CMakeLists.txt b/src/client/impl/internal/retry/CMakeLists.txt similarity index 100% rename from src/client/impl/ydb_internal/retry/CMakeLists.txt rename to src/client/impl/internal/retry/CMakeLists.txt diff --git a/src/client/impl/ydb_internal/retry/retry.cpp b/src/client/impl/internal/retry/retry.cpp similarity index 100% rename from src/client/impl/ydb_internal/retry/retry.cpp rename to src/client/impl/internal/retry/retry.cpp diff --git a/src/client/impl/ydb_internal/retry/retry.h b/src/client/impl/internal/retry/retry.h similarity index 100% rename from src/client/impl/ydb_internal/retry/retry.h rename to src/client/impl/internal/retry/retry.h diff --git a/src/client/impl/ydb_internal/retry/retry_async.h b/src/client/impl/internal/retry/retry_async.h similarity index 99% rename from src/client/impl/ydb_internal/retry/retry_async.h rename to src/client/impl/internal/retry/retry_async.h index e4a26f02fb6..3610ade8b2a 100644 --- a/src/client/impl/ydb_internal/retry/retry_async.h +++ b/src/client/impl/internal/retry/retry_async.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include diff --git a/src/client/impl/ydb_internal/retry/retry_sync.h b/src/client/impl/internal/retry/retry_sync.h similarity index 98% rename from src/client/impl/ydb_internal/retry/retry_sync.h rename to src/client/impl/internal/retry/retry_sync.h index b0c63187bdd..01d5fc80f0f 100644 --- a/src/client/impl/ydb_internal/retry/retry_sync.h +++ b/src/client/impl/internal/retry/retry_sync.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include diff --git a/src/client/impl/ydb_internal/rpc_request_settings/settings.h b/src/client/impl/internal/rpc_request_settings/settings.h similarity index 92% rename from src/client/impl/ydb_internal/rpc_request_settings/settings.h rename to src/client/impl/internal/rpc_request_settings/settings.h index 6f75764d187..9006ccf3bfe 100644 --- a/src/client/impl/ydb_internal/rpc_request_settings/settings.h +++ b/src/client/impl/internal/rpc_request_settings/settings.h @@ -1,7 +1,7 @@ #pragma once -#include -#include +#include +#include namespace NYdb::inline V3 { diff --git a/src/client/impl/ydb_internal/scheme_helpers/helpers.h b/src/client/impl/internal/scheme_helpers/helpers.h similarity index 100% rename from src/client/impl/ydb_internal/scheme_helpers/helpers.h rename to src/client/impl/internal/scheme_helpers/helpers.h diff --git a/src/client/impl/ydb_internal/stats_extractor/extractor.h b/src/client/impl/internal/stats_extractor/extractor.h similarity index 89% rename from src/client/impl/ydb_internal/stats_extractor/extractor.h rename to src/client/impl/internal/stats_extractor/extractor.h index 964f30fea0b..6ebc84a8f65 100644 --- a/src/client/impl/ydb_internal/stats_extractor/extractor.h +++ b/src/client/impl/internal/stats_extractor/extractor.h @@ -1,8 +1,8 @@ #pragma once -#include +#include -#include +#include #include diff --git a/src/client/impl/ydb_internal/table_helpers/helpers.h b/src/client/impl/internal/table_helpers/helpers.h similarity index 94% rename from src/client/impl/ydb_internal/table_helpers/helpers.h rename to src/client/impl/internal/table_helpers/helpers.h index 80e1ce47aea..a0de4505070 100644 --- a/src/client/impl/ydb_internal/table_helpers/helpers.h +++ b/src/client/impl/internal/table_helpers/helpers.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include diff --git a/src/client/impl/ydb_internal/thread_pool/CMakeLists.txt b/src/client/impl/internal/thread_pool/CMakeLists.txt similarity index 100% rename from src/client/impl/ydb_internal/thread_pool/CMakeLists.txt rename to src/client/impl/internal/thread_pool/CMakeLists.txt diff --git a/src/client/impl/ydb_internal/thread_pool/pool.cpp b/src/client/impl/internal/thread_pool/pool.cpp similarity index 100% rename from src/client/impl/ydb_internal/thread_pool/pool.cpp rename to src/client/impl/internal/thread_pool/pool.cpp diff --git a/src/client/impl/ydb_internal/thread_pool/pool.h b/src/client/impl/internal/thread_pool/pool.h similarity index 88% rename from src/client/impl/ydb_internal/thread_pool/pool.h rename to src/client/impl/internal/thread_pool/pool.h index f29a7bb6238..c7088da0c17 100644 --- a/src/client/impl/ydb_internal/thread_pool/pool.h +++ b/src/client/impl/internal/thread_pool/pool.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include diff --git a/src/client/impl/ydb_internal/value_helpers/CMakeLists.txt b/src/client/impl/internal/value_helpers/CMakeLists.txt similarity index 100% rename from src/client/impl/ydb_internal/value_helpers/CMakeLists.txt rename to src/client/impl/internal/value_helpers/CMakeLists.txt diff --git a/src/client/impl/ydb_internal/value_helpers/helpers.cpp b/src/client/impl/internal/value_helpers/helpers.cpp similarity index 100% rename from src/client/impl/ydb_internal/value_helpers/helpers.cpp rename to src/client/impl/internal/value_helpers/helpers.cpp diff --git a/src/client/impl/ydb_internal/value_helpers/helpers.h b/src/client/impl/internal/value_helpers/helpers.h similarity index 74% rename from src/client/impl/ydb_internal/value_helpers/helpers.h rename to src/client/impl/internal/value_helpers/helpers.h index 8bb70e0ea9d..d87d188e008 100644 --- a/src/client/impl/ydb_internal/value_helpers/helpers.h +++ b/src/client/impl/internal/value_helpers/helpers.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include diff --git a/src/client/impl/session/CMakeLists.txt b/src/client/impl/session/CMakeLists.txt new file mode 100644 index 00000000000..d22a9cb3c4c --- /dev/null +++ b/src/client/impl/session/CMakeLists.txt @@ -0,0 +1,16 @@ +_ydb_sdk_add_library(impl-session) + +target_link_libraries(impl-session PUBLIC + yutil + threading-future + api-protos + client-impl-ydb_endpoints + client-ydb_types-operation +) + +target_sources(impl-session PRIVATE + kqp_session_common.cpp + session_pool.cpp +) + +_ydb_sdk_install_targets(TARGETS impl-session) diff --git a/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp b/src/client/impl/session/kqp_session_common.cpp similarity index 97% rename from src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp rename to src/client/impl/session/kqp_session_common.cpp index c3f673e53ef..e2a59de959f 100644 --- a/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp +++ b/src/client/impl/session/kqp_session_common.cpp @@ -165,8 +165,7 @@ void TKqpSessionCommon::CloseFromServer(std::weak_ptr client) no //////////////////////////////////////////////////////////////////////////////// -std::function TKqpSessionCommon::GetSmartDeleter( - std::shared_ptr client) +std::function TKqpSessionCommon::GetSmartDeleter(std::shared_ptr client) { return [client](TKqpSessionCommon* sessionImpl) { switch (sessionImpl->GetState()) { @@ -174,7 +173,7 @@ std::function TKqpSessionCommon::GetSmartDeleter( case TKqpSessionCommon::S_BROKEN: case TKqpSessionCommon::S_CLOSING: client->DeleteSession(sessionImpl); - break; + break; case TKqpSessionCommon::S_IDLE: case TKqpSessionCommon::S_ACTIVE: { if (!client->ReturnSession(sessionImpl)) { @@ -182,8 +181,6 @@ std::function TKqpSessionCommon::GetSmartDeleter( } break; } - default: - break; } }; } diff --git a/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/src/client/impl/session/kqp_session_common.h similarity index 92% rename from src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h rename to src/client/impl/session/kqp_session_common.h index dac45a56bce..cfac2b6e043 100644 --- a/src/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h +++ b/src/client/impl/session/kqp_session_common.h @@ -1,7 +1,8 @@ #pragma once -#include -#include +#include "session_client.h" + +#include #include #include @@ -60,8 +61,7 @@ class TKqpSessionCommon : public TEndpointObj { void SetTimeInterval(TDuration interval); TDuration GetTimeInterval() const; - static std::function - GetSmartDeleter(std::shared_ptr client); + static std::function GetSmartDeleter(std::shared_ptr client); // Shoult be called under session pool lock void UpdateServerCloseHandler(IServerCloseHandler*); diff --git a/src/client/impl/ydb_internal/session_client/session_client.h b/src/client/impl/session/session_client.h similarity index 86% rename from src/client/impl/ydb_internal/session_client/session_client.h rename to src/client/impl/session/session_client.h index db920a7ab90..bd0484f7d8b 100644 --- a/src/client/impl/ydb_internal/session_client/session_client.h +++ b/src/client/impl/session/session_client.h @@ -1,9 +1,5 @@ #pragma once -#include - -#include - namespace NYdb::inline V3 { class TKqpSessionCommon; @@ -11,7 +7,9 @@ class TKqpSessionCommon; class ISessionClient { public: virtual ~ISessionClient() = default; + virtual void DeleteSession(TKqpSessionCommon* sessionImpl) = 0; + // TODO: Try to remove from ISessionClient virtual bool ReturnSession(TKqpSessionCommon* sessionImpl) = 0; }; diff --git a/src/client/impl/ydb_internal/session_pool/session_pool.cpp b/src/client/impl/session/session_pool.cpp similarity index 99% rename from src/client/impl/ydb_internal/session_pool/session_pool.cpp rename to src/client/impl/session/session_pool.cpp index ff9a6db5ade..e91861a4a97 100644 --- a/src/client/impl/ydb_internal/session_pool/session_pool.cpp +++ b/src/client/impl/session/session_pool.cpp @@ -1,7 +1,7 @@ #include "session_pool.h" #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/impl/ydb_internal/session_pool/session_pool.h b/src/client/impl/session/session_pool.h similarity index 98% rename from src/client/impl/ydb_internal/session_pool/session_pool.h rename to src/client/impl/session/session_pool.h index 78394bbcc12..9533460efd3 100644 --- a/src/client/impl/ydb_internal/session_pool/session_pool.h +++ b/src/client/impl/session/session_pool.h @@ -1,6 +1,7 @@ #pragma once -#include +#include "kqp_session_common.h" + #include diff --git a/src/client/impl/ydb_stats/CMakeLists.txt b/src/client/impl/stats/CMakeLists.txt similarity index 100% rename from src/client/impl/ydb_stats/CMakeLists.txt rename to src/client/impl/stats/CMakeLists.txt diff --git a/src/client/impl/ydb_stats/stats.cpp b/src/client/impl/stats/stats.cpp similarity index 100% rename from src/client/impl/ydb_stats/stats.cpp rename to src/client/impl/stats/stats.cpp diff --git a/src/client/impl/ydb_stats/stats.h b/src/client/impl/stats/stats.h similarity index 100% rename from src/client/impl/ydb_stats/stats.h rename to src/client/impl/stats/stats.h diff --git a/src/client/impl/ydb_internal/kqp_session_common/CMakeLists.txt b/src/client/impl/ydb_internal/kqp_session_common/CMakeLists.txt deleted file mode 100644 index 405cbd9653e..00000000000 --- a/src/client/impl/ydb_internal/kqp_session_common/CMakeLists.txt +++ /dev/null @@ -1,14 +0,0 @@ -_ydb_sdk_add_library(impl-ydb_internal-kqp_session_common) - -target_link_libraries(impl-ydb_internal-kqp_session_common PUBLIC - yutil - threading-future - library-operation_id - client-impl-ydb_endpoints -) - -target_sources(impl-ydb_internal-kqp_session_common PRIVATE - kqp_session_common.cpp -) - -_ydb_sdk_install_targets(TARGETS impl-ydb_internal-kqp_session_common) diff --git a/src/client/impl/ydb_internal/session_pool/CMakeLists.txt b/src/client/impl/ydb_internal/session_pool/CMakeLists.txt deleted file mode 100644 index 91577a39695..00000000000 --- a/src/client/impl/ydb_internal/session_pool/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -_ydb_sdk_add_library(impl-ydb_internal-session_pool) - -target_link_libraries(impl-ydb_internal-session_pool PUBLIC - yutil - threading-future - api-protos - client-impl-ydb_endpoints - client-ydb_types-operation -) - -target_sources(impl-ydb_internal-session_pool PRIVATE - session_pool.cpp -) - -_ydb_sdk_install_targets(TARGETS impl-ydb_internal-session_pool) diff --git a/src/client/import/import.cpp b/src/client/import/import.cpp index 466fa66e749..6ec8b99fd44 100644 --- a/src/client/import/import.cpp +++ b/src/client/import/import.cpp @@ -1,7 +1,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/monitoring/monitoring.cpp b/src/client/monitoring/monitoring.cpp index 6b7b4f1bcf1..7f86d9a2006 100644 --- a/src/client/monitoring/monitoring.cpp +++ b/src/client/monitoring/monitoring.cpp @@ -1,7 +1,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/operation/impl.h b/src/client/operation/impl.h index e8c2d8ba8c9..22c42e2f353 100644 --- a/src/client/operation/impl.h +++ b/src/client/operation/impl.h @@ -3,7 +3,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/operation/operation.cpp b/src/client/operation/operation.cpp index 8823cf7f39c..7c4a80b8e24 100644 --- a/src/client/operation/operation.cpp +++ b/src/client/operation/operation.cpp @@ -1,7 +1,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H /* Headers below used to instantiate concrete 'Get' & 'List' methods */ diff --git a/src/client/persqueue_public/impl/persqueue_impl.h b/src/client/persqueue_public/impl/persqueue_impl.h index 31d38e2956d..d817558279b 100644 --- a/src/client/persqueue_public/impl/persqueue_impl.h +++ b/src/client/persqueue_public/impl/persqueue_impl.h @@ -7,7 +7,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/persqueue_public/impl/read_session.cpp b/src/client/persqueue_public/impl/read_session.cpp index 3e0e9dda3a9..3ded33a2798 100644 --- a/src/client/persqueue_public/impl/read_session.cpp +++ b/src/client/persqueue_public/impl/read_session.cpp @@ -3,7 +3,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/persqueue_public/ut/read_session_ut.cpp b/src/client/persqueue_public/ut/read_session_ut.cpp index 4c5a64f4949..5fc21e1749a 100644 --- a/src/client/persqueue_public/ut/read_session_ut.cpp +++ b/src/client/persqueue_public/ut/read_session_ut.cpp @@ -1,7 +1,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/query/CMakeLists.txt b/src/client/query/CMakeLists.txt index f35da5edb25..fe59888d193 100644 --- a/src/client/query/CMakeLists.txt +++ b/src/client/query/CMakeLists.txt @@ -5,8 +5,7 @@ _ydb_sdk_add_library(client-ydb_query) target_link_libraries(client-ydb_query PUBLIC yutil impl-ydb_internal-make_request - impl-ydb_internal-kqp_session_common - impl-ydb_internal-session_pool + impl-session impl-ydb_internal-retry client-ydb_common_client client-ydb_driver diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index 4423e2013d1..118c96f1624 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -3,13 +3,13 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/query/impl/client_session.cpp b/src/client/query/impl/client_session.cpp index f7f733c92a4..abc6b04a128 100644 --- a/src/client/query/impl/client_session.cpp +++ b/src/client/query/impl/client_session.cpp @@ -1,7 +1,7 @@ #include "client_session.h" #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/query/impl/client_session.h b/src/client/query/impl/client_session.h index 2d2d65973f4..2a5c680cf1b 100644 --- a/src/client/query/impl/client_session.h +++ b/src/client/query/impl/client_session.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include diff --git a/src/client/query/impl/exec_query.cpp b/src/client/query/impl/exec_query.cpp index 8f28a8f3fa9..896d29cdddc 100644 --- a/src/client/query/impl/exec_query.cpp +++ b/src/client/query/impl/exec_query.cpp @@ -3,9 +3,9 @@ #include "client_session.h" #include -#include -#include -#include +#include +#include +#include #include #undef INCLUDE_YDB_INTERNAL_H diff --git a/src/client/query/impl/exec_query.h b/src/client/query/impl/exec_query.h index b20246e84be..aa11709bddd 100644 --- a/src/client/query/impl/exec_query.h +++ b/src/client/query/impl/exec_query.h @@ -1,10 +1,10 @@ #pragma once -#include +#include #include #include -#include +#include #include namespace NYdb::inline V3::NQuery { diff --git a/src/client/rate_limiter/rate_limiter.cpp b/src/client/rate_limiter/rate_limiter.cpp index 4cfc8a99ebd..d4f197d146b 100644 --- a/src/client/rate_limiter/rate_limiter.cpp +++ b/src/client/rate_limiter/rate_limiter.cpp @@ -1,7 +1,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/scheme/scheme.cpp b/src/client/scheme/scheme.cpp index f18aa0f556a..25794e5bf63 100644 --- a/src/client/scheme/scheme.cpp +++ b/src/client/scheme/scheme.cpp @@ -1,8 +1,8 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include -#include +#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/table/CMakeLists.txt b/src/client/table/CMakeLists.txt index b7103470cc4..3c390383698 100644 --- a/src/client/table/CMakeLists.txt +++ b/src/client/table/CMakeLists.txt @@ -8,7 +8,7 @@ target_link_libraries(client-ydb_table PUBLIC enum_serialization_runtime api-protos impl-ydb_internal-make_request - impl-ydb_internal-kqp_session_common + impl-session impl-ydb_internal-retry client-ydb_driver client-ydb_params diff --git a/src/client/table/impl/CMakeLists.txt b/src/client/table/impl/CMakeLists.txt index 5bbeecf5243..8f53d386fc6 100644 --- a/src/client/table/impl/CMakeLists.txt +++ b/src/client/table/impl/CMakeLists.txt @@ -8,7 +8,7 @@ target_link_libraries(client-ydb_table-impl api-grpc library-operation_id client-impl-ydb_endpoints - impl-ydb_internal-session_pool + impl-session client-ydb_table-query_stats PRIVATE OpenSSL::SSL diff --git a/src/client/table/impl/client_session.h b/src/client/table/impl/client_session.h index dab53f500fa..968a06ff6e8 100644 --- a/src/client/table/impl/client_session.h +++ b/src/client/table/impl/client_session.h @@ -1,8 +1,8 @@ #pragma once #include -#include -#include +#include +#include #include #include diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index 3e63550089d..1df7dc34ee5 100644 --- a/src/client/table/impl/table_client.cpp +++ b/src/client/table/impl/table_client.cpp @@ -226,16 +226,16 @@ void TTableClient::TImpl::StartPeriodicHostScanTask() { } else { TRequestMigrator& migrator = strongClient->RequestMigrator_; - const auto balancingPolicy = strongClient->DbDriverState_->GetBalancingPolicy(); + const auto balancingPolicy = strongClient->DbDriverState_->GetBalancingPolicyType(); // Try to find any host at foreign locations if prefer local dc - const ui64 foreignHost = (balancingPolicy == EBalancingPolicy::UsePreferableLocation) ? + const ui64 foreignHost = (balancingPolicy == TBalancingPolicy::TImpl::EPolicyType::UsePreferableLocation) ? ScanForeignLocations(strongClient) : 0; std::unordered_map hostMap; winner = ScanLocation(strongClient, hostMap, - balancingPolicy == EBalancingPolicy::UseAllNodes); + balancingPolicy == TBalancingPolicy::TImpl::EPolicyType::UseAllNodes); bool forceMigrate = false; diff --git a/src/client/table/impl/table_client.h b/src/client/table/impl/table_client.h index 3ae6ee7bb35..e26ef4d4784 100644 --- a/src/client/table/impl/table_client.h +++ b/src/client/table/impl/table_client.h @@ -1,11 +1,11 @@ #pragma once #define INCLUDE_YDB_INTERNAL_H -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index bbbe4a876c4..c53c59e3579 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -1,17 +1,17 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include #undef INCLUDE_YDB_INTERNAL_H #include #include -#include +#include #include #include #include diff --git a/src/client/topic/impl/read_session.cpp b/src/client/topic/impl/read_session.cpp index 5de92015160..79dbe2e41e8 100644 --- a/src/client/topic/impl/read_session.cpp +++ b/src/client/topic/impl/read_session.cpp @@ -2,7 +2,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/topic/impl/read_session_event.cpp b/src/client/topic/impl/read_session_event.cpp index 8eb8858e84b..cf58500d08f 100644 --- a/src/client/topic/impl/read_session_event.cpp +++ b/src/client/topic/impl/read_session_event.cpp @@ -88,6 +88,10 @@ const std::string& TMessageBase::GetData() const { return Data; } +const std::string& TMessageBase::GetBrokenData() const { + return Data; +} + uint64_t TMessageBase::GetOffset() const { return Information.Offset; } @@ -175,6 +179,13 @@ const std::string& TMessage::GetData() const { return TMessageBase::GetData(); } +const std::string& TMessage::GetBrokenData() const { + if (DecompressionException) { + return TMessageBase::GetData(); + } + ythrow yexception() << "Can not get broken data after successful decompression"; +} + bool TMessage::HasException() const { return DecompressionException != nullptr; } diff --git a/src/client/topic/impl/read_session_impl.h b/src/client/topic/impl/read_session_impl.h index 60d313496b2..42e87e4c2c6 100644 --- a/src/client/topic/impl/read_session_impl.h +++ b/src/client/topic/impl/read_session_impl.h @@ -365,7 +365,7 @@ class TDataDecompressionInfo : public std::enable_shared_from_this MessagesMeta; TCallbackContextPtr CbContext; bool DoDecompress; - i64 ServerBytesSize = 0; + std::atomic ServerBytesSize = 0; std::atomic SourceDataNotProcessed = 0; std::pair CurrentDecompressingMessage = {0, 0}; // (Batch, Message) std::deque ReadyThresholds; diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 382a7b9cf68..4eeb0b3ba3d 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -11,7 +11,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -1361,7 +1362,6 @@ inline void TSingleClusterReadSessionImpl::StopPartitionSessionImpl( if (graceful) { auto committedOffset = partitionStream->GetMaxCommittedOffset(); - LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "XXXXX PushEvent 1422 TStopPartitionSessionEvent"); pushRes = EventsQueue->PushEvent( partitionStream, // TODO(qyryq) Is it safe to use GetMaxCommittedOffset here instead of StopPartitionSessionRequest.commmitted_offset? @@ -1374,7 +1374,6 @@ inline void TSingleClusterReadSessionImpl::StopPartitionSessionImpl( released.set_partition_session_id(partitionStream->GetAssignId()); WriteToProcessorImpl(std::move(req)); PartitionStreams.erase(partitionSessionId); - LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "XXXXX PushEvent 1435 TPartitionSessionClosedEvent"); pushRes = EventsQueue->PushEvent( partitionStream, TReadSessionEvent::TPartitionSessionClosedEvent(partitionStream, TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost), @@ -2984,7 +2983,7 @@ void TDataDecompressionInfo::OnDataDecompressed(i64 source if (auto session = CbContext->LockShared()) { // TODO (ildar-khisam@): distribute total ServerBytesSize in proportion of source size // Use CompressedDataSize, sourceSize, ServerBytesSize - session->OnDataDecompressed(sourceSize, estimatedDecompressedSize, decompressedSize, messagesCount, std::exchange(ServerBytesSize, 0)); + session->OnDataDecompressed(sourceSize, estimatedDecompressedSize, decompressedSize, messagesCount, ServerBytesSize.exchange(0)); } } @@ -3063,16 +3062,15 @@ void TDataDecompressionInfo::TDecompressionTask::operator( data.set_data(TStringType{decompressed}); } } - - DecompressedSize += data.data().size(); } catch (...) { parent->PutDecompressionError(std::current_exception(), messages.Batch, i); - data.clear_data(); // Free memory, because we don't count it. if (auto session = parent->CbContext->LockShared()) { session->GetLog() << TLOG_INFO << "Error decompressing data: " << CurrentExceptionMessage(); } } + + DecompressedSize += data.data().size(); } } diff --git a/src/client/topic/impl/topic.cpp b/src/client/topic/impl/topic.cpp index 06210624579..a6ee2007cfe 100644 --- a/src/client/topic/impl/topic.cpp +++ b/src/client/topic/impl/topic.cpp @@ -3,7 +3,7 @@ #include #include -#include +#include #include #include diff --git a/src/client/topic/impl/topic_impl.h b/src/client/topic/impl/topic_impl.h index 98a5738092f..2fa1703635a 100644 --- a/src/client/topic/impl/topic_impl.h +++ b/src/client/topic/impl/topic_impl.h @@ -5,7 +5,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index 1ae008fb56b..e8431f776d2 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -9,10 +9,10 @@ #include #include -#include -#include +#include +#include #include -#include +#include #include #include diff --git a/src/client/types/CMakeLists.txt b/src/client/types/CMakeLists.txt index 4826037a4c0..a7e2cea89cf 100644 --- a/src/client/types/CMakeLists.txt +++ b/src/client/types/CMakeLists.txt @@ -6,12 +6,17 @@ add_subdirectory(status) _ydb_sdk_add_library(client-ydb_types) +target_sources(client-ydb_types PRIVATE + ydb.cpp +) + target_link_libraries(client-ydb_types PUBLIC yutil protobuf::libprotobuf grpc-client yql-public-issue enum_serialization_runtime + impl-ydb_internal-common ) generate_enum_serilization(client-ydb_types diff --git a/src/client/types/credentials/login/login.cpp b/src/client/types/credentials/login/login.cpp index c6985d11e4b..a2a089a7bd2 100644 --- a/src/client/types/credentials/login/login.cpp +++ b/src/client/types/credentials/login/login.cpp @@ -1,7 +1,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include -#include +#include +#include #include #include diff --git a/src/client/types/status/status.cpp b/src/client/types/status/status.cpp index dd5238a4799..c898a1a4af2 100644 --- a/src/client/types/status/status.cpp +++ b/src/client/types/status/status.cpp @@ -1,7 +1,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/src/client/types/ydb.cpp b/src/client/types/ydb.cpp new file mode 100644 index 00000000000..0951d0cda4a --- /dev/null +++ b/src/client/types/ydb.cpp @@ -0,0 +1,37 @@ +#include + +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + + +namespace NYdb::inline V3 { + +TBalancingPolicy::TBalancingPolicy(EBalancingPolicy policy, const std::string& params) { + switch (policy) { + case EBalancingPolicy::UsePreferableLocation: + Impl_ = TImpl::UsePreferableLocation(params); + break; + case EBalancingPolicy::UseAllNodes: + Impl_ = TImpl::UseAllNodes(); + break; + } +} + +TBalancingPolicy TBalancingPolicy::UsePreferableLocation(const std::string& location) { + return TBalancingPolicy(TImpl::UsePreferableLocation(location)); +} + +TBalancingPolicy TBalancingPolicy::UseAllNodes() { + return TBalancingPolicy(TImpl::UseAllNodes()); +} + +TBalancingPolicy TBalancingPolicy::UsePreferablePileState(EPileState pileState) { + return TBalancingPolicy(TImpl::UsePreferablePileState(pileState)); +} + +TBalancingPolicy::TBalancingPolicy(std::unique_ptr&& impl) + : Impl_(std::move(impl)) +{} + +} diff --git a/src/client/value/value.cpp b/src/client/value/value.cpp index 2377d1dccfc..84b2027a92f 100644 --- a/src/client/value/value.cpp +++ b/src/client/value/value.cpp @@ -1,7 +1,7 @@ #include #define INCLUDE_YDB_INTERNAL_H -#include +#include #undef INCLUDE_YDB_INTERNAL_H #include diff --git a/tests/integration/topic/basic_usage.cpp b/tests/integration/topic/basic_usage.cpp index aefab48d08e..7152e511a1e 100644 --- a/tests/integration/topic/basic_usage.cpp +++ b/tests/integration/topic/basic_usage.cpp @@ -747,6 +747,113 @@ TEST_F(BasicUsage, TEST_NAME(TWriteSession_WriteEncoded)) { } } +TEST_F(BasicUsage, TEST_NAME(TWriteSession_WriteEncoded_Broken)) { + // Write a broken compressed message. + // GetData should throw an exception. + // GetBrokenData should return the broken data. + + // Write a correct compressed message. + // GetData should return the correct data. + // GetBrokenData should throw an exception. + + auto driver = MakeDriver(); + + TTopicClient client(driver); + + auto settings = TWriteSessionSettings() + .Path(GetTopicPath()) + .MessageGroupId(TEST_MESSAGE_GROUP_ID); + + auto writer = client.CreateWriteSession(settings); + std::string brokenPacked = "some broken data"; + + { + auto event = *writer->GetEvent(true); + ASSERT_TRUE(std::holds_alternative(event)); + writer->WriteEncoded( + std::move(std::get(event).ContinuationToken), + TWriteMessage::CompressedMessage(brokenPacked, ECodec::GZIP, 100) + ); + } + + std::string correctMessage = "message"; + TString packed; + { + TStringOutput so(packed); + TZLibCompress oss(&so, ZLib::GZip, 6); + oss << correctMessage; + } + { + auto event = *writer->GetEvent(true); + ASSERT_TRUE(std::holds_alternative(event)); + writer->WriteEncoded( + std::move(std::get(event).ContinuationToken), + TWriteMessage::CompressedMessage(packed, ECodec::GZIP, correctMessage.size()) + ); + } + + std::uint32_t acks = 0; + while (acks < 2) { + auto event = *writer->GetEvent(true); + if (auto e = std::get_if(&event)) { + acks += e->Acks.size(); + } else { + continue; + } + } + + ASSERT_EQ(acks, 2u); + + auto readSettings = TReadSessionSettings() + .ConsumerName(GetConsumerName()) + .AppendTopics(GetTopicPath()) + // .DirectRead(EnableDirectRead) + ; + std::shared_ptr readSession = client.CreateReadSession(readSettings); + std::uint32_t readMessageCount = 0; + while (readMessageCount < 2) { + std::cerr << "Get event on client\n"; + auto event = *readSession->GetEvent(true); + std::visit(TOverloaded { + [&](TReadSessionEvent::TDataReceivedEvent& event) { + for (auto& message: event.GetMessages()) { + if (readMessageCount == 0) { + ASSERT_THROW(message.GetData(), std::exception); + std::string data = message.GetBrokenData(); + ASSERT_TRUE(brokenPacked == data); + } else { + ASSERT_THROW(message.GetBrokenData(), std::exception); + std::string data = message.GetData(); + ASSERT_TRUE(correctMessage == data); + } + ++readMessageCount; + } + }, + [&](TReadSessionEvent::TCommitOffsetAcknowledgementEvent&) { + FAIL(); + }, + [&](TReadSessionEvent::TStartPartitionSessionEvent& event) { + event.Confirm(); + }, + [&](TReadSessionEvent::TStopPartitionSessionEvent& event) { + event.Confirm(); + }, + [&](TReadSessionEvent::TEndPartitionSessionEvent& event) { + event.Confirm(); + }, + [&](TReadSessionEvent::TPartitionSessionStatusEvent&) { + FAIL() << "Test does not support lock sessions yet"; + }, + [&](TReadSessionEvent::TPartitionSessionClosedEvent&) { + FAIL() << "Test does not support lock sessions yet"; + }, + [&](TSessionClosedEvent&) { + FAIL() << "Session closed"; + } + }, event); + } +} + namespace { enum class EExpectedTestResult { SUCCESS, diff --git a/tests/unit/client/endpoints/endpoints_ut.cpp b/tests/unit/client/endpoints/endpoints_ut.cpp index da911f04bc8..7e2ebe719d4 100644 --- a/tests/unit/client/endpoints/endpoints_ut.cpp +++ b/tests/unit/client/endpoints/endpoints_ut.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include