Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions topic/src/main/java/tech/ydb/topic/TopicClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.topic.description.ConsumerDescription;
import tech.ydb.topic.description.TopicDescription;
import tech.ydb.topic.impl.GrpcTopicRpc;
import tech.ydb.topic.impl.TopicClientImpl;
Expand All @@ -16,6 +17,7 @@
import tech.ydb.topic.settings.AlterTopicSettings;
import tech.ydb.topic.settings.CommitOffsetSettings;
import tech.ydb.topic.settings.CreateTopicSettings;
import tech.ydb.topic.settings.DescribeConsumerSettings;
import tech.ydb.topic.settings.DescribeTopicSettings;
import tech.ydb.topic.settings.DropTopicSettings;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
Expand Down Expand Up @@ -93,6 +95,30 @@ default CompletableFuture<Result<TopicDescription>> describeTopic(String path) {
return describeTopic(path, DescribeTopicSettings.newBuilder().build());
}

/**
* Describe consumer.
*
* Receives consumer description.
* @param path path to topic
* @param name consumer name
* @param settings additional options of request
* @return {@link CompletableFuture} to a result with {@link ConsumerDescription}
*/
CompletableFuture<Result<ConsumerDescription>> describeConsumer(String path, String name,
DescribeConsumerSettings settings);

/**
* Describe consumer.
*
* Receives consumer description.
* @param path path to topic
* @param name consumer name
* @return {@link CompletableFuture} to a result with {@link ConsumerDescription}
*/
default CompletableFuture<Result<ConsumerDescription>> describeConsumer(String path, String name) {
return describeConsumer(path, name, DescribeConsumerSettings.newBuilder().build());
}

/**
* Create sync topic reader.
*
Expand Down
10 changes: 10 additions & 0 deletions topic/src/main/java/tech/ydb/topic/TopicRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ public interface TopicRpc {
CompletableFuture<Result<YdbTopic.DescribeTopicResult>> describeTopic(YdbTopic.DescribeTopicRequest request,
GrpcRequestSettings settings);

/**
* Describe consumer.
* @param request request proto
* @param settings rpc call settings
* @return completable future with result of operation
*/
CompletableFuture<Result<YdbTopic.DescribeConsumerResult>> describeConsumer(
YdbTopic.DescribeConsumerRequest request, GrpcRequestSettings settings
);

/**
* Commit offset.
* @param request request proto
Expand Down
2 changes: 1 addition & 1 deletion topic/src/main/java/tech/ydb/topic/description/Codec.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ public enum Codec {
GZIP,
LZOP,
ZSTD,
CUSTOM
CUSTOM;
}
45 changes: 34 additions & 11 deletions topic/src/main/java/tech/ydb/topic/description/Consumer.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
package tech.ydb.topic.description;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import com.google.common.collect.ImmutableMap;

import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.utils.ProtoUtils;

/**
* @author Nikolay Perfilov
*/
public class Consumer {
private final String name;
@Nullable
private final Boolean important;
@Nullable
private final boolean important;
private final Instant readFrom;
@Nullable
private final SupportedCodecs supportedCodecs;
private final List<Codec> supportedCodecs;
private final Map<String, String> attributes;
@Nullable
private final ConsumerStats stats;

private Consumer(Builder builder) {
Expand All @@ -33,6 +36,16 @@ private Consumer(Builder builder) {
this.stats = builder.stats;
}

public Consumer(YdbTopic.Consumer consumer) {
this.name = consumer.getName();
this.important = consumer.getImportant();
this.readFrom = ProtobufUtils.protoToInstant(consumer.getReadFrom());
this.supportedCodecs = consumer.getSupportedCodecs().getCodecsList()
.stream().map(ProtoUtils::codecFromProto).collect(Collectors.toList());
this.attributes = consumer.getAttributesMap();
this.stats = new ConsumerStats(consumer.getConsumerStats());
}

public static Builder newBuilder() {
return new Builder();
}
Expand All @@ -41,8 +54,7 @@ public String getName() {
return name;
}

@Nullable
public Boolean isImportant() {
public boolean isImportant() {
return important;
}

Expand All @@ -53,13 +65,18 @@ public Instant getReadFrom() {

@Nullable
public SupportedCodecs getSupportedCodecs() {
return new SupportedCodecs(supportedCodecs);
}

public List<Codec> getSupportedCodecsList() {
return supportedCodecs;
}

public Map<String, String> getAttributes() {
return attributes;
}

@Nullable
public ConsumerStats getStats() {
return stats;
}
Expand All @@ -69,9 +86,9 @@ public ConsumerStats getStats() {
*/
public static class Builder {
private String name;
private Boolean important = null;
private boolean important = false;
private Instant readFrom = null;
private SupportedCodecs supportedCodecs = null;
private List<Codec> supportedCodecs = new ArrayList<>();
private Map<String, String> attributes = new HashMap<>();
private ConsumerStats stats = null;

Expand All @@ -90,8 +107,14 @@ public Builder setReadFrom(Instant readFrom) {
return this;
}

public Builder addSupportedCodec(Codec codec) {
this.supportedCodecs.add(codec);
return this;
}

public Builder setSupportedCodecs(SupportedCodecs supportedCodecs) {
this.supportedCodecs = supportedCodecs;
this.supportedCodecs.clear();
this.supportedCodecs.addAll(supportedCodecs.getCodecs());
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package tech.ydb.topic.description;


import java.util.List;
import java.util.stream.Collectors;

import tech.ydb.proto.topic.YdbTopic;

/**
*
* @author Aleksandr Gorshenin
*/
public class ConsumerDescription {
private final Consumer consumer;
private final List<ConsumerPartitionInfo> partitions;

public ConsumerDescription(YdbTopic.DescribeConsumerResult result) {
this.consumer = new Consumer(result.getConsumer());
this.partitions = result.getPartitionsList().stream()
.map(ConsumerPartitionInfo::new)
.collect(Collectors.toList());
}

public Consumer getConsumer() {
return consumer;
}

public List<ConsumerPartitionInfo> getPartitions() {
return partitions;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package tech.ydb.topic.description;

import java.time.Duration;
import java.time.Instant;
import java.util.List;

import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;

/**
* @author Nikolay Perfilov
*/
public class ConsumerPartitionInfo {
private final long partitionId;
private final boolean active;
private final List<Long> childPartitionIds;
private final List<Long> parentPartitionIds;
private final PartitionStats partitionStats;
private final ConsumerStats consumerStats;
private final PartitionLocation location;

public ConsumerPartitionInfo(YdbTopic.DescribeConsumerResult.PartitionInfo result) {
this.partitionId = result.getPartitionId();
this.active = result.getActive();
this.childPartitionIds = result.getChildPartitionIdsList();
this.parentPartitionIds = result.getParentPartitionIdsList();
this.partitionStats = new PartitionStats(result.getPartitionStats());
this.consumerStats = new ConsumerStats(result.getPartitionConsumerStats());
this.location = new PartitionLocation(result.getPartitionLocation());
}

/**
* @return Partition identifier.
*/
public long getPartitionId() {
return partitionId;
}

/**
* @return Is partition open for write.
*/
public boolean isActive() {
return active;
}

/**
* @return Ids of partitions which was formed when this partition was split or merged.
*/
public List<Long> getChildPartitionIds() {
return childPartitionIds;
}

/**
* @return Ids of partitions from which this partition was formed by split or merge.
*/
public List<Long> getParentPartitionIds() {
return parentPartitionIds;
}

/**
* @return Stats for partition, filled only when include_stats in request is true.
*/
public PartitionStats getPartitionStats() {
return partitionStats;
}

/**
* @return Stats for consumer of this partition, filled only when include_stats in request is true.
*/
public ConsumerStats getConsumerStats() {
return consumerStats;
}

/**
* @return Partition location, filled only when include_location in request is true.
*/
public PartitionLocation getPartitionLocation() {
return location;
}

public static class ConsumerStats {
private final long lastReadOffset;
private final long committedOffset;
private final String readSessionId;
private final Instant partitionReadSessionCreateTime;
private final Instant lastReadTime;
private final Duration maxReadTimeLag;
private final Duration maxWriteTimeLag;

private final MultipleWindowsStat bytesRead;
private final String readerName;
private final int connectionNodeId;

public ConsumerStats(YdbTopic.DescribeConsumerResult.PartitionConsumerStats stats) {
this.lastReadOffset = stats.getLastReadOffset();
this.committedOffset = stats.getCommittedOffset();
this.readSessionId = stats.getReadSessionId();
this.partitionReadSessionCreateTime = ProtobufUtils.protoToInstant(
stats.getPartitionReadSessionCreateTime()
);
this.lastReadTime = ProtobufUtils.protoToInstant(stats.getLastReadTime());
this.maxReadTimeLag = ProtobufUtils.protoToDuration(stats.getMaxReadTimeLag());
this.maxWriteTimeLag = ProtobufUtils.protoToDuration(stats.getMaxWriteTimeLag());
this.bytesRead = new MultipleWindowsStat(stats.getBytesRead());
this.readerName = stats.getReaderName();
this.connectionNodeId = stats.getConnectionNodeId();
}

/**
* @return Last read offset from this partition.
*/
public long getLastReadOffset() {
return lastReadOffset;
}

/**
* @return Committed offset for this partition.
*/
public long getCommittedOffset() {
return committedOffset;
}

/**
* @return Reading this partition read session identifier.
*/
public String getReadSessionId() {
return readSessionId;
}

/**
* @return Timestamp of providing this partition to this session by server.
*/
public Instant getPartitionReadSessionCreateTime() {
return partitionReadSessionCreateTime;
}

/**
* @return Timestamp of last read from this partition.
*/
public Instant getLastReadTime() {
return lastReadTime;
}

/**
* @return Maximum of differences between timestamp of read and write timestamp for all messages, read during last
* minute.
*/
public Duration getMaxReadTimeLag() {
return maxReadTimeLag;
}

/**
* @return Maximum of differences between write timestamp and create timestamp for all messages, read during last
* minute.
*/
public Duration getMaxWriteTimeLag() {
return maxWriteTimeLag;
}

/**
* @return How much bytes were read during several windows statistics from this partition.
*/
public MultipleWindowsStat getBytesRead() {
return bytesRead;
}

/**
* @return Read session name, provided by client.
*/
public String getReaderName() {
return readerName;
}

/**
* @return Host where read session connected.
*/
public int getConnectionNodeId() {
return connectionNodeId;
}
}
}
Loading