Skip to content

Commit 8bf73e0

Browse files
wip
1 parent a3e0075 commit 8bf73e0

File tree

2 files changed

+90
-75
lines changed

2 files changed

+90
-75
lines changed

src/main/java/com/influxdb/v3/client/internal/ClientHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.influxdb.v3.client.internal;
22

3+
import io.netty.channel.ChannelHandler;
34
import io.netty.channel.ChannelHandlerContext;
45
import io.netty.channel.SimpleChannelInboundHandler;
56
import io.netty.handler.codec.http.FullHttpResponse;
@@ -10,6 +11,7 @@
1011

1112
import java.util.concurrent.CompletableFuture;
1213

14+
@ChannelHandler.Sharable
1315
public class ClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
1416

1517
private final CompletableFuture<FullHttpResponse> responseFuture = new CompletableFuture<>();
@@ -34,9 +36,17 @@ public void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
3436
}
3537
}
3638
this.responseFuture.complete(msg.retain());
37-
39+
// System.out.println("channelRead0");
40+
// ctx.fireChannelReadComplete();
3841
}
3942

43+
// @Override
44+
// public void channelReadComplete(ChannelHandlerContext ctx) {
45+
//// System.out.println("channelReadComplete");
46+
//// ctx.flush();
47+
//// ctx.close().syncUninterruptibly();
48+
// }
49+
4050
@Override
4151
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
4252
ctx.fireChannelInactive();
@@ -46,7 +56,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
4656
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
4757
this.responseFuture.completeExceptionally(cause);
4858
cause.printStackTrace();
49-
ctx.close();
59+
// ctx.close();
5060
}
5161

5262
public CompletableFuture<FullHttpResponse> getResponseFuture() {

src/main/java/com/influxdb/v3/client/internal/RestClient.java

Lines changed: 78 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ final class RestClient implements AutoCloseable {
135135
}
136136
}
137137

138-
this.eventLoopGroup = new OioEventLoopGroup(1);
138+
this.eventLoopGroup = new OioEventLoopGroup();
139139

140140
// this.promise = this.eventLoopGroup.next().newPromise();
141141

@@ -217,45 +217,47 @@ public FullHttpResponse request(@Nonnull HttpMethod method,
217217
@Nullable byte[] body,
218218
@Nullable Map<String, String> queryParams)
219219
throws RuntimeException, InterruptedException, ExecutionException {
220-
var content = Unpooled.EMPTY_BUFFER;
221-
if (body != null) {
222-
content = Unpooled.copiedBuffer(body);
223-
}
220+
FullHttpResponse fullHttpResponse = null;
221+
try {
222+
var content = Unpooled.EMPTY_BUFFER;
223+
if (body != null) {
224+
content = Unpooled.copiedBuffer(body);
225+
}
224226

225-
String uri = path.startsWith("/") ? path : "/" + path;
226-
if (queryParams != null) {
227-
QueryStringEncoder queryStringEncoder = new QueryStringEncoder("/" + path);
228-
queryParams.forEach((key, value) -> {
229-
if (value != null) {
230-
queryStringEncoder.addParam(key, value);
231-
}
232-
});
233-
uri = queryStringEncoder.toString();
234-
}
227+
String uri = path.startsWith("/") ? path : "/" + path;
228+
if (queryParams != null) {
229+
QueryStringEncoder queryStringEncoder = new QueryStringEncoder("/" + path);
230+
queryParams.forEach((key, value) -> {
231+
if (value != null) {
232+
queryStringEncoder.addParam(key, value);
233+
}
234+
});
235+
uri = queryStringEncoder.toString();
236+
}
235237

236238

237-
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri, content);
239+
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri, content);
238240

239-
if (this.defaultHeaders != null) {
240-
this.defaultHeaders.forEach((s, s2) -> request.headers().set(s, s2));
241-
}
241+
if (this.defaultHeaders != null) {
242+
this.defaultHeaders.forEach((s, s2) -> request.headers().set(s, s2));
243+
}
242244

243-
request.headers().add("user-agent", this.userAgent);
244-
request.headers().add("host", String.format("%s:%d", this.host, this.port));
245-
request.headers().add("content-length", body == null ? "0" : body.length + "");
246-
if (this.config.getToken() != null && this.config.getToken().length > 0) {
247-
String authScheme = config.getAuthScheme();
248-
if (authScheme == null) {
249-
authScheme = "Token";
245+
request.headers().add("user-agent", this.userAgent);
246+
request.headers().add("host", String.format("%s:%d", this.host, this.port));
247+
request.headers().add("content-length", body == null ? "0" : body.length + "");
248+
if (this.config.getToken() != null && this.config.getToken().length > 0) {
249+
String authScheme = config.getAuthScheme();
250+
if (authScheme == null) {
251+
authScheme = "Token";
252+
}
253+
request.headers().add("authorization", String.format("%s %s", authScheme, new String(this.config.getToken())));
250254
}
251-
request.headers().add("authorization", String.format("%s %s", authScheme, new String(this.config.getToken())));
252-
}
253255

254-
request.headers().add("accept", "*/*");
256+
request.headers().add("accept", "*/*");
255257

256-
if (headers != null) {
257-
headers.forEach(request.headers()::set);
258-
}
258+
if (headers != null) {
259+
headers.forEach(request.headers()::set);
260+
}
259261

260262

261263
// if (this.channel == null || !this.channel.isOpen()) {
@@ -269,62 +271,65 @@ public FullHttpResponse request(@Nonnull HttpMethod method,
269271
// this.channel.writeAndFlush(request).sync();
270272
// }
271273

272-
//fixme refactor
273-
if (this.channel == null || !this.channel.isOpen()) {
274-
this.channel = getBootstrap().connect().syncUninterruptibly().channel();
274+
//fixme refactor
275+
if (this.channel == null || !this.channel.isOpen()) {
276+
this.channel = getBootstrap().connect().syncUninterruptibly().channel();
277+
}
275278

276279
//fixme remove syncUninterruptibly
277280
this.channel.writeAndFlush(request).syncUninterruptibly();
278-
} else {
279-
this.channel.writeAndFlush(request).syncUninterruptibly();
280-
}
281281

282-
FullHttpResponse fullHttpResponse = this.clientHandler.getResponseFuture().get();
283-
284-
// Extract headers into io.netty.handler.codec.http.HttpHeaders;
285-
HttpHeaders responseHeaders = new DefaultHttpHeaders();
286-
fullHttpResponse.headers().forEach(entry -> responseHeaders.add(entry.getKey(), entry.getValue()));
287-
int statusCode = fullHttpResponse.status().code();
288-
if (statusCode < 200 || statusCode >= 300) {
289-
String reason = "";
290-
var strContent = fullHttpResponse.content().toString(CharsetUtil.UTF_8);
291-
if (!strContent.isEmpty()) {
292-
try {
293-
final JsonNode root = objectMapper.readTree(strContent);
294-
final List<String> possibilities = List.of("message", "error_message", "error");
295-
for (final String field : possibilities) {
296-
final JsonNode node = root.findValue(field);
297-
if (node != null) {
298-
reason = node.asText();
299-
break;
282+
fullHttpResponse = this.clientHandler.getResponseFuture().get();
283+
284+
// Extract headers into io.netty.handler.codec.http.HttpHeaders;
285+
HttpHeaders responseHeaders = new DefaultHttpHeaders();
286+
fullHttpResponse.headers().forEach(entry -> responseHeaders.add(entry.getKey(), entry.getValue()));
287+
int statusCode = fullHttpResponse.status().code();
288+
if (statusCode < 200 || statusCode >= 300) {
289+
String reason = "";
290+
var strContent = fullHttpResponse.content().toString(CharsetUtil.UTF_8);
291+
if (!strContent.isEmpty()) {
292+
try {
293+
final JsonNode root = objectMapper.readTree(strContent);
294+
final List<String> possibilities = List.of("message", "error_message", "error");
295+
for (final String field : possibilities) {
296+
final JsonNode node = root.findValue(field);
297+
if (node != null) {
298+
reason = node.asText();
299+
break;
300+
}
300301
}
302+
} catch (JsonProcessingException e) {
303+
LOG.debug("Can't parse msg from response {}", fullHttpResponse);
301304
}
302-
} catch (JsonProcessingException e) {
303-
LOG.debug("Can't parse msg from response {}", fullHttpResponse);
304305
}
305-
}
306306

307-
if (reason.isEmpty()) {
308-
for (String s : List.of("X-Platform-Error-Code", "X-Influx-Error", "X-InfluxDb-Error")) {
309-
if (responseHeaders.contains(s.toLowerCase())) {
310-
reason = responseHeaders.get(s.toLowerCase());
311-
break;
307+
if (reason.isEmpty()) {
308+
for (String s : List.of("X-Platform-Error-Code", "X-Influx-Error", "X-InfluxDb-Error")) {
309+
if (responseHeaders.contains(s.toLowerCase())) {
310+
reason = responseHeaders.get(s.toLowerCase());
311+
break;
312+
}
312313
}
313314
}
314-
}
315315

316-
if (reason.isEmpty()) {
317-
reason = strContent;
318-
}
316+
if (reason.isEmpty()) {
317+
reason = strContent;
318+
}
319319

320-
if (reason.isEmpty()) {
321-
reason = HttpResponseStatus.valueOf(statusCode).reasonPhrase();
322-
}
320+
if (reason.isEmpty()) {
321+
reason = HttpResponseStatus.valueOf(statusCode).reasonPhrase();
322+
}
323323

324324

325-
String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason);
325+
String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason);
326326

327-
throw new InfluxDBApiNettyException(message, responseHeaders, statusCode);
327+
throw new InfluxDBApiNettyException(message, responseHeaders, statusCode);
328+
}
329+
} finally {
330+
if (this.channel != null && this.channel.isOpen()) {
331+
this.channel.close().sync();
332+
}
328333
}
329334

330335
return fullHttpResponse;

0 commit comments

Comments
 (0)