From e55b2df65a931459d8ab10212ca349795bc9d4a1 Mon Sep 17 00:00:00 2001 From: Aleksei Moiseitsev Date: Fri, 3 Oct 2025 17:10:02 +0000 Subject: [PATCH 1/6] availability in ydb_cli --- .../ydb_cli/commands/ydb_service_scheme.cpp | 19 +++++++++++++++++-- .../ydb_cli/commands/ydb_service_topic.cpp | 6 ++++++ .../lib/ydb_cli/commands/ydb_service_topic.h | 1 + ydb/public/lib/ydb_cli/dump/restore_impl.cpp | 1 + 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index 9db941d6d281..84d57fe4299a 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -9,6 +9,7 @@ #include +#include #include namespace NYdb { @@ -121,6 +122,16 @@ namespace { Cout << "none" << Endl; } } + + + TString PrettyHoursDuration(TDuration duration, TStringBuf zeroStr = ""sv, TStringBuf maxStr = "+inf"sv) { + if (duration == TDuration::Zero()) { + return ToString(zeroStr); + } else if (duration == TDuration::Max()) { + return ToString(maxStr); + } + return TStringBuilder() << Prec(duration.MillisecondsFloat() / 3'600'000.0, PREC_POINT_DIGITS_STRIP_ZEROES, 3) << " hours"; + } } void PrintAllPermissions( @@ -139,6 +150,9 @@ int PrintPrettyDescribeConsumerResult(const NYdb::NTopic::TConsumerDescription& const NYdb::NTopic::TConsumer& consumer = description.GetConsumer(); Cout << "Consumer " << consumer.GetConsumerName() << ": " << Endl; Cout << "Important: " << (consumer.GetImportant() ? "Yes" : "No") << Endl; + if (const auto availabilityPeriodStr = PrettyHoursDuration(consumer.GetAvailabilityPeriod())) { + Cout << "Availability period: " << availabilityPeriodStr << Endl; + } if (const TInstant& readFrom = consumer.GetReadFrom()) { Cout << "Read from: " << readFrom.ToRfc822StringLocal() << Endl; } else { @@ -315,13 +329,14 @@ namespace { if (consumers.empty()) { return; } - TPrettyTable table({ "ConsumerName", "SupportedCodecs", "ReadFrom", "Important" }); + TPrettyTable table({ "ConsumerName", "SupportedCodecs", "ReadFrom", "Important", "Availability period", }); for (const auto& c: consumers) { table.AddRow() .Column(0, c.GetConsumerName()) .Column(1, FormatCodecs(c.GetSupportedCodecs())) .Column(2, c.GetReadFrom().ToRfc822StringLocal()) - .Column(3, c.GetImportant() ? "Yes" : "No"); + .Column(3, c.GetImportant() ? "Yes" : "No") + .Column(4, PrettyHoursDuration(c.GetAvailabilityPeriod())); // .Column(4, rule.ServiceType()) // .Column(5, rule.Version()); } diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index 4874203211e3..a3a610c0ca58 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -596,6 +596,9 @@ namespace NYdb::NConsoleClient { .Optional() .DefaultValue(false) .StoreResult(&IsImportant_); + config.Opts->AddLongOption("availability-period-hours", "Duration in hours for which uncommited data in topic is retained") + .Optional() + .StoreResult(&AvailabilityPeriodHours_); config.Opts->SetFreeArgsNum(1); SetFreeArgTitle(0, "", "Topic path"); AddAllowedCodecs(config, AllowedCodecs); @@ -626,6 +629,9 @@ namespace NYdb::NConsoleClient { consumerSettings.SetSupportedCodecs(codecs); } consumerSettings.SetImportant(IsImportant_); + if (AvailabilityPeriodHours_.Defined()) { + consumerSettings.AvailabilityPeriod(TDuration::Hours(*AvailabilityPeriodHours_)); + } readRuleSettings.AppendAddConsumers(consumerSettings); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index 75b20723da64..fa1865480835 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -145,6 +145,7 @@ namespace NYdb::NConsoleClient { private: TString ConsumerName_; bool IsImportant_; + TMaybe AvailabilityPeriodHours_; TMaybe StartingMessageTimestamp_; }; diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp index f0def12017c9..30121ab44a1a 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp +++ b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp @@ -2160,6 +2160,7 @@ TRestoreResult TRestoreClient::RestoreConsumers(const TString& topicPath, const .BeginAddConsumer() .ConsumerName(consumer.GetConsumerName()) .Important(consumer.GetImportant()) + .AvailabilityPeriod(consumer.GetAvailabilityPeriod()) .Attributes(consumer.GetAttributes()) .EndAddConsumer() ).GetValueSync(); From 8c4efb54018153c46ac7f1fce886c9dda61fb536 Mon Sep 17 00:00:00 2001 From: Aleksei Moiseitsev Date: Mon, 6 Oct 2025 16:45:18 +0000 Subject: [PATCH 2/6] support fractional hours --- .../ydb_cli/commands/ydb_service_scheme.cpp | 37 +++++++++++++++---- .../ydb_cli/commands/ydb_service_topic.cpp | 27 ++++++++++++-- .../lib/ydb_cli/commands/ydb_service_topic.h | 6 +-- 3 files changed, 56 insertions(+), 14 deletions(-) diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index 84d57fe4299a..3e4bf27e345d 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -123,14 +123,37 @@ namespace { } } + struct TPrettyDurationFormatParameters { + double EntitiesPerSecond; + TStringBuf EntityName; + TStringBuf ZeroString; + TStringBuf MaxString; + int MaxPrecision; + }; + + constexpr TPrettyDurationFormatParameters PRETTY_HOURS_DEFAULT{ + .EntitiesPerSecond = 3600, + .EntityName = "hours", + .ZeroString = "0 hours", + .MaxString = "+infinity", + .MaxPrecision = 3, + }; + + constexpr TPrettyDurationFormatParameters PRETTY_HOURS_NON_ZERO{ + .EntitiesPerSecond = 3600, + .EntityName = "hours", + .ZeroString = "", + .MaxString = "+infinity", + .MaxPrecision = 3, + }; - TString PrettyHoursDuration(TDuration duration, TStringBuf zeroStr = ""sv, TStringBuf maxStr = "+inf"sv) { + TString PrettyDurationString(TDuration duration, const TPrettyDurationFormatParameters& paramerters) { if (duration == TDuration::Zero()) { - return ToString(zeroStr); + return ToString(paramerters.ZeroString); } else if (duration == TDuration::Max()) { - return ToString(maxStr); + return ToString(paramerters.MaxString); } - return TStringBuilder() << Prec(duration.MillisecondsFloat() / 3'600'000.0, PREC_POINT_DIGITS_STRIP_ZEROES, 3) << " hours"; + return TStringBuilder() << Prec(duration.MillisecondsFloat() / (paramerters.EntitiesPerSecond * 1000.0), PREC_POINT_DIGITS_STRIP_ZEROES, paramerters.MaxPrecision) << " " << paramerters.EntityName; } } @@ -150,7 +173,7 @@ int PrintPrettyDescribeConsumerResult(const NYdb::NTopic::TConsumerDescription& const NYdb::NTopic::TConsumer& consumer = description.GetConsumer(); Cout << "Consumer " << consumer.GetConsumerName() << ": " << Endl; Cout << "Important: " << (consumer.GetImportant() ? "Yes" : "No") << Endl; - if (const auto availabilityPeriodStr = PrettyHoursDuration(consumer.GetAvailabilityPeriod())) { + if (const auto availabilityPeriodStr = PrettyDurationString(consumer.GetAvailabilityPeriod(), PRETTY_HOURS_NON_ZERO)) { Cout << "Availability period: " << availabilityPeriodStr << Endl; } if (const TInstant& readFrom = consumer.GetReadFrom()) { @@ -336,7 +359,7 @@ namespace { .Column(1, FormatCodecs(c.GetSupportedCodecs())) .Column(2, c.GetReadFrom().ToRfc822StringLocal()) .Column(3, c.GetImportant() ? "Yes" : "No") - .Column(4, PrettyHoursDuration(c.GetAvailabilityPeriod())); + .Column(4, PrettyDurationString(c.GetAvailabilityPeriod(), PRETTY_HOURS_NON_ZERO)); // .Column(4, rule.ServiceType()) // .Column(5, rule.Version()); } @@ -360,7 +383,7 @@ namespace { void PrintMain(const NTopic::TTopicDescription& topicDescription) { Cout << Endl << "Main:"; - Cout << Endl << "RetentionPeriod: " << topicDescription.GetRetentionPeriod().Hours() << " hours"; + Cout << Endl << "RetentionPeriod: " << PrettyDurationString(topicDescription.GetRetentionPeriod(), PRETTY_HOURS_DEFAULT); if (topicDescription.GetRetentionStorageMb().has_value()) { Cout << Endl << "StorageRetention: " << *topicDescription.GetRetentionStorageMb() << " MB"; } diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index a3a610c0ca58..ccea969a6586 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -158,6 +158,22 @@ namespace NYdb::NConsoleClient { return exists->second; } + + void CheckHoursDuration(const TMaybe hours, const TStringBuf optionName) { + if (!hours.Defined()) { + return; + } + if (*hours < 0) { + throw TMisuseException() << optionName << " must be non-negative"; + } + if (!std::isfinite(*hours)) { + throw TMisuseException() << optionName << " must be finite"; + } + } + + TDuration DurationFromFractionalHours(const double hours) { + return TDuration::Seconds(hours * 3600.0); // using floating-point ctor with saturation + } } void TCommandWithSupportedCodecs::AddAllowedCodecs(TClientCommand::TConfig& config, const TVector& supportedCodecs) { @@ -374,6 +390,7 @@ namespace NYdb::NConsoleClient { ParseCodecs(); ParseMeteringMode(); ParseAutoPartitioningStrategy(); + CheckHoursDuration(RetentionPeriodHours_, "--retention-period-hours"); } int TCommandTopicCreate::Run(TConfig& config) { @@ -405,7 +422,7 @@ namespace NYdb::NConsoleClient { settings.MeteringMode(GetMeteringMode()); } - settings.RetentionPeriod(TDuration::Hours(RetentionPeriodHours_)); + settings.RetentionPeriod(DurationFromFractionalHours(RetentionPeriodHours_)); settings.RetentionStorageMb(RetentionStorageMb_); if (PartitionsPerTablet_.Defined()) { @@ -457,6 +474,7 @@ namespace NYdb::NConsoleClient { ParseCodecs(); ParseMeteringMode(); ParseAutoPartitioningStrategy(); + CheckHoursDuration(RetentionPeriodHours_, "--retention-period-hours"); } NYdb::NTopic::TAlterTopicSettings TCommandTopicAlter::PrepareAlterSettings( @@ -497,8 +515,8 @@ namespace NYdb::NConsoleClient { settings.SetSupportedCodecs(codecs); } - if (RetentionPeriodHours_.Defined() && describeResult.GetTopicDescription().GetRetentionPeriod() != TDuration::Hours(*RetentionPeriodHours_)) { - settings.SetRetentionPeriod(TDuration::Hours(*RetentionPeriodHours_)); + if (RetentionPeriodHours_.Defined() && describeResult.GetTopicDescription().GetRetentionPeriod() != DurationFromFractionalHours(*RetentionPeriodHours_)) { + settings.SetRetentionPeriod(DurationFromFractionalHours(*RetentionPeriodHours_)); } if (PartitionWriteSpeedKbps_.Defined() && describeResult.GetTopicDescription().GetPartitionWriteSpeedBytesPerSecond() / 1_KB != *PartitionWriteSpeedKbps_) { @@ -608,6 +626,7 @@ namespace NYdb::NConsoleClient { TYdbCommand::Parse(config); ParseCodecs(); ParseTopicName(config, 0); + CheckHoursDuration(AvailabilityPeriodHours_, "--availability-period-hours"); } int TCommandTopicConsumerAdd::Run(TConfig& config) { @@ -630,7 +649,7 @@ namespace NYdb::NConsoleClient { } consumerSettings.SetImportant(IsImportant_); if (AvailabilityPeriodHours_.Defined()) { - consumerSettings.AvailabilityPeriod(TDuration::Hours(*AvailabilityPeriodHours_)); + consumerSettings.AvailabilityPeriod(DurationFromFractionalHours(*AvailabilityPeriodHours_)); } readRuleSettings.AppendAddConsumers(consumerSettings); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index fa1865480835..97179a737425 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -83,7 +83,7 @@ namespace NYdb::NConsoleClient { int Run(TConfig& config) override; private: - ui64 RetentionPeriodHours_; + double RetentionPeriodHours_; ui64 RetentionStorageMb_; ui32 MinActivePartitions_; TMaybe MaxActivePartitions_; @@ -105,7 +105,7 @@ namespace NYdb::NConsoleClient { int Run(TConfig& config) override; private: - TMaybe RetentionPeriodHours_; + TMaybe RetentionPeriodHours_; TMaybe RetentionStorageMb_; TMaybe MinActivePartitions_; TMaybe MaxActivePartitions_; @@ -145,7 +145,7 @@ namespace NYdb::NConsoleClient { private: TString ConsumerName_; bool IsImportant_; - TMaybe AvailabilityPeriodHours_; + TMaybe AvailabilityPeriodHours_; TMaybe StartingMessageTimestamp_; }; From 12ad2be5fa482374f8588b86054a7448045fe5a1 Mon Sep 17 00:00:00 2001 From: Aleksei Moiseitsev Date: Wed, 8 Oct 2025 14:44:09 +0000 Subject: [PATCH 3/6] add retention-period option; use duration string parser --- .../ydb_cli/commands/ydb_service_topic.cpp | 70 ++++++++++++------- .../lib/ydb_cli/commands/ydb_service_topic.h | 6 +- 2 files changed, 49 insertions(+), 27 deletions(-) diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index ccea969a6586..bef4f3d7d080 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -11,11 +11,13 @@ #include #include +#include #include #include #include #include #include +#include #define TIMESTAMP_FORMAT_OPTION_DESCRIPTION "Timestamp may be specified in unix time format (seconds from 1970.01.01) or in ISO-8601 format (like 2020-07-10T15:00:00Z)" @@ -159,20 +161,26 @@ namespace NYdb::NConsoleClient { return exists->second; } - void CheckHoursDuration(const TMaybe hours, const TStringBuf optionName) { - if (!hours.Defined()) { - return; + TDuration ParseDurationHours(const TStringBuf str) { + double hours = 0; + if (!TryFromString(str, hours)) { + throw TMisuseException() << "Invalid hours duration '" << str << "'"; } - if (*hours < 0) { - throw TMisuseException() << optionName << " must be non-negative"; + if (hours < 0) { + throw TMisuseException() << "Duration must be non-negative"; } - if (!std::isfinite(*hours)) { - throw TMisuseException() << optionName << " must be finite"; + if (!std::isfinite(hours)) { + throw TMisuseException() << "Duration must be finite"; } + return TDuration::Seconds(hours * 3600); // using floating-point ctor with saturation } - TDuration DurationFromFractionalHours(const double hours) { - return TDuration::Seconds(hours * 3600.0); // using floating-point ctor with saturation + TDuration ParseDuration(TStringBuf str) { + StripInPlace(str); + if (!str.empty() && !IsAsciiAlpha(str.back())) { + throw TMisuseException() << "Duration must ends with a unit name (ex. 'h' for hours, 's' for seconds)"; + } + return TDuration::Parse(str); } } @@ -355,10 +363,18 @@ namespace NYdb::NConsoleClient { .Optional() .StoreResult(&MinActivePartitions_) .DefaultValue(1); - config.Opts->AddLongOption("retention-period-hours", "Duration in hours for which data in topic is stored") - .DefaultValue(24) + config.Opts->AddLongOption("retention-period-hours", TStringBuilder() + << "Duration in hours for which data in topic is stored " + << "(default: " << NColorizer::StdOut().CyanColor() << RetentionPeriod_.Hours() << NColorizer::StdOut().OldColor() << ")") + .Optional() + .RequiredArgument("HOURS") + .StoreMappedResult(&RetentionPeriod_, ParseDurationHours); + config.Opts->AddLongOption("retention-period", TStringBuilder() + << "Duration for which data in topic is stored (ex. '72h', '1440m') " + << "(default: " << NColorizer::StdOut().CyanColor() << HumanReadable(RetentionPeriod_) << NColorizer::StdOut().OldColor() << ")") + .Hidden() .Optional() - .StoreResult(&RetentionPeriodHours_); + .StoreMappedResult(&RetentionPeriod_, ParseDuration); config.Opts->AddLongOption("partition-write-speed-kbps", "Partition write speed in kilobytes per second") .DefaultValue(1024) .Optional() @@ -382,6 +398,8 @@ namespace NYdb::NConsoleClient { .Optional() .Hidden() .StoreResult(&PartitionsPerTablet_); + + config.Opts->MutuallyExclusive("retention-period-hours", "retention-period"); } void TCommandTopicCreate::Parse(TConfig& config) { @@ -390,7 +408,6 @@ namespace NYdb::NConsoleClient { ParseCodecs(); ParseMeteringMode(); ParseAutoPartitioningStrategy(); - CheckHoursDuration(RetentionPeriodHours_, "--retention-period-hours"); } int TCommandTopicCreate::Run(TConfig& config) { @@ -422,7 +439,7 @@ namespace NYdb::NConsoleClient { settings.MeteringMode(GetMeteringMode()); } - settings.RetentionPeriod(DurationFromFractionalHours(RetentionPeriodHours_)); + settings.RetentionPeriod(RetentionPeriod_); settings.RetentionStorageMb(RetentionStorageMb_); if (PartitionsPerTablet_.Defined()) { @@ -447,9 +464,14 @@ namespace NYdb::NConsoleClient { config.Opts->AddLongOption("partitions-count", "Initial and minimum number of partitions for topic") .Optional() .StoreResult(&MinActivePartitions_); - config.Opts->AddLongOption("retention-period-hours", "Duration for which data in topic is stored") + config.Opts->AddLongOption("retention-period-hours", "Duration in hours for which data in topic is stored") + .Optional() + .RequiredArgument("HOURS") + .StoreMappedResult(&RetentionPeriod_, ParseDurationHours); + config.Opts->AddLongOption("retention-period", "Duration for which data in topic is stored (ex. '72h', '1440m')") .Optional() - .StoreResult(&RetentionPeriodHours_); + .Hidden() + .StoreMappedResult(&RetentionPeriod_, ParseDuration); config.Opts->AddLongOption("partition-write-speed-kbps", "Partition write speed in kilobytes per second") .Optional() .StoreResult(&PartitionWriteSpeedKbps_); @@ -466,6 +488,8 @@ namespace NYdb::NConsoleClient { .Optional() .StoreResult(&MaxActivePartitions_); AddAutoPartitioning(config, true); + + config.Opts->MutuallyExclusive("retention-period-hours", "retention-period"); } void TCommandTopicAlter::Parse(TConfig& config) { @@ -474,7 +498,6 @@ namespace NYdb::NConsoleClient { ParseCodecs(); ParseMeteringMode(); ParseAutoPartitioningStrategy(); - CheckHoursDuration(RetentionPeriodHours_, "--retention-period-hours"); } NYdb::NTopic::TAlterTopicSettings TCommandTopicAlter::PrepareAlterSettings( @@ -515,8 +538,8 @@ namespace NYdb::NConsoleClient { settings.SetSupportedCodecs(codecs); } - if (RetentionPeriodHours_.Defined() && describeResult.GetTopicDescription().GetRetentionPeriod() != DurationFromFractionalHours(*RetentionPeriodHours_)) { - settings.SetRetentionPeriod(DurationFromFractionalHours(*RetentionPeriodHours_)); + if (RetentionPeriod_.Defined() && describeResult.GetTopicDescription().GetRetentionPeriod() != RetentionPeriod_) { + settings.SetRetentionPeriod(*RetentionPeriod_); } if (PartitionWriteSpeedKbps_.Defined() && describeResult.GetTopicDescription().GetPartitionWriteSpeedBytesPerSecond() / 1_KB != *PartitionWriteSpeedKbps_) { @@ -614,9 +637,9 @@ namespace NYdb::NConsoleClient { .Optional() .DefaultValue(false) .StoreResult(&IsImportant_); - config.Opts->AddLongOption("availability-period-hours", "Duration in hours for which uncommited data in topic is retained") + config.Opts->AddLongOption("availability-period", "Duration for which uncommited data in topic is retained (ex. '72h', '1440m')") .Optional() - .StoreResult(&AvailabilityPeriodHours_); + .StoreMappedResult(&AvailabilityPeriod_, ParseDuration); config.Opts->SetFreeArgsNum(1); SetFreeArgTitle(0, "", "Topic path"); AddAllowedCodecs(config, AllowedCodecs); @@ -626,7 +649,6 @@ namespace NYdb::NConsoleClient { TYdbCommand::Parse(config); ParseCodecs(); ParseTopicName(config, 0); - CheckHoursDuration(AvailabilityPeriodHours_, "--availability-period-hours"); } int TCommandTopicConsumerAdd::Run(TConfig& config) { @@ -648,8 +670,8 @@ namespace NYdb::NConsoleClient { consumerSettings.SetSupportedCodecs(codecs); } consumerSettings.SetImportant(IsImportant_); - if (AvailabilityPeriodHours_.Defined()) { - consumerSettings.AvailabilityPeriod(DurationFromFractionalHours(*AvailabilityPeriodHours_)); + if (AvailabilityPeriod_.Defined()) { + consumerSettings.AvailabilityPeriod(*AvailabilityPeriod_); } readRuleSettings.AppendAddConsumers(consumerSettings); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index 97179a737425..3e37f945bedd 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -83,7 +83,7 @@ namespace NYdb::NConsoleClient { int Run(TConfig& config) override; private: - double RetentionPeriodHours_; + TDuration RetentionPeriod_ = TDuration::Hours(24); ui64 RetentionStorageMb_; ui32 MinActivePartitions_; TMaybe MaxActivePartitions_; @@ -105,7 +105,7 @@ namespace NYdb::NConsoleClient { int Run(TConfig& config) override; private: - TMaybe RetentionPeriodHours_; + TMaybe RetentionPeriod_; TMaybe RetentionStorageMb_; TMaybe MinActivePartitions_; TMaybe MaxActivePartitions_; @@ -145,7 +145,7 @@ namespace NYdb::NConsoleClient { private: TString ConsumerName_; bool IsImportant_; - TMaybe AvailabilityPeriodHours_; + TMaybe AvailabilityPeriod_; TMaybe StartingMessageTimestamp_; }; From 785a417ec0536bf08a7804d746693eb42aef342e Mon Sep 17 00:00:00 2001 From: Aleksei Moiseitsev Date: Wed, 8 Oct 2025 15:02:44 +0000 Subject: [PATCH 4/6] deprecate the --retention-period-hours option --- ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index bef4f3d7d080..5c56d299a4ad 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -366,13 +366,13 @@ namespace NYdb::NConsoleClient { config.Opts->AddLongOption("retention-period-hours", TStringBuilder() << "Duration in hours for which data in topic is stored " << "(default: " << NColorizer::StdOut().CyanColor() << RetentionPeriod_.Hours() << NColorizer::StdOut().OldColor() << ")") + .Hidden() .Optional() .RequiredArgument("HOURS") .StoreMappedResult(&RetentionPeriod_, ParseDurationHours); config.Opts->AddLongOption("retention-period", TStringBuilder() << "Duration for which data in topic is stored (ex. '72h', '1440m') " << "(default: " << NColorizer::StdOut().CyanColor() << HumanReadable(RetentionPeriod_) << NColorizer::StdOut().OldColor() << ")") - .Hidden() .Optional() .StoreMappedResult(&RetentionPeriod_, ParseDuration); config.Opts->AddLongOption("partition-write-speed-kbps", "Partition write speed in kilobytes per second") @@ -465,12 +465,12 @@ namespace NYdb::NConsoleClient { .Optional() .StoreResult(&MinActivePartitions_); config.Opts->AddLongOption("retention-period-hours", "Duration in hours for which data in topic is stored") + .Hidden() .Optional() .RequiredArgument("HOURS") .StoreMappedResult(&RetentionPeriod_, ParseDurationHours); config.Opts->AddLongOption("retention-period", "Duration for which data in topic is stored (ex. '72h', '1440m')") .Optional() - .Hidden() .StoreMappedResult(&RetentionPeriod_, ParseDuration); config.Opts->AddLongOption("partition-write-speed-kbps", "Partition write speed in kilobytes per second") .Optional() From 9f328451913d34570ccff32966dcbbcf76233b99 Mon Sep 17 00:00:00 2001 From: Aleksei Moiseitsev Date: Wed, 8 Oct 2025 15:56:49 +0000 Subject: [PATCH 5/6] README --- ydb/apps/ydb/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ydb/apps/ydb/CHANGELOG.md b/ydb/apps/ydb/CHANGELOG.md index ecaa9c4011ee..809de4f19d15 100644 --- a/ydb/apps/ydb/CHANGELOG.md +++ b/ydb/apps/ydb/CHANGELOG.md @@ -1,4 +1,6 @@ * Fixed a bug where the `ydb debug ping` command crashed in case of any error. +* Added a new `--retention-period` option to the `ydb topic` subcommands. The new option supports various time units, such as seconds, minutes, or days. Usage of the legacy `--retention-period-hours` option is discouraged. +* The `ydb topic consumer add` subcommand now has a new `--availability-period` option, which overrides the consumer's retention guarantee. ## 2.26.0 ## From bd547dce24ac27dd4ff9c169f09e6b1331be3155 Mon Sep 17 00:00:00 2001 From: Aleksei Moiseitsev Date: Wed, 8 Oct 2025 18:37:58 +0000 Subject: [PATCH 6/6] use data holder --- ydb/tests/stress/topic/workload/__init__.py | 27 +++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/ydb/tests/stress/topic/workload/__init__.py b/ydb/tests/stress/topic/workload/__init__.py index 2118188e89e4..0ef9c245a1d4 100644 --- a/ydb/tests/stress/topic/workload/__init__.py +++ b/ydb/tests/stress/topic/workload/__init__.py @@ -40,14 +40,23 @@ def _unpack_resource(self, name): os.chmod(path_to_unpack, st.st_mode | stat.S_IEXEC) self.cli_path = path_to_unpack - def get_command_prefix(self, subcmds: list[str]) -> list[str]: + def _get_cli_common_args(self) -> list[str]: return [ self.cli_path, '--verbose', '--endpoint', self.endpoint, '--database={}'.format(self.database), + ] + + @property + def workload_topic_name(self) -> str: + return f'{self.table_prefix}' + + def get_command_prefix(self, subcmds: list[str]) -> list[str]: + return [ + *self._get_cli_common_args(), 'workload', 'topic' - ] + subcmds + ['--topic', f'{self.table_prefix}'] + ] + subcmds + ['--topic', self.workload_topic_name] def cmd_run(self, cmd): logger.debug(f"Running cmd {cmd}") @@ -60,6 +69,20 @@ def __loop(self): self.cmd_run( self.get_command_prefix(subcmds=['init', '-c', self.consumers, '-p', self.producers]) ) + # adjust + self.cmd_run([ + *self._get_cli_common_args(), + 'topic', 'alter', + '--retention-period=2s', + self.workload_topic_name, + ]) + self.cmd_run([ + *self._get_cli_common_args(), + 'topic', 'consumer', 'add', + f'--availability-period={int(self.duration) * 9 // 10}s', + '--consumer', 'data_holder', + self.workload_topic_name, + ]) # run run_cmd_args = ['run', 'full', '-s', self.duration, '--byte-rate', '100M', '--use-tx', '--tx-commit-interval', '2000', '-p', self.producers, '-c', self.consumers] if self.limit_memory_usage: