diff --git a/build.gradle b/build.gradle index d7f8376f..ad538cb3 100644 --- a/build.gradle +++ b/build.gradle @@ -31,6 +31,7 @@ dependencies { testImplementation 'org.apache.logging.log4j:log4j-core:2.17.0' implementation "io.netty:netty-buffer:${nettyVersion}" implementation "io.netty:netty-codec:${nettyVersion}" + implementation "io.netty:netty-codec-http:${nettyVersion}" implementation "io.netty:netty-common:${nettyVersion}" implementation "io.netty:netty-transport:${nettyVersion}" implementation "io.netty:netty-handler:${nettyVersion}" diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index fb4b4b1b..5eeb1987 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -1,14 +1,22 @@ package org.logstash.beats; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutor; @@ -18,6 +26,12 @@ import org.apache.logging.log4j.Logger; import org.logstash.netty.SslHandlerProvider; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + public class Server { private final static Logger logger = LogManager.getLogger(Server.class); @@ -59,13 +73,13 @@ public Server listen() throws InterruptedException { try { logger.info("Starting server on port: {}", this.port); - beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, executorThreadCount); + //beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, executorThreadCount); ServerBootstrap server = new ServerBootstrap(); server.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.SO_LINGER, 0) // Since the protocol doesn't support yet a remote close from the server and we don't want to have 'unclosed' socket lying around we have to use `SO_LINGER` to force the close of the socket. - .childHandler(beatsInitializer); + .childHandler(new ProtocolDetectionHandler()); Channel channel = server .bind(host, port) @@ -142,6 +156,7 @@ private class BeatsInitializer extends ChannelInitializer { private final EventExecutorGroup beatsHandlerExecutorGroup; private final IMessageListener localMessageListener; private final int localClientInactivityTimeoutSeconds; + private BeatsHandler beatsHandler; BeatsInitializer(IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread) { // Keeps a local copy of Server settings, so they can't be modified once it starts listening @@ -149,6 +164,24 @@ private class BeatsInitializer extends ChannelInitializer { this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD); beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread); + + } + + public void addPipelineHandlers(ChannelPipeline pipeline) { +// if (isSslEnabled()) { +// pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket)); +// } + pipeline.addLast(idleExecutorGroup, IDLESTATE_HANDLER, + new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); + pipeline.addLast(BEATS_ACKER, new AckEncoder()); + pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); + + this.beatsHandler = new BeatsHandler(localMessageListener); + pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), beatsHandler); + } + + public void channelActive(ChannelHandlerContext ctx) throws Exception { + beatsHandler.channelActive(ctx); } public void initChannel(SocketChannel socket) { @@ -204,4 +237,59 @@ private void shutdownEventExecutorsWithPendingTasks() { } } } + + @ChannelHandler.Sharable + private class ProtocolDetectionHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ByteBuf in = (ByteBuf) msg; + + if (in.readableBytes() < 4) + return; + + in.markReaderIndex(); + byte[] magicBytes = new byte[4]; + in.readBytes(magicBytes); + + final byte[] needAcks = "GET ".getBytes(StandardCharsets.UTF_8); + System.out.println("Required 4 bytes of 'GET ', bytes:" + Arrays.toString(needAcks)); + System.out.println("Read " + magicBytes.length + " bytes, magic:" + Arrays.toString(magicBytes)); + + ChannelPipeline pipeline = ctx.pipeline(); + if (Arrays.equals(needAcks, magicBytes)) { + System.out.println("Initializing HTTP echo server..."); + pipeline.addLast(new HttpServerCodec()); + pipeline.addLast(new HttpObjectAggregator(1048576)); + pipeline.addLast(new EchoServerHandler()); + pipeline.remove(this); + } else { + beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, executorThreadCount); + beatsInitializer.addPipelineHandlers(pipeline); + beatsInitializer.channelActive(ctx); + pipeline.remove(this); + } + + // TODO: is default behaviour we want to be Lumberjack (considering backwards compatibility), + // if not then -> ctx.close(); + super.channelRead(ctx, msg); + } + } + + @ChannelHandler.Sharable + private class EchoServerHandler extends SimpleChannelInboundHandler { + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + //The close is important here in an HTTP request as it sets the Content-Length of a + //response body back to the client. + ctx.close(); + } + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { + DefaultFullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, msg.content().copy()); + ctx.write(response); + } + } }