Skip to content

Commit 8ec703b

Browse files
committed
feat: disable GRPC compression
fix: properly setting user-agent header
1 parent 13b9538 commit 8ec703b

File tree

7 files changed

+425
-258
lines changed

7 files changed

+425
-258
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## 1.5.0 [unreleased]
22

3+
### Features
4+
5+
1. [#289](https://github.com/InfluxCommunity/influxdb3-java/pull/289) Add the possibility to disable gRPC compression via the `disableGRPCCompression` parameter in the `ClientConfig`.
6+
37
### CI
48

59
1. [#283](https://github.com/InfluxCommunity/influxdb3-java/pull/283) Fix pipeline not downloading the correct java images.

src/main/java/com/influxdb/v3/client/config/ClientConfig.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
* <li><code>authenticator</code> - HTTP proxy authenticator</li>
6868
* <li><code>headers</code> - headers to be added to requests</li>
6969
* <li><code>sslRootsFilePath</code> - path to the stored certificates file in PEM format</li>
70+
* <li><code>disableGRPCCompression</code> - disables the default gRPC compression header</li>
7071
* </ul>
7172
* <p>
7273
* If you want to create a client with custom configuration, you can use following code:
@@ -113,6 +114,7 @@ public final class ClientConfig {
113114
private final Authenticator authenticator;
114115
private final Map<String, String> headers;
115116
private final String sslRootsFilePath;
117+
private final boolean disableGRPCCompression;
116118

117119
/**
118120
* Deprecated use {@link #proxyUrl}.
@@ -318,6 +320,15 @@ public Map<String, String> getHeaders() {
318320
return headers;
319321
}
320322

323+
/**
324+
* Is gRPC compression disabled.
325+
*
326+
* @return true if gRPC compression is disabled
327+
*/
328+
public boolean getDisableGRPCCompression() {
329+
return disableGRPCCompression;
330+
}
331+
321332
/**
322333
* Validates the configuration properties.
323334
*/
@@ -354,7 +365,8 @@ public boolean equals(final Object o) {
354365
&& Objects.equals(proxyUrl, that.proxyUrl)
355366
&& Objects.equals(authenticator, that.authenticator)
356367
&& Objects.equals(headers, that.headers)
357-
&& Objects.equals(sslRootsFilePath, that.sslRootsFilePath);
368+
&& Objects.equals(sslRootsFilePath, that.sslRootsFilePath)
369+
&& disableGRPCCompression == that.disableGRPCCompression;
358370
}
359371

360372
@Override
@@ -363,7 +375,7 @@ public int hashCode() {
363375
database, writePrecision, gzipThreshold, writeNoSync,
364376
timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation,
365377
proxy, proxyUrl, authenticator, headers,
366-
defaultTags, sslRootsFilePath);
378+
defaultTags, sslRootsFilePath, disableGRPCCompression);
367379
}
368380

369381
@Override
@@ -386,6 +398,7 @@ public String toString() {
386398
.add("headers=" + headers)
387399
.add("defaultTags=" + defaultTags)
388400
.add("sslRootsFilePath=" + sslRootsFilePath)
401+
.add("disableGRPCCompression=" + disableGRPCCompression)
389402
.toString();
390403
}
391404

@@ -415,6 +428,7 @@ public static final class Builder {
415428
private Authenticator authenticator;
416429
private Map<String, String> headers;
417430
private String sslRootsFilePath;
431+
private boolean disableGRPCCompression;
418432

419433
/**
420434
* Sets the URL of the InfluxDB server.
@@ -697,6 +711,18 @@ public Builder sslRootsFilePath(@Nullable final String sslRootsFilePath) {
697711
return this;
698712
}
699713

714+
/**
715+
* Sets whether to disable gRPC compression. Default is 'false'.
716+
*
717+
* @param disableGRPCCompression disable gRPC compression
718+
* @return this
719+
*/
720+
@Nonnull
721+
public Builder disableGRPCCompression(final boolean disableGRPCCompression) {
722+
this.disableGRPCCompression = disableGRPCCompression;
723+
return this;
724+
}
725+
700726
/**
701727
* Build an instance of {@code ClientConfig}.
702728
*
@@ -745,6 +771,9 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform
745771
if (parameters.containsKey("writeNoSync")) {
746772
this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync")));
747773
}
774+
if (parameters.containsKey("disableGRPCCompression")) {
775+
this.disableGRPCCompression(Boolean.parseBoolean(parameters.get("disableGRPCCompression")));
776+
}
748777

749778
return new ClientConfig(this);
750779
}
@@ -807,6 +836,11 @@ public ClientConfig build(@Nonnull final Map<String, String> env, final Properti
807836
long to = Long.parseLong(queryTimeout);
808837
this.queryTimeout(Duration.ofSeconds(to));
809838
}
839+
final String disableGRPCCompression = get.apply("INFLUX_DISABLE_GRPC_COMPRESSION",
840+
"influx.disableGRPCCompression");
841+
if (disableGRPCCompression != null) {
842+
this.disableGRPCCompression(Boolean.parseBoolean(disableGRPCCompression));
843+
}
810844

811845
return new ClientConfig(this);
812846
}
@@ -862,5 +896,6 @@ private ClientConfig(@Nonnull final Builder builder) {
862896
authenticator = builder.authenticator;
863897
headers = builder.headers;
864898
sslRootsFilePath = builder.sslRootsFilePath;
899+
disableGRPCCompression = builder.disableGRPCCompression;
865900
}
866901
}

src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141

4242
import com.fasterxml.jackson.core.JsonProcessingException;
4343
import com.fasterxml.jackson.databind.ObjectMapper;
44+
import io.grpc.Codec;
45+
import io.grpc.DecompressorRegistry;
4446
import io.grpc.HttpConnectProxiedSocketAddress;
4547
import io.grpc.Metadata;
4648
import io.grpc.ProxyDetector;
@@ -93,8 +95,6 @@ final class FlightSqlClient implements AutoCloseable {
9395
defaultHeaders.put("Authorization", "Bearer " + new String(config.getToken()));
9496
}
9597

96-
defaultHeaders.put("User-Agent", Identity.getUserAgent());
97-
9898
if (config.getHeaders() != null) {
9999
defaultHeaders.putAll(config.getHeaders());
100100
}
@@ -148,6 +148,8 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) {
148148
URI uri = createLocation(config).getUri();
149149
final NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort());
150150

151+
nettyChannelBuilder.userAgent(Identity.getUserAgent());
152+
151153
if (LocationSchemes.GRPC_TLS.equals(uri.getScheme())) {
152154
nettyChannelBuilder.useTransportSecurity();
153155

@@ -169,6 +171,11 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) {
169171
nettyChannelBuilder.maxTraceEvents(0)
170172
.maxInboundMetadataSize(Integer.MAX_VALUE);
171173

174+
if (config.getDisableGRPCCompression()) {
175+
nettyChannelBuilder.decompressorRegistry(DecompressorRegistry.emptyInstance()
176+
.with(Codec.Identity.NONE, false));
177+
}
178+
172179
return FlightGrpcUtils.createFlightClient(new RootAllocator(Long.MAX_VALUE), nettyChannelBuilder.build());
173180
}
174181

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
* THE SOFTWARE.
21+
*/
22+
package com.influxdb.v3.client;
23+
24+
import java.net.URI;
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import javax.annotation.Nonnull;
29+
30+
import org.apache.arrow.flight.FlightServer;
31+
import org.apache.arrow.flight.Location;
32+
import org.apache.arrow.flight.NoOpFlightProducer;
33+
import org.apache.arrow.flight.Ticket;
34+
import org.apache.arrow.memory.BufferAllocator;
35+
import org.apache.arrow.memory.RootAllocator;
36+
import org.apache.arrow.vector.VarCharVector;
37+
import org.apache.arrow.vector.VectorSchemaRoot;
38+
import org.apache.arrow.vector.types.pojo.ArrowType;
39+
import org.apache.arrow.vector.types.pojo.Field;
40+
import org.apache.arrow.vector.types.pojo.FieldType;
41+
import org.apache.arrow.vector.types.pojo.Schema;
42+
43+
public final class TestUtils {
44+
45+
private TestUtils() {
46+
throw new IllegalStateException("Utility class");
47+
}
48+
49+
public static FlightServer simpleFlightServer(@Nonnull final URI uri,
50+
@Nonnull final BufferAllocator allocator,
51+
@Nonnull final NoOpFlightProducer producer) throws Exception {
52+
Location location = Location.forGrpcInsecure(uri.getHost(), uri.getPort());
53+
return FlightServer.builder(allocator, location, producer).build();
54+
}
55+
56+
public static NoOpFlightProducer simpleProducer(@Nonnull final VectorSchemaRoot vectorSchemaRoot) {
57+
return new NoOpFlightProducer() {
58+
@Override
59+
public void getStream(final CallContext context,
60+
final Ticket ticket,
61+
final ServerStreamListener listener) {
62+
listener.start(vectorSchemaRoot);
63+
if (listener.isReady()) {
64+
listener.putNext();
65+
}
66+
listener.completed();
67+
}
68+
};
69+
}
70+
71+
public static VectorSchemaRoot generateVectorSchemaRoot(final int fieldCount, final int rowCount) {
72+
List<Field> fields = new ArrayList<>();
73+
for (int i = 0; i < fieldCount; i++) {
74+
Field field = new Field("field" + i, FieldType.nullable(new ArrowType.Utf8()), null);
75+
fields.add(field);
76+
}
77+
78+
Schema schema = new Schema(fields);
79+
VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, new RootAllocator(Long.MAX_VALUE));
80+
for (Field field : fields) {
81+
VarCharVector vector = (VarCharVector) vectorSchemaRoot.getVector(field);
82+
vector.allocateNew(rowCount);
83+
for (int i = 0; i < rowCount; i++) {
84+
vector.set(i, "Value".getBytes(StandardCharsets.UTF_8));
85+
}
86+
}
87+
vectorSchemaRoot.setRowCount(rowCount);
88+
89+
return vectorSchemaRoot;
90+
}
91+
}
92+

src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ class ClientConfigTest {
4646
.queryTimeout(Duration.ofSeconds(120))
4747
.allowHttpRedirects(true)
4848
.disableServerCertificateValidation(true)
49-
.headers(Map.of("X-device", "ab-01"));
49+
.headers(Map.of("X-device", "ab-01"))
50+
.disableGRPCCompression(true);
5051

5152
@Test
5253
void equalConfig() {
@@ -81,6 +82,7 @@ void toStringConfig() {
8182
Assertions.assertThat(configString).contains("timeout=PT30S");
8283
Assertions.assertThat(configString).contains("writeTimeout=PT35S");
8384
Assertions.assertThat(configString).contains("queryTimeout=PT2M");
85+
Assertions.assertThat(configString).contains("disableGRPCCompression=true");
8486

8587
}
8688

@@ -131,10 +133,11 @@ void fromConnectionString() throws MalformedURLException {
131133

132134
cfg = new ClientConfig.Builder()
133135
.build("http://localhost:9999/"
134-
+ "?token=my-token&authScheme=my-auth");
136+
+ "?token=my-token&authScheme=my-auth&disableGRPCCompression=true");
135137
Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/");
136138
Assertions.assertThat(cfg.getToken()).isEqualTo("my-token".toCharArray());
137139
Assertions.assertThat(cfg.getAuthScheme()).isEqualTo("my-auth");
140+
Assertions.assertThat(cfg.getDisableGRPCCompression()).isEqualTo(true);
138141
}
139142

140143
@Test
@@ -204,7 +207,9 @@ void fromEnv() {
204207
"INFLUX_DATABASE", "my-db",
205208
"INFLUX_PRECISION", "ms",
206209
"INFLUX_GZIP_THRESHOLD", "64",
207-
"INFLUX_WRITE_NO_SYNC", "true"
210+
"INFLUX_WRITE_NO_SYNC", "true",
211+
"INFLUX_DISABLE_GRPC_COMPRESSION", "true"
212+
208213
);
209214
cfg = new ClientConfig.Builder()
210215
.build(env, null);
@@ -215,6 +220,7 @@ void fromEnv() {
215220
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS);
216221
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64);
217222
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true);
223+
Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue();
218224
}
219225

220226
@Test
@@ -318,6 +324,7 @@ void fromSystemProperties() {
318324
properties.put("influx.precision", "ms");
319325
properties.put("influx.gzipThreshold", "64");
320326
properties.put("influx.writeNoSync", "true");
327+
properties.put("influx.disableGRPCCompression", "true");
321328
cfg = new ClientConfig.Builder()
322329
.build(new HashMap<>(), properties);
323330
Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/");
@@ -327,6 +334,7 @@ void fromSystemProperties() {
327334
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS);
328335
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64);
329336
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true);
337+
Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue();
330338
}
331339

332340
@Test

0 commit comments

Comments
 (0)