Skip to content

Commit c6e07c5

Browse files
feat: query api proxy
1 parent 8db709b commit c6e07c5

File tree

6 files changed

+308
-8
lines changed

6 files changed

+308
-8
lines changed

examples/docker-compose.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
version: "3"
2+
3+
services:
4+
envoy:
5+
image: envoyproxy/envoy:v1.26-latest
6+
volumes:
7+
- ./envoy.yaml:/etc/envoy/envoy.yaml
8+
ports:
9+
- "10000:10000"
10+
environment:
11+
- ENVOY_UID=0
12+

examples/envoy.yaml

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
static_resources:
2+
listeners:
3+
- name: listener_0
4+
address:
5+
socket_address: { address: 0.0.0.0, port_value: 10000 }
6+
filter_chains:
7+
- filter_chain_match:
8+
filters:
9+
- name: envoy.filters.network.http_connection_manager
10+
typed_config:
11+
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
12+
stat_prefix: ingress_http
13+
access_log:
14+
- name: envoy.access_loggers.stdout
15+
typed_config:
16+
"@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
17+
http2_protocol_options:
18+
allow_connect: true
19+
upgrade_configs:
20+
- upgrade_type: CONNECT
21+
http_filters:
22+
- name: envoy.filters.http.router
23+
typed_config:
24+
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
25+
26+
route_config:
27+
name: local_route
28+
virtual_hosts:
29+
- name: local_service
30+
domains: [ "*" ]
31+
routes:
32+
- match:
33+
connect_matcher: { }
34+
route:
35+
cluster: influxdb_cluster
36+
upgrade_configs:
37+
upgrade_type: CONNECT
38+
connect_config: { }
39+
- match:
40+
prefix: "/"
41+
route:
42+
cluster: influxdb_cluster
43+
prefix_rewrite: "/"
44+
auto_host_rewrite: true
45+
timeout: 10s
46+
cors:
47+
allow_origin_string_match:
48+
- prefix: "*"
49+
allow_methods: GET, PUT, DELETE, POST, OPTIONS
50+
clusters:
51+
- name: influxdb_cluster
52+
connect_timeout: 10s
53+
type: STRICT_DNS
54+
load_assignment:
55+
cluster_name: influxdb_cluster
56+
endpoints:
57+
- lb_endpoints:
58+
- endpoint:
59+
address:
60+
socket_address:
61+
address: "us-east-1-1.aws.cloud2.influxdata.com"
62+
port_value: 443
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.influxdb.v3;
2+
3+
import java.net.InetSocketAddress;
4+
import java.net.ProxySelector;
5+
import java.net.URI;
6+
import java.util.stream.Stream;
7+
8+
import io.grpc.HttpConnectProxiedSocketAddress;
9+
import io.grpc.ProxyDetector;
10+
11+
import com.influxdb.v3.client.InfluxDBClient;
12+
import com.influxdb.v3.client.Point;
13+
import com.influxdb.v3.client.PointValues;
14+
import com.influxdb.v3.client.config.ClientConfig;
15+
16+
public class ProxyExample {
17+
public static void main(final String[] args) throws Exception {
18+
// Run docker-compose.yml file to start Envoy proxy
19+
20+
URI queryProxyUri = new URI("proxyUrl");
21+
URI uri = new URI(System.getenv("url"));
22+
23+
ProxyDetector proxyDetector = (targetServerAddress) -> {
24+
InetSocketAddress targetAddress = (InetSocketAddress) targetServerAddress;
25+
if (uri.getHost().equals(targetAddress.getHostString())) {
26+
return HttpConnectProxiedSocketAddress.newBuilder()
27+
.setProxyAddress(new InetSocketAddress(queryProxyUri.getHost(), queryProxyUri.getPort()))
28+
.setTargetAddress(targetAddress)
29+
.build();
30+
}
31+
return null;
32+
};
33+
ProxySelector proxy = ProxySelector.of(new InetSocketAddress(queryProxyUri.getHost(), queryProxyUri.getPort()));
34+
ClientConfig clientConfig = new ClientConfig.Builder()
35+
.host(uri.toString())
36+
.token(System.getenv("token").toCharArray())
37+
.database(System.getenv("database"))
38+
.proxy(proxy)
39+
.queryApiProxy(proxyDetector)
40+
.build();
41+
42+
InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
43+
influxDBClient.writePoint(
44+
Point.measurement("test1")
45+
.setField("field", "field1")
46+
);
47+
48+
try (Stream<PointValues> stream = influxDBClient.queryPoints("SELECT * FROM test1")) {
49+
stream.findFirst()
50+
.ifPresent(pointValues -> {
51+
// do something
52+
});
53+
}
54+
}
55+
}
56+

src/main/java/com/influxdb/v3/client/config/ClientConfig.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import javax.annotation.Nonnull;
3737
import javax.annotation.Nullable;
3838

39+
import io.grpc.ProxyDetector;
40+
3941
import com.influxdb.v3.client.write.WritePrecision;
4042

4143
/**
@@ -58,6 +60,7 @@
5860
* disable server certificate validation for HTTPS connections
5961
* </li>
6062
* <li><code>proxy</code> - HTTP proxy selector</li>
63+
* <li><code>queryApiProxy</code> - HTTP query detector</li>
6164
* <li><code>authenticator</code> - HTTP proxy authenticator</li>
6265
* <li><code>headers</code> - headers to be added to requests</li>
6366
* </ul>
@@ -97,6 +100,7 @@ public final class ClientConfig {
97100
private final Boolean allowHttpRedirects;
98101
private final Boolean disableServerCertificateValidation;
99102
private final ProxySelector proxy;
103+
private final ProxyDetector queryApiProxy;
100104
private final Authenticator authenticator;
101105
private final Map<String, String> headers;
102106

@@ -219,6 +223,16 @@ public ProxySelector getProxy() {
219223
return proxy;
220224
}
221225

226+
/**
227+
* Gets the proxy for query api.
228+
*
229+
* @return the proxy, may be null
230+
*/
231+
@Nullable
232+
public ProxyDetector getQueryApiProxy() {
233+
return queryApiProxy;
234+
}
235+
222236
/**
223237
* Gets the (proxy) authenticator.
224238
*
@@ -269,6 +283,7 @@ public boolean equals(final Object o) {
269283
&& Objects.equals(allowHttpRedirects, that.allowHttpRedirects)
270284
&& Objects.equals(disableServerCertificateValidation, that.disableServerCertificateValidation)
271285
&& Objects.equals(proxy, that.proxy)
286+
&& Objects.equals(queryApiProxy, that.queryApiProxy)
272287
&& Objects.equals(authenticator, that.authenticator)
273288
&& Objects.equals(headers, that.headers);
274289
}
@@ -278,7 +293,7 @@ public int hashCode() {
278293
return Objects.hash(host, Arrays.hashCode(token), authScheme, organization,
279294
database, writePrecision, gzipThreshold,
280295
timeout, allowHttpRedirects, disableServerCertificateValidation,
281-
proxy, authenticator, headers,
296+
proxy, queryApiProxy, authenticator, headers,
282297
defaultTags);
283298
}
284299

@@ -294,6 +309,7 @@ public String toString() {
294309
.add("allowHttpRedirects=" + allowHttpRedirects)
295310
.add("disableServerCertificateValidation=" + disableServerCertificateValidation)
296311
.add("proxy=" + proxy)
312+
.add("queryApiProxy=" + queryApiProxy)
297313
.add("authenticator=" + authenticator)
298314
.add("headers=" + headers)
299315
.add("defaultTags=" + defaultTags)
@@ -318,6 +334,7 @@ public static final class Builder {
318334
private Boolean allowHttpRedirects;
319335
private Boolean disableServerCertificateValidation;
320336
private ProxySelector proxy;
337+
private ProxyDetector queryApiProxy;
321338
private Authenticator authenticator;
322339
private Map<String, String> headers;
323340

@@ -480,6 +497,19 @@ public Builder proxy(@Nullable final ProxySelector proxy) {
480497
return this;
481498
}
482499

500+
/**
501+
* Sets the proxy detector for query api. Default is 'null'.
502+
*
503+
* @param proxy Proxy detector.
504+
* @return this
505+
*/
506+
@Nonnull
507+
public Builder queryApiProxy(@Nullable final ProxyDetector proxy) {
508+
509+
this.queryApiProxy = proxy;
510+
return this;
511+
}
512+
483513
/**
484514
* Sets the proxy authenticator. Default is 'null'.
485515
*
@@ -658,6 +688,7 @@ private ClientConfig(@Nonnull final Builder builder) {
658688
disableServerCertificateValidation = builder.disableServerCertificateValidation != null
659689
? builder.disableServerCertificateValidation : false;
660690
proxy = builder.proxy;
691+
queryApiProxy = builder.queryApiProxy;
661692
authenticator = builder.authenticator;
662693
headers = builder.headers;
663694
}

src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
*/
2222
package com.influxdb.v3.client.internal;
2323

24+
import java.lang.reflect.InvocationTargetException;
2425
import java.net.URI;
2526
import java.net.URISyntaxException;
2627
import java.nio.charset.StandardCharsets;
@@ -30,20 +31,30 @@
3031
import java.util.List;
3132
import java.util.Map;
3233
import java.util.NoSuchElementException;
34+
import java.util.Objects;
3335
import java.util.Spliterator;
3436
import java.util.Spliterators;
3537
import java.util.stream.Stream;
3638
import java.util.stream.StreamSupport;
3739
import javax.annotation.Nonnull;
3840
import javax.annotation.Nullable;
41+
import javax.net.ssl.SSLException;
3942

4043
import com.fasterxml.jackson.core.JsonProcessingException;
4144
import com.fasterxml.jackson.databind.ObjectMapper;
4245
import io.grpc.Metadata;
46+
import io.grpc.netty.GrpcSslContexts;
47+
import io.grpc.netty.NettyChannelBuilder;
48+
import io.netty.channel.EventLoopGroup;
49+
import io.netty.channel.ServerChannel;
50+
import io.netty.handler.ssl.SslContextBuilder;
51+
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
4352
import org.apache.arrow.flight.FlightClient;
53+
import org.apache.arrow.flight.FlightGrpcUtils;
4454
import org.apache.arrow.flight.FlightStream;
4555
import org.apache.arrow.flight.HeaderCallOption;
4656
import org.apache.arrow.flight.Location;
57+
import org.apache.arrow.flight.LocationSchemes;
4758
import org.apache.arrow.flight.Ticket;
4859
import org.apache.arrow.flight.grpc.MetadataAdapter;
4960
import org.apache.arrow.memory.RootAllocator;
@@ -83,9 +94,11 @@ final class FlightSqlClient implements AutoCloseable {
8394
defaultHeaders.putAll(config.getHeaders());
8495
}
8596

86-
Package mainPackage = RestClient.class.getPackage();
87-
88-
this.client = (client != null) ? client : createFlightClient(config);
97+
this.client = Objects.requireNonNullElseGet(client, () -> config.getQueryApiProxy() != null
98+
?
99+
createFlightWithQueryProxy(config)
100+
:
101+
createFlightClient(config));
89102
}
90103

91104
@Nonnull
@@ -137,6 +150,85 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) {
137150
.build();
138151
}
139152

153+
public FlightClient createFlightWithQueryProxy(@Nonnull final ClientConfig config) {
154+
final NettyChannelBuilder nettyChannelBuilder;
155+
Location location = createLocation(config);
156+
switch (location.getUri().getScheme()) {
157+
case LocationSchemes.GRPC:
158+
case LocationSchemes.GRPC_INSECURE:
159+
case LocationSchemes.GRPC_TLS: {
160+
nettyChannelBuilder = NettyChannelBuilder.forTarget(location.getUri().getHost());
161+
break;
162+
}
163+
case LocationSchemes.GRPC_DOMAIN_SOCKET: {
164+
// The implementation is platform-specific, so we have to find the classes at runtime
165+
nettyChannelBuilder = NettyChannelBuilder.forTarget(location.getUri().getHost());
166+
try {
167+
try {
168+
// Linux
169+
nettyChannelBuilder.channelType(
170+
Class.forName("io.netty.channel.epoll.EpollDomainSocketChannel")
171+
.asSubclass(ServerChannel.class));
172+
final EventLoopGroup elg =
173+
Class.forName("io.netty.channel.epoll.EpollEventLoopGroup")
174+
.asSubclass(EventLoopGroup.class)
175+
.getDeclaredConstructor()
176+
.newInstance();
177+
nettyChannelBuilder.eventLoopGroup(elg);
178+
} catch (ClassNotFoundException e) {
179+
// BSD
180+
nettyChannelBuilder.channelType(
181+
Class.forName("io.netty.channel.kqueue.KQueueDomainSocketChannel")
182+
.asSubclass(ServerChannel.class));
183+
final EventLoopGroup elg =
184+
Class.forName("io.netty.channel.kqueue.KQueueEventLoopGroup")
185+
.asSubclass(EventLoopGroup.class)
186+
.getDeclaredConstructor()
187+
.newInstance();
188+
nettyChannelBuilder.eventLoopGroup(elg);
189+
}
190+
} catch (ClassNotFoundException
191+
| InstantiationException
192+
| IllegalAccessException
193+
| NoSuchMethodException
194+
| InvocationTargetException e) {
195+
throw new UnsupportedOperationException(
196+
"Could not find suitable Netty native transport implementation for domain socket address.");
197+
}
198+
break;
199+
}
200+
default:
201+
throw new IllegalArgumentException(
202+
"Scheme is not supported: " + location.getUri().getScheme());
203+
}
204+
205+
if (LocationSchemes.GRPC_TLS.equals(location.getUri().getScheme())) {
206+
nettyChannelBuilder.useTransportSecurity();
207+
208+
final SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
209+
210+
if (config.getDisableServerCertificateValidation()) {
211+
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
212+
}
213+
214+
try {
215+
nettyChannelBuilder.sslContext(sslContextBuilder.build());
216+
} catch (SSLException e) {
217+
throw new RuntimeException(e);
218+
}
219+
}
220+
221+
if (config.getQueryApiProxy() != null) {
222+
nettyChannelBuilder.proxyDetector(config.getQueryApiProxy());
223+
}
224+
225+
nettyChannelBuilder.maxTraceEvents(0)
226+
.maxInboundMessageSize(Integer.MAX_VALUE)
227+
.maxInboundMetadataSize(Integer.MAX_VALUE);
228+
229+
return FlightGrpcUtils.createFlightClient(new RootAllocator(Long.MAX_VALUE), nettyChannelBuilder.build());
230+
}
231+
140232
@Nonnull
141233
private Location createLocation(@Nonnull final ClientConfig config) {
142234
try {

0 commit comments

Comments
 (0)