Skip to content

Commit e2a9906

Browse files
feat: netty for rest client
1 parent 057604a commit e2a9906

File tree

24 files changed

+1348
-500
lines changed

24 files changed

+1348
-500
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.influxdb.v3.netty.rest;
2+
3+
import io.netty.channel.ChannelInitializer;
4+
import io.netty.channel.ChannelPipeline;
5+
import io.netty.channel.socket.SocketChannel;
6+
import io.netty.handler.codec.http.FullHttpResponse;
7+
import io.netty.handler.codec.http.HttpClientCodec;
8+
import io.netty.handler.codec.http.HttpObjectAggregator;
9+
import io.netty.handler.ssl.SslContext;
10+
import io.netty.util.concurrent.Promise;
11+
12+
import javax.annotation.Nonnull;
13+
import javax.annotation.Nullable;
14+
15+
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
16+
17+
private final SslContext sslCtx;
18+
19+
private final Promise<FullHttpResponse> promise;
20+
21+
private final String host;
22+
23+
private final Integer port;
24+
25+
public ClientChannelInitializer(@Nonnull String host, @Nonnull Integer port, @Nonnull Promise<FullHttpResponse> promise, @Nullable SslContext sslCtx) {
26+
this.sslCtx = sslCtx;
27+
this.promise = promise;
28+
this.host = host;
29+
this.port = port;
30+
}
31+
32+
@Override
33+
public void initChannel(SocketChannel ch) {
34+
ChannelPipeline p = ch.pipeline();
35+
if (sslCtx != null) {
36+
p.addLast("ssl", sslCtx.newHandler(ch.alloc(), host, port));
37+
}
38+
p.addLast(new HttpClientCodec());
39+
p.addLast(new HttpObjectAggregator(1048576));
40+
p.addLast(new ClientHandler(this.promise));
41+
}
42+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.influxdb.v3.netty.rest;
2+
3+
import io.netty.channel.ChannelHandlerContext;
4+
import io.netty.channel.SimpleChannelInboundHandler;
5+
import io.netty.handler.codec.http.*;
6+
import io.netty.util.CharsetUtil;
7+
import io.netty.util.concurrent.Promise;
8+
9+
public class ClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
10+
11+
private final Promise<FullHttpResponse> promise;
12+
13+
public ClientHandler(Promise<FullHttpResponse> promise) {
14+
this.promise = promise;
15+
}
16+
17+
@Override
18+
public void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
19+
// if (msg instanceof HttpResponse) {
20+
// HttpResponse response = (HttpResponse) msg;
21+
// System.err.println("{ START CONTENT");
22+
// }
23+
// if (msg instanceof HttpContent) {
24+
// HttpContent content = (HttpContent) msg;
25+
// System.err.print(content.content().toString(CharsetUtil.UTF_8));
26+
// System.err.flush();
27+
//
28+
// if (content instanceof LastHttpContent) {
29+
// System.err.println("} END OF CONTENT");
30+
// }
31+
// }
32+
// System.out.println(msg);
33+
this.promise.trySuccess(msg.retain());
34+
35+
}
36+
37+
@Override
38+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
39+
this.promise.tryFailure(cause);
40+
cause.printStackTrace();
41+
ctx.close();
42+
}
43+
44+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.influxdb.v3.netty.rest;
2+
3+
import com.influxdb.v3.client.InfluxDBClient;
4+
import com.influxdb.v3.client.Point;
5+
import com.influxdb.v3.client.config.ClientConfig;
6+
import io.netty.handler.codec.http.FullHttpResponse;
7+
import io.netty.handler.codec.http.HttpMethod;
8+
9+
import javax.net.ssl.SSLException;
10+
import java.net.URISyntaxException;
11+
import java.util.UUID;
12+
import java.util.concurrent.ExecutionException;
13+
14+
15+
public class Main {
16+
public static void main(String[] args) throws URISyntaxException, SSLException, ExecutionException, InterruptedException {
17+
ClientConfig clientConfig = configCloud();
18+
19+
var testId = UUID.randomUUID().toString();
20+
// Temporarily use `ClientConfig` as a constructor argument.
21+
try (RestClient restClient = new RestClient(clientConfig)) {
22+
23+
// Get server version.
24+
System.out.println("Server version: " + restClient.getServerVersion());
25+
26+
// Write data
27+
System.out.println("Write data with testId " + testId);
28+
var p = Point.measurement("cpu_sonnh")
29+
.setTag("host", "server1")
30+
.setField("usage_idle", 90.0f)
31+
.setField("testId", testId);
32+
var lineProtocol = p.toLineProtocol();
33+
restClient.write(lineProtocol);
34+
35+
// Read data
36+
System.out.println("Read data with testId " + testId);
37+
String query = String.format("SELECT * FROM \"cpu_sonnh\" WHERE \"testId\" = '%s'", testId);
38+
InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
39+
var stream = influxDBClient.queryPoints(query);
40+
stream.findFirst().ifPresent(pointValues -> System.out.println("Field usage_idle: " + pointValues.getField("usage_idle")));
41+
42+
} catch (Exception e) {
43+
throw new RuntimeException(e);
44+
}
45+
}
46+
47+
public static ClientConfig configCloud() {
48+
String url = System.getenv("TESTING_INFLUXDB_URL");
49+
String token = System.getenv("TESTING_INFLUXDB_TOKEN");
50+
String database = System.getenv("TESTING_INFLUXDB_DATABASE");
51+
return new ClientConfig.Builder()
52+
.host(url)
53+
.token(token.toCharArray())
54+
.database(database)
55+
.build();
56+
}
57+
58+
// This is a docker container ran from scripts/influxdb-setup.sh, get the token and database, url from that script
59+
public static ClientConfig configLocal() {
60+
String url = "localhost";
61+
String token = "apiv3_sMYBS-vRxl6UDMylb7m2u64G6R7g61jlGL76XnUJY3EaN4MD0tZd4DZOBhe6j-dYtoVhrC6PqGgI9Xiv8d3Psw";
62+
String database = System.getenv("bucket0");
63+
return new ClientConfig.Builder()
64+
.host(url)
65+
.token(token.toCharArray())
66+
.database(database)
67+
.build();
68+
}
69+
}
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
package com.influxdb.v3.netty.rest;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.JsonNode;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import com.influxdb.v3.client.InfluxDBApiHttpException;
7+
import com.influxdb.v3.client.config.ClientConfig;
8+
import com.influxdb.v3.client.internal.Identity;
9+
import io.netty.bootstrap.Bootstrap;
10+
import io.netty.buffer.Unpooled;
11+
import io.netty.channel.Channel;
12+
import io.netty.channel.ChannelOption;
13+
import io.netty.channel.EventLoopGroup;
14+
import io.netty.channel.MultiThreadIoEventLoopGroup;
15+
import io.netty.channel.nio.NioIoHandler;
16+
import io.netty.channel.socket.nio.NioSocketChannel;
17+
import io.netty.handler.codec.http.*;
18+
import io.netty.handler.ssl.SslContext;
19+
import io.netty.handler.ssl.SslContextBuilder;
20+
import io.netty.util.CharsetUtil;
21+
import io.netty.util.concurrent.Promise;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import javax.annotation.Nonnull;
26+
import javax.annotation.Nullable;
27+
import javax.net.ssl.SSLException;
28+
import java.net.URI;
29+
import java.net.URISyntaxException;
30+
import java.nio.charset.StandardCharsets;
31+
import java.util.HashMap;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.concurrent.ExecutionException;
35+
36+
public final class RestClient implements AutoCloseable {
37+
38+
private Channel channel;
39+
40+
private final EventLoopGroup eventLoopGroup;
41+
42+
private final Promise<FullHttpResponse> promise;
43+
44+
private final Map<String, String> defaultHeader = new HashMap<>();
45+
46+
private final Integer port;
47+
48+
private final String host;
49+
50+
private SslContext sslCtx;
51+
52+
private final ObjectMapper objectMapper = new ObjectMapper();
53+
54+
private final ClientConfig config;
55+
56+
private final Map<String, String> defaultHeaders;
57+
58+
final String userAgent;
59+
60+
final String baseUrl;
61+
62+
private static final Logger LOG = LoggerFactory.getLogger(com.influxdb.v3.netty.rest.RestClient.class);
63+
64+
public RestClient(ClientConfig config) throws URISyntaxException, SSLException {
65+
URI uri = new URI(config.getHost());
66+
String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
67+
int port = uri.getPort();
68+
if (port == -1) {
69+
if ("http".equalsIgnoreCase(scheme)) {
70+
port = 80;
71+
} else if ("https".equalsIgnoreCase(scheme)) {
72+
port = 443;
73+
}
74+
}
75+
this.port = port;
76+
this.host = uri.getHost();
77+
78+
this.baseUrl = host.endsWith("/") ? host : String.format("%s/", host);
79+
80+
// user agent version
81+
this.userAgent = Identity.getUserAgent();
82+
83+
this.config = config;
84+
85+
if ("https".equalsIgnoreCase(scheme)) {
86+
this.sslCtx = SslContextBuilder.forClient().build();
87+
}
88+
89+
this.eventLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
90+
91+
this.promise = this.eventLoopGroup.next().newPromise();
92+
93+
this.defaultHeaders = config.getHeaders() != null ? Map.copyOf(config.getHeaders()) : null;
94+
95+
this.channel = createChannel();
96+
}
97+
98+
public Channel createChannel() {
99+
//fixme handler follow-redirect
100+
int timeoutMillis = (int) this.config.getWriteTimeout().toMillis();
101+
Bootstrap b = new Bootstrap();
102+
return b.group(this.eventLoopGroup)
103+
.channel(NioSocketChannel.class)
104+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
105+
.handler(new ClientChannelInitializer(this.host, this.port, this.promise, this.sslCtx))
106+
.remoteAddress(this.host, this.port)
107+
.connect()
108+
.syncUninterruptibly()
109+
.channel();
110+
}
111+
112+
public String getServerVersion() throws InterruptedException, ExecutionException, JsonProcessingException {
113+
FullHttpResponse response = this.request(HttpMethod.GET, "/ping", null, null);
114+
115+
String version = response.headers().get("x-influxdb-version");
116+
if (version == null) {
117+
return "unknown";
118+
}
119+
//fixme get version from the body
120+
return version;
121+
}
122+
123+
public void write(String lineProtocol) throws InterruptedException, ExecutionException, JsonProcessingException {
124+
var header = new HashMap<String, String>();
125+
header.put("content-type", "text/plain; charset=utf-8");
126+
127+
header.put("content-length", String.valueOf(lineProtocol.getBytes(StandardCharsets.UTF_8).length));
128+
header.putAll(this.defaultHeader);
129+
130+
QueryStringEncoder encoder = new QueryStringEncoder("/api/v2/write");
131+
encoder.addParam("bucket", "bucket0");
132+
encoder.addParam("precision", "ns");
133+
134+
this.request(HttpMethod.POST, encoder.toString(), header, lineProtocol);
135+
}
136+
137+
public FullHttpResponse request(@Nonnull HttpMethod method, @Nonnull String path, @Nullable Map<String, String> header, @Nullable String body) throws InterruptedException, ExecutionException, JsonProcessingException {
138+
var content = Unpooled.EMPTY_BUFFER;
139+
if (body != null) {
140+
content = Unpooled.copiedBuffer(body, CharsetUtil.UTF_8);
141+
}
142+
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, path, content);
143+
144+
if (this.defaultHeaders != null) {
145+
this.defaultHeaders.forEach((s, s2) -> request.headers().set(s, s2));
146+
}
147+
148+
request.headers().set("user-agent", this.userAgent);
149+
request.headers().set("host", this.host);
150+
if (this.config.getToken() != null && this.config.getToken().length > 0) {
151+
String authScheme = config.getAuthScheme();
152+
if (authScheme == null) {
153+
authScheme = "Token";
154+
}
155+
request.headers().set("authorization", String.format("%s %s", authScheme, new String(this.config.getToken())));
156+
}
157+
request.headers().set("connection", "closed");
158+
159+
if (header != null) {
160+
header.forEach(request.headers()::set);
161+
}
162+
163+
164+
if (!this.channel.isOpen()) {
165+
this.channel = createChannel();
166+
}
167+
168+
this.channel.writeAndFlush(request).channel().closeFuture().sync();
169+
FullHttpResponse fullHttpResponse = this.promise.get();
170+
171+
int statusCode = fullHttpResponse.status().code();
172+
if (statusCode < 200 || statusCode >= 300) {
173+
String reason = "";
174+
var jsonString = fullHttpResponse.content().toString(CharsetUtil.UTF_8);
175+
if (!jsonString.isEmpty()) {
176+
try {
177+
final JsonNode root = objectMapper.readTree(jsonString);
178+
final List<String> possibilities = List.of("message", "error_message", "error");
179+
for (final String field : possibilities) {
180+
final JsonNode node = root.findValue(field);
181+
if (node != null) {
182+
reason = node.asText();
183+
break;
184+
}
185+
}
186+
} catch (JsonProcessingException e) {
187+
LOG.debug("Can't parse msg from response {}", fullHttpResponse);
188+
}
189+
}
190+
191+
if (reason.isEmpty()) {
192+
for (String s : List.of("X-Platform-Error-Code", "X-Influx-Error", "X-InfluxDb-Error")) {
193+
if (fullHttpResponse.headers().contains(s.toLowerCase())) {
194+
reason = fullHttpResponse.headers().get(s);
195+
break;
196+
}
197+
}
198+
}
199+
200+
// if (reason.isEmpty()) {
201+
// reason = body;
202+
// }
203+
204+
if (reason.isEmpty()) {
205+
reason = HttpResponseStatus.valueOf(statusCode).reasonPhrase();
206+
}
207+
String message = String.format("HTTP status code: %d; Message: %s", statusCode, reason);
208+
throw new InfluxDBApiHttpException(message, null, statusCode);
209+
}
210+
211+
return fullHttpResponse;
212+
}
213+
214+
@Override
215+
public void close() {
216+
channel.close();
217+
eventLoopGroup.shutdownGracefully();
218+
}
219+
}
220+
221+

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,12 @@
174174
<version>${netty-handler.version}</version>
175175
</dependency>
176176

177+
<dependency>
178+
<groupId>io.netty</groupId>
179+
<artifactId>netty-handler-proxy</artifactId>
180+
<version>${netty-handler.version}</version>
181+
</dependency>
182+
177183
<dependency>
178184
<groupId>io.netty</groupId>
179185
<artifactId>netty-buffer</artifactId>

0 commit comments

Comments
 (0)