Skip to content

Commit f6bc28d

Browse files
committed
feat: disable GRPC compression
fix: properly setting user-agent header
1 parent 1cdf10a commit f6bc28d

File tree

7 files changed

+424
-258
lines changed

7 files changed

+424
-258
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## 1.5.0 [unreleased]
22

3+
### Features
4+
5+
1. [#289](https://github.com/InfluxCommunity/influxdb3-java/pull/289) Add the possibility to disable gRPC compression via the `disableGRPCCompression` parameter in the `ClientConfig`.
6+
37
### CI
48

59
1. [#283](https://github.com/InfluxCommunity/influxdb3-java/pull/283) Fix pipeline not downloading the correct java images.

src/main/java/com/influxdb/v3/client/config/ClientConfig.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public final class ClientConfig {
113113
private final Authenticator authenticator;
114114
private final Map<String, String> headers;
115115
private final String sslRootsFilePath;
116+
private final boolean disableGRPCCompression;
116117

117118
/**
118119
* Deprecated use {@link #proxyUrl}.
@@ -318,6 +319,15 @@ public Map<String, String> getHeaders() {
318319
return headers;
319320
}
320321

322+
/**
323+
* Is gRPC compression disabled.
324+
*
325+
* @return true if gRPC compression is disabled
326+
*/
327+
public boolean getDisableGRPCCompression() {
328+
return disableGRPCCompression;
329+
}
330+
321331
/**
322332
* Validates the configuration properties.
323333
*/
@@ -354,7 +364,8 @@ public boolean equals(final Object o) {
354364
&& Objects.equals(proxyUrl, that.proxyUrl)
355365
&& Objects.equals(authenticator, that.authenticator)
356366
&& Objects.equals(headers, that.headers)
357-
&& Objects.equals(sslRootsFilePath, that.sslRootsFilePath);
367+
&& Objects.equals(sslRootsFilePath, that.sslRootsFilePath)
368+
&& disableGRPCCompression == that.disableGRPCCompression;
358369
}
359370

360371
@Override
@@ -363,7 +374,7 @@ public int hashCode() {
363374
database, writePrecision, gzipThreshold, writeNoSync,
364375
timeout, writeTimeout, queryTimeout, allowHttpRedirects, disableServerCertificateValidation,
365376
proxy, proxyUrl, authenticator, headers,
366-
defaultTags, sslRootsFilePath);
377+
defaultTags, sslRootsFilePath, disableGRPCCompression);
367378
}
368379

369380
@Override
@@ -386,6 +397,7 @@ public String toString() {
386397
.add("headers=" + headers)
387398
.add("defaultTags=" + defaultTags)
388399
.add("sslRootsFilePath=" + sslRootsFilePath)
400+
.add("disableGRPCCompression=" + disableGRPCCompression)
389401
.toString();
390402
}
391403

@@ -415,6 +427,7 @@ public static final class Builder {
415427
private Authenticator authenticator;
416428
private Map<String, String> headers;
417429
private String sslRootsFilePath;
430+
private boolean disableGRPCCompression;
418431

419432
/**
420433
* Sets the URL of the InfluxDB server.
@@ -697,6 +710,18 @@ public Builder sslRootsFilePath(@Nullable final String sslRootsFilePath) {
697710
return this;
698711
}
699712

713+
/**
714+
* Sets whether to disable gRPC compression. Default is 'false'.
715+
*
716+
* @param disableGRPCCompression disable gRPC compression
717+
* @return this
718+
*/
719+
@Nonnull
720+
public Builder disableGRPCCompression(final boolean disableGRPCCompression) {
721+
this.disableGRPCCompression = disableGRPCCompression;
722+
return this;
723+
}
724+
700725
/**
701726
* Build an instance of {@code ClientConfig}.
702727
*
@@ -745,6 +770,9 @@ public ClientConfig build(@Nonnull final String connectionString) throws Malform
745770
if (parameters.containsKey("writeNoSync")) {
746771
this.writeNoSync(Boolean.parseBoolean(parameters.get("writeNoSync")));
747772
}
773+
if (parameters.containsKey("disableGRPCCompression")) {
774+
this.disableGRPCCompression(Boolean.parseBoolean(parameters.get("disableGRPCCompression")));
775+
}
748776

749777
return new ClientConfig(this);
750778
}
@@ -807,6 +835,11 @@ public ClientConfig build(@Nonnull final Map<String, String> env, final Properti
807835
long to = Long.parseLong(queryTimeout);
808836
this.queryTimeout(Duration.ofSeconds(to));
809837
}
838+
final String disableGRPCCompression = get.apply("INFLUX_DISABLE_GRPC_COMPRESSION",
839+
"influx.disableGRPCCompression");
840+
if (disableGRPCCompression != null) {
841+
this.disableGRPCCompression(Boolean.parseBoolean(disableGRPCCompression));
842+
}
810843

811844
return new ClientConfig(this);
812845
}
@@ -862,5 +895,6 @@ private ClientConfig(@Nonnull final Builder builder) {
862895
authenticator = builder.authenticator;
863896
headers = builder.headers;
864897
sslRootsFilePath = builder.sslRootsFilePath;
898+
disableGRPCCompression = builder.disableGRPCCompression;
865899
}
866900
}

src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141

4242
import com.fasterxml.jackson.core.JsonProcessingException;
4343
import com.fasterxml.jackson.databind.ObjectMapper;
44+
import io.grpc.Codec;
45+
import io.grpc.DecompressorRegistry;
4446
import io.grpc.HttpConnectProxiedSocketAddress;
4547
import io.grpc.Metadata;
4648
import io.grpc.ProxyDetector;
@@ -93,8 +95,6 @@ final class FlightSqlClient implements AutoCloseable {
9395
defaultHeaders.put("Authorization", "Bearer " + new String(config.getToken()));
9496
}
9597

96-
defaultHeaders.put("User-Agent", Identity.getUserAgent());
97-
9898
if (config.getHeaders() != null) {
9999
defaultHeaders.putAll(config.getHeaders());
100100
}
@@ -148,6 +148,8 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) {
148148
URI uri = createLocation(config).getUri();
149149
final NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forAddress(uri.getHost(), uri.getPort());
150150

151+
nettyChannelBuilder.userAgent(Identity.getUserAgent());
152+
151153
if (LocationSchemes.GRPC_TLS.equals(uri.getScheme())) {
152154
nettyChannelBuilder.useTransportSecurity();
153155

@@ -169,6 +171,11 @@ private FlightClient createFlightClient(@Nonnull final ClientConfig config) {
169171
nettyChannelBuilder.maxTraceEvents(0)
170172
.maxInboundMetadataSize(Integer.MAX_VALUE);
171173

174+
if (config.getDisableGRPCCompression()) {
175+
nettyChannelBuilder.decompressorRegistry(DecompressorRegistry.emptyInstance()
176+
.with(Codec.Identity.NONE, false));
177+
}
178+
172179
return FlightGrpcUtils.createFlightClient(new RootAllocator(Long.MAX_VALUE), nettyChannelBuilder.build());
173180
}
174181

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
* THE SOFTWARE.
21+
*/
22+
package com.influxdb.v3.client;
23+
24+
import java.net.URI;
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import javax.annotation.Nonnull;
29+
30+
import org.apache.arrow.flight.FlightServer;
31+
import org.apache.arrow.flight.Location;
32+
import org.apache.arrow.flight.NoOpFlightProducer;
33+
import org.apache.arrow.flight.Ticket;
34+
import org.apache.arrow.memory.BufferAllocator;
35+
import org.apache.arrow.memory.RootAllocator;
36+
import org.apache.arrow.vector.VarCharVector;
37+
import org.apache.arrow.vector.VectorSchemaRoot;
38+
import org.apache.arrow.vector.types.pojo.ArrowType;
39+
import org.apache.arrow.vector.types.pojo.Field;
40+
import org.apache.arrow.vector.types.pojo.FieldType;
41+
import org.apache.arrow.vector.types.pojo.Schema;
42+
43+
public final class TestUtils {
44+
45+
private TestUtils() {
46+
throw new IllegalStateException("Utility class");
47+
}
48+
49+
public static FlightServer simpleFlightServer(@Nonnull final URI uri,
50+
@Nonnull final BufferAllocator allocator,
51+
@Nonnull final NoOpFlightProducer producer) throws Exception {
52+
Location location = Location.forGrpcInsecure(uri.getHost(), uri.getPort());
53+
return FlightServer.builder(allocator, location, producer).build();
54+
}
55+
56+
public static NoOpFlightProducer simpleProducer(@Nonnull final VectorSchemaRoot vectorSchemaRoot) {
57+
return new NoOpFlightProducer() {
58+
@Override
59+
public void getStream(final CallContext context,
60+
final Ticket ticket,
61+
final ServerStreamListener listener) {
62+
listener.start(vectorSchemaRoot);
63+
if (listener.isReady()) {
64+
listener.putNext();
65+
}
66+
listener.completed();
67+
}
68+
};
69+
}
70+
71+
public static VectorSchemaRoot generateVectorSchemaRoot(final int fieldCount, final int rowCount) {
72+
List<Field> fields = new ArrayList<>();
73+
for (int i = 0; i < fieldCount; i++) {
74+
Field field = new Field("field" + i, FieldType.nullable(new ArrowType.Utf8()), null);
75+
fields.add(field);
76+
}
77+
78+
Schema schema = new Schema(fields);
79+
VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, new RootAllocator(Long.MAX_VALUE));
80+
for (Field field : fields) {
81+
VarCharVector vector = (VarCharVector) vectorSchemaRoot.getVector(field);
82+
vector.allocateNew(rowCount);
83+
for (int i = 0; i < rowCount; i++) {
84+
vector.set(i, "Value".getBytes(StandardCharsets.UTF_8));
85+
}
86+
}
87+
vectorSchemaRoot.setRowCount(rowCount);
88+
89+
return vectorSchemaRoot;
90+
}
91+
}
92+

src/test/java/com/influxdb/v3/client/config/ClientConfigTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ class ClientConfigTest {
4646
.queryTimeout(Duration.ofSeconds(120))
4747
.allowHttpRedirects(true)
4848
.disableServerCertificateValidation(true)
49-
.headers(Map.of("X-device", "ab-01"));
49+
.headers(Map.of("X-device", "ab-01"))
50+
.disableGRPCCompression(true);
5051

5152
@Test
5253
void equalConfig() {
@@ -81,6 +82,7 @@ void toStringConfig() {
8182
Assertions.assertThat(configString).contains("timeout=PT30S");
8283
Assertions.assertThat(configString).contains("writeTimeout=PT35S");
8384
Assertions.assertThat(configString).contains("queryTimeout=PT2M");
85+
Assertions.assertThat(configString).contains("disableGRPCCompression=true");
8486

8587
}
8688

@@ -131,10 +133,11 @@ void fromConnectionString() throws MalformedURLException {
131133

132134
cfg = new ClientConfig.Builder()
133135
.build("http://localhost:9999/"
134-
+ "?token=my-token&authScheme=my-auth");
136+
+ "?token=my-token&authScheme=my-auth&disableGRPCCompression=true");
135137
Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/");
136138
Assertions.assertThat(cfg.getToken()).isEqualTo("my-token".toCharArray());
137139
Assertions.assertThat(cfg.getAuthScheme()).isEqualTo("my-auth");
140+
Assertions.assertThat(cfg.getDisableGRPCCompression()).isEqualTo(true);
138141
}
139142

140143
@Test
@@ -204,7 +207,9 @@ void fromEnv() {
204207
"INFLUX_DATABASE", "my-db",
205208
"INFLUX_PRECISION", "ms",
206209
"INFLUX_GZIP_THRESHOLD", "64",
207-
"INFLUX_WRITE_NO_SYNC", "true"
210+
"INFLUX_WRITE_NO_SYNC", "true",
211+
"INFLUX_DISABLE_GRPC_COMPRESSION", "true"
212+
208213
);
209214
cfg = new ClientConfig.Builder()
210215
.build(env, null);
@@ -215,6 +220,7 @@ void fromEnv() {
215220
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS);
216221
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64);
217222
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true);
223+
Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue();
218224
}
219225

220226
@Test
@@ -318,6 +324,7 @@ void fromSystemProperties() {
318324
properties.put("influx.precision", "ms");
319325
properties.put("influx.gzipThreshold", "64");
320326
properties.put("influx.writeNoSync", "true");
327+
properties.put("influx.disableGRPCCompression", "true");
321328
cfg = new ClientConfig.Builder()
322329
.build(new HashMap<>(), properties);
323330
Assertions.assertThat(cfg.getHost()).isEqualTo("http://localhost:9999/");
@@ -327,6 +334,7 @@ void fromSystemProperties() {
327334
Assertions.assertThat(cfg.getWritePrecision()).isEqualTo(WritePrecision.MS);
328335
Assertions.assertThat(cfg.getGzipThreshold()).isEqualTo(64);
329336
Assertions.assertThat(cfg.getWriteNoSync()).isEqualTo(true);
337+
Assertions.assertThat(cfg.getDisableGRPCCompression()).isTrue();
330338
}
331339

332340
@Test

0 commit comments

Comments
 (0)