Skip to content

Commit 5e61fed

Browse files
authored
feat: listConsumers (#161)
1 parent 93793c1 commit 5e61fed

File tree

8 files changed

+93
-4
lines changed

8 files changed

+93
-4
lines changed

client/build.gradle.kts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,13 @@ tasks.withType<Javadoc> {
183183
exclude("io/hstream/impl/**", "io/hstream/util/**")
184184
}
185185

186+
val clientVersion = version
187+
tasks.withType<Jar> {
188+
manifest {
189+
attributes["Implementation-Version"] = clientVersion
190+
}
191+
}
192+
186193
spotless {
187194
java {
188195
googleJavaFormat()
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package io.hstream;
2+
3+
public class ConsumerInformation {
4+
String name;
5+
String uri;
6+
7+
String userAgent;
8+
9+
public String getName() {
10+
return name;
11+
}
12+
13+
public String getUri() {
14+
return uri;
15+
}
16+
17+
public String getUserAgent() {
18+
return userAgent;
19+
}
20+
21+
public static Builder newBuilder() {
22+
return new Builder();
23+
}
24+
25+
public static final class Builder {
26+
private String name;
27+
private String uri;
28+
private String userAgent;
29+
30+
private Builder() {}
31+
32+
public Builder name(String name) {
33+
this.name = name;
34+
return this;
35+
}
36+
37+
public Builder uri(String uri) {
38+
this.uri = uri;
39+
return this;
40+
}
41+
42+
public Builder userAgent(String userAgent) {
43+
this.userAgent = userAgent;
44+
return this;
45+
}
46+
47+
public ConsumerInformation build() {
48+
ConsumerInformation consumerInformation = new ConsumerInformation();
49+
consumerInformation.uri = this.uri;
50+
consumerInformation.name = this.name;
51+
consumerInformation.userAgent = this.userAgent;
52+
return consumerInformation;
53+
}
54+
}
55+
}

client/src/main/java/io/hstream/HStreamClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,4 +142,6 @@ static HStreamClientBuilder builder() {
142142
View getView(String name);
143143

144144
void deleteView(String name);
145+
146+
List<ConsumerInformation> listConsumers(String subscriptionId);
145147
}

client/src/main/java/io/hstream/impl/ChannelProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ public class ChannelProvider implements Closeable {
1414
private ChannelCredentials credentials;
1515

1616
private final ConcurrentHashMap<String, ManagedChannel> provider;
17+
String userAgent =
18+
"hstreamdb-java/" + ChannelProvider.class.getPackage().getImplementationVersion();
1719

1820
public ChannelProvider(int size) {
1921
provider = new ConcurrentHashMap<>(size);
@@ -35,13 +37,15 @@ public ManagedChannel get(String serverUrl) {
3537
url ->
3638
ManagedChannelBuilder.forTarget(url)
3739
.usePlaintext()
40+
.userAgent(userAgent)
3841
.executor(MoreExecutors.directExecutor())
3942
.build());
4043
}
4144
return provider.computeIfAbsent(
4245
serverUrl,
4346
url ->
4447
Grpc.newChannelBuilder(url, credentials)
48+
.userAgent(userAgent)
4549
.executor(MoreExecutors.directExecutor())
4650
.build());
4751
}

client/src/main/java/io/hstream/impl/HStreamClientBuilderImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,10 @@ public HStreamClient build() {
7777
private List<String> parseServerUrls(String url) {
7878
var prefix = "hstream://";
7979
String uriStr = url.strip();
80-
if (uriStr.startsWith(prefix)) {
81-
uriStr = uriStr.substring(prefix.length());
80+
if (!uriStr.startsWith(prefix)) {
81+
throw new HStreamDBClientException(
82+
"incorrect serviceUrl:" + uriStr + " (correct example: hstream://127.0.0.1:6570)");
8283
}
83-
return List.of(uriStr.split(","));
84+
return List.of(uriStr.substring(prefix.length()).split(","));
8485
}
8586
}

client/src/main/java/io/hstream/util/GrpcUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,4 +197,13 @@ public static TaskStatus taskStatusFromInternal(TaskStatusPB statusPB) {
197197
throw new IllegalArgumentException("Unknown task status: " + statusPB);
198198
}
199199
}
200+
201+
public static ConsumerInformation consumerInformationFromGrpc(
202+
io.hstream.internal.Consumer consumer) {
203+
return ConsumerInformation.newBuilder()
204+
.name(consumer.getName())
205+
.uri(consumer.getUri())
206+
.userAgent(consumer.getUserAgent())
207+
.build();
208+
}
200209
}

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import io.grpc.ChannelCredentials
55
import io.hstream.BufferedProducerBuilder
66
import io.hstream.Cluster
77
import io.hstream.ConsumerBuilder
8+
import io.hstream.ConsumerInformation
89
import io.hstream.HStreamClient
910
import io.hstream.ProducerBuilder
1011
import io.hstream.Query
@@ -23,6 +24,7 @@ import io.hstream.internal.DeleteViewRequest
2324
import io.hstream.internal.GetQueryRequest
2425
import io.hstream.internal.GetViewRequest
2526
import io.hstream.internal.HStreamApiGrpcKt
27+
import io.hstream.internal.ListConsumersRequest
2628
import io.hstream.internal.ListQueriesRequest
2729
import io.hstream.internal.ListShardsRequest
2830
import io.hstream.internal.ListStreamsRequest
@@ -262,6 +264,15 @@ class HStreamClientKtImpl(bootstrapServerUrls: List<String>, credentials: Channe
262264
}
263265
}
264266

267+
override fun listConsumers(subscriptionId: String?): List<ConsumerInformation> {
268+
return runBlocking {
269+
val serverUrl = lookupSubscriptionServerUrl(subscriptionId)
270+
val stub = HStreamApiGrpcKt.HStreamApiCoroutineStub(channelProvider.get(serverUrl))
271+
stub.listConsumers(ListConsumersRequest.newBuilder().setSubscriptionId(subscriptionId).build())
272+
.consumersList.stream().map(GrpcUtils::consumerInformationFromGrpc).toList()
273+
}
274+
}
275+
265276
private final suspend fun lookupSubscriptionServerUrl(subscriptionId: String?): String {
266277
return unaryCallCoroutine {
267278
val req: LookupSubscriptionRequest =

client/src/main/proto

Submodule proto updated 1 file

0 commit comments

Comments
 (0)