Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions topic/src/main/java/tech/ydb/topic/description/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -140,4 +141,23 @@ public Consumer build() {
return new Consumer(this);
}
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
Consumer consumer = (Consumer) o;
return important == consumer.important &&
Objects.equals(name, consumer.name) &&
Objects.equals(readFrom, consumer.readFrom) &&
Objects.equals(supportedCodecs, consumer.supportedCodecs) &&
Objects.equals(attributes, consumer.attributes) &&
Objects.equals(stats, consumer.stats);
}

@Override
public int hashCode() {
return Objects.hash(name, important, readFrom, supportedCodecs, attributes, stats);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import tech.ydb.proto.topic.YdbTopic;
Expand Down Expand Up @@ -29,4 +30,17 @@ public List<ConsumerPartitionInfo> getPartitions() {
return partitions;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
ConsumerDescription that = (ConsumerDescription) o;
return Objects.equals(consumer, that.consumer) && Objects.equals(partitions, that.partitions);
}

@Override
public int hashCode() {
return Objects.hash(consumer, partitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;

import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
Expand Down Expand Up @@ -178,4 +179,32 @@ public int getConnectionNodeId() {
return connectionNodeId;
}
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
ConsumerPartitionInfo that = (ConsumerPartitionInfo) o;
return partitionId == that.partitionId &&
active == that.active &&
Objects.equals(childPartitionIds, that.childPartitionIds) &&
Objects.equals(parentPartitionIds, that.parentPartitionIds) &&
Objects.equals(partitionStats, that.partitionStats) &&
Objects.equals(consumerStats, that.consumerStats) &&
Objects.equals(location, that.location);
}

@Override
public int hashCode() {
return Objects.hash(
partitionId,
active,
childPartitionIds,
parentPartitionIds,
partitionStats,
consumerStats,
location
);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package tech.ydb.topic.description;

import java.util.Objects;

import tech.ydb.proto.topic.YdbTopic;

/**
Expand Down Expand Up @@ -33,4 +35,18 @@ public long getPerHour() {
public long getPerDay() {
return perDay;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
MultipleWindowsStat that = (MultipleWindowsStat) o;
return perMinute == that.perMinute && perHour == that.perHour && perDay == that.perDay;
}

@Override
public int hashCode() {
return Objects.hash(perMinute, perHour, perDay);
}
}
19 changes: 19 additions & 0 deletions topic/src/main/java/tech/ydb/topic/description/PartitionInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -86,4 +87,22 @@ public PartitionInfo build() {
return new PartitionInfo(this);
}
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
PartitionInfo that = (PartitionInfo) o;
return partitionId == that.partitionId &&
active == that.active &&
Objects.equals(childPartitionIds, that.childPartitionIds) &&
Objects.equals(parentPartitionIds, that.parentPartitionIds) &&
Objects.equals(partitionStats, that.partitionStats);
}

@Override
public int hashCode() {
return Objects.hash(partitionId, active, childPartitionIds, parentPartitionIds, partitionStats);
}
}
27 changes: 27 additions & 0 deletions topic/src/main/java/tech/ydb/topic/description/PartitionStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -126,4 +127,30 @@ public PartitionStats build() {
return new PartitionStats(this);
}
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
PartitionStats that = (PartitionStats) o;
return storeSizeBytes == that.storeSizeBytes &&
partitionNodeId == that.partitionNodeId &&
Objects.equals(partitionOffsets, that.partitionOffsets) &&
Objects.equals(lastWriteTime, that.lastWriteTime) &&
Objects.equals(maxWriteTimeLag, that.maxWriteTimeLag) &&
Objects.equals(bytesWritten, that.bytesWritten);
}

@Override
public int hashCode() {
return Objects.hash(
partitionOffsets,
storeSizeBytes,
lastWriteTime,
maxWriteTimeLag,
bytesWritten,
partitionNodeId
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -47,4 +48,18 @@ public SupportedCodecs build() {
return new SupportedCodecs(this);
}
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
SupportedCodecs that = (SupportedCodecs) o;
return Objects.equals(codecs, that.codecs);
}

@Override
public int hashCode() {
return Objects.hashCode(codecs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -180,4 +181,39 @@ public TopicDescription build() {
}
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
TopicDescription that = (TopicDescription) o;
return retentionStorageMb == that.retentionStorageMb &&
partitionWriteSpeedBytesPerSecond == that.partitionWriteSpeedBytesPerSecond &&
partitionWriteBurstBytes == that.partitionWriteBurstBytes &&
Objects.equals(partitioningSettings, that.partitioningSettings) &&
Objects.equals(partitions, that.partitions) &&
Objects.equals(retentionPeriod, that.retentionPeriod) &&
Objects.equals(supportedCodecs, that.supportedCodecs) &&
Objects.equals(attributes, that.attributes) &&
Objects.equals(consumers, that.consumers) &&
meteringMode == that.meteringMode &&
Objects.equals(topicStats, that.topicStats);
}

@Override
public int hashCode() {
return Objects.hash(
partitioningSettings,
partitions,
retentionPeriod,
retentionStorageMb,
supportedCodecs,
partitionWriteSpeedBytesPerSecond,
partitionWriteBurstBytes,
attributes,
consumers,
meteringMode,
topicStats
);
}
}
18 changes: 18 additions & 0 deletions topic/src/main/java/tech/ydb/topic/description/TopicStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -76,4 +77,21 @@ public TopicStats build() {
return new TopicStats(this);
}
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
TopicStats that = (TopicStats) o;
return storeSizeBytes == that.storeSizeBytes &&
Objects.equals(minLastWriteTime, that.minLastWriteTime) &&
Objects.equals(maxWriteTimeLag, that.maxWriteTimeLag) &&
Objects.equals(bytesWritten, that.bytesWritten);
}

@Override
public int hashCode() {
return Objects.hash(storeSizeBytes, minLastWriteTime, maxWriteTimeLag, bytesWritten);
}
}
62 changes: 60 additions & 2 deletions topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import tech.ydb.topic.read.SyncReader;
import tech.ydb.topic.read.impl.AsyncReaderImpl;
import tech.ydb.topic.read.impl.SyncReaderImpl;
import tech.ydb.topic.settings.AlterAutoPartitioningWriteStrategySettings;
import tech.ydb.topic.settings.AlterConsumerSettings;
import tech.ydb.topic.settings.AlterPartitioningSettings;
import tech.ydb.topic.settings.AlterTopicSettings;
import tech.ydb.topic.settings.AutoPartitioningStrategy;
import tech.ydb.topic.settings.AutoPartitioningWriteStrategySettings;
import tech.ydb.topic.settings.CommitOffsetSettings;
import tech.ydb.topic.settings.CreateTopicSettings;
import tech.ydb.topic.settings.DescribeConsumerSettings;
Expand Down Expand Up @@ -104,7 +107,23 @@ public CompletableFuture<Status> createTopic(String path, CreateTopicSettings se
if (partitioningSettings != null) {
requestBuilder.setPartitioningSettings(YdbTopic.PartitioningSettings.newBuilder()
.setMinActivePartitions(partitioningSettings.getMinActivePartitions())
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit()));
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit())
.setAutoPartitioningSettings(YdbTopic.AutoPartitioningSettings.newBuilder()
.setStrategy(partitioningSettings.getAutoPartitioningStrategy().getProtoReference())));

AutoPartitioningWriteStrategySettings writeStrategySettings = partitioningSettings
.getWriteStrategySettings();

if (writeStrategySettings != null) {
requestBuilder.getPartitioningSettingsBuilder().getAutoPartitioningSettingsBuilder()
.setPartitionWriteSpeed(YdbTopic.AutoPartitioningWriteSpeedStrategy.newBuilder()
.setStabilizationWindow(ProtobufUtils.durationToProto(
writeStrategySettings.getStabilizationWindow()
))
.setDownUtilizationPercent(writeStrategySettings.getDownUtilizationPercent())
.setUpUtilizationPercent(writeStrategySettings.getUpUtilizationPercent())
);
}
}

Duration retentionPeriod = settings.getRetentionPeriod();
Expand Down Expand Up @@ -145,6 +164,30 @@ public CompletableFuture<Status> alterTopic(String path, AlterTopicSettings sett
if (partitionCountLimit != null) {
builder.setSetPartitionCountLimit(partitionCountLimit);
}
AutoPartitioningStrategy autoPartitioningStrategy = partitioningSettings.getAutoPartitioningStrategy();
if (autoPartitioningStrategy != null) {
YdbTopic.AutoPartitioningStrategy protoReference = autoPartitioningStrategy.getProtoReference();
builder.getAlterAutoPartitioningSettingsBuilder().setSetStrategy(protoReference);
}
AlterAutoPartitioningWriteStrategySettings writeStrategySettings = partitioningSettings
.getWriteStrategySettings();
if (writeStrategySettings != null) {
Duration stabilizationWindow = writeStrategySettings.getStabilizationWindow();
if (stabilizationWindow != null) {
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
.setSetStabilizationWindow(ProtobufUtils.durationToProto(stabilizationWindow));
}
Integer upUtilizationPercent = writeStrategySettings.getUpUtilizationPercent();
if (upUtilizationPercent != null) {
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
.setSetUpUtilizationPercent(upUtilizationPercent);
}
Integer downUtilizationPercent = writeStrategySettings.getDownUtilizationPercent();
if (downUtilizationPercent != null) {
builder.getAlterAutoPartitioningSettingsBuilder().getSetPartitionWriteSpeedBuilder()
.setSetDownUtilizationPercent(downUtilizationPercent);
}
}
requestBuilder.setAlterPartitioningSettings(builder);
}

Expand Down Expand Up @@ -273,11 +316,26 @@ private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) {
.setMeteringMode(fromProto(result.getMeteringMode()));

YdbTopic.PartitioningSettings partitioningSettings = result.getPartitioningSettings();
description.setPartitioningSettings(PartitioningSettings.newBuilder()
YdbTopic.AutoPartitioningSettings autoPartitioningSettings = partitioningSettings.getAutoPartitioningSettings();
YdbTopic.AutoPartitioningStrategy autoPartitioningStrategy = autoPartitioningSettings.getStrategy();

PartitioningSettings.Builder partitioningDescription = PartitioningSettings.newBuilder()
.setMinActivePartitions(partitioningSettings.getMinActivePartitions())
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit())
.setAutoPartitioningStrategy(AutoPartitioningStrategy.fromProto(autoPartitioningStrategy));

YdbTopic.AutoPartitioningWriteSpeedStrategy partitionWriteSpeed = autoPartitioningSettings
.getPartitionWriteSpeed();
partitioningDescription.setWriteStrategySettings(AutoPartitioningWriteStrategySettings.newBuilder()
.setStabilizationWindow(ProtobufUtils.protoToDuration(
partitionWriteSpeed.getStabilizationWindow()
))
.setUpUtilizationPercent(partitionWriteSpeed.getUpUtilizationPercent())
.setDownUtilizationPercent(partitionWriteSpeed.getDownUtilizationPercent())
.build());

description.setPartitioningSettings(partitioningDescription.build());

List<PartitionInfo> partitions = new ArrayList<>();
for (YdbTopic.DescribeTopicResult.PartitionInfo partition : result.getPartitionsList()) {
PartitionInfo.Builder partitionBuilder = PartitionInfo.newBuilder()
Expand Down
Loading