diff --git a/.github/import_generation.txt b/.github/import_generation.txt index 6f4247a6255..f64f5d8d85a 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -26 +27 diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 93f70a8963f..bd4d1291fd1 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -234f6d55e1e71a242c2767a22b990d68ff7fce72 +78aa6fa75eba124b91b200bfe76c67e94ee968a4 diff --git a/examples/topic_reader/eventloop/main.cpp b/examples/topic_reader/eventloop/main.cpp index 277faea6661..bc4b513fd81 100644 --- a/examples/topic_reader/eventloop/main.cpp +++ b/examples/topic_reader/eventloop/main.cpp @@ -103,7 +103,7 @@ int main(int argc, const char* argv[]) { stopPartitionSessionEvent->Confirm(); } else if (auto* endPartitionSessionEvent = std::get_if(&*event)) { endPartitionSessionEvent->Confirm(); - } else if (auto* closeSessionEvent = std::get_if(&*event)) { + } else if (std::get_if(&*event)) { break; } } diff --git a/include/ydb-cpp-sdk/client/iam/common/generic_provider.h b/include/ydb-cpp-sdk/client/iam/common/generic_provider.h index e7024d72103..29834b3faf4 100644 --- a/include/ydb-cpp-sdk/client/iam/common/generic_provider.h +++ b/include/ydb-cpp-sdk/client/iam/common/generic_provider.h @@ -24,7 +24,7 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { protected: using TRequestFiller = std::function; using TAsyncInterface = typename TService::Stub::async_interface; - using TAsyncRpc = void (TAsyncInterface::*)(grpc::ClientContext*, const TRequest*, TResponse*, std::function); + using TAsyncRpc = std::function)>; private: class TImpl : public std::enable_shared_from_this::TImpl> { @@ -96,7 +96,7 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { context->AddMetadata("authorization", "Bearer " + AuthTokenProvider_->GetAuthInfo()); } - (Stub_->async()->*Rpc_)(context.get(), &req, response.get(), std::move(cb)); + Rpc_(Stub_.get(), context.get(), &req, response.get(), std::move(cb)); if (sync) { resultPromise.GetFuture().Wait(2 * IamEndpoint_.RequestTimeout); @@ -127,6 +127,8 @@ class TGrpcIamCredentialsProvider : public ICredentialsProvider { } NeedStop_ = true; } + Stub_.reset(); + Channel_.reset(); } private: @@ -215,7 +217,9 @@ class TIamJwtCredentialsProvider : public TGrpcIamCredentialsProvider(params, [jwtParams = params.JwtParams](TRequest& req) { req.set_jwt(MakeSignedJwt(jwtParams)); - }, &TService::Stub::async_interface::Create) {} + }, [](typename TService::Stub* stub, grpc::ClientContext* context, const TRequest* request, TResponse* response, std::function cb) { + stub->async()->Create(context, request, response, std::move(cb)); + }) {} }; template @@ -225,7 +229,9 @@ class TIamOAuthCredentialsProvider : public TGrpcIamCredentialsProvider(params, [token = params.OAuthToken](TRequest& req) { req.set_yandex_passport_oauth_token(TStringType{token}); - }, &TService::Stub::async_interface::Create) {} + }, [](typename TService::Stub* stub, grpc::ClientContext* context, const TRequest* request, TResponse* response, std::function cb) { + stub->async()->Create(context, request, response, std::move(cb)); + }) {} }; template diff --git a/include/ydb-cpp-sdk/client/topic/control_plane.h b/include/ydb-cpp-sdk/client/topic/control_plane.h index 6d2a0c9ba72..2b18590d83c 100644 --- a/include/ydb-cpp-sdk/client/topic/control_plane.h +++ b/include/ydb-cpp-sdk/client/topic/control_plane.h @@ -36,12 +36,20 @@ enum class EAutoPartitioningStrategy: uint32_t { Paused = 4, }; +// 0 - unspecified +// 1 - disabeld +// 2 - database level metrics +// 3 - object level metrics +// 4 - detailed metrics +using EMetricsLevel = uint32_t; + class TConsumer { public: TConsumer(const Ydb::Topic::Consumer&); const std::string& GetConsumerName() const; bool GetImportant() const; + TDuration GetAvailabilityPeriod() const; const TInstant& GetReadFrom() const; const std::vector& GetSupportedCodecs() const; const std::map& GetAttributes() const; @@ -49,6 +57,7 @@ class TConsumer { private: std::string ConsumerName_; bool Important_; + TDuration AvailabilityPeriod_; TInstant ReadFrom_; std::map Attributes_; std::vector SupportedCodecs_; @@ -307,6 +316,8 @@ class TTopicDescription { const TTopicStats& GetTopicStats() const; + std::optional GetMetricsLevel() const; + void SerializeTo(Ydb::Topic::CreateTopicRequest& request) const; private: @@ -330,6 +341,7 @@ class TTopicDescription { NScheme::TVirtualTimestamp CreationTimestamp_; std::vector Permissions_; std::vector EffectivePermissions_; + std::optional MetricsLevel_; }; class TConsumerDescription { @@ -452,6 +464,7 @@ struct TConsumerSettings { FLUENT_SETTING(std::string, ConsumerName); FLUENT_SETTING_DEFAULT(bool, Important, false); + FLUENT_SETTING_DEFAULT(TDuration, AvailabilityPeriod, TDuration::Zero()); FLUENT_SETTING_DEFAULT(TInstant, ReadFrom, TInstant::Zero()); FLUENT_SETTING_VECTOR(ECodec, SupportedCodecs); @@ -488,6 +501,11 @@ struct TConsumerSettings { return *this; } + TConsumerSettings& SetAvailiabilityPeriod(TDuration availabilityPeriod) { + AvailabilityPeriod_ = availabilityPeriod; + return *this; + } + TSettings& EndAddConsumer() { return Parent_; }; private: @@ -504,6 +522,7 @@ struct TAlterConsumerSettings { FLUENT_SETTING(std::string, ConsumerName); FLUENT_SETTING_OPTIONAL(bool, SetImportant); + FLUENT_SETTING_OPTIONAL(TDuration, SetAvailabilityPeriod); FLUENT_SETTING_OPTIONAL(TInstant, SetReadFrom); FLUENT_SETTING_OPTIONAL_VECTOR(ECodec, SetSupportedCodecs); @@ -524,6 +543,11 @@ struct TAlterConsumerSettings { return *this; } + TAlterConsumerSettings& SetAvailabilityPeriod(TDuration availabilityPeriod) { + SetAvailabilityPeriod_ = availabilityPeriod; + return *this; + } + TAlterTopicSettings& EndAlterConsumer() { return Parent_; }; private: @@ -557,6 +581,8 @@ struct TCreateTopicSettings : public TOperationRequestSettings&& codecs) { SupportedCodecs_ = std::move(codecs); return *this; @@ -729,7 +755,20 @@ struct TAlterTopicSettings : public TOperationRequestSettings AlterPartitioningSettings_; + std::variant< + bool, // Reset + EMetricsLevel // Set + > MetricsLevel_ = false; }; inline TPartitioningSettingsBuilder TCreateTopicSettings::BeginConfigurePartitioningSettings() { diff --git a/include/ydb-cpp-sdk/client/value/value.h b/include/ydb-cpp-sdk/client/value/value.h index 543024ef1e2..66cba7a6b81 100644 --- a/include/ydb-cpp-sdk/client/value/value.h +++ b/include/ydb-cpp-sdk/client/value/value.h @@ -68,7 +68,9 @@ enum class EPrimitiveType { TzDatetime = 0x0035, TzTimestamp = 0x0036, String = 0x1001, + Bytes = 0x1001, Utf8 = 0x1200, + Text = 0x1200, Yson = 0x1201, Json = 0x1202, Uuid = 0x1203, @@ -338,7 +340,9 @@ class TValueParser : public TMoveOnly { const std::string& GetTzDatetime() const; const std::string& GetTzTimestamp() const; const std::string& GetString() const; + const std::string& GetBytes() const; const std::string& GetUtf8() const; + const std::string& GetText() const; const std::string& GetYson() const; const std::string& GetJson() const; TDecimalValue GetDecimal() const; @@ -370,7 +374,9 @@ class TValueParser : public TMoveOnly { std::optional GetOptionalTzDatetime() const; std::optional GetOptionalTzTimestamp() const; std::optional GetOptionalString() const; + std::optional GetOptionalBytes() const; std::optional GetOptionalUtf8() const; + std::optional GetOptionalText() const; std::optional GetOptionalYson() const; std::optional GetOptionalJson() const; std::optional GetOptionalDecimal() const; @@ -448,7 +454,9 @@ class TValueBuilderBase : public TMoveOnly { TDerived& TzDatetime(const std::string& value); TDerived& TzTimestamp(const std::string& value); TDerived& String(const std::string& value); + TDerived& Bytes(const std::string& value); TDerived& Utf8(const std::string& value); + TDerived& Text(const std::string& value); TDerived& Yson(const std::string& value); TDerived& Json(const std::string& value); TDerived& Decimal(const TDecimalValue& value); @@ -480,7 +488,9 @@ class TValueBuilderBase : public TMoveOnly { TDerived& OptionalTzDatetime(const std::optional& value); TDerived& OptionalTzTimestamp(const std::optional& value); TDerived& OptionalString(const std::optional& value); + TDerived& OptionalBytes(const std::optional& value); TDerived& OptionalUtf8(const std::optional& value); + TDerived& OptionalText(const std::optional& value); TDerived& OptionalYson(const std::optional& value); TDerived& OptionalJson(const std::optional& value); TDerived& OptionalUuid(const std::optional& value); diff --git a/src/api/protos/ydb_common.proto b/src/api/protos/ydb_common.proto index 1be2cbe5727..9e865646b43 100644 --- a/src/api/protos/ydb_common.proto +++ b/src/api/protos/ydb_common.proto @@ -27,9 +27,3 @@ message VirtualTimestamp { uint64 plan_step = 1; uint64 tx_id = 2; } - -enum MetricsLevel { - Database = 0; - Object = 1; - Detailed = 2; -} diff --git a/src/api/protos/ydb_persqueue_v1.proto b/src/api/protos/ydb_persqueue_v1.proto index 2d5c070cbc3..04655c3852d 100644 --- a/src/api/protos/ydb_persqueue_v1.proto +++ b/src/api/protos/ydb_persqueue_v1.proto @@ -1159,6 +1159,9 @@ message TopicSettings { // Client service type. string service_type = 7; + + // Message for this consumer will not expire due to retention for at least `availability_period` if they aren't commited. + optional google.protobuf.Duration availability_period = 8; } // List of consumer read rules for this topic. @@ -1184,6 +1187,9 @@ message TopicSettings { } // remote mirror rule for this topic. RemoteMirrorRule remote_mirror_rule = 11; + + // Set or reset metrics level. + optional uint32 metrics_level = 16; } message AutoPartitioningSettings { diff --git a/src/api/protos/ydb_topic.proto b/src/api/protos/ydb_topic.proto index 6ae3aa6f6e4..fced1b62fac 100644 --- a/src/api/protos/ydb_topic.proto +++ b/src/api/protos/ydb_topic.proto @@ -7,6 +7,7 @@ import "src/api/protos/annotations/sensitive.proto"; import "src/api/protos/annotations/validation.proto"; import "google/protobuf/duration.proto"; +import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; package Ydb.Topic; @@ -155,7 +156,7 @@ message StreamWriteMessage { // Message sequence number, provided by client for deduplication. // Starts at 1 int64 seq_no = 1; - // Creation timestamp + // Creation timestamp. google.protobuf.Timestamp created_at = 2; // Compressed client message body. bytes data = 3; @@ -822,6 +823,9 @@ message Consumer { // Bytes read statistics. MultipleWindowsStat bytes_read = 4; } + + // Message for this consumer will not expire due to retention for at least `availability_period` if they aren't commited. + optional google.protobuf.Duration availability_period = 8; } // Consumer alter description. @@ -844,6 +848,12 @@ message AlterConsumer { // User and server attributes of consumer. Server attributes starts from "_" and will be validated by server. // Leave the value blank to drop an attribute. map alter_attributes = 6; + + // Change message lifetime if consumer is important. + oneof availability_period_action { + google.protobuf.Duration set_availability_period = 7; + google.protobuf.Empty reset_availability_period = 8; + } } enum AutoPartitioningStrategy { @@ -992,6 +1002,9 @@ message CreateTopicRequest { // Metering mode for the topic in a serverless database. MeteringMode metering_mode = 12; + + // Metrics level. If the level is unset, use database setting. + optional uint32 metrics_level = 13; } // Create topic response sent from server to client. @@ -1125,6 +1138,9 @@ message DescribeTopicResult { // How much bytes were written statistics. MultipleWindowsStat bytes_written = 4; } + + // Metrics level. + optional uint32 metrics_level = 16; } // Describe partition request sent from client to server. @@ -1252,7 +1268,6 @@ message PartitionStats { int32 partition_node_id = 8 [deprecated=true]; //Use PartitionLocation } - // Update existing topic request sent from client to server. message AlterTopicRequest { Ydb.Operations.OperationParams operation_params = 1; @@ -1297,6 +1312,12 @@ message AlterTopicRequest { // Set metering mode for topic in serverless database. MeteringMode set_metering_mode = 14; + + // Set or reset metrics level. + oneof metrics_level { + uint32 set_metrics_level = 15; + google.protobuf.Empty reset_metrics_level = 16; + } } // Update topic response sent from server to client. diff --git a/src/client/iam_private/common/iam.h b/src/client/iam_private/common/iam.h index acf2cf16b15..79a67d217d8 100644 --- a/src/client/iam_private/common/iam.h +++ b/src/client/iam_private/common/iam.h @@ -18,7 +18,9 @@ class TIamServiceCredentialsProviderFactory : public ICredentialsProviderFactory req.set_resource_type(params.ResourceType); req.set_target_service_account_id(params.TargetServiceAccountId); }, - &TService::Stub::async_interface::CreateForService, + [](typename TService::Stub* stub, grpc::ClientContext* context, const TRequest* request, TResponse* response, std::function cb) { + stub->async()->CreateForService(context, request, response, std::move(cb)); + }, params.SystemServiceAccountCredentials->CreateProvider()) {} }; diff --git a/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp b/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp index ee87596df8c..0acd94de7e6 100644 --- a/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp +++ b/src/client/persqueue_public/ut/ut_utils/data_plane_helpers.cpp @@ -114,7 +114,7 @@ namespace NKikimr::NPersQueueTests { createPartitionStreamEvent->Confirm(); } else if (auto* destroyPartitionStreamEvent = std::get_if(&*event)) { destroyPartitionStreamEvent->Confirm(); - } else if (auto* closeSessionEvent = std::get_if(&*event)) { + } else if (std::get_if(&*event)) { return {}; } } diff --git a/src/client/persqueue_public/ut/ut_utils/ut_utils.h b/src/client/persqueue_public/ut/ut_utils/ut_utils.h index 57379111e99..b55153143b1 100644 --- a/src/client/persqueue_public/ut/ut_utils/ut_utils.h +++ b/src/client/persqueue_public/ut/ut_utils/ut_utils.h @@ -97,6 +97,19 @@ class TPersQueueYdbSdkTestSetup : public ::NPersQueue::SDKTestSetup { .ClusterDiscoveryMode(EClusterDiscoveryMode::On); return settings; } + + void Write(const TString& topic, ui32 partitionId, const TString& data) { + auto settings = TWriteSessionSettings() + .Path(topic) + .MessageGroupId("src-id") + .PartitionGroupId(partitionId) + .Codec(ECodec::RAW); + auto writeSession = GetPersQueueClient().CreateSimpleBlockingWriteSession(settings); + + writeSession->Write(data); + + writeSession->Close(); + } }; struct TYDBClientEventLoop : public ::NPersQueue::IClientEventLoop { diff --git a/src/client/query/impl/exec_query.cpp b/src/client/query/impl/exec_query.cpp index 896d29cdddc..d107083aeb4 100644 --- a/src/client/query/impl/exec_query.cpp +++ b/src/client/query/impl/exec_query.cpp @@ -128,8 +128,12 @@ class TExecuteQueryIterator::TReaderImpl { }; TAsyncExecuteQueryPart TExecuteQueryIterator::ReadNext() { + if (!ReaderImpl_) { + RaiseError("Attempt to read a stream result part on an invalid stream. "); + } + if (ReaderImpl_->IsFinished()) { - RaiseError("Attempt to perform read on invalid or finished stream"); + RaiseError("Attempt to read a stream result part on a finished stream. "); } return ReaderImpl_->ReadNext(ReaderImpl_); diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index d68a1b633a5..26594dfb0b6 100644 --- a/src/client/table/impl/table_client.cpp +++ b/src/client/table/impl/table_client.cpp @@ -1032,16 +1032,17 @@ TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, request->set_table(TStringType{table}); - if (rows.GetType().Impl_.use_count() == 1) { - request->mutable_rows()->mutable_type()->Swap(&rows.GetType().GetProto()); - } else { - *request->mutable_rows()->mutable_type() = rows.GetType().GetProto(); - } - + auto* mutable_rows = request->mutable_rows(); if (rows.Impl_.use_count() == 1) { - request->mutable_rows()->mutable_value()->Swap(&rows.GetProto()); + mutable_rows->mutable_value()->Swap(&rows.GetProto()); + if (rows.GetType().Impl_.use_count() == 1) { + mutable_rows->mutable_type()->Swap(&rows.GetType().GetProto()); + } else { + *mutable_rows->mutable_type() = rows.GetType().GetProto(); + } } else { - *request->mutable_rows()->mutable_value() = rows.GetProto(); + *mutable_rows->mutable_value() = rows.GetProto(); + *mutable_rows->mutable_type() = rows.GetType().GetProto(); } auto promise = NewPromise(); diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 7a222ba4acb..f688c9c8d81 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -33,6 +33,7 @@ namespace NYdb::inline V3::NTopic { static const bool RangesMode = !std::string{std::getenv("PQ_OFFSET_RANGES_MODE") ? std::getenv("PQ_OFFSET_RANGES_MODE") : ""}.empty(); static const bool ExperimentalDirectRead = !std::string{std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") ? std::getenv("PQ_EXPERIMENTAL_DIRECT_READ") : ""}.empty(); +static const bool DecompressEverything = !std::string{std::getenv("PQ_DECOMPRESS_EVERYTHING") ? std::getenv("PQ_DECOMPRESS_EVERYTHING") : ""}.empty(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -1734,11 +1735,13 @@ void TSingleClusterReadSessionImpl::StartDecompressionTask UpdateMemoryUsageStatisticsImpl(); const i64 limit = GetDecompressedDataSizeLimit(); Y_ABORT_UNLESS(limit > 0); - while (DecompressedDataSize < limit - && (static_cast(CompressedDataSize + DecompressedDataSize) < Settings.MaxMemoryUsageBytes_ - || DecompressedDataSize == 0 /* Allow decompression of at least one message even if memory is full. */) - && !DecompressionQueue.empty()) - { + while ( + !DecompressionQueue.empty() + && (DecompressEverything + || (DecompressedDataSize < limit + && (static_cast(CompressedDataSize + DecompressedDataSize) < Settings.MaxMemoryUsageBytes_ + || DecompressedDataSize == 0 /* Allow decompression of at least one message even if memory is full. */))) + ) { TDecompressionQueueItem& current = DecompressionQueue.front(); auto sentToDecompress = current.BatchInfo->StartDecompressionTasks(Settings.DecompressionExecutor_, Max(limit - DecompressedDataSize, static_cast(1)), diff --git a/src/client/topic/impl/topic.cpp b/src/client/topic/impl/topic.cpp index a6ee2007cfe..891e89509d6 100644 --- a/src/client/topic/impl/topic.cpp +++ b/src/client/topic/impl/topic.cpp @@ -11,6 +11,10 @@ namespace NYdb::inline V3::NTopic { +static TDuration ConvertPositiveDuration(const google::protobuf::Duration& duration) { + return TDuration::Seconds(Max(duration.seconds(), 0)) + TDuration::MicroSeconds(Max(duration.nanos(), 0) / 1000); +} + TDescribeTopicResult::TDescribeTopicResult(TStatus&& status, Ydb::Topic::DescribeTopicResult&& result) : TStatus(std::move(status)) , TopicDescription_(std::move(result)) @@ -50,6 +54,7 @@ TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result) , PartitionWriteBurstBytes_(Proto_.partition_write_burst_bytes()) , MeteringMode_(TProtoAccessor::FromProto(Proto_.metering_mode())) , TopicStats_(Proto_.topic_stats()) + , MetricsLevel_(Proto_.has_metrics_level() ? std::optional(static_cast(Proto_.metrics_level())) : std::optional()) { Owner_ = Proto_.self().owner(); CreationTimestamp_ = NScheme::TVirtualTimestamp(Proto_.self().created_at()); @@ -88,6 +93,7 @@ TPartitionDescription::TPartitionDescription(Ydb::Topic::DescribePartitionResult TConsumer::TConsumer(const Ydb::Topic::Consumer& consumer) : ConsumerName_(consumer.name()) , Important_(consumer.important()) + , AvailabilityPeriod_(ConvertPositiveDuration(consumer.availability_period())) , ReadFrom_(TInstant::Seconds(consumer.read_from().seconds())) { for (const auto& codec : consumer.supported_codecs().codecs()) { @@ -106,6 +112,10 @@ bool TConsumer::GetImportant() const { return Important_; } +TDuration TConsumer::GetAvailabilityPeriod() const { + return AvailabilityPeriod_; +} + const TInstant& TConsumer::GetReadFrom() const { return ReadFrom_; } @@ -184,6 +194,9 @@ void TTopicDescription::SerializeTo(Ydb::Topic::CreateTopicRequest& request) con *request.mutable_attributes() = Proto_.attributes(); *request.mutable_consumers() = Proto_.consumers(); request.set_metering_mode(Proto_.metering_mode()); + if (Proto_.has_metrics_level()) { + request.set_metrics_level(Proto_.metrics_level()); + } } const Ydb::Topic::DescribeTopicResult& TTopicDescription::GetProto() const { @@ -210,6 +223,10 @@ const TTopicStats& TTopicDescription::GetTopicStats() const { return TopicStats_; } +std::optional TTopicDescription::GetMetricsLevel() const { + return MetricsLevel_; +} + const std::vector& TTopicDescription::GetPermissions() const { return Permissions_; } @@ -282,7 +299,7 @@ uint32_t TAutoPartitioningSettings::GetDownUtilizationPercent() const { TTopicStats::TTopicStats(const Ydb::Topic::DescribeTopicResult::TopicStats& topicStats) : StoreSizeBytes_(topicStats.store_size_bytes()) , MinLastWriteTime_(TInstant::Seconds(topicStats.min_last_write_time().seconds())) - , MaxWriteTimeLag_(TDuration::Seconds(topicStats.max_write_time_lag().seconds()) + TDuration::MicroSeconds(topicStats.max_write_time_lag().nanos() / 1000)) + , MaxWriteTimeLag_(ConvertPositiveDuration(topicStats.max_write_time_lag())) , BytesWrittenPerMinute_(topicStats.bytes_written().per_minute()) , BytesWrittenPerHour_(topicStats.bytes_written().per_hour()) , BytesWrittenPerDay_(topicStats.bytes_written().per_day()) @@ -319,7 +336,7 @@ TPartitionStats::TPartitionStats(const Ydb::Topic::PartitionStats& partitionStat , EndOffset_(partitionStats.partition_offsets().end()) , StoreSizeBytes_(partitionStats.store_size_bytes()) , LastWriteTime_(TInstant::Seconds(partitionStats.last_write_time().seconds())) - , MaxWriteTimeLag_(TDuration::Seconds(partitionStats.max_write_time_lag().seconds()) + TDuration::MicroSeconds(partitionStats.max_write_time_lag().nanos() / 1000)) + , MaxWriteTimeLag_(ConvertPositiveDuration(partitionStats.max_write_time_lag())) , BytesWrittenPerMinute_(partitionStats.bytes_written().per_minute()) , BytesWrittenPerHour_(partitionStats.bytes_written().per_hour()) , BytesWrittenPerDay_(partitionStats.bytes_written().per_day()) @@ -620,6 +637,7 @@ template TConsumerSettings::TConsumerSettings(TSettings& parent, const Ydb::Topic::Consumer& proto) : ConsumerName_(proto.name()) , Important_(proto.important()) + , AvailabilityPeriod_(ConvertPositiveDuration(proto.availability_period())) , ReadFrom_(TInstant::Seconds(proto.read_from().seconds())) , SupportedCodecs_(DeserializeCodecs(proto.supported_codecs())) , Attributes_(DeserializeAttributes(proto.attributes())) @@ -631,6 +649,12 @@ template void TConsumerSettings::SerializeTo(Ydb::Topic::Consumer& proto) const { proto.set_name(ConsumerName_); proto.set_important(Important_); + if (AvailabilityPeriod_ != TDuration::Zero()) { + proto.mutable_availability_period()->set_seconds(AvailabilityPeriod_.Seconds()); + proto.mutable_availability_period()->set_nanos((AvailabilityPeriod_.MicroSeconds() % 1'000'000) * 1'000); + } else { + proto.clear_availability_period(); + } proto.mutable_read_from()->set_seconds(ReadFrom_.Seconds()); *proto.mutable_supported_codecs() = SerializeCodecs(SupportedCodecs_); *proto.mutable_attributes() = SerializeAttributes(Attributes_); @@ -648,6 +672,7 @@ TCreateTopicSettings::TCreateTopicSettings(const Ydb::Topic::CreateTopicRequest& , PartitionWriteSpeedBytesPerSecond_(proto.partition_write_speed_bytes_per_second()) , PartitionWriteBurstBytes_(proto.partition_write_burst_bytes()) , Attributes_(DeserializeAttributes(proto.attributes())) + , MetricsLevel_(proto.has_metrics_level() ? std::optional(static_cast(proto.metrics_level())) : std::nullopt) { Consumers_ = DeserializeConsumers(*this, proto.consumers()); } @@ -662,6 +687,9 @@ void TCreateTopicSettings::SerializeTo(Ydb::Topic::CreateTopicRequest& request) request.set_partition_write_burst_bytes(PartitionWriteBurstBytes_); *request.mutable_consumers() = SerializeConsumers(Consumers_); *request.mutable_attributes() = SerializeAttributes(Attributes_); + if (MetricsLevel_) { + request.set_metrics_level(*MetricsLevel_); + } } } // namespace NYdb::NTopic diff --git a/src/client/topic/impl/topic_impl.h b/src/client/topic/impl/topic_impl.h index 2fa1703635a..5d19a74ea52 100644 --- a/src/client/topic/impl/topic_impl.h +++ b/src/client/topic/impl/topic_impl.h @@ -46,6 +46,14 @@ class TTopicClient::TImpl : public TClientImplCommon { consumerProto.set_name(TStringType{settings.ConsumerName_}); if (settings.SetImportant_) consumerProto.set_set_important(*settings.SetImportant_); + if (settings.SetAvailabilityPeriod_) { + if (settings.SetAvailabilityPeriod_ != TDuration::Zero()) { + consumerProto.mutable_set_availability_period()->set_seconds(settings.SetAvailabilityPeriod_->Seconds()); + consumerProto.mutable_set_availability_period()->set_nanos((settings.SetAvailabilityPeriod_->MicroSeconds() % 1'000'000) * 1'000); + } else { + consumerProto.mutable_reset_availability_period(); + } + } if (settings.SetReadFrom_) consumerProto.mutable_set_read_from()->set_seconds(settings.SetReadFrom_->Seconds()); @@ -142,6 +150,12 @@ class TTopicClient::TImpl : public TClientImplCommon { ConvertAlterConsumerToProto(consumer, consumerProto); } + if (auto level = std::get_if(&settings.MetricsLevel_)) { + request.set_set_metrics_level(*level); + } else if (auto reset = std::get_if(&settings.MetricsLevel_); *reset) { + request.mutable_reset_metrics_level(); + } + return request; } diff --git a/src/client/topic/ut/topic_to_table_ut.cpp b/src/client/topic/ut/topic_to_table_ut.cpp index e8431f776d2..4a9ccb7b9de 100644 --- a/src/client/topic/ut/topic_to_table_ut.cpp +++ b/src/client/topic/ut/topic_to_table_ut.cpp @@ -307,6 +307,8 @@ class TFixture : public NUnitTest::TBaseFixture { virtual EClientType GetClientType() const = 0; virtual ~TFixture() = default; + void TestWriteAndReadMessages(size_t count, size_t size, bool restart); + private: class TTableSession : public ISession { public: @@ -2466,68 +2468,6 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_47_Query, TFixtureQuery) TestWriteToTopic47(); } -void TFixture::TestWriteToTopic50() -{ - // TODO(abcdef): temporarily deleted - return; - - // We write to the topic in the transaction. When a transaction is committed, the keys in the blob - // cache are renamed. - CreateTopic("topic_A", TEST_CONSUMER); - CreateTopic("topic_B", TEST_CONSUMER); - - std::string message(128_KB, 'x'); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message); - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); - - auto session = CreateSession(); - - // tx #1 - // After the transaction commit, there will be no large blobs in the batches. The number of renames - // will not change in the cache. - auto tx = session->BeginTx(); - - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, tx.get()); - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, tx.get()); - - UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); - - session->CommitTx(*tx, EStatus::SUCCESS); - - std::this_thread::sleep_for(5s); - - UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); - - // tx #2 - // After the commit, the party will rename one big blob - tx = session->BeginTx(); - - for (unsigned i = 0; i < 80; ++i) { - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, tx.get()); - } - - WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, tx.get()); - - UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); - - session->CommitTx(*tx, EStatus::SUCCESS); - - std::this_thread::sleep_for(5s); - - UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 1); -} - -Y_UNIT_TEST_F(WriteToTopic_Demo_50_Table, TFixtureTable) -{ - TestWriteToTopic50(); -} - -Y_UNIT_TEST_F(WriteToTopic_Demo_50_Query, TFixtureQuery) -{ - TestWriteToTopic50(); -} - void TFixture::TestWriteRandomSizedMessagesInWideTransactions() { // The test verifies the simultaneous execution of several transactions. There is a topic @@ -3359,6 +3299,123 @@ Y_UNIT_TEST_F(The_Transaction_Starts_On_One_Version_And_Ends_On_The_Other, TFixt RestartPQTablet("topic_A", 1); } +void TFixture::TestWriteAndReadMessages(size_t count, size_t size, bool restart) +{ + CreateTopic("topic_A"); + + SetPartitionWriteSpeed("topic_A", 50'000'000); + + for (size_t i = 0; i < count; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, std::string(size, 'x'), nullptr); + } + CloseTopicWriteSession("topic_A", TEST_MESSAGE_GROUP_ID); + + if (restart) { + RestartPQTablet("topic_A", 0); + } + + auto messages = Read_Exactly_N_Messages_From_Topic("topic_A", TEST_CONSUMER, count); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), count); +} + +Y_UNIT_TEST_F(Write_And_Read_Small_Messages_1, TFixtureNoClient) +{ + TestWriteAndReadMessages(320, 64'000, false); +} + +Y_UNIT_TEST_F(Write_And_Read_Small_Messages_2, TFixtureNoClient) +{ + TestWriteAndReadMessages(320, 64'000, true); +} + +Y_UNIT_TEST_F(Write_And_Read_Big_Messages_1, TFixtureNoClient) +{ + TestWriteAndReadMessages(27, 64'000 * 12, false); +} + +Y_UNIT_TEST_F(Write_And_Read_Big_Messages_2, TFixtureNoClient) +{ + TestWriteAndReadMessages(27, 64'000 * 12, true); +} + +Y_UNIT_TEST_F(Write_And_Read_Huge_Messages_1, TFixtureNoClient) +{ + TestWriteAndReadMessages(4, 9'000'000, false); +} + +Y_UNIT_TEST_F(Write_And_Read_Huge_Messages_2, TFixtureNoClient) +{ + TestWriteAndReadMessages(4, 9'000'000, true); +} + +Y_UNIT_TEST_F(Write_And_Read_Gigant_Messages_1, TFixtureNoClient) +{ + TestWriteAndReadMessages(4, 61'000'000, false); +} + +Y_UNIT_TEST_F(Write_And_Read_Gigant_Messages_2, TFixtureNoClient) +{ + TestWriteAndReadMessages(4, 61'000'000, true); +} + +Y_UNIT_TEST_F(Write_50k_100times_50tx, TFixtureTable) +{ + // 100 transactions. Write 100 50KB messages in each folder. Call the commit at the same time. + // As a result, there will be a lot of small blobs in the FastWrite zone of the main batch, + // which will be picked up by a compact. The scenario is similar to the work of Ya.Metrika. + + const std::size_t PARTITIONS_COUNT = 2; + const std::size_t TXS_COUNT = 50; + + auto makeSourceId = [](unsigned txId, unsigned partitionId) { + std::string sourceId = TEST_MESSAGE_GROUP_ID; + sourceId += "_"; + sourceId += ToString(txId); + sourceId += "_"; + sourceId += ToString(partitionId); + return sourceId; + }; + + CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT); + + SetPartitionWriteSpeed("topic_A", 50'000'000); + + std::vector> sessions; + std::vector> transactions; + + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + sessions.push_back(CreateSession()); + auto& session = sessions.back(); + + transactions.push_back(session->BeginTx()); + auto& tx = transactions.back(); + + auto sourceId = makeSourceId(i, 0); + for (size_t j = 0; j < 100; ++j) { + WriteToTopic("topic_A", sourceId, std::string(50'000, 'x'), tx.get(), 0); + } + WaitForAcks("topic_A", sourceId); + + sourceId = makeSourceId(i, 1); + WriteToTopic("topic_A", sourceId, std::string(50'000, 'x'), tx.get(), 1); + WaitForAcks("topic_A", sourceId); + } + + // We are doing an asynchronous commit of transactions. They will be executed simultaneously. + std::vector futures; + + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + futures.push_back(sessions[i]->AsyncCommitTx(*transactions[i])); + } + + // All transactions must be completed successfully. + for (std::size_t i = 0; i < TXS_COUNT; ++i) { + futures[i].Wait(); + const auto& result = futures[i].GetValueSync(); + UNIT_ASSERT_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } +} + } } diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp index 30273f03ff8..56496b7d4e1 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp @@ -56,10 +56,16 @@ TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const std::string& nam void TTopicSdkTestSetup::Write(const std::string& message, std::uint32_t partitionId, const std::optional producer, std::optional seqNo) { + Write(GetTopicPath(), message, partitionId, producer, seqNo); +} + +void TTopicSdkTestSetup::Write(const std::string& topic, const std::string& message, std::uint32_t partitionId, + const std::optional producer, + std::optional seqNo) { TTopicClient client(MakeDriver()); TWriteSessionSettings settings; - settings.Path(GetTopicPath()); + settings.Path(topic); settings.PartitionId(partitionId); settings.DeduplicationEnabled(producer.has_value()); if (producer) { diff --git a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h index 65c7f036ae7..bef129a69a8 100644 --- a/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h +++ b/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h @@ -32,6 +32,10 @@ class TTopicSdkTestSetup : public ITopicTestSetup { const std::optional producer = std::nullopt, std::optional seqNo = std::nullopt); + void Write(const std::string& topic, const std::string& message, std::uint32_t partitionId = 0, + const std::optional producer = std::nullopt, + std::optional seqNo = std::nullopt); + struct TReadResult { std::shared_ptr Reader; bool Timeout; diff --git a/src/client/value/value.cpp b/src/client/value/value.cpp index 84b2027a92f..31359e825d1 100644 --- a/src/client/value/value.cpp +++ b/src/client/value/value.cpp @@ -1260,11 +1260,21 @@ class TValueParser::TImpl { return GetProto().bytes_value(); } + const std::string& GetBytes() const { + CheckPrimitive(NYdb::EPrimitiveType::Bytes); + return GetProto().bytes_value(); + } + const std::string& GetUtf8() const { CheckPrimitive(NYdb::EPrimitiveType::Utf8); return GetProto().text_value(); } + const std::string& GetText() const { + CheckPrimitive(NYdb::EPrimitiveType::Text); + return GetProto().text_value(); + } + const std::string& GetYson() const { CheckPrimitive(NYdb::EPrimitiveType::Yson); return GetProto().bytes_value(); @@ -1635,9 +1645,9 @@ class TValueParser::TImpl { case NYdb::EPrimitiveType::TzDatetime: case NYdb::EPrimitiveType::TzTimestamp: return Ydb::Value::kTextValue; - case NYdb::EPrimitiveType::String: + case NYdb::EPrimitiveType::Bytes: return Ydb::Value::kBytesValue; - case NYdb::EPrimitiveType::Utf8: + case NYdb::EPrimitiveType::Text: return Ydb::Value::kTextValue; case NYdb::EPrimitiveType::Yson: return Ydb::Value::kBytesValue; @@ -1780,10 +1790,18 @@ const std::string& TValueParser::GetString() const { return Impl_->GetString(); } +const std::string& TValueParser::GetBytes() const { + return Impl_->GetBytes(); +} + const std::string& TValueParser::GetUtf8() const { return Impl_->GetUtf8(); } +const std::string& TValueParser::GetText() const { + return Impl_->GetText(); +} + const std::string& TValueParser::GetYson() const { return Impl_->GetYson(); } @@ -1912,10 +1930,18 @@ std::optional TValueParser::GetOptionalString() const { RET_OPT_VALUE(std::string, String); } +std::optional TValueParser::GetOptionalBytes() const { + RET_OPT_VALUE(std::string, Bytes); +} + std::optional TValueParser::GetOptionalUtf8() const { RET_OPT_VALUE(std::string, Utf8); } +std::optional TValueParser::GetOptionalText() const { + RET_OPT_VALUE(std::string, Text); +} + std::optional TValueParser::GetOptionalYson() const { RET_OPT_VALUE(std::string, Yson); } @@ -2228,11 +2254,21 @@ class TValueBuilderImpl { GetValue().set_bytes_value(TStringType{value}); } + void Bytes(const std::string& value) { + FillPrimitiveType(EPrimitiveType::Bytes); + GetValue().set_bytes_value(TStringType{value}); + } + void Utf8(const std::string& value) { FillPrimitiveType(EPrimitiveType::Utf8); GetValue().set_text_value(TStringType{value}); } + void Text(const std::string& value) { + FillPrimitiveType(EPrimitiveType::Text); + GetValue().set_text_value(TStringType{value}); + } + void Yson(const std::string& value) { FillPrimitiveType(EPrimitiveType::Yson); GetValue().set_bytes_value(TStringType{value}); @@ -3001,12 +3037,24 @@ TDerived& TValueBuilderBase::String(const std::string& value) { return static_cast(*this); } +template +TDerived& TValueBuilderBase::Bytes(const std::string& value) { + Impl_->Bytes(value); + return static_cast(*this); +} + template TDerived& TValueBuilderBase::Utf8(const std::string& value) { Impl_->Utf8(value); return static_cast(*this); } +template +TDerived& TValueBuilderBase::Text(const std::string& value) { + Impl_->Text(value); + return static_cast(*this); +} + template TDerived& TValueBuilderBase::Yson(const std::string& value) { Impl_->Yson(value); @@ -3180,11 +3228,21 @@ TDerived& TValueBuilderBase::OptionalString(const std::optional +TDerived& TValueBuilderBase::OptionalBytes(const std::optional& value) { + SET_OPT_VALUE_FROM_OPTIONAL(Bytes); +} + template TDerived& TValueBuilderBase::OptionalUtf8(const std::optional& value) { SET_OPT_VALUE_FROM_OPTIONAL(Utf8); } +template +TDerived& TValueBuilderBase::OptionalText(const std::optional& value) { + SET_OPT_VALUE_FROM_OPTIONAL(Text); +} + template TDerived& TValueBuilderBase::OptionalYson(const std::optional& value) { SET_OPT_VALUE_FROM_OPTIONAL(Yson); diff --git a/src/library/operation_id/operation_id.cpp b/src/library/operation_id/operation_id.cpp index 9e43e1c01d7..5fcaebd7d51 100644 --- a/src/library/operation_id/operation_id.cpp +++ b/src/library/operation_id/operation_id.cpp @@ -130,8 +130,12 @@ class TOperationId::TImpl { if (er != TState::ParsedOK) { ythrow yexception() << "Unable to parse input string"; } - std::string path = uri.PrintS(TField::FlagPath).substr(1); // start from 1 to remove first '/' - if (path.length() < 1) { + std::string path = uri.PrintS(TField::FlagPath); + if (path.empty() || path[0] != '/') { + ythrow yexception() << "Operation ID must have a path"; + } + path = path.substr(1); // start from 1 to remove first '/' + if (path.empty()) { ythrow yexception() << "Invalid path length"; } int kind; diff --git a/tests/integration/topic/describe_topic.cpp b/tests/integration/topic/describe_topic.cpp index c454597593a..cc471a5d88b 100644 --- a/tests/integration/topic/describe_topic.cpp +++ b/tests/integration/topic/describe_topic.cpp @@ -22,9 +22,6 @@ TEST_F(Describe, TEST_NAME(Basic)) { } TEST_F(Describe, TEST_NAME(Statistics)) { - // TODO(abcdef): temporarily deleted - GTEST_SKIP() << "temporarily deleted"; - TTopicClient client(MakeDriver()); // Get empty description @@ -122,4 +119,83 @@ TEST_F(Describe, TEST_NAME(Location)) { } } +TEST_F(Describe, TEST_NAME(MetricsLevel)) { + char* ydbVersion = std::getenv("YDB_VERSION"); + if (ydbVersion != nullptr && std::string(ydbVersion) != "trunk") { + GTEST_SKIP() << "Skipping test for YDB version " << ydbVersion; + } + + TTopicClient client(MakeDriver()); + + // const std::uint32_t MetricsLevelDisabled = 0; + // const std::uint32_t MetricsLevelDatabase = 1; + const std::uint32_t MetricsLevelObject = 2; + const std::uint32_t MetricsLevelDetailed = 3; + + auto createTopic = [&](std::string topic, EMetricsLevel metricsLevel) { + auto res = client.CreateTopic(topic, TCreateTopicSettings().MetricsLevel(metricsLevel)).GetValueSync(); + ASSERT_TRUE(res.IsSuccess()); + }; + + auto setMetricsLevel = [&](std::string topic, EMetricsLevel metricsLevel) { + auto res = client.AlterTopic(topic, TAlterTopicSettings().SetMetricsLevel(metricsLevel)).GetValueSync(); + ASSERT_TRUE(res.IsSuccess()); + }; + + auto resetMetricsLevel = [&](std::string topic) { + auto res = client.AlterTopic(topic, TAlterTopicSettings().ResetMetricsLevel()).GetValueSync(); + ASSERT_TRUE(res.IsSuccess()); + }; + + auto checkFlag = [&](std::string topic, std::optional expectedMetricsLevel) { + auto res = client.DescribeTopic(topic, {}).GetValueSync(); + Y_ENSURE(res.IsSuccess()); + return res.GetTopicDescription().GetMetricsLevel() == expectedMetricsLevel; + }; + + { + const std::string topic(GetTopicPath("topic-with-counters")); + createTopic(topic, MetricsLevelDetailed); + checkFlag(topic, MetricsLevelDetailed); + setMetricsLevel(topic, MetricsLevelObject); + Y_ENSURE(checkFlag(topic, MetricsLevelObject)); + + { + // Empty alter should change nothing. + auto res = client.AlterTopic(topic).GetValueSync(); + ASSERT_TRUE(res.IsSuccess()); + Y_ENSURE(checkFlag(topic, MetricsLevelObject)); + } + + { + resetMetricsLevel(topic); + Y_ENSURE(checkFlag(topic, {})); + } + } + + { + const std::string topic(GetTopicPath("topic-without-counters-by-default")); + auto res = client.CreateTopic(topic).GetValueSync(); + ASSERT_TRUE(res.IsSuccess()); + Y_ENSURE(checkFlag(topic, {})); + setMetricsLevel(topic, MetricsLevelDetailed); + Y_ENSURE(checkFlag(topic, MetricsLevelDetailed)); + + { + // Empty alter should change nothing. + auto res = client.AlterTopic(topic).GetValueSync(); + ASSERT_TRUE(res.IsSuccess()); + Y_ENSURE(checkFlag(topic, MetricsLevelDetailed)); + } + } + + { + const std::string topic(GetTopicPath("topic-without-counters")); + createTopic(topic, MetricsLevelObject); + Y_ENSURE(checkFlag(topic, MetricsLevelObject)); + setMetricsLevel(topic, MetricsLevelDetailed); + Y_ENSURE(checkFlag(topic, MetricsLevelDetailed)); + } +} + } diff --git a/tests/integration/topic/topic_to_table.cpp b/tests/integration/topic/topic_to_table.cpp index be83486bb16..7691e9fa89f 100644 --- a/tests/integration/topic/topic_to_table.cpp +++ b/tests/integration/topic/topic_to_table.cpp @@ -1675,9 +1675,6 @@ TEST_F(TxUsageQuery, TEST_NAME(WriteToTopic_Demo_15)) void TxUsage::TestWriteToTopic17() { - // TODO(abcdef): temporarily deleted - return; - CreateTopic("topic_A"); auto session = CreateSession(); diff --git a/tests/integration/topic/utils/describe.cpp b/tests/integration/topic/utils/describe.cpp index c366416e1fb..5e9fa9d4212 100644 --- a/tests/integration/topic/utils/describe.cpp +++ b/tests/integration/topic/utils/describe.cpp @@ -42,6 +42,8 @@ void DescribeTopicTest(ITopicTestSetup& setup, TTopicClient& client, bool requir Y_ENSURE(partitionLocation.GetNodeId() > 0); Y_ENSURE(partitionLocation.GetGeneration() >= 0); // greater-or-equal 0 } + + Y_ENSURE(!description.GetMetricsLevel().has_value()); } } diff --git a/tests/slo_workloads/utils/statistics.cpp b/tests/slo_workloads/utils/statistics.cpp index 7ffcc04babf..4b5d8e4855f 100644 --- a/tests/slo_workloads/utils/statistics.cpp +++ b/tests/slo_workloads/utils/statistics.cpp @@ -359,8 +359,8 @@ void TStat::CalculateFailSeconds() { std::sort(LatencyStats.begin(), LatencyStats.end(), [&](const TPeriodStat& a, const TPeriodStat& b) { return a.Seconds < b.Seconds; }); - FailSeconds = std::make_unique(0); - size_t& failSeconds = *FailSeconds; + FailSeconds = std::make_unique(0); + std::uint64_t& failSeconds = *FailSeconds; std::uint64_t lastSecChecked = LatencyStats[0].Seconds - 1; for (auto& stat : LatencyStats) { failSeconds += stat.Seconds - lastSecChecked - 1; diff --git a/tests/unit/library/operation_id/CMakeLists.txt b/tests/unit/library/operation_id/CMakeLists.txt index 47b9c2d5c40..a6f2143949a 100644 --- a/tests/unit/library/operation_id/CMakeLists.txt +++ b/tests/unit/library/operation_id/CMakeLists.txt @@ -1,12 +1,10 @@ -add_ydb_test(NAME operation_id_ut +add_ydb_test(NAME operation_id_ut GTEST SOURCES operation_id_ut.cpp LINK_LIBRARIES yutil - cpp-testing-unittest_main lib-operation_id-protos library-operation_id - cpp-testing-unittest LABELS unit ) diff --git a/tests/unit/library/operation_id/operation_id_ut.cpp b/tests/unit/library/operation_id/operation_id_ut.cpp index 58bda37900c..59c5e5a0533 100644 --- a/tests/unit/library/operation_id/operation_id_ut.cpp +++ b/tests/unit/library/operation_id/operation_id_ut.cpp @@ -1,124 +1,126 @@ #include #include -#include -#include - -namespace NKikimr { -namespace NOperationId { - -Y_UNIT_TEST_SUITE(OperationIdTest) { - const std::string PreparedQueryId = "9d629c27-2c3036b3-4b180476-64435bca"; - - Y_UNIT_TEST(ConvertKindOnly) { - Ydb::TOperationId proto; - proto.set_kind(Ydb::TOperationId::OPERATION_DDL); - auto str = ProtoToString(proto); - UNIT_ASSERT_EQUAL(str, "ydb://operation/1"); - auto newProto = TOperationId(str); - UNIT_ASSERT_EQUAL(newProto.GetProto().kind(), proto.kind()); - UNIT_ASSERT_EQUAL(newProto.GetProto().data_size(), 0); - } +#include - Y_UNIT_TEST(PreparedQueryIdCompatibleFormatter) { - Ydb::TOperationId opId; - opId.set_kind(Ydb::TOperationId::PREPARED_QUERY_ID); - AddOptionalValue(opId, "id", PreparedQueryId); - auto result = ProtoToString(opId); - UNIT_ASSERT_VALUES_EQUAL(FormatPreparedQueryIdCompat(PreparedQueryId), result); - } +#include - Y_UNIT_TEST(PreparedQueryIdDecode) { - const auto queryId = FormatPreparedQueryIdCompat(PreparedQueryId); - std::string decodedString; - bool decoded = DecodePreparedQueryIdCompat(queryId, decodedString); - UNIT_ASSERT(decoded); - UNIT_ASSERT_VALUES_EQUAL(PreparedQueryId, decodedString); - } +namespace NYdb::NOperationId { - Y_UNIT_TEST(PreparedQueryIdDecodeRawString) { - std::string decodedString; - bool decoded = DecodePreparedQueryIdCompat(PreparedQueryId, decodedString); - UNIT_ASSERT(!decoded); - UNIT_ASSERT(decodedString.empty()); - } +const std::string PreparedQueryId = "9d629c27-2c3036b3-4b180476-64435bca"; - Y_UNIT_TEST(PreparedQueryIdDecodeInvalidString) { - std::string decodedString; - UNIT_ASSERT_EXCEPTION( - DecodePreparedQueryIdCompat(std::string("ydb://preparedqueryid/4?id="), decodedString), yexception); - UNIT_ASSERT(decodedString.empty()); - } +TEST(OperationIdTest, ConvertKindOnly) { + Ydb::TOperationId proto; + proto.set_kind(Ydb::TOperationId::OPERATION_DDL); + auto str = NKikimr::NOperationId::ProtoToString(proto); + ASSERT_EQ(str, "ydb://operation/1"); + auto newProto = NKikimr::NOperationId::TOperationId(str); + ASSERT_EQ(newProto.GetProto().kind(), proto.kind()); + ASSERT_EQ(newProto.GetProto().data_size(), 0); +} - Y_UNIT_TEST(FormatPrefixShorter) { - UNIT_ASSERT(std::string("ydb://preparedqueryid/4?id=").size() < PreparedQueryId.size()); - } +TEST(OperationIdTest, PreparedQueryIdCompatibleFormatter) { + Ydb::TOperationId opId; + opId.set_kind(Ydb::TOperationId::PREPARED_QUERY_ID); + NKikimr::NOperationId::AddOptionalValue(opId, "id", PreparedQueryId); + auto result = NKikimr::NOperationId::ProtoToString(opId); + ASSERT_EQ(NKikimr::NOperationId::FormatPreparedQueryIdCompat(PreparedQueryId), result); +} + +TEST(OperationIdTest, PreparedQueryIdDecode) { + const auto queryId = NKikimr::NOperationId::FormatPreparedQueryIdCompat(PreparedQueryId); + std::string decodedString; + bool decoded = NKikimr::NOperationId::DecodePreparedQueryIdCompat(queryId, decodedString); + ASSERT_TRUE(decoded); + ASSERT_EQ(PreparedQueryId, decodedString); +} + +TEST(OperationIdTest, PreparedQueryIdDecodeRawString) { + std::string decodedString; + bool decoded = NKikimr::NOperationId::DecodePreparedQueryIdCompat(PreparedQueryId, decodedString); + ASSERT_FALSE(decoded); + ASSERT_TRUE(decodedString.empty()); +} + +TEST(OperationIdTest, PreparedQueryIdDecodeInvalidString) { + std::string decodedString; + ASSERT_THROW( + NKikimr::NOperationId::DecodePreparedQueryIdCompat("ydb://preparedqueryid/4?id=", decodedString), yexception); + ASSERT_TRUE(decodedString.empty()); +} + +TEST(OperationIdTest, FormatPrefixShorter) { + ASSERT_TRUE(std::strlen("ydb://preparedqueryid/4?id=") < PreparedQueryId.size()); +} #if 0 - Y_UNIT_TEST(PreparedQueryIdCompatibleFormatterPerf) { - ui64 x = 0; - for (int i = 0; i < 10000000; i++) { - auto result = FormatPreparedQueryIdCompat(PreparedQueryId); - x += result.size(); - } - std::cerr << x << std::endl; +TEST(OperationIdTest, PreparedQueryIdCompatibleFormatterPerf) { + ui64 x = 0; + for (int i = 0; i < 10000000; i++) { + auto result = NKikimr::NOperationId::FormatPreparedQueryIdCompat(PreparedQueryId); + x += result.size(); } + std::cerr << x << std::endl; +} - Y_UNIT_TEST(PreparedQueryIdDecodePerf) { - ui64 x = 0; - for (int i = 0; i < 10000000; i++) { - const auto queryId = FormatPreparedQueryIdCompat(PreparedQueryId); - std::string decodedString; - bool decoded = DecodePreparedQueryIdCompat(queryId, decodedString); - UNIT_ASSERT(decoded); - UNIT_ASSERT_VALUES_EQUAL(PreparedQueryId, decodedString); - x += decodedString.size(); - } - std::cerr << x << std::endl; +TEST(OperationIdTest, PreparedQueryIdDecodePerf) { + ui64 x = 0; + for (int i = 0; i < 10000000; i++) { + const auto queryId = NKikimr::NOperationId::FormatPreparedQueryIdCompat(PreparedQueryId); + std::string decodedString; + bool decoded = NKikimr::NOperationId::DecodePreparedQueryIdCompat(queryId, decodedString); + ASSERT_TRUE(decoded); + ASSERT_EQ(PreparedQueryId, decodedString); + x += decodedString.size(); } + std::cerr << x << std::endl; +} - Y_UNIT_TEST(PreparedQueryIdOldFormatterPerf) { - ui64 x = 0; - for (int i = 0; i < 10000000; i++) { - Ydb::TOperationId opId; - opId.SetKind(Ydb::TOperationId::PREPARED_QUERY_ID); - AddOptionalValue(opId, "id", PreparedQueryId); - auto result = ProtoToString(opId); - x += result.size(); - } - std::cerr << x << std::endl; +TEST(OperationIdTest, PreparedQueryIdOldFormatterPerf) { + ui64 x = 0; + for (int i = 0; i < 10000000; i++) { + Ydb::TOperationId opId; + opId.SetKind(Ydb::TOperationId::PREPARED_QUERY_ID); + NKikimr::NOperationId::AddOptionalValue(opId, "id", PreparedQueryId); + auto result = NKikimr::NOperationId::ProtoToString(opId); + x += result.size(); } + std::cerr << x << std::endl; +} #endif - Y_UNIT_TEST(ConvertKindAndValues) { - Ydb::TOperationId proto; - proto.set_kind(Ydb::TOperationId::OPERATION_DDL); - { - auto data = proto.add_data(); - data->set_key("key1"); - data->set_value("value1"); - } - { - auto data = proto.add_data(); - data->set_key("txId"); - data->set_value("42"); - } - auto str = ProtoToString(proto); - UNIT_ASSERT_EQUAL(str, "ydb://operation/1?key1=value1&txId=42"); - auto newProto = TOperationId(str); - UNIT_ASSERT_EQUAL(newProto.GetProto().kind(), proto.kind()); - UNIT_ASSERT_EQUAL(newProto.GetProto().data_size(), 2); - { - auto data = newProto.GetProto().data(0); - UNIT_ASSERT_EQUAL(data.key(), "key1"); - UNIT_ASSERT_EQUAL(data.value(), "value1"); - } - { - auto data = newProto.GetProto().data(1); - UNIT_ASSERT_EQUAL(data.key(), "txId"); - UNIT_ASSERT_EQUAL(data.value(), "42"); - } + +TEST(OperationIdTest, ConvertKindAndValues) { + Ydb::TOperationId proto; + proto.set_kind(Ydb::TOperationId::OPERATION_DDL); + { + auto data = proto.add_data(); + data->set_key("key1"); + data->set_value("value1"); + } + { + auto data = proto.add_data(); + data->set_key("txId"); + data->set_value("42"); + } + auto str = NKikimr::NOperationId::ProtoToString(proto); + ASSERT_EQ(str, "ydb://operation/1?key1=value1&txId=42"); + auto newProto = NKikimr::NOperationId::TOperationId(str); + ASSERT_EQ(newProto.GetProto().kind(), proto.kind()); + ASSERT_EQ(newProto.GetProto().data_size(), 2); + { + auto data = newProto.GetProto().data(0); + ASSERT_EQ(data.key(), "key1"); + ASSERT_EQ(data.value(), "value1"); } + { + auto data = newProto.GetProto().data(1); + ASSERT_EQ(data.key(), "txId"); + ASSERT_EQ(data.value(), "42"); + } +} + +TEST(OperationIdTest, InvalidOperationId) { + ASSERT_THROW(NKikimr::NOperationId::TOperationId("ydb://preparedqueryid"), yexception); } -} // namespace NOperationId -} // namespace NKikimr +}