@@ -234,6 +234,13 @@ TPartitioningSettings::TPartitioningSettings(const Ydb::Topic::PartitioningSetti
234234 , AutoPartitioningSettings_(settings.auto_partitioning_settings())
235235{}
236236
237+ void TPartitioningSettings::SerializeTo (Ydb::Topic::PartitioningSettings& proto) const {
238+ proto.set_min_active_partitions (MinActivePartitions_);
239+ proto.set_max_active_partitions (MaxActivePartitions_);
240+ proto.set_partition_count_limit (PartitionCountLimit_);
241+ AutoPartitioningSettings_.SerializeTo (*proto.mutable_auto_partitioning_settings ());
242+ }
243+
237244uint64_t TPartitioningSettings::GetMinActivePartitions () const {
238245 return MinActivePartitions_;
239246}
@@ -257,6 +264,14 @@ TAutoPartitioningSettings::TAutoPartitioningSettings(const Ydb::Topic::AutoParti
257264 , UpUtilizationPercent_(settings.partition_write_speed().up_utilization_percent())
258265{}
259266
267+ void TAutoPartitioningSettings::SerializeTo (Ydb::Topic::AutoPartitioningSettings& proto) const {
268+ proto.set_strategy (static_cast <Ydb::Topic::AutoPartitioningStrategy>(Strategy_));
269+ auto & writeSpeed = *proto.mutable_partition_write_speed ();
270+ writeSpeed.mutable_stabilization_window ()->set_seconds (StabilizationWindow_.Seconds ());
271+ writeSpeed.set_down_utilization_percent (DownUtilizationPercent_);
272+ writeSpeed.set_up_utilization_percent (UpUtilizationPercent_);
273+ }
274+
260275EAutoPartitioningStrategy TAutoPartitioningSettings::GetStrategy () const {
261276 return Strategy_;
262277}
@@ -542,4 +557,109 @@ TAsyncStatus TTopicClient::CommitOffset(const std::string& path, uint64_t partit
542557 return Impl_->CommitOffset (path, partitionId, consumerName, offset, settings);
543558}
544559
560+ namespace {
561+
562+ Ydb::Topic::SupportedCodecs SerializeCodecs (const std::vector<ECodec>& codecs) {
563+ Ydb::Topic::SupportedCodecs proto;
564+ for (ECodec codec : codecs) {
565+ proto.add_codecs (static_cast <Ydb::Topic::Codec>(codec));
566+ }
567+ return proto;
568+ }
569+
570+ std::vector<ECodec> DeserializeCodecs (const Ydb::Topic::SupportedCodecs& proto) {
571+ std::vector<ECodec> codecs;
572+ codecs.reserve (proto.codecs_size ());
573+ for (int codec : proto.codecs ()) {
574+ codecs.emplace_back (static_cast <ECodec>(codec));
575+ }
576+ return codecs;
577+ }
578+
579+ google::protobuf::Map<TProtoStringType, TProtoStringType> SerializeAttributes (const std::map<std::string, std::string>& attributes) {
580+ google::protobuf::Map<TProtoStringType, TProtoStringType> proto;
581+ for (const auto & [key, value] : attributes) {
582+ proto.emplace (key, value);
583+ }
584+ return proto;
585+ }
586+
587+ std::map<std::string, std::string> DeserializeAttributes (const google::protobuf::Map<TProtoStringType, TProtoStringType>& proto) {
588+ std::map<std::string, std::string> attributes;
589+ for (const auto & [key, value] : proto) {
590+ attributes.emplace (key, value);
591+ }
592+ return attributes;
593+ }
594+
595+ template <typename TSettings>
596+ google::protobuf::RepeatedPtrField<Ydb::Topic::Consumer> SerializeConsumers (const std::vector<TConsumerSettings<TSettings>>& consumers) {
597+ google::protobuf::RepeatedPtrField<Ydb::Topic::Consumer> proto;
598+ proto.Reserve (consumers.size ());
599+ for (const auto & consumer : consumers) {
600+ consumer.SerializeTo (*proto.Add ());
601+ }
602+ return proto;
603+ }
604+
605+ template <typename TSettings>
606+ std::vector<TConsumerSettings<TSettings>> DeserializeConsumers (TSettings& parent, const google::protobuf::RepeatedPtrField<Ydb::Topic::Consumer>& proto) {
607+ std::vector<TConsumerSettings<TSettings>> consumers;
608+ consumers.reserve (proto.size ());
609+ for (const auto & consumer : proto) {
610+ consumers.emplace_back (TConsumerSettings<TSettings>(parent, consumer));
611+ }
612+ return consumers;
613+ }
614+
615+ }
616+
617+ template <typename TSettings>
618+ TConsumerSettings<TSettings>::TConsumerSettings(TSettings& parent, const Ydb::Topic::Consumer& proto)
619+ : ConsumerName_(proto.name())
620+ , Important_(proto.important())
621+ , ReadFrom_(TInstant::Seconds(proto.read_from().seconds()))
622+ , SupportedCodecs_(DeserializeCodecs(proto.supported_codecs()))
623+ , Attributes_(DeserializeAttributes(proto.attributes()))
624+ , Parent_(parent)
625+ {
626+ }
627+
628+ template <typename TSettings>
629+ void TConsumerSettings<TSettings>::SerializeTo(Ydb::Topic::Consumer& proto) const {
630+ proto.set_name (ConsumerName_);
631+ proto.set_important (Important_);
632+ proto.mutable_read_from ()->set_seconds (ReadFrom_.Seconds ());
633+ *proto.mutable_supported_codecs () = SerializeCodecs (SupportedCodecs_);
634+ *proto.mutable_attributes () = SerializeAttributes (Attributes_);
635+ }
636+
637+ template struct TConsumerSettings <TCreateTopicSettings>;
638+ template struct TConsumerSettings <TAlterTopicSettings>;
639+
640+ TCreateTopicSettings::TCreateTopicSettings (const Ydb::Topic::CreateTopicRequest& proto)
641+ : PartitioningSettings_(TPartitioningSettings(proto.partitioning_settings()))
642+ , RetentionPeriod_(TDuration::Seconds(proto.retention_period().seconds()))
643+ , SupportedCodecs_(DeserializeCodecs(proto.supported_codecs()))
644+ , RetentionStorageMb_(proto.retention_storage_mb())
645+ , MeteringMode_(TProtoAccessor::FromProto(proto.metering_mode()))
646+ , PartitionWriteSpeedBytesPerSecond_(proto.partition_write_speed_bytes_per_second())
647+ , PartitionWriteBurstBytes_(proto.partition_write_burst_bytes())
648+ , Attributes_(DeserializeAttributes(proto.attributes()))
649+ {
650+ Consumers_ = DeserializeConsumers (*this , proto.consumers ());
651+ }
652+
653+ void TCreateTopicSettings::SerializeTo (Ydb::Topic::CreateTopicRequest& request) const {
654+ PartitioningSettings_.SerializeTo (*request.mutable_partitioning_settings ());
655+ request.mutable_retention_period ()->set_seconds (RetentionPeriod_.Seconds ());
656+ *request.mutable_supported_codecs () = SerializeCodecs (SupportedCodecs_);
657+ request.set_retention_storage_mb (RetentionStorageMb_);
658+ request.set_metering_mode (TProtoAccessor::GetProto (MeteringMode_));
659+ request.set_partition_write_speed_bytes_per_second (PartitionWriteSpeedBytesPerSecond_);
660+ request.set_partition_write_burst_bytes (PartitionWriteBurstBytes_);
661+ *request.mutable_consumers () = SerializeConsumers (Consumers_);
662+ *request.mutable_attributes () = SerializeAttributes (Attributes_);
663+ }
664+
545665} // namespace NYdb::NTopic
0 commit comments