Skip to content

Commit 1e5b359

Browse files
committed
Disable topic metrics
1 parent 88f3b91 commit 1e5b359

27 files changed

+188
-24
lines changed

cmake/common.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ function(_ydb_sdk_add_library Tgt)
200200
)
201201
target_compile_definitions(${Tgt} ${includeMode}
202202
YDB_SDK_USE_STD_STRING
203+
YDB_TOPIC_DISABLE_COUNTERS
203204
)
204205
endfunction()
205206

include/ydb-cpp-sdk/client/federated_topic/federated_topic.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,8 +479,10 @@ class IFederatedReadSession {
479479
//! TSessionClosedEvent arrives.
480480
virtual bool Close(TDuration timeout = TDuration::Max()) = 0;
481481

482+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
482483
//! Reader counters with different stats (see TReaderConuters).
483484
virtual NTopic::TReaderCounters::TPtr GetCounters() const = 0;
485+
#endif
484486

485487
//! Get unique identifier of read session.
486488
virtual std::string GetSessionId() const = 0;

include/ydb-cpp-sdk/client/topic/read_session.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
#pragma once
22

3-
#include "counters.h"
43
#include "executor.h"
54
#include "read_events.h"
65
#include "retry_policy.h"
76

7+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
8+
#include "counters.h"
9+
#endif
10+
811
#include <ydb-cpp-sdk/client/common_client/settings.h>
912
#include <ydb-cpp-sdk/client/types/tx/tx.h>
1013
#include <ydb-cpp-sdk/client/types/fluent_settings_helpers.h>
@@ -183,10 +186,12 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {
183186
//! If not set, default executor will be used.
184187
FLUENT_SETTING(IExecutor::TPtr, DecompressionExecutor);
185188

189+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
186190
//! Counters.
187191
//! If counters are not provided explicitly,
188192
//! they will be created inside session (without link with parent counters).
189193
FLUENT_SETTING(TReaderCounters::TPtr, Counters);
194+
#endif
190195

191196
FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30));
192197

@@ -244,8 +249,10 @@ class IReadSession {
244249
//! TSessionClosedEvent arrives.
245250
virtual bool Close(TDuration timeout = TDuration::Max()) = 0;
246251

252+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
247253
//! Reader counters with different stats (see TReaderConuters).
248254
virtual TReaderCounters::TPtr GetCounters() const = 0;
255+
#endif
249256

250257
//! Get unique identifier of read session.
251258
virtual std::string GetSessionId() const = 0;

include/ydb-cpp-sdk/client/topic/write_session.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
#pragma once
22

33
#include "codecs.h"
4-
#include "counters.h"
54
#include "executor.h"
65
#include "retry_policy.h"
76
#include "write_events.h"
87

8+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
9+
#include "counters.h"
10+
#endif
11+
912
#include <ydb-cpp-sdk/client/types/tx/tx.h>
1013
#include <ydb-cpp-sdk/client/types/fluent_settings_helpers.h>
1114
#include <ydb-cpp-sdk/client/types/request_settings.h>
@@ -91,7 +94,9 @@ struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> {
9194

9295
FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30));
9396

97+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
9498
FLUENT_SETTING_OPTIONAL(TWriterCounters::TPtr, Counters);
99+
#endif
95100

96101
//! Executor for compression tasks.
97102
//! If not set, default executor will be used.
@@ -222,7 +227,9 @@ class ISimpleBlockingWriteSession : public TThrRefBase {
222227
//! Returns true if write session is alive and acitve. False if session was closed.
223228
virtual bool IsAlive() const = 0;
224229

230+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
225231
virtual TWriterCounters::TPtr GetCounters() = 0;
232+
#endif
226233

227234
//! Close immediately and destroy, don't wait for anything.
228235
virtual ~ISimpleBlockingWriteSession() = default;
@@ -269,8 +276,10 @@ class IWriteSession {
269276
//! Return true if all writes were completed and acked, false if timeout was reached and some writes were aborted.
270277
virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0;
271278

279+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
272280
//! Writer counters with different stats (see TWriterConuters).
273281
virtual TWriterCounters::TPtr GetCounters() = 0;
282+
#endif
274283

275284
//! Close() with timeout = 0 and destroy everything instantly.
276285
virtual ~IWriteSession() = default;

src/client/federated_topic/impl/federated_read_session.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,11 @@ class TFederatedReadSessionImpl : public NTopic::TEnableSelfContext<TFederatedRe
151151
return SessionId;
152152
}
153153

154+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
154155
inline NTopic::TReaderCounters::TPtr GetCounters() const {
155156
return Settings.Counters_; // Always not nullptr.
156157
}
158+
#endif
157159

158160
private:
159161
TStringBuilder GetLogPrefix() const;
@@ -236,9 +238,11 @@ class TFederatedReadSession : public IFederatedReadSession,
236238
return TryGetImpl()->GetSessionId();
237239
}
238240

241+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
239242
inline NTopic::TReaderCounters::TPtr GetCounters() const override {
240243
return TryGetImpl()->GetCounters();
241244
}
245+
#endif
242246

243247
private:
244248
void Start() {

src/client/federated_topic/impl/federated_write_session.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,9 @@ class TFederatedWriteSession : public NTopic::IWriteSession,
196196
bool Close(TDuration timeout) override {
197197
return TryGetImpl()->Close(timeout);
198198
}
199-
199+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
200200
inline NTopic::TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented;
201+
#endif
201202

202203
private:
203204

src/client/persqueue_public/impl/aliases.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ using NTopic::Cancel;
3232
using NTopic::IsErrorMessage;
3333
using NTopic::MakeErrorFromProto;
3434

35+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
3536
// counters_logger
3637
using TCountersLogger = NTopic::TCountersLogger<true>;
38+
#endif
3739

3840
// read_session_impl
3941
using NTopic::HasNullCounters;

src/client/persqueue_public/impl/read_session.cpp

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ TReadSession::TReadSession(const TReadSessionSettings& settings,
4545
Settings.RetryPolicy_ = IRetryPolicy::GetDefaultPolicy();
4646
}
4747

48+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
4849
MakeCountersIfNeeded();
50+
#endif
4951
}
5052

5153
TReadSession::~TReadSession() {
@@ -64,9 +66,11 @@ TReadSession::~TReadSession() {
6466
for (const auto& ctx : CbContexts) {
6567
ctx->Cancel();
6668
}
69+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
6770
if (DumpCountersContext) {
6871
DumpCountersContext->Cancel();
6972
}
73+
#endif
7074
}
7175

7276
Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest TReadSession::MakeClusterDiscoveryRequest() const {
@@ -174,7 +178,9 @@ void TReadSession::ProceedWithoutClusterDiscovery() {
174178
clusterSessionInfo.Topics = Settings.Topics_;
175179
CreateClusterSessionsImpl(deferred);
176180
}
181+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
177182
SetupCountersLogger();
183+
#endif
178184
}
179185

180186
void TReadSession::CreateClusterSessionsImpl(TDeferredActions& deferred) {
@@ -226,7 +232,9 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
226232
}
227233

228234
if (!status.IsSuccess()) {
235+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
229236
++*Settings.Counters_->Errors;
237+
#endif
230238
if (!ClusterDiscoveryRetryState) {
231239
ClusterDiscoveryRetryState = Settings.RetryPolicy_->CreateRetryState();
232240
}
@@ -249,7 +257,9 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
249257

250258
// Init ClusterSessions.
251259
if (static_cast<size_t>(result.read_sessions_clusters_size()) != Settings.Topics_.size()) {
260+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
252261
++*Settings.Counters_->Errors;
262+
#endif
253263
AbortImpl(EStatus::INTERNAL_ERROR, TStringBuilder() << "Unexpected reply from cluster discovery. Sizes of topics arrays don't match: "
254264
<< result.read_sessions_clusters_size() << " vs " << Settings.Topics_.size(), deferred);
255265
return;
@@ -309,14 +319,18 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
309319
}
310320

311321
if (issues) {
322+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
312323
++*Settings.Counters_->Errors;
324+
#endif
313325
AbortImpl(errorStatus, std::move(issues), deferred);
314326
return;
315327
}
316328

317329
CreateClusterSessionsImpl(deferred);
318330
}
331+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
319332
SetupCountersLogger();
333+
#endif
320334
}
321335

322336
void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions& deferred) {
@@ -345,14 +359,13 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions
345359

346360
bool TReadSession::Close(TDuration timeout) {
347361
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
348-
349-
362+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
350363
// the program may not have reached SetupCountersLogger
351364
if (CountersLogger) {
352365
// Log final counters.
353366
CountersLogger->Stop();
354367
}
355-
368+
#endif
356369
std::vector<TSingleClusterReadSessionImpl::TPtr> sessions;
357370
NThreading::TPromise<bool> promise = NThreading::NewPromise<bool>();
358371
std::shared_ptr<std::atomic<size_t>> count = std::make_shared<std::atomic<size_t>>(0);
@@ -412,7 +425,9 @@ bool TReadSession::Close(TDuration timeout) {
412425
issues.AddIssue("Session was gracefully closed");
413426
EventsQueue->Close(TSessionClosedEvent(EStatus::SUCCESS, std::move(issues)), deferred);
414427
} else {
428+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
415429
++*Settings.Counters_->Errors;
430+
#endif
416431
for (const auto& session : sessions) {
417432
session->Abort();
418433
}
@@ -438,10 +453,12 @@ void TReadSession::AbortImpl(TDeferredActions&) {
438453
ClusterDiscoveryDelayContext->Cancel();
439454
ClusterDiscoveryDelayContext.reset();
440455
}
456+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
441457
if (DumpCountersContext) {
442458
DumpCountersContext->Cancel();
443459
DumpCountersContext.reset();
444460
}
461+
#endif
445462
for (auto& [cluster, sessionInfo] : ClusterSessions) {
446463
if (sessionInfo.Session) {
447464
sessionInfo.Session->Abort();
@@ -526,6 +543,7 @@ void TReadSession::ResumeReadingData() {
526543
}
527544
}
528545

546+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
529547
void TReadSession::MakeCountersIfNeeded() {
530548
if (!Settings.Counters_ || HasNullCounters(*Settings.Counters_)) {
531549
TReaderCounters::TPtr counters = MakeIntrusive<TReaderCounters>();
@@ -544,6 +562,7 @@ void TReadSession::SetupCountersLogger() {
544562
DumpCountersContext = CountersLogger->MakeCallbackContext();
545563
CountersLogger->Start();
546564
}
565+
#endif
547566

548567
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
549568
// NPersQueue::TReadSessionEvent

src/client/persqueue_public/impl/read_session.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@ class TReadSession : public IReadSession,
4848
return SessionId;
4949
}
5050

51+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
5152
TReaderCounters::TPtr GetCounters() const override {
5253
return Settings.Counters_; // Always not nullptr.
5354
}
55+
#endif
5456

5557
void AddTopic(const TTopicReadSettings& topicReadSettings) /*override*/ {
5658
Y_UNUSED(topicReadSettings);
@@ -97,8 +99,10 @@ class TReadSession : public IReadSession,
9799
void AbortImpl(EStatus statusCode, NYdb::NIssue::TIssues&& issues, TDeferredActions& deferred);
98100
void AbortImpl(EStatus statusCode, const std::string& message, TDeferredActions& deferred);
99101

102+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
100103
void MakeCountersIfNeeded();
101104
void SetupCountersLogger();
105+
#endif
102106

103107
private:
104108
TReadSessionSettings Settings;
@@ -121,8 +125,10 @@ class TReadSession : public IReadSession,
121125

122126
std::vector<TCallbackContextPtr> CbContexts;
123127

128+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
124129
std::shared_ptr<TCountersLogger> CountersLogger;
125130
std::shared_ptr<TCallbackContext<TCountersLogger>> DumpCountersContext;
131+
#endif
126132
};
127133

128134
} // namespace NYdb::NPersQueue

src/client/persqueue_public/impl/write_session.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,11 @@ std::optional<TContinuationToken> TSimpleBlockingWriteSession::WaitForToken(cons
135135
return std::nullopt;
136136
}
137137

138+
#ifndef YDB_TOPIC_DISABLE_COUNTERS
138139
TWriterCounters::TPtr TSimpleBlockingWriteSession::GetCounters() {
139140
return Writer->GetCounters();
140141
}
142+
#endif
141143

142144

143145
bool TSimpleBlockingWriteSession::IsAlive() const {

0 commit comments

Comments
 (0)