Skip to content

Commit 8ee25c6

Browse files
refactor: remove default inbound message size
1 parent 659b6e8 commit 8ee25c6

File tree

2 files changed

+36
-4
lines changed

2 files changed

+36
-4
lines changed

src/main/java/com/influxdb/v3/client/internal/GrpcCallOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public static final class Builder {
174174
private Executor executor;
175175
private String compressorName;
176176
private Boolean waitForReady;
177-
private Integer maxInboundMessageSize = Integer.MAX_VALUE;
177+
private Integer maxInboundMessageSize;
178178
private Integer maxOutboundMessageSize;
179179
private final List<CallOption> callOptions = new ArrayList<>();
180180

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@
3737
import javax.annotation.Nonnull;
3838
import javax.annotation.Nullable;
3939

40+
import io.grpc.stub.AbstractStub;
4041
import io.netty.handler.codec.http.HttpMethod;
4142
import org.apache.arrow.flight.CallOption;
43+
import org.apache.arrow.flight.CallOptions;
4244
import org.apache.arrow.vector.FieldVector;
4345
import org.apache.arrow.vector.VectorSchemaRoot;
4446

@@ -63,6 +65,7 @@ public final class InfluxDBClientImpl implements InfluxDBClient {
6365
private static final String DATABASE_REQUIRED_MESSAGE = "Please specify the 'Database' as a method parameter "
6466
+ "or use default configuration at 'ClientConfig.database'.";
6567

68+
private static final CallOption[] EMPTY_CALL_OPTIONS = new CallOption[0];
6669
private static final Map<String, Object> NO_PARAMETERS = Map.of();
6770
private static final List<Class<?>> ALLOWED_NAMED_PARAMETER_TYPES = List.of(
6871
String.class,
@@ -342,18 +345,47 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
342345
}
343346
});
344347

345-
GrpcCallOptions grpcCallOption = options.grpcCallOption();
346-
CallOption[] callOptions = grpcCallOption != null ? grpcCallOption.getCallOptions() : null;
348+
CallOption[] queryCallOptions = createQueryCallOptions(options);
347349
return flightSqlClient.execute(
348350
query,
349351
database,
350352
options.queryTypeSafe(),
351353
parameters,
352354
options.headersSafe(),
353-
callOptions
355+
queryCallOptions
354356
);
355357
}
356358

359+
/**
360+
* Creates an array of CallOption with some default CallOption.
361+
*
362+
* @param options the QueryOptions object
363+
* @return the array of CallOption
364+
*/
365+
@Nonnull
366+
CallOption[] createQueryCallOptions(@Nonnull final QueryOptions options) {
367+
GrpcCallOptions grpcCallOption = options.grpcCallOption();
368+
CallOption[] callOptions = grpcCallOption != null ? grpcCallOption.getCallOptions() : EMPTY_CALL_OPTIONS;
369+
if (grpcCallOption == null || grpcCallOption.getMaxInboundMessageSize() == null) {
370+
callOptions = Stream.concat(
371+
Stream.of(maxInboundMessageCallOption()),
372+
Stream.of(callOptions))
373+
.toArray(CallOption[]::new);
374+
}
375+
return callOptions;
376+
}
377+
378+
@Nonnull
379+
private CallOption maxInboundMessageCallOption() {
380+
return new CallOptions.GrpcCallOption() {
381+
@Override
382+
public <T extends AbstractStub<T>> T wrapStub(T stub) {
383+
return stub.withMaxInboundMessageSize(Integer.MAX_VALUE);
384+
}
385+
};
386+
}
387+
388+
357389
@Nonnull
358390
private byte[] gzipData(@Nonnull final byte[] data) throws IOException {
359391
final ByteArrayOutputStream out = new ByteArrayOutputStream();

0 commit comments

Comments
 (0)