Skip to content

Commit 966e576

Browse files
committed
Extract buffer management logic into KeyBuffer class and keep log4j appender minimal.
1 parent 4ff36d9 commit 966e576

File tree

5 files changed

+512
-60
lines changed

5 files changed

+512
-60
lines changed

powertools-logging/powertools-logging-log4j/src/main/java/software/amazon/lambda/powertools/logging/log4j/BufferingAppender.java

Lines changed: 55 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
package software.amazon.lambda.powertools.logging.log4j;
1616

1717
import java.io.Serializable;
18-
import java.util.ArrayDeque;
1918
import java.util.Deque;
20-
import java.util.Map;
21-
import java.util.concurrent.ConcurrentHashMap;
2219

2320
import org.apache.logging.log4j.Level;
2421
import org.apache.logging.log4j.core.Appender;
@@ -35,24 +32,64 @@
3532
import org.apache.logging.log4j.core.config.plugins.PluginElement;
3633
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
3734
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
38-
import org.apache.logging.log4j.message.SimpleMessage;
3935

4036
import software.amazon.lambda.powertools.common.internal.LambdaHandlerProcessor;
37+
import software.amazon.lambda.powertools.logging.internal.BufferManager;
38+
import software.amazon.lambda.powertools.logging.internal.KeyBuffer;
39+
40+
import static software.amazon.lambda.powertools.logging.log4j.BufferingAppenderConstants.NAME;
4141

4242
/**
43-
* A minimalistic Log4j2 appender that buffers log events based on trace ID
44-
* and flushes them when error logs are encountered or manually triggered.
43+
* A Log4j2 appender that buffers log events by AWS X-Ray trace ID for optimized Lambda logging.
44+
*
45+
* <p>This appender is designed specifically for AWS Lambda functions to reduce log ingestion
46+
* by buffering lower-level logs and only outputting them when errors occur, preserving
47+
* full context for troubleshooting while minimizing routine log volume.
48+
*
49+
* <h3>Key Features:</h3>
50+
* <ul>
51+
* <li><strong>Trace-based buffering:</strong> Groups logs by AWS X-Ray trace ID</li>
52+
* <li><strong>Selective output:</strong> Only buffers logs at or below configured verbosity level</li>
53+
* <li><strong>Auto-flush on errors:</strong> Automatically outputs buffered logs when ERROR/FATAL events occur</li>
54+
* <li><strong>Memory management:</strong> Prevents memory leaks with configurable buffer size limits</li>
55+
* <li><strong>Overflow protection:</strong> Warns when logs are discarded due to buffer limits</li>
56+
* </ul>
57+
*
58+
* <h3>Configuration Example:</h3>
59+
* <pre>{@code
60+
* <BufferingAppender name="BufferedAppender"
61+
* bufferAtVerbosity="INFO"
62+
* maxBytes="20480"
63+
* flushOnErrorLog="true">
64+
* <AppenderRef ref="ConsoleAppender"/>
65+
* </BufferingAppender>
66+
* }</pre>
67+
*
68+
* <h3>Configuration Parameters:</h3>
69+
* <ul>
70+
* <li><strong>bufferAtVerbosity:</strong> Log level to buffer (default: DEBUG). Logs at this level and below are buffered</li>
71+
* <li><strong>maxBytes:</strong> Maximum buffer size in bytes per trace ID (default: 20480)</li>
72+
* <li><strong>flushOnErrorLog:</strong> Whether to flush buffer on ERROR/FATAL logs (default: true)</li>
73+
* </ul>
74+
*
75+
* <h3>Behavior:</h3>
76+
* <ul>
77+
* <li>During Lambda INIT phase (no trace ID): logs are output directly</li>
78+
* <li>During Lambda execution (with trace ID): logs are buffered or output based on level</li>
79+
* <li>When buffer overflows: oldest logs are discarded and a warning is logged</li>
80+
* <li>On Lambda completion: remaining buffered logs can be flushed via {@link software.amazon.lambda.powertools.logging.PowertoolsLogging}</li>
81+
* </ul>
82+
*
83+
* @see software.amazon.lambda.powertools.logging.PowertoolsLogging#flushLogBuffer()
4584
*/
46-
@Plugin(name = "BufferingAppender", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE)
47-
public class BufferingAppender extends AbstractAppender {
85+
@Plugin(name = NAME, category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE)
86+
public class BufferingAppender extends AbstractAppender implements BufferManager {
4887

4988
private final AppenderRef[] appenderRefs;
5089
private final Configuration configuration;
5190
private final Level bufferAtVerbosity;
52-
private final int maxBytes;
5391
private final boolean flushOnErrorLog;
54-
private final Map<String, Deque<LogEvent>> bufferCache = new ConcurrentHashMap<>();
55-
private final ThreadLocal<Boolean> bufferOverflowTriggered = new ThreadLocal<>();
92+
private final KeyBuffer<String, LogEvent> buffer;
5693

5794
protected BufferingAppender(String name, Filter filter, Layout<? extends Serializable> layout,
5895
AppenderRef[] appenderRefs, Configuration configuration, Level bufferAtVerbosity, int maxBytes,
@@ -61,18 +98,18 @@ protected BufferingAppender(String name, Filter filter, Layout<? extends Seriali
6198
this.appenderRefs = appenderRefs;
6299
this.configuration = configuration;
63100
this.bufferAtVerbosity = bufferAtVerbosity;
64-
this.maxBytes = maxBytes;
65101
this.flushOnErrorLog = flushOnErrorLog;
102+
this.buffer = new KeyBuffer<>(maxBytes, event -> event.getMessage().getFormattedMessage().length());
66103
}
67104

68105
@Override
69106
public void append(LogEvent event) {
70107
if (appenderRefs == null || appenderRefs.length == 0) {
71108
return;
72109
}
110+
73111
LambdaHandlerProcessor.getXrayTraceId().ifPresentOrElse(
74112
traceId -> {
75-
// Check if we should buffer this log level
76113
if (shouldBuffer(event.getLevel())) {
77114
bufferEvent(traceId, event);
78115
} else {
@@ -102,62 +139,22 @@ private boolean shouldBuffer(Level level) {
102139
}
103140

104141
private void bufferEvent(String traceId, LogEvent event) {
105-
// Create immutable copy to prevent mutation
106142
LogEvent immutableEvent = Log4jLogEvent.createMemento(event);
107-
108-
// Check if single event is larger than buffer - discard if so
109-
int eventSize = immutableEvent.getMessage().getFormattedMessage().length();
110-
if (eventSize > maxBytes) {
111-
if (Boolean.TRUE != bufferOverflowTriggered.get()) {
112-
bufferOverflowTriggered.set(true);
113-
}
114-
return;
115-
}
116-
117-
bufferCache.computeIfAbsent(traceId, k -> new ArrayDeque<>()).add(immutableEvent);
118-
119-
// Simple size check - remove oldest if over limit
120-
Deque<LogEvent> buffer = bufferCache.get(traceId);
121-
while (getBufferSize(buffer) > maxBytes && !buffer.isEmpty()) {
122-
if (Boolean.TRUE != bufferOverflowTriggered.get()) {
123-
bufferOverflowTriggered.set(true);
124-
}
125-
buffer.removeFirst();
126-
}
127-
}
128-
129-
private int getBufferSize(Deque<LogEvent> buffer) {
130-
return buffer.stream()
131-
.mapToInt(event -> event.getMessage().getFormattedMessage().length())
132-
.sum();
143+
buffer.add(traceId, immutableEvent);
133144
}
134145

135146
public void clearBuffer() {
136-
LambdaHandlerProcessor.getXrayTraceId().ifPresent(bufferCache::remove);
147+
LambdaHandlerProcessor.getXrayTraceId().ifPresent(buffer::clear);
137148
}
138149

139150
public void flushBuffer() {
140151
LambdaHandlerProcessor.getXrayTraceId().ifPresent(this::flushBuffer);
141152
}
142153

143154
private void flushBuffer(String traceId) {
144-
// Emit buffer overflow warning if it occurred
145-
if (Boolean.TRUE == bufferOverflowTriggered.get()) {
146-
// Create LogEvent directly since Log4j status logger may not reach target appenders
147-
LogEvent warningEvent = Log4jLogEvent.newBuilder()
148-
.setLoggerName("BufferingAppender")
149-
.setLevel(Level.WARN)
150-
.setMessage(new SimpleMessage(
151-
"Some logs are not displayed because they were evicted from the buffer. Increase buffer size to store more logs in the buffer."))
152-
.setTimeMillis(System.currentTimeMillis())
153-
.build();
154-
callAppenders(warningEvent);
155-
bufferOverflowTriggered.remove();
156-
}
157-
158-
Deque<LogEvent> buffer = bufferCache.remove(traceId);
159-
if (buffer != null) {
160-
buffer.forEach(this::callAppenders);
155+
Deque<LogEvent> events = buffer.removeAll(traceId);
156+
if (events != null) {
157+
events.forEach(this::callAppenders);
161158
}
162159
}
163160

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2023 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
15+
package software.amazon.lambda.powertools.logging.log4j;
16+
17+
/**
18+
* Constants for BufferingAppender configuration and references.
19+
*/
20+
public final class BufferingAppenderConstants {
21+
22+
/**
23+
* The name used for BufferingAppender in Log4j2 configuration and references.
24+
*/
25+
public static final String NAME = "BufferingAppender";
26+
27+
private BufferingAppenderConstants() {
28+
// Utility class
29+
}
30+
}

powertools-logging/powertools-logging-log4j/src/main/java/software/amazon/lambda/powertools/logging/log4j/internal/Log4jLoggingManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import software.amazon.lambda.powertools.logging.internal.LoggingManager;
2525
import software.amazon.lambda.powertools.logging.log4j.BufferingAppender;
2626

27+
import static software.amazon.lambda.powertools.logging.log4j.BufferingAppenderConstants.NAME;
28+
2729
/**
2830
* LoggingManager for Log4j2 that provides log level management and buffer operations.
2931
* Implements both {@link LoggingManager} and {@link BufferManager} interfaces.
@@ -57,7 +59,7 @@ public org.slf4j.event.Level getLogLevel(Logger logger) {
5759
public void flushBuffer() {
5860
LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
5961
BufferingAppender bufferingAppender = (BufferingAppender) ctx.getConfiguration()
60-
.getAppender("BufferedAppender");
62+
.getAppender(NAME);
6163
if (bufferingAppender != null) {
6264
bufferingAppender.flushBuffer();
6365
}
@@ -70,7 +72,7 @@ public void flushBuffer() {
7072
public void clearBuffer() {
7173
LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
7274
BufferingAppender bufferingAppender = (BufferingAppender) ctx.getConfiguration()
73-
.getAppender("BufferedAppender");
75+
.getAppender(NAME);
7476
if (bufferingAppender != null) {
7577
bufferingAppender.clearBuffer();
7678
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2023 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
15+
package software.amazon.lambda.powertools.logging.internal;
16+
17+
import java.util.ArrayDeque;
18+
import java.util.Deque;
19+
import java.util.Map;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.function.Function;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
/**
27+
* Generic buffer data structure for storing events by key with size-based eviction.
28+
*
29+
* <p>This buffer maintains separate event queues for each key, with configurable size limits
30+
* to prevent memory exhaustion. When buffers exceed their size limit, older events are
31+
* automatically evicted to make room for newer ones.
32+
*
33+
* <h3>Key Features:</h3>
34+
* <ul>
35+
* <li><strong>Per-key buffering:</strong> Each key maintains its own independent buffer</li>
36+
* <li><strong>Size-based eviction:</strong> Oldest events are removed when buffer size exceeds limit</li>
37+
* <li><strong>Overflow protection:</strong> Events larger than buffer size are rejected entirely</li>
38+
* <li><strong>Thread-safe:</strong> Supports concurrent access across different keys</li>
39+
* <li><strong>Overflow tracking:</strong> Logs warnings when events are evicted or rejected</li>
40+
* </ul>
41+
*
42+
* <h3>Eviction Behavior:</h3>
43+
* <ul>
44+
* <li><strong>Buffer overflow:</strong> When adding an event would exceed maxBytes, oldest events are evicted first</li>
45+
* <li><strong>Large events:</strong> Events larger than maxBytes are rejected without evicting existing events</li>
46+
* <li><strong>FIFO eviction:</strong> Events are removed in first-in-first-out order during overflow</li>
47+
* <li><strong>Overflow warnings:</strong> Automatic logging when events are evicted or rejected</li>
48+
* </ul>
49+
*
50+
* <h3>Thread Safety:</h3>
51+
* <p>This class is thread-safe for concurrent operations. Different keys can be accessed
52+
* simultaneously, and operations on the same key are synchronized to prevent data corruption.
53+
*
54+
* @param <K> the type of key used for buffering (e.g., String for trace IDs)
55+
* @param <T> the type of events to buffer (must be compatible with the size calculator)
56+
*/
57+
public class KeyBuffer<K, T> {
58+
59+
private static final Logger logger = LoggerFactory.getLogger(KeyBuffer.class);
60+
61+
private final Map<K, Deque<T>> keyBufferCache = new ConcurrentHashMap<>();
62+
private final Map<K, Boolean> overflowTriggered = new ConcurrentHashMap<>();
63+
private final int maxBytes;
64+
private final Function<T, Integer> sizeCalculator;
65+
66+
public KeyBuffer(int maxBytes, Function<T, Integer> sizeCalculator) {
67+
this.maxBytes = maxBytes;
68+
this.sizeCalculator = sizeCalculator;
69+
}
70+
71+
public void add(K key, T event) {
72+
int eventSize = sizeCalculator.apply(event);
73+
if (eventSize > maxBytes) {
74+
overflowTriggered.put(key, true);
75+
return;
76+
}
77+
78+
Deque<T> buffer = keyBufferCache.computeIfAbsent(key, k -> new ArrayDeque<>());
79+
synchronized (buffer) {
80+
buffer.add(event);
81+
while (getBufferSize(buffer) > maxBytes && !buffer.isEmpty()) {
82+
overflowTriggered.put(key, true);
83+
buffer.removeFirst();
84+
}
85+
}
86+
}
87+
88+
public Deque<T> removeAll(K key) {
89+
logOverflowWarningIfNeeded(key);
90+
Deque<T> buffer = keyBufferCache.remove(key);
91+
if (buffer != null) {
92+
synchronized (buffer) {
93+
return new ArrayDeque<>(buffer);
94+
}
95+
}
96+
return buffer;
97+
}
98+
99+
public void clear(K key) {
100+
keyBufferCache.remove(key);
101+
overflowTriggered.remove(key);
102+
}
103+
104+
private void logOverflowWarningIfNeeded(K key) {
105+
if (Boolean.TRUE.equals(overflowTriggered.remove(key))) {
106+
logger.warn(
107+
"Some logs are not displayed because they were evicted from the buffer. Increase buffer size to store more logs in the buffer.");
108+
}
109+
}
110+
111+
private int getBufferSize(Deque<T> buffer) {
112+
return buffer.stream().mapToInt(sizeCalculator::apply).sum();
113+
}
114+
}

0 commit comments

Comments
 (0)