Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/ydb-cpp-sdk/client/types/ydb.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class TBalancingPolicy {
//! location is a name of datacenter (VLA, MAN), if location is nullopt local datacenter is used
static TBalancingPolicy UsePreferableLocation(const std::optional<std::string>& location = {});

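//! Prefer the datacenter detected as nearest to the client instead of a configured one.
//! Unlike UsePreferableLocation, no location name is supplied: on each successful discovery
//! the SDK times socket connections to a few sampled endpoints per location and prefers the
//! location with the lowest median time. If only one location is discovered, or no location
//! could be measured, all endpoints are treated as local.
//! Caveat: the measurements are made synchronously, one after another, inside the discovery
//! update, and each attempt may wait up to 5 seconds, so discovery can become noticeably slower.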
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Распиши чуть подробнее, чем этот вариант от предыдущего отличается. И какие предостережения к использованию (если есть)

static TBalancingPolicy UseDetectedLocalDC();

//! Use all available cluster nodes regardless of datacenter locality
static TBalancingPolicy UseAllNodes();

Expand Down
1 change: 1 addition & 0 deletions src/client/impl/internal/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
add_subdirectory(common)
add_subdirectory(db_driver_state)
add_subdirectory(grpc_connections)
add_subdirectory(local_dc_detector)
add_subdirectory(logger)
add_subdirectory(make_request)
add_subdirectory(plain_status)
Expand Down
6 changes: 6 additions & 0 deletions src/client/impl/internal/common/balancing_policies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ TBalancingPolicy::TImpl TBalancingPolicy::TImpl::UsePreferableLocation(const std
return impl;
}

// Builds a policy implementation whose type is UseDetectedLocalDC.
// Only PolicyType is assigned here; every other field keeps its default-constructed value.
TBalancingPolicy::TImpl TBalancingPolicy::TImpl::UseDetectedLocalDC() {
    TImpl policy;
    policy.PolicyType = EPolicyType::UseDetectedLocalDC;
    return policy;
}

TBalancingPolicy::TImpl TBalancingPolicy::TImpl::UsePreferablePileState(EPileState pileState) {
TBalancingPolicy::TImpl impl;
impl.PolicyType = EPolicyType::UsePreferablePileState;
Expand Down
3 changes: 3 additions & 0 deletions src/client/impl/internal/common/balancing_policies.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ class TBalancingPolicy::TImpl {
// Selects how TEndpointPool::IsPreferredEndpoint decides which discovered endpoints are preferred.
enum class EPolicyType {
// No datacenter-locality preference (see the public TBalancingPolicy::UseAllNodes).
UseAllNodes,
// Prefer endpoints whose location equals the configured Location, or the self location
// when no Location is configured.
UsePreferableLocation,
// Prefer endpoints whose location TLocalDCDetector reports as local.
UseDetectedLocalDC,
// Prefer endpoints whose bridge pile is in the configured PileState.
UsePreferablePileState
};

static TImpl UseAllNodes();

static TImpl UsePreferableLocation(const std::optional<std::string>& location);

static TImpl UseDetectedLocalDC();

static TImpl UsePreferablePileState(EPileState pileState);

EPolicyType PolicyType;
Expand Down
1 change: 1 addition & 0 deletions src/client/impl/internal/db_driver_state/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ target_link_libraries(impl-internal-db_driver_state PUBLIC
client-impl-ydb_endpoints
impl-internal-logger
impl-internal-plain_status
impl-internal-local_dc_detector
client-types-credentials
)

Expand Down
6 changes: 6 additions & 0 deletions src/client/impl/internal/db_driver_state/endpoint_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ std::pair<NThreading::TFuture<TEndpointUpdateResult>, bool> TEndpointPool::Updat
TListEndpointsResult result = future.GetValue();
std::vector<std::string> removed;
if (result.DiscoveryStatus.Status == EStatus::SUCCESS) {
if (BalancingPolicy_.PolicyType == TBalancingPolicy::TImpl::EPolicyType::UseDetectedLocalDC) {
LocalDCDetector_.DetectLocalDC(result.Result);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А это ведь прям тяжелая операция. А мы ее синхронно делаем. Это может discovery сильно замедлить, тут замеры не помешали бы. Лучше пинги в фоне делать, как по мне, но такое писать сложнее будет

}

std::vector<TEndpointRecord> records;
// Is used to convert float to integer load factor
// same integer values will be selected randomly.
Expand Down Expand Up @@ -182,6 +186,8 @@ bool TEndpointPool::IsPreferredEndpoint(const Ydb::Discovery::EndpointInfo& endp
return true;
case TBalancingPolicy::TImpl::EPolicyType::UsePreferableLocation:
return endpoint.location() == BalancingPolicy_.Location.value_or(selfLocation);
case TBalancingPolicy::TImpl::EPolicyType::UseDetectedLocalDC:
return LocalDCDetector_.IsLocalDC(endpoint.location());
case TBalancingPolicy::TImpl::EPolicyType::UsePreferablePileState:
if (auto it = pileStates.find(endpoint.bridge_pile_name()); it != pileStates.end()) {
return GetPileState(it->second.state()) == BalancingPolicy_.PileState;
Expand Down
3 changes: 3 additions & 0 deletions src/client/impl/internal/db_driver_state/endpoint_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <src/api/protos/ydb_discovery.pb.h>
#include <src/client/impl/internal/common/balancing_policies.h>
#include <src/client/impl/internal/internal_client/client.h>
#include <src/client/impl/internal/local_dc_detector/local_dc_detector.h>
#include <src/client/impl/internal/plain_status/status.h>
#include <src/client/impl/endpoints/endpoints.h>

Expand Down Expand Up @@ -57,7 +58,9 @@ class TEndpointPool {
TEndpointElectorSafe Elector_;
NThreading::TPromise<TEndpointUpdateResult> DiscoveryPromise_;
std::atomic_uint64_t LastUpdateTime_;

const TBalancingPolicy::TImpl BalancingPolicy_;
TLocalDCDetector LocalDCDetector_;

NSdkStats::TStatCollector* StatCollector_ = nullptr;

Expand Down
13 changes: 13 additions & 0 deletions src/client/impl/internal/local_dc_detector/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Internal library holding the local-datacenter detector and its endpoint pinger.
_ydb_sdk_add_library(impl-internal-local_dc_detector)

# Linked PUBLIC because the library's headers include util and discovery protobuf headers
# (presumably provided by yutil and api-protos respectively - confirm against those targets).
target_link_libraries(impl-internal-local_dc_detector PUBLIC
yutil
api-protos
)

# Implementation files: the detector itself and the function that times a connection to an endpoint.
target_sources(impl-internal-local_dc_detector PRIVATE
local_dc_detector.cpp
pinger.cpp
)

# Register the target with the SDK install rules.
_ydb_sdk_install_targets(TARGETS impl-internal-local_dc_detector)
75 changes: 75 additions & 0 deletions src/client/impl/internal/local_dc_detector/local_dc_detector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#define INCLUDE_YDB_INTERNAL_H
#include "local_dc_detector.h"

namespace NYdb::inline V3 {

TLocalDCDetector::TLocalDCDetector(TPinger pingEndpoint)
: PingEndpoint_(std::move(pingEndpoint))
{}

// Recomputes the detected local location from a discovery result.
// Endpoints are grouped by location and each group is cut down to a bounded sample.
// When more than one location is present the one with the lowest measured RTT is stored;
// otherwise the stored location is cleared, which makes IsLocalDC accept every location.
void TLocalDCDetector::DetectLocalDC(const Ydb::Discovery::ListEndpointsResult& endpointsList) {
    auto candidates = GroupEndpointsByLocation(endpointsList);
    SampleEndpoints(candidates);

    // With zero or one location there is nothing to compare, so no preference is kept.
    if (candidates.size() <= 1) {
        Location_.clear();
        return;
    }

    Location_ = FindNearestLocation(candidates);
}

// Returns true when `location` matches the detected local location.
// If nothing has been detected (the stored location is empty) every location counts as local.
bool TLocalDCDetector::IsLocalDC(const std::string& location) const {
    if (Location_.empty()) {
        return true;
    }
    return location == Location_;
}

// Buckets the endpoints of a discovery result by their location string.
// The buckets hold references into `endpointsList`, so the result must not outlive it.
TLocalDCDetector::TEndpointsByLocation TLocalDCDetector::GroupEndpointsByLocation(const Ydb::Discovery::ListEndpointsResult& endpointsList) const {
    TEndpointsByLocation grouped;
    for (const auto& info : endpointsList.endpoints()) {
        // operator[] creates the bucket on first sight of a location.
        auto& bucket = grouped[info.location()];
        bucket.emplace_back(info);
    }
    return grouped;
}

// Limits every location to at most MAX_ENDPOINTS_PER_LOCATION endpoints.
// Locations that exceed the limit are replaced in place by a random sample of that size;
// smaller locations are left untouched.
void TLocalDCDetector::SampleEndpoints(TEndpointsByLocation& endpointsByLocation) const {
    std::random_device seedSource;
    std::mt19937 rng(seedSource());

    for (auto& entry : endpointsByLocation) {
        auto& candidates = entry.second;
        if (candidates.size() <= MAX_ENDPOINTS_PER_LOCATION) {
            continue;
        }

        std::vector<TEndpointRef> picked;
        picked.reserve(MAX_ENDPOINTS_PER_LOCATION);
        std::sample(candidates.begin(), candidates.end(), std::back_inserter(picked), MAX_ENDPOINTS_PER_LOCATION, rng);
        candidates = std::move(picked);
    }
}

std::uint64_t TLocalDCDetector::MeasureLocationRtt(const std::vector<TEndpointRef>& endpoints) const {
if (endpoints.empty()) {
return std::numeric_limits<std::uint64_t>::max();
}

std::vector<std::uint64_t> timings;
timings.reserve(PING_COUNT);
for (size_t i = 0; i < PING_COUNT; ++i) {
const auto& ep = endpoints[i % endpoints.size()].get();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут кажется проще каждый эндпоинт по PING_COUNT раз пинговать, читаемее будет

timings.push_back(PingEndpoint_(ep).MicroSeconds());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А это же синхронные пинги из одного потока? На первый взгляд так нельзя делать:
Во-первых, 2*3 - это 6 пингов один за другим.
Во-вторых, 3 ендпоинта в ДЦ может быть слишком мало.
Мы можем параллельно пинги рассылать? Странно, если нет. А если да, то видимо надо пинговать асинхронно и бОльшее количество нод (может и по 1 разу) для точности

}
std::sort(timings.begin(), timings.end());

return std::midpoint(timings[(PING_COUNT - 1) / 2], timings[PING_COUNT / 2]);
}


// Returns the location whose measured RTT is strictly the smallest.
// The running minimum starts at the maximum uint64 value, so a location whose RTT equals
// that value is never chosen; if that holds for every location the result is an empty string.
std::string TLocalDCDetector::FindNearestLocation(const TEndpointsByLocation& endpointsByLocation) {
    std::string best;
    std::uint64_t bestRtt = std::numeric_limits<std::uint64_t>::max();

    for (const auto& entry : endpointsByLocation) {
        const std::uint64_t candidateRtt = MeasureLocationRtt(entry.second);
        if (candidateRtt >= bestRtt) {
            continue;
        }
        bestRtt = candidateRtt;
        best = entry.first;
    }

    return best;
}

} // namespace NYdb
42 changes: 42 additions & 0 deletions src/client/impl/internal/local_dc_detector/local_dc_detector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include <src/api/protos/ydb_discovery.pb.h>
#include <src/client/impl/internal/internal_header.h>
#include <src/client/impl/internal/local_dc_detector/pinger.h>

#include <algorithm>
#include <map>
#include <numeric>
#include <random>
#include <string>
#include <vector>

namespace NYdb::inline V3 {

class TLocalDCDetector {
public:
using TPinger = std::function<TDuration(const Ydb::Discovery::EndpointInfo& endpoint)>;
explicit TLocalDCDetector(TPinger pingEndpoint = PingEndpoint);

void DetectLocalDC(const Ydb::Discovery::ListEndpointsResult& endpoints);
bool IsLocalDC(const std::string& location) const;

private:
using TEndpoint = Ydb::Discovery::EndpointInfo;
using TEndpointRef = std::reference_wrapper<const TEndpoint>;
using TEndpointsByLocation = std::unordered_map<std::string, std::vector<TEndpointRef>>;

TEndpointsByLocation GroupEndpointsByLocation(const Ydb::Discovery::ListEndpointsResult& endpointsList) const;
void SampleEndpoints(TEndpointsByLocation& endpointsByLocation) const;
std::uint64_t MeasureLocationRtt(const std::vector<TEndpointRef>& endpoints) const;
std::string FindNearestLocation(const TEndpointsByLocation& endpointsByLocation);

private:
static constexpr std::size_t MAX_ENDPOINTS_PER_LOCATION = 3;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

С одной стороны, маловато нод, но тогда сильно тяжелее будет операция. Может действительно надо асинхронно

static constexpr std::size_t PING_COUNT = 2 * MAX_ENDPOINTS_PER_LOCATION;

TPinger PingEndpoint_;
std::string Location_;
};

} // namespace NYdb
17 changes: 17 additions & 0 deletions src/client/impl/internal/local_dc_detector/pinger.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#define INCLUDE_YDB_INTERNAL_H
#include "pinger.h"

namespace NYdb::inline V3 {

// Measures how long it takes to construct a TSocket to the endpoint's address and port
// (presumably this is the connect itself - confirm against util's TSocket constructor).
// Building the TNetworkAddress happens before the clock starts, so it is not part of the result.
// Any exception thrown along the way is reported as TDuration::Max().
TDuration PingEndpoint(const Ydb::Discovery::EndpointInfo& endpoint) {
    try {
        TNetworkAddress target(endpoint.address().data(), static_cast<ui16>(endpoint.port()));
        const auto startedAt = TInstant::Now();
        TSocket connection(target, TDuration::Seconds(PING_TIMEOUT_SECONDS));
        const auto finishedAt = TInstant::Now();
        return finishedAt - startedAt;
    } catch (...) {
        return TDuration::Max();
    }
}

} // namespace NYdb
15 changes: 15 additions & 0 deletions src/client/impl/internal/local_dc_detector/pinger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include <src/client/impl/internal/internal_header.h>
#include <src/api/protos/ydb_discovery.pb.h>

#include <util/network/sock.h>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это инклюд можно в .cpp унести

#include <util/datetime/base.h>

namespace NYdb::inline V3 {

// Timeout, in seconds, applied to a single ping attempt (passed as TDuration::Seconds(...)
// when the pinger opens its socket).
// Declared `inline` rather than `static`: this is a header, and `static` would give every
// including translation unit its own internal-linkage copy of the constant.
inline constexpr std::uint32_t PING_TIMEOUT_SECONDS = 5;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут лучше временной тип использовать, а не в сырую хранить


TDuration PingEndpoint(const Ydb::Discovery::EndpointInfo& endpoint);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Лучше тут std::chrono::duration сделать, мы от TDuration и TInstant потихоньку избавляемся в SDK


} // namespace NYdb
6 changes: 4 additions & 2 deletions src/client/table/impl/table_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,10 @@ void TTableClient::TImpl::StartPeriodicHostScanTask() {
const auto balancingPolicy = strongClient->DbDriverState_->GetBalancingPolicyType();

// Try to find any host at foreign locations if prefer local dc
const ui64 foreignHost = (balancingPolicy == TBalancingPolicy::TImpl::EPolicyType::UsePreferableLocation) ?
ScanForeignLocations(strongClient) : 0;
const ui64 foreignHost =
balancingPolicy == TBalancingPolicy::TImpl::EPolicyType::UsePreferableLocation ||
balancingPolicy == TBalancingPolicy::TImpl::EPolicyType::UseDetectedLocalDC ?
ScanForeignLocations(strongClient) : 0;

std::unordered_map<ui64, size_t> hostMap;

Expand Down
4 changes: 4 additions & 0 deletions src/client/types/ydb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ TBalancingPolicy TBalancingPolicy::UsePreferableLocation(const std::optional<std
return TBalancingPolicy(std::make_unique<TImpl>(TImpl::UsePreferableLocation(location)));
}

// Public factory: wraps the internal UseDetectedLocalDC policy implementation
// in a heap-allocated TImpl owned by the returned TBalancingPolicy.
TBalancingPolicy TBalancingPolicy::UseDetectedLocalDC() {
    auto impl = std::make_unique<TImpl>(TImpl::UseDetectedLocalDC());
    return TBalancingPolicy(std::move(impl));
}

// Public factory: wraps the internal UseAllNodes policy implementation
// in a heap-allocated TImpl owned by the returned TBalancingPolicy.
TBalancingPolicy TBalancingPolicy::UseAllNodes() {
    auto impl = std::make_unique<TImpl>(TImpl::UseAllNodes());
    return TBalancingPolicy(std::move(impl));
}
Expand Down
13 changes: 13 additions & 0 deletions tests/unit/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ add_ydb_test(NAME client-impl-ydb_endpoints_ut
unit
)

# Unit test target for the local-datacenter detector.
# INCLUDE_DIRS adds the detector's source directory (presumably so the test can include its
# headers by short name - confirm against local_dc_detector_ut.cpp).
# The test links the detector library together with the same yutil / api-protos dependencies.
add_ydb_test(NAME client-impl-internal-local_dc_detector_ut
INCLUDE_DIRS
${YDB_SDK_SOURCE_DIR}/src/client/impl/internal/local_dc_detector
SOURCES
local_dc_detector/local_dc_detector_ut.cpp
LINK_LIBRARIES
yutil
api-protos
impl-internal-local_dc_detector
LABELS
unit
)

add_ydb_test(NAME client-oauth2_ut
SOURCES
oauth2_token_exchange/credentials_ut.cpp
Expand Down
Loading
Loading