Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmake/common.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ function(_ydb_sdk_add_library Tgt)
)
target_compile_definitions(${Tgt} ${includeMode}
YDB_SDK_USE_STD_STRING
YDB_TOPIC_DISABLE_COUNTERS
)
endfunction()

Expand Down
3 changes: 2 additions & 1 deletion include/ydb-cpp-sdk/client/extension_common/extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ void TDriver::AddExtension(typename TExtension::TParams params) {
typename TExtension::IApi* api = TExtension::IApi::Create(*this);
auto extension = new TExtension(params, api);
extension->SelfRegister(*this);
if (api)
if (api) {
api->SelfRegister(*this);
}
}

} // namespace NYdb
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,22 @@ class TSolomonStatPullExtension: public NYdb::IExtension {
friend class TSolomonStatPullExtension;

public:
TParams(const std::string& host
, ui16 port
, const std::string& project
, const std::string& service
, const std::string& cluster
, const std::vector<std::pair<std::string, std::string>>& labels = {});
TParams(const std::string& host,
std::uint16_t port,
const std::string& project,
const std::string& service,
const std::string& cluster,
const std::vector<std::pair<std::string, std::string>>& labels = {});

NMonitoring::TLabels GetLabels() const;

private:
const std::string Host_;
ui16 Port_;
std::uint16_t Port_;
NMonitoring::TLabels Labels_;
};

TSolomonStatPullExtension(const TParams& params, IApi* api);
~TSolomonStatPullExtension();

private:
class TSolomonStatPage: public NMonitoring::IMonPage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,30 @@ class TMetricRegistryConnector: public NYdb::IExtension {
};

inline void AddMetricRegistry(NYdb::TDriver& driver, NMonitoring::IMetricRegistry* ptr) {
if (!ptr)
if (!ptr) {
return;
}

using TConnector = TMetricRegistryConnector<NMonitoring::IMetricRegistry*>;

driver.AddExtension<TConnector>(TConnector::TParams(ptr));
}

inline void AddMetricRegistry(NYdb::TDriver& driver, std::shared_ptr<NMonitoring::IMetricRegistry> ptr) {
if (!ptr)
if (!ptr) {
return;
}

using TConnector = TMetricRegistryConnector<std::shared_ptr<NMonitoring::IMetricRegistry>>;

driver.AddExtension<TConnector>(TConnector::TParams(ptr));
}

inline void AddMetricRegistry(NYdb::TDriver& driver, TAtomicSharedPtr<NMonitoring::IMetricRegistry> ptr) {
if (!ptr)
if (!ptr) {
return;
}

using TConnector = TMetricRegistryConnector<TAtomicSharedPtr<NMonitoring::IMetricRegistry>>;

driver.AddExtension<TConnector>(TConnector::TParams(ptr));
Expand Down
2 changes: 2 additions & 0 deletions include/ydb-cpp-sdk/client/federated_topic/federated_topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,10 @@ class IFederatedReadSession {
//! TSessionClosedEvent arrives.
virtual bool Close(TDuration timeout = TDuration::Max()) = 0;

#ifndef YDB_TOPIC_DISABLE_COUNTERS
//! Reader counters with different stats (see TReaderConuters).
virtual NTopic::TReaderCounters::TPtr GetCounters() const = 0;
#endif

//! Get unique identifier of read session.
virtual std::string GetSessionId() const = 0;
Expand Down
9 changes: 8 additions & 1 deletion include/ydb-cpp-sdk/client/topic/read_session.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#pragma once

#include "counters.h"
#include "executor.h"
#include "read_events.h"
#include "retry_policy.h"

#ifndef YDB_TOPIC_DISABLE_COUNTERS
#include "counters.h"
#endif

#include <ydb-cpp-sdk/client/common_client/settings.h>
#include <ydb-cpp-sdk/client/types/tx/tx.h>
#include <ydb-cpp-sdk/client/types/fluent_settings_helpers.h>
Expand Down Expand Up @@ -183,10 +186,12 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
//! If not set, default executor will be used.
FLUENT_SETTING(IExecutor::TPtr, DecompressionExecutor);

#ifndef YDB_TOPIC_DISABLE_COUNTERS
//! Counters.
//! If counters are not provided explicitly,
//! they will be created inside session (without link with parent counters).
FLUENT_SETTING(TReaderCounters::TPtr, Counters);
#endif

FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30));

Expand Down Expand Up @@ -244,8 +249,10 @@ class IReadSession {
//! TSessionClosedEvent arrives.
virtual bool Close(TDuration timeout = TDuration::Max()) = 0;

#ifndef YDB_TOPIC_DISABLE_COUNTERS
//! Reader counters with different stats (see TReaderConuters).
virtual TReaderCounters::TPtr GetCounters() const = 0;
#endif

//! Get unique identifier of read session.
virtual std::string GetSessionId() const = 0;
Expand Down
11 changes: 10 additions & 1 deletion include/ydb-cpp-sdk/client/topic/write_session.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#pragma once

#include "codecs.h"
#include "counters.h"
#include "executor.h"
#include "retry_policy.h"
#include "write_events.h"

#ifndef YDB_TOPIC_DISABLE_COUNTERS
#include "counters.h"
#endif

#include <ydb-cpp-sdk/client/types/tx/tx.h>
#include <ydb-cpp-sdk/client/types/fluent_settings_helpers.h>
#include <ydb-cpp-sdk/client/types/request_settings.h>
Expand Down Expand Up @@ -91,7 +94,9 @@ struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> {

FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30));

#ifndef YDB_TOPIC_DISABLE_COUNTERS
FLUENT_SETTING_OPTIONAL(TWriterCounters::TPtr, Counters);
#endif

//! Executor for compression tasks.
//! If not set, default executor will be used.
Expand Down Expand Up @@ -222,7 +227,9 @@ class ISimpleBlockingWriteSession : public TThrRefBase {
//! Returns true if write session is alive and acitve. False if session was closed.
virtual bool IsAlive() const = 0;

#ifndef YDB_TOPIC_DISABLE_COUNTERS
virtual TWriterCounters::TPtr GetCounters() = 0;
#endif

//! Close immediately and destroy, don't wait for anything.
virtual ~ISimpleBlockingWriteSession() = default;
Expand Down Expand Up @@ -269,8 +276,10 @@ class IWriteSession {
//! Return true if all writes were completed and acked, false if timeout was reached and some writes were aborted.
virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0;

#ifndef YDB_TOPIC_DISABLE_COUNTERS
//! Writer counters with different stats (see TWriterConuters).
virtual TWriterCounters::TPtr GetCounters() = 0;
#endif

//! Close() with timeout = 0 and destroy everything instantly.
virtual ~IWriteSession() = default;
Expand Down
1 change: 1 addition & 0 deletions src/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ add_subdirectory(iam)
add_subdirectory(iam_private)
add_subdirectory(impl)
add_subdirectory(import)
add_subdirectory(metrics_providers)
add_subdirectory(monitoring)
add_subdirectory(operation)
add_subdirectory(params)
Expand Down
1 change: 1 addition & 0 deletions src/client/extension_common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ _ydb_sdk_add_library(client-extension_common)
target_link_libraries(client-extension_common PUBLIC
yutil
monlib-metrics
client-metrics_providers-monlib
client-ydb_driver
)

Expand Down
32 changes: 31 additions & 1 deletion src/client/extension_common/extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

#define INCLUDE_YDB_INTERNAL_H
#include <src/client/impl/internal/grpc_connections/grpc_connections.h>
#include <src/client/impl/internal/stats_extractor/extractor.h>
#undef INCLUDE_YDB_INTERNAL_H

#include <src/client/metrics_providers/monlib/provider.h>


namespace NYdb::inline V3 {

void IExtension::SelfRegister(TDriver driver) {
Expand All @@ -15,6 +17,34 @@ void IExtensionApi::SelfRegister(TDriver driver) {
CreateInternalInterface(driver)->RegisterExtensionApi(this);
}

class TStatsExtractor: public NSdkStats::IStatApi {
public:
TStatsExtractor(std::shared_ptr<IInternalClient> client)
: Client_(client)
{}

virtual void SetMetricRegistry(::NMonitoring::IMetricRegistry* sensorsRegistry) override {
auto strong = Client_.lock();
if (strong) {
strong->StartStatCollecting(NMetrics::CreateMonlibMetricsProvider(sensorsRegistry));
}
}

void Accept(NMonitoring::IMetricConsumer* consumer) const override {
auto strong = Client_.lock();
if (strong) {
auto sensorsRegistry = strong->GetMetricRegistry();
auto monlibRegistry = NMetrics::TryGetUnderlyingMetricsRegistry(sensorsRegistry);
Y_ABORT_UNLESS(monlibRegistry, "IMetricRegistry is not a TMonlibMetricsProvider in Stats Extractor");
monlibRegistry->Accept(TInstant::Zero(), consumer);
} else {
throw NSdkStats::DestroyedClientException();
}
}
private:
std::weak_ptr<IInternalClient> Client_;
};

namespace NSdkStats {

IStatApi* IStatApi::Create(TDriver driver) {
Expand Down
39 changes: 20 additions & 19 deletions src/client/extensions/solomon_stats/pull_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,32 @@

namespace NSolomonStatExtension::inline V3 {

TSolomonStatPullExtension::TParams::TParams(const std::string& host
, ui16 port
, const std::string& project
, const std::string& service
, const std::string& cluster
, const std::vector<std::pair<std::string, std::string>>& labels)
: Host_(host), Port_(port), Labels_()
TSolomonStatPullExtension::TParams::TParams(const std::string& host,
std::uint16_t port,
const std::string& project,
const std::string& service,
const std::string& cluster,
const std::vector<std::pair<std::string, std::string>>& labels)
: Host_(host)
, Port_(port)
, Labels_()
{
Labels_.Add("project", project);
Labels_.Add("service", service);
Labels_.Add("cluster", cluster);
for (const auto& label: labels) {
Labels_.Add(label.first, label.second);
Labels_.Add(label.first, label.second);
}
}

NMonitoring::TLabels TSolomonStatPullExtension::TParams::GetLabels() const {
return Labels_;
}


TSolomonStatPullExtension::TSolomonStatPage::TSolomonStatPage(const std::string& title, const std::string& path, IApi* api)
: NMonitoring::IMonPage(TString(title), TString(path)), Api_(api)
{ }
: NMonitoring::IMonPage(TString(title), TString(path))
, Api_(api)
{}

void TSolomonStatPullExtension::TSolomonStatPage::Output(NMonitoring::IMonHttpRequest& request) {
request.Output() << NMonitoring::HTTPOKJSON;
Expand All @@ -35,13 +37,12 @@ void TSolomonStatPullExtension::TSolomonStatPage::Output(NMonitoring::IMonHttpRe

TSolomonStatPullExtension::TSolomonStatPullExtension(const TSolomonStatPullExtension::TParams& params, IApi* api)
: MetricRegistry_(new NMonitoring::TMetricRegistry(params.GetLabels()))
, MonService_(params.Port_, TString(params.Host_), 0), Page_( new TSolomonStatPage("stats", "Statistics", api) ) {
api->SetMetricRegistry(MetricRegistry_.get());
MonService_.Register(Page_);
MonService_.StartOrThrow();
}

TSolomonStatPullExtension::~TSolomonStatPullExtension()
{ }
, MonService_(params.Port_, TString(params.Host_), 0)
, Page_(new TSolomonStatPage("stats", "Statistics", api))
{
api->SetMetricRegistry(MetricRegistry_.get());
MonService_.Register(Page_);
MonService_.StartOrThrow();
}

} // NSolomonStatExtension
4 changes: 4 additions & 0 deletions src/client/federated_topic/impl/federated_read_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,11 @@ class TFederatedReadSessionImpl : public NTopic::TEnableSelfContext<TFederatedRe
return SessionId;
}

#ifndef YDB_TOPIC_DISABLE_COUNTERS
inline NTopic::TReaderCounters::TPtr GetCounters() const {
return Settings.Counters_; // Always not nullptr.
}
#endif

private:
TStringBuilder GetLogPrefix() const;
Expand Down Expand Up @@ -236,9 +238,11 @@ class TFederatedReadSession : public IFederatedReadSession,
return TryGetImpl()->GetSessionId();
}

#ifndef YDB_TOPIC_DISABLE_COUNTERS
inline NTopic::TReaderCounters::TPtr GetCounters() const override {
return TryGetImpl()->GetCounters();
}
#endif

private:
void Start() {
Expand Down
3 changes: 2 additions & 1 deletion src/client/federated_topic/impl/federated_write_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,9 @@ class TFederatedWriteSession : public NTopic::IWriteSession,
bool Close(TDuration timeout) override {
return TryGetImpl()->Close(timeout);
}

#ifndef YDB_TOPIC_DISABLE_COUNTERS
inline NTopic::TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented;
#endif

private:

Expand Down
5 changes: 2 additions & 3 deletions src/client/impl/endpoints/endpoints.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "endpoints.h"

#include <library/cpp/monlib/metrics/metric_registry.h>
#include <library/cpp/string_utils/quote/quote.h>

#include <util/random/random.h>
Expand Down Expand Up @@ -80,7 +79,7 @@ std::vector<std::string> TEndpointElectorSafe::SetNewState(std::vector<TEndpoint
}
}

Sort(uniqRec.begin(), uniqRec.end());
std::sort(uniqRec.begin(), uniqRec.end());

auto bestK = GetBestK(uniqRec);

Expand Down Expand Up @@ -176,7 +175,7 @@ void TEndpointElectorSafe::PessimizeEndpoint(const std::string& endpoint) {
}
}
}
Sort(Records_.begin(), Records_.end());
std::sort(Records_.begin(), Records_.end());
BestK_ = GetBestK(Records_);
}

Expand Down
10 changes: 6 additions & 4 deletions src/client/impl/endpoints/endpoints.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#pragma once

#include <src/client/impl/stats/stats.h>

#include <atomic>
#include <shared_mutex>
#include <unordered_map>
#include <vector>
#include <string>
#include <src/client/impl/stats/stats.h>

namespace NYdb::inline V3 {

Expand Down Expand Up @@ -124,9 +125,10 @@ class TEndpointElectorSafe {
std::unordered_map<ui64, TKnownEndpoint> KnownEndpointsByNodeId_;
std::int32_t BestK_ = -1;
std::atomic_int PessimizationRatio_ = 0;
NSdkStats::TAtomicCounter<::NMonitoring::TIntGauge> EndpointCountGauge_;
NSdkStats::TAtomicCounter<::NMonitoring::TIntGauge> PessimizationRatioGauge_;
NSdkStats::TAtomicCounter<::NMonitoring::TIntGauge> EndpointActiveGauge_;

NSdkStats::TAtomicCounter<NMetrics::IIntGauge> EndpointCountGauge_;
NSdkStats::TAtomicCounter<NMetrics::IIntGauge> PessimizationRatioGauge_;
NSdkStats::TAtomicCounter<NMetrics::IIntGauge> EndpointActiveGauge_;
};

// Used to track object
Expand Down
Loading
Loading