Skip to content

Commit 1039db0

Browse files
authored
CallTree recycling (#1592)
1 parent 56cc3b8 commit 1039db0

File tree

5 files changed

+79
-38
lines changed

5 files changed

+79
-38
lines changed

CHANGELOG.asciidoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ endif::[]
4343
* Serialize all stack trace frames when setting `stack_trace_limit=-1` instead of none - {pull}1571[#1571]
4444
* Fix `UnsupportedOperationException` when calling `ServletContext.getClassLoader()` - {pull}1576[#1576]
4545
* Fix improper request body capturing - {pull}1579[#1579]
46-
* avoid `NullPointerException` due to null return values instrumentation advices - {pull}1601[#1601]
46+
* Avoid `NullPointerException` due to null return values instrumentation advices - {pull}1601[#1601]
4747
* Update async-profiler to 1.8.3 {pull}1602[1602]
4848
* Use null-safe data structures to avoid `NullPointerException` {pull}1597[1597]
49+
* Fix memory leak in sampling profiler mechanism - {pull}1592[#1592]
4950
5051
[float]
5152
===== Refactors

apm-agent-plugins/apm-profiling-plugin/src/main/java/co/elastic/apm/agent/profiler/CallTree.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,16 @@ private void fillStackTrace(List<StackFrame> stackTrace) {
436436
}
437437
}
438438

439-
public void recycle(ObjectPool<CallTree> pool) {
439+
/**
440+
* Recycles this subtree to the provided pool recursively.
441+
* Note that this method ends by recycling {@code this} node (i.e. - this subtree root), which means that
442+
* <b>the caller of this method should make sure that no reference to this object is held anywhere</b>.
443+
* <p>ALSO NOTE: MAKE SURE NOT TO CALL THIS METHOD FOR {@link CallTree.Root} INSTANCES.</p>
444+
*
445+
* @param pool the pool to which all subtree nodes are to be recycled
446+
*/
447+
public final void recycle(ObjectPool<CallTree> pool) {
448+
assert !(this instanceof Root);
440449
List<CallTree> children = this.children;
441450
for (int i = 0, size = children.size(); i < size; i++) {
442451
children.get(i).recycle(pool);
@@ -719,11 +728,21 @@ public long getEpochMicros(long nanoTime) {
719728
return rootContext.getClock().getEpochMicros(nanoTime);
720729
}
721730

722-
public void recycle(ObjectPool<CallTree> pool) {
731+
/**
732+
* Recycles this tree to the provided pools.
733+
* First, all child subtrees are recycled recursively to the children pool.
734+
* Then, {@code this} root node is recycled to the root pool. This means that <b>the caller of this method
735+
* should make sure that no reference to this root object is held anywhere</b>.
736+
*
737+
* @param childrenPool object pool for all non-root nodes
738+
* @param rootPool object pool for root nodes
739+
*/
740+
public void recycle(ObjectPool<CallTree> childrenPool, ObjectPool<CallTree.Root> rootPool) {
723741
List<CallTree> children = getChildren();
724742
for (int i = 0, size = children.size(); i < size; i++) {
725-
children.get(i).recycle(pool);
743+
children.get(i).recycle(childrenPool);
726744
}
745+
rootPool.recycle(this);
727746
}
728747

729748
public void end(ObjectPool<CallTree> pool, long minDurationNs) {
@@ -733,8 +752,11 @@ public void end(ObjectPool<CallTree> pool, long minDurationNs) {
733752
@Override
734753
public void resetState() {
735754
super.resetState();
755+
rootContext.resetState();
736756
activeSpan = null;
737757
activationTimestamp = -1;
758+
Arrays.fill(activeSpanSerialized, (byte) 0);
759+
previousTopOfStack = null;
738760
topOfStack = null;
739761
activeSet.clear();
740762
}

apm-agent-plugins/apm-profiling-plugin/src/main/java/co/elastic/apm/agent/profiler/SamplingProfiler.java

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,12 @@
8383
* </p>
8484
* <p>
8585
* The {@link #onActivation} and {@link #onDeactivation} methods are called by {@link ProfilingActivationListener}
86-
* which register an {@link ActivationEvent} in to a {@linkplain #eventBuffer ring buffer} whenever a {@link Span}
86+
* which register an {@link ActivationEvent} to a {@linkplain #eventBuffer ring buffer} whenever a {@link Span}
8787
* gets {@link Span#activate()}d or {@link Span#deactivate()}d while a {@linkplain #profilingSessionOngoing profiling session is ongoing}.
8888
* A background thread consumes the {@link ActivationEvent}s and writes them to a {@linkplain #activationEventsBuffer direct buffer}
8989
* which is flushed to a {@linkplain #activationEventsFileChannel file}.
9090
* That is necessary because within a profiling session (which lasts 10s by default) there may be many more {@link ActivationEvent}s
91-
* than the ring buffer can hold {@link #RING_BUFFER_SIZE}.
91+
* than the ring buffer {@link #RING_BUFFER_SIZE can hold}.
9292
* The file can hold {@link #ACTIVATION_EVENTS_IN_FILE} events and each is {@link ActivationEvent#SERIALIZED_SIZE} in size.
9393
* This process is completely garbage free thanks to the {@link RingBuffer} acting as an object pool for {@link ActivationEvent}s.
9494
* </p>
@@ -335,12 +335,15 @@ public void run() {
335335
if (!config.isProfilingEnabled() || !tracer.isRunning()) {
336336
if (jfrParser != null) {
337337
jfrParser = null;
338-
rootPool.clear();
339-
callTreePool.clear();
340338
}
341339
if (!scheduler.isShutdown()) {
342340
scheduler.schedule(this, config.getProfilingInterval().getMillis(), TimeUnit.MILLISECONDS);
343341
}
342+
try {
343+
clear();
344+
} catch (Throwable throwable) {
345+
logger.error("Error while trying to clear profiler constructs", throwable);
346+
}
344347
return;
345348
}
346349

@@ -391,6 +394,8 @@ private void profile(TimeDuration sampleRate, TimeDuration profilingDuration) th
391394
if (!profiledThreads.isEmpty()) {
392395
restoreFilterState(asyncProfiler);
393396
}
397+
// Doesn't need to be atomic as this field is being updated only by a single thread
398+
//noinspection NonAtomicOperationOnVolatileField
394399
profilingSessions++;
395400

396401
// When post-processing is disabled activation events are ignored, but we still need to invoke this method
@@ -418,17 +423,22 @@ private void profile(TimeDuration sampleRate, TimeDuration profilingDuration) th
418423
* we have to tell async-profiler which threads it should profile after re-starting it.
419424
*/
420425
private void restoreFilterState(AsyncProfiler asyncProfiler) {
421-
threadMatcher.forEachThread(new ThreadMatcher.NonCapturingPredicate<Thread, Long2ObjectHashMap<?>.KeySet>() {
422-
@Override
423-
public boolean test(Thread thread, Long2ObjectHashMap<?>.KeySet profiledThreads) {
424-
return profiledThreads.contains(thread.getId());
425-
}
426-
}, profiledThreads.keySet(), new ThreadMatcher.NonCapturingConsumer<Thread, AsyncProfiler>() {
427-
@Override
428-
public void accept(Thread thread, AsyncProfiler asyncProfiler) {
429-
asyncProfiler.enableProfilingThread(thread);
430-
}
431-
}, asyncProfiler);
426+
threadMatcher.forEachThread(
427+
new ThreadMatcher.NonCapturingPredicate<Thread, Long2ObjectHashMap<?>.KeySet>() {
428+
@Override
429+
public boolean test(Thread thread, Long2ObjectHashMap<?>.KeySet profiledThreads) {
430+
return profiledThreads.contains(thread.getId());
431+
}
432+
},
433+
profiledThreads.keySet(),
434+
new ThreadMatcher.NonCapturingConsumer<Thread, AsyncProfiler>() {
435+
@Override
436+
public void accept(Thread thread, AsyncProfiler asyncProfiler) {
437+
asyncProfiler.enableProfilingThread(thread);
438+
}
439+
},
440+
asyncProfiler
441+
);
432442
}
433443

434444
private void consumeActivationEventsFromRingBufferAndWriteToFile(TimeDuration profilingDuration) throws Exception {
@@ -623,7 +633,9 @@ private static long peekLong(ByteBuffer buf) {
623633

624634
public void resetActivationEventBuffer() throws IOException {
625635
((Buffer) activationEventsBuffer).clear();
626-
activationEventsFileChannel.position(0L);
636+
if (activationEventsFileChannel != null && activationEventsFileChannel.isOpen()) {
637+
activationEventsFileChannel.position(0L);
638+
}
627639
}
628640

629641
private void flushActivationEvents() throws IOException {
@@ -691,7 +703,7 @@ void setProfilingSessionOngoing(boolean profilingSessionOngoing) {
691703

692704
public void clearProfiledThreads() {
693705
for (CallTree.Root root : profiledThreads.values()) {
694-
root.recycle(callTreePool);
706+
root.recycle(callTreePool, rootPool);
695707
}
696708
profiledThreads.clear();
697709
}
@@ -702,7 +714,6 @@ CallTree.Root getRoot() {
702714
}
703715

704716
void clear() throws IOException {
705-
profiledThreads.clear();
706717
// consume all remaining events from the ring buffer
707718
try {
708719
poller.poll(new EventPoller.Handler<ActivationEvent>() {
@@ -716,6 +727,9 @@ public boolean onEvent(ActivationEvent event, long sequence, boolean endOfBatch)
716727
throw new RuntimeException(e);
717728
}
718729
resetActivationEventBuffer();
730+
profiledThreads.clear();
731+
callTreePool.clear();
732+
rootPool.clear();
719733
}
720734

721735
int getProfilingSessions() {
@@ -834,8 +848,7 @@ private void startProfiling(SamplingProfiler samplingProfiler) {
834848
if (logger.isDebugEnabled()) {
835849
logger.warn("Illegal state when stopping profiling for thread {}: orphaned root", threadId);
836850
}
837-
orphaned.recycle(samplingProfiler.callTreePool);
838-
samplingProfiler.rootPool.recycle(orphaned);
851+
orphaned.recycle(samplingProfiler.callTreePool, samplingProfiler.rootPool);
839852
}
840853
}
841854

@@ -867,17 +880,19 @@ private void stopProfiling(SamplingProfiler samplingProfiler) {
867880
logger.debug("End call tree ({}) for thread {}", deserialize(samplingProfiler, traceContextBuffer), threadId);
868881
}
869882
samplingProfiler.profiledThreads.remove(threadId);
870-
callTree.end(samplingProfiler.callTreePool, samplingProfiler.getInferredSpansMinDurationNs());
871-
int createdSpans = callTree.spanify();
872-
if (logger.isDebugEnabled()) {
873-
if (createdSpans > 0) {
874-
logger.debug("Created spans ({}) for thread {}", createdSpans, threadId);
875-
} else {
876-
logger.debug("Created no spans for thread {} (count={})", threadId, callTree.getCount());
883+
try {
884+
callTree.end(samplingProfiler.callTreePool, samplingProfiler.getInferredSpansMinDurationNs());
885+
int createdSpans = callTree.spanify();
886+
if (logger.isDebugEnabled()) {
887+
if (createdSpans > 0) {
888+
logger.debug("Created spans ({}) for thread {}", createdSpans, threadId);
889+
} else {
890+
logger.debug("Created no spans for thread {} (count={})", threadId, callTree.getCount());
891+
}
877892
}
893+
} finally {
894+
callTree.recycle(samplingProfiler.callTreePool, samplingProfiler.rootPool);
878895
}
879-
callTree.recycle(samplingProfiler.callTreePool);
880-
samplingProfiler.rootPool.recycle(callTree);
881896
}
882897
}
883898

apm-agent-plugins/apm-profiling-plugin/src/test/java/co/elastic/apm/agent/profiler/CallTreeSpanifyTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
import org.junit.jupiter.api.Test;
3838
import org.stagemonitor.configuration.ConfigurationRegistry;
3939

40+
import java.io.IOException;
4041
import java.util.List;
42+
import java.util.Objects;
4143
import java.util.concurrent.TimeUnit;
4244

4345
import static org.assertj.core.api.Assertions.assertThat;
@@ -59,7 +61,8 @@ void setUp() {
5961
}
6062

6163
@AfterEach
62-
void tearDown() {
64+
void tearDown() throws IOException {
65+
Objects.requireNonNull(tracer.getLifecycleListener(ProfilingFactory.class)).getProfiler().clear();
6366
reporter.assertRecycledAfterDecrementingReferences();
6467
tracer.stop();
6568
}

apm-agent-plugins/apm-profiling-plugin/src/test/java/co/elastic/apm/agent/profiler/CallTreeTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
import javax.annotation.Nonnull;
4747
import javax.annotation.Nullable;
48+
import java.io.IOException;
4849
import java.util.ArrayList;
4950
import java.util.Arrays;
5051
import java.util.HashMap;
@@ -57,7 +58,6 @@
5758

5859
import static java.util.stream.Collectors.toMap;
5960
import static org.assertj.core.api.Assertions.assertThat;
60-
import static org.mockito.Mockito.mock;
6161
import static org.mockito.Mockito.when;
6262

6363
class CallTreeTest {
@@ -77,12 +77,13 @@ void setUp() {
7777
}
7878

7979
@AfterEach
80-
void tearDown() {
80+
void tearDown() throws IOException {
81+
Objects.requireNonNull(tracer.getLifecycleListener(ProfilingFactory.class)).getProfiler().clear();
8182
tracer.stop();
8283
}
8384

8485
@Test
85-
void testCallTree() throws Exception {
86+
void testCallTree() {
8687
TraceContext traceContext = TraceContext.with64BitId(MockTracer.create());
8788
CallTree.Root root = CallTree.createRoot(NoopObjectPool.ofRecyclable(() -> new CallTree.Root(tracer)), traceContext.serialize(), traceContext.getServiceName(), 0);
8889
ObjectPool<CallTree> callTreePool = ListBasedObjectPool.ofRecyclable(new ArrayList<>(), Integer.MAX_VALUE, CallTree::new);
@@ -921,7 +922,6 @@ public static CallTree.Root getCallTree(ElasticApmTracer tracer, String[] stackT
921922
transaction.deactivate().end(nanoClock.nanoTime() / 1000);
922923
assertThat(root).isNotNull();
923924
root.end(callTreePool, 0);
924-
profiler.clear();
925925
return root;
926926
}
927927

0 commit comments

Comments
 (0)