From 2b480288d607eddd37ef177a09e82050223eccec Mon Sep 17 00:00:00 2001 From: multicatch Date: Mon, 5 Jul 2021 18:11:18 +0200 Subject: [PATCH 01/12] Added Beats payload wrapper --- .../AccessCompositeJsonBeatsEncoder.java | 28 +++++++ .../logback/encoder/BeatsPayloadWrapper.java | 74 +++++++++++++++++++ .../LoggingCompositeJsonBeatsEncoder.java | 28 +++++++ .../encoder/LogstashAccessBeatsEncoder.java | 28 +++++++ .../logback/encoder/LogstashBeatsEncoder.java | 28 +++++++ 5 files changed, 186 insertions(+) create mode 100644 src/main/java/net/logstash/logback/encoder/AccessCompositeJsonBeatsEncoder.java create mode 100644 src/main/java/net/logstash/logback/encoder/BeatsPayloadWrapper.java create mode 100644 src/main/java/net/logstash/logback/encoder/LoggingCompositeJsonBeatsEncoder.java create mode 100644 src/main/java/net/logstash/logback/encoder/LogstashAccessBeatsEncoder.java create mode 100644 src/main/java/net/logstash/logback/encoder/LogstashBeatsEncoder.java diff --git a/src/main/java/net/logstash/logback/encoder/AccessCompositeJsonBeatsEncoder.java b/src/main/java/net/logstash/logback/encoder/AccessCompositeJsonBeatsEncoder.java new file mode 100644 index 00000000..9c4ceaa3 --- /dev/null +++ b/src/main/java/net/logstash/logback/encoder/AccessCompositeJsonBeatsEncoder.java @@ -0,0 +1,28 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.encoder; + +import ch.qos.logback.access.spi.IAccessEvent; + +import java.util.concurrent.atomic.AtomicInteger; + +public class AccessCompositeJsonBeatsEncoder extends AccessEventCompositeJsonEncoder { + + private final AtomicInteger sequenceNumber = new AtomicInteger(); + + @Override + public byte[] encode(IAccessEvent iAccessEvent) { + return BeatsPayloadWrapper.wrapAsBeatsOrReturnEmpty(super.encode(iAccessEvent), sequenceNumber, this::addError); + } +} diff --git a/src/main/java/net/logstash/logback/encoder/BeatsPayloadWrapper.java b/src/main/java/net/logstash/logback/encoder/BeatsPayloadWrapper.java new file mode 100644 index 00000000..212deb19 --- /dev/null +++ b/src/main/java/net/logstash/logback/encoder/BeatsPayloadWrapper.java @@ -0,0 +1,74 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.encoder; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.math.BigInteger; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +/** + * Util to wrap encoder output that should be compatible with a Beats input. + * Can be used with Logstash input or Graylog's Beats input. + * + * See Logstash's documentation + * to read more about this protocol. + */ +public class BeatsPayloadWrapper { + + private static final int INT_SIZE_IN_BYTES = 4; + + private static final byte PROTOCOL_VERSION = '2'; + private static final byte PAYLOAD_JSON_TYPE = 'J'; + + private BeatsPayloadWrapper() { + // util + } + + public static byte[] wrapAsBeatsOrReturnEmpty(byte[] encoded, AtomicInteger counter, BiConsumer exceptionConsumer) { + try { + return wrapAsBeats(encoded, counter); + } catch (IOException e) { + exceptionConsumer.accept("Cannot wrap encoded event as Beats", e); + } + return new byte[0]; + } + + public static byte[] wrapAsBeats(byte[] encoded, AtomicInteger counter) throws IOException { + byte[] payloadLength = intToBytes(encoded.length); + byte[] sequenceNumber = intToBytes(counter.getAndIncrement()); + + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream( + 2 + encoded.length + payloadLength.length + sequenceNumber.length + )) { + outputStream.write(PROTOCOL_VERSION); + outputStream.write(PAYLOAD_JSON_TYPE); + outputStream.write(sequenceNumber); + outputStream.write(payloadLength); + outputStream.write(encoded); + + return outputStream.toByteArray(); + } + } + + private static byte[] intToBytes(int value) { + byte[] result = new byte[INT_SIZE_IN_BYTES]; + byte[] unpadded = BigInteger.valueOf(value).toByteArray(); + for (int i = unpadded.length - 1, j = result.length - 1; i >= 0 && j >= 0; i++, j++) { + result[j] = unpadded[i]; + } + return result; + } +} diff --git a/src/main/java/net/logstash/logback/encoder/LoggingCompositeJsonBeatsEncoder.java b/src/main/java/net/logstash/logback/encoder/LoggingCompositeJsonBeatsEncoder.java new file mode 100644 index 00000000..470933ab --- /dev/null +++ b/src/main/java/net/logstash/logback/encoder/LoggingCompositeJsonBeatsEncoder.java @@ -0,0 +1,28 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.encoder; + +import ch.qos.logback.classic.spi.ILoggingEvent; + +import java.util.concurrent.atomic.AtomicInteger; + +public class LoggingCompositeJsonBeatsEncoder extends LoggingEventCompositeJsonEncoder { + + private final AtomicInteger sequenceNumber = new AtomicInteger(); + + @Override + public byte[] encode(ILoggingEvent iLoggingEvent) { + return BeatsPayloadWrapper.wrapAsBeatsOrReturnEmpty(super.encode(iLoggingEvent), sequenceNumber, this::addError); + } +} diff --git a/src/main/java/net/logstash/logback/encoder/LogstashAccessBeatsEncoder.java b/src/main/java/net/logstash/logback/encoder/LogstashAccessBeatsEncoder.java new file mode 100644 index 00000000..21d7ccfc --- /dev/null +++ b/src/main/java/net/logstash/logback/encoder/LogstashAccessBeatsEncoder.java @@ -0,0 +1,28 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.encoder; + +import ch.qos.logback.access.spi.IAccessEvent; + +import java.util.concurrent.atomic.AtomicInteger; + +public class LogstashAccessBeatsEncoder extends LogstashAccessEncoder { + + private final AtomicInteger sequenceNumber = new AtomicInteger(); + + @Override + public byte[] encode(IAccessEvent iAccessEvent) { + return BeatsPayloadWrapper.wrapAsBeatsOrReturnEmpty(super.encode(iAccessEvent), sequenceNumber, this::addError); + } +} diff --git a/src/main/java/net/logstash/logback/encoder/LogstashBeatsEncoder.java b/src/main/java/net/logstash/logback/encoder/LogstashBeatsEncoder.java new file mode 100644 index 00000000..053b431d --- /dev/null +++ b/src/main/java/net/logstash/logback/encoder/LogstashBeatsEncoder.java @@ -0,0 +1,28 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.encoder; + +import ch.qos.logback.classic.spi.ILoggingEvent; + +import java.util.concurrent.atomic.AtomicInteger; + +public class LogstashBeatsEncoder extends LogstashEncoder { + + private final AtomicInteger sequenceNumber = new AtomicInteger(); + + @Override + public byte[] encode(ILoggingEvent iLoggingEvent) { + return BeatsPayloadWrapper.wrapAsBeatsOrReturnEmpty(super.encode(iLoggingEvent), sequenceNumber, this::addError); + } +} From b16b32b6a7e6cad8bbd5f231dd9e02f917e9e849 Mon Sep 17 00:00:00 2001 From: multicatch Date: Mon, 5 Jul 2021 20:14:45 +0200 Subject: [PATCH 02/12] Added ability to configure payload wrapper for composite encoders --- .../AccessCompositeJsonBeatsEncoder.java | 28 -------- .../logback/encoder/CompositeJsonEncoder.java | 18 ++++- .../LoggingCompositeJsonBeatsEncoder.java | 28 -------- .../encoder/LogstashAccessBeatsEncoder.java | 28 -------- .../EncodedPayloadWrapper.java} | 20 +++--- .../LumberjackPayloadWrapper.java} | 28 +++----- .../logstash/logback/ConfigurationTest.java | 11 ++- .../wrapper/LumberjackPayloadWrapperTest.java | 72 +++++++++++++++++++ src/test/resources/logback-test.xml | 1 + 9 files changed, 118 insertions(+), 116 deletions(-) delete mode 100644 src/main/java/net/logstash/logback/encoder/AccessCompositeJsonBeatsEncoder.java delete mode 100644 src/main/java/net/logstash/logback/encoder/LoggingCompositeJsonBeatsEncoder.java delete mode 100644 src/main/java/net/logstash/logback/encoder/LogstashAccessBeatsEncoder.java rename src/main/java/net/logstash/logback/encoder/{LogstashBeatsEncoder.java => wrapper/EncodedPayloadWrapper.java} (54%) rename src/main/java/net/logstash/logback/encoder/{BeatsPayloadWrapper.java => wrapper/LumberjackPayloadWrapper.java} (69%) create mode 100644 src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapperTest.java diff --git a/src/main/java/net/logstash/logback/encoder/AccessCompositeJsonBeatsEncoder.java b/src/main/java/net/logstash/logback/encoder/AccessCompositeJsonBeatsEncoder.java deleted file mode 100644 index 9c4ceaa3..00000000 --- a/src/main/java/net/logstash/logback/encoder/AccessCompositeJsonBeatsEncoder.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.logstash.logback.encoder; - -import ch.qos.logback.access.spi.IAccessEvent; - -import java.util.concurrent.atomic.AtomicInteger; - -public class AccessCompositeJsonBeatsEncoder extends AccessEventCompositeJsonEncoder { - - private final AtomicInteger sequenceNumber = new AtomicInteger(); - - @Override - public byte[] encode(IAccessEvent iAccessEvent) { - return BeatsPayloadWrapper.wrapAsBeatsOrReturnEmpty(super.encode(iAccessEvent), sequenceNumber, this::addError); - } -} diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index 5527b7ce..ca4791bf 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -26,6 +26,8 @@ import ch.qos.logback.core.encoder.LayoutWrappingEncoder; import ch.qos.logback.core.pattern.PatternLayoutBase; import ch.qos.logback.core.spi.DeferredProcessingAware; +import net.logstash.logback.encoder.wrapper.EncodedPayloadWrapper; +import net.logstash.logback.encoder.wrapper.LumberjackPayloadWrapper; public abstract class CompositeJsonEncoder extends EncoderBase { @@ -43,6 +45,7 @@ public abstract class CompositeJsonEncoder prefix; private Encoder suffix; + private EncodedPayloadWrapper payloadWrapper; private final CompositeJsonFormatter formatter; @@ -85,7 +88,7 @@ public byte[] encode(Event event) { outputStream.write(lineSeparatorBytes); - return outputStream.toByteArray(); + return wrapEncoded(outputStream.toByteArray()); } catch (IOException e) { addWarn("Error encountered while encoding log event. " + "Event: " + event, e); @@ -106,6 +109,13 @@ private byte[] doEncodeWrappedToBytes(Encoder wrapped, Event event) { return EMPTY_BYTES; } + private byte[] wrapEncoded(byte[] encoded) throws IOException { + if (payloadWrapper != null) { + return payloadWrapper.wrap(encoded); + } + return encoded; + } + @Override public void start() { super.start(); @@ -270,4 +280,10 @@ public void setSuffix(Encoder suffix) { this.suffix = suffix; } + public EncodedPayloadWrapper getPayloadWrapper() { + return payloadWrapper; + } + public void setPayloadWrapper(EncodedPayloadWrapper wrapper) { + this.payloadWrapper = wrapper; + } } diff --git a/src/main/java/net/logstash/logback/encoder/LoggingCompositeJsonBeatsEncoder.java b/src/main/java/net/logstash/logback/encoder/LoggingCompositeJsonBeatsEncoder.java deleted file mode 100644 index 470933ab..00000000 --- a/src/main/java/net/logstash/logback/encoder/LoggingCompositeJsonBeatsEncoder.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.logstash.logback.encoder; - -import ch.qos.logback.classic.spi.ILoggingEvent; - -import java.util.concurrent.atomic.AtomicInteger; - -public class LoggingCompositeJsonBeatsEncoder extends LoggingEventCompositeJsonEncoder { - - private final AtomicInteger sequenceNumber = new AtomicInteger(); - - @Override - public byte[] encode(ILoggingEvent iLoggingEvent) { - return BeatsPayloadWrapper.wrapAsBeatsOrReturnEmpty(super.encode(iLoggingEvent), sequenceNumber, this::addError); - } -} diff --git a/src/main/java/net/logstash/logback/encoder/LogstashAccessBeatsEncoder.java b/src/main/java/net/logstash/logback/encoder/LogstashAccessBeatsEncoder.java deleted file mode 100644 index 21d7ccfc..00000000 --- a/src/main/java/net/logstash/logback/encoder/LogstashAccessBeatsEncoder.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.logstash.logback.encoder; - -import ch.qos.logback.access.spi.IAccessEvent; - -import java.util.concurrent.atomic.AtomicInteger; - -public class LogstashAccessBeatsEncoder extends LogstashAccessEncoder { - - private final AtomicInteger sequenceNumber = new AtomicInteger(); - - @Override - public byte[] encode(IAccessEvent iAccessEvent) { - return BeatsPayloadWrapper.wrapAsBeatsOrReturnEmpty(super.encode(iAccessEvent), sequenceNumber, this::addError); - } -} diff --git a/src/main/java/net/logstash/logback/encoder/LogstashBeatsEncoder.java b/src/main/java/net/logstash/logback/encoder/wrapper/EncodedPayloadWrapper.java similarity index 54% rename from src/main/java/net/logstash/logback/encoder/LogstashBeatsEncoder.java rename to src/main/java/net/logstash/logback/encoder/wrapper/EncodedPayloadWrapper.java index 053b431d..2ddc1938 100644 --- a/src/main/java/net/logstash/logback/encoder/LogstashBeatsEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/wrapper/EncodedPayloadWrapper.java @@ -11,18 +11,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package net.logstash.logback.encoder; +package net.logstash.logback.encoder.wrapper; -import ch.qos.logback.classic.spi.ILoggingEvent; +import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -public class LogstashBeatsEncoder extends LogstashEncoder { - - private final AtomicInteger sequenceNumber = new AtomicInteger(); - - @Override - public byte[] encode(ILoggingEvent iLoggingEvent) { - return BeatsPayloadWrapper.wrapAsBeatsOrReturnEmpty(super.encode(iLoggingEvent), sequenceNumber, this::addError); - } +/** + * Wraps encoded payload (with prefix and suffix). + * Can be used to convert plain bytes to a given format. + */ +public interface EncodedPayloadWrapper { + byte[] wrap(byte[] encoded) throws IOException; } diff --git a/src/main/java/net/logstash/logback/encoder/BeatsPayloadWrapper.java b/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapper.java similarity index 69% rename from src/main/java/net/logstash/logback/encoder/BeatsPayloadWrapper.java rename to src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapper.java index 212deb19..146b7424 100644 --- a/src/main/java/net/logstash/logback/encoder/BeatsPayloadWrapper.java +++ b/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapper.java @@ -11,44 +11,36 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package net.logstash.logback.encoder; +package net.logstash.logback.encoder.wrapper; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.math.BigInteger; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; /** - * Util to wrap encoder output that should be compatible with a Beats input. + * Wraps payload so it should be compatible with a Beats input (Lumberjack protocol). * Can be used with Logstash input or Graylog's Beats input. * * See Logstash's documentation * to read more about this protocol. */ -public class BeatsPayloadWrapper { +public class LumberjackPayloadWrapper implements EncodedPayloadWrapper { private static final int INT_SIZE_IN_BYTES = 4; private static final byte PROTOCOL_VERSION = '2'; private static final byte PAYLOAD_JSON_TYPE = 'J'; - private BeatsPayloadWrapper() { - // util - } + private int counter; - public static byte[] wrapAsBeatsOrReturnEmpty(byte[] encoded, AtomicInteger counter, BiConsumer exceptionConsumer) { - try { - return wrapAsBeats(encoded, counter); - } catch (IOException e) { - exceptionConsumer.accept("Cannot wrap encoded event as Beats", e); - } - return new byte[0]; + public LumberjackPayloadWrapper() { + this.counter = 0; } - public static byte[] wrapAsBeats(byte[] encoded, AtomicInteger counter) throws IOException { + @Override + public byte[] wrap(byte[] encoded) throws IOException { byte[] payloadLength = intToBytes(encoded.length); - byte[] sequenceNumber = intToBytes(counter.getAndIncrement()); + byte[] sequenceNumber = intToBytes(counter++); try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream( 2 + encoded.length + payloadLength.length + sequenceNumber.length @@ -66,7 +58,7 @@ public static byte[] wrapAsBeats(byte[] encoded, AtomicInteger counter) throws I private static byte[] intToBytes(int value) { byte[] result = new byte[INT_SIZE_IN_BYTES]; byte[] unpadded = BigInteger.valueOf(value).toByteArray(); - for (int i = unpadded.length - 1, j = result.length - 1; i >= 0 && j >= 0; i++, j++) { + for (int i = unpadded.length - 1, j = result.length - 1; i >= 0 && j >= 0; i--, j--) { result[j] = unpadded[i]; } return result; diff --git a/src/test/java/net/logstash/logback/ConfigurationTest.java b/src/test/java/net/logstash/logback/ConfigurationTest.java index a1ff9549..bbf2099b 100644 --- a/src/test/java/net/logstash/logback/ConfigurationTest.java +++ b/src/test/java/net/logstash/logback/ConfigurationTest.java @@ -220,7 +220,10 @@ private void verifyOutput(LoggingEventCompositeJsonEncoder encoder) throws IOExc }); byte[] encoded = encoder.encode(listAppender.list.get(0)); - + + if (encoded[0] == '2') { + encoded = stripLumberjackMeta(encoded); + } Map output = parseJson(new String(encoded, StandardCharsets.UTF_8)); Assertions.assertNotNull(output.get("@timestamp")); @@ -264,4 +267,10 @@ private Map parseJson(final String text) throws IOException { return jsonFactory.createParser(text).readValueAs(new TypeReference>() { }); } + + private byte[] stripLumberjackMeta(byte[] encoded) { + byte[] result = new byte[encoded.length - 10]; + System.arraycopy(encoded, 10, result, 0, result.length); + return result; + } } diff --git a/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapperTest.java b/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapperTest.java new file mode 100644 index 00000000..0cad79aa --- /dev/null +++ b/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapperTest.java @@ -0,0 +1,72 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.encoder.wrapper; + +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class LumberjackPayloadWrapperTest { + private final LumberjackPayloadWrapper payloadWrapper = new LumberjackPayloadWrapper(); + + @Test + void testPayloadProperlyWrapped() { + String payload = "{\"message\":\"Log message\"}"; + byte[] encoded = payload.getBytes(StandardCharsets.UTF_8); + + byte[] wrapped = assertDoesNotThrow(() -> payloadWrapper.wrap(encoded)); + + ByteBuffer buffer = ByteBuffer.wrap(wrapped); + + // version + assertEquals('2', buffer.get()); + // payload type + assertEquals('J', buffer.get()); + // sequence number + assertEquals(0, buffer.getInt()); + // payload length + int payloadLength = buffer.getInt(); + assertEquals(encoded.length, payloadLength); + // actual payload length + int position = buffer.position(); + int leftOverByteLength = wrapped.length - position; + assertEquals(encoded.length, leftOverByteLength); + // payload + byte[] encodedPayload = new byte[leftOverByteLength]; + System.arraycopy(buffer.array(), position, encodedPayload, 0, encodedPayload.length); + assertEquals(payload, new String(encodedPayload)); + } + + @Test + void testSequenceNumberIncremented() { + String payload = "{\"message\":\"Log message\"}"; + byte[] encoded = payload.getBytes(StandardCharsets.UTF_8); + + for (int i = 0; i < 5; i++) { + byte[] wrapped = assertDoesNotThrow(() -> payloadWrapper.wrap(encoded)); + assertSequenceNumberIs(wrapped, i); + } + } + + private void assertSequenceNumberIs(byte[] wrapped, int number) { + ByteBuffer buffer = ByteBuffer.wrap(wrapped); + buffer.get(); + buffer.get(); + assertEquals(number, buffer.getInt()); + } +} \ No newline at end of file diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 72b6f3ce..1bdb7cf8 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -29,6 +29,7 @@ --> + customMessage caller From a399070e2dc7cf899d5b278090aa48beb59db125 Mon Sep 17 00:00:00 2001 From: multicatch Date: Mon, 5 Jul 2021 20:28:20 +0200 Subject: [PATCH 03/12] Added documentation for payload wrappers --- README.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/README.md b/README.md index 913cc2f7..49f5fedf 100644 --- a/README.md +++ b/README.md @@ -1801,6 +1801,24 @@ The logstash-logback-encoder library contains many providers out-of-the-box, and you can even plug-in your own by extending `JsonProvider`. Each provider has its own configuration options to further customize it. +Apart from providers, if you need to further customize the output format of encoded JSON, +you can specify the _payloadWrapper_. +A _payloadWrapper_ can wrap or convert the encoded JSON (with a prefix, suffix and other providers) to another byte format. +For example, it can be used to convert plain JSON to Lumberjack protocol, +which is used by Beats input. + +```xml + + + +``` + +The encoder will prepare the output JSON and pass it to given _payloadWrapper_, +which will then convert it to a proper format. + +The logstash-logback-encoder library contains `LumberjackPayloadWrapper`. +It can be used in situation when it is needed to reuse Beats input (in Logstash or Graylog for example) +that is also used by Filebeat (or other tools). #### Providers for LoggingEvents From 86cd671397fa4b22138065638742ccb30bc4d804 Mon Sep 17 00:00:00 2001 From: multicatch Date: Tue, 6 Jul 2021 12:15:38 +0200 Subject: [PATCH 04/12] Added ability to convert encoded payload while streaming --- README.md | 11 ++--- .../logback/encoder/CompositeJsonEncoder.java | 41 ++++++++++++++----- ...r.java => LumberjackPayloadConverter.java} | 6 +-- ...loadWrapper.java => PayloadConverter.java} | 12 ++++-- .../wrapper/PayloadStreamingConverter.java | 22 ++++++++++ ...va => LumberjackPayloadConverterTest.java} | 8 ++-- src/test/resources/logback-test.xml | 2 +- 7 files changed, 75 insertions(+), 27 deletions(-) rename src/main/java/net/logstash/logback/encoder/wrapper/{LumberjackPayloadWrapper.java => LumberjackPayloadConverter.java} (92%) rename src/main/java/net/logstash/logback/encoder/wrapper/{EncodedPayloadWrapper.java => PayloadConverter.java} (72%) create mode 100644 src/main/java/net/logstash/logback/encoder/wrapper/PayloadStreamingConverter.java rename src/test/java/net/logstash/logback/encoder/wrapper/{LumberjackPayloadWrapperTest.java => LumberjackPayloadConverterTest.java} (93%) diff --git a/README.md b/README.md index 24aa59ca..6e5aeb97 100644 --- a/README.md +++ b/README.md @@ -1806,21 +1806,22 @@ The size of this buffer is set to `1024` bytes by default. A different size can The buffer automatically grows above the `minBufferSize` when needed to accommodate with larger events. However, only the first `minBufferSize` bytes will be reused by subsequent invocations. It is therefore strongly advised to set the minimum size at least equal to the average size of the encoded events to reduce unnecessary memory allocations and reduce pressure on the garbage collector. Apart from providers, if you need to further customize the output format of encoded JSON, -you can specify the _payloadWrapper_. -A _payloadWrapper_ can wrap or convert the encoded JSON (with a prefix, suffix and other providers) to another byte format. +you can specify the _payloadConverter_. +A _payloadConverter_ can wrap or convert the encoded JSON (with a prefix, suffix and other providers) to another byte format. For example, it can be used to convert plain JSON to Lumberjack protocol, which is used by Beats input. ```xml + - + ``` -The encoder will prepare the output JSON and pass it to given _payloadWrapper_, +The encoder will prepare the output JSON and pass it to given _payloadConverter_, which will then convert it to a proper format. -The logstash-logback-encoder library contains `LumberjackPayloadWrapper`. +The logstash-logback-encoder library contains `LumberjackPayloadConverter`. It can be used in situation when it is needed to reuse Beats input (in Logstash or Graylog for example) that is also used by Filebeat (or other tools). diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index f0798ebf..0576d08b 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -21,6 +21,7 @@ import net.logstash.logback.composite.JsonProviders; import net.logstash.logback.decorate.JsonFactoryDecorator; import net.logstash.logback.decorate.JsonGeneratorDecorator; +import net.logstash.logback.encoder.wrapper.PayloadStreamingConverter; import net.logstash.logback.util.ReusableByteBuffer; import net.logstash.logback.util.ReusableByteBufferPool; @@ -29,8 +30,7 @@ import ch.qos.logback.core.encoder.LayoutWrappingEncoder; import ch.qos.logback.core.pattern.PatternLayoutBase; import ch.qos.logback.core.spi.DeferredProcessingAware; -import net.logstash.logback.encoder.wrapper.EncodedPayloadWrapper; -import net.logstash.logback.encoder.wrapper.LumberjackPayloadWrapper; +import net.logstash.logback.encoder.wrapper.PayloadConverter; public abstract class CompositeJsonEncoder extends EncoderBase implements StreamingEncoder { @@ -54,7 +54,7 @@ public abstract class CompositeJsonEncoder prefix; private Encoder suffix; - private EncodedPayloadWrapper payloadWrapper; + private PayloadConverter payloadConverter; private final CompositeJsonFormatter formatter; @@ -77,6 +77,8 @@ public void encode(Event event, OutputStream outputStream) throws IOException { throw new IllegalStateException("Encoder is not started"); } + outputStream = wrappedStream(outputStream); + encode(prefix, event, outputStream); formatter.writeEventToOutputStream(event, outputStream); encode(suffix, event, outputStream); @@ -93,7 +95,7 @@ public byte[] encode(Event event) { ReusableByteBuffer buffer = bufferPool.acquire(); try { encode(event, buffer); - return buffer.toByteArray(); + return convertEncoded(buffer.toByteArray()); } catch (IOException e) { addWarn("Error encountered while encoding log event. Event: " + event, e); return EMPTY_BYTES; @@ -111,13 +113,24 @@ private void encode(Encoder encoder, Event event, OutputStream outputStre } } - private byte[] wrapEncoded(byte[] encoded) throws IOException { - if (payloadWrapper != null) { - return payloadWrapper.wrap(encoded); + private OutputStream wrappedStream(OutputStream outputStream) { + if (payloadConverterReady() && payloadConverter instanceof PayloadStreamingConverter) { + return ((PayloadStreamingConverter) payloadConverter).streamTo(outputStream); + } + return outputStream; + } + + private byte[] convertEncoded(byte[] encoded) throws IOException { + if (payloadConverterReady() && !(payloadConverter instanceof PayloadStreamingConverter)) { + return payloadConverter.convert(encoded); } return encoded; } + private boolean payloadConverterReady() { + return payloadConverter != null && payloadConverter.isStarted(); + } + @Override public void start() { if (isStarted()) { @@ -134,6 +147,9 @@ public void start() { : this.lineSeparator.getBytes(charset); startWrapped(prefix); startWrapped(suffix); + if (payloadConverter != null) { + payloadConverter.start(); + } } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -176,6 +192,9 @@ public void stop() { formatter.stop(); stopWrapped(prefix); stopWrapped(suffix); + if (payloadConverterReady()) { + payloadConverter.stop(); + } } } @@ -295,10 +314,10 @@ public void setSuffix(Encoder suffix) { this.suffix = suffix; } - public EncodedPayloadWrapper getPayloadWrapper() { - return payloadWrapper; + public PayloadConverter getPayloadConverter() { + return payloadConverter; } - public void setPayloadWrapper(EncodedPayloadWrapper wrapper) { - this.payloadWrapper = wrapper; + public void setPayloadConverter(PayloadConverter wrapper) { + this.payloadConverter = wrapper; } } diff --git a/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapper.java b/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverter.java similarity index 92% rename from src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapper.java rename to src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverter.java index 146b7424..d616ea0a 100644 --- a/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapper.java +++ b/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverter.java @@ -24,7 +24,7 @@ * See Logstash's documentation * to read more about this protocol. */ -public class LumberjackPayloadWrapper implements EncodedPayloadWrapper { +public class LumberjackPayloadConverter implements PayloadConverter { private static final int INT_SIZE_IN_BYTES = 4; @@ -33,12 +33,12 @@ public class LumberjackPayloadWrapper implements EncodedPayloadWrapper { private int counter; - public LumberjackPayloadWrapper() { + public LumberjackPayloadConverter() { this.counter = 0; } @Override - public byte[] wrap(byte[] encoded) throws IOException { + public byte[] convert(byte[] encoded) throws IOException { byte[] payloadLength = intToBytes(encoded.length); byte[] sequenceNumber = intToBytes(counter++); diff --git a/src/main/java/net/logstash/logback/encoder/wrapper/EncodedPayloadWrapper.java b/src/main/java/net/logstash/logback/encoder/wrapper/PayloadConverter.java similarity index 72% rename from src/main/java/net/logstash/logback/encoder/wrapper/EncodedPayloadWrapper.java rename to src/main/java/net/logstash/logback/encoder/wrapper/PayloadConverter.java index 2ddc1938..89af4dac 100644 --- a/src/main/java/net/logstash/logback/encoder/wrapper/EncodedPayloadWrapper.java +++ b/src/main/java/net/logstash/logback/encoder/wrapper/PayloadConverter.java @@ -16,9 +16,15 @@ import java.io.IOException; /** - * Wraps encoded payload (with prefix and suffix). + * Converts encoded payload (with prefix and suffix). * Can be used to convert plain bytes to a given format. */ -public interface EncodedPayloadWrapper { - byte[] wrap(byte[] encoded) throws IOException; +public interface PayloadConverter { + default void start() { } + default void stop() { } + default boolean isStarted() { + return true; + } + + byte[] convert(byte[] encoded) throws IOException; } diff --git a/src/main/java/net/logstash/logback/encoder/wrapper/PayloadStreamingConverter.java b/src/main/java/net/logstash/logback/encoder/wrapper/PayloadStreamingConverter.java new file mode 100644 index 00000000..4da5cdf2 --- /dev/null +++ b/src/main/java/net/logstash/logback/encoder/wrapper/PayloadStreamingConverter.java @@ -0,0 +1,22 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.encoder.wrapper; + +import java.io.OutputStream; + +public abstract class PayloadStreamingConverter extends OutputStream + implements PayloadConverter { + + public abstract PayloadStreamingConverter streamTo(OutputStream outputStream); +} diff --git a/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapperTest.java b/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverterTest.java similarity index 93% rename from src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapperTest.java rename to src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverterTest.java index 0cad79aa..8e885fa8 100644 --- a/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadWrapperTest.java +++ b/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverterTest.java @@ -21,15 +21,15 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; -class LumberjackPayloadWrapperTest { - private final LumberjackPayloadWrapper payloadWrapper = new LumberjackPayloadWrapper(); +class LumberjackPayloadConverterTest { + private final LumberjackPayloadConverter payloadWrapper = new LumberjackPayloadConverter(); @Test void testPayloadProperlyWrapped() { String payload = "{\"message\":\"Log message\"}"; byte[] encoded = payload.getBytes(StandardCharsets.UTF_8); - byte[] wrapped = assertDoesNotThrow(() -> payloadWrapper.wrap(encoded)); + byte[] wrapped = assertDoesNotThrow(() -> payloadWrapper.convert(encoded)); ByteBuffer buffer = ByteBuffer.wrap(wrapped); @@ -58,7 +58,7 @@ void testSequenceNumberIncremented() { byte[] encoded = payload.getBytes(StandardCharsets.UTF_8); for (int i = 0; i < 5; i++) { - byte[] wrapped = assertDoesNotThrow(() -> payloadWrapper.wrap(encoded)); + byte[] wrapped = assertDoesNotThrow(() -> payloadWrapper.convert(encoded)); assertSequenceNumberIs(wrapped, i); } } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 1bdb7cf8..f4db7fbd 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -29,7 +29,7 @@ --> - + customMessage caller From be7b321d06f8b1fe31a27f01dc505e6ca7ebf0b8 Mon Sep 17 00:00:00 2001 From: multicatch Date: Wed, 14 Jul 2021 20:26:12 +0200 Subject: [PATCH 05/12] Added Beats appenders --- .../AbstractBeatsTcpSocketAppender.java | 329 ++++++++++++++++++ .../BeatsAccessTcpSocketAppender.java | 22 ++ .../appender/BeatsTcpSocketAppender.java | 45 +++ .../wrapper/ChainedPayloadConverter.java | 62 ++++ .../wrapper/LumberjackPayloadConverter.java | 45 ++- .../net/logstash/logback/util/ByteUtil.java | 38 ++ .../LumberjackPayloadConverterTest.java | 4 +- 7 files changed, 531 insertions(+), 14 deletions(-) create mode 100644 src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java create mode 100644 src/main/java/net/logstash/logback/appender/BeatsAccessTcpSocketAppender.java create mode 100644 src/main/java/net/logstash/logback/appender/BeatsTcpSocketAppender.java create mode 100644 src/main/java/net/logstash/logback/encoder/wrapper/ChainedPayloadConverter.java create mode 100644 src/main/java/net/logstash/logback/util/ByteUtil.java diff --git a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java new file mode 100644 index 00000000..01ac2ab0 --- /dev/null +++ b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java @@ -0,0 +1,329 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.appender; + +import ch.qos.logback.core.encoder.Encoder; +import ch.qos.logback.core.encoder.EncoderBase; +import ch.qos.logback.core.spi.DeferredProcessingAware; +import net.logstash.logback.appender.listener.TcpAppenderListener; +import net.logstash.logback.encoder.CompositeJsonEncoder; +import net.logstash.logback.encoder.StreamingEncoder; +import net.logstash.logback.encoder.wrapper.ChainedPayloadConverter; +import net.logstash.logback.encoder.wrapper.LumberjackPayloadConverter; +import net.logstash.logback.encoder.wrapper.PayloadConverter; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static net.logstash.logback.util.ByteUtil.bytesToInt; +import static net.logstash.logback.util.ByteUtil.intToBytes; + +/** + * An appender that is compatible with Lumberjack protocol (Beats input). + * + * See Logstash's documentation for more info. + */ +public abstract class AbstractBeatsTcpSocketAppender> + extends AbstractLogstashTcpSocketAppender { + + private static final byte PROTOCOL_VERSION = '2'; + private static final byte PAYLOAD_WINDOW_TYPE = 'W'; + private static final byte PAYLOAD_ACK_TYPE = 'A'; + + /** + * Counter for sequence number of published events. + * Sent to remote Beats input to track how many messages were sent + * and used to track when to wait for remote to be ready to accept new logs. + */ + private final AtomicInteger counter = new AtomicInteger(); + /** + * A queue of currently accepted ACKs from remote + */ + private final BlockingQueue ackEvents = new ArrayBlockingQueue<>(10); + + /** + * Used to tell the reader the maximum number of unacknowledged data frames + * the writer (this appender) will send before blocking for ACKs. + */ + private int windowSize; + + /** + * Callable that reads ACKs from remote reader. + */ + private AckReaderCallable ackReaderCallable; + + public AbstractBeatsTcpSocketAppender() { + this.windowSize = 10; + } + + @Override + public synchronized void start() { + Encoder encoder = getEncoder(); + PayloadConverter converter = installOrCreateConverter(encoder); + setEncoder(wrapAsBeatsEncoder(encoder, converter)); + super.start(); + } + + /** + * Setups {@link LumberjackPayloadConverter}. + * Installs it into an encoder if possible or else creates a new instance. + * @param encoder an encoder to install the converter in + * @return a converter if installing was not possible + */ + private PayloadConverter installOrCreateConverter(Encoder encoder) { + if (encoder instanceof CompositeJsonEncoder) { + installConverter((CompositeJsonEncoder) encoder); + return null; + } else { + return createConverter(); + } + } + + /** + * Installs and configures {@link LumberjackPayloadConverter} into {@link CompositeJsonEncoder}. + * @param encoder a target encoder to install the converter + * @param log event type + */ + private void installConverter(CompositeJsonEncoder encoder) { + PayloadConverter payloadConverter = encoder.getPayloadConverter(); + if (payloadConverter == null) { + encoder.setPayloadConverter(createConverter()); + } else if (payloadConverter instanceof LumberjackPayloadConverter) { + LumberjackPayloadConverter lumberjackPayloadConverter = (LumberjackPayloadConverter) payloadConverter; + lumberjackPayloadConverter.setWindowSize(windowSize); + lumberjackPayloadConverter.setCounter(counter); + } else { + ChainedPayloadConverter chain = new ChainedPayloadConverter(); + chain.add(createConverter()); + chain.add(payloadConverter); + encoder.setPayloadConverter(chain); + } + } + + private LumberjackPayloadConverter createConverter() { + LumberjackPayloadConverter lumberjackPayloadConverter = new LumberjackPayloadConverter(); + lumberjackPayloadConverter.setWindowSize(windowSize); + lumberjackPayloadConverter.setCounter(counter); + return lumberjackPayloadConverter; + } + + private Encoder wrapAsBeatsEncoder(Encoder original, PayloadConverter converter) { + if (original instanceof StreamingEncoder) { + addWarn(String.format( + "The selected encoder %s is a StreamingEncoder, which is incompatible with %s." + + "It will be used anyway, but streaming will be unavailable.", + original.getClass().getName(), this.getClass().getName() + )); + } + + return new BeatsEncoderWrapper<>(original, converter); + } + + public void setWindowSize(int windowSize) { + this.windowSize = windowSize; + } + + public int getWindowSize() { + return windowSize; + } + + /** + * Beats-compatible encoder. + * Wraps the original encoder and blocks if we should wait for ACK. + * + * @param log event type + */ + private class BeatsEncoderWrapper extends EncoderBase { + + private final Encoder original; + private final PayloadConverter converter; + + BeatsEncoderWrapper(Encoder original, PayloadConverter converter) { + this.original = original; + this.converter = converter; + } + + @Override + public byte[] headerBytes() { + return original.headerBytes(); + } + + @Override + public byte[] encode(E event) { + if (counter.get() == windowSize) { + try { + ackEvents.take(); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + } + } + byte[] encoded = original.encode(event); + if (converter != null && converter.isStarted()) { + try { + return converter.convert(encoded); + } catch (IOException e) { + addWarn("Error encountered while converting log event. Event: " + event, e); + return new byte[0]; + } + } else { + return encoded; + } + } + + @Override + public byte[] footerBytes() { + return original.footerBytes(); + } + } + + @Override + protected Future scheduleReaderCallable(Callable readerCallable) { + this.ackReaderCallable = new AckReaderCallable(); + return getExecutorService().submit(this.ackReaderCallable); + } + + @Override + protected void fireConnectionOpened(Socket socket) { + if (this.ackReaderCallable == null) { + addError("Connection was not initialized properly - the ACK reader is not running. This will block the appender."); + throw new IllegalStateException("ACK reader not running - appender would hang"); + } + + try { + ackReaderCallable.setInputStream(socket.getInputStream()); + ackReaderCallable.readyLock.notify(); + sendWindowSize(socket.getOutputStream()); + } catch (IOException e) { + addError("Error during Beats connection initialization", e); + } + + counter.set(0); + super.fireConnectionOpened(socket); + } + + /** + * Writes window size frame to the output stream to tell the reader + * how often we would like to receive ACKs. + */ + private void sendWindowSize(OutputStream outputStream) throws IOException { + byte[] message = new byte[6]; + message[0] = PROTOCOL_VERSION; + message[1] = PAYLOAD_WINDOW_TYPE; + byte[] windowSizeBytes = intToBytes(windowSize); + System.arraycopy(windowSizeBytes, 0, message, 2, windowSizeBytes.length); + outputStream.write(message); + outputStream.flush(); + } + + /** + * Callable that reads all ACKs from the remote log reader. + */ + private class AckReaderCallable implements Callable { + + /** + * An InputStream that will be read from. + * Due to current {@link AbstractLogstashTcpSocketAppender} implementation, + * this field is lazily initialized. + */ + private volatile InputStream inputStream; + /** + * Lock that guards {@link #inputStream} state. + * Used to prevent a race condition during lazy {@link #inputStream} initialization. + */ + private final Lock readyLock = new ReentrantLock(); + + AckReaderCallable() { + super(); + } + + @Override + public Void call() throws Exception { + updateCurrentThreadName(); + try { + readyLock.wait(); + while (true) { + try { + int responseByte = inputStream.read(); + if (responseByte == -1) { + // end of stream + return null; + } else { + readAckResponse(responseByte); + } + } catch (SocketTimeoutException e) { + // try again + } + } + } finally { + if (!Thread.currentThread().isInterrupted()) { + getExecutorService().submit(() -> { + // https://github.com/logstash/logstash-logback-encoder/issues/341 + // see ReaderCallable from AbstractLogstashTcpSocketAppender for more clarification + getDisruptor().getRingBuffer().tryPublishEvent(getEventTranslator(), null); + }); + } + } + } + + private void readAckResponse(int responseByte) throws IOException { + if (responseByte != PROTOCOL_VERSION) { + addWarn(String.format("Protocol version is incorrect: %d, expected: %d.", responseByte, PROTOCOL_VERSION)); + return; + } + + int payloadType = inputStream.read(); + if (payloadType != PAYLOAD_ACK_TYPE) { + addWarn(String.format("Unknown payload type: %d, expected: %d (ACK)", payloadType, PAYLOAD_ACK_TYPE)); + return; + } + + byte[] sequenceNumber = new byte[4]; + for (int i = 0; i < 4; i++) { + int next = inputStream.read(); + if (next == -1) { + throw new IOException("End of stream while reading ACK sequence number"); + } + sequenceNumber[i] = (byte) next; + } + ackEvents.offer(new AckEvent(bytesToInt(sequenceNumber))); + } + + public void setInputStream(InputStream inputStream) { + this.inputStream = inputStream; + } + } + + private static class AckEvent { + + private final int sequenceNumber; + + private AckEvent(int sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + public int getSequenceNumber() { + return sequenceNumber; + } + } +} diff --git a/src/main/java/net/logstash/logback/appender/BeatsAccessTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/BeatsAccessTcpSocketAppender.java new file mode 100644 index 00000000..c5494f54 --- /dev/null +++ b/src/main/java/net/logstash/logback/appender/BeatsAccessTcpSocketAppender.java @@ -0,0 +1,22 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.appender; + +import ch.qos.logback.access.spi.IAccessEvent; +import net.logstash.logback.appender.listener.TcpAppenderListener; + +public class BeatsAccessTcpSocketAppender + extends AbstractBeatsTcpSocketAppender> { + +} diff --git a/src/main/java/net/logstash/logback/appender/BeatsTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/BeatsTcpSocketAppender.java new file mode 100644 index 00000000..50cf98b0 --- /dev/null +++ b/src/main/java/net/logstash/logback/appender/BeatsTcpSocketAppender.java @@ -0,0 +1,45 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.appender; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.lmax.disruptor.RingBuffer; +import net.logstash.logback.appender.listener.TcpAppenderListener; + +public class BeatsTcpSocketAppender + extends AbstractBeatsTcpSocketAppender> { + + /** + * Set to true if the caller data should be captured before publishing the event + * to the {@link RingBuffer} + */ + private boolean includeCallerData; + + @Override + protected void prepareForDeferredProcessing(final ILoggingEvent event) { + super.prepareForDeferredProcessing(event); + if (includeCallerData) { + event.getCallerData(); + } + } + + public boolean isIncludeCallerData() { + return includeCallerData; + } + + public void setIncludeCallerData(boolean includeCallerData) { + this.includeCallerData = includeCallerData; + } + +} diff --git a/src/main/java/net/logstash/logback/encoder/wrapper/ChainedPayloadConverter.java b/src/main/java/net/logstash/logback/encoder/wrapper/ChainedPayloadConverter.java new file mode 100644 index 00000000..7ace6bf0 --- /dev/null +++ b/src/main/java/net/logstash/logback/encoder/wrapper/ChainedPayloadConverter.java @@ -0,0 +1,62 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.encoder.wrapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class ChainedPayloadConverter implements PayloadConverter { + private List chain; + + public ChainedPayloadConverter() { + this.chain = new ArrayList<>(); + } + + public void add(PayloadConverter converter) { + chain.add(converter); + } + + public List getChain() { + return chain; + } + + public void setChain(List chain) { + this.chain = chain; + } + + @Override + public void start() { + chain.forEach(PayloadConverter::start); + } + + @Override + public void stop() { + chain.forEach(PayloadConverter::stop); + } + + @Override + public boolean isStarted() { + return chain.stream().allMatch(PayloadConverter::isStarted); + } + + @Override + public byte[] convert(byte[] encoded) throws IOException { + byte[] result = encoded; + for (PayloadConverter converter : chain) { + result = converter.convert(result); + } + return result; + } +} diff --git a/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverter.java b/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverter.java index d616ea0a..e39a46aa 100644 --- a/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverter.java +++ b/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverter.java @@ -16,6 +16,9 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.math.BigInteger; +import java.util.concurrent.atomic.AtomicInteger; + +import static net.logstash.logback.util.ByteUtil.intToBytes; /** * Wraps payload so it should be compatible with a Beats input (Lumberjack protocol). @@ -26,21 +29,24 @@ */ public class LumberjackPayloadConverter implements PayloadConverter { - private static final int INT_SIZE_IN_BYTES = 4; - private static final byte PROTOCOL_VERSION = '2'; private static final byte PAYLOAD_JSON_TYPE = 'J'; - private int counter; + private AtomicInteger counter; + /** + * A maximum counter number. Counter will be reset after this value is reached. + */ + private int windowSize; public LumberjackPayloadConverter() { - this.counter = 0; + this.counter = new AtomicInteger(); + this.windowSize = 10; } @Override public byte[] convert(byte[] encoded) throws IOException { byte[] payloadLength = intToBytes(encoded.length); - byte[] sequenceNumber = intToBytes(counter++); + byte[] sequenceNumber = intToBytes(nextSequenceNumber()); try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream( 2 + encoded.length + payloadLength.length + sequenceNumber.length @@ -55,12 +61,27 @@ public byte[] convert(byte[] encoded) throws IOException { } } - private static byte[] intToBytes(int value) { - byte[] result = new byte[INT_SIZE_IN_BYTES]; - byte[] unpadded = BigInteger.valueOf(value).toByteArray(); - for (int i = unpadded.length - 1, j = result.length - 1; i >= 0 && j >= 0; i--, j--) { - result[j] = unpadded[i]; - } - return result; + private int nextSequenceNumber() { + int maxSize = getWindowSize(); + return counter.updateAndGet(old -> { + int updated = old + 1; + if (updated > maxSize) { + return 1; + } else { + return updated; + } + }); + } + + public int getWindowSize() { + return windowSize; + } + + public void setWindowSize(int windowSize) { + this.windowSize = windowSize; + } + + public void setCounter(AtomicInteger counter) { + this.counter = counter; } } diff --git a/src/main/java/net/logstash/logback/util/ByteUtil.java b/src/main/java/net/logstash/logback/util/ByteUtil.java new file mode 100644 index 00000000..f998e53d --- /dev/null +++ b/src/main/java/net/logstash/logback/util/ByteUtil.java @@ -0,0 +1,38 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.logstash.logback.util; + +import java.math.BigInteger; + +public class ByteUtil { + + private static final int INT_SIZE_IN_BYTES = 4; + + private ByteUtil() { + // util + } + + public static byte[] intToBytes(int value) { + byte[] result = new byte[INT_SIZE_IN_BYTES]; + byte[] unpadded = BigInteger.valueOf(value).toByteArray(); + for (int i = unpadded.length - 1, j = result.length - 1; i >= 0 && j >= 0; i--, j--) { + result[j] = unpadded[i]; + } + return result; + } + + public static int bytesToInt(byte[] value) { + return new BigInteger(1, value).intValueExact(); + } +} diff --git a/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverterTest.java b/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverterTest.java index 8e885fa8..20109706 100644 --- a/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverterTest.java +++ b/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverterTest.java @@ -38,7 +38,7 @@ void testPayloadProperlyWrapped() { // payload type assertEquals('J', buffer.get()); // sequence number - assertEquals(0, buffer.getInt()); + assertEquals(1, buffer.getInt()); // payload length int payloadLength = buffer.getInt(); assertEquals(encoded.length, payloadLength); @@ -57,7 +57,7 @@ void testSequenceNumberIncremented() { String payload = "{\"message\":\"Log message\"}"; byte[] encoded = payload.getBytes(StandardCharsets.UTF_8); - for (int i = 0; i < 5; i++) { + for (int i = 1; i <= 5; i++) { byte[] wrapped = assertDoesNotThrow(() -> payloadWrapper.convert(encoded)); assertSequenceNumberIs(wrapped, i); } From 3e28c80c654df8329b8be5ce8a5a924299c0bbe6 Mon Sep 17 00:00:00 2001 From: multicatch Date: Wed, 14 Jul 2021 20:27:03 +0200 Subject: [PATCH 06/12] Renamed converters package --- README.md | 2 +- .../logback/appender/AbstractBeatsTcpSocketAppender.java | 6 +++--- .../net/logstash/logback/encoder/CompositeJsonEncoder.java | 4 ++-- .../{wrapper => converter}/ChainedPayloadConverter.java | 2 +- .../{wrapper => converter}/LumberjackPayloadConverter.java | 3 +-- .../encoder/{wrapper => converter}/PayloadConverter.java | 2 +- .../{wrapper => converter}/PayloadStreamingConverter.java | 2 +- .../LumberjackPayloadConverterTest.java | 2 +- src/test/resources/logback-test.xml | 2 +- 9 files changed, 12 insertions(+), 13 deletions(-) rename src/main/java/net/logstash/logback/encoder/{wrapper => converter}/ChainedPayloadConverter.java (97%) rename src/main/java/net/logstash/logback/encoder/{wrapper => converter}/LumberjackPayloadConverter.java (97%) rename src/main/java/net/logstash/logback/encoder/{wrapper => converter}/PayloadConverter.java (95%) rename src/main/java/net/logstash/logback/encoder/{wrapper => converter}/PayloadStreamingConverter.java (94%) rename src/test/java/net/logstash/logback/encoder/{wrapper => converter}/LumberjackPayloadConverterTest.java (98%) diff --git a/README.md b/README.md index 6e5aeb97..b9006833 100644 --- a/README.md +++ b/README.md @@ -1814,7 +1814,7 @@ which is used by Beats input. ```xml - + ``` diff --git a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java index 01ac2ab0..dfa11c1c 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java @@ -19,9 +19,9 @@ import net.logstash.logback.appender.listener.TcpAppenderListener; import net.logstash.logback.encoder.CompositeJsonEncoder; import net.logstash.logback.encoder.StreamingEncoder; -import net.logstash.logback.encoder.wrapper.ChainedPayloadConverter; -import net.logstash.logback.encoder.wrapper.LumberjackPayloadConverter; -import net.logstash.logback.encoder.wrapper.PayloadConverter; +import net.logstash.logback.encoder.converter.ChainedPayloadConverter; +import net.logstash.logback.encoder.converter.LumberjackPayloadConverter; +import net.logstash.logback.encoder.converter.PayloadConverter; import java.io.IOException; import java.io.InputStream; diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index 0576d08b..38a6dff5 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -21,7 +21,7 @@ import net.logstash.logback.composite.JsonProviders; import net.logstash.logback.decorate.JsonFactoryDecorator; import net.logstash.logback.decorate.JsonGeneratorDecorator; -import net.logstash.logback.encoder.wrapper.PayloadStreamingConverter; +import net.logstash.logback.encoder.converter.PayloadStreamingConverter; import net.logstash.logback.util.ReusableByteBuffer; import net.logstash.logback.util.ReusableByteBufferPool; @@ -30,7 +30,7 @@ import ch.qos.logback.core.encoder.LayoutWrappingEncoder; import ch.qos.logback.core.pattern.PatternLayoutBase; import ch.qos.logback.core.spi.DeferredProcessingAware; -import net.logstash.logback.encoder.wrapper.PayloadConverter; +import net.logstash.logback.encoder.converter.PayloadConverter; public abstract class CompositeJsonEncoder extends EncoderBase implements StreamingEncoder { diff --git a/src/main/java/net/logstash/logback/encoder/wrapper/ChainedPayloadConverter.java b/src/main/java/net/logstash/logback/encoder/converter/ChainedPayloadConverter.java similarity index 97% rename from src/main/java/net/logstash/logback/encoder/wrapper/ChainedPayloadConverter.java rename to src/main/java/net/logstash/logback/encoder/converter/ChainedPayloadConverter.java index 7ace6bf0..439a9e58 100644 --- a/src/main/java/net/logstash/logback/encoder/wrapper/ChainedPayloadConverter.java +++ b/src/main/java/net/logstash/logback/encoder/converter/ChainedPayloadConverter.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package net.logstash.logback.encoder.wrapper; +package net.logstash.logback.encoder.converter; import java.io.IOException; import java.util.ArrayList; diff --git a/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverter.java b/src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter.java similarity index 97% rename from src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverter.java rename to src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter.java index e39a46aa..0f10a8c5 100644 --- a/src/main/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverter.java +++ b/src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter.java @@ -11,11 +11,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package net.logstash.logback.encoder.wrapper; +package net.logstash.logback.encoder.converter; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.math.BigInteger; import java.util.concurrent.atomic.AtomicInteger; import static net.logstash.logback.util.ByteUtil.intToBytes; diff --git a/src/main/java/net/logstash/logback/encoder/wrapper/PayloadConverter.java b/src/main/java/net/logstash/logback/encoder/converter/PayloadConverter.java similarity index 95% rename from src/main/java/net/logstash/logback/encoder/wrapper/PayloadConverter.java rename to src/main/java/net/logstash/logback/encoder/converter/PayloadConverter.java index 89af4dac..35e80053 100644 --- a/src/main/java/net/logstash/logback/encoder/wrapper/PayloadConverter.java +++ b/src/main/java/net/logstash/logback/encoder/converter/PayloadConverter.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package net.logstash.logback.encoder.wrapper; +package net.logstash.logback.encoder.converter; import java.io.IOException; diff --git a/src/main/java/net/logstash/logback/encoder/wrapper/PayloadStreamingConverter.java b/src/main/java/net/logstash/logback/encoder/converter/PayloadStreamingConverter.java similarity index 94% rename from src/main/java/net/logstash/logback/encoder/wrapper/PayloadStreamingConverter.java rename to src/main/java/net/logstash/logback/encoder/converter/PayloadStreamingConverter.java index 4da5cdf2..4ba7d334 100644 --- a/src/main/java/net/logstash/logback/encoder/wrapper/PayloadStreamingConverter.java +++ b/src/main/java/net/logstash/logback/encoder/converter/PayloadStreamingConverter.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package net.logstash.logback.encoder.wrapper; +package net.logstash.logback.encoder.converter; import java.io.OutputStream; diff --git a/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverterTest.java b/src/test/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverterTest.java similarity index 98% rename from src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverterTest.java rename to src/test/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverterTest.java index 20109706..eed7595f 100644 --- a/src/test/java/net/logstash/logback/encoder/wrapper/LumberjackPayloadConverterTest.java +++ b/src/test/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverterTest.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package net.logstash.logback.encoder.wrapper; +package net.logstash.logback.encoder.converter; import org.junit.jupiter.api.Test; diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index f4db7fbd..eb922fec 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -29,7 +29,7 @@ --> - + customMessage caller From 1938fbd4dfeef766a4dcfeea01f7a1601447b74c Mon Sep 17 00:00:00 2001 From: multicatch Date: Wed, 14 Jul 2021 23:45:58 +0200 Subject: [PATCH 07/12] Fix BeatsAppender initialization --- .../AbstractBeatsTcpSocketAppender.java | 44 ++++++++++++------- .../converter/LumberjackPayloadConverter.java | 2 +- .../LumberjackPayloadConverterTest.java | 8 ++-- 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java index dfa11c1c..b80cdd76 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java @@ -18,7 +18,6 @@ import ch.qos.logback.core.spi.DeferredProcessingAware; import net.logstash.logback.appender.listener.TcpAppenderListener; import net.logstash.logback.encoder.CompositeJsonEncoder; -import net.logstash.logback.encoder.StreamingEncoder; import net.logstash.logback.encoder.converter.ChainedPayloadConverter; import net.logstash.logback.encoder.converter.LumberjackPayloadConverter; import net.logstash.logback.encoder.converter.PayloadConverter; @@ -33,6 +32,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -42,7 +42,7 @@ /** * An appender that is compatible with Lumberjack protocol (Beats input). * - * See Logstash's documentation for more info. + * See Logstash documentation for more info. */ public abstract class AbstractBeatsTcpSocketAppender> extends AbstractLogstashTcpSocketAppender { @@ -129,14 +129,6 @@ private LumberjackPayloadConverter createConverter() { } private Encoder wrapAsBeatsEncoder(Encoder original, PayloadConverter converter) { - if (original instanceof StreamingEncoder) { - addWarn(String.format( - "The selected encoder %s is a StreamingEncoder, which is incompatible with %s." - + "It will be used anyway, but streaming will be unavailable.", - original.getClass().getName(), this.getClass().getName() - )); - } - return new BeatsEncoderWrapper<>(original, converter); } @@ -178,6 +170,7 @@ public byte[] encode(E event) { Thread.currentThread().interrupt(); } } + byte[] encoded = original.encode(event); if (converter != null && converter.isStarted()) { try { @@ -211,8 +204,7 @@ protected void fireConnectionOpened(Socket socket) { } try { - ackReaderCallable.setInputStream(socket.getInputStream()); - ackReaderCallable.readyLock.notify(); + ackReaderCallable.signalConnectionOpened(socket.getInputStream()); sendWindowSize(socket.getOutputStream()); } catch (IOException e) { addError("Error during Beats connection initialization", e); @@ -251,7 +243,12 @@ private class AckReaderCallable implements Callable { * Lock that guards {@link #inputStream} state. * Used to prevent a race condition during lazy {@link #inputStream} initialization. */ - private final Lock readyLock = new ReentrantLock(); + private final Lock lock = new ReentrantLock(); + + /** + * Condition that indicates whether {@link #inputStream} is ready. + */ + private final Condition readyCondition = lock.newCondition(); AckReaderCallable() { super(); @@ -261,7 +258,7 @@ private class AckReaderCallable implements Callable { public Void call() throws Exception { updateCurrentThreadName(); try { - readyLock.wait(); + waitUntilInputStreamIsReady(); while (true) { try { int responseByte = inputStream.read(); @@ -286,6 +283,15 @@ public Void call() throws Exception { } } + private void waitUntilInputStreamIsReady() throws InterruptedException { + lock.lock(); + try { + readyCondition.await(); + } finally { + lock.unlock(); + } + } + private void readAckResponse(int responseByte) throws IOException { if (responseByte != PROTOCOL_VERSION) { addWarn(String.format("Protocol version is incorrect: %d, expected: %d.", responseByte, PROTOCOL_VERSION)); @@ -309,8 +315,14 @@ private void readAckResponse(int responseByte) throws IOException { ackEvents.offer(new AckEvent(bytesToInt(sequenceNumber))); } - public void setInputStream(InputStream inputStream) { - this.inputStream = inputStream; + public void signalConnectionOpened(InputStream inputStream) { + lock.lock(); + try { + this.inputStream = inputStream; + readyCondition.signal(); + } finally { + lock.unlock(); + } } } diff --git a/src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter.java b/src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter.java index 0f10a8c5..df7ee0c7 100644 --- a/src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter.java +++ b/src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter.java @@ -23,7 +23,7 @@ * Wraps payload so it should be compatible with a Beats input (Lumberjack protocol). * Can be used with Logstash input or Graylog's Beats input. * - * See Logstash's documentation + * See Logstash documentation * to read more about this protocol. */ public class LumberjackPayloadConverter implements PayloadConverter { diff --git a/src/test/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverterTest.java b/src/test/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverterTest.java index eed7595f..d190f579 100644 --- a/src/test/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverterTest.java +++ b/src/test/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverterTest.java @@ -22,14 +22,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; class LumberjackPayloadConverterTest { - private final LumberjackPayloadConverter payloadWrapper = new LumberjackPayloadConverter(); + private final LumberjackPayloadConverter payloadConverter = new LumberjackPayloadConverter(); @Test - void testPayloadProperlyWrapped() { + void testPayloadProperlyConverted() { String payload = "{\"message\":\"Log message\"}"; byte[] encoded = payload.getBytes(StandardCharsets.UTF_8); - byte[] wrapped = assertDoesNotThrow(() -> payloadWrapper.convert(encoded)); + byte[] wrapped = assertDoesNotThrow(() -> payloadConverter.convert(encoded)); ByteBuffer buffer = ByteBuffer.wrap(wrapped); @@ -58,7 +58,7 @@ void testSequenceNumberIncremented() { byte[] encoded = payload.getBytes(StandardCharsets.UTF_8); for (int i = 1; i <= 5; i++) { - byte[] wrapped = assertDoesNotThrow(() -> payloadWrapper.convert(encoded)); + byte[] wrapped = assertDoesNotThrow(() -> payloadConverter.convert(encoded)); assertSequenceNumberIs(wrapped, i); } } From 1d6fe4968f3fc91f2d79834febb82a16bd76db82 Mon Sep 17 00:00:00 2001 From: multicatch Date: Wed, 14 Jul 2021 23:57:12 +0200 Subject: [PATCH 08/12] Add documentation for Beats appenders --- README.md | 59 +++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index b9006833..6157f117 100644 --- a/README.md +++ b/README.md @@ -158,15 +158,16 @@ To log using JSON format, you must configure logback to use either: The appenders, encoders, and layouts provided by the logstash-logback-encoder library are as follows: -| Format | Protocol | Function | LoggingEvent | AccessEvent -|---------------|------------|----------| ------------ | ----------- -| Logstash JSON | Syslog/UDP | Appender | [`LogstashUdpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashUdpSocketAppender.java) | [`LogstashAccessUdpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashAccessUdpSocketAppender.java) -| Logstash JSON | TCP | Appender | [`LogstashTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashTcpSocketAppender.java) | [`LogstashAccessTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashAccessTcpSocketAppender.java) -| any | any | Appender | [`LoggingEventAsyncDisruptorAppender`](/src/main/java/net/logstash/logback/appender/LoggingEventAsyncDisruptorAppender.java) | [`AccessEventAsyncDisruptorAppender`](/src/main/java/net/logstash/logback/appender/AccessEventAsyncDisruptorAppender.java) -| Logstash JSON | any | Encoder | [`LogstashEncoder`](/src/main/java/net/logstash/logback/encoder/LogstashEncoder.java) | [`LogstashAccessEncoder`](/src/main/java/net/logstash/logback/encoder/LogstashAccessEncoder.java) -| Logstash JSON | any | Layout | [`LogstashLayout`](/src/main/java/net/logstash/logback/layout/LogstashLayout.java) | [`LogstashAccessLayout`](/src/main/java/net/logstash/logback/layout/LogstashAccessLayout.java) -| General JSON | any | Encoder | [`LoggingEventCompositeJsonEncoder`](/src/main/java/net/logstash/logback/encoder/LoggingEventCompositeJsonEncoder.java) | [`AccessEventCompositeJsonEncoder`](/src/main/java/net/logstash/logback/encoder/AccessEventCompositeJsonEncoder.java) -| General JSON | any | Layout | [`LoggingEventCompositeJsonLayout`](/src/main/java/net/logstash/logback/layout/LoggingEventCompositeJsonLayout.java) | [`AccessEventCompositeJsonLayout`](/src/main/java/net/logstash/logback/encoder/AccessEventCompositeJsonLayout.java) +| Format | Protocol | Function | LoggingEvent | AccessEvent +|---------------|----------------|----------| ------------ | ----------- +| Logstash JSON | Syslog/UDP | Appender | [`LogstashUdpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashUdpSocketAppender.java) | [`LogstashAccessUdpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashAccessUdpSocketAppender.java) +| Logstash JSON | TCP | Appender | [`LogstashTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashTcpSocketAppender.java) | [`LogstashAccessTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashAccessTcpSocketAppender.java) +| Beats JSON | Lumberjack/TCP | Appender | [`BeatsTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/BeatsTcpSocketAppender.java) | [`BeatsAccessTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/BeatsAccessTcpSocketAppender.java) +| any | any | Appender | [`LoggingEventAsyncDisruptorAppender`](/src/main/java/net/logstash/logback/appender/LoggingEventAsyncDisruptorAppender.java) | [`AccessEventAsyncDisruptorAppender`](/src/main/java/net/logstash/logback/appender/AccessEventAsyncDisruptorAppender.java) +| Logstash JSON | any | Encoder | [`LogstashEncoder`](/src/main/java/net/logstash/logback/encoder/LogstashEncoder.java) | [`LogstashAccessEncoder`](/src/main/java/net/logstash/logback/encoder/LogstashAccessEncoder.java) +| Logstash JSON | any | Layout | [`LogstashLayout`](/src/main/java/net/logstash/logback/layout/LogstashLayout.java) | [`LogstashAccessLayout`](/src/main/java/net/logstash/logback/layout/LogstashAccessLayout.java) +| General JSON | any | Encoder | [`LoggingEventCompositeJsonEncoder`](/src/main/java/net/logstash/logback/encoder/LoggingEventCompositeJsonEncoder.java) | [`AccessEventCompositeJsonEncoder`](/src/main/java/net/logstash/logback/encoder/AccessEventCompositeJsonEncoder.java) +| General JSON | any | Layout | [`LoggingEventCompositeJsonLayout`](/src/main/java/net/logstash/logback/layout/LoggingEventCompositeJsonLayout.java) | [`AccessEventCompositeJsonLayout`](/src/main/java/net/logstash/logback/encoder/AccessEventCompositeJsonLayout.java) These encoders/layouts can generally be used by any logback appender (such as `RollingFileAppender`). @@ -275,8 +276,26 @@ in your `logback.xml`, like this: ``` +Alternatively, you can use `Beats` variant, which supports the same features as `LogstashTcpSocketAppender`. +```xml + + + + 127.0.0.1:5044 + + 20 + + + + + + + + + +``` -To output JSON for AccessEvents over TCP, use a `LogstashAccessTcpSocketAppender` +To output JSON for AccessEvents over TCP, use a `LogstashAccessTcpSocketAppender` or `BeatsAccessTcpSocketAppender` with a `LogstashAccessEncoder` or `AccessEventCompositeJsonEncoder` in your `logback-access.xml`, like this: @@ -312,8 +331,23 @@ If the RingBuffer is full (e.g. due to slow network, etc), then events will be d The TCP appenders will automatically reconnect if the connection breaks. However, events may be lost before Java's socket realizes the connection has broken. +To receive logs in Logstash, you have two options: +* configure [`Beats` input](http://www.logstash.net/docs/latest/inputs/beats) (same as for Filebeat) and use `BeatsTcpSocketAppender`/`BeatsAccessTcpSocketAppender`, +* configure [`tcp` input](http://www.logstash.net/docs/latest/inputs/tcp) and use other TCP appenders. + +To receive messages via `Beats` input, you configure a basic input. +However, this is only compatible with `BeatsTcpSocketAppender`/`BeatsAccessTcpSocketAppender`. +``` +input { + beats { + port => 5044 + } +} +``` + To receive TCP input in logstash, configure a [`tcp`](http://www.logstash.net/docs/latest/inputs/tcp) input with the [`json_lines`](http://www.logstash.net/docs/latest/codecs/json_lines) codec in logstash's configuration like this: +This method is compatible with all the other TCP appenders. ``` input { @@ -1821,9 +1855,8 @@ which is used by Beats input. The encoder will prepare the output JSON and pass it to given _payloadConverter_, which will then convert it to a proper format. -The logstash-logback-encoder library contains `LumberjackPayloadConverter`. -It can be used in situation when it is needed to reuse Beats input (in Logstash or Graylog for example) -that is also used by Filebeat (or other tools). +The logstash-logback-encoder library contains `net.logstash.logback.encoder.converter.LumberjackPayloadConverter`, +which adds additional metadata to serialized output, which are needed by Lumberjack protocol. #### Providers for LoggingEvents From 3e3f4878438c42ac1b2e722b9fc3f043f0d18317 Mon Sep 17 00:00:00 2001 From: multicatch Date: Thu, 15 Jul 2021 18:56:07 +0200 Subject: [PATCH 09/12] Beats appender documentation fixes --- README.md | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 6157f117..f69d154f 100644 --- a/README.md +++ b/README.md @@ -282,7 +282,7 @@ Alternatively, you can use `Beats` variant, which supports the same features as 127.0.0.1:5044 - + 20 @@ -290,7 +290,7 @@ Alternatively, you can use `Beats` variant, which supports the same features as - + ``` @@ -309,13 +309,27 @@ in your `logback-access.xml`, like this: - + + 127.0.0.1:5044 + + 20 + + + + + + + + + + + ``` The TCP appenders use an encoder, rather than a layout as the [UDP appenders](#udp) . You can use a `Logstash*Encoder`, `*EventCompositeJsonEncoder`, or any other logback encoder. -All of the output formatting options are configured at the encoder level. +All the output formatting options are configured at the encoder level. Internally, the TCP appenders are asynchronous (using the [LMAX Disruptor RingBuffer](https://lmax-exchange.github.io/disruptor/)). All the encoding and TCP communication is delegated to a single writer thread. @@ -1855,7 +1869,7 @@ which is used by Beats input. The encoder will prepare the output JSON and pass it to given _payloadConverter_, which will then convert it to a proper format. -The logstash-logback-encoder library contains `net.logstash.logback.encoder.converter.LumberjackPayloadConverter`, +The logstash-logback-encoder library contains [`LumberjackPayloadConverter`](/src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter), which adds additional metadata to serialized output, which are needed by Lumberjack protocol. #### Providers for LoggingEvents From 2a6126647de658efbd9d394a27968c20206d9da8 Mon Sep 17 00:00:00 2001 From: multicatch Date: Thu, 15 Jul 2021 18:56:39 +0200 Subject: [PATCH 10/12] Fix for AckReaderCallable initialization race condition --- .../logback/appender/AbstractBeatsTcpSocketAppender.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java index b80cdd76..9346f985 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java @@ -286,7 +286,9 @@ public Void call() throws Exception { private void waitUntilInputStreamIsReady() throws InterruptedException { lock.lock(); try { - readyCondition.await(); + if (inputStream == null) { + readyCondition.await(); + } } finally { lock.unlock(); } @@ -319,7 +321,7 @@ public void signalConnectionOpened(InputStream inputStream) { lock.lock(); try { this.inputStream = inputStream; - readyCondition.signal(); + readyCondition.signalAll(); } finally { lock.unlock(); } From f4e95bdd82dfa26ccf97cd6e25d2bab2df2a8ff7 Mon Sep 17 00:00:00 2001 From: multicatch Date: Fri, 16 Jul 2021 11:16:58 +0200 Subject: [PATCH 11/12] Fix for BeatsEncoderWrapper starting and stopping --- .../AbstractBeatsTcpSocketAppender.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java index 9346f985..db0b1ed8 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java @@ -156,6 +156,28 @@ private class BeatsEncoderWrapper extends EncoderBase { this.converter = converter; } + @Override + public boolean isStarted() { + return original.isStarted() && super.isStarted(); + } + + @Override + public void start() { + if (isStarted()) { + return; + } + original.start(); + super.start(); + } + + @Override + public void stop() { + if (isStarted()) { + super.stop(); + original.stop(); + } + } + @Override public byte[] headerBytes() { return original.headerBytes(); From 770ac6bb9779c9890d070edd3762417453e05264 Mon Sep 17 00:00:00 2001 From: multicatch Date: Sun, 18 Jul 2021 20:00:07 +0200 Subject: [PATCH 12/12] Merge Lumberjack payload converter into Beats appender --- README.md | 21 +-- .../AbstractBeatsTcpSocketAppender.java | 134 ++++++++++-------- .../logback/encoder/CompositeJsonEncoder.java | 54 ++----- .../converter/ChainedPayloadConverter.java | 62 -------- .../converter/LumberjackPayloadConverter.java | 86 ----------- .../encoder/converter/PayloadConverter.java | 30 ---- .../converter/PayloadStreamingConverter.java | 22 --- .../logstash/logback/ConfigurationTest.java | 10 -- .../LumberjackV2PayloadWrapperTest.java} | 12 +- src/test/resources/logback-test.xml | 1 - 10 files changed, 88 insertions(+), 344 deletions(-) delete mode 100644 src/main/java/net/logstash/logback/encoder/converter/ChainedPayloadConverter.java delete mode 100644 src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter.java delete mode 100644 src/main/java/net/logstash/logback/encoder/converter/PayloadConverter.java delete mode 100644 src/main/java/net/logstash/logback/encoder/converter/PayloadStreamingConverter.java rename src/test/java/net/logstash/logback/{encoder/converter/LumberjackPayloadConverterTest.java => appender/LumberjackV2PayloadWrapperTest.java} (84%) diff --git a/README.md b/README.md index f69d154f..f8db693f 100644 --- a/README.md +++ b/README.md @@ -162,7 +162,7 @@ The appenders, encoders, and layouts provided by the logstash-logback-encoder li |---------------|----------------|----------| ------------ | ----------- | Logstash JSON | Syslog/UDP | Appender | [`LogstashUdpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashUdpSocketAppender.java) | [`LogstashAccessUdpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashAccessUdpSocketAppender.java) | Logstash JSON | TCP | Appender | [`LogstashTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashTcpSocketAppender.java) | [`LogstashAccessTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/LogstashAccessTcpSocketAppender.java) -| Beats JSON | Lumberjack/TCP | Appender | [`BeatsTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/BeatsTcpSocketAppender.java) | [`BeatsAccessTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/BeatsAccessTcpSocketAppender.java) +| Beats JSON | [Lumberjack v2](https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md) | Appender | [`BeatsTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/BeatsTcpSocketAppender.java) | [`BeatsAccessTcpSocketAppender`](/src/main/java/net/logstash/logback/appender/BeatsAccessTcpSocketAppender.java) | any | any | Appender | [`LoggingEventAsyncDisruptorAppender`](/src/main/java/net/logstash/logback/appender/LoggingEventAsyncDisruptorAppender.java) | [`AccessEventAsyncDisruptorAppender`](/src/main/java/net/logstash/logback/appender/AccessEventAsyncDisruptorAppender.java) | Logstash JSON | any | Encoder | [`LogstashEncoder`](/src/main/java/net/logstash/logback/encoder/LogstashEncoder.java) | [`LogstashAccessEncoder`](/src/main/java/net/logstash/logback/encoder/LogstashAccessEncoder.java) | Logstash JSON | any | Layout | [`LogstashLayout`](/src/main/java/net/logstash/logback/layout/LogstashLayout.java) | [`LogstashAccessLayout`](/src/main/java/net/logstash/logback/layout/LogstashAccessLayout.java) @@ -1853,25 +1853,6 @@ These encoders/layouts make use of an internal buffer to hold the JSON output du The size of this buffer is set to `1024` bytes by default. A different size can be configured by setting the `minBufferSize` property to the desired value. The buffer automatically grows above the `minBufferSize` when needed to accommodate with larger events. However, only the first `minBufferSize` bytes will be reused by subsequent invocations. It is therefore strongly advised to set the minimum size at least equal to the average size of the encoded events to reduce unnecessary memory allocations and reduce pressure on the garbage collector. -Apart from providers, if you need to further customize the output format of encoded JSON, -you can specify the _payloadConverter_. -A _payloadConverter_ can wrap or convert the encoded JSON (with a prefix, suffix and other providers) to another byte format. -For example, it can be used to convert plain JSON to Lumberjack protocol, -which is used by Beats input. - -```xml - - - - -``` - -The encoder will prepare the output JSON and pass it to given _payloadConverter_, -which will then convert it to a proper format. - -The logstash-logback-encoder library contains [`LumberjackPayloadConverter`](/src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter), -which adds additional metadata to serialized output, which are needed by Lumberjack protocol. - #### Providers for LoggingEvents The table below lists the available providers for LoggingEvents, and their configuration properties (defaults in parentheses). diff --git a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java index db0b1ed8..04a8a212 100644 --- a/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java +++ b/src/main/java/net/logstash/logback/appender/AbstractBeatsTcpSocketAppender.java @@ -17,11 +17,8 @@ import ch.qos.logback.core.encoder.EncoderBase; import ch.qos.logback.core.spi.DeferredProcessingAware; import net.logstash.logback.appender.listener.TcpAppenderListener; -import net.logstash.logback.encoder.CompositeJsonEncoder; -import net.logstash.logback.encoder.converter.ChainedPayloadConverter; -import net.logstash.logback.encoder.converter.LumberjackPayloadConverter; -import net.logstash.logback.encoder.converter.PayloadConverter; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -40,14 +37,17 @@ import static net.logstash.logback.util.ByteUtil.intToBytes; /** - * An appender that is compatible with Lumberjack protocol (Beats input). + * An appender that is compatible with Lumberjack v2 protocol (Beats input). * - * See Logstash documentation for more info. + * Implemented according to current Lumberjack specification. + * Verified with Graylog's Lumberjack v2 implementation. */ public abstract class AbstractBeatsTcpSocketAppender> extends AbstractLogstashTcpSocketAppender { private static final byte PROTOCOL_VERSION = '2'; + + private static final byte PAYLOAD_JSON_TYPE = 'J'; private static final byte PAYLOAD_WINDOW_TYPE = 'W'; private static final byte PAYLOAD_ACK_TYPE = 'A'; @@ -80,56 +80,18 @@ public AbstractBeatsTcpSocketAppender() { @Override public synchronized void start() { Encoder encoder = getEncoder(); - PayloadConverter converter = installOrCreateConverter(encoder); - setEncoder(wrapAsBeatsEncoder(encoder, converter)); + setEncoder(wrapAsBeatsEncoder(encoder)); super.start(); } - /** - * Setups {@link LumberjackPayloadConverter}. - * Installs it into an encoder if possible or else creates a new instance. - * @param encoder an encoder to install the converter in - * @return a converter if installing was not possible - */ - private PayloadConverter installOrCreateConverter(Encoder encoder) { - if (encoder instanceof CompositeJsonEncoder) { - installConverter((CompositeJsonEncoder) encoder); - return null; - } else { - return createConverter(); - } + private Encoder wrapAsBeatsEncoder(Encoder original) { + return new BeatsEncoderWrapper<>(original, createConverter()); } - /** - * Installs and configures {@link LumberjackPayloadConverter} into {@link CompositeJsonEncoder}. - * @param encoder a target encoder to install the converter - * @param log event type - */ - private void installConverter(CompositeJsonEncoder encoder) { - PayloadConverter payloadConverter = encoder.getPayloadConverter(); - if (payloadConverter == null) { - encoder.setPayloadConverter(createConverter()); - } else if (payloadConverter instanceof LumberjackPayloadConverter) { - LumberjackPayloadConverter lumberjackPayloadConverter = (LumberjackPayloadConverter) payloadConverter; - lumberjackPayloadConverter.setWindowSize(windowSize); - lumberjackPayloadConverter.setCounter(counter); - } else { - ChainedPayloadConverter chain = new ChainedPayloadConverter(); - chain.add(createConverter()); - chain.add(payloadConverter); - encoder.setPayloadConverter(chain); - } - } - - private LumberjackPayloadConverter createConverter() { - LumberjackPayloadConverter lumberjackPayloadConverter = new LumberjackPayloadConverter(); - lumberjackPayloadConverter.setWindowSize(windowSize); - lumberjackPayloadConverter.setCounter(counter); - return lumberjackPayloadConverter; - } - - private Encoder wrapAsBeatsEncoder(Encoder original, PayloadConverter converter) { - return new BeatsEncoderWrapper<>(original, converter); + private LumberjackV2PayloadWrapper createConverter() { + return new LumberjackV2PayloadWrapper( + counter, windowSize + ); } public void setWindowSize(int windowSize) { @@ -149,11 +111,11 @@ public int getWindowSize() { private class BeatsEncoderWrapper extends EncoderBase { private final Encoder original; - private final PayloadConverter converter; + private final LumberjackV2PayloadWrapper wrapper; - BeatsEncoderWrapper(Encoder original, PayloadConverter converter) { + BeatsEncoderWrapper(Encoder original, LumberjackV2PayloadWrapper wrapper) { this.original = original; - this.converter = converter; + this.wrapper = wrapper; } @Override @@ -194,15 +156,11 @@ public byte[] encode(E event) { } byte[] encoded = original.encode(event); - if (converter != null && converter.isStarted()) { - try { - return converter.convert(encoded); - } catch (IOException e) { - addWarn("Error encountered while converting log event. Event: " + event, e); - return new byte[0]; - } - } else { - return encoded; + try { + return wrapper.convert(encoded); + } catch (IOException e) { + addWarn("Error encountered while converting log event. Event: " + event, e); + return new byte[0]; } } @@ -362,4 +320,54 @@ public int getSequenceNumber() { return sequenceNumber; } } + + + /** + * Wraps payload so it should be compatible with a Beats input (Lumberjack protocol). + * Can be used with Logstash input or Graylog's Beats input. + * + * See Logstash documentation + * to read more about this protocol. + */ + static class LumberjackV2PayloadWrapper { + + private final AtomicInteger counter; + /** + * A maximum counter number. Counter will be reset after this value is reached. + */ + private final int windowSize; + + LumberjackV2PayloadWrapper(AtomicInteger counter, int windowSize) { + this.counter = counter; + this.windowSize = windowSize; + } + + public byte[] convert(byte[] encoded) throws IOException { + byte[] payloadLength = intToBytes(encoded.length); + byte[] sequenceNumber = intToBytes(nextSequenceNumber()); + + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream( + 2 + encoded.length + payloadLength.length + sequenceNumber.length + )) { + outputStream.write(PROTOCOL_VERSION); + outputStream.write(PAYLOAD_JSON_TYPE); + outputStream.write(sequenceNumber); + outputStream.write(payloadLength); + outputStream.write(encoded); + + return outputStream.toByteArray(); + } + } + + private int nextSequenceNumber() { + return counter.updateAndGet(old -> { + int updated = old + 1; + if (updated > windowSize) { + return 1; + } else { + return updated; + } + }); + } + } } diff --git a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java index 38a6dff5..468d27ec 100644 --- a/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java +++ b/src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java @@ -13,24 +13,21 @@ */ package net.logstash.logback.encoder; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.Charset; - +import ch.qos.logback.core.encoder.Encoder; +import ch.qos.logback.core.encoder.EncoderBase; +import ch.qos.logback.core.encoder.LayoutWrappingEncoder; +import ch.qos.logback.core.pattern.PatternLayoutBase; +import ch.qos.logback.core.spi.DeferredProcessingAware; import net.logstash.logback.composite.CompositeJsonFormatter; import net.logstash.logback.composite.JsonProviders; import net.logstash.logback.decorate.JsonFactoryDecorator; import net.logstash.logback.decorate.JsonGeneratorDecorator; -import net.logstash.logback.encoder.converter.PayloadStreamingConverter; import net.logstash.logback.util.ReusableByteBuffer; import net.logstash.logback.util.ReusableByteBufferPool; -import ch.qos.logback.core.encoder.Encoder; -import ch.qos.logback.core.encoder.EncoderBase; -import ch.qos.logback.core.encoder.LayoutWrappingEncoder; -import ch.qos.logback.core.pattern.PatternLayoutBase; -import ch.qos.logback.core.spi.DeferredProcessingAware; -import net.logstash.logback.encoder.converter.PayloadConverter; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; public abstract class CompositeJsonEncoder extends EncoderBase implements StreamingEncoder { @@ -54,7 +51,6 @@ public abstract class CompositeJsonEncoder prefix; private Encoder suffix; - private PayloadConverter payloadConverter; private final CompositeJsonFormatter formatter; @@ -77,8 +73,6 @@ public void encode(Event event, OutputStream outputStream) throws IOException { throw new IllegalStateException("Encoder is not started"); } - outputStream = wrappedStream(outputStream); - encode(prefix, event, outputStream); formatter.writeEventToOutputStream(event, outputStream); encode(suffix, event, outputStream); @@ -95,7 +89,7 @@ public byte[] encode(Event event) { ReusableByteBuffer buffer = bufferPool.acquire(); try { encode(event, buffer); - return convertEncoded(buffer.toByteArray()); + return buffer.toByteArray(); } catch (IOException e) { addWarn("Error encountered while encoding log event. Event: " + event, e); return EMPTY_BYTES; @@ -113,24 +107,6 @@ private void encode(Encoder encoder, Event event, OutputStream outputStre } } - private OutputStream wrappedStream(OutputStream outputStream) { - if (payloadConverterReady() && payloadConverter instanceof PayloadStreamingConverter) { - return ((PayloadStreamingConverter) payloadConverter).streamTo(outputStream); - } - return outputStream; - } - - private byte[] convertEncoded(byte[] encoded) throws IOException { - if (payloadConverterReady() && !(payloadConverter instanceof PayloadStreamingConverter)) { - return payloadConverter.convert(encoded); - } - return encoded; - } - - private boolean payloadConverterReady() { - return payloadConverter != null && payloadConverter.isStarted(); - } - @Override public void start() { if (isStarted()) { @@ -147,9 +123,6 @@ public void start() { : this.lineSeparator.getBytes(charset); startWrapped(prefix); startWrapped(suffix); - if (payloadConverter != null) { - payloadConverter.start(); - } } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -192,9 +165,6 @@ public void stop() { formatter.stop(); stopWrapped(prefix); stopWrapped(suffix); - if (payloadConverterReady()) { - payloadConverter.stop(); - } } } @@ -314,10 +284,4 @@ public void setSuffix(Encoder suffix) { this.suffix = suffix; } - public PayloadConverter getPayloadConverter() { - return payloadConverter; - } - public void setPayloadConverter(PayloadConverter wrapper) { - this.payloadConverter = wrapper; - } } diff --git a/src/main/java/net/logstash/logback/encoder/converter/ChainedPayloadConverter.java b/src/main/java/net/logstash/logback/encoder/converter/ChainedPayloadConverter.java deleted file mode 100644 index 439a9e58..00000000 --- a/src/main/java/net/logstash/logback/encoder/converter/ChainedPayloadConverter.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.logstash.logback.encoder.converter; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -public class ChainedPayloadConverter implements PayloadConverter { - private List chain; - - public ChainedPayloadConverter() { - this.chain = new ArrayList<>(); - } - - public void add(PayloadConverter converter) { - chain.add(converter); - } - - public List getChain() { - return chain; - } - - public void setChain(List chain) { - this.chain = chain; - } - - @Override - public void start() { - chain.forEach(PayloadConverter::start); - } - - @Override - public void stop() { - chain.forEach(PayloadConverter::stop); - } - - @Override - public boolean isStarted() { - return chain.stream().allMatch(PayloadConverter::isStarted); - } - - @Override - public byte[] convert(byte[] encoded) throws IOException { - byte[] result = encoded; - for (PayloadConverter converter : chain) { - result = converter.convert(result); - } - return result; - } -} diff --git a/src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter.java b/src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter.java deleted file mode 100644 index df7ee0c7..00000000 --- a/src/main/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverter.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.logstash.logback.encoder.converter; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -import static net.logstash.logback.util.ByteUtil.intToBytes; - -/** - * Wraps payload so it should be compatible with a Beats input (Lumberjack protocol). - * Can be used with Logstash input or Graylog's Beats input. - * - * See Logstash documentation - * to read more about this protocol. - */ -public class LumberjackPayloadConverter implements PayloadConverter { - - private static final byte PROTOCOL_VERSION = '2'; - private static final byte PAYLOAD_JSON_TYPE = 'J'; - - private AtomicInteger counter; - /** - * A maximum counter number. Counter will be reset after this value is reached. - */ - private int windowSize; - - public LumberjackPayloadConverter() { - this.counter = new AtomicInteger(); - this.windowSize = 10; - } - - @Override - public byte[] convert(byte[] encoded) throws IOException { - byte[] payloadLength = intToBytes(encoded.length); - byte[] sequenceNumber = intToBytes(nextSequenceNumber()); - - try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream( - 2 + encoded.length + payloadLength.length + sequenceNumber.length - )) { - outputStream.write(PROTOCOL_VERSION); - outputStream.write(PAYLOAD_JSON_TYPE); - outputStream.write(sequenceNumber); - outputStream.write(payloadLength); - outputStream.write(encoded); - - return outputStream.toByteArray(); - } - } - - private int nextSequenceNumber() { - int maxSize = getWindowSize(); - return counter.updateAndGet(old -> { - int updated = old + 1; - if (updated > maxSize) { - return 1; - } else { - return updated; - } - }); - } - - public int getWindowSize() { - return windowSize; - } - - public void setWindowSize(int windowSize) { - this.windowSize = windowSize; - } - - public void setCounter(AtomicInteger counter) { - this.counter = counter; - } -} diff --git a/src/main/java/net/logstash/logback/encoder/converter/PayloadConverter.java b/src/main/java/net/logstash/logback/encoder/converter/PayloadConverter.java deleted file mode 100644 index 35e80053..00000000 --- a/src/main/java/net/logstash/logback/encoder/converter/PayloadConverter.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.logstash.logback.encoder.converter; - -import java.io.IOException; - -/** - * Converts encoded payload (with prefix and suffix). - * Can be used to convert plain bytes to a given format. - */ -public interface PayloadConverter { - default void start() { } - default void stop() { } - default boolean isStarted() { - return true; - } - - byte[] convert(byte[] encoded) throws IOException; -} diff --git a/src/main/java/net/logstash/logback/encoder/converter/PayloadStreamingConverter.java b/src/main/java/net/logstash/logback/encoder/converter/PayloadStreamingConverter.java deleted file mode 100644 index 4ba7d334..00000000 --- a/src/main/java/net/logstash/logback/encoder/converter/PayloadStreamingConverter.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.logstash.logback.encoder.converter; - -import java.io.OutputStream; - -public abstract class PayloadStreamingConverter extends OutputStream - implements PayloadConverter { - - public abstract PayloadStreamingConverter streamTo(OutputStream outputStream); -} diff --git a/src/test/java/net/logstash/logback/ConfigurationTest.java b/src/test/java/net/logstash/logback/ConfigurationTest.java index bbf2099b..e45bc96d 100644 --- a/src/test/java/net/logstash/logback/ConfigurationTest.java +++ b/src/test/java/net/logstash/logback/ConfigurationTest.java @@ -221,10 +221,6 @@ private void verifyOutput(LoggingEventCompositeJsonEncoder encoder) throws IOExc byte[] encoded = encoder.encode(listAppender.list.get(0)); - if (encoded[0] == '2') { - encoded = stripLumberjackMeta(encoded); - } - Map output = parseJson(new String(encoded, StandardCharsets.UTF_8)); Assertions.assertNotNull(output.get("@timestamp")); Assertions.assertEquals("1", output.get("@version")); @@ -267,10 +263,4 @@ private Map parseJson(final String text) throws IOException { return jsonFactory.createParser(text).readValueAs(new TypeReference>() { }); } - - private byte[] stripLumberjackMeta(byte[] encoded) { - byte[] result = new byte[encoded.length - 10]; - System.arraycopy(encoded, 10, result, 0, result.length); - return result; - } } diff --git a/src/test/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverterTest.java b/src/test/java/net/logstash/logback/appender/LumberjackV2PayloadWrapperTest.java similarity index 84% rename from src/test/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverterTest.java rename to src/test/java/net/logstash/logback/appender/LumberjackV2PayloadWrapperTest.java index d190f579..35471b1c 100644 --- a/src/test/java/net/logstash/logback/encoder/converter/LumberjackPayloadConverterTest.java +++ b/src/test/java/net/logstash/logback/appender/LumberjackV2PayloadWrapperTest.java @@ -11,18 +11,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package net.logstash.logback.encoder.converter; +package net.logstash.logback.appender; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; -class LumberjackPayloadConverterTest { - private final LumberjackPayloadConverter payloadConverter = new LumberjackPayloadConverter(); +class LumberjackV2PayloadWrapperTest { + private final AbstractBeatsTcpSocketAppender.LumberjackV2PayloadWrapper payloadConverter + = new AbstractBeatsTcpSocketAppender.LumberjackV2PayloadWrapper(new AtomicInteger(), 10); @Test void testPayloadProperlyConverted() { @@ -57,9 +59,9 @@ void testSequenceNumberIncremented() { String payload = "{\"message\":\"Log message\"}"; byte[] encoded = payload.getBytes(StandardCharsets.UTF_8); - for (int i = 1; i <= 5; i++) { + for (int i = 1; i <= 20; i++) { byte[] wrapped = assertDoesNotThrow(() -> payloadConverter.convert(encoded)); - assertSequenceNumberIs(wrapped, i); + assertSequenceNumberIs(wrapped, ((i - 1) % 10) + 1); } } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index eb922fec..72b6f3ce 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -29,7 +29,6 @@ --> - customMessage caller