Skip to content

Commit 1e61a22

Browse files
authored
Add several functionalities to the BanyanDBClient (#49)
1 parent 5548fea commit 1e61a22

File tree

8 files changed

+96
-44
lines changed

8 files changed

+96
-44
lines changed

CHANGES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ Release Notes.
99

1010
* Add mod revision check to write requests
1111
* Add TTL to property.
12+
* Support setting multiple server addresses
13+
* Support DNS name resolution
14+
* Support round-robin load balancing
1215

1316
0.4.0
1417
------------------

README.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,22 @@ The client implement for SkyWalking BanyanDB in Java.
1414

1515
## Create a client
1616

17-
Create a `BanyanDBClient` with host, port and then use `connect()` to establish a connection.
17+
Create a `BanyanDBClient` with the server's several addresses and then use `connect()` to establish a connection.
1818

1919
```java
2020
// use `default` group
21-
client = new BanyanDBClient("127.0.0.1", 17912);
21+
BanyanDBClient client = new BanyanDBClient("banyandb.svc:17912", "10.0.12.9:17912");
2222
// to send any request, a connection to the server must be estabilished
2323
client.connect();
2424
```
2525

26+
These addresses are either IP addresses or DNS names. If DNS names are used, the client will resolve the DNS name to
27+
IP addresses and use them to connect to the server. The client will periodically refresh the IP addresses of the DNS
28+
name. The refresh interval can be configured by `resolveDNSInterval` option.
29+
30+
The client will try to connect to the server in a round-robin manner. The client will periodically refresh the server
31+
addresses. The refresh interval can be configured by `refreshInterval` option.
32+
2633
Besides, you may pass a customized options while building a `BanyanDBClient`. Supported
2734
options are listed below,
2835

@@ -32,6 +39,7 @@ options are listed below,
3239
| maxInboundMessageSize | Max inbound message size | 1024 * 1024 * 50 (~50MB) |
3340
| deadline | Threshold of gRPC blocking query, unit is second | 30 (seconds) |
3441
| refreshInterval | Refresh interval for the gRPC channel, unit is second | 30 (seconds) |
42+
| resolveDNSInterval | DNS resolve interval, unit is second | 30 (minutes) |
3543
| forceReconnectionThreshold | Threshold of force gRPC reconnection if network issue is encountered | 1 |
3644
| forceTLS | Force use TLS for gRPC | false |
3745
| sslTrustCAPath | SSL: Trusted CA Path | |

src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import lombok.AccessLevel;
2929
import lombok.Getter;
3030
import lombok.extern.slf4j.Slf4j;
31-
3231
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
3332
import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
3433
import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc;
@@ -59,7 +58,9 @@
5958

6059
import java.io.Closeable;
6160
import java.io.IOException;
61+
import java.net.URI;
6262
import java.util.ArrayList;
63+
import java.util.Arrays;
6364
import java.util.Collections;
6465
import java.util.List;
6566
import java.util.concurrent.CompletableFuture;
@@ -83,14 +84,8 @@
8384
*/
8485
@Slf4j
8586
public class BanyanDBClient implements Closeable {
86-
/**
87-
* The hostname of BanyanDB server.
88-
*/
89-
private final String host;
90-
/**
91-
* The port of BanyanDB server.
92-
*/
93-
private final int port;
87+
88+
private final String[] targets;
9489
/**
9590
* Options for server connection.
9691
*/
@@ -138,23 +133,24 @@ public class BanyanDBClient implements Closeable {
138133
/**
139134
* Create a BanyanDB client instance with a default options.
140135
*
141-
* @param host IP or domain name
142-
* @param port Server port
136+
* @param targets server targets
143137
*/
144-
public BanyanDBClient(String host, int port) {
145-
this(host, port, new Options());
138+
public BanyanDBClient(String... targets) {
139+
this(targets, new Options());
146140
}
147141

148142
/**
149143
* Create a BanyanDB client instance with a customized options.
150144
*
151-
* @param host IP or domain name
152-
* @param port Server port
145+
* @param targets server targets
153146
* @param options customized options
154147
*/
155-
public BanyanDBClient(String host, int port, Options options) {
156-
this.host = host;
157-
this.port = port;
148+
public BanyanDBClient(String[] targets, Options options) {
149+
String[] tt = Preconditions.checkNotNull(targets, "targets");
150+
checkState(tt.length > 0, "targets' size must be more than 1");
151+
tt = Arrays.stream(tt).filter(t -> !Strings.isNullOrEmpty(t)).toArray(size -> new String[size]);
152+
checkState(tt.length > 0, "valid targets' size must be more than 1");
153+
this.targets = tt;
158154
this.options = options;
159155
this.connectionEstablishLock = new ReentrantLock();
160156
this.metadataCache = new MetadataCache();
@@ -169,8 +165,12 @@ public void connect() throws IOException {
169165
connectionEstablishLock.lock();
170166
try {
171167
if (!isConnected) {
168+
URI[] addresses = new URI[this.targets.length];
169+
for (int i = 0; i < this.targets.length; i++) {
170+
addresses[i] = URI.create("//" + this.targets[i]);
171+
}
172172
this.channel = ChannelManager.create(this.options.buildChannelManagerSettings(),
173-
new DefaultChannelFactory(this.host, this.port, this.options));
173+
new DefaultChannelFactory(addresses, this.options));
174174
streamServiceBlockingStub = StreamServiceGrpc.newBlockingStub(this.channel);
175175
measureServiceBlockingStub = MeasureServiceGrpc.newBlockingStub(this.channel);
176176
streamServiceStub = StreamServiceGrpc.newStub(this.channel);

src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public class Options {
4545
* Threshold of force gRPC reconnection if network issue is encountered
4646
*/
4747
private long forceReconnectionThreshold = 1;
48+
/**
49+
* Threshold of resolving the DNS
50+
*/
51+
private long resolveDNSInterval = 30 * 60;
4852
/**
4953
* Force use TLS for gRPC
5054
* Default is false

src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/ChannelManager.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import io.grpc.ManagedChannel;
3030
import io.grpc.Metadata;
3131
import io.grpc.MethodDescriptor;
32-
import io.grpc.NameResolverRegistry;
3332
import io.grpc.Status;
34-
import io.grpc.internal.DnsNameResolverProvider;
3533
import lombok.RequiredArgsConstructor;
3634
import lombok.extern.slf4j.Slf4j;
3735

@@ -57,22 +55,18 @@ public class ChannelManager extends ManagedChannel {
5755
private final ScheduledExecutorService executor;
5856
@VisibleForTesting
5957
final AtomicReference<Entry> entryRef = new AtomicReference<>();
60-
private final String authority;
6158

6259
public static ChannelManager create(ChannelManagerSettings settings, ChannelFactory channelFactory)
6360
throws IOException {
6461
return new ChannelManager(settings, channelFactory, Executors.newSingleThreadScheduledExecutor());
6562
}
6663

67-
public ChannelManager(ChannelManagerSettings settings, ChannelFactory channelFactory, ScheduledExecutorService executor) throws IOException {
64+
ChannelManager(ChannelManagerSettings settings, ChannelFactory channelFactory, ScheduledExecutorService executor) throws IOException {
6865
this.settings = settings;
6966
this.channelFactory = channelFactory;
7067
this.executor = executor;
7168

72-
NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());
73-
7469
entryRef.set(new Entry(channelFactory.create()));
75-
authority = entryRef.get().channel.authority();
7670

7771
this.executor.scheduleAtFixedRate(
7872
this::refreshSafely,
@@ -92,16 +86,17 @@ private void refreshSafely() {
9286

9387
void refresh() throws IOException {
9488
Entry entry = entryRef.get();
95-
if (entry.needReconnect) {
96-
if (entry.isConnected(entry.reconnectCount.incrementAndGet() > this.settings.forceReconnectionThreshold())) {
97-
// Reconnect to the same server is automatically done by GRPC
98-
// clear the flags
99-
entry.reset();
100-
} else {
101-
Entry replacedEntry = entryRef.getAndSet(new Entry(this.channelFactory.create()));
102-
replacedEntry.shutdown();
103-
}
89+
if (!entry.needReconnect) {
90+
return;
91+
}
92+
if (entry.isConnected(entry.reconnectCount.incrementAndGet() > this.settings.forceReconnectionThreshold())) {
93+
// Reconnect to the same server is automatically done by GRPC
94+
// clear the flags
95+
entry.reset();
96+
return;
10497
}
98+
Entry replacedEntry = entryRef.getAndSet(new Entry(this.channelFactory.create()));
99+
replacedEntry.shutdown();
105100
}
106101

107102
@Override
@@ -157,7 +152,7 @@ public <REQ, RESP> ClientCall<REQ, RESP> newCall(MethodDescriptor<REQ, RESP> met
157152

158153
@Override
159154
public String authority() {
160-
return this.authority;
155+
return this.entryRef.get().channel.authority();
161156
}
162157

163158
@RequiredArgsConstructor
@@ -192,7 +187,7 @@ public <REQ, RESP> ClientCall<REQ, RESP> newCall(MethodDescriptor<REQ, RESP> met
192187

193188
@Override
194189
public String authority() {
195-
return authority;
190+
return ChannelManager.this.authority();
196191
}
197192
}
198193

src/main/java/org/apache/skywalking/banyandb/v1/client/grpc/channel/DefaultChannelFactory.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import io.grpc.netty.NegotiationType;
2525
import io.grpc.netty.NettyChannelBuilder;
2626
import io.netty.handler.ssl.SslContextBuilder;
27+
import io.netty.util.internal.PlatformDependent;
28+
import io.netty.util.internal.SocketUtils;
2729
import lombok.RequiredArgsConstructor;
2830
import lombok.extern.slf4j.Slf4j;
2931
import org.apache.skywalking.banyandb.v1.client.Options;
@@ -33,17 +35,30 @@
3335
import java.io.FileInputStream;
3436
import java.io.IOException;
3537
import java.io.InputStream;
38+
import java.net.InetAddress;
39+
import java.net.InetSocketAddress;
40+
import java.net.SocketAddress;
41+
import java.net.URI;
42+
import java.net.UnknownHostException;
43+
import java.util.Arrays;
44+
import java.util.Comparator;
45+
import java.util.stream.Stream;
3646

3747
@Slf4j
3848
@RequiredArgsConstructor
3949
public class DefaultChannelFactory implements ChannelFactory {
40-
private final String host;
41-
private final int port;
50+
private final URI[] targets;
4251
private final Options options;
52+
private SocketAddress[] addresses;
53+
private long lastTargetsResolvedTime;
4354

4455
@Override
4556
public ManagedChannel create() throws IOException {
46-
NettyChannelBuilder managedChannelBuilder = NettyChannelBuilder.forAddress(this.host, this.port)
57+
if (this.addresses == null ||
58+
System.currentTimeMillis() - this.lastTargetsResolvedTime > this.options.getResolveDNSInterval()) {
59+
resolveTargets();
60+
}
61+
NettyChannelBuilder managedChannelBuilder = NettyChannelBuilder.forAddress(resolveAddress())
4762
.maxInboundMessageSize(options.getMaxInboundMessageSize())
4863
.usePlaintext();
4964

@@ -75,4 +90,31 @@ public ManagedChannel create() throws IOException {
7590
}
7691
return managedChannelBuilder.build();
7792
}
93+
94+
private void resolveTargets() {
95+
this.addresses = Arrays.stream(this.targets)
96+
.flatMap(target -> {
97+
try {
98+
return Arrays.stream(SocketUtils.allAddressesByName(target.getHost()))
99+
.map(InetAddress::getHostAddress)
100+
.map(ip -> new InetSocketAddress(ip, target.getPort()));
101+
} catch (Throwable t) {
102+
log.error("Failed to resolve the BanyanDB server's address ", t);
103+
}
104+
return Stream.empty();
105+
})
106+
.sorted(Comparator.comparing(InetSocketAddress::toString))
107+
.distinct()
108+
.toArray(InetSocketAddress[]::new);
109+
this.lastTargetsResolvedTime = System.currentTimeMillis();
110+
}
111+
112+
private SocketAddress resolveAddress() throws UnknownHostException {
113+
int numAddresses = this.addresses.length;
114+
if (numAddresses < 1) {
115+
throw new UnknownHostException();
116+
}
117+
int offset = numAddresses == 1 ? 0 : PlatformDependent.threadLocalRandom().nextInt(numAddresses);
118+
return this.addresses[offset];
119+
}
78120
}

src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ protected void setUp(SetupFunction... setUpFunctions) throws IOException {
354354
this.channel = grpcCleanup.register(
355355
InProcessChannelBuilder.forName(serverName).directExecutor().build());
356356

357-
client = new BanyanDBClient("127.0.0.1", s.getPort());
357+
client = new BanyanDBClient(String.format("127.0.0.1:%d", s.getPort()));
358358
client.connect(this.channel);
359359
}
360360

src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class BanyanDBClientTestCI {
4949

5050
protected void setUpConnection() throws IOException {
5151
log.info("create BanyanDB client and try to connect");
52-
client = new BanyanDBClient(banyanDB.getHost(), banyanDB.getMappedPort(GRPC_PORT));
52+
client = new BanyanDBClient(String.format("%s:%d", banyanDB.getHost(), banyanDB.getMappedPort(GRPC_PORT)));
5353
client.connect();
5454
}
5555

0 commit comments

Comments
 (0)