Skip to content

Commit 68f411e

Browse files
authored
feat: add getSubscription (#162)
1 parent 69038f3 commit 68f411e

File tree

6 files changed

+115
-1
lines changed

6 files changed

+115
-1
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.hstream;
2+
3+
import java.util.List;
4+
5+
public class GetSubscriptionResponse {
6+
Subscription subscription;
7+
List<SubscriptionOffset> offsets;
8+
9+
public Subscription getSubscription() {
10+
return subscription;
11+
}
12+
13+
public List<SubscriptionOffset> getOffsets() {
14+
return offsets;
15+
}
16+
17+
public static Builder newBuilder() {
18+
return new Builder();
19+
}
20+
21+
public static final class Builder {
22+
private Subscription subscription;
23+
private List<SubscriptionOffset> offsets;
24+
25+
public Builder subscription(Subscription subscription) {
26+
this.subscription = subscription;
27+
return this;
28+
}
29+
30+
public Builder offsets(List<SubscriptionOffset> offsets) {
31+
this.offsets = offsets;
32+
return this;
33+
}
34+
35+
public GetSubscriptionResponse build() {
36+
GetSubscriptionResponse getSubscriptionResponse = new GetSubscriptionResponse();
37+
getSubscriptionResponse.offsets = this.offsets;
38+
getSubscriptionResponse.subscription = this.subscription;
39+
return getSubscriptionResponse;
40+
}
41+
}
42+
}

client/src/main/java/io/hstream/HStreamClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ static HStreamClientBuilder builder() {
110110
*/
111111
List<Subscription> listSubscriptions();
112112

113+
GetSubscriptionResponse getSubscription(String subscriptionId);
114+
113115
/**
114116
* Delete the specified subscription with subscriptionId.
115117
*
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.hstream;
2+
3+
public class SubscriptionOffset {
4+
long shardId;
5+
long batchId;
6+
7+
public static Builder newBuilder() {
8+
return new Builder();
9+
}
10+
11+
public long getShardId() {
12+
return shardId;
13+
}
14+
15+
public long getBatchId() {
16+
return batchId;
17+
}
18+
19+
public static final class Builder {
20+
private long shardId;
21+
private long batchId;
22+
23+
public Builder withShardId(long shardId) {
24+
this.shardId = shardId;
25+
return this;
26+
}
27+
28+
public Builder withBatchId(long batchId) {
29+
this.batchId = batchId;
30+
return this;
31+
}
32+
33+
public SubscriptionOffset build() {
34+
SubscriptionOffset subscriptionOffset = new SubscriptionOffset();
35+
subscriptionOffset.shardId = this.shardId;
36+
subscriptionOffset.batchId = this.batchId;
37+
return subscriptionOffset;
38+
}
39+
}
40+
}

client/src/main/java/io/hstream/util/GrpcUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.hstream.internal.SpecialOffset;
88
import io.hstream.internal.TaskStatusPB;
99
import java.time.Instant;
10+
import java.util.stream.Collectors;
1011

1112
/**
1213
* A class of utility functions to convert between the GRPC generated classes and the custom classes
@@ -206,4 +207,23 @@ public static ConsumerInformation consumerInformationFromGrpc(
206207
.userAgent(consumer.getUserAgent())
207208
.build();
208209
}
210+
211+
public static GetSubscriptionResponse GetSubscriptionResponseFromGrpc(
212+
io.hstream.internal.GetSubscriptionResponse response) {
213+
return GetSubscriptionResponse.newBuilder()
214+
.subscription(subscriptionFromGrpc(response.getSubscription()))
215+
.offsets(
216+
response.getOffsetsList().stream()
217+
.map(GrpcUtils::subscriptionOffsetFromGrpc)
218+
.collect(Collectors.toList()))
219+
.build();
220+
}
221+
222+
public static SubscriptionOffset subscriptionOffsetFromGrpc(
223+
io.hstream.internal.SubscriptionOffset offset) {
224+
return SubscriptionOffset.newBuilder()
225+
.withShardId(offset.getShardId())
226+
.withBatchId(offset.getBatchId())
227+
.build();
228+
}
209229
}

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import io.hstream.BufferedProducerBuilder
66
import io.hstream.Cluster
77
import io.hstream.ConsumerBuilder
88
import io.hstream.ConsumerInformation
9+
import io.hstream.GetSubscriptionResponse
910
import io.hstream.HStreamClient
1011
import io.hstream.ProducerBuilder
1112
import io.hstream.Query
@@ -22,6 +23,7 @@ import io.hstream.internal.DeleteStreamRequest
2223
import io.hstream.internal.DeleteSubscriptionRequest
2324
import io.hstream.internal.DeleteViewRequest
2425
import io.hstream.internal.GetQueryRequest
26+
import io.hstream.internal.GetSubscriptionRequest
2527
import io.hstream.internal.GetViewRequest
2628
import io.hstream.internal.HStreamApiGrpcKt
2729
import io.hstream.internal.ListConsumersRequest
@@ -187,6 +189,14 @@ class HStreamClientKtImpl(bootstrapServerUrls: List<String>, credentials: Channe
187189
)
188190
}
189191
}
192+
193+
override fun getSubscription(subscriptionId: String?): GetSubscriptionResponse {
194+
return unaryCallBlocked {
195+
val response = it.getSubscription(GetSubscriptionRequest.newBuilder().setId(subscriptionId).build())
196+
GrpcUtils.GetSubscriptionResponseFromGrpc(response)
197+
}
198+
}
199+
190200
override fun deleteSubscription(subscriptionId: String?) {
191201
deleteSubscription(subscriptionId, false)
192202
}

client/src/main/proto

Submodule proto updated 1 file

0 commit comments

Comments
 (0)