|
35 | 35 | import tech.ydb.topic.read.SyncReader; |
36 | 36 | import tech.ydb.topic.read.impl.AsyncReaderImpl; |
37 | 37 | import tech.ydb.topic.read.impl.SyncReaderImpl; |
| 38 | +import tech.ydb.topic.settings.AlterAutoPartitioningWriteStrategySettings; |
38 | 39 | import tech.ydb.topic.settings.AlterConsumerSettings; |
39 | 40 | import tech.ydb.topic.settings.AlterPartitioningSettings; |
40 | 41 | import tech.ydb.topic.settings.AlterTopicSettings; |
| 42 | +import tech.ydb.topic.settings.AutoPartitioningStrategy; |
| 43 | +import tech.ydb.topic.settings.AutoPartitioningWriteStrategySettings; |
41 | 44 | import tech.ydb.topic.settings.CommitOffsetSettings; |
42 | 45 | import tech.ydb.topic.settings.CreateTopicSettings; |
43 | 46 | import tech.ydb.topic.settings.DescribeConsumerSettings; |
@@ -104,7 +107,19 @@ public CompletableFuture<Status> createTopic(String path, CreateTopicSettings se |
104 | 107 | if (partitioningSettings != null) { |
105 | 108 | requestBuilder.setPartitioningSettings(YdbTopic.PartitioningSettings.newBuilder() |
106 | 109 | .setMinActivePartitions(partitioningSettings.getMinActivePartitions()) |
107 | | - .setPartitionCountLimit(partitioningSettings.getPartitionCountLimit())); |
| 110 | + .setPartitionCountLimit(partitioningSettings.getPartitionCountLimit()) |
| 111 | + .setAutoPartitioningSettings(YdbTopic.AutoPartitioningSettings.newBuilder() |
| 112 | + .setStrategy(partitioningSettings.getAutoPartitioningStrategy().getProtoReference()))); |
| 113 | + |
| 114 | + AutoPartitioningWriteStrategySettings writeStrategySettings = partitioningSettings.getWriteStrategySettings(); |
| 115 | + if (writeStrategySettings != null) { |
| 116 | + requestBuilder.getPartitioningSettingsBuilder().getAutoPartitioningSettingsBuilder() |
| 117 | + .setPartitionWriteSpeed(YdbTopic.AutoPartitioningWriteSpeedStrategy.newBuilder() |
| 118 | + .setStabilizationWindow(ProtobufUtils.durationToProto(writeStrategySettings.getStabilizationWindow())) |
| 119 | + .setDownUtilizationPercent(writeStrategySettings.getDownUtilizationPercent()) |
| 120 | + .setUpUtilizationPercent(writeStrategySettings.getUpUtilizationPercent()) |
| 121 | + ); |
| 122 | + } |
108 | 123 | } |
109 | 124 |
|
110 | 125 | Duration retentionPeriod = settings.getRetentionPeriod(); |
@@ -145,6 +160,28 @@ public CompletableFuture<Status> alterTopic(String path, AlterTopicSettings sett |
145 | 160 | if (partitionCountLimit != null) { |
146 | 161 | builder.setSetPartitionCountLimit(partitionCountLimit); |
147 | 162 | } |
| 163 | + AutoPartitioningStrategy autoPartitioningStrategy = partitioningSettings.getAutoPartitioningStrategy(); |
| 164 | + if (autoPartitioningStrategy != null) { |
| 165 | + builder.getAlterAutoPartitioningSettingsBuilder().setSetStrategy(autoPartitioningStrategy.getProtoReference()); |
| 166 | + } |
| 167 | + AlterAutoPartitioningWriteStrategySettings writeStrategySettings = partitioningSettings.getWriteStrategySettings(); |
| 168 | + if (writeStrategySettings != null) { |
| 169 | + Duration stabilizationWindow = writeStrategySettings.getStabilizationWindow(); |
| 170 | + if (stabilizationWindow != null) { |
| 171 | + builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder() |
| 172 | + .setSetStabilizationWindow(ProtobufUtils.durationToProto(stabilizationWindow)); |
| 173 | + } |
| 174 | + Integer upUtilizationPercent = writeStrategySettings.getUpUtilizationPercent(); |
| 175 | + if (upUtilizationPercent != null) { |
| 176 | + builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder() |
| 177 | + .setSetUpUtilizationPercent(upUtilizationPercent); |
| 178 | + } |
| 179 | + Integer downUtilizationPercent = writeStrategySettings.getDownUtilizationPercent(); |
| 180 | + if (downUtilizationPercent != null) { |
| 181 | + builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder() |
| 182 | + .setSetDownUtilizationPercent(downUtilizationPercent); |
| 183 | + } |
| 184 | + } |
148 | 185 | requestBuilder.setAlterPartitioningSettings(builder); |
149 | 186 | } |
150 | 187 |
|
@@ -273,11 +310,20 @@ private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) { |
273 | 310 | .setMeteringMode(fromProto(result.getMeteringMode())); |
274 | 311 |
|
275 | 312 | YdbTopic.PartitioningSettings partitioningSettings = result.getPartitioningSettings(); |
276 | | - description.setPartitioningSettings(PartitioningSettings.newBuilder() |
| 313 | + PartitioningSettings.Builder partitioningDescription = PartitioningSettings.newBuilder() |
277 | 314 | .setMinActivePartitions(partitioningSettings.getMinActivePartitions()) |
278 | 315 | .setPartitionCountLimit(partitioningSettings.getPartitionCountLimit()) |
| 316 | + .setAutoPartitioningStrategy(AutoPartitioningStrategy.fromProto(partitioningSettings.getAutoPartitioningSettings().getStrategy())); |
| 317 | + |
| 318 | + YdbTopic.AutoPartitioningWriteSpeedStrategy partitionWriteSpeed = partitioningSettings.getAutoPartitioningSettings().getPartitionWriteSpeed(); |
| 319 | + partitioningDescription.setWriteStrategySettings(AutoPartitioningWriteStrategySettings.newBuilder() |
| 320 | + .setStabilizationWindow(ProtobufUtils.protoToDuration(partitionWriteSpeed.getStabilizationWindow())) |
| 321 | + .setUpUtilizationPercent(partitionWriteSpeed.getUpUtilizationPercent()) |
| 322 | + .setDownUtilizationPercent(partitionWriteSpeed.getDownUtilizationPercent()) |
279 | 323 | .build()); |
280 | 324 |
|
| 325 | + description.setPartitioningSettings(partitioningDescription.build()); |
| 326 | + |
281 | 327 | List<PartitionInfo> partitions = new ArrayList<>(); |
282 | 328 | for (YdbTopic.DescribeTopicResult.PartitionInfo partition : result.getPartitionsList()) { |
283 | 329 | PartitionInfo.Builder partitionBuilder = PartitionInfo.newBuilder() |
|
0 commit comments