Skip to content

Commit 47ee962

Browse files
feat: set grpc options
1 parent f146795 commit 47ee962

File tree

8 files changed

+521
-143
lines changed

8 files changed

+521
-143
lines changed

src/main/java/com/influxdb/v3/client/config/ClientConfig.java

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
* <li><code>authenticator</code> - HTTP proxy authenticator</li>
6262
* <li><code>headers</code> - headers to be added to requests</li>
6363
* <li><code>sslRootsFilePath</code> - path to the stored certificates file in PEM format</li>
64-
* <li><code>maxInboundMessageSize</code> - RPC maximum inbound message size that the client can receive</li>
6564
* </ul>
6665
* <p>
6766
* If you want to create a client with custom configuration, you can use following code:
@@ -103,7 +102,6 @@ public final class ClientConfig {
103102
private final Authenticator authenticator;
104103
private final Map<String, String> headers;
105104
private final String sslRootsFilePath;
106-
private final Integer maxInboundMessageSize;
107105

108106
/**
109107
* Deprecated use {@link #proxyUrl}.
@@ -242,16 +240,6 @@ public String getProxyUrl() {
242240
return proxyUrl;
243241
}
244242

245-
/**
246-
* Gets rpc max message size client can receive.
247-
*
248-
* @return the size in Integer, may be null
249-
*/
250-
@Nullable
251-
public Integer getMaxInboundMessageSize() {
252-
return maxInboundMessageSize;
253-
}
254-
255243
/**
256244
* Gets certificates file path.
257245
*
@@ -315,8 +303,7 @@ public boolean equals(final Object o) {
315303
&& Objects.equals(proxyUrl, that.proxyUrl)
316304
&& Objects.equals(authenticator, that.authenticator)
317305
&& Objects.equals(headers, that.headers)
318-
&& Objects.equals(sslRootsFilePath, that.sslRootsFilePath)
319-
&& Objects.equals(maxInboundMessageSize, that.maxInboundMessageSize);
306+
&& Objects.equals(sslRootsFilePath, that.sslRootsFilePath);
320307
}
321308

322309
@Override
@@ -325,7 +312,7 @@ public int hashCode() {
325312
database, writePrecision, gzipThreshold,
326313
timeout, allowHttpRedirects, disableServerCertificateValidation,
327314
proxy, proxyUrl, authenticator, headers,
328-
defaultTags, sslRootsFilePath, maxInboundMessageSize);
315+
defaultTags, sslRootsFilePath);
329316
}
330317

331318
@Override
@@ -345,7 +332,6 @@ public String toString() {
345332
.add("headers=" + headers)
346333
.add("defaultTags=" + defaultTags)
347334
.add("sslRootsFilePath=" + sslRootsFilePath)
348-
.add("maxInboundMessageSize=" + maxInboundMessageSize)
349335
.toString();
350336
}
351337

@@ -371,7 +357,6 @@ public static final class Builder {
371357
private Authenticator authenticator;
372358
private Map<String, String> headers;
373359
private String sslRootsFilePath;
374-
private Integer maxInboundMessageSize;
375360

376361
/**
377362
* Sets the URL of the InfluxDB server.
@@ -604,19 +589,6 @@ public Builder sslRootsFilePath(@Nullable final String sslRootsFilePath) {
604589
return this;
605590
}
606591

607-
/**
608-
* Set rpc max message size client can receive. Default is 'null'.
609-
*
610-
* @param maxInboundMessageSize The size in Integer
611-
* @return this
612-
*/
613-
@Nonnull
614-
public Builder maxInboundMessageSize(@Nullable final Integer maxInboundMessageSize) {
615-
616-
this.maxInboundMessageSize = maxInboundMessageSize;
617-
return this;
618-
}
619-
620592
/**
621593
* Build an instance of {@code ClientConfig}.
622594
*
@@ -756,6 +728,5 @@ private ClientConfig(@Nonnull final Builder builder) {
756728
authenticator = builder.authenticator;
757729
headers = builder.headers;
758730
sslRootsFilePath = builder.sslRootsFilePath;
759-
maxInboundMessageSize = builder.maxInboundMessageSize;
760731
}
761732
}

src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import java.net.URISyntaxException;
2828
import java.nio.charset.StandardCharsets;
2929
import java.util.ArrayList;
30+
import java.util.Arrays;
3031
import java.util.HashMap;
3132
import java.util.Iterator;
3233
import java.util.List;
3334
import java.util.Map;
3435
import java.util.NoSuchElementException;
36+
import java.util.Objects;
3537
import java.util.Spliterator;
3638
import java.util.Spliterators;
3739
import java.util.stream.Stream;
@@ -49,6 +51,7 @@
4951
import io.netty.handler.ssl.SslContext;
5052
import io.netty.handler.ssl.SslContextBuilder;
5153
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
54+
import org.apache.arrow.flight.CallOption;
5255
import org.apache.arrow.flight.FlightClient;
5356
import org.apache.arrow.flight.FlightGrpcUtils;
5457
import org.apache.arrow.flight.FlightStream;
@@ -106,7 +109,8 @@ Stream<VectorSchemaRoot> execute(@Nonnull final String query,
106109
@Nonnull final String database,
107110
@Nonnull final QueryType queryType,
108111
@Nonnull final Map<String, Object> queryParameters,
109-
@Nonnull final Map<String, String> headers) {
112+
@Nonnull final Map<String, String> headers,
113+
final CallOption... callOption) {
110114

111115
Map<String, Object> ticketData = new HashMap<>() {{
112116
put("database", database);
@@ -126,8 +130,10 @@ Stream<VectorSchemaRoot> execute(@Nonnull final String query,
126130
}
127131

128132
HeaderCallOption headerCallOption = metadataHeader(headers);
133+
CallOption[] callOptions = concatCallOptions(callOption, headerCallOption);
134+
129135
Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8));
130-
FlightStream stream = client.getStream(ticket, headerCallOption);
136+
FlightStream stream = client.getStream(ticket, callOptions);
131137
FlightSqlIterator iterator = new FlightSqlIterator(stream);
132138

133139
Spliterator<VectorSchemaRoot> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL);
@@ -162,13 +168,8 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) {
162168
LOG.warn("proxy property in ClientConfig will not work in query api, use proxyUrl property instead");
163169
}
164170

165-
int maxInboundMessageSize = config.getMaxInboundMessageSize() != null
166-
?
167-
config.getMaxInboundMessageSize()
168-
: Integer.MAX_VALUE;
169171
nettyChannelBuilder.maxTraceEvents(0)
170-
.maxInboundMetadataSize(Integer.MAX_VALUE)
171-
.maxInboundMessageSize(maxInboundMessageSize);
172+
.maxInboundMetadataSize(Integer.MAX_VALUE);
172173

173174
return FlightGrpcUtils.createFlightClient(new RootAllocator(Long.MAX_VALUE), nettyChannelBuilder.build());
174175
}
@@ -236,6 +237,16 @@ ProxyDetector createProxyDetector(@Nonnull final String targetUrl, @Nonnull fina
236237
};
237238
}
238239

240+
@Nullable
241+
CallOption[] concatCallOptions(@Nullable final CallOption[] base, final CallOption... callOption) {
242+
if (base == null || base.length == 0) {
243+
return callOption;
244+
}
245+
List<CallOption> results = new ArrayList<>(List.of(base));
246+
Arrays.stream(callOption).filter(Objects::nonNull).forEach(results::add);
247+
return results.toArray(new CallOption[0]);
248+
}
249+
239250
private static final class FlightSqlIterator implements Iterator<VectorSchemaRoot>, AutoCloseable {
240251

241252
private final List<AutoCloseable> autoCloseable = new ArrayList<>();

0 commit comments

Comments
 (0)