Skip to content

Commit 801eea6

Browse files
authored
support hstream server cluster (#37)
1 parent c2cfcc0 commit 801eea6

File tree

14 files changed

+226
-81
lines changed

14 files changed

+226
-81
lines changed

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
args: '--set-exit-if-changed'
2323

2424
- name: Prepare test env
25-
run: script/prepare-test-env.sh
25+
run: script/prepare-test-env-latest.sh
2626

2727
- name: Set up JDK 11
2828
uses: actions/setup-java@v2

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
![Build Status](https://github.com/hstreamdb/hstreamdb-java/actions/workflows/main.yml/badge.svg)
44
[![Maven Central](https://img.shields.io/maven-central/v/io.hstream/hstreamdb-java)](https://search.maven.org/artifact/io.hstream/hstreamdb-java)
55
[![javadoc](https://javadoc.io/badge2/io.hstream/hstreamdb-java/0.5.1/javadoc.svg)](https://javadoc.io/doc/io.hstream/hstreamdb-java/0.5.1)
6-
[![Snapshot Artifacts](https://img.shields.io/nexus/s/https/s01.oss.sonatype.org/io.hstream/hstreamdb-java.svg)](https://s01.oss.sonatype.org/content/repositories/snapshots/io/hstream/hstreamdb-java/0.5.1-SNAPSHOT/)
7-
[![javadoc](https://javadoc.io/badge2/io.hstream/hstreamdb-java/0.5.1-SNAPSHOT/javadoc.svg)](https://hstreamdb.github.io/hstreamdb-java/javadoc/)
6+
[![Snapshot Artifacts](https://img.shields.io/nexus/s/https/s01.oss.sonatype.org/io.hstream/hstreamdb-java.svg)](https://s01.oss.sonatype.org/content/repositories/snapshots/io/hstream/hstreamdb-java/0.6.0-SNAPSHOT/)
7+
[![javadoc](https://javadoc.io/badge2/io.hstream/hstreamdb-java/0.6.0-SNAPSHOT/javadoc.svg)](https://hstreamdb.github.io/hstreamdb-java/javadoc/)
88

99
This is the offical Java client library for [HStreamDB](https://hstream.io/).
1010

client/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ plugins {
88
}
99

1010
group = 'io.hstream'
11-
version = '0.5.1'
11+
version = '0.6.0-SNAPSHOT'
1212

1313
repositories {
1414
mavenCentral()
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.hstream.impl;
2+
3+
import io.grpc.ManagedChannel;
4+
import io.grpc.ManagedChannelBuilder;
5+
import java.io.Closeable;
6+
import java.util.concurrent.ConcurrentHashMap;
7+
8+
public class ChannelProvider implements Closeable {
9+
10+
private static final int DEFAULT_CHANNEL_PROVIDER_SIZE = 64;
11+
12+
private final ConcurrentHashMap<String, ManagedChannel> provider;
13+
14+
public ChannelProvider(int size) {
15+
provider = new ConcurrentHashMap<>(size);
16+
}
17+
18+
public ChannelProvider() {
19+
this(DEFAULT_CHANNEL_PROVIDER_SIZE);
20+
}
21+
22+
public ManagedChannel get(String serverUrl) {
23+
return provider.computeIfAbsent(
24+
serverUrl, url -> ManagedChannelBuilder.forTarget(url).usePlaintext().build());
25+
}
26+
27+
@Override
28+
public void close() {
29+
provider.forEachValue(Long.MAX_VALUE, managedChannel -> managedChannel.shutdown());
30+
provider.clear();
31+
}
32+
}

client/src/main/java/io/hstream/impl/ConsumerBuilderImpl.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,21 @@
66
import io.hstream.ConsumerBuilder;
77
import io.hstream.HRecordReceiver;
88
import io.hstream.RawRecordReceiver;
9-
import io.hstream.internal.HStreamApiGrpc;
9+
import java.util.List;
1010

1111
public class ConsumerBuilderImpl implements ConsumerBuilder {
1212

13-
private HStreamApiGrpc.HStreamApiStub grpcStub;
14-
private HStreamApiGrpc.HStreamApiBlockingStub grpcBlockingStub;
13+
private final List<String> serverUrls;
14+
private final ChannelProvider channelProvider;
15+
1516
private String name;
1617
private String subscription;
1718
private RawRecordReceiver rawRecordReceiver;
1819
private HRecordReceiver hRecordReceiver;
1920

20-
public ConsumerBuilderImpl(
21-
HStreamApiGrpc.HStreamApiStub grpcStub,
22-
HStreamApiGrpc.HStreamApiBlockingStub grpcBlockingStub) {
23-
this.grpcStub = grpcStub;
24-
this.grpcBlockingStub = grpcBlockingStub;
21+
public ConsumerBuilderImpl(List<String> serverUrls, ChannelProvider channelProvider) {
22+
this.serverUrls = serverUrls;
23+
this.channelProvider = channelProvider;
2524
}
2625

2726
@Override
@@ -50,11 +49,9 @@ public ConsumerBuilder hRecordReceiver(HRecordReceiver hRecordReceiver) {
5049

5150
@Override
5251
public Consumer build() {
53-
checkNotNull(grpcStub);
54-
checkNotNull(grpcBlockingStub);
5552
checkNotNull(subscription);
5653
checkState(rawRecordReceiver != null || hRecordReceiver != null);
5754
return new ConsumerImpl(
58-
grpcStub, grpcBlockingStub, name, subscription, rawRecordReceiver, hRecordReceiver);
55+
serverUrls, channelProvider, name, subscription, rawRecordReceiver, hRecordReceiver);
5956
}
6057
}

client/src/main/java/io/hstream/impl/ConsumerImpl.java

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.google.common.util.concurrent.AbstractService;
44
import com.google.common.util.concurrent.ThreadFactoryBuilder;
55
import com.google.protobuf.InvalidProtocolBufferException;
6+
import io.grpc.ManagedChannelBuilder;
67
import io.grpc.stub.StreamObserver;
78
import io.hstream.*;
89
import io.hstream.internal.*;
@@ -18,29 +19,34 @@
1819
public class ConsumerImpl extends AbstractService implements Consumer {
1920
private static final Logger logger = LoggerFactory.getLogger(ConsumerImpl.class);
2021

21-
private HStreamApiGrpc.HStreamApiStub grpcStub;
22-
private HStreamApiGrpc.HStreamApiBlockingStub grpcBlockingStub;
23-
private String consumerName;
24-
private String subscriptionId;
25-
private RawRecordReceiver rawRecordReceiver;
26-
private HRecordReceiver hRecordReceiver;
22+
private final List<String> serverUrls;
23+
private final ChannelProvider channelProvider;
24+
25+
private final String consumerName;
26+
private final String subscriptionId;
27+
private final RawRecordReceiver rawRecordReceiver;
28+
private final HRecordReceiver hRecordReceiver;
2729

2830
private ExecutorService executorService;
2931

32+
private HStreamApiGrpc.HStreamApiStub fetchStub;
33+
3034
private final StreamObserver<StreamingFetchResponse> responseStream;
3135
private final StreamObserver<StreamingFetchRequest> requestStream;
3236

33-
private final AtomicBoolean inited = new AtomicBoolean(false);
37+
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
3438

3539
public ConsumerImpl(
36-
HStreamApiGrpc.HStreamApiStub grpcStub,
37-
HStreamApiGrpc.HStreamApiBlockingStub grpcBlockingStub,
40+
List<String> serverUrls,
41+
ChannelProvider channelProvider,
3842
String consumerName,
3943
String subscriptionId,
4044
RawRecordReceiver rawRecordReceiver,
4145
HRecordReceiver hRecordReceiver) {
42-
this.grpcStub = grpcStub;
43-
this.grpcBlockingStub = grpcBlockingStub;
46+
47+
this.serverUrls = serverUrls;
48+
this.channelProvider = channelProvider;
49+
4450
if (consumerName == null) {
4551
this.consumerName = UUID.randomUUID().toString();
4652
} else {
@@ -54,11 +60,12 @@ public ConsumerImpl(
5460
Executors.newSingleThreadExecutor(
5561
new ThreadFactoryBuilder().setNameFormat("receiver-running-pool-%d").build());
5662

63+
fetchStub = createFetchStub();
5764
responseStream =
5865
new StreamObserver<StreamingFetchResponse>() {
5966
@Override
6067
public void onNext(StreamingFetchResponse value) {
61-
if (inited.compareAndSet(false, true)) {
68+
if (isInitialized.compareAndSet(false, true)) {
6269
// notifyStarted();
6370
}
6471

@@ -102,7 +109,7 @@ public void onNext(StreamingFetchResponse value) {
102109

103110
@Override
104111
public void onError(Throwable t) {
105-
if (inited.compareAndSet(false, true)) {
112+
if (isInitialized.compareAndSet(false, true)) {
106113
logger.error(
107114
"consumer {} attach to subscription {} error: {}",
108115
ConsumerImpl.this.consumerName,
@@ -123,7 +130,19 @@ public void onError(Throwable t) {
123130
public void onCompleted() {}
124131
};
125132

126-
this.requestStream = grpcStub.streamingFetch(responseStream);
133+
this.requestStream = fetchStub.streamingFetch(responseStream);
134+
}
135+
136+
private HStreamApiGrpc.HStreamApiStub createFetchStub() {
137+
ServerNode serverNode =
138+
HStreamApiGrpc.newBlockingStub(
139+
ManagedChannelBuilder.forTarget(serverUrls.get(0)).usePlaintext().build())
140+
.lookupSubscription(
141+
LookupSubscriptionRequest.newBuilder().setSubscriptionId(subscriptionId).build())
142+
.getServerNode();
143+
144+
String serverUrl = serverNode.getHost() + ":" + serverNode.getPort();
145+
return HStreamApiGrpc.newStub(channelProvider.get(serverUrl));
127146
}
128147

129148
@Override

client/src/main/java/io/hstream/impl/HStreamClientBuilderImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import io.hstream.HStreamClient;
66
import io.hstream.HStreamClientBuilder;
7+
import java.util.List;
78

89
public class HStreamClientBuilderImpl implements HStreamClientBuilder {
910

@@ -18,6 +19,11 @@ public HStreamClientBuilder serviceUrl(String serviceUrl) {
1819
@Override
1920
public HStreamClient build() {
2021
checkNotNull(serviceUrl);
21-
return new HStreamClientImpl(serviceUrl);
22+
List<String> serverUrls = parseServerUrls(serviceUrl);
23+
return new HStreamClientImpl(serverUrls);
24+
}
25+
26+
private List<String> parseServerUrls(String url) {
27+
return List.of(url.strip().split(","));
2228
}
2329
}

client/src/main/java/io/hstream/impl/HStreamClientImpl.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22

33
import com.google.protobuf.Empty;
44
import io.grpc.ManagedChannel;
5-
import io.grpc.ManagedChannelBuilder;
65
import io.hstream.*;
7-
import io.hstream.internal.DeleteStreamRequest;
8-
import io.hstream.internal.DeleteSubscriptionRequest;
9-
import io.hstream.internal.HStreamApiGrpc;
10-
import io.hstream.internal.ListStreamsResponse;
6+
import io.hstream.Stream;
7+
import io.hstream.Subscription;
8+
import io.hstream.internal.*;
119
import io.hstream.util.GrpcUtils;
10+
import java.util.ArrayList;
1211
import java.util.List;
1312
import java.util.concurrent.TimeUnit;
1413
import java.util.stream.Collectors;
@@ -19,32 +18,44 @@ public class HStreamClientImpl implements HStreamClient {
1918

2019
private static final Logger logger = LoggerFactory.getLogger(HStreamClientImpl.class);
2120

22-
private final ManagedChannel managedChannel;
23-
private final HStreamApiGrpc.HStreamApiStub stub;
24-
private final HStreamApiGrpc.HStreamApiBlockingStub blockingStub;
25-
2621
private static final short DEFAULT_STREAM_REPLICATOR = 3;
2722

28-
public HStreamClientImpl(String serviceUrl) {
29-
ManagedChannel channel = ManagedChannelBuilder.forTarget(serviceUrl).usePlaintext().build();
30-
this.managedChannel = channel;
31-
this.stub = HStreamApiGrpc.newStub(channel);
32-
this.blockingStub = HStreamApiGrpc.newBlockingStub(channel);
23+
private final ChannelProvider channelProvider;
24+
private final List<String> bootstrapServerUrls;
25+
private final List<String> initializedServerUrls;
26+
27+
private HStreamApiGrpc.HStreamApiBlockingStub blockingStub;
28+
29+
public HStreamClientImpl(List<String> bootstrapServerUrls) {
30+
this.bootstrapServerUrls = bootstrapServerUrls;
31+
channelProvider = new ChannelProvider();
32+
33+
ManagedChannel channel = channelProvider.get(bootstrapServerUrls.get(0));
34+
blockingStub = HStreamApiGrpc.newBlockingStub(channel);
35+
DescribeClusterResponse describeClusterResponse =
36+
blockingStub.describeCluster(Empty.newBuilder().build());
37+
List<ServerNode> serverNodes = describeClusterResponse.getServerNodesList();
38+
initializedServerUrls = new ArrayList<>(serverNodes.size());
39+
for (ServerNode serverNode : serverNodes) {
40+
String host = serverNode.getHost();
41+
int port = serverNode.getPort();
42+
initializedServerUrls.add(host + ":" + port);
43+
}
3344
}
3445

3546
@Override
3647
public ProducerBuilder newProducer() {
37-
return new ProducerBuilderImpl(stub);
48+
return new ProducerBuilderImpl(initializedServerUrls, channelProvider);
3849
}
3950

4051
@Override
4152
public ConsumerBuilder newConsumer() {
42-
return new ConsumerBuilderImpl(stub, blockingStub);
53+
return new ConsumerBuilderImpl(initializedServerUrls, channelProvider);
4354
}
4455

4556
@Override
4657
public QueryerBuilder newQueryer() {
47-
return new QueryerBuilderImpl(this, stub);
58+
return new QueryerBuilderImpl(this, initializedServerUrls, channelProvider);
4859
}
4960

5061
@Override
@@ -101,7 +112,7 @@ public void deleteSubscription(String subscriptionId) {
101112
}
102113

103114
@Override
104-
public void close() throws Exception {
105-
managedChannel.shutdownNow();
115+
public void close() {
116+
channelProvider.close();
106117
}
107118
}

client/src/main/java/io/hstream/impl/ProducerBuilderImpl.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,22 @@
44

55
import io.hstream.Producer;
66
import io.hstream.ProducerBuilder;
7-
import io.hstream.internal.HStreamApiGrpc;
7+
import java.util.List;
88

99
public class ProducerBuilderImpl implements ProducerBuilder {
1010

11-
private HStreamApiGrpc.HStreamApiStub grpcStub;
12-
1311
private String streamName;
1412

1513
private boolean enableBatch = false;
1614

1715
private int recordCountLimit = 1;
1816

19-
public ProducerBuilderImpl(HStreamApiGrpc.HStreamApiStub stub) {
20-
this.grpcStub = stub;
17+
private final List<String> serverUrls;
18+
private final ChannelProvider channelProvider;
19+
20+
public ProducerBuilderImpl(List<String> serverUrls, ChannelProvider channelProvider) {
21+
this.serverUrls = serverUrls;
22+
this.channelProvider = channelProvider;
2123
}
2224

2325
@Override
@@ -40,8 +42,7 @@ public ProducerBuilder recordCountLimit(int recordCountLimit) {
4042

4143
@Override
4244
public Producer build() {
43-
checkNotNull(grpcStub);
4445
checkNotNull(streamName);
45-
return new ProducerImpl(grpcStub, streamName, enableBatch, recordCountLimit);
46+
return new ProducerImpl(serverUrls, channelProvider, streamName, enableBatch, recordCountLimit);
4647
}
4748
}

0 commit comments

Comments
 (0)