Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/import_generation.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
23
24
2 changes: 1 addition & 1 deletion .github/last_commit.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
9563c7b171619687436257273b34ceba261903ed
110937321b45fb3be9ca39ff3bb83bcdfcf52c97
10 changes: 1 addition & 9 deletions include/ydb-cpp-sdk/client/discovery/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,7 @@ struct TEndpointInfo {
};

struct TPileState {
enum EState {
UNSPECIFIED = 0 /* "unspecified" */,
PRIMARY = 1 /* "primary" */,
PROMOTED = 2 /* "promoted" */,
SYNCHRONIZED = 3 /* "synchronized" */,
NOT_SYNCHRONIZED = 4 /* "not_synchronized" */,
SUSPENDED = 5 /* "suspended" */,
DISCONNECTED = 6 /* "disconnected" */
};
using EState = NYdb::EPileState;

EState State;
std::string PileName;
Expand Down
24 changes: 24 additions & 0 deletions include/ydb-cpp-sdk/client/driver/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ class TDriverConfig {
//! where "<protocol>://" can be "grpc://" or "grpcs://" or be absent, "<hostname:port>" is endpoint,
//! "/?database=<database-path>" is optional
TDriverConfig(const std::string& connectionString = "");

//! Endpoint to initiate connections with Ydb cluster,
//! client will connect to others nodes according to client loadbalancing
TDriverConfig& SetEndpoint(const std::string& endpoint);

//! Set number of network threads, default: 2
TDriverConfig& SetNetworkThreadsNum(size_t sz);

//! Set number of client pool threads, if 0 adaptive thread pool will be used.
//! NOTE: in case of no zero value it is possible to get deadlock if all threads
//! of this pool is blocked somewhere in user code.
//! default: 0
TDriverConfig& SetClientThreadsNum(size_t sz);

//! Warning: not recommended to change
//! Set max number of queued responses. 0 - no limit
//! There is a queue to perform async calls to user code,
Expand All @@ -46,29 +50,37 @@ class TDriverConfig {
//! This value doesn't make sense if SetClientThreadsNum is 0
//! default: 0
TDriverConfig& SetMaxClientQueueSize(size_t sz);

//! Enable Ssl.
//! caCerts - The buffer containing the PEM encoded root certificates for SSL/TLS connections.
//! If this parameter is empty, the default roots will be used.
TDriverConfig& UseSecureConnection(const std::string& caCerts = std::string());
TDriverConfig& SetUsePerChannelTcpConnection(bool usePerChannel);
TDriverConfig& UseClientCertificate(const std::string& clientCert, const std::string& clientPrivateKey);

//! Set token, this option can be overridden for client by ClientSettings
TDriverConfig& SetAuthToken(const std::string& token);

//! Set database, this option can be overridden for client by ClientSettings
TDriverConfig& SetDatabase(const std::string& database);

//! Set credentials data, this option can be overridden for client by ClientSettings
TDriverConfig& SetCredentialsProviderFactory(std::shared_ptr<ICredentialsProviderFactory> credentialsProviderFactory);

//! Set behaviour of discovery routine
//! See EDiscoveryMode enum comments
//! default: EDiscoveryMode::Sync
TDriverConfig& SetDiscoveryMode(EDiscoveryMode discoveryMode);

//! Max number of requests in queue waiting for discovery if "Async" mode chosen
//! default: 100
TDriverConfig& SetMaxQueuedRequests(size_t sz);

//! Limit using of memory for grpc buffer pool. 0 means disabled.
//! If enabled the size must be greater than size of recieved message.
//! default: 0
TDriverConfig& SetGrpcMemoryQuota(uint64_t bytes);

//! Specify tcp keep alive settings
//! This option allows to adjust tcp keep alive settings, useful to work
//! with balancers or to detect unexpected connectivity problem.
Expand All @@ -83,12 +95,20 @@ class TDriverConfig {
//! NOTE: Please read OS documentation and investigate your network topology before touching this option.
//! default: true, 30, 5, 10 for linux, and true and OS default for others POSIX
TDriverConfig& SetTcpKeepAliveSettings(bool enable, size_t idle, size_t count, size_t interval);

//! Enable or disable drain of client logic (e.g. session pool drain) during dtor call
TDriverConfig& SetDrainOnDtors(bool allowed);

//! Set policy for balancing
//! default: TBalancingPolicy::UsePreferableLocation()
TDriverConfig& SetBalancingPolicy(TBalancingPolicy&& policy);

//! DEPRECATED
//! Set policy for balancing
//! Params is a optionally field to set policy settings
//! default: EBalancingPolicy::UsePreferableLocation
TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = std::string());

//! Set grpc level keep alive. If keepalive ping was delayed more than given timeout
//! internal grpc routine fails request with TRANSIENT_FAILURE or TRANSPORT_UNAVAILABLE error
//! Note: this timeout should not be too small to prevent fail due to
Expand All @@ -97,6 +117,7 @@ class TDriverConfig {
//! default: enabled, 10 seconds
TDriverConfig& SetGRpcKeepAliveTimeout(TDuration timeout);
TDriverConfig& SetGRpcKeepAlivePermitWithoutCalls(bool permitWithoutCalls);

//! Set inactive socket timeout.
//! Used to close connections, that were inactive for given time.
//! Closes unused connections every 1/10 of timeout, so deletion time is approximate.
Expand All @@ -107,11 +128,14 @@ class TDriverConfig {
//! Set maximum incoming message size.
//! Note: this option overrides MaxMessageSize for incoming messages.
//! default: 0

TDriverConfig& SetMaxInboundMessageSize(uint64_t maxInboundMessageSize);

//! Set maximum outgoing message size.
//! Note: this option overrides MaxMessageSize for outgoing messages.
//! default: 0
TDriverConfig& SetMaxOutboundMessageSize(uint64_t maxOutboundMessageSize);

//! Note: if this option is unset, default 64_MB message size will be used.
//! default: 0
TDriverConfig& SetMaxMessageSize(uint64_t maxMessageSize);
Expand Down
13 changes: 5 additions & 8 deletions include/ydb-cpp-sdk/client/iam/common/generic_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider {

auto resultPromise = NThreading::NewPromise();
auto response = std::make_shared<TResponse>();
auto context = std::make_shared<grpc::ClientContext>();

std::shared_ptr<TImpl> self = TGrpcIamCredentialsProvider<TRequest, TResponse, TService>::TImpl::shared_from_this();

auto cb = [self, sync, resultPromise, response] (grpc::Status status) mutable {
auto cb = [self, sync, resultPromise, response, context] (grpc::Status status) mutable {
self->ProcessIamResponse(std::move(status), std::move(*response), sync);
resultPromise.SetValue();
};
Expand All @@ -86,18 +87,16 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider {

RequestFiller_(req);

Context_ = std::make_unique<grpc::ClientContext>();

auto deadline = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros(IamEndpoint_.RequestTimeout.MicroSeconds(), GPR_TIMESPAN));

Context_->set_deadline(deadline);
context->set_deadline(deadline);
if (AuthTokenProvider_) {
Context_->AddMetadata("authorization", "Bearer " + AuthTokenProvider_->GetAuthInfo());
context->AddMetadata("authorization", "Bearer " + AuthTokenProvider_->GetAuthInfo());
}

(Stub_->async()->*Rpc_)(Context_.get(), &req, response.get(), std::move(cb));
(Stub_->async()->*Rpc_)(context.get(), &req, response.get(), std::move(cb));

if (sync) {
resultPromise.GetFuture().Wait(2 * IamEndpoint_.RequestTimeout);
Expand Down Expand Up @@ -132,7 +131,6 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider {

private:
void ProcessIamResponse(grpc::Status&& status, TResponse&& result, bool sync) {
Context_.reset();
if (!status.ok()) {
TDuration sleepDuration;
{
Expand Down Expand Up @@ -171,7 +169,6 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider {
std::shared_ptr<grpc::Channel> Channel_;
std::shared_ptr<typename TService::Stub> Stub_;
TAsyncRpc Rpc_;
std::unique_ptr<grpc::ClientContext> Context_;

std::string Ticket_;
TInstant NextTicketUpdate_;
Expand Down
4 changes: 2 additions & 2 deletions include/ydb-cpp-sdk/client/operation/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ class TOperationClient {
TAsyncStatus Forget(const TOperation::TOperationId& id);

template <typename TOp>
NThreading::TFuture<TOperationsList<TOp>> List(ui64 pageSize = 0, const std::string& pageToken = std::string());
NThreading::TFuture<TOperationsList<TOp>> List(std::uint64_t pageSize = 0, const std::string& pageToken = std::string());

private:
template <typename TOp>
NThreading::TFuture<TOperationsList<TOp>> List(const std::string& kind, ui64 pageSize, const std::string& pageToken);
NThreading::TFuture<TOperationsList<TOp>> List(const std::string& kind, std::uint64_t pageSize, const std::string& pageToken);

private:
std::shared_ptr<TImpl> Impl_;
Expand Down
3 changes: 3 additions & 0 deletions include/ydb-cpp-sdk/client/topic/read_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ struct TReadSessionEvent {
virtual ~TMessageBase() = default;

virtual const std::string& GetData() const;
virtual const std::string& GetBrokenData() const;

virtual void Commit() = 0;

Expand Down Expand Up @@ -144,6 +145,8 @@ struct TReadSessionEvent {
//! User data.
//! Throws decompressor exception if decompression failed.
const std::string& GetData() const override;
//! Throws exception if decompression succeeded.
const std::string& GetBrokenData() const override;

//! Commits single message.
void Commit() override;
Expand Down
41 changes: 40 additions & 1 deletion include/ydb-cpp-sdk/client/types/ydb.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
#include "fwd.h"
#include "status_codes.h"

#include <memory>
#include <string>


namespace NYdb::inline V3 {

enum class EDiscoveryMode {
Expand All @@ -22,12 +26,47 @@ enum class EDiscoveryMode {
Off
};

//! @deprecated Use TBalancingPolicy instead
enum class EBalancingPolicy {
//! Use all available cluster nodes regardless datacenter locality
UseAllNodes,
//! Use preferable location,
//! params is a name of location (VLA, MAN), if params is empty local datacenter is used
UsePreferableLocation
UsePreferableLocation,
};

enum EPileState {
UNSPECIFIED = 0 /* "unspecified" */,
PRIMARY = 1 /* "primary" */,
PROMOTED = 2 /* "promoted" */,
SYNCHRONIZED = 3 /* "synchronized" */,
NOT_SYNCHRONIZED = 4 /* "not_synchronized" */,
SUSPENDED = 5 /* "suspended" */,
DISCONNECTED = 6 /* "disconnected" */
};

class TBalancingPolicy {
friend class TDriverConfig;
friend class TDriver;
public:
//! Use preferable location,
//! location is a name of datacenter (VLA, MAN), if location is empty local datacenter is used
static TBalancingPolicy UsePreferableLocation(const std::string& location = {});

//! Use all available cluster nodes regardless datacenter locality
static TBalancingPolicy UseAllNodes();

//! EXPERIMENTAL
//! Use pile with preferable state
static TBalancingPolicy UsePreferablePileState(EPileState pileState = EPileState::PRIMARY);

class TImpl;
private:
TBalancingPolicy(std::unique_ptr<TImpl>&& impl);

TBalancingPolicy(EBalancingPolicy policy, const std::string& params);

std::unique_ptr<TImpl> Impl_;
};

} // namespace NYdb
14 changes: 7 additions & 7 deletions src/api/protos/ydb_table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,19 @@ message VectorIndexSettings {
VECTOR_TYPE_BIT = 4;
}

Metric metric = 1;
optional Metric metric = 1;

VectorType vector_type = 2;
optional VectorType vector_type = 2;

uint32 vector_dimension = 3;
optional uint32 vector_dimension = 3;
}

message KMeansTreeSettings {
VectorIndexSettings settings = 1;
// average count of clusters on each level of tree, 0 -- means auto
uint32 clusters = 2;
// average count of levels in the tree, 0 -- means auto
uint32 levels = 3;

optional uint32 clusters = 2;

optional uint32 levels = 3;
}

message GlobalIndex {
Expand Down
2 changes: 1 addition & 1 deletion src/client/common_client/impl/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "iface.h"

#define INCLUDE_YDB_INTERNAL_H
#include <src/client/impl/ydb_internal/grpc_connections/grpc_connections.h>
#include <src/client/impl/internal/grpc_connections/grpc_connections.h>
#undef INCLUDE_YDB_INTERNAL_H

#include <ydb-cpp-sdk/client/types/exceptions/exceptions.h>
Expand Down
2 changes: 1 addition & 1 deletion src/client/common_client/settings.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <ydb-cpp-sdk/client/common_client/settings.h>

#include <src/client/impl/ydb_internal/common/parser.h>
#include <src/client/impl/internal/common/parser.h>

namespace NYdb::inline V3 {

Expand Down
4 changes: 2 additions & 2 deletions src/client/coordination/coordination.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#include <ydb-cpp-sdk/client/coordination/coordination.h>

#define INCLUDE_YDB_INTERNAL_H
#include <src/client/impl/ydb_internal/make_request/make.h>
#include <src/client/impl/ydb_internal/scheme_helpers/helpers.h>
#include <src/client/impl/internal/make_request/make.h>
#include <src/client/impl/internal/scheme_helpers/helpers.h>
#undef INCLUDE_YDB_INTERNAL_H

#include <src/api/grpc/ydb_coordination_v1.grpc.pb.h>
Expand Down
2 changes: 1 addition & 1 deletion src/client/datastreams/datastreams.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include <ydb-cpp-sdk/client/datastreams/datastreams.h>

#define INCLUDE_YDB_INTERNAL_H
#include <src/client/impl/ydb_internal/make_request/make.h>
#include <src/client/impl/internal/make_request/make.h>
#undef INCLUDE_YDB_INTERNAL_H

#include <ydb-cpp-sdk/library/issue/yql_issue.h>
Expand Down
24 changes: 14 additions & 10 deletions src/client/driver/driver.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#include <ydb-cpp-sdk/client/driver/driver.h>

#define INCLUDE_YDB_INTERNAL_H
#include <src/client/impl/ydb_internal/driver/constants.h>
#include <src/client/impl/ydb_internal/grpc_connections/grpc_connections.h>
#include <src/client/impl/ydb_internal/logger/log.h>
#include <src/client/impl/internal/driver/constants.h>
#include <src/client/impl/internal/grpc_connections/grpc_connections.h>
#include <src/client/impl/internal/logger/log.h>
#undef INCLUDE_YDB_INTERNAL_H

#include <library/cpp/logger/log.h>
#include <src/client/impl/ydb_internal/common/parser.h>
#include <src/client/impl/ydb_internal/common/getenv.h>
#include <src/client/impl/internal/common/parser.h>
#include <src/client/impl/internal/common/getenv.h>
#include <ydb-cpp-sdk/client/common_client/ssl_credentials.h>
#include <util/stream/file.h>
#include <ydb-cpp-sdk/client/resources/ydb_ca.h>
Expand Down Expand Up @@ -41,7 +41,7 @@ class TDriverConfig::TImpl : public IConnectionsParams {
size_t GetMaxQueuedRequests() const override { return MaxQueuedRequests; }
TTcpKeepAliveSettings GetTcpKeepAliveSettings() const override { return TcpKeepAliveSettings; }
bool GetDrinOnDtors() const override { return DrainOnDtors; }
TBalancingSettings GetBalancingSettings() const override { return BalancingSettings; }
TBalancingPolicy::TImpl GetBalancingSettings() const override { return BalancingSettings; }
TDuration GetGRpcKeepAliveTimeout() const override { return GRpcKeepAliveTimeout; }
bool GetGRpcKeepAlivePermitWithoutCalls() const override { return GRpcKeepAlivePermitWithoutCalls; }
TDuration GetSocketIdleTimeout() const override { return SocketIdleTimeout; }
Expand Down Expand Up @@ -69,7 +69,7 @@ class TDriverConfig::TImpl : public IConnectionsParams {
TCP_KEEPALIVE_INTERVAL
};
bool DrainOnDtors = true;
TBalancingSettings BalancingSettings = TBalancingSettings{EBalancingPolicy::UsePreferableLocation, std::string()};
TBalancingPolicy::TImpl BalancingSettings = TBalancingPolicy::TImpl("");
TDuration GRpcKeepAliveTimeout = TDuration::Seconds(10);
bool GRpcKeepAlivePermitWithoutCalls = true;
TDuration SocketIdleTimeout = TDuration::Minutes(6);
Expand Down Expand Up @@ -170,11 +170,15 @@ TDriverConfig& TDriverConfig::SetDrainOnDtors(bool allowed) {
return *this;
}

TDriverConfig& TDriverConfig::SetBalancingPolicy(EBalancingPolicy policy, const std::string& params) {
Impl_->BalancingSettings = TBalancingSettings{policy, params};
TDriverConfig& TDriverConfig::SetBalancingPolicy(TBalancingPolicy&& policy) {
Impl_->BalancingSettings = std::move(*policy.Impl_);
return *this;
}

TDriverConfig& TDriverConfig::SetBalancingPolicy(EBalancingPolicy policy, const std::string& params) {
return SetBalancingPolicy(TBalancingPolicy(policy, params));
}

TDriverConfig& TDriverConfig::SetGRpcKeepAliveTimeout(TDuration timeout) {
Impl_->GRpcKeepAliveTimeout = timeout;
return *this;
Expand Down Expand Up @@ -253,7 +257,7 @@ TDriverConfig TDriver::GetConfig() const {
Impl_->TcpKeepAliveSettings_.Interval
);
config.SetDrainOnDtors(Impl_->DrainOnDtors_);
config.SetBalancingPolicy(Impl_->BalancingSettings_.Policy, Impl_->BalancingSettings_.PolicyParams);
config.SetBalancingPolicy(std::make_unique<TBalancingPolicy::TImpl>(Impl_->BalancingSettings_));
config.SetGRpcKeepAliveTimeout(Impl_->GRpcKeepAliveTimeout_);
config.SetGRpcKeepAlivePermitWithoutCalls(Impl_->GRpcKeepAlivePermitWithoutCalls_);
config.SetSocketIdleTimeout(Impl_->SocketIdleTimeout_);
Expand Down
Loading
Loading