diff --git a/blibli-backend-framework-kafka/README.md b/blibli-backend-framework-kafka/README.md
index 612abdd..37be73a 100644
--- a/blibli-backend-framework-kafka/README.md
+++ b/blibli-backend-framework-kafka/README.md
@@ -136,4 +136,12 @@ blibli.backend.kafka.logging.before-send=true
 blibli.backend.kafka.logging.before-consume=true
 blibli.backend.kafka.logging.after-success-consume=true
 blibli.backend.kafka.logging.after-failed-consume=true
+```
+
+By default, when the log interceptor is enabled, the payload is included in every log entry. We can exclude the payload from these logs to save log space by setting the following properties:
+```properties
+blibli.backend.kafka.logging.before-send-exclude-event=true
+blibli.backend.kafka.logging.before-consume-exclude-event=true
+blibli.backend.kafka.logging.after-success-exclude-event=true
+blibli.backend.kafka.logging.after-failed-exclude-event=true
 ```
\ No newline at end of file
diff --git a/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/interceptor/log/LogKafkaConsumerInterceptor.java b/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/interceptor/log/LogKafkaConsumerInterceptor.java
index 8bcfbd4..12524af 100644
--- a/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/interceptor/log/LogKafkaConsumerInterceptor.java
+++ b/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/interceptor/log/LogKafkaConsumerInterceptor.java
@@ -20,7 +20,11 @@ public class LogKafkaConsumerInterceptor implements KafkaConsumerInterceptor, Or
   @Override
   public boolean beforeConsume(ConsumerRecord consumerRecord) {
     if (kafkaProperties.getLogging().isBeforeConsume()) {
-      log.info("Receive from topic {} with message {}:{}", consumerRecord.topic(), consumerRecord.key(), consumerRecord.value());
+      if (kafkaProperties.getLogging().isBeforeConsumeExcludeEvent()) {
+        log.info("Receive from topic {} with message key: {}", consumerRecord.topic(), consumerRecord.key());
+      } else {
+        log.info("Receive from topic {} with message {}:{}", consumerRecord.topic(), consumerRecord.key(), consumerRecord.value());
+      }
     }
     return false;
   }
@@ -28,14 +32,22 @@ public boolean beforeConsume(ConsumerRecord consumerRecord) {
   @Override
   public void afterSuccessConsume(ConsumerRecord consumerRecord) {
     if (kafkaProperties.getLogging().isAfterSuccessConsume()) {
-      log.info("Success consume from topic {} with message {}:{}", consumerRecord.topic(), consumerRecord.key(), consumerRecord.value());
+      if (kafkaProperties.getLogging().isAfterSuccessExcludeEvent()) {
+        log.info("Success consume from topic {} with message key: {}", consumerRecord.topic(), consumerRecord.key());
+      } else {
+        log.info("Success consume from topic {} with message {}:{}", consumerRecord.topic(), consumerRecord.key(), consumerRecord.value());
+      }
     }
   }
 
   @Override
   public void afterFailedConsume(ConsumerRecord consumerRecord, Throwable throwable) {
     if (kafkaProperties.getLogging().isAfterFailedConsume()) {
-      log.error(String.format("Failed consume from topic %s with message %s:%s and exception %s", consumerRecord.topic(), consumerRecord.key(), consumerRecord.value(), throwable.getMessage()), throwable);
+      if (kafkaProperties.getLogging().isAfterFailedExcludeEvent()) {
+        log.error(String.format("Failed consume from topic %s with message key: %s and exception %s", consumerRecord.topic(), consumerRecord.key(), throwable.getMessage()), throwable);
+      } else {
+        log.error(String.format("Failed consume from topic %s with message %s:%s and exception %s", consumerRecord.topic(), consumerRecord.key(), consumerRecord.value(), throwable.getMessage()), throwable);
+      }
     }
   }
 }
diff --git a/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/interceptor/log/LogKafkaProducerInterceptor.java b/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/interceptor/log/LogKafkaProducerInterceptor.java
index 123c0ae..b8ca7ef 100644
--- a/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/interceptor/log/LogKafkaProducerInterceptor.java
+++ b/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/interceptor/log/LogKafkaProducerInterceptor.java
@@ -20,7 +20,11 @@ public class LogKafkaProducerInterceptor implements KafkaProducerInterceptor, Or
   @Override
   public ProducerEvent beforeSend(ProducerEvent event) {
     if (kafkaProperties.getLogging().isBeforeSend()) {
-      log.info("Send message to kafka : {}", event);
+      if (kafkaProperties.getLogging().isBeforeSendExcludeEvent()) {
+        log.info("Send message to kafka : {topic: {}, key: {}}", event.getTopic(), event.getKey());
+      } else {
+        log.info("Send message to kafka : {}", event);
+      }
     }
     return event;
   }
diff --git a/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/properties/KafkaProperties.java b/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/properties/KafkaProperties.java
index 2f53b87..a785cde 100644
--- a/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/properties/KafkaProperties.java
+++ b/blibli-backend-framework-kafka/src/main/java/com/blibli/oss/backend/kafka/properties/KafkaProperties.java
@@ -31,6 +31,14 @@ public static class LoggingProperties {
 
     private boolean afterFailedConsume = false;
 
+    private boolean beforeSendExcludeEvent = false;
+
+    private boolean beforeConsumeExcludeEvent = false;
+
+    private boolean afterSuccessExcludeEvent = false;
+
+    private boolean afterFailedExcludeEvent = false;
+
   }
 
 }
diff --git a/blibli-backend-framework-kafka/src/test/java/com/blibli/oss/backend/kafka/interceptor/consumer/KafkaConsumerInterceptorTest.java b/blibli-backend-framework-kafka/src/test/java/com/blibli/oss/backend/kafka/interceptor/consumer/KafkaConsumerInterceptorTest.java
index 64da459..34e105b 100644
--- a/blibli-backend-framework-kafka/src/test/java/com/blibli/oss/backend/kafka/interceptor/consumer/KafkaConsumerInterceptorTest.java
+++ b/blibli-backend-framework-kafka/src/test/java/com/blibli/oss/backend/kafka/interceptor/consumer/KafkaConsumerInterceptorTest.java
@@ -3,6 +3,7 @@
 import com.blibli.oss.backend.kafka.interceptor.KafkaConsumerInterceptor;
 import com.blibli.oss.backend.kafka.producer.KafkaProducer;
 import com.blibli.oss.backend.kafka.producer.helper.KafkaHelper;
+import com.blibli.oss.backend.kafka.properties.KafkaProperties;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -13,6 +14,8 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.system.CapturedOutput;
+import org.springframework.boot.test.system.OutputCaptureExtension;
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -24,7 +27,9 @@
 import java.lang.reflect.Method;
 import java.util.Collections;
 
-@ExtendWith(SpringExtension.class)
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith({SpringExtension.class, OutputCaptureExtension.class})
 @SpringBootTest(classes = KafkaConsumerInterceptorTest.Application.class)
 @EmbeddedKafka(
   partitions = 1,
@@ -53,6 +58,9 @@ class KafkaConsumerInterceptorTest {
   @Autowired
   private EmbeddedKafkaBroker broker;
 
+  @Autowired
+  private KafkaProperties kafkaProperties;
+
   private Consumer consumer;
 
   @BeforeEach
@@ -62,6 +70,10 @@ void setUp() {
 
     helloInterceptor.reset();
     counterInterceptor.reset();
+
+    kafkaProperties.getLogging().setBeforeConsumeExcludeEvent(false);
+    kafkaProperties.getLogging().setAfterSuccessExcludeEvent(false);
+    kafkaProperties.getLogging().setAfterFailedExcludeEvent(false);
   }
 
   @AfterEach
@@ -70,7 +82,7 @@ void tearDown() {
   }
 
   @Test
-  @Order(4)
+  @Order(6)
   void testListener() throws InterruptedException {
     kafkaProducer.sendAndSubscribe(TOPIC, "key", "value", Schedulers.boundedElastic());
     Thread.sleep(2_000L); // wait 5 seconds until message received by listener
@@ -88,18 +100,73 @@
   }
 
   @Test
-  @Order(3)
-  void testListenerWithInterceptor() throws InterruptedException {
+  @Order(4)
+  void testListenerWithInterceptor(CapturedOutput output) throws InterruptedException {
+    kafkaProducer.sendAndSubscribe(TOPIC_GOODBYE, "key", "value", Schedulers.boundedElastic());
+    Thread.sleep(4_000L); // wait 5 seconds until message received by listener
+
+    Assertions.assertEquals(1, counterInterceptor.getBeforeConsume());
+    Assertions.assertEquals(1, counterInterceptor.getAfterSuccessConsume());
+
+    assertTrue(
+      output.getOut()
+        .contains("Receive from topic " + TOPIC_GOODBYE + " with message key:value")
+    );
+    assertTrue(
+      output.getOut()
+        .contains("Success consume from topic " + TOPIC_GOODBYE + " with message key:value")
+    );
+  }
+
+  @Test
+  @Order(5)
+  void testListenerWithInterceptorExcludeEventOnLogs(CapturedOutput output) throws InterruptedException {
+    kafkaProperties.getLogging().setBeforeConsumeExcludeEvent(true);
+    kafkaProperties.getLogging().setAfterSuccessExcludeEvent(true);
+
     kafkaProducer.sendAndSubscribe(TOPIC_GOODBYE, "key", "value", Schedulers.boundedElastic());
     Thread.sleep(4_000L); // wait 5 seconds until message received by listener
 
     Assertions.assertEquals(1, counterInterceptor.getBeforeConsume());
     Assertions.assertEquals(1, counterInterceptor.getAfterSuccessConsume());
+
+    assertTrue(
+      output.getOut()
+        .contains("Receive from topic " + TOPIC_GOODBYE + " with message key: key")
+    );
+    assertTrue(
+      output.getOut()
+        .contains("Success consume from topic " + TOPIC_GOODBYE + " with message key: key")
+    );
   }
 
   @Test
   @Order(2)
-  void testInterceptorError() throws InterruptedException {
+  void testInterceptorError(CapturedOutput output) throws InterruptedException {
+    kafkaProducer.sendAndSubscribe(TOPIC, "error", "value", Schedulers.boundedElastic());
+    Thread.sleep(2_000L); // wait 5 seconds until message received by listener
+
+    Assertions.assertEquals(helloInterceptor.beforeConsume, "value");
+    Assertions.assertEquals(helloInterceptor.afterFailed, "value");
+    Assertions.assertEquals(0, counterInterceptor.getBeforeConsume());
+    Assertions.assertEquals(0, counterInterceptor.getAfterSuccessConsume());
+
+    assertTrue(
+      output.getOut()
+        .contains("Receive from topic " + TOPIC + " with message error:value")
+    );
+    assertTrue(
+      output.getOut()
+        .contains("Failed consume from topic " + TOPIC + " with message error:value")
+    );
+  }
+
+  @Test
+  @Order(3)
+  void testInterceptorErrorExcludeEventOnLogs(CapturedOutput output) throws InterruptedException {
+    kafkaProperties.getLogging().setBeforeConsumeExcludeEvent(true);
+    kafkaProperties.getLogging().setAfterFailedExcludeEvent(true);
+
     kafkaProducer.sendAndSubscribe(TOPIC, "error", "value", Schedulers.boundedElastic());
     Thread.sleep(2_000L); // wait 5 seconds until message received by listener
 
@@ -107,6 +174,15 @@ void testInterceptorError() throws InterruptedException {
     Assertions.assertEquals(helloInterceptor.afterFailed, "value");
     Assertions.assertEquals(0, counterInterceptor.getBeforeConsume());
     Assertions.assertEquals(0, counterInterceptor.getAfterSuccessConsume());
+
+    assertTrue(
+      output.getOut()
+        .contains("Receive from topic " + TOPIC + " with message key: error")
+    );
+    assertTrue(
+      output.getOut()
+        .contains("Failed consume from topic " + TOPIC + " with message key: error")
+    );
   }
 
   @Test
diff --git a/blibli-backend-framework-kafka/src/test/java/com/blibli/oss/backend/kafka/interceptor/producer/KafkaProducerInterceptorTest.java b/blibli-backend-framework-kafka/src/test/java/com/blibli/oss/backend/kafka/interceptor/producer/KafkaProducerInterceptorTest.java
index e0aac31..f89ce96 100644
--- a/blibli-backend-framework-kafka/src/test/java/com/blibli/oss/backend/kafka/interceptor/producer/KafkaProducerInterceptorTest.java
+++ b/blibli-backend-framework-kafka/src/test/java/com/blibli/oss/backend/kafka/interceptor/producer/KafkaProducerInterceptorTest.java
@@ -4,6 +4,7 @@
 import com.blibli.oss.backend.kafka.model.ProducerEvent;
 import com.blibli.oss.backend.kafka.producer.KafkaProducer;
 import com.blibli.oss.backend.kafka.producer.helper.KafkaHelper;
+import com.blibli.oss.backend.kafka.properties.KafkaProperties;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -17,6 +18,8 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.system.CapturedOutput;
+import org.springframework.boot.test.system.OutputCaptureExtension;
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -30,7 +33,7 @@
 
 import static org.junit.jupiter.api.Assertions.*;
 
-@ExtendWith(SpringExtension.class)
+@ExtendWith({SpringExtension.class, OutputCaptureExtension.class})
 @SpringBootTest(classes = KafkaProducerInterceptorTest.Application.class)
 @EmbeddedKafka(
   topics = KafkaProducerInterceptorTest.TOPIC
@@ -51,10 +54,15 @@ class KafkaProducerInterceptorTest {
   @Autowired
   private ErrorFlag errorFlag;
 
+  @Autowired
+  private KafkaProperties kafkaProperties;
+
   @BeforeEach
   void setUp() {
     consumer = KafkaHelper.newConsumer(broker);
     consumer.subscribe(Collections.singletonList(TOPIC));
+
+    kafkaProperties.getLogging().setBeforeSendExcludeEvent(false);
   }
 
   @AfterEach
@@ -63,7 +71,26 @@ void tearDown() {
   }
 
   @Test
-  void testInterceptor() {
+  void testInterceptor(CapturedOutput output) {
+    errorFlag.setError(false);
+    kafkaProducer.sendAndSubscribe(TOPIC, "key", "value", Schedulers.boundedElastic());
+
+    ConsumerRecord record = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
+
+    Assertions.assertEquals(record.key(), "key");
+    Assertions.assertEquals(record.value(), "value changed");
+
+    assertTrue(
+      output.getOut()
+        .contains("Send message to kafka : " +
+          "ProducerEvent(topic=KafkaProducerInterceptorTest, partition=null, headers=null, key=key, value=value, timestamp=null)")
+    );
+  }
+
+  @Test
+  void testInterceptorExcludeEventOnLogs(CapturedOutput output) {
+    kafkaProperties.getLogging().setBeforeSendExcludeEvent(true);
+
     errorFlag.setError(false);
     kafkaProducer.sendAndSubscribe(TOPIC, "key", "value", Schedulers.boundedElastic());
 
@@ -71,6 +98,12 @@ void testInterceptor() {
 
     Assertions.assertEquals(record.key(), "key");
     Assertions.assertEquals(record.value(), "value changed");
+
+    assertTrue(
+      output.getOut()
+        .contains("Send message to kafka : " +
+          "{topic: KafkaProducerInterceptorTest, key: key}")
+    );
   }
 
   @Test