|
6 | 6 | import io.hstream.HStreamClient; |
7 | 7 | import io.hstream.HStreamClientBuilder; |
8 | 8 | import io.hstream.HStreamDBClientException; |
| 9 | +import io.hstream.UrlSchema; |
9 | 10 | import java.io.File; |
10 | 11 | import java.io.IOException; |
| 12 | +import java.util.Arrays; |
11 | 13 | import java.util.List; |
| 14 | +import java.util.stream.Collectors; |
| 15 | +import org.apache.commons.lang3.tuple.Pair; |
12 | 16 |
|
13 | 17 | public class HStreamClientBuilderImpl implements HStreamClientBuilder { |
14 | 18 |
|
@@ -58,29 +62,53 @@ public HStreamClientBuilder tlsCertPath(String certPath) { |
58 | 62 | @Override |
59 | 63 | public HStreamClient build() { |
60 | 64 | checkNotNull(serviceUrl); |
61 | | - List<String> serverUrls = parseServerUrls(serviceUrl); |
| 65 | + Pair<UrlSchema, List<String>> schemaHosts = parseServerUrls(serviceUrl); |
| 66 | + // FIXME: remove enableTls option |
| 67 | + if (schemaHosts.getKey().equals(UrlSchema.HSTREAMS) && !enableTls) { |
| 68 | + throw new HStreamDBClientException("hstreams url schema should enable tls"); |
| 69 | + } |
62 | 70 | if (enableTls) { |
63 | 71 | try { |
64 | 72 | TlsChannelCredentials.Builder credentialsBuilder = |
65 | 73 | TlsChannelCredentials.newBuilder().trustManager(new File(caPath)); |
66 | 74 | if (enableTlsAuthentication) { |
67 | 75 | credentialsBuilder = credentialsBuilder.keyManager(new File(certPath), new File(keyPath)); |
68 | 76 | } |
69 | | - return new HStreamClientKtImpl(serverUrls, credentialsBuilder.build()); |
| 77 | + return new HStreamClientKtImpl(schemaHosts.getRight(), credentialsBuilder.build()); |
70 | 78 | } catch (IOException e) { |
71 | 79 | throw new HStreamDBClientException(String.format("invalid tls options, %s", e)); |
72 | 80 | } |
73 | 81 | } |
74 | | - return new HStreamClientKtImpl(serverUrls, null); |
| 82 | + return new HStreamClientKtImpl(schemaHosts.getRight(), null); |
75 | 83 | } |
76 | 84 |
|
77 | | - private List<String> parseServerUrls(String url) { |
78 | | - var prefix = "hstream://"; |
| 85 | + private Pair<UrlSchema, List<String>> parseServerUrls(String url) { |
79 | 86 | String uriStr = url.strip(); |
80 | | - if (!uriStr.startsWith(prefix)) { |
| 87 | + var schemaHosts = uriStr.split("://"); |
| 88 | + if (schemaHosts.length != 2) { |
81 | 89 | throw new HStreamDBClientException( |
82 | 90 | "incorrect serviceUrl:" + uriStr + " (correct example: hstream://127.0.0.1:6570)"); |
83 | 91 | } |
84 | | - return List.of(uriStr.substring(prefix.length()).split(",")); |
| 92 | + var schemaStr = schemaHosts[0]; |
| 93 | + UrlSchema urlSchema; |
| 94 | + try { |
| 95 | + urlSchema = UrlSchema.valueOf(schemaStr.toUpperCase()); |
| 96 | + } catch (IllegalArgumentException e) { |
| 97 | + throw new HStreamDBClientException("Invalid url schema:" + schemaStr); |
| 98 | + } |
| 99 | + var hosts = schemaHosts[1]; |
| 100 | + return Pair.of(urlSchema, parseHosts(hosts)); |
| 101 | + } |
| 102 | + |
| 103 | + private List<String> parseHosts(String hosts) { |
| 104 | + return Arrays.stream(hosts.split(",")).map(this::normalizeHost).collect(Collectors.toList()); |
| 105 | + } |
| 106 | + |
| 107 | + private String normalizeHost(String host) { |
| 108 | + var address_port = host.split(":"); |
| 109 | + if (address_port.length == 1) { |
| 110 | + return host + ":6570"; |
| 111 | + } |
| 112 | + return host; |
85 | 113 | } |
86 | 114 | } |
0 commit comments