Skip to content

Commit 56021a9

Browse files
feat: add Netty-based server and client examples with REST client integration.
1 parent 057604a commit 56021a9

File tree

6 files changed

+581
-0
lines changed

6 files changed

+581
-0
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package com.influxdb.v3.netty;
2+
3+
import com.influxdb.v3.client.InfluxDBClient;
4+
import com.influxdb.v3.client.Point;
5+
import com.influxdb.v3.client.config.ClientConfig;
6+
import io.netty.bootstrap.ServerBootstrap;
7+
import io.netty.buffer.ByteBuf;
8+
import io.netty.buffer.Unpooled;
9+
import io.netty.channel.*;
10+
import io.netty.channel.nio.NioEventLoopGroup;
11+
import io.netty.channel.socket.SocketChannel;
12+
import io.netty.channel.socket.nio.NioServerSocketChannel;
13+
import io.netty.handler.codec.compression.CompressionOptions;
14+
import io.netty.handler.codec.http.*;
15+
import io.netty.util.CharsetUtil;
16+
17+
import javax.net.ssl.SSLException;
18+
19+
import java.util.UUID;
20+
21+
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
22+
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
23+
import static io.netty.handler.codec.http.HttpHeaderValues.TEXT_PLAIN;
24+
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
25+
26+
public class Netty {
27+
28+
public static void main(String[] args) throws InterruptedException, SSLException {
29+
// ClientConfig clientConfig = configLocal();
30+
ClientConfig clientConfig = configCloud();
31+
32+
InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
33+
34+
// Version
35+
// System.out.println("Server version: " + influxDBClient.getServerVersion());
36+
37+
// Write
38+
// var testId = UUID.randomUUID().toString();
39+
// var p = Point.measurement("cpu_sonnh")
40+
// .setTag("host", "server1")
41+
// .setField("usage_idle", 90.0f)
42+
// .setField("testId", testId);
43+
// System.out.println("testId: " + testId + "");
44+
// influxDBClient.writePoint(p);
45+
46+
47+
System.out.println("--------------------");
48+
49+
NettyClient nettyClient = new NettyClient(clientConfig);
50+
51+
}
52+
53+
public static ClientConfig configCloud() {
54+
String url = System.getenv("TESTING_INFLUXDB_URL");
55+
String token = System.getenv("TESTING_INFLUXDB_TOKEN");
56+
String database = System.getenv("TESTING_INFLUXDB_DATABASE");
57+
return new ClientConfig.Builder()
58+
.host(url)
59+
.token(token.toCharArray())
60+
.database(database)
61+
.build();
62+
}
63+
64+
public static ClientConfig configLocal() {
65+
String url = "localhost";
66+
String token = "apiv3_sMYBS-vRxl6UDMylb7m2u64G6R7g61jlGL76XnUJY3EaN4MD0tZd4DZOBhe6j-dYtoVhrC6PqGgI9Xiv8d3Psw";
67+
String database = System.getenv("bucket0");
68+
return new ClientConfig.Builder()
69+
.host(url)
70+
.token(token.toCharArray())
71+
.database(database)
72+
.build();
73+
}
74+
75+
public void test() throws InterruptedException {
76+
System.out.println("Netty");
77+
var port = 8080;
78+
EventLoopGroup bossGroup = new NioEventLoopGroup();
79+
EventLoopGroup workerGroup = new NioEventLoopGroup();
80+
try {
81+
ServerBootstrap b = new ServerBootstrap();
82+
b.group(bossGroup, workerGroup)
83+
.channel(NioServerSocketChannel.class)
84+
.childHandler(new ChannelInitializer<SocketChannel>() {
85+
@Override
86+
public void initChannel(SocketChannel ch) {
87+
ch.pipeline().addLast(
88+
new HttpServerCodec(),
89+
new HttpContentCompressor((CompressionOptions[]) null),
90+
new HttpServerExpectContinueHandler(),
91+
new FirstHandler()
92+
93+
);
94+
}
95+
})
96+
.option(ChannelOption.SO_BACKLOG, 128);
97+
98+
b.bind(port)
99+
.sync()
100+
.channel()
101+
.closeFuture()
102+
.sync();
103+
} finally {
104+
workerGroup.shutdownGracefully();
105+
bossGroup.shutdownGracefully();
106+
}
107+
}
108+
}
109+
110+
111+
class FirstHandler extends SimpleChannelInboundHandler<HttpObject> {
112+
113+
@Override
114+
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
115+
final byte[] CONTENT = {'H', 'e', 'l', 'l', 'o', '1'};
116+
117+
if (msg instanceof HttpRequest) {
118+
HttpRequest req = (HttpRequest) msg;
119+
120+
FullHttpResponse response = new DefaultFullHttpResponse(req.protocolVersion(), OK,
121+
Unpooled.wrappedBuffer(CONTENT));
122+
response.headers()
123+
.set(CONTENT_TYPE, TEXT_PLAIN)
124+
.setInt(CONTENT_LENGTH, response.content().readableBytes());
125+
126+
127+
ChannelFuture f = ctx.writeAndFlush(response);
128+
}
129+
}
130+
131+
@Override
132+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
133+
cause.printStackTrace();
134+
ctx.close();
135+
ctx.fireExceptionCaught(cause);
136+
}
137+
138+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package com.influxdb.v3.netty;
2+
3+
import com.influxdb.v3.client.Point;
4+
import com.influxdb.v3.client.config.ClientConfig;
5+
import io.netty.bootstrap.Bootstrap;
6+
import io.netty.buffer.ByteBuf;
7+
import io.netty.buffer.Unpooled;
8+
import io.netty.channel.*;
9+
import io.netty.channel.nio.NioIoHandler;
10+
import io.netty.channel.socket.SocketChannel;
11+
import io.netty.channel.socket.nio.NioSocketChannel;
12+
import io.netty.handler.codec.http.*;
13+
import io.netty.handler.logging.LogLevel;
14+
import io.netty.handler.logging.LoggingHandler;
15+
import io.netty.handler.ssl.SslContext;
16+
import io.netty.handler.ssl.SslContextBuilder;
17+
import io.netty.util.CharsetUtil;
18+
19+
import javax.net.ssl.SSLException;
20+
import java.util.UUID;
21+
22+
public class NettyClient {
23+
24+
public NettyClient(ClientConfig config) throws InterruptedException, SSLException {
25+
SslContext sslCtx = SslContextBuilder.forClient().build();
26+
27+
EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
28+
try {
29+
Bootstrap b = new Bootstrap();
30+
b.group(group)
31+
.channel(NioSocketChannel.class)
32+
.handler(new LoggingHandler(LogLevel.DEBUG))
33+
.handler(new ChannelInitializer<SocketChannel>() {
34+
@Override
35+
public void initChannel(SocketChannel ch) throws Exception {
36+
ChannelPipeline p = ch.pipeline();
37+
p.addLast("ssl", sslCtx.newHandler(ch.alloc(), "us-east-1-1.aws.cloud2.influxdata.com", 443));
38+
p.addLast(
39+
new HttpClientCodec(),
40+
new HttpObjectAggregator(1048576),
41+
new CHander()
42+
);
43+
}
44+
});
45+
46+
// How to handler host + port properly
47+
// URI uri = new URI(config.getHost());
48+
ChannelFuture f = b.connect("us-east-1-1.aws.cloud2.influxdata.com", 443).sync();
49+
Channel ch = f.channel();
50+
51+
52+
var testId = UUID.randomUUID().toString();
53+
System.out.println("testId: " + testId + "");
54+
55+
var p = Point.measurement("cpu_sonnh")
56+
.setTag("host", "server1")
57+
.setField("usage_idle", 90.0f)
58+
.setField("testId", testId);
59+
var txt = p.toLineProtocol();
60+
ByteBuf content = Unpooled.copiedBuffer(txt, CharsetUtil.UTF_8);
61+
62+
var writePath = "/api/v2/write";
63+
var pingPath = "/ping";
64+
QueryStringEncoder encoder = new QueryStringEncoder(writePath);
65+
encoder.addParam("bucket", "bucket0");
66+
encoder.addParam("precision", "ns");
67+
String uriWithParams = encoder.toString();
68+
69+
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uriWithParams, content);
70+
request.headers().set(HttpHeaderNames.HOST, "us-east-1-1.aws.cloud2.influxdata.com");
71+
request.headers().set(HttpHeaderNames.AUTHORIZATION, "Token lDAtMRmhnLp5GjWNVBsieufUb66XZAPxvX3etlmi9wmeq7ispWoL06mwnxmY_BtHKoBhG4lR-c7WfrFgUXy15w==");
72+
// request.headers().set(HttpHeaderNames.CONNECTION, "close");
73+
// request.headers().set(HttpHeaderNames.ACCEPT_ENCODING, "br, deflate, gzip, x-gzip");
74+
request.headers().set(HttpHeaderNames.ACCEPT, "*/*");
75+
request.headers().set(HttpHeaderNames.USER_AGENT, "influxdb3-java/unknown");
76+
request.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
77+
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
78+
79+
ch.writeAndFlush(request);
80+
81+
f.channel().closeFuture().sync();
82+
} finally {
83+
group.shutdownGracefully();
84+
}
85+
}
86+
87+
}
88+
89+
class CHander extends SimpleChannelInboundHandler<FullHttpResponse> {
90+
91+
@Override
92+
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
93+
System.out.println(msg);
94+
}
95+
}
96+
97+
class Out extends ChannelOutboundHandlerAdapter {
98+
99+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.influxdb.v3.netty.rest;
2+
3+
import io.netty.channel.ChannelInitializer;
4+
import io.netty.channel.ChannelPipeline;
5+
import io.netty.channel.socket.SocketChannel;
6+
import io.netty.handler.codec.http.FullHttpResponse;
7+
import io.netty.handler.codec.http.HttpClientCodec;
8+
import io.netty.handler.codec.http.HttpObjectAggregator;
9+
import io.netty.handler.ssl.SslContext;
10+
import io.netty.util.concurrent.Promise;
11+
12+
import javax.annotation.Nonnull;
13+
import javax.annotation.Nullable;
14+
15+
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
16+
17+
private final SslContext sslCtx;
18+
19+
private final Promise<FullHttpResponse> promise;
20+
21+
private final String host;
22+
23+
private final Integer port;
24+
25+
public ClientChannelInitializer(@Nonnull String host, @Nonnull Integer port, @Nonnull Promise<FullHttpResponse> promise, @Nullable SslContext sslCtx) {
26+
this.sslCtx = sslCtx;
27+
this.promise = promise;
28+
this.host = host;
29+
this.port = port;
30+
}
31+
32+
@Override
33+
public void initChannel(SocketChannel ch) {
34+
ChannelPipeline p = ch.pipeline();
35+
if (sslCtx != null) {
36+
p.addLast("ssl", sslCtx.newHandler(ch.alloc(), host, port));
37+
}
38+
p.addLast(new HttpClientCodec());
39+
p.addLast(new HttpObjectAggregator(1048576));
40+
p.addLast(new ClientHandler(this.promise));
41+
}
42+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.influxdb.v3.netty.rest;
2+
3+
import io.netty.channel.ChannelHandlerContext;
4+
import io.netty.channel.SimpleChannelInboundHandler;
5+
import io.netty.handler.codec.http.*;
6+
import io.netty.util.CharsetUtil;
7+
import io.netty.util.concurrent.Promise;
8+
9+
public class ClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
10+
11+
private final Promise<FullHttpResponse> promise;
12+
13+
public ClientHandler(Promise<FullHttpResponse> promise) {
14+
this.promise = promise;
15+
}
16+
17+
@Override
18+
public void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
19+
// if (msg instanceof HttpResponse) {
20+
// HttpResponse response = (HttpResponse) msg;
21+
// System.err.println("{ START CONTENT");
22+
// }
23+
// if (msg instanceof HttpContent) {
24+
// HttpContent content = (HttpContent) msg;
25+
// System.err.print(content.content().toString(CharsetUtil.UTF_8));
26+
// System.err.flush();
27+
//
28+
// if (content instanceof LastHttpContent) {
29+
// System.err.println("} END OF CONTENT");
30+
// }
31+
// }
32+
// System.out.println(msg);
33+
this.promise.trySuccess(msg.retain());
34+
35+
}
36+
37+
@Override
38+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
39+
this.promise.tryFailure(cause);
40+
cause.printStackTrace();
41+
ctx.close();
42+
}
43+
44+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.influxdb.v3.netty.rest;
2+
3+
import com.influxdb.v3.client.InfluxDBClient;
4+
import com.influxdb.v3.client.Point;
5+
import com.influxdb.v3.client.config.ClientConfig;
6+
import io.netty.handler.codec.http.FullHttpResponse;
7+
import io.netty.handler.codec.http.HttpMethod;
8+
9+
import javax.net.ssl.SSLException;
10+
import java.net.URISyntaxException;
11+
import java.util.UUID;
12+
import java.util.concurrent.ExecutionException;
13+
14+
15+
public class Main {
16+
public static void main(String[] args) throws URISyntaxException, SSLException, ExecutionException, InterruptedException {
17+
ClientConfig clientConfig = configCloud();
18+
// ClientConfig clientConfig = new ClientConfig.Builder().host("http://localhost:8080").token("sda".toCharArray()).build();
19+
20+
var testId = UUID.randomUUID().toString();
21+
try (
22+
RestClient restClient = new RestClient(clientConfig);
23+
) {
24+
System.out.println("Server version: " + restClient.getServerVersion());
25+
FullHttpResponse response = restClient.request(HttpMethod.POST, "/", null, null);
26+
System.out.println("Write data with testId " + testId);
27+
var p = Point.measurement("cpu_sonnh")
28+
.setTag("host", "server1")
29+
.setField("usage_idle", 90.0f)
30+
.setField("testId", testId);
31+
var lineProtocol = p.toLineProtocol();
32+
restClient.write(lineProtocol);
33+
34+
System.out.println("Read data with testId " + testId);
35+
String query = String.format("SELECT * FROM \"cpu_sonnh\" WHERE \"testId\" = '%s'", testId);
36+
InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig);
37+
var stream = influxDBClient.queryPoints(query);
38+
stream.findFirst().ifPresent(pointValues -> System.out.println("Field usage_idle: " + pointValues.getField("usage_idle")));
39+
40+
} catch (Exception e) {
41+
throw new RuntimeException(e);
42+
}
43+
}
44+
45+
public static ClientConfig configCloud() {
46+
String url = System.getenv("TESTING_INFLUXDB_URL");
47+
String token = System.getenv("TESTING_INFLUXDB_TOKEN");
48+
String database = System.getenv("TESTING_INFLUXDB_DATABASE");
49+
return new ClientConfig.Builder()
50+
.host(url)
51+
.token(token.toCharArray())
52+
.database(database)
53+
.build();
54+
}
55+
56+
// This is a docker container ran from scripts/influxdb-setup.sh, get the token and database, url from that script
57+
public static ClientConfig configLocal() {
58+
String url = "localhost";
59+
String token = "apiv3_sMYBS-vRxl6UDMylb7m2u64G6R7g61jlGL76XnUJY3EaN4MD0tZd4DZOBhe6j-dYtoVhrC6PqGgI9Xiv8d3Psw";
60+
String database = System.getenv("bucket0");
61+
return new ClientConfig.Builder()
62+
.host(url)
63+
.token(token.toCharArray())
64+
.database(database)
65+
.build();
66+
}
67+
}

0 commit comments

Comments
 (0)