Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,4 @@ public Consumer getConsumer() {
public List<ConsumerPartitionInfo> getPartitions() {
return partitions;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,4 @@ public TopicDescription build() {
return new TopicDescription(this);
}
}

}
92 changes: 90 additions & 2 deletions topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import tech.ydb.topic.read.SyncReader;
import tech.ydb.topic.read.impl.AsyncReaderImpl;
import tech.ydb.topic.read.impl.SyncReaderImpl;
import tech.ydb.topic.settings.AlterAutoPartitioningWriteStrategySettings;
import tech.ydb.topic.settings.AlterConsumerSettings;
import tech.ydb.topic.settings.AlterPartitioningSettings;
import tech.ydb.topic.settings.AlterTopicSettings;
import tech.ydb.topic.settings.AutoPartitioningStrategy;
import tech.ydb.topic.settings.AutoPartitioningWriteStrategySettings;
import tech.ydb.topic.settings.CommitOffsetSettings;
import tech.ydb.topic.settings.CreateTopicSettings;
import tech.ydb.topic.settings.DescribeConsumerSettings;
Expand Down Expand Up @@ -104,7 +107,23 @@ public CompletableFuture<Status> createTopic(String path, CreateTopicSettings se
if (partitioningSettings != null) {
requestBuilder.setPartitioningSettings(YdbTopic.PartitioningSettings.newBuilder()
.setMinActivePartitions(partitioningSettings.getMinActivePartitions())
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit()));
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit())
.setAutoPartitioningSettings(YdbTopic.AutoPartitioningSettings.newBuilder()
.setStrategy(toProto(partitioningSettings.getAutoPartitioningStrategy()))));

AutoPartitioningWriteStrategySettings writeStrategySettings = partitioningSettings
.getWriteStrategySettings();

if (writeStrategySettings != null) {
requestBuilder.getPartitioningSettingsBuilder().getAutoPartitioningSettingsBuilder()
.setPartitionWriteSpeed(YdbTopic.AutoPartitioningWriteSpeedStrategy.newBuilder()
.setStabilizationWindow(ProtobufUtils.durationToProto(
writeStrategySettings.getStabilizationWindow()
))
.setDownUtilizationPercent(writeStrategySettings.getDownUtilizationPercent())
.setUpUtilizationPercent(writeStrategySettings.getUpUtilizationPercent())
);
}
}

Duration retentionPeriod = settings.getRetentionPeriod();
Expand Down Expand Up @@ -145,6 +164,30 @@ public CompletableFuture<Status> alterTopic(String path, AlterTopicSettings sett
if (partitionCountLimit != null) {
builder.setSetPartitionCountLimit(partitionCountLimit);
}
AutoPartitioningStrategy autoPartitioningStrategy = partitioningSettings.getAutoPartitioningStrategy();
if (autoPartitioningStrategy != null) {
YdbTopic.AutoPartitioningStrategy protoReference = toProto(autoPartitioningStrategy);
builder.getAlterAutoPartitioningSettingsBuilder().setSetStrategy(protoReference);
}
AlterAutoPartitioningWriteStrategySettings writeStrategySettings = partitioningSettings
.getWriteStrategySettings();
if (writeStrategySettings != null) {
Duration stabilizationWindow = writeStrategySettings.getStabilizationWindow();
if (stabilizationWindow != null) {
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
.setSetStabilizationWindow(ProtobufUtils.durationToProto(stabilizationWindow));
}
Integer upUtilizationPercent = writeStrategySettings.getUpUtilizationPercent();
if (upUtilizationPercent != null) {
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
.setSetUpUtilizationPercent(upUtilizationPercent);
}
Integer downUtilizationPercent = writeStrategySettings.getDownUtilizationPercent();
if (downUtilizationPercent != null) {
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
.setSetDownUtilizationPercent(downUtilizationPercent);
}
}
requestBuilder.setAlterPartitioningSettings(builder);
}

Expand Down Expand Up @@ -273,11 +316,26 @@ private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) {
.setMeteringMode(fromProto(result.getMeteringMode()));

YdbTopic.PartitioningSettings partitioningSettings = result.getPartitioningSettings();
description.setPartitioningSettings(PartitioningSettings.newBuilder()
YdbTopic.AutoPartitioningSettings autoPartitioningSettings = partitioningSettings.getAutoPartitioningSettings();
YdbTopic.AutoPartitioningStrategy autoPartitioningStrategy = autoPartitioningSettings.getStrategy();

PartitioningSettings.Builder partitioningDescription = PartitioningSettings.newBuilder()
.setMinActivePartitions(partitioningSettings.getMinActivePartitions())
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit())
.setAutoPartitioningStrategy(fromProto(autoPartitioningStrategy));

YdbTopic.AutoPartitioningWriteSpeedStrategy partitionWriteSpeed = autoPartitioningSettings
.getPartitionWriteSpeed();
partitioningDescription.setWriteStrategySettings(AutoPartitioningWriteStrategySettings.newBuilder()
.setStabilizationWindow(ProtobufUtils.protoToDuration(
partitionWriteSpeed.getStabilizationWindow()
))
.setUpUtilizationPercent(partitionWriteSpeed.getUpUtilizationPercent())
.setDownUtilizationPercent(partitionWriteSpeed.getDownUtilizationPercent())
.build());

description.setPartitioningSettings(partitioningDescription.build());

List<PartitionInfo> partitions = new ArrayList<>();
for (YdbTopic.DescribeTopicResult.PartitionInfo partition : result.getPartitionsList()) {
PartitionInfo.Builder partitionBuilder = PartitionInfo.newBuilder()
Expand Down Expand Up @@ -391,6 +449,36 @@ private static YdbTopic.SupportedCodecs toProto(SupportedCodecs supportedCodecs)
return codecsBuilder.build();
}

private static AutoPartitioningStrategy fromProto(YdbTopic.AutoPartitioningStrategy autoPartitioningStrategy) {
switch (autoPartitioningStrategy) {
case AUTO_PARTITIONING_STRATEGY_PAUSED:
return AutoPartitioningStrategy.PAUSED;
case AUTO_PARTITIONING_STRATEGY_SCALE_UP:
return AutoPartitioningStrategy.SCALE_UP;
case AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN:
return AutoPartitioningStrategy.SCALE_UP_AND_DOWN;
case AUTO_PARTITIONING_STRATEGY_DISABLED:
return AutoPartitioningStrategy.DISABLED;
default:
return null;
}
}

private static YdbTopic.AutoPartitioningStrategy toProto(AutoPartitioningStrategy autoPartitioningStrategy) {
switch (autoPartitioningStrategy) {
case PAUSED:
return YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_PAUSED;
case SCALE_UP:
return YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_SCALE_UP;
case SCALE_UP_AND_DOWN:
return YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN;
case DISABLED:
return YdbTopic.AutoPartitioningStrategy.AUTO_PARTITIONING_STRATEGY_DISABLED;
default:
throw new IllegalArgumentException("Unknown auto partitioning strategy: " + autoPartitioningStrategy);
}
}

@Override
public void close() {
logger.debug("TopicClientImpl.close() is called");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package tech.ydb.topic.settings;

import java.time.Duration;

import javax.annotation.Nullable;

public class AlterAutoPartitioningWriteStrategySettings {
@Nullable
private final Duration stabilizationWindow;
@Nullable
private final Integer upUtilizationPercent;
@Nullable
private final Integer downUtilizationPercent;

public AlterAutoPartitioningWriteStrategySettings(Builder builder) {
this.stabilizationWindow = builder.stabilizationWindow;
this.upUtilizationPercent = builder.upUtilizationPercent;
this.downUtilizationPercent = builder.downUtilizationPercent;
}

@Nullable
public Duration getStabilizationWindow() {
return stabilizationWindow;
}

@Nullable
public Integer getUpUtilizationPercent() {
return upUtilizationPercent;
}

@Nullable
public Integer getDownUtilizationPercent() {
return downUtilizationPercent;
}

public static Builder newBuilder() {
return new Builder();
}

public static class Builder {
private Duration stabilizationWindow = null;
private Integer upUtilizationPercent = null;
private Integer downUtilizationPercent = null;

/**
* @param stabilizationWindow Duration used by the auto partitioning algorithm to define if the partition must be split.
* Default value is 5 minutes.
* @return strategy builder
*/
public Builder setStabilizationWindow(Duration stabilizationWindow) {
this.stabilizationWindow = stabilizationWindow;
return this;
}

/**
* @param upUtilizationPercent Upper level of partition quota utilization after which the partition should be split.
* Default value is 90%.
* @return strategy builder
*/
public Builder setUpUtilizationPercent(int upUtilizationPercent) {
this.upUtilizationPercent = upUtilizationPercent;
return this;
}

/**
* @param downUtilizationPercent Lower level of partition quota utilization
* after which the partition should be merged with the other one.
* Default value is 30%.
* @return strategy builder
*/
public Builder setDownUtilizationPercent(int downUtilizationPercent) {
this.downUtilizationPercent = downUtilizationPercent;
return this;
}

public AlterAutoPartitioningWriteStrategySettings build() {
return new AlterAutoPartitioningWriteStrategySettings(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@ public class AlterPartitioningSettings {
private final Long minActivePartitions;
@Nullable
private final Long partitionCountLimit;
@Nullable
private final AutoPartitioningStrategy autoPartitioningStrategy;
@Nullable
private final AlterAutoPartitioningWriteStrategySettings writeStrategySettings;

private AlterPartitioningSettings(Builder builder) {
this.minActivePartitions = builder.minActivePartitions;
this.partitionCountLimit = builder.partitionCountLimit;
this.autoPartitioningStrategy = builder.autoPartitioningStrategy;
this.writeStrategySettings = builder.writeStrategySettings;
}

public static Builder newBuilder() {
Expand All @@ -30,12 +36,24 @@ public Long getPartitionCountLimit() {
return partitionCountLimit;
}

@Nullable
public AutoPartitioningStrategy getAutoPartitioningStrategy() {
return autoPartitioningStrategy;
}

@Nullable
public AlterAutoPartitioningWriteStrategySettings getWriteStrategySettings() {
return writeStrategySettings;
}

/**
* BUILDER
*/
public static class Builder {
private Long minActivePartitions = null;
private Long partitionCountLimit = null;
private AutoPartitioningStrategy autoPartitioningStrategy = null;
private AlterAutoPartitioningWriteStrategySettings writeStrategySettings = null;

/**
* @param minActivePartitions minimum partition count auto merge would stop working at.
Expand All @@ -58,6 +76,28 @@ public Builder setPartitionCountLimit(long partitionCountLimit) {
return this;
}

/**
* @param autoPartitioningStrategy Strategy for auto partitioning.
* Auto partitioning is disabled by default.
* @return settings builder
* @see AutoPartitioningStrategy#DISABLED
*/
public Builder setAutoPartitioningStrategy(AutoPartitioningStrategy autoPartitioningStrategy) {
this.autoPartitioningStrategy = autoPartitioningStrategy;
return this;
}

/**
* @param writeStrategySettings Settings for auto partitioning write strategy.
* Does not have any effect if auto partitioning is disabled.
* See {@link AlterAutoPartitioningWriteStrategySettings} for defaults
* @return settings builder
*/
public Builder setWriteStrategySettings(AlterAutoPartitioningWriteStrategySettings writeStrategySettings) {
this.writeStrategySettings = writeStrategySettings;
return this;
}

public AlterPartitioningSettings build() {
return new AlterPartitioningSettings(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package tech.ydb.topic.settings;

public enum AutoPartitioningStrategy {
/**
* The auto partitioning is disabled.
* You cannot disable the auto partitioning after it has been enabled.
* @see AutoPartitioningStrategy#PAUSED
*/
DISABLED,
/**
* The auto partitioning algorithm will increase the partition count depending on the load characteristics.
* The auto partitioning algorithm will never decrease the number of partitions.
* @see AlterAutoPartitioningWriteStrategySettings
*/
SCALE_UP,
/**
* The auto partitioning algorithm will both increase and decrease partitions count depending on the load characteristics.
* @see AlterAutoPartitioningWriteStrategySettings
*/
SCALE_UP_AND_DOWN,
/**
* The auto partitioning is paused.
*/
PAUSED;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package tech.ydb.topic.settings;

import java.time.Duration;

public class AutoPartitioningWriteStrategySettings {
private final Duration stabilizationWindow;
private final int upUtilizationPercent;
private final int downUtilizationPercent;

public AutoPartitioningWriteStrategySettings(Builder builder) {
this.stabilizationWindow = builder.stabilizationWindow;
this.upUtilizationPercent = builder.upUtilizationPercent;
this.downUtilizationPercent = builder.downUtilizationPercent;
}

public Duration getStabilizationWindow() {
return stabilizationWindow;
}

public int getUpUtilizationPercent() {
return upUtilizationPercent;
}

public int getDownUtilizationPercent() {
return downUtilizationPercent;
}

public static Builder newBuilder() {
return new Builder();
}

public static class Builder {
private Duration stabilizationWindow = Duration.ofMinutes(5);
private int upUtilizationPercent = 90;
private int downUtilizationPercent = 30;

/**
* @param stabilizationWindow Duration used by the auto partitioning algorithm to define if the partition must be split.
* Default value is 5 minutes.
* @return strategy builder
*/
public Builder setStabilizationWindow(Duration stabilizationWindow) {
this.stabilizationWindow = stabilizationWindow;
return this;
}

/**
* @param upUtilizationPercent Upper level of partition quota utilization after which the partition should be split.
* Default value is 90%.
* @return strategy builder
*/
public Builder setUpUtilizationPercent(int upUtilizationPercent) {
this.upUtilizationPercent = upUtilizationPercent;
return this;
}

/**
* @param downUtilizationPercent Lower level of partition quota utilization
* after which the partition should be merged with the other one.
* Default value is 30%.
* @return strategy builder
*/
public Builder setDownUtilizationPercent(int downUtilizationPercent) {
this.downUtilizationPercent = downUtilizationPercent;
return this;
}

public AutoPartitioningWriteStrategySettings build() {
return new AutoPartitioningWriteStrategySettings(this);
}
}
}
Loading