Skip to content

Commit 742f6aa

Browse files
wip
1 parent 81e0cd8 commit 742f6aa

File tree

6 files changed

+101
-156
lines changed

6 files changed

+101
-156
lines changed

examples/src/main/java/com/influxdb/v3/ProxyExample.java

Lines changed: 66 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,52 +21,98 @@
2121
*/
2222
package com.influxdb.v3;
2323

24+
import java.net.InetSocketAddress;
25+
import java.net.URI;
2426
import java.util.UUID;
27+
import java.util.function.Supplier;
2528
import java.util.stream.Stream;
29+
import javax.annotation.Nonnull;
30+
import javax.annotation.Nullable;
31+
32+
import io.grpc.HttpConnectProxiedSocketAddress;
33+
import io.grpc.ProxyDetector;
34+
import io.netty.handler.proxy.HttpProxyHandler;
2635

2736
import com.influxdb.v3.client.InfluxDBClient;
2837
import com.influxdb.v3.client.Point;
2938
import com.influxdb.v3.client.PointValues;
3039
import com.influxdb.v3.client.config.ClientConfig;
40+
import com.influxdb.v3.client.config.NettyHttpClientConfig;
3141

3242
public final class ProxyExample {
3343

3444
private ProxyExample() {
3545
}
3646

3747
public static void main(final String[] args) throws Exception {
38-
// Run docker-compose.yml file to start Envoy proxy
48+
// Run the docker-compose.yml file to start Envoy proxy,
49+
// or start envoy proxy directly with the command `envoy-c envoy.yaml`
3950

4051
String proxyUrl = "http://localhost:10000";
52+
String targetUrl = "http://localhost:8086";
53+
String username = "username";
54+
String password = "password";
55+
56+
NettyHttpClientConfig nettyHttpClientConfig = new NettyHttpClientConfig();
57+
58+
// Set proxy for write api
59+
Supplier<HttpProxyHandler> writeApiProxy = () ->
60+
new HttpProxyHandler(new InetSocketAddress("localhost", 10000), username, password);
61+
nettyHttpClientConfig.configureChannelProxy(writeApiProxy);
62+
63+
// Set proxy for query api
64+
ProxyDetector proxyDetector = createProxyDetector(targetUrl, proxyUrl, username, password);
65+
nettyHttpClientConfig.configureManagedChannelProxy(proxyDetector);
66+
4167
String sslRootsFilePath = "src/test/java/com/influxdb/v3/client/testdata/influxdb-certificate.pem";
4268
ClientConfig clientConfig = new ClientConfig.Builder()
4369
.host(System.getenv("INFLUXDB_URL"))
4470
.token(System.getenv("INFLUXDB_TOKEN").toCharArray())
4571
.database(System.getenv("INFLUXDB_DATABASE"))
46-
.proxyUrl(proxyUrl)
72+
.nettyHttpClientConfig(nettyHttpClientConfig)
4773
.sslRootsFilePath(sslRootsFilePath)
4874
.build();
4975

50-
InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
51-
String testId = UUID.randomUUID().toString();
52-
Point point = Point.measurement("My_Home")
53-
.setTag("room", "Kitchen")
54-
.setField("temp", 12.7)
55-
.setField("hum", 37)
56-
.setField("testId", testId);
57-
influxDBClient.writePoint(point);
76+
try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) {
77+
String testId = UUID.randomUUID().toString();
78+
Point point = Point.measurement("My_Home")
79+
.setTag("room", "Kitchen")
80+
.setField("temp", 12.7)
81+
.setField("hum", 37)
82+
.setField("testId", testId);
83+
influxDBClient.writePoint(point);
5884

59-
String query = String.format("SELECT * FROM \"My_Home\" WHERE \"testId\" = '%s'", testId);
60-
try (Stream<PointValues> stream = influxDBClient.queryPoints(query)) {
61-
stream.findFirst().ifPresent(values -> {
62-
assert values.getTimestamp() != null;
63-
System.out.printf("room[%s]: %s, temp: %3.2f, hum: %d",
64-
new java.util.Date(values.getTimestamp().longValue() / 1000000),
65-
values.getTag("room"),
66-
(Double) values.getField("temp"),
67-
(Long) values.getField("hum"));
68-
});
85+
String query = String.format("SELECT * FROM \"My_Home\" WHERE \"testId\" = '%s'", testId);
86+
try (Stream<PointValues> stream = influxDBClient.queryPoints(query)) {
87+
stream.findFirst().ifPresent(values -> {
88+
assert values.getTimestamp() != null;
89+
System.out.printf("room[%s]: %s, temp: %3.2f, hum: %d",
90+
new java.util.Date(values.getTimestamp().longValue() / 1000000),
91+
values.getTag("room"),
92+
(Double) values.getField("temp"),
93+
(Long) values.getField("hum"));
94+
});
95+
}
6996
}
7097
}
98+
99+
public static ProxyDetector createProxyDetector(@Nonnull final String targetUrl, @Nonnull final String proxyUrl,
100+
@Nullable final String username, @Nullable final String password) {
101+
URI targetUri = URI.create(targetUrl);
102+
URI proxyUri = URI.create(proxyUrl);
103+
return (targetServerAddress) -> {
104+
InetSocketAddress targetAddress = (InetSocketAddress) targetServerAddress;
105+
if (targetUri.getHost().equals(targetAddress.getHostString())
106+
&& targetUri.getPort() == targetAddress.getPort()) {
107+
return HttpConnectProxiedSocketAddress.newBuilder()
108+
.setProxyAddress(new InetSocketAddress(proxyUri.getHost(), proxyUri.getPort()))
109+
.setTargetAddress(targetAddress)
110+
.setUsername(username)
111+
.setPassword(password)
112+
.build();
113+
}
114+
return null;
115+
};
116+
}
71117
}
72118

examples/src/main/java/com/influxdb/v3/durable/DurableExample.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
*/
2222
package com.influxdb.v3.durable;
2323

24+
import java.net.URISyntaxException;
2425
import java.util.ArrayList;
2526
import java.util.List;
2627
import java.util.concurrent.ExecutorService;
@@ -30,6 +31,7 @@
3031
import java.util.concurrent.locks.LockSupport;
3132
import java.util.logging.Logger;
3233
import java.util.stream.Stream;
34+
import javax.net.ssl.SSLException;
3335

3436
import com.influxdb.v3.client.InfluxDBApiException;
3537
import com.influxdb.v3.client.InfluxDBClient;
@@ -124,8 +126,13 @@ public static void main(final String[] args) {
124126
}
125127

126128
// borrow then return a client
127-
InfluxDBClient client = clientPool.borrowClient();
128-
try {
129+
InfluxDBClient client = null;
130+
try {
131+
client = clientPool.borrowClient();
132+
} catch (URISyntaxException | SSLException e) {
133+
throw new RuntimeException(e);
134+
}
135+
try {
129136
logger.info(" [writeTaskPointsOK " + count + "] Writing " + points.size()
130137
+ " points with client " + client.hashCode());
131138
client.writePoints(points);
@@ -159,8 +166,13 @@ public static void main(final String[] args) {
159166
}
160167
}
161168
// borrow a client from the pool
162-
InfluxDBClient client = clientPool.borrowClient();
163-
try {
169+
InfluxDBClient client = null;
170+
try {
171+
client = clientPool.borrowClient();
172+
} catch (URISyntaxException | SSLException e) {
173+
throw new RuntimeException(e);
174+
}
175+
try {
164176
logger.info("[writeErrorRecover " + count + "] Writing " + lps.size()
165177
+ " lps with client " + client.hashCode());
166178
client.writeRecords(lps);
@@ -183,9 +195,14 @@ public static void main(final String[] args) {
183195
int count = 0;
184196
while (!shutdownAll.get()) {
185197
// borrow a client from the pool
186-
InfluxDBClient client = clientPool.borrowClient();
198+
InfluxDBClient client = null;
199+
try {
200+
client = clientPool.borrowClient();
201+
} catch (URISyntaxException | SSLException e) {
202+
throw new RuntimeException(e);
203+
}
187204

188-
// initiate the query and process the results
205+
// initiate the query and process the results
189206
try (Stream<PointValues> pvs = client.queryPoints(query)) {
190207
logger.info("[queryOK " + count + "] with client " + client.hashCode()
191208
+ ": query returned " + pvs.toArray().length + " records");
@@ -208,8 +225,13 @@ public static void main(final String[] args) {
208225
int count = 0;
209226
while (!shutdownAll.get()) {
210227
// borrow a client from the pool
211-
InfluxDBClient client = clientPool.borrowClient();
212-
// every third query attempt results in an error
228+
InfluxDBClient client = null;
229+
try {
230+
client = clientPool.borrowClient();
231+
} catch (URISyntaxException | SSLException e) {
232+
throw new RuntimeException(e);
233+
}
234+
// every third query attempt results in an error
213235
String effectiveQuery = count > 0 && count % 3 == 0 ? badQuery : query;
214236

215237
// attempt to execute the query and process the results

examples/src/main/java/com/influxdb/v3/durable/InfluxClientPool.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
*/
2222
package com.influxdb.v3.durable;
2323

24+
import java.net.URISyntaxException;
2425
import java.util.ArrayList;
2526
import java.util.List;
2627
import java.util.Stack;
2728
import java.util.logging.Logger;
2829

30+
import javax.net.ssl.SSLException;
31+
2932
import com.influxdb.v3.client.InfluxDBClient;
3033
import com.influxdb.v3.client.config.ClientConfig;
3134

@@ -75,7 +78,7 @@ public InfluxClientPool(final ClientConfig clientConfig, final int maxSize) {
7578
*
7679
* @return - An InfluxDBClient ready for use.
7780
*/
78-
public synchronized InfluxDBClient borrowClient() {
81+
public synchronized InfluxDBClient borrowClient() throws URISyntaxException, SSLException {
7982
InfluxDBClient client;
8083
if (idlers.isEmpty()) {
8184
client = InfluxDBClient.getInstance(clientConfig);

0 commit comments

Comments
 (0)