Skip to content

Commit 0c40857

Browse files
committed
Added implementation of desribeConsumer
1 parent 2420b84 commit 0c40857

File tree

8 files changed

+357
-0
lines changed

8 files changed

+357
-0
lines changed

topic/src/main/java/tech/ydb/topic/TopicClient.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import tech.ydb.core.Result;
99
import tech.ydb.core.Status;
1010
import tech.ydb.core.grpc.GrpcTransport;
11+
import tech.ydb.topic.description.ConsumerDescription;
1112
import tech.ydb.topic.description.TopicDescription;
1213
import tech.ydb.topic.impl.GrpcTopicRpc;
1314
import tech.ydb.topic.impl.TopicClientImpl;
@@ -16,6 +17,7 @@
1617
import tech.ydb.topic.settings.AlterTopicSettings;
1718
import tech.ydb.topic.settings.CommitOffsetSettings;
1819
import tech.ydb.topic.settings.CreateTopicSettings;
20+
import tech.ydb.topic.settings.DescribeConsumerSettings;
1921
import tech.ydb.topic.settings.DescribeTopicSettings;
2022
import tech.ydb.topic.settings.DropTopicSettings;
2123
import tech.ydb.topic.settings.ReadEventHandlersSettings;
@@ -93,6 +95,30 @@ default CompletableFuture<Result<TopicDescription>> describeTopic(String path) {
9395
return describeTopic(path, DescribeTopicSettings.newBuilder().build());
9496
}
9597

98+
/**
99+
* Describe consumer.
100+
*
101+
* Receives consumer description.
102+
* @param path path to topic
103+
* @param name consumer name
104+
* @param settings additional options of request
105+
* @return {@link CompletableFuture} to a result with {@link ConsumerDescription}
106+
*/
107+
CompletableFuture<Result<ConsumerDescription>> describeConsumer(String path, String name,
108+
DescribeConsumerSettings settings);
109+
110+
/**
111+
* Describe consumer.
112+
*
113+
* Receives consumer description.
114+
* @param path path to topic
115+
* @param name consumer name
116+
* @return {@link CompletableFuture} to a result with {@link ConsumerDescription}
117+
*/
118+
default CompletableFuture<Result<ConsumerDescription>> describeConsumer(String path, String name) {
119+
return describeConsumer(path, name, DescribeConsumerSettings.newBuilder().build());
120+
}
121+
96122
/**
97123
* Create sync topic reader.
98124
*

topic/src/main/java/tech/ydb/topic/TopicRpc.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@ public interface TopicRpc {
4848
CompletableFuture<Result<YdbTopic.DescribeTopicResult>> describeTopic(YdbTopic.DescribeTopicRequest request,
4949
GrpcRequestSettings settings);
5050

51+
/**
52+
* Describe consumer.
53+
* @param request request proto
54+
* @param settings rpc call settings
55+
* @return completable future with result of operation
56+
*/
57+
CompletableFuture<Result<YdbTopic.DescribeConsumerResult>> describeConsumer(
58+
YdbTopic.DescribeConsumerRequest request, GrpcRequestSettings settings
59+
);
60+
5161
/**
5262
* Commit offset.
5363
* @param request request proto
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package tech.ydb.topic.description;
2+
3+
4+
import java.util.List;
5+
import java.util.stream.Collectors;
6+
7+
import tech.ydb.proto.topic.YdbTopic;
8+
9+
/**
10+
*
11+
* @author Aleksandr Gorshenin
12+
*/
13+
public class ConsumerDescription {
14+
private final Consumer consumer;
15+
private final List<ConsumerPartitionInfo> partitions;
16+
17+
public ConsumerDescription(YdbTopic.DescribeConsumerResult result) {
18+
this.consumer = new Consumer(result.getConsumer());
19+
this.partitions = result.getPartitionsList().stream()
20+
.map(ConsumerPartitionInfo::new)
21+
.collect(Collectors.toList());
22+
}
23+
24+
public Consumer getConsumer() {
25+
return consumer;
26+
}
27+
28+
public List<ConsumerPartitionInfo> getPartitions() {
29+
return partitions;
30+
}
31+
32+
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package tech.ydb.topic.description;
2+
3+
import java.time.Duration;
4+
import java.time.Instant;
5+
import java.util.List;
6+
7+
import tech.ydb.core.utils.ProtobufUtils;
8+
import tech.ydb.proto.topic.YdbTopic;
9+
10+
/**
11+
* @author Nikolay Perfilov
12+
*/
13+
public class ConsumerPartitionInfo {
14+
private final long partitionId;
15+
private final boolean active;
16+
private final List<Long> childPartitionIds;
17+
private final List<Long> parentPartitionIds;
18+
private final PartitionStats partitionStats;
19+
private final ConsumerStats consumerStats;
20+
private final PartitionLocation location;
21+
22+
public ConsumerPartitionInfo(YdbTopic.DescribeConsumerResult.PartitionInfo result) {
23+
this.partitionId = result.getPartitionId();
24+
this.active = result.getActive();
25+
this.childPartitionIds = result.getChildPartitionIdsList();
26+
this.parentPartitionIds = result.getParentPartitionIdsList();
27+
this.partitionStats = new PartitionStats(result.getPartitionStats());
28+
this.consumerStats = new ConsumerStats(result.getPartitionConsumerStats());
29+
this.location = new PartitionLocation(result.getPartitionLocation());
30+
}
31+
32+
/**
33+
* @return Partition identifier.
34+
*/
35+
public long getPartitionId() {
36+
return partitionId;
37+
}
38+
39+
/**
40+
* @return Is partition open for write.
41+
*/
42+
public boolean isActive() {
43+
return active;
44+
}
45+
46+
/**
47+
* @return Ids of partitions which was formed when this partition was split or merged.
48+
*/
49+
public List<Long> getChildPartitionIds() {
50+
return childPartitionIds;
51+
}
52+
53+
/**
54+
* @return Ids of partitions from which this partition was formed by split or merge.
55+
*/
56+
public List<Long> getParentPartitionIds() {
57+
return parentPartitionIds;
58+
}
59+
60+
/**
61+
* @return Stats for partition, filled only when include_stats in request is true.
62+
*/
63+
public PartitionStats getPartitionStats() {
64+
return partitionStats;
65+
}
66+
67+
/**
68+
* @return Stats for consumer of this partition, filled only when include_stats in request is true.
69+
*/
70+
public ConsumerStats getConsumerStats() {
71+
return consumerStats;
72+
}
73+
74+
/**
75+
* @return Partition location, filled only when include_location in request is true.
76+
*/
77+
public PartitionLocation getPartitionLocation() {
78+
return location;
79+
}
80+
81+
public static class ConsumerStats {
82+
private final long lastReadOffset;
83+
private final long committedOffset;
84+
private final String readSessionId;
85+
private final Instant partitionReadSessionCreateTime;
86+
private final Instant lastReadTime;
87+
private final Duration maxReadTimeLag;
88+
private final Duration maxWriteTimeLag;
89+
90+
private final MultipleWindowsStat bytesRead;
91+
private final String readerName;
92+
private final int connectionNodeId;
93+
94+
public ConsumerStats(YdbTopic.DescribeConsumerResult.PartitionConsumerStats stats) {
95+
this.lastReadOffset = stats.getLastReadOffset();
96+
this.committedOffset = stats.getCommittedOffset();
97+
this.readSessionId = stats.getReadSessionId();
98+
this.partitionReadSessionCreateTime = ProtobufUtils.protoToInstant(
99+
stats.getPartitionReadSessionCreateTime()
100+
);
101+
this.lastReadTime = ProtobufUtils.protoToInstant(stats.getLastReadTime());
102+
this.maxReadTimeLag = ProtobufUtils.protoToDuration(stats.getMaxReadTimeLag());
103+
this.maxWriteTimeLag = ProtobufUtils.protoToDuration(stats.getMaxWriteTimeLag());
104+
this.bytesRead = new MultipleWindowsStat(stats.getBytesRead());
105+
this.readerName = stats.getReaderName();
106+
this.connectionNodeId = stats.getConnectionNodeId();
107+
}
108+
109+
/**
110+
* @return Last read offset from this partition.
111+
*/
112+
public long getLastReadOffset() {
113+
return lastReadOffset;
114+
}
115+
116+
/**
117+
* @return Committed offset for this partition.
118+
*/
119+
public long getCommittedOffset() {
120+
return committedOffset;
121+
}
122+
123+
/**
124+
* @return Reading this partition read session identifier.
125+
*/
126+
public String getReadSessionId() {
127+
return readSessionId;
128+
}
129+
130+
/**
131+
* @return Timestamp of providing this partition to this session by server.
132+
*/
133+
public Instant getPartitionReadSessionCreateTime() {
134+
return partitionReadSessionCreateTime;
135+
}
136+
137+
/**
138+
* @return Timestamp of last read from this partition.
139+
*/
140+
public Instant getLastReadTime() {
141+
return lastReadTime;
142+
}
143+
144+
/**
145+
* @return Maximum of differences between timestamp of read and write timestamp for all messages, read during last
146+
* minute.
147+
*/
148+
public Duration getMaxReadTimeLag() {
149+
return maxReadTimeLag;
150+
}
151+
152+
/**
153+
* @return Maximum of differences between write timestamp and create timestamp for all messages, read during last
154+
* minute.
155+
*/
156+
public Duration getMaxWriteTimeLag() {
157+
return maxWriteTimeLag;
158+
}
159+
160+
/**
161+
* @return How much bytes were read during several windows statistics from this partition.
162+
*/
163+
public MultipleWindowsStat getBytesRead() {
164+
return bytesRead;
165+
}
166+
167+
/**
168+
* @return Read session name, provided by client.
169+
*/
170+
public String getReaderName() {
171+
return readerName;
172+
}
173+
174+
/**
175+
* @return Host where read session connected.
176+
*/
177+
public int getConnectionNodeId() {
178+
return connectionNodeId;
179+
}
180+
}
181+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package tech.ydb.topic.description;
2+
3+
import tech.ydb.proto.topic.YdbTopic;
4+
5+
/**
6+
*
7+
* @author Aleksandr Gorshenin
8+
*/
9+
public class PartitionLocation {
10+
private final int nodeId;
11+
private final long generation;
12+
13+
public PartitionLocation(YdbTopic.PartitionLocation location) {
14+
this.nodeId = location.getNodeId();
15+
this.generation = location.getGeneration();
16+
}
17+
18+
public int getNodeId() {
19+
return nodeId;
20+
}
21+
22+
public long getGeneration() {
23+
return generation;
24+
}
25+
26+
@Override
27+
public String toString() {
28+
return "PartitionLocation{nodeId=" + nodeId + ", generation=" + generation + "}";
29+
}
30+
}

topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,17 @@ public CompletableFuture<Result<YdbTopic.DescribeTopicResult>> describeTopic(Ydb
5757
);
5858
}
5959

60+
@Override
61+
public CompletableFuture<Result<YdbTopic.DescribeConsumerResult>> describeConsumer(
62+
YdbTopic.DescribeConsumerRequest request, GrpcRequestSettings settings
63+
) {
64+
return transport
65+
.unaryCall(TopicServiceGrpc.getDescribeConsumerMethod(), settings, request)
66+
.thenApply(OperationBinder.bindSync(
67+
YdbTopic.DescribeConsumerResponse::getOperation, YdbTopic.DescribeConsumerResult.class)
68+
);
69+
}
70+
6071
@Override
6172
public CompletableFuture<Status> dropTopic(YdbTopic.DropTopicRequest request, GrpcRequestSettings settings) {
6273
return transport

topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,22 @@ public CompletableFuture<Result<TopicDescription>> describeTopic(String path, De
241241
.thenApply(result -> result.map(this::mapDescribeTopic));
242242
}
243243

244+
@Override
245+
public CompletableFuture<Result<ConsumerDescription>> describeConsumer(
246+
String path, String name, DescribeConsumerSettings settings
247+
) {
248+
YdbTopic.DescribeConsumerRequest request = YdbTopic.DescribeConsumerRequest.newBuilder()
249+
.setOperationParams(Operation.buildParams(settings))
250+
.setPath(path)
251+
.setConsumer(name)
252+
.setIncludeStats(settings.isIncludeStats())
253+
.setIncludeLocation(settings.isIncludeLocation())
254+
.build();
255+
final GrpcRequestSettings grpcRequestSettings = makeGrpcRequestSettings(settings);
256+
return topicRpc.describeConsumer(request, grpcRequestSettings)
257+
.thenApply(result -> result.map(ConsumerDescription::new));
258+
}
259+
244260
private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) {
245261
if (logger.isTraceEnabled()) {
246262
logger.trace("Received topic describe response:\n{}", result);

0 commit comments

Comments
 (0)