Skip to content

Commit 69a3522

Browse files
authored
support consumer's availability_period in the ydb_cli (#26351)
KIKIMR-24054
1 parent 1972758 commit 69a3522

File tree

5 files changed

+102
-13
lines changed

5 files changed

+102
-13
lines changed

ydb/apps/ydb/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
* Fixed a bug where the `ydb debug ping` command crashed in case of any error.
2+
* Added a new `--retention-period` option to the `ydb topic` subcommands. The new option supports various time units, such as seconds, minutes, or days. Usage of the legacy `--retention-period-hours` option is discouraged.
3+
* The `ydb topic consumer add` subcommand now has a new `--availability-period` option, which overrides the consumer's retention guarantee.
24

35
## 2.26.0 ##
46

ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <google/protobuf/port_def.inc>
1111

12+
#include <util/stream/format.h>
1213
#include <util/string/join.h>
1314

1415
namespace NYdb {
@@ -121,6 +122,39 @@ namespace {
121122
Cout << "none" << Endl;
122123
}
123124
}
125+
126+
struct TPrettyDurationFormatParameters {
127+
double EntitiesPerSecond;
128+
TStringBuf EntityName;
129+
TStringBuf ZeroString;
130+
TStringBuf MaxString;
131+
int MaxPrecision;
132+
};
133+
134+
constexpr TPrettyDurationFormatParameters PRETTY_HOURS_DEFAULT{
135+
.EntitiesPerSecond = 3600,
136+
.EntityName = "hours",
137+
.ZeroString = "0 hours",
138+
.MaxString = "+infinity",
139+
.MaxPrecision = 3,
140+
};
141+
142+
constexpr TPrettyDurationFormatParameters PRETTY_HOURS_NON_ZERO{
143+
.EntitiesPerSecond = 3600,
144+
.EntityName = "hours",
145+
.ZeroString = "",
146+
.MaxString = "+infinity",
147+
.MaxPrecision = 3,
148+
};
149+
150+
TString PrettyDurationString(TDuration duration, const TPrettyDurationFormatParameters& paramerters) {
151+
if (duration == TDuration::Zero()) {
152+
return ToString(paramerters.ZeroString);
153+
} else if (duration == TDuration::Max()) {
154+
return ToString(paramerters.MaxString);
155+
}
156+
return TStringBuilder() << Prec(duration.MillisecondsFloat() / (paramerters.EntitiesPerSecond * 1000.0), PREC_POINT_DIGITS_STRIP_ZEROES, paramerters.MaxPrecision) << " " << paramerters.EntityName;
157+
}
124158
}
125159

126160
void PrintAllPermissions(
@@ -139,6 +173,9 @@ int PrintPrettyDescribeConsumerResult(const NYdb::NTopic::TConsumerDescription&
139173
const NYdb::NTopic::TConsumer& consumer = description.GetConsumer();
140174
Cout << "Consumer " << consumer.GetConsumerName() << ": " << Endl;
141175
Cout << "Important: " << (consumer.GetImportant() ? "Yes" : "No") << Endl;
176+
if (const auto availabilityPeriodStr = PrettyDurationString(consumer.GetAvailabilityPeriod(), PRETTY_HOURS_NON_ZERO)) {
177+
Cout << "Availability period: " << availabilityPeriodStr << Endl;
178+
}
142179
if (const TInstant& readFrom = consumer.GetReadFrom()) {
143180
Cout << "Read from: " << readFrom.ToRfc822StringLocal() << Endl;
144181
} else {
@@ -315,13 +352,14 @@ namespace {
315352
if (consumers.empty()) {
316353
return;
317354
}
318-
TPrettyTable table({ "ConsumerName", "SupportedCodecs", "ReadFrom", "Important" });
355+
TPrettyTable table({ "ConsumerName", "SupportedCodecs", "ReadFrom", "Important", "Availability period", });
319356
for (const auto& c: consumers) {
320357
table.AddRow()
321358
.Column(0, c.GetConsumerName())
322359
.Column(1, FormatCodecs(c.GetSupportedCodecs()))
323360
.Column(2, c.GetReadFrom().ToRfc822StringLocal())
324-
.Column(3, c.GetImportant() ? "Yes" : "No");
361+
.Column(3, c.GetImportant() ? "Yes" : "No")
362+
.Column(4, PrettyDurationString(c.GetAvailabilityPeriod(), PRETTY_HOURS_NON_ZERO));
325363
// .Column(4, rule.ServiceType())
326364
// .Column(5, rule.Version());
327365
}
@@ -345,7 +383,7 @@ namespace {
345383

346384
void PrintMain(const NTopic::TTopicDescription& topicDescription) {
347385
Cout << Endl << "Main:";
348-
Cout << Endl << "RetentionPeriod: " << topicDescription.GetRetentionPeriod().Hours() << " hours";
386+
Cout << Endl << "RetentionPeriod: " << PrettyDurationString(topicDescription.GetRetentionPeriod(), PRETTY_HOURS_DEFAULT);
349387
if (topicDescription.GetRetentionStorageMb().has_value()) {
350388
Cout << Endl << "StorageRetention: " << *topicDescription.GetRetentionStorageMb() << " MB";
351389
}

ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
1212

1313
#include <util/generic/set.h>
14+
#include <util/stream/format.h>
1415
#include <util/stream/str.h>
1516
#include <util/string/cast.h>
1617
#include <util/string/hex.h>
1718
#include <util/string/vector.h>
1819
#include <util/string/join.h>
20+
#include <util/string/strip.h>
1921

2022
#define TIMESTAMP_FORMAT_OPTION_DESCRIPTION "Timestamp may be specified in unix time format (seconds from 1970.01.01) or in ISO-8601 format (like 2020-07-10T15:00:00Z)"
2123

@@ -162,6 +164,28 @@ namespace NYdb::NConsoleClient {
162164

163165
return exists->second;
164166
}
167+
168+
TDuration ParseDurationHours(const TStringBuf str) {
169+
double hours = 0;
170+
if (!TryFromString(str, hours)) {
171+
throw TMisuseException() << "Invalid hours duration '" << str << "'";
172+
}
173+
if (hours < 0) {
174+
throw TMisuseException() << "Duration must be non-negative";
175+
}
176+
if (!std::isfinite(hours)) {
177+
throw TMisuseException() << "Duration must be finite";
178+
}
179+
return TDuration::Seconds(hours * 3600); // using floating-point ctor with saturation
180+
}
181+
182+
TDuration ParseDuration(TStringBuf str) {
183+
StripInPlace(str);
184+
if (!str.empty() && !IsAsciiAlpha(str.back())) {
185+
throw TMisuseException() << "Duration must ends with a unit name (ex. 'h' for hours, 's' for seconds)";
186+
}
187+
return TDuration::Parse(str);
188+
}
165189
}
166190

167191
void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector<NYdb::NTopic::ECodec>& supportedCodecs) {
@@ -343,10 +367,18 @@ namespace NYdb::NConsoleClient {
343367
.Optional()
344368
.StoreResult(&MinActivePartitions_)
345369
.DefaultValue(1);
346-
config.Opts->AddLongOption("retention-period-hours", "Duration in hours for which data in topic is stored")
347-
.DefaultValue(24)
370+
config.Opts->AddLongOption("retention-period-hours", TStringBuilder()
371+
<< "Duration in hours for which data in topic is stored "
372+
<< "(default: " << NColorizer::StdOut().CyanColor() << RetentionPeriod_.Hours() << NColorizer::StdOut().OldColor() << ")")
373+
.Hidden()
374+
.Optional()
375+
.RequiredArgument("HOURS")
376+
.StoreMappedResult(&RetentionPeriod_, ParseDurationHours);
377+
config.Opts->AddLongOption("retention-period", TStringBuilder()
378+
<< "Duration for which data in topic is stored (ex. '72h', '1440m') "
379+
<< "(default: " << NColorizer::StdOut().CyanColor() << HumanReadable(RetentionPeriod_) << NColorizer::StdOut().OldColor() << ")")
348380
.Optional()
349-
.StoreResult(&RetentionPeriodHours_);
381+
.StoreMappedResult(&RetentionPeriod_, ParseDuration);
350382
config.Opts->AddLongOption("partition-write-speed-kbps", "Partition write speed in kilobytes per second")
351383
.DefaultValue(1024)
352384
.Optional()
@@ -370,6 +402,8 @@ namespace NYdb::NConsoleClient {
370402
.Optional()
371403
.Hidden()
372404
.StoreResult(&PartitionsPerTablet_);
405+
406+
config.Opts->MutuallyExclusive("retention-period-hours", "retention-period");
373407
}
374408

375409
void TCommandTopicCreate::Parse(TConfig& config) {
@@ -409,7 +443,7 @@ namespace NYdb::NConsoleClient {
409443
settings.MeteringMode(GetMeteringMode());
410444
}
411445

412-
settings.RetentionPeriod(TDuration::Hours(RetentionPeriodHours_));
446+
settings.RetentionPeriod(RetentionPeriod_);
413447
settings.RetentionStorageMb(RetentionStorageMb_);
414448

415449
if (PartitionsPerTablet_.Defined()) {
@@ -434,9 +468,14 @@ namespace NYdb::NConsoleClient {
434468
config.Opts->AddLongOption("partitions-count", "Initial and minimum number of partitions for topic")
435469
.Optional()
436470
.StoreResult(&MinActivePartitions_);
437-
config.Opts->AddLongOption("retention-period-hours", "Duration for which data in topic is stored")
471+
config.Opts->AddLongOption("retention-period-hours", "Duration in hours for which data in topic is stored")
472+
.Hidden()
438473
.Optional()
439-
.StoreResult(&RetentionPeriodHours_);
474+
.RequiredArgument("HOURS")
475+
.StoreMappedResult(&RetentionPeriod_, ParseDurationHours);
476+
config.Opts->AddLongOption("retention-period", "Duration for which data in topic is stored (ex. '72h', '1440m')")
477+
.Optional()
478+
.StoreMappedResult(&RetentionPeriod_, ParseDuration);
440479
config.Opts->AddLongOption("partition-write-speed-kbps", "Partition write speed in kilobytes per second")
441480
.Optional()
442481
.StoreResult(&PartitionWriteSpeedKbps_);
@@ -453,6 +492,8 @@ namespace NYdb::NConsoleClient {
453492
.Optional()
454493
.StoreResult(&MaxActivePartitions_);
455494
AddAutoPartitioning(config, true);
495+
496+
config.Opts->MutuallyExclusive("retention-period-hours", "retention-period");
456497
}
457498

458499
void TCommandTopicAlter::Parse(TConfig& config) {
@@ -501,8 +542,8 @@ namespace NYdb::NConsoleClient {
501542
settings.SetSupportedCodecs(codecs);
502543
}
503544

504-
if (RetentionPeriodHours_.Defined() && describeResult.GetTopicDescription().GetRetentionPeriod() != TDuration::Hours(*RetentionPeriodHours_)) {
505-
settings.SetRetentionPeriod(TDuration::Hours(*RetentionPeriodHours_));
545+
if (RetentionPeriod_.Defined() && describeResult.GetTopicDescription().GetRetentionPeriod() != RetentionPeriod_) {
546+
settings.SetRetentionPeriod(*RetentionPeriod_);
506547
}
507548

508549
if (PartitionWriteSpeedKbps_.Defined() && describeResult.GetTopicDescription().GetPartitionWriteSpeedBytesPerSecond() / 1_KB != *PartitionWriteSpeedKbps_) {
@@ -600,6 +641,9 @@ namespace NYdb::NConsoleClient {
600641
.Optional()
601642
.DefaultValue(false)
602643
.StoreResult(&IsImportant_);
644+
config.Opts->AddLongOption("availability-period", "Duration for which uncommited data in topic is retained (ex. '72h', '1440m')")
645+
.Optional()
646+
.StoreMappedResult(&AvailabilityPeriod_, ParseDuration);
603647
config.Opts->SetFreeArgsNum(1);
604648
SetFreeArgTitle(0, "<topic-path>", "Topic path");
605649
AddAllowedCodecs(config, AllowedCodecs);
@@ -630,6 +674,9 @@ namespace NYdb::NConsoleClient {
630674
consumerSettings.SetSupportedCodecs(codecs);
631675
}
632676
consumerSettings.SetImportant(IsImportant_);
677+
if (AvailabilityPeriod_.Defined()) {
678+
consumerSettings.AvailabilityPeriod(*AvailabilityPeriod_);
679+
}
633680

634681
readRuleSettings.AppendAddConsumers(consumerSettings);
635682

ydb/public/lib/ydb_cli/commands/ydb_service_topic.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ namespace NYdb::NConsoleClient {
8383
int Run(TConfig& config) override;
8484

8585
private:
86-
ui64 RetentionPeriodHours_;
86+
TDuration RetentionPeriod_ = TDuration::Hours(24);
8787
ui64 RetentionStorageMb_;
8888
ui32 MinActivePartitions_;
8989
TMaybe<ui32> MaxActivePartitions_;
@@ -105,7 +105,7 @@ namespace NYdb::NConsoleClient {
105105
int Run(TConfig& config) override;
106106

107107
private:
108-
TMaybe<ui64> RetentionPeriodHours_;
108+
TMaybe<TDuration> RetentionPeriod_;
109109
TMaybe<ui64> RetentionStorageMb_;
110110
TMaybe<ui32> MinActivePartitions_;
111111
TMaybe<ui32> MaxActivePartitions_;
@@ -145,6 +145,7 @@ namespace NYdb::NConsoleClient {
145145
private:
146146
TString ConsumerName_;
147147
bool IsImportant_;
148+
TMaybe<TDuration> AvailabilityPeriod_;
148149
TMaybe<TInstant> StartingMessageTimestamp_;
149150
};
150151

ydb/public/lib/ydb_cli/dump/restore_impl.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2160,6 +2160,7 @@ TRestoreResult TRestoreClient::RestoreConsumers(const TString& topicPath, const
21602160
.BeginAddConsumer()
21612161
.ConsumerName(consumer.GetConsumerName())
21622162
.Important(consumer.GetImportant())
2163+
.AvailabilityPeriod(consumer.GetAvailabilityPeriod())
21632164
.Attributes(consumer.GetAttributes())
21642165
.EndAddConsumer()
21652166
).GetValueSync();

0 commit comments

Comments
 (0)