Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/import_generation.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
27
28
2 changes: 1 addition & 1 deletion .github/last_commit.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
78aa6fa75eba124b91b200bfe76c67e94ee968a4
42606baa568dd1f0408b0651dce31577f28bcfe1
6 changes: 4 additions & 2 deletions include/ydb-cpp-sdk/client/types/ydb.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "status_codes.h"

#include <memory>
#include <optional>
#include <string>


Expand Down Expand Up @@ -50,8 +51,8 @@ class TBalancingPolicy {
friend class TDriver;
public:
//! Use preferable location,
//! location is a name of datacenter (VLA, MAN), if location is empty local datacenter is used
static TBalancingPolicy UsePreferableLocation(const std::string& location = {});
//! location is a name of datacenter (VLA, MAN), if location is nullopt local datacenter is used
static TBalancingPolicy UsePreferableLocation(const std::optional<std::string>& location = {});

//! Use all available cluster nodes regardless datacenter locality
static TBalancingPolicy UseAllNodes();
Expand All @@ -71,6 +72,7 @@ class TBalancingPolicy {
private:
TBalancingPolicy(std::unique_ptr<TImpl>&& impl);

// For deprecated EBalancingPolicy
TBalancingPolicy(EBalancingPolicy policy, const std::string& params);

std::unique_ptr<TImpl> Impl_;
Expand Down
2 changes: 1 addition & 1 deletion src/client/driver/driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class TDriverConfig::TImpl : public IConnectionsParams {
TCP_KEEPALIVE_INTERVAL
};
bool DrainOnDtors = true;
TBalancingPolicy::TImpl BalancingSettings = TBalancingPolicy::TImpl::UsePreferableLocation("");
TBalancingPolicy::TImpl BalancingSettings = TBalancingPolicy::TImpl::UsePreferableLocation(std::nullopt);
TDuration GRpcKeepAliveTimeout = TDuration::Seconds(10);
bool GRpcKeepAlivePermitWithoutCalls = true;
TDuration SocketIdleTimeout = TDuration::Minutes(6);
Expand Down
3 changes: 2 additions & 1 deletion src/client/export/export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ const std::string TExportToS3Settings::TEncryptionAlgorithm::CHACHA_20_POLY_1305
namespace {

std::vector<TExportItemProgress> ItemsProgressFromProto(const google::protobuf::RepeatedPtrField<ExportItemProgress>& proto) {
std::vector<TExportItemProgress> result(proto.size());
std::vector<TExportItemProgress> result;
result.reserve(proto.size());

for (const auto& protoItem : proto) {
auto& item = result.emplace_back();
Expand Down
2 changes: 1 addition & 1 deletion src/client/impl/internal/common/balancing_policies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TBalancingPolicy::TImpl TBalancingPolicy::TImpl::UseAllNodes() {
return impl;
}

TBalancingPolicy::TImpl TBalancingPolicy::TImpl::UsePreferableLocation(const std::string& location) {
TBalancingPolicy::TImpl TBalancingPolicy::TImpl::UsePreferableLocation(const std::optional<std::string>& location) {
TBalancingPolicy::TImpl impl;
impl.PolicyType = EPolicyType::UsePreferableLocation;
impl.Location = location;
Expand Down
4 changes: 2 additions & 2 deletions src/client/impl/internal/common/balancing_policies.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ class TBalancingPolicy::TImpl {

static TImpl UseAllNodes();

static TImpl UsePreferableLocation(const std::string& location);
static TImpl UsePreferableLocation(const std::optional<std::string>& location);

static TImpl UsePreferablePileState(EPileState pileState);

EPolicyType PolicyType;

// UsePreferableLocation
std::string Location;
std::optional<std::string> Location;

// UsePreferablePileState
EPileState PileState;
Expand Down
10 changes: 6 additions & 4 deletions src/client/impl/internal/db_driver_state/endpoint_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ std::pair<NThreading::TFuture<TEndpointUpdateResult>, bool> TEndpointPool::Updat
// Is used to convert float to integer load factor
// same integer values will be selected randomly.
const float multiplicator = 10.0;
std::string selfLocation = result.Result.self_location();
std::unordered_map<std::string, Ydb::Bridge::PileState> pileStates;
for (const auto& pile : result.Result.pile_states()) {
pileStates[pile.pile_name()] = pile;
Expand All @@ -53,7 +54,7 @@ std::pair<NThreading::TFuture<TEndpointUpdateResult>, bool> TEndpointPool::Updat
for (const auto& endpoint : result.Result.endpoints()) {
std::int32_t loadFactor = static_cast<std::int32_t>(multiplicator * std::min(LoadMax, std::max(LoadMin, endpoint.load_factor())));
std::uint64_t nodeId = endpoint.node_id();
if (!IsLocalEndpoint(endpoint, pileStates)) {
if (!IsPreferredEndpoint(endpoint, selfLocation, pileStates)) {
// Location mismatch, shift this endpoint
loadFactor += GetLocalityShift();
}
Expand Down Expand Up @@ -173,13 +174,14 @@ constexpr std::int32_t TEndpointPool::GetLocalityShift() {
return LoadMax * Multiplicator;
}

bool TEndpointPool::IsLocalEndpoint(const Ydb::Discovery::EndpointInfo& endpoint,
const std::unordered_map<std::string, Ydb::Bridge::PileState>& pileStates) const {
bool TEndpointPool::IsPreferredEndpoint(const Ydb::Discovery::EndpointInfo& endpoint,
const std::string& selfLocation,
const std::unordered_map<std::string, Ydb::Bridge::PileState>& pileStates) const {
switch (BalancingPolicy_.PolicyType) {
case TBalancingPolicy::TImpl::EPolicyType::UseAllNodes:
return true;
case TBalancingPolicy::TImpl::EPolicyType::UsePreferableLocation:
return endpoint.location() == BalancingPolicy_.Location;
return endpoint.location() == BalancingPolicy_.Location.value_or(selfLocation);
case TBalancingPolicy::TImpl::EPolicyType::UsePreferablePileState:
if (auto it = pileStates.find(endpoint.bridge_pile_name()); it != pileStates.end()) {
return GetPileState(it->second.state()) == BalancingPolicy_.PileState;
Expand Down
5 changes: 3 additions & 2 deletions src/client/impl/internal/db_driver_state/endpoint_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ class TEndpointPool {
static constexpr std::int32_t GetLocalityShift();

private:
bool IsLocalEndpoint(const Ydb::Discovery::EndpointInfo& endpoint,
const std::unordered_map<std::string, Ydb::Bridge::PileState>& pileStates) const;
bool IsPreferredEndpoint(const Ydb::Discovery::EndpointInfo& endpoint,
const std::string& selfLocation,
const std::unordered_map<std::string, Ydb::Bridge::PileState>& pileStates) const;
EPileState GetPileState(const Ydb::Bridge::PileState::State& state) const;

private:
Expand Down
3 changes: 2 additions & 1 deletion src/client/impl/session/kqp_session_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,10 @@ std::function<void(TKqpSessionCommon*)> TKqpSessionCommon::GetSmartDeleter(std::
switch (sessionImpl->GetState()) {
case TKqpSessionCommon::S_STANDALONE:
case TKqpSessionCommon::S_BROKEN:
case TKqpSessionCommon::S_CLOSING:
case TKqpSessionCommon::S_CLOSING: {
client->DeleteSession(sessionImpl);
break;
}
case TKqpSessionCommon::S_IDLE:
case TKqpSessionCommon::S_ACTIVE: {
if (!client->ReturnSession(sessionImpl)) {
Expand Down
4 changes: 2 additions & 2 deletions src/client/types/ydb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ namespace NYdb::inline V3 {
TBalancingPolicy::TBalancingPolicy(EBalancingPolicy policy, const std::string& params) {
switch (policy) {
case EBalancingPolicy::UsePreferableLocation:
Impl_ = std::make_unique<TImpl>(TImpl::UsePreferableLocation(params));
Impl_ = std::make_unique<TImpl>(TImpl::UsePreferableLocation(params.empty() ? std::nullopt : std::make_optional(params)));
break;
case EBalancingPolicy::UseAllNodes:
Impl_ = std::make_unique<TImpl>(TImpl::UseAllNodes());
break;
}
}

TBalancingPolicy TBalancingPolicy::UsePreferableLocation(const std::string& location) {
TBalancingPolicy TBalancingPolicy::UsePreferableLocation(const std::optional<std::string>& location) {
return TBalancingPolicy(std::make_unique<TImpl>(TImpl::UsePreferableLocation(location)));
}

Expand Down
Loading