diff --git a/README.md b/README.md index e8d3c470..f8db693f 100644 --- a/README.md +++ b/README.md @@ -158,15 +158,16 @@ To log using JSON format, you must configure logback to use either: The appenders, encoders, and layouts provided by the logstash-logback-encoder library are as follows: -| Format | Protocol | Function | LoggingEvent | AccessEvent -|---------------|------------|----------| ------------ | ----------- -| Logstash JSON | Syslog/UDP | Appender | [`LogstashUdpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashUdpSocketAppender.java) | [`LogstashAccessUdpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashAccessUdpSocketAppender.java) -| Logstash JSON | TCP | Appender | [`LogstashTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashTcpSocketAppender.java) | [`LogstashAccessTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashAccessTcpSocketAppender.java) -| any | any | Appender | [`LoggingEventAsyncDisruptorAppender`](/src/main/java/net/logstash/logback/appender/LoggingEventAsyncDisruptorAppender.java) | [`AccessEventAsyncDisruptorAppender`](/src/main/java/net/logstash/logback/appender/AccessEventAsyncDisruptorAppender.java) -| Logstash JSON | any | Encoder | [`LogstashEncoder`](/src/main/java/net/logstash/logback/encoder/LogstashEncoder.java) | [`LogstashAccessEncoder`](/src/main/java/net/logstash/logback/encoder/LogstashAccessEncoder.java) -| Logstash JSON | any | Layout | [`LogstashLayout`](/src/main/java/net/logstash/logback/layout/LogstashLayout.java) | [`LogstashAccessLayout`](/src/main/java/net/logstash/logback/layout/LogstashAccessLayout.java) -| General JSON | any | Encoder | [`LoggingEventCompositeJsonEncoder`](/src/main/java/net/logstash/logback/encoder/LoggingEventCompositeJsonEncoder.java) | [`AccessEventCompositeJsonEncoder`](/src/main/java/net/logstash/logback/encoder/AccessEventCompositeJsonEncoder.java) -| General JSON | any | Layout | [`LoggingEventCompositeJsonLayout`](/src/main/java/net/logstash/logback/layout/LoggingEventCompositeJsonLayout.java) | [`AccessEventCompositeJsonLayout`](/src/main/java/net/logstash/logback/encoder/AccessEventCompositeJsonLayout.java) +| Format | Protocol | Function | LoggingEvent | AccessEvent +|---------------|----------------|----------| ------------ | ----------- +| Logstash JSON | Syslog/UDP | Appender | [`LogstashUdpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashUdpSocketAppender.java) | [`LogstashAccessUdpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashAccessUdpSocketAppender.java) +| Logstash JSON | TCP | Appender | [`LogstashTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashTcpSocketAppender.java) | [`LogstashAccessTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashAccessTcpSocketAppender.java) +| Beats JSON | [Lumberjack v2](https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md) | Appender | [`BeatsTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/BeatsTcpSocketAppender.java) | [`BeatsAccessTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/BeatsAccessTcpSocketAppender.java) +| any | any | Appender | [`LoggingEventAsyncDisruptorAppender`](/src/main/java/net/logstash/logback/appender/LoggingEventAsyncDisruptorAppender.java) | [`AccessEventAsyncDisruptorAppender`](/src/main/java/net/logstash/logback/appender/AccessEventAsyncDisruptorAppender.java) +| Logstash JSON | any | Encoder | [`LogstashEncoder`](/src/main/java/net/logstash/logback/encoder/LogstashEncoder.java) | [`LogstashAccessEncoder`](/src/main/java/net/logstash/logback/encoder/LogstashAccessEncoder.java) +| Logstash JSON | any | Layout | [`LogstashLayout`](/src/main/java/net/logstash/logback/layout/LogstashLayout.java) | [`LogstashAccessLayout`](/src/main/java/net/logstash/logback/layout/LogstashAccessLayout.java) +| General JSON | any | Encoder | [`LoggingEventCompositeJsonEncoder`](/src/main/java/net/logstash/logback/encoder/LoggingEventCompositeJsonEncoder.java) | [`AccessEventCompositeJsonEncoder`](/src/main/java/net/logstash/logback/encoder/AccessEventCompositeJsonEncoder.java) +| General JSON | any | Layout | [`LoggingEventCompositeJsonLayout`](/src/main/java/net/logstash/logback/layout/LoggingEventCompositeJsonLayout.java) | [`AccessEventCompositeJsonLayout`](/src/main/java/net/logstash/logback/encoder/AccessEventCompositeJsonLayout.java) These encoders/layouts can generally be used by any logback appender (such as `RollingFileAppender`). @@ -275,8 +276,26 @@ in your `logback.xml`, like this: ``` +Alternatively, you can use `Beats` variant, which supports the same features as `LogstashTcpSocketAppender`. +```xml + + + + 127.0.0.1:5044 + + 20 + + + + -To output JSON for AccessEvents over TCP, use a `LogstashAccessTcpSocketAppender` + + + + +``` + +To output JSON for AccessEvents over TCP, use a `LogstashAccessTcpSocketAppender` or `BeatsAccessTcpSocketAppender` with a `LogstashAccessEncoder` or `AccessEventCompositeJsonEncoder` in your `logback-access.xml`, like this: @@ -290,13 +309,27 @@ in your `logback-access.xml`, like this: - + + 127.0.0.1:5044 + + 20 + + + + + + + + + + + ``` The TCP appenders use an encoder, rather than a layout as the [UDP appenders](#udp) . You can use a `Logstash*Encoder`, `*EventCompositeJsonEncoder`, or any other logback encoder. -All of the output formatting options are configured at the encoder level. +All the output formatting options are configured at the encoder level. Internally, the TCP appenders are asynchronous (using the [LMAX Disruptor RingBuffer](https://lmax-exchange.github.io/disruptor/)). All the encoding and TCP communication is delegated to a single writer thread. @@ -312,8 +345,23 @@ If the RingBuffer is full (e.g. due to slow network, etc), then events will be d The TCP appenders will automatically reconnect if the connection breaks. However, events may be lost before Java's socket realizes the connection has broken. +To receive logs in Logstash, you have two options: +* configure [`Beats` input](http://www.logstash.net/docs/latest/inputs/beats) (same as for Filebeat) and use `BeatsTcpSocketAppender`/`BeatsAccessTcpSocketAppender`, +* configure [`tcp` input](http://www.logstash.net/docs/latest/inputs/tcp) and use other TCP appenders. + +To receive messages via `Beats` input, you configure a basic input. +However, this is only compatible with `BeatsTcpSocketAppender`/`BeatsAccessTcpSocketAppender`. +``` +input { + beats { + port => 5044 + } +} +``` + To receive TCP input in logstash, configure a [`tcp`](http://www.logstash.net/docs/latest/inputs/tcp) input with the [`json_lines`](http://www.logstash.net/docs/latest/codecs/json_lines) codec in logstash's configuration like this: +This method is compatible with all the other TCP appenders. ``` input { @@ -1805,7 +1853,6 @@ These encoders/layouts make use of an internal buffer to hold the JSON output du The size of this buffer is set to `1024` bytes by default. A different size can be configured by setting the `minBufferSize` property to the desired value. The buffer automatically grows above the `minBufferSize` when needed to accommodate with larger events. However, only the first `minBufferSize` bytes will be reused by subsequent invocations. It is therefore strongly advised to set the minimum size at least equal to the average size of the encoded events to reduce unnecessary memory allocations and reduce pressure on the garbage collector. - #### Providers for LoggingEvents The table below lists the available providers for LoggingEvents, and their configuration properties (defaults in parentheses). diff --git a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java new file mode 100644 index 00000000..04a8a212 --- /dev/null +++ b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java @@ -0,0 +1,373 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.appender; + +import ch.qos.logback.core.encoder.Encoder; +import ch.qos.logback.core.encoder.EncoderBase; +import ch.qos.logback.core.spi.DeferredProcessingAware; +import net.logstash.logback.appender.listener.TcpAppenderListener; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static net.logstash.logback.util.ByteUtil.bytesToInt; +import static net.logstash.logback.util.ByteUtil.intToBytes; + +/** + * An appender that is compatible with Lumberjack v2 protocol (Beats input). + * + * Implemented according to current Lumberjack specification. + * Verified with Graylog's Lumberjack v2 implementation. + */ +public abstract class AbstractBeatsTcpSocketAppender> + extends AbstractLogstashTcpSocketAppender { + + private static final byte PROTOCOL_VERSION = '2'; + + private static final byte PAYLOAD_JSON_TYPE = 'J'; + private static final byte PAYLOAD_WINDOW_TYPE = 'W'; + private static final byte PAYLOAD_ACK_TYPE = 'A'; + + /** + * Counter for sequence number of published events. + * Sent to remote Beats input to track how many messages were sent + * and used to track when to wait for remote to be ready to accept new logs. + */ + private final AtomicInteger counter = new AtomicInteger(); + /** + * A queue of currently accepted ACKs from remote + */ + private final BlockingQueue ackEvents = new ArrayBlockingQueue<>(10); + + /** + * Used to tell the reader the maximum number of unacknowledged data frames + * the writer (this appender) will send before blocking for ACKs. + */ + private int windowSize; + + /** + * Callable that reads ACKs from remote reader. + */ + private AckReaderCallable ackReaderCallable; + + public AbstractBeatsTcpSocketAppender() { + this.windowSize = 10; + } + + @Override + public synchronized void start() { + Encoder encoder = getEncoder(); + setEncoder(wrapAsBeatsEncoder(encoder)); + super.start(); + } + + private Encoder wrapAsBeatsEncoder(Encoder original) { + return new BeatsEncoderWrapper<>(original, createConverter()); + } + + private LumberjackV2PayloadWrapper createConverter() { + return new LumberjackV2PayloadWrapper( + counter, windowSize + ); + } + + public void setWindowSize(int windowSize) { + this.windowSize = windowSize; + } + + public int getWindowSize() { + return windowSize; + } + + /** + * Beats-compatible encoder. + * Wraps the original encoder and blocks if we should wait for ACK. + * + * @param log event type + */ + private class BeatsEncoderWrapper extends EncoderBase { + + private final Encoder original; + private final LumberjackV2PayloadWrapper wrapper; + + BeatsEncoderWrapper(Encoder original, LumberjackV2PayloadWrapper wrapper) { + this.original = original; + this.wrapper = wrapper; + } + + @Override + public boolean isStarted() { + return original.isStarted() && super.isStarted(); + } + + @Override + public void start() { + if (isStarted()) { + return; + } + original.start(); + super.start(); + } + + @Override + public void stop() { + if (isStarted()) { + super.stop(); + original.stop(); + } + } + + @Override + public byte[] headerBytes() { + return original.headerBytes(); + } + + @Override + public byte[] encode(E event) { + if (counter.get() == windowSize) { + try { + ackEvents.take(); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + } + } + + byte[] encoded = original.encode(event); + try { + return wrapper.convert(encoded); + } catch (IOException e) { + addWarn("Error encountered while converting log event. Event: " + event, e); + return new byte[0]; + } + } + + @Override + public byte[] footerBytes() { + return original.footerBytes(); + } + } + + @Override + protected Future scheduleReaderCallable(Callable readerCallable) { + this.ackReaderCallable = new AckReaderCallable(); + return getExecutorService().submit(this.ackReaderCallable); + } + + @Override + protected void fireConnectionOpened(Socket socket) { + if (this.ackReaderCallable == null) { + addError("Connection was not initialized properly - the ACK reader is not running. This will block the appender."); + throw new IllegalStateException("ACK reader not running - appender would hang"); + } + + try { + ackReaderCallable.signalConnectionOpened(socket.getInputStream()); + sendWindowSize(socket.getOutputStream()); + } catch (IOException e) { + addError("Error during Beats connection initialization", e); + } + + counter.set(0); + super.fireConnectionOpened(socket); + } + + /** + * Writes window size frame to the output stream to tell the reader + * how often we would like to receive ACKs. + */ + private void sendWindowSize(OutputStream outputStream) throws IOException { + byte[] message = new byte[6]; + message[0] = PROTOCOL_VERSION; + message[1] = PAYLOAD_WINDOW_TYPE; + byte[] windowSizeBytes = intToBytes(windowSize); + System.arraycopy(windowSizeBytes, 0, message, 2, windowSizeBytes.length); + outputStream.write(message); + outputStream.flush(); + } + + /** + * Callable that reads all ACKs from the remote log reader. + */ + private class AckReaderCallable implements Callable { + + /** + * An InputStream that will be read from. + * Due to current {@link AbstractLogstashTcpSocketAppender} implementation, + * this field is lazily initialized. + */ + private volatile InputStream inputStream; + /** + * Lock that guards {@link #inputStream} state. + * Used to prevent a race condition during lazy {@link #inputStream} initialization. + */ + private final Lock lock = new ReentrantLock(); + + /** + * Condition that indicates whether {@link #inputStream} is ready. + */ + private final Condition readyCondition = lock.newCondition(); + + AckReaderCallable() { + super(); + } + + @Override + public Void call() throws Exception { + updateCurrentThreadName(); + try { + waitUntilInputStreamIsReady(); + while (true) { + try { + int responseByte = inputStream.read(); + if (responseByte == -1) { + // end of stream + return null; + } else { + readAckResponse(responseByte); + } + } catch (SocketTimeoutException e) { + // try again + } + } + } finally { + if (!Thread.currentThread().isInterrupted()) { + getExecutorService().submit(() -> { + // https://github.com/logstash/logstash-logback-encoder/issues/341 + // see ReaderCallable from AbstractLogstashTcpSocketAppender for more clarification + getDisruptor().getRingBuffer().tryPublishEvent(getEventTranslator(), null); + }); + } + } + } + + private void waitUntilInputStreamIsReady() throws InterruptedException { + lock.lock(); + try { + if (inputStream == null) { + readyCondition.await(); + } + } finally { + lock.unlock(); + } + } + + private void readAckResponse(int responseByte) throws IOException { + if (responseByte != PROTOCOL_VERSION) { + addWarn(String.format("Protocol version is incorrect: %d, expected: %d.", responseByte, PROTOCOL_VERSION)); + return; + } + + int payloadType = inputStream.read(); + if (payloadType != PAYLOAD_ACK_TYPE) { + addWarn(String.format("Unknown payload type: %d, expected: %d (ACK)", payloadType, PAYLOAD_ACK_TYPE)); + return; + } + + byte[] sequenceNumber = new byte[4]; + for (int i = 0; i < 4; i++) { + int next = inputStream.read(); + if (next == -1) { + throw new IOException("End of stream while reading ACK sequence number"); + } + sequenceNumber[i] = (byte) next; + } + ackEvents.offer(new AckEvent(bytesToInt(sequenceNumber))); + } + + public void signalConnectionOpened(InputStream inputStream) { + lock.lock(); + try { + this.inputStream = inputStream; + readyCondition.signalAll(); + } finally { + lock.unlock(); + } + } + } + + private static class AckEvent { + + private final int sequenceNumber; + + private AckEvent(int sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + public int getSequenceNumber() { + return sequenceNumber; + } + } + + + /** + * Wraps payload so it should be compatible with a Beats input (Lumberjack protocol). + * Can be used with Logstash input or Graylog's Beats input. + * + * See Logstash documentation + * to read more about this protocol. + */ + static class LumberjackV2PayloadWrapper { + + private final AtomicInteger counter; + /** + * A maximum counter number. Counter will be reset after this value is reached. + */ + private final int windowSize; + + LumberjackV2PayloadWrapper(AtomicInteger counter, int windowSize) { + this.counter = counter; + this.windowSize = windowSize; + } + + public byte[] convert(byte[] encoded) throws IOException { + byte[] payloadLength = intToBytes(encoded.length); + byte[] sequenceNumber = intToBytes(nextSequenceNumber()); + + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream( + 2 + encoded.length + payloadLength.length + sequenceNumber.length + )) { + outputStream.write(PROTOCOL_VERSION); + outputStream.write(PAYLOAD_JSON_TYPE); + outputStream.write(sequenceNumber); + outputStream.write(payloadLength); + outputStream.write(encoded); + + return outputStream.toByteArray(); + } + } + + private int nextSequenceNumber() { + return counter.updateAndGet(old -> { + int updated = old + 1; + if (updated > windowSize) { + return 1; + } else { + return updated; + } + }); + } + } +} diff --git a/src/main/java/net/logstash/logback/appender/BeatsAccessTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/BeatsAccessTcpSocketAppender.java new file mode 100644 index 00000000..c5494f54 --- /dev/null +++ b/src/main/java/net/logstash/logback/appender/BeatsAccessTcpSocketAppender.java @@ -0,0 +1,22 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.appender; + +import ch.qos.logback.access.spi.IAccessEvent; +import net.logstash.logback.appender.listener.TcpAppenderListener; + +public class BeatsAccessTcpSocketAppender + extends AbstractBeatsTcpSocketAppender> { + +} diff --git a/src/main/java/net/logstash/logback/appender/BeatsTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/BeatsTcpSocketAppender.java new file mode 100644 index 00000000..50cf98b0 --- /dev/null +++ b/src/main/java/net/logstash/logback/appender/BeatsTcpSocketAppender.java @@ -0,0 +1,45 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.appender; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.lmax.disruptor.RingBuffer; +import net.logstash.logback.appender.listener.TcpAppenderListener; + +public class BeatsTcpSocketAppender + extends AbstractBeatsTcpSocketAppender> { + + /** + * Set to true if the caller data should be captured before publishing the event + * to the {@link RingBuffer} + */ + private boolean includeCallerData; + + @Override + protected void prepareForDeferredProcessing(final ILoggingEvent event) { + super.prepareForDeferredProcessing(event); + if (includeCallerData) { + event.getCallerData(); + } + } + + public boolean isIncludeCallerData() { + return includeCallerData; + } + + public void setIncludeCallerData(boolean includeCallerData) { + this.includeCallerData = includeCallerData; + } + +} diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index 44968855..468d27ec 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -13,10 +13,11 @@ */ package net.logstash.logback.encoder; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.Charset; - +import ch.qos.logback.core.encoder.Encoder; +import ch.qos.logback.core.encoder.EncoderBase; +import ch.qos.logback.core.encoder.LayoutWrappingEncoder; +import ch.qos.logback.core.pattern.PatternLayoutBase; +import ch.qos.logback.core.spi.DeferredProcessingAware; import net.logstash.logback.composite.CompositeJsonFormatter; import net.logstash.logback.composite.JsonProviders; import net.logstash.logback.decorate.JsonFactoryDecorator; @@ -24,11 +25,9 @@ import net.logstash.logback.util.ReusableByteBuffer; import net.logstash.logback.util.ReusableByteBufferPool; -import ch.qos.logback.core.encoder.Encoder; -import ch.qos.logback.core.encoder.EncoderBase; -import ch.qos.logback.core.encoder.LayoutWrappingEncoder; -import ch.qos.logback.core.pattern.PatternLayoutBase; -import ch.qos.logback.core.spi.DeferredProcessingAware; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; public abstract class CompositeJsonEncoder extends EncoderBase implements StreamingEncoder { @@ -44,7 +43,7 @@ public abstract class CompositeJsonEncoder encoder, Event event, OutputStream outputStream) throws IOException { if (encoder != null) { byte[] data = encoder.encode(event); @@ -113,7 +112,7 @@ public void start() { if (isStarted()) { return; } - + super.start(); this.bufferPool = new ReusableByteBufferPool(this.minBufferSize); formatter.setContext(getContext()); @@ -250,7 +249,7 @@ public void setLineSeparator(String lineSeparator) { public int getMinBufferSize() { return minBufferSize; } - + /** * The minimum size of the byte buffer used when encoding events. * diff --git a/src/main/java/net/logstash/logback/util/ByteUtil.java b/src/main/java/net/logstash/logback/util/ByteUtil.java new file mode 100644 index 00000000..f998e53d --- /dev/null +++ b/src/main/java/net/logstash/logback/util/ByteUtil.java @@ -0,0 +1,38 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + +import java.math.BigInteger; + +public class ByteUtil { + + private static final int INT_SIZE_IN_BYTES = 4; + + private ByteUtil() { + // util + } + + public static byte[] intToBytes(int value) { + byte[] result = new byte[INT_SIZE_IN_BYTES]; + byte[] unpadded = BigInteger.valueOf(value).toByteArray(); + for (int i = unpadded.length - 1, j = result.length - 1; i >= 0 && j >= 0; i--, j--) { + result[j] = unpadded[i]; + } + return result; + } + + public static int bytesToInt(byte[] value) { + return new BigInteger(1, value).intValueExact(); + } +} diff --git a/src/test/java/net/logstash/logback/ConfigurationTest.java b/src/test/java/net/logstash/logback/ConfigurationTest.java index a1ff9549..e45bc96d 100644 --- a/src/test/java/net/logstash/logback/ConfigurationTest.java +++ b/src/test/java/net/logstash/logback/ConfigurationTest.java @@ -220,7 +220,6 @@ private void verifyOutput(LoggingEventCompositeJsonEncoder encoder) throws IOExc }); byte[] encoded = encoder.encode(listAppender.list.get(0)); - Map output = parseJson(new String(encoded, StandardCharsets.UTF_8)); Assertions.assertNotNull(output.get("@timestamp")); diff --git a/src/test/java/net/logstash/logback/appender/LumberjackV2PayloadWrapperTest.java b/src/test/java/net/logstash/logback/appender/LumberjackV2PayloadWrapperTest.java new file mode 100644 index 00000000..35471b1c --- /dev/null +++ b/src/test/java/net/logstash/logback/appender/LumberjackV2PayloadWrapperTest.java @@ -0,0 +1,74 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.appender; + +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class LumberjackV2PayloadWrapperTest { + private final AbstractBeatsTcpSocketAppender.LumberjackV2PayloadWrapper payloadConverter + = new AbstractBeatsTcpSocketAppender.LumberjackV2PayloadWrapper(new AtomicInteger(), 10); + + @Test + void testPayloadProperlyConverted() { + String payload = "{\"message\":\"Log message\"}"; + byte[] encoded = payload.getBytes(StandardCharsets.UTF_8); + + byte[] wrapped = assertDoesNotThrow(() -> payloadConverter.convert(encoded)); + + ByteBuffer buffer = ByteBuffer.wrap(wrapped); + + // version + assertEquals('2', buffer.get()); + // payload type + assertEquals('J', buffer.get()); + // sequence number + assertEquals(1, buffer.getInt()); + // payload length + int payloadLength = buffer.getInt(); + assertEquals(encoded.length, payloadLength); + // actual payload length + int position = buffer.position(); + int leftOverByteLength = wrapped.length - position; + assertEquals(encoded.length, leftOverByteLength); + // payload + byte[] encodedPayload = new byte[leftOverByteLength]; + System.arraycopy(buffer.array(), position, encodedPayload, 0, encodedPayload.length); + assertEquals(payload, new String(encodedPayload)); + } + + @Test + void testSequenceNumberIncremented() { + String payload = "{\"message\":\"Log message\"}"; + byte[] encoded = payload.getBytes(StandardCharsets.UTF_8); + + for (int i = 1; i <= 20; i++) { + byte[] wrapped = assertDoesNotThrow(() -> payloadConverter.convert(encoded)); + assertSequenceNumberIs(wrapped, ((i - 1) % 10) + 1); + } + } + + private void assertSequenceNumberIs(byte[] wrapped, int number) { + ByteBuffer buffer = ByteBuffer.wrap(wrapped); + buffer.get(); + buffer.get(); + assertEquals(number, buffer.getInt()); + } +} \ No newline at end of file