Skip to content

Commit 71d513c

Browse files
qyryqGazizonoki
authored andcommitted
Topic APIs for metrics level (#25512)
1 parent 973a2a3 commit 71d513c

File tree

9 files changed

+137
-10
lines changed

9 files changed

+137
-10
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0bf38aa928934c9db08c6a173cbe6e80954d2f8f
1+
337ad331d577c2bf3d4e92fb73a77dc37cf8fc7f

include/ydb-cpp-sdk/client/topic/control_plane.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ enum class EAutoPartitioningStrategy: uint32_t {
3636
Paused = 4,
3737
};
3838

39+
// 0 - unspecified
40+
// 1 - disabeld
41+
// 2 - database level metrics
42+
// 3 - object level metrics
43+
// 4 - detailed metrics
44+
using EMetricsLevel = uint32_t;
45+
3946
class TConsumer {
4047
public:
4148
TConsumer(const Ydb::Topic::Consumer&);
@@ -309,6 +316,8 @@ class TTopicDescription {
309316

310317
const TTopicStats& GetTopicStats() const;
311318

319+
std::optional<EMetricsLevel> GetMetricsLevel() const;
320+
312321
void SerializeTo(Ydb::Topic::CreateTopicRequest& request) const;
313322
private:
314323

@@ -332,6 +341,7 @@ class TTopicDescription {
332341
NScheme::TVirtualTimestamp CreationTimestamp_;
333342
std::vector<NScheme::TPermissions> Permissions_;
334343
std::vector<NScheme::TPermissions> EffectivePermissions_;
344+
std::optional<EMetricsLevel> MetricsLevel_;
335345
};
336346

337347
class TConsumerDescription {
@@ -571,6 +581,8 @@ struct TCreateTopicSettings : public TOperationRequestSettings<TCreateTopicSetti
571581

572582
FLUENT_SETTING(TAttributes, Attributes);
573583

584+
FLUENT_SETTING_OPTIONAL(EMetricsLevel, MetricsLevel);
585+
574586
TCreateTopicSettings& SetSupportedCodecs(std::vector<ECodec>&& codecs) {
575587
SupportedCodecs_ = std::move(codecs);
576588
return *this;
@@ -743,7 +755,20 @@ struct TAlterTopicSettings : public TOperationRequestSettings<TAlterTopicSetting
743755
return *this;
744756
}
745757

758+
TAlterTopicSettings& SetMetricsLevel(EMetricsLevel level) {
759+
MetricsLevel_ = level;
760+
return *this;
761+
}
762+
TAlterTopicSettings& ResetMetricsLevel() {
763+
MetricsLevel_ = true;
764+
return *this;
765+
}
766+
746767
std::optional<TAlterPartitioningSettings> AlterPartitioningSettings_;
768+
std::variant<
769+
bool, // Reset
770+
EMetricsLevel // Set
771+
> MetricsLevel_ = false;
747772
};
748773

749774
inline TPartitioningSettingsBuilder TCreateTopicSettings::BeginConfigurePartitioningSettings() {

src/api/protos/ydb_common.proto

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,3 @@ message VirtualTimestamp {
2727
uint64 plan_step = 1;
2828
uint64 tx_id = 2;
2929
}
30-
31-
enum MetricsLevel {
32-
Database = 0;
33-
Object = 1;
34-
Detailed = 2;
35-
}

src/api/protos/ydb_persqueue_v1.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1187,6 +1187,9 @@ message TopicSettings {
11871187
}
11881188
// remote mirror rule for this topic.
11891189
RemoteMirrorRule remote_mirror_rule = 11;
1190+
1191+
// Set or reset metrics level.
1192+
optional uint32 metrics_level = 16;
11901193
}
11911194

11921195
message AutoPartitioningSettings {

src/api/protos/ydb_topic.proto

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import "src/api/protos/ydb_issue_message.proto";
66
import "src/api/protos/annotations/sensitive.proto";
77
import "src/api/protos/annotations/validation.proto";
88

9-
import "google/protobuf/empty.proto";
109
import "google/protobuf/duration.proto";
10+
import "google/protobuf/empty.proto";
1111
import "google/protobuf/timestamp.proto";
1212

1313
package Ydb.Topic;
@@ -156,7 +156,7 @@ message StreamWriteMessage {
156156
// Message sequence number, provided by client for deduplication.
157157
// Starts at 1
158158
int64 seq_no = 1;
159-
// Creation timestamp
159+
// Creation timestamp.
160160
google.protobuf.Timestamp created_at = 2;
161161
// Compressed client message body.
162162
bytes data = 3;
@@ -1002,6 +1002,9 @@ message CreateTopicRequest {
10021002

10031003
// Metering mode for the topic in a serverless database.
10041004
MeteringMode metering_mode = 12;
1005+
1006+
// Metrics level. If the level is unset, use database setting.
1007+
optional uint32 metrics_level = 13;
10051008
}
10061009

10071010
// Create topic response sent from server to client.
@@ -1135,6 +1138,9 @@ message DescribeTopicResult {
11351138
// How much bytes were written statistics.
11361139
MultipleWindowsStat bytes_written = 4;
11371140
}
1141+
1142+
// Metrics level.
1143+
optional uint32 metrics_level = 16;
11381144
}
11391145

11401146
// Describe partition request sent from client to server.
@@ -1262,7 +1268,6 @@ message PartitionStats {
12621268
int32 partition_node_id = 8 [deprecated=true]; //Use PartitionLocation
12631269
}
12641270

1265-
12661271
// Update existing topic request sent from client to server.
12671272
message AlterTopicRequest {
12681273
Ydb.Operations.OperationParams operation_params = 1;
@@ -1307,6 +1312,12 @@ message AlterTopicRequest {
13071312

13081313
// Set metering mode for topic in serverless database.
13091314
MeteringMode set_metering_mode = 14;
1315+
1316+
// Set or reset metrics level.
1317+
oneof metrics_level {
1318+
uint32 set_metrics_level = 15;
1319+
google.protobuf.Empty reset_metrics_level = 16;
1320+
}
13101321
}
13111322

13121323
// Update topic response sent from server to client.

src/client/topic/impl/topic.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result)
5454
, PartitionWriteBurstBytes_(Proto_.partition_write_burst_bytes())
5555
, MeteringMode_(TProtoAccessor::FromProto(Proto_.metering_mode()))
5656
, TopicStats_(Proto_.topic_stats())
57+
, MetricsLevel_(Proto_.has_metrics_level() ? std::optional(static_cast<EMetricsLevel>(Proto_.metrics_level())) : std::optional<EMetricsLevel>())
5758
{
5859
Owner_ = Proto_.self().owner();
5960
CreationTimestamp_ = NScheme::TVirtualTimestamp(Proto_.self().created_at());
@@ -193,6 +194,9 @@ void TTopicDescription::SerializeTo(Ydb::Topic::CreateTopicRequest& request) con
193194
*request.mutable_attributes() = Proto_.attributes();
194195
*request.mutable_consumers() = Proto_.consumers();
195196
request.set_metering_mode(Proto_.metering_mode());
197+
if (Proto_.has_metrics_level()) {
198+
request.set_metrics_level(Proto_.metrics_level());
199+
}
196200
}
197201

198202
const Ydb::Topic::DescribeTopicResult& TTopicDescription::GetProto() const {
@@ -219,6 +223,10 @@ const TTopicStats& TTopicDescription::GetTopicStats() const {
219223
return TopicStats_;
220224
}
221225

226+
std::optional<EMetricsLevel> TTopicDescription::GetMetricsLevel() const {
227+
return MetricsLevel_;
228+
}
229+
222230
const std::vector<NScheme::TPermissions>& TTopicDescription::GetPermissions() const {
223231
return Permissions_;
224232
}
@@ -664,6 +672,7 @@ TCreateTopicSettings::TCreateTopicSettings(const Ydb::Topic::CreateTopicRequest&
664672
, PartitionWriteSpeedBytesPerSecond_(proto.partition_write_speed_bytes_per_second())
665673
, PartitionWriteBurstBytes_(proto.partition_write_burst_bytes())
666674
, Attributes_(DeserializeAttributes(proto.attributes()))
675+
, MetricsLevel_(proto.has_metrics_level() ? std::optional(static_cast<EMetricsLevel>(proto.metrics_level())) : std::nullopt)
667676
{
668677
Consumers_ = DeserializeConsumers(*this, proto.consumers());
669678
}
@@ -678,6 +687,9 @@ void TCreateTopicSettings::SerializeTo(Ydb::Topic::CreateTopicRequest& request)
678687
request.set_partition_write_burst_bytes(PartitionWriteBurstBytes_);
679688
*request.mutable_consumers() = SerializeConsumers(Consumers_);
680689
*request.mutable_attributes() = SerializeAttributes(Attributes_);
690+
if (MetricsLevel_) {
691+
request.set_metrics_level(*MetricsLevel_);
692+
}
681693
}
682694

683695
} // namespace NYdb::NTopic

src/client/topic/impl/topic_impl.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ class TTopicClient::TImpl : public TClientImplCommon<TTopicClient::TImpl> {
150150
ConvertAlterConsumerToProto(consumer, consumerProto);
151151
}
152152

153+
if (auto level = std::get_if<EMetricsLevel>(&settings.MetricsLevel_)) {
154+
request.set_set_metrics_level(*level);
155+
} else if (auto reset = std::get_if<bool>(&settings.MetricsLevel_); *reset) {
156+
request.mutable_reset_metrics_level();
157+
}
158+
153159
return request;
154160
}
155161

tests/integration/topic/describe_topic.cpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,78 @@ TEST_F(Describe, TEST_NAME(Location)) {
119119
}
120120
}
121121

122+
TEST_F(Describe, TEST_NAME(MetricsLevel)) {
123+
TTopicClient client(MakeDriver());
124+
125+
// const std::uint32_t MetricsLevelDisabled = 0;
126+
// const std::uint32_t MetricsLevelDatabase = 1;
127+
const std::uint32_t MetricsLevelObject = 2;
128+
const std::uint32_t MetricsLevelDetailed = 3;
129+
130+
auto createTopic = [&](std::string topic, EMetricsLevel metricsLevel) {
131+
auto res = client.CreateTopic(topic, TCreateTopicSettings().MetricsLevel(metricsLevel)).GetValueSync();
132+
ASSERT_TRUE(res.IsSuccess());
133+
};
134+
135+
auto setMetricsLevel = [&](std::string topic, EMetricsLevel metricsLevel) {
136+
auto res = client.AlterTopic(topic, TAlterTopicSettings().SetMetricsLevel(metricsLevel)).GetValueSync();
137+
ASSERT_TRUE(res.IsSuccess());
138+
};
139+
140+
auto resetMetricsLevel = [&](std::string topic) {
141+
auto res = client.AlterTopic(topic, TAlterTopicSettings().ResetMetricsLevel()).GetValueSync();
142+
ASSERT_TRUE(res.IsSuccess());
143+
};
144+
145+
auto checkFlag = [&](std::string topic, std::optional<EMetricsLevel> expectedMetricsLevel) {
146+
auto res = client.DescribeTopic(topic, {}).GetValueSync();
147+
Y_ENSURE(res.IsSuccess());
148+
return res.GetTopicDescription().GetMetricsLevel() == expectedMetricsLevel;
149+
};
150+
151+
{
152+
const std::string topic("topic-with-counters");
153+
createTopic(topic, MetricsLevelDetailed);
154+
checkFlag(topic, MetricsLevelDetailed);
155+
setMetricsLevel(topic, MetricsLevelObject);
156+
Y_ENSURE(checkFlag(topic, MetricsLevelObject));
157+
158+
{
159+
// Empty alter should change nothing.
160+
auto res = client.AlterTopic(topic).GetValueSync();
161+
ASSERT_TRUE(res.IsSuccess());
162+
Y_ENSURE(checkFlag(topic, MetricsLevelObject));
163+
}
164+
165+
{
166+
resetMetricsLevel(topic);
167+
Y_ENSURE(checkFlag(topic, {}));
168+
}
169+
}
170+
171+
{
172+
const std::string topic("topic-without-counters-by-default");
173+
auto res = client.CreateTopic(topic).GetValueSync();
174+
ASSERT_TRUE(res.IsSuccess());
175+
Y_ENSURE(checkFlag(topic, {}));
176+
setMetricsLevel(topic, MetricsLevelDetailed);
177+
Y_ENSURE(checkFlag(topic, MetricsLevelDetailed));
178+
179+
{
180+
// Empty alter should change nothing.
181+
auto res = client.AlterTopic(topic).GetValueSync();
182+
ASSERT_TRUE(res.IsSuccess());
183+
Y_ENSURE(checkFlag(topic, MetricsLevelDetailed));
184+
}
185+
}
186+
187+
{
188+
const std::string topic("topic-without-counters");
189+
createTopic(topic, MetricsLevelObject);
190+
Y_ENSURE(checkFlag(topic, MetricsLevelObject));
191+
setMetricsLevel(topic, MetricsLevelDetailed);
192+
Y_ENSURE(checkFlag(topic, MetricsLevelDetailed));
193+
}
194+
}
195+
122196
}

tests/integration/topic/utils/describe.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ void DescribeTopicTest(ITopicTestSetup& setup, TTopicClient& client, bool requir
4242
Y_ENSURE(partitionLocation.GetNodeId() > 0);
4343
Y_ENSURE(partitionLocation.GetGeneration() >= 0); // greater-or-equal 0
4444
}
45+
46+
Y_ENSURE(!description.GetMetricsLevel().has_value());
4547
}
4648
}
4749

0 commit comments

Comments
 (0)