Skip to content

Commit 5c4a328

Browse files
wip
1 parent ac7ca27 commit 5c4a328

File tree

3 files changed

+48
-51
lines changed

3 files changed

+48
-51
lines changed

src/main/java/com/influxdb/v3/client/internal/ClientChannelInitializer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import io.netty.channel.ChannelHandler;
44
import io.netty.channel.ChannelInitializer;
55
import io.netty.channel.ChannelPipeline;
6-
import io.netty.channel.socket.SocketChannel;
6+
import io.netty.channel.socket.oio.OioSocketChannel;
77
import io.netty.handler.codec.http.HttpClientCodec;
88
import io.netty.handler.codec.http.HttpObjectAggregator;
99
import io.netty.handler.logging.LogLevel;
@@ -15,7 +15,7 @@
1515
import javax.annotation.Nonnull;
1616
import javax.annotation.Nullable;
1717

18-
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
18+
public class ClientChannelInitializer extends ChannelInitializer<OioSocketChannel> {
1919

2020
private final SslContext sslCtx;
2121

@@ -44,7 +44,7 @@ public ClientChannelInitializer(@Nonnull String host,
4444
}
4545

4646
@Override
47-
public void initChannel(SocketChannel ch) {
47+
public void initChannel(OioSocketChannel ch) {
4848
ChannelPipeline p = ch.pipeline();
4949
p.addLast(new LoggingHandler(LogLevel.INFO));
5050
if (proxyHandler != null) {

src/main/java/com/influxdb/v3/client/internal/RestClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ public FullHttpResponse request(@Nonnull HttpMethod method,
259259

260260

261261
if (this.channel == null || !this.channel.isOpen()) {
262-
ChannelFuture channelFuture = getBootstrap().connect();
262+
ChannelFuture channelFuture = getBootstrap().connect().sync();
263263
if (!channelFuture.await(this.timeout.toMillis(), TimeUnit.MILLISECONDS)) {
264264
throw new InfluxDBApiException(new ConnectTimeoutException());
265265
} else {
@@ -268,7 +268,7 @@ public FullHttpResponse request(@Nonnull HttpMethod method,
268268
}
269269

270270
//fixme remove syncUninterruptibly
271-
this.channel.writeAndFlush(request).syncUninterruptibly();
271+
this.channel.writeAndFlush(request).sync();
272272

273273
FullHttpResponse fullHttpResponse = this.clientHandler.getResponseFuture().get();
274274

src/test/java/com/influxdb/v3/client/integration/E2ETest.java

Lines changed: 43 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,20 @@
2121
*/
2222
package com.influxdb.v3.client.integration;
2323

24+
import com.influxdb.v3.client.InfluxDBClient;
25+
import com.influxdb.v3.client.Point;
26+
import com.influxdb.v3.client.PointValues;
27+
import com.influxdb.v3.client.config.ClientConfig;
28+
import com.influxdb.v3.client.query.QueryOptions;
29+
import com.influxdb.v3.client.write.WriteOptions;
30+
import com.influxdb.v3.client.write.WritePrecision;
31+
import org.apache.arrow.flight.FlightRuntimeException;
32+
import org.assertj.core.api.Assertions;
33+
import org.junit.jupiter.api.Test;
34+
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
35+
36+
import javax.annotation.Nonnull;
37+
import javax.net.ssl.SSLException;
2438
import java.math.BigInteger;
2539
import java.net.ConnectException;
2640
import java.net.URISyntaxException;
@@ -35,22 +49,6 @@
3549
import java.util.logging.Logger;
3650
import java.util.stream.Collectors;
3751
import java.util.stream.Stream;
38-
import javax.annotation.Nonnull;
39-
import javax.net.ssl.SSLException;
40-
41-
import io.netty.handler.proxy.HttpProxyHandler;
42-
import org.apache.arrow.flight.FlightRuntimeException;
43-
import org.assertj.core.api.Assertions;
44-
import org.junit.jupiter.api.Test;
45-
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
46-
47-
import com.influxdb.v3.client.InfluxDBClient;
48-
import com.influxdb.v3.client.Point;
49-
import com.influxdb.v3.client.PointValues;
50-
import com.influxdb.v3.client.config.ClientConfig;
51-
import com.influxdb.v3.client.query.QueryOptions;
52-
import com.influxdb.v3.client.write.WriteOptions;
53-
import com.influxdb.v3.client.write.WritePrecision;
5452

5553
import static org.junit.jupiter.api.Assumptions.assumeFalse;
5654

@@ -85,20 +83,16 @@ void testQueryWithProxy() throws URISyntaxException, SSLException {
8583
.proxyUrl(proxyUrl)
8684
.build();
8785

88-
var testId = UUID.randomUUID().toString();
8986
InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
9087
influxDBClient.writePoint(
91-
Point.measurement("test5")
92-
.setTag("tag", "tagValue1")
88+
Point.measurement("test1")
9389
.setField("field", "field1")
94-
.setField("testId", testId)
9590
);
9691

97-
String query = String.format("SELECT * FROM \"test5\" WHERE \"testId\" = '%s'", testId);
98-
try (Stream<PointValues> stream = influxDBClient.queryPoints(query)) {
92+
try (Stream<PointValues> stream = influxDBClient.queryPoints("SELECT * FROM test1")) {
9993
stream.findFirst()
10094
.ifPresent(pointValues -> {
101-
Assertions.assertThat(pointValues.getField("testId")).isEqualTo(testId);
95+
Assertions.assertThat(pointValues.getField("field")).isEqualTo("field1");
10296
});
10397
}
10498
}
@@ -401,27 +395,27 @@ public void testNoAllocatorMemoryLeak() {
401395
Assertions.assertThatNoException().isThrownBy(() -> {
402396

403397
try (InfluxDBClient client = InfluxDBClient.getInstance(
404-
System.getenv("TESTING_INFLUXDB_URL"),
405-
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
406-
System.getenv("TESTING_INFLUXDB_DATABASE"),
407-
null)) {
398+
System.getenv("TESTING_INFLUXDB_URL"),
399+
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
400+
System.getenv("TESTING_INFLUXDB_DATABASE"),
401+
null)) {
408402

409403
List<Point> points = List.of(
410-
Point.measurement(measurement)
411-
.setTag("type", "test")
412-
.setFloatField("rads", 3.14)
413-
.setIntegerField("life", 42)
414-
.setTimestamp(now.minus(2, ChronoUnit.SECONDS)),
415-
Point.measurement(measurement)
416-
.setTag("type", "test")
417-
.setFloatField("rads", 3.14)
418-
.setIntegerField("life", 42)
419-
.setTimestamp(now.minus(1, ChronoUnit.SECONDS)),
420-
Point.measurement(measurement)
421-
.setTag("type", "test")
422-
.setFloatField("rads", 3.14)
423-
.setIntegerField("life", 42)
424-
.setTimestamp(now));
404+
Point.measurement(measurement)
405+
.setTag("type", "test")
406+
.setFloatField("rads", 3.14)
407+
.setIntegerField("life", 42)
408+
.setTimestamp(now.minus(2, ChronoUnit.SECONDS)),
409+
Point.measurement(measurement)
410+
.setTag("type", "test")
411+
.setFloatField("rads", 3.14)
412+
.setIntegerField("life", 42)
413+
.setTimestamp(now.minus(1, ChronoUnit.SECONDS)),
414+
Point.measurement(measurement)
415+
.setTag("type", "test")
416+
.setFloatField("rads", 3.14)
417+
.setIntegerField("life", 42)
418+
.setTimestamp(now));
425419

426420
client.writePoints(points);
427421
String query = "SELECT * FROM " + measurement;
@@ -433,10 +427,10 @@ public void testNoAllocatorMemoryLeak() {
433427
// Test to ensure FlightStream was closed even though two more records
434428
// remain in the stream
435429
stream.findFirst()
436-
.ifPresent(pointValues -> {
437-
Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L);
438-
Assertions.assertThat(pointValues.getField("rads")).isEqualTo(3.14);
439-
});
430+
.ifPresent(pointValues -> {
431+
Assertions.assertThat(pointValues.getField("life")).isEqualTo(42L);
432+
Assertions.assertThat(pointValues.getField("rads")).isEqualTo(3.14);
433+
});
440434
}
441435
}
442436
});
@@ -487,8 +481,11 @@ public void testMultipleQueries() throws Exception {
487481
}
488482
}
489483
}
484+
485+
490486
}
491487

488+
492489
private void assertGetDataSuccess(@Nonnull final InfluxDBClient influxDBClient) {
493490
influxDBClient.writePoint(
494491
Point.measurement("test1")

0 commit comments

Comments
 (0)