Skip to content

Commit ef47678

Browse files
authored
feat: add metadata setting api (#218)
1 parent c990347 commit ef47678

File tree

4 files changed

+57
-33
lines changed

4 files changed

+57
-33
lines changed

client/src/main/java/io/hstream/HStreamClientBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,7 @@ public interface HStreamClientBuilder {
5353

5454
HStreamClientBuilder requestTimeoutMs(long timeoutMs);
5555

56+
HStreamClientBuilder withMetadata(String key, String value);
57+
5658
HStreamClient build();
5759
}

client/src/main/java/io/hstream/impl/ChannelProviderImpl.java

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package io.hstream.impl;
22

33
import com.google.common.util.concurrent.MoreExecutors;
4-
import io.grpc.ChannelCredentials;
5-
import io.grpc.Grpc;
6-
import io.grpc.ManagedChannel;
7-
import io.grpc.ManagedChannelBuilder;
4+
import io.grpc.*;
5+
import io.grpc.stub.MetadataUtils;
6+
import java.util.HashMap;
7+
import java.util.Map;
88
import java.util.concurrent.ConcurrentHashMap;
99

1010
public class ChannelProviderImpl implements ChannelProvider {
@@ -16,6 +16,8 @@ public class ChannelProviderImpl implements ChannelProvider {
1616
static final String userAgent =
1717
"hstreamdb-java/" + ChannelProvider.class.getPackage().getImplementationVersion();
1818

19+
Map<String, String> header = new HashMap<>();
20+
1921
public ChannelProviderImpl(int size) {
2022
provider = new ConcurrentHashMap<>(size);
2123
}
@@ -25,31 +27,41 @@ public ChannelProviderImpl(ChannelCredentials credentials) {
2527
provider = new ConcurrentHashMap<>(DEFAULT_CHANNEL_PROVIDER_SIZE);
2628
}
2729

30+
public ChannelProviderImpl(ChannelCredentials credentials, Map<String, String> header) {
31+
this.credentials = credentials;
32+
provider = new ConcurrentHashMap<>(DEFAULT_CHANNEL_PROVIDER_SIZE);
33+
if (header != null) {
34+
this.header = header;
35+
}
36+
}
37+
2838
public ChannelProviderImpl() {
2939
this(DEFAULT_CHANNEL_PROVIDER_SIZE);
3040
}
3141

3242
@Override
3343
public ManagedChannel get(String serverUrl) {
44+
return provider.computeIfAbsent(serverUrl, this::getInternal);
45+
}
46+
47+
ManagedChannel getInternal(String url) {
48+
ManagedChannelBuilder<?> builder;
3449
if (credentials == null) {
35-
return provider.computeIfAbsent(
36-
serverUrl,
37-
url ->
38-
ManagedChannelBuilder.forTarget(url)
39-
.disableRetry()
40-
.usePlaintext()
41-
.userAgent(userAgent)
42-
.executor(MoreExecutors.directExecutor())
43-
.build());
50+
builder = ManagedChannelBuilder.forTarget(url).usePlaintext();
51+
} else {
52+
builder = Grpc.newChannelBuilder(url, credentials);
53+
}
54+
if (!header.isEmpty()) {
55+
var metadata = new Metadata();
56+
header.forEach(
57+
(k, v) -> metadata.put(Metadata.Key.of(k, Metadata.ASCII_STRING_MARSHALLER), v));
58+
builder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata));
4459
}
45-
return provider.computeIfAbsent(
46-
serverUrl,
47-
url ->
48-
Grpc.newChannelBuilder(url, credentials)
49-
.disableRetry()
50-
.userAgent(userAgent)
51-
.executor(MoreExecutors.directExecutor())
52-
.build());
60+
return builder
61+
.disableRetry()
62+
.userAgent(userAgent)
63+
.executor(MoreExecutors.directExecutor())
64+
.build();
5365
}
5466

5567
@Override

client/src/main/java/io/hstream/impl/HStreamClientBuilderImpl.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@
33
import static com.google.common.base.Preconditions.*;
44
import static io.hstream.util.UrlSchemaUtils.parseServerUrls;
55

6+
import io.grpc.ChannelCredentials;
67
import io.grpc.TlsChannelCredentials;
78
import io.hstream.HStreamClient;
89
import io.hstream.HStreamClientBuilder;
910
import io.hstream.HStreamDBClientException;
1011
import io.hstream.UrlSchema;
1112
import java.io.File;
1213
import java.io.IOException;
14+
import java.util.HashMap;
1315
import java.util.List;
16+
import java.util.Map;
1417
import org.apache.commons.lang3.tuple.Pair;
1518

1619
public class HStreamClientBuilderImpl implements HStreamClientBuilder {
@@ -21,6 +24,7 @@ public class HStreamClientBuilderImpl implements HStreamClientBuilder {
2124
boolean enableTlsAuthentication;
2225
String keyPath;
2326
String certPath;
27+
Map<String, String> metadata = new HashMap<>();
2428

2529
long requestTimeoutMs = DefaultSettings.GRPC_CALL_TIMEOUT_MS;
2630

@@ -72,6 +76,12 @@ public HStreamClientBuilder requestTimeoutMs(long timeoutMs) {
7276
return this;
7377
}
7478

79+
@Override
80+
public HStreamClientBuilder withMetadata(String key, String value) {
81+
metadata.put(key, value);
82+
return this;
83+
}
84+
7585
@Override
7686
public HStreamClient build() {
7787
checkArgument(serviceUrl != null, "HStreamClientBuilder: `serviceUrl` should not be null");
@@ -81,6 +91,9 @@ public HStreamClient build() {
8191
if (schemaHosts.getKey().equals(UrlSchema.HSTREAMS) && !enableTls) {
8292
throw new HStreamDBClientException("hstreams url schema should enable tls");
8393
}
94+
95+
// tls
96+
ChannelCredentials credentials = null;
8497
if (enableTls) {
8598
checkArgument(caPath != null, "when TLS is enabled, `caPath` should not be null");
8699
try {
@@ -94,12 +107,17 @@ public HStreamClient build() {
94107
keyPath != null, "when TLS authentication is enabled, `keyPath` should not be null");
95108
credentialsBuilder = credentialsBuilder.keyManager(new File(certPath), new File(keyPath));
96109
}
97-
return new HStreamClientKtImpl(
98-
schemaHosts.getRight(), requestTimeoutMs, credentialsBuilder.build(), channelProvider);
110+
credentials = credentialsBuilder.build();
99111
} catch (IOException e) {
100112
throw new HStreamDBClientException(String.format("invalid tls options, %s", e));
101113
}
102114
}
103-
return new HStreamClientKtImpl(schemaHosts.getRight(), requestTimeoutMs, null, channelProvider);
115+
116+
// channel provider
117+
ChannelProvider provider = channelProvider;
118+
if (provider == null) {
119+
provider = new ChannelProviderImpl(credentials, metadata);
120+
}
121+
return new HStreamClientKtImpl(schemaHosts.getRight(), requestTimeoutMs, provider);
104122
}
105123
}

client/src/main/kotlin/io/hstream/impl/HStreamClientKtImpl.kt

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package io.hstream.impl
33
import com.google.common.base.Preconditions.checkArgument
44
import com.google.common.util.concurrent.MoreExecutors
55
import com.google.protobuf.Empty
6-
import io.grpc.ChannelCredentials
76
import io.hstream.BufferedProducerBuilder
87
import io.hstream.Cluster
98
import io.hstream.Connector
@@ -65,12 +64,10 @@ import kotlin.streams.toList
6564
class HStreamClientKtImpl(
6665
private val bootstrapServerUrls: List<String>,
6766
private val requestTimeoutMs: Long,
68-
credentials: ChannelCredentials? = null,
69-
channelProvider: ChannelProvider? = null,
67+
private val channelProvider: ChannelProvider,
7068
) : HStreamClient {
7169

7270
private val logger = LoggerFactory.getLogger(HStreamClientKtImpl::class.java)
73-
private var channelProvider: ChannelProvider
7471

7572
private val clusterServerUrls: AtomicReference<List<String>> = AtomicReference(null)
7673
fun refreshClusterServerUrls() {
@@ -120,11 +117,6 @@ class HStreamClientKtImpl(
120117
}
121118

122119
init {
123-
if (channelProvider == null) {
124-
this.channelProvider = ChannelProviderImpl(credentials)
125-
} else {
126-
this.channelProvider = channelProvider
127-
}
128120
logger.info("client init with bootstrapServerUrls [{}]", bootstrapServerUrls)
129121
refreshClusterServerUrls()
130122
}

0 commit comments

Comments
 (0)