Skip to content

Commit 2243c72

Browse files
refactor: set default max message size in constructor
1 parent 4f8b779 commit 2243c72

File tree

5 files changed

+158
-75
lines changed

5 files changed

+158
-75
lines changed

src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,11 @@
2727
import java.net.URISyntaxException;
2828
import java.nio.charset.StandardCharsets;
2929
import java.util.ArrayList;
30-
import java.util.Arrays;
3130
import java.util.HashMap;
3231
import java.util.Iterator;
3332
import java.util.List;
3433
import java.util.Map;
3534
import java.util.NoSuchElementException;
36-
import java.util.Objects;
3735
import java.util.Spliterator;
3836
import java.util.Spliterators;
3937
import java.util.stream.Stream;
@@ -130,7 +128,7 @@ Stream<VectorSchemaRoot> execute(@Nonnull final String query,
130128
}
131129

132130
HeaderCallOption headerCallOption = metadataHeader(headers);
133-
CallOption[] callOptionArray = concatCallOptions(callOptions, headerCallOption);
131+
CallOption[] callOptionArray = GrpcCallOptions.mergeCallOptions(callOptions, headerCallOption);
134132

135133
Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8));
136134
FlightStream stream = client.getStream(ticket, callOptionArray);
@@ -237,16 +235,6 @@ ProxyDetector createProxyDetector(@Nonnull final String targetUrl, @Nonnull fina
237235
};
238236
}
239237

240-
@Nullable
241-
CallOption[] concatCallOptions(@Nullable final CallOption[] base, final CallOption... callOption) {
242-
if (base == null || base.length == 0) {
243-
return callOption;
244-
}
245-
List<CallOption> results = new ArrayList<>(List.of(base));
246-
Arrays.stream(callOption).filter(Objects::nonNull).forEach(results::add);
247-
return results.toArray(new CallOption[0]);
248-
}
249-
250238
private static final class FlightSqlIterator implements Iterator<VectorSchemaRoot>, AutoCloseable {
251239

252240
private final List<AutoCloseable> autoCloseable = new ArrayList<>();

src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
package com.influxdb.v3.client.internal;
2323

2424
import java.util.ArrayList;
25+
import java.util.Arrays;
2526
import java.util.List;
2627
import java.util.Objects;
28+
import java.util.Optional;
2729
import java.util.concurrent.Executor;
30+
import java.util.stream.Stream;
2831
import javax.annotation.Nonnull;
2932
import javax.annotation.Nullable;
3033

@@ -57,6 +60,22 @@ private GrpcCallOptions(@Nonnull final Builder builder) {
5760
this.callOptions = builder.callOptions.toArray(new CallOption[0]);
5861
}
5962

63+
/**
64+
* Merges two arrays of {@link CallOption} into a single array. The method combines the elements
65+
* from the baseCallOptions array and the additional callOptions array. If either of the input
66+
* arrays is null, it will be treated as an empty array.
67+
*
68+
* @param baseCallOptions the base array of {@link CallOption} instances, may be null
69+
* @param callOptions additional {@link CallOption} instances to be added, may also be null
70+
* @return a combined array containing all {@link CallOption} instances from both input arrays
71+
*/
72+
public static CallOption[] mergeCallOptions(@Nullable final CallOption[] baseCallOptions, final CallOption... callOptions) {
73+
return Stream.concat(
74+
Arrays.stream(Optional.ofNullable(baseCallOptions).orElse(new CallOption[0])),
75+
Arrays.stream(Optional.ofNullable(callOptions).orElse(new CallOption[0]))
76+
).toArray(CallOption[]::new);
77+
}
78+
6079
/**
6180
* Returns the absolute deadline for a call.
6281
*
@@ -178,8 +197,17 @@ public static final class Builder {
178197
private Integer maxOutboundMessageSize;
179198
private final List<CallOption> callOptions = new ArrayList<>();
180199

200+
/**
201+
* Constructs a new instance of the Builder with default values.
202+
* By default, the maximum inbound message size is set to the largest possible value.
203+
*/
204+
public Builder() {
205+
this.maxInboundMessageSize = Integer.MAX_VALUE;
206+
}
207+
181208
/**
182209
* Sets the absolute deadline for a rpc call.
210+
*
183211
* @param deadline The deadline
184212
* @return this
185213
*/
@@ -191,6 +219,7 @@ public Builder withDeadline(final @Nonnull Deadline deadline) {
191219
/**
192220
* Sets an {@code executor} to be used instead of the default
193221
* executor specified with {@link ManagedChannelBuilder#executor}.
222+
*
194223
* @param executor The executor
195224
* @return this
196225
*/
@@ -206,6 +235,7 @@ public Builder withExecutor(@Nonnull final Executor executor) {
206235
* <p>It is only safe to call this if the server supports the compression format chosen. There is
207236
* no negotiation performed; if the server does not support the compression chosen, the call will
208237
* fail.
238+
*
209239
* @param compressorName The compressor name
210240
* @return this
211241
*/
@@ -220,6 +250,7 @@ public Builder withCompressorName(@Nonnull final String compressorName) {
220250
* available. This may dramatically increase the latency of the RPC, but avoids failing
221251
* "unnecessarily." The default queues the RPC until an attempt to connect has completed, but
222252
* fails RPCs without sending them if unable to connect.
253+
*
223254
* @return this
224255
*/
225256
public Builder withWaitForReady() {
@@ -230,6 +261,7 @@ public Builder withWaitForReady() {
230261
/**
231262
* Sets the maximum allowed message size acceptable from the remote peer. If unset, this will
232263
* default to the value set on the {@link ManagedChannelBuilder#maxInboundMessageSize(int)}.
264+
*
233265
* @param maxInboundMessageSize The max receive message size
234266
* @return this
235267
*/
@@ -240,6 +272,7 @@ public Builder withMaxInboundMessageSize(@Nonnull final Integer maxInboundMessag
240272

241273
/**
242274
* Sets the maximum allowed message size acceptable sent to the remote peer.
275+
*
243276
* @param maxOutboundMessageSize The maximum message send size
244277
* @return this
245278
*/

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public final class InfluxDBClientImpl implements InfluxDBClient {
6565
private static final String DATABASE_REQUIRED_MESSAGE = "Please specify the 'Database' as a method parameter "
6666
+ "or use default configuration at 'ClientConfig.database'.";
6767

68-
private static final CallOption[] EMPTY_CALL_OPTIONS = new CallOption[0];
6968
private static final Map<String, Object> NO_PARAMETERS = Map.of();
7069
private static final List<Class<?>> ALLOWED_NAMED_PARAMETER_TYPES = List.of(
7170
String.class,
@@ -345,41 +344,22 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
345344
}
346345
});
347346

348-
CallOption[] queryCallOptions = createQueryCallOptions(options);
347+
CallOption[] callOptions = options.grpcCallOption() != null ? options.grpcCallOption().getCallOptions() : null;
349348
return flightSqlClient.execute(
350349
query,
351350
database,
352351
options.queryTypeSafe(),
353352
parameters,
354353
options.headersSafe(),
355-
queryCallOptions
354+
callOptions
356355
);
357356
}
358357

359-
/**
360-
* Creates an array of CallOption with some default CallOption.
361-
*
362-
* @param options the QueryOptions object
363-
* @return the array of CallOption
364-
*/
365-
@Nonnull
366-
CallOption[] createQueryCallOptions(@Nonnull final QueryOptions options) {
367-
GrpcCallOptions grpcCallOption = options.grpcCallOption();
368-
CallOption[] callOptions = grpcCallOption != null ? grpcCallOption.getCallOptions() : EMPTY_CALL_OPTIONS;
369-
if (grpcCallOption == null || grpcCallOption.getMaxInboundMessageSize() == null) {
370-
callOptions = Stream.concat(
371-
Stream.of(maxInboundMessageCallOption()),
372-
Stream.of(callOptions))
373-
.toArray(CallOption[]::new);
374-
}
375-
return callOptions;
376-
}
377-
378358
@Nonnull
379359
private CallOption maxInboundMessageCallOption() {
380360
return new CallOptions.GrpcCallOption() {
381361
@Override
382-
public <T extends AbstractStub<T>> T wrapStub(T stub) {
362+
public <T extends AbstractStub<T>> T wrapStub(final T stub) {
383363
return stub.withMaxInboundMessageSize(Integer.MAX_VALUE);
384364
}
385365
};

src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,16 @@
2626
import java.util.Map;
2727

2828
import io.grpc.HttpConnectProxiedSocketAddress;
29-
import io.grpc.Metadata;
3029
import io.grpc.ProxyDetector;
3130
import io.grpc.internal.GrpcUtil;
3231
import org.apache.arrow.flight.CallHeaders;
3332
import org.apache.arrow.flight.CallInfo;
34-
import org.apache.arrow.flight.CallOption;
3533
import org.apache.arrow.flight.CallStatus;
3634
import org.apache.arrow.flight.FlightClient;
3735
import org.apache.arrow.flight.FlightClientMiddleware;
3836
import org.apache.arrow.flight.FlightServer;
39-
import org.apache.arrow.flight.HeaderCallOption;
4037
import org.apache.arrow.flight.Location;
4138
import org.apache.arrow.flight.NoOpFlightProducer;
42-
import org.apache.arrow.flight.grpc.MetadataAdapter;
4339
import org.apache.arrow.memory.RootAllocator;
4440
import org.assertj.core.api.Assertions;
4541
import org.junit.jupiter.api.AfterEach;
@@ -329,41 +325,6 @@ void createProxyDetector() {
329325
}
330326
}
331327

332-
@Test
333-
void concatCallOptions() throws Exception {
334-
ClientConfig clientConfig = new ClientConfig.Builder()
335-
.host("https://localhost:80")
336-
.build();
337-
try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig)) {
338-
Assertions.assertThatNoException().isThrownBy(() -> {
339-
CallOption[] results = flightSqlClient.concatCallOptions(null, null);
340-
Assertions.assertThat(results).isNull();
341-
});
342-
343-
MetadataAdapter metadata = new MetadataAdapter(new Metadata());
344-
metadata.insert("key1", "value1");
345-
HeaderCallOption headerCallOption = new HeaderCallOption(metadata);
346-
347-
CallOption[] callOptions = flightSqlClient.concatCallOptions(null, headerCallOption);
348-
Assertions.assertThat(callOptions).isNotNull();
349-
Assertions.assertThat(callOptions.length).isEqualTo(1);
350-
351-
callOptions = flightSqlClient.concatCallOptions(new CallOption[]{headerCallOption});
352-
Assertions.assertThat(callOptions).isNotNull();
353-
Assertions.assertThat(callOptions.length).isEqualTo(1);
354-
355-
GrpcCallOptions grpcCallOption = new GrpcCallOptions.Builder()
356-
.withMaxOutboundMessageSize(1)
357-
.withCompressorName("gzip")
358-
.build();
359-
360-
callOptions = flightSqlClient.concatCallOptions(grpcCallOption.getCallOptions(), headerCallOption);
361-
Assertions.assertThat(callOptions).isNotNull();
362-
// This equals to 4 because we always have a default maxInboundMessageSize
363-
Assertions.assertThat(callOptions.length).isEqualTo(3);
364-
}
365-
}
366-
367328
static class CallHeadersMiddleware implements FlightClientMiddleware.Factory {
368329
CallHeaders headers;
369330

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
* THE SOFTWARE.
21+
*/
22+
package com.influxdb.v3.client.internal;
23+
24+
import io.grpc.stub.AbstractStub;
25+
import org.apache.arrow.flight.CallOption;
26+
import org.apache.arrow.flight.CallOptions;
27+
import org.junit.jupiter.api.Test;
28+
29+
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.junit.jupiter.api.Assertions.assertNotNull;
31+
32+
class GrpcCallOptionsTest {
33+
34+
@Test
35+
void testNotSetMaxInboundMessageSize() {
36+
GrpcCallOptions grpcCallOptions = new GrpcCallOptions.Builder().build();
37+
assertNotNull(grpcCallOptions);
38+
assertEquals(Integer.MAX_VALUE, grpcCallOptions.getMaxInboundMessageSize());
39+
}
40+
41+
@Test
42+
void testSetMaxInboundMessageSize() {
43+
GrpcCallOptions grpcCallOptions = new GrpcCallOptions.Builder()
44+
.withMaxInboundMessageSize(2000)
45+
.build();
46+
assertNotNull(grpcCallOptions);
47+
assertEquals(2000, grpcCallOptions.getMaxInboundMessageSize());
48+
}
49+
50+
@Test
51+
void testMergeCallOptionsWithBothNonNullArrays() {
52+
CallOption option1 = callOption();
53+
CallOption option2 = callOption();
54+
CallOption[] baseCallOptions = {option1};
55+
CallOption[] additionalCallOptions = {option2};
56+
57+
CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions);
58+
59+
assertNotNull(result);
60+
assertEquals(2, result.length);
61+
assertEquals(option1, result[0]);
62+
assertEquals(option2, result[1]);
63+
}
64+
65+
@Test
66+
void testMergeCallOptionsWithBaseCallOptionsNull() {
67+
CallOption option1 = callOption();
68+
CallOption option2 = callOption();
69+
CallOption[] baseCallOptions = null;
70+
CallOption[] additionalCallOptions = {option1, option2};
71+
72+
CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions);
73+
74+
assertNotNull(result);
75+
assertEquals(2, result.length);
76+
assertEquals(option1, result[0]);
77+
assertEquals(option2, result[1]);
78+
}
79+
80+
@Test
81+
void testMergeCallOptionsWithAdditionalCallOptionsNull() {
82+
CallOption option1 = callOption();
83+
CallOption[] baseCallOptions = {option1};
84+
CallOption[] additionalCallOptions = null;
85+
86+
CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions);
87+
88+
assertNotNull(result);
89+
assertEquals(1, result.length);
90+
assertEquals(option1, result[0]);
91+
}
92+
93+
@Test
94+
void testMergeCallOptionsWithBothArraysNull() {
95+
CallOption[] result = GrpcCallOptions.mergeCallOptions(null, null);
96+
97+
assertNotNull(result);
98+
assertEquals(0, result.length);
99+
}
100+
101+
@Test
102+
void testMergeCallOptionsWithEmptyArrays() {
103+
CallOption[] baseCallOptions = {};
104+
CallOption[] additionalCallOptions = {};
105+
106+
CallOption[] result = GrpcCallOptions.mergeCallOptions(baseCallOptions, additionalCallOptions);
107+
108+
assertNotNull(result);
109+
assertEquals(0, result.length);
110+
}
111+
112+
private CallOption callOption() {
113+
return new CallOptions.GrpcCallOption() {
114+
@Override
115+
public <T extends AbstractStub<T>> T wrapStub(final T stub) {
116+
return stub.withMaxInboundMessageSize(Integer.MAX_VALUE);
117+
}
118+
};
119+
}
120+
121+
}

0 commit comments

Comments
 (0)