|
| 1 | +/* |
| 2 | + * Copyright 2023 Amazon.com, Inc. or its affiliates. |
| 3 | + * Licensed under the Apache License, Version 2.0 (the |
| 4 | + * "License"); you may not use this file except in compliance |
| 5 | + * with the License. You may obtain a copy of the License at |
| 6 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 7 | + * Unless required by applicable law or agreed to in writing, software |
| 8 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 9 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 10 | + * See the License for the specific language governing permissions and |
| 11 | + * limitations under the License. |
| 12 | + * |
| 13 | + */ |
| 14 | + |
| 15 | +package software.amazon.lambda.powertools.logging.log4j; |
| 16 | + |
| 17 | +import java.io.Serializable; |
| 18 | +import java.util.ArrayDeque; |
| 19 | +import java.util.Deque; |
| 20 | +import java.util.Map; |
| 21 | +import java.util.concurrent.ConcurrentHashMap; |
| 22 | + |
| 23 | +import org.apache.logging.log4j.Level; |
| 24 | +import org.apache.logging.log4j.core.Appender; |
| 25 | +import org.apache.logging.log4j.core.Core; |
| 26 | +import org.apache.logging.log4j.core.Filter; |
| 27 | +import org.apache.logging.log4j.core.Layout; |
| 28 | +import org.apache.logging.log4j.core.LogEvent; |
| 29 | +import org.apache.logging.log4j.core.appender.AbstractAppender; |
| 30 | +import org.apache.logging.log4j.core.config.AppenderRef; |
| 31 | +import org.apache.logging.log4j.core.config.Configuration; |
| 32 | +import org.apache.logging.log4j.core.config.plugins.Plugin; |
| 33 | +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; |
| 34 | +import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; |
| 35 | +import org.apache.logging.log4j.core.config.plugins.PluginElement; |
| 36 | +import org.apache.logging.log4j.core.config.plugins.PluginFactory; |
| 37 | +import org.apache.logging.log4j.core.impl.Log4jLogEvent; |
| 38 | + |
| 39 | +import software.amazon.lambda.powertools.common.internal.LambdaHandlerProcessor; |
| 40 | + |
| 41 | +/** |
| 42 | + * A minimalistic Log4j2 appender that buffers log events based on trace ID |
| 43 | + * and flushes them when error logs are encountered or manually triggered. |
| 44 | + */ |
| 45 | +@Plugin(name = "BufferingAppender", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE) |
| 46 | +public class BufferingAppender extends AbstractAppender { |
| 47 | + |
| 48 | + private final AppenderRef[] appenderRefs; |
| 49 | + private final Configuration configuration; |
| 50 | + private final Level bufferAtVerbosity; |
| 51 | + private final int maxBytes; |
| 52 | + private final boolean flushOnErrorLog; |
| 53 | + private final Map<String, Deque<LogEvent>> bufferCache = new ConcurrentHashMap<>(); |
| 54 | + private final ThreadLocal<Boolean> bufferOverflowWarned = new ThreadLocal<>(); |
| 55 | + |
| 56 | + protected BufferingAppender(String name, Filter filter, Layout<? extends Serializable> layout, |
| 57 | + AppenderRef[] appenderRefs, Configuration configuration, Level bufferAtVerbosity, int maxBytes, |
| 58 | + boolean flushOnErrorLog) { |
| 59 | + super(name, filter, layout, false, null); |
| 60 | + this.appenderRefs = appenderRefs; |
| 61 | + this.configuration = configuration; |
| 62 | + this.bufferAtVerbosity = bufferAtVerbosity; |
| 63 | + this.maxBytes = maxBytes; |
| 64 | + this.flushOnErrorLog = flushOnErrorLog; |
| 65 | + } |
| 66 | + |
| 67 | + @Override |
| 68 | + public void append(LogEvent event) { |
| 69 | + if (appenderRefs == null || appenderRefs.length == 0) { |
| 70 | + return; |
| 71 | + } |
| 72 | + LambdaHandlerProcessor.getXrayTraceId().ifPresentOrElse( |
| 73 | + traceId -> { |
| 74 | + // Check if we should buffer this log level |
| 75 | + if (shouldBuffer(event.getLevel())) { |
| 76 | + bufferEvent(traceId, event); |
| 77 | + } else { |
| 78 | + callAppenders(event); |
| 79 | + } |
| 80 | + |
| 81 | + // Flush buffer on error logs if configured |
| 82 | + if (flushOnErrorLog && event.getLevel().isMoreSpecificThan(Level.WARN)) { |
| 83 | + flushBuffer(traceId); |
| 84 | + } |
| 85 | + }, |
| 86 | + () -> callAppenders(event) // No trace ID (INIT phase), log directly |
| 87 | + ); |
| 88 | + } |
| 89 | + |
| 90 | + private void callAppenders(LogEvent event) { |
| 91 | + for (AppenderRef ref : appenderRefs) { |
| 92 | + Appender appender = configuration.getAppender(ref.getRef()); |
| 93 | + if (appender != null) { |
| 94 | + appender.append(event); |
| 95 | + } |
| 96 | + } |
| 97 | + } |
| 98 | + |
| 99 | + private boolean shouldBuffer(Level level) { |
| 100 | + return level.isLessSpecificThan(bufferAtVerbosity) || level.equals(bufferAtVerbosity); |
| 101 | + } |
| 102 | + |
| 103 | + private void bufferEvent(String traceId, LogEvent event) { |
| 104 | + // Create immutable copy to prevent mutation |
| 105 | + LogEvent immutableEvent = Log4jLogEvent.createMemento(event); |
| 106 | + |
| 107 | + // Check if single event is larger than buffer - discard if so |
| 108 | + int eventSize = immutableEvent.getMessage().getFormattedMessage().length(); |
| 109 | + if (eventSize > maxBytes) { |
| 110 | + if (Boolean.TRUE != bufferOverflowWarned.get()) { |
| 111 | + bufferOverflowWarned.set(true); |
| 112 | + } |
| 113 | + return; |
| 114 | + } |
| 115 | + |
| 116 | + bufferCache.computeIfAbsent(traceId, k -> new ArrayDeque<>()).add(immutableEvent); |
| 117 | + |
| 118 | + // Simple size check - remove oldest if over limit |
| 119 | + Deque<LogEvent> buffer = bufferCache.get(traceId); |
| 120 | + while (getBufferSize(buffer) > maxBytes && !buffer.isEmpty()) { |
| 121 | + if (Boolean.TRUE != bufferOverflowWarned.get()) { |
| 122 | + bufferOverflowWarned.set(true); |
| 123 | + } |
| 124 | + buffer.removeFirst(); |
| 125 | + } |
| 126 | + } |
| 127 | + |
| 128 | + private int getBufferSize(Deque<LogEvent> buffer) { |
| 129 | + return buffer.stream() |
| 130 | + .mapToInt(event -> event.getMessage().getFormattedMessage().length()) |
| 131 | + .sum(); |
| 132 | + } |
| 133 | + |
| 134 | + public void clearBuffer() { |
| 135 | + LambdaHandlerProcessor.getXrayTraceId().ifPresent(bufferCache::remove); |
| 136 | + } |
| 137 | + |
| 138 | + public void flushBuffer() { |
| 139 | + LambdaHandlerProcessor.getXrayTraceId().ifPresent(this::flushBuffer); |
| 140 | + } |
| 141 | + |
| 142 | + private void flushBuffer(String traceId) { |
| 143 | + Deque<LogEvent> buffer = bufferCache.remove(traceId); |
| 144 | + if (buffer != null) { |
| 145 | + // Emit buffer overflow warning if it occurred |
| 146 | + if (Boolean.TRUE == bufferOverflowWarned.get()) { |
| 147 | + LOGGER.warn("Buffer size exceeded for trace ID: {}. Some log events were discarded.", traceId); |
| 148 | + bufferOverflowWarned.remove(); |
| 149 | + } |
| 150 | + buffer.forEach(this::callAppenders); |
| 151 | + } |
| 152 | + } |
| 153 | + |
| 154 | + @PluginFactory |
| 155 | + public static BufferingAppender createAppender( |
| 156 | + @PluginAttribute("name") String name, |
| 157 | + @PluginElement("Filter") Filter filter, |
| 158 | + @PluginElement("Layout") Layout<? extends Serializable> layout, |
| 159 | + @PluginElement("AppenderRef") AppenderRef[] appenderRefs, |
| 160 | + @PluginConfiguration Configuration configuration, |
| 161 | + @PluginAttribute(value = "bufferAtVerbosity", defaultString = "DEBUG") String bufferAtVerbosity, |
| 162 | + @PluginAttribute(value = "maxBytes", defaultInt = 20480) int maxBytes, |
| 163 | + @PluginAttribute(value = "flushOnErrorLog", defaultBoolean = true) boolean flushOnErrorLog) { |
| 164 | + |
| 165 | + if (name == null) { |
| 166 | + LOGGER.error("No name provided for BufferingAppender"); |
| 167 | + return null; |
| 168 | + } |
| 169 | + |
| 170 | + Level level = Level.getLevel(bufferAtVerbosity); |
| 171 | + if (level == null) { |
| 172 | + level = Level.DEBUG; |
| 173 | + } |
| 174 | + |
| 175 | + return new BufferingAppender(name, filter, layout, appenderRefs, configuration, level, maxBytes, |
| 176 | + flushOnErrorLog); |
| 177 | + } |
| 178 | +} |
0 commit comments