Skip to content

Commit 944b9b6

Browse files
authored
Fix concurrency bug in TransportGetStackTracesAction (#136619)
If multiple concurrent mget operations fail, the submitListener is completed more than once, which triggers an assertion in tests.
1 parent 99fe696 commit 944b9b6

File tree

2 files changed

+80
-22
lines changed

2 files changed

+80
-22
lines changed

x-pack/plugin/profiling/src/main/java/org/elasticsearch/xpack/profiling/action/TransportGetStackTracesAction.java

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.settings.Setting;
2525
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.common.util.concurrent.CountDown;
2627
import org.elasticsearch.common.util.concurrent.EsExecutors;
2728
import org.elasticsearch.core.TimeValue;
2829
import org.elasticsearch.index.Index;
@@ -65,6 +66,7 @@
6566
import java.util.concurrent.Executor;
6667
import java.util.concurrent.atomic.AtomicInteger;
6768
import java.util.concurrent.atomic.AtomicLong;
69+
import java.util.concurrent.atomic.AtomicReference;
6870
import java.util.function.Function;
6971
import java.util.random.RandomGenerator;
7072

@@ -854,17 +856,18 @@ private void retrieveStackTraceDetails(
854856
/**
855857
* Collects stack trace details which are retrieved concurrently and sends a response only when all details are known.
856858
*/
857-
private static class DetailsHandler {
859+
static class DetailsHandler {
858860
private static final String[] PATH_FILE_NAME = new String[] { "Executable", "file", "name" };
859861
private final GetStackTracesResponseBuilder builder;
860862
private final ActionListener<GetStackTracesResponse> submitListener;
861863
private final Map<String, String> executables;
862864
private final Map<String, StackFrame> stackFrames;
863-
private final AtomicInteger expectedSlices;
865+
private final AtomicReference<Exception> failure = new AtomicReference<>();
866+
private final CountDown countDown;
864867
private final AtomicInteger totalInlineFrames = new AtomicInteger();
865868
private final StopWatch watch = new StopWatch("retrieveStackTraceDetails");
866869

867-
private DetailsHandler(
870+
DetailsHandler(
868871
GetStackTracesResponseBuilder builder,
869872
ActionListener<GetStackTracesResponse> submitListener,
870873
int executableCount,
@@ -878,16 +881,14 @@ private DetailsHandler(
878881
this.stackFrames = new ConcurrentHashMap<>(stackFrameCount);
879882
// for deciding when we're finished it is irrelevant where a slice originated, so we can
880883
// simplify state handling by treating them equally.
881-
this.expectedSlices = new AtomicInteger(expectedExecutableSlices + expectedStackFrameSlices);
884+
this.countDown = new CountDown(expectedExecutableSlices + expectedStackFrameSlices);
882885
}
883886

884-
public void onStackFramesResponse(MultiGetResponse multiGetItemResponses) {
887+
void onStackFramesResponse(MultiGetResponse multiGetItemResponses) {
885888
for (MultiGetItemResponse frame : multiGetItemResponses) {
886889
if (frame.isFailed()) {
887-
submitListener.onFailure(frame.getFailure().getFailure());
888-
return;
889-
}
890-
if (frame.getResponse().isExists()) {
890+
recordFailure(frame.getFailure().getFailure());
891+
} else if (frame.getResponse().isExists()) {
891892
// Duplicates are expected as we query multiple indices - do a quick pre-check before we deserialize a response
892893
if (stackFrames.containsKey(frame.getId()) == false) {
893894
StackFrame stackFrame = StackFrame.fromSource(frame.getResponse().getSource());
@@ -904,13 +905,11 @@ public void onStackFramesResponse(MultiGetResponse multiGetItemResponses) {
904905
mayFinish();
905906
}
906907

907-
public void onExecutableDetailsResponse(MultiGetResponse multiGetItemResponses) {
908+
void onExecutableDetailsResponse(MultiGetResponse multiGetItemResponses) {
908909
for (MultiGetItemResponse executable : multiGetItemResponses) {
909910
if (executable.isFailed()) {
910-
submitListener.onFailure(executable.getFailure().getFailure());
911-
return;
912-
}
913-
if (executable.getResponse().isExists()) {
911+
recordFailure(executable.getFailure().getFailure());
912+
} else if (executable.getResponse().isExists()) {
914913
// Duplicates are expected as we query multiple indices - do a quick pre-check before we deserialize a response
915914
if (executables.containsKey(executable.getId()) == false) {
916915
Map<String, Object> source = executable.getResponse().getSource();
@@ -934,14 +933,29 @@ public void onExecutableDetailsResponse(MultiGetResponse multiGetItemResponses)
934933
mayFinish();
935934
}
936935

937-
public void mayFinish() {
938-
if (expectedSlices.decrementAndGet() == 0) {
939-
builder.setExecutables(executables);
940-
builder.setStackFrames(stackFrames);
941-
builder.addTotalFrames(totalInlineFrames.get());
942-
log.debug("retrieveStackTraceDetails found [{}] stack frames, [{}] executables.", stackFrames.size(), executables.size());
943-
log.debug(watch::report);
944-
submitListener.onResponse(builder.build());
936+
private void recordFailure(Exception e) {
937+
final var firstException = failure.compareAndExchange(null, e);
938+
if (firstException != null && firstException != e) {
939+
firstException.addSuppressed(e);
940+
}
941+
}
942+
943+
private void mayFinish() {
944+
if (countDown.countDown()) {
945+
if (failure.get() != null) {
946+
submitListener.onFailure(failure.get());
947+
} else {
948+
builder.setExecutables(executables);
949+
builder.setStackFrames(stackFrames);
950+
builder.addTotalFrames(totalInlineFrames.get());
951+
log.debug(
952+
"retrieveStackTraceDetails found [{}] stack frames, [{}] executables.",
953+
stackFrames.size(),
954+
executables.size()
955+
);
956+
log.debug(watch::report);
957+
submitListener.onResponse(builder.build());
958+
}
945959
}
946960
}
947961
}

x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/action/TransportGetStackTracesActionTests.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,22 @@
77

88
package org.elasticsearch.xpack.profiling.action;
99

10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.get.MultiGetItemResponse;
12+
import org.elasticsearch.action.get.MultiGetResponse;
1013
import org.elasticsearch.test.ESTestCase;
1114

1215
import java.util.Collections;
1316
import java.util.List;
1417

18+
import static org.hamcrest.Matchers.anyOf;
19+
import static org.hamcrest.Matchers.arrayWithSize;
20+
import static org.hamcrest.Matchers.is;
21+
import static org.mockito.ArgumentMatchers.assertArg;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
25+
1526
public class TransportGetStackTracesActionTests extends ESTestCase {
1627
public void testSliceEmptyList() {
1728
assertEquals(List.of(List.of()), TransportGetStackTracesAction.sliced(Collections.emptyList(), 4));
@@ -58,4 +69,37 @@ public void testRandomLengthListGreaterThanSliceCount() {
5869
List<List<String>> sliced = TransportGetStackTracesAction.sliced(input, slices);
5970
assertEquals(slices, sliced.size());
6071
}
72+
73+
public void testDetailsHandlerOnConcurrentFailure() throws InterruptedException {
74+
ActionListener<GetStackTracesResponse> listener = mock();
75+
76+
var handler = new TransportGetStackTracesAction.DetailsHandler(mock(), listener, 0, 0, 1, 2);
77+
78+
var executables1 = new MultiGetItemResponse(null, new MultiGetResponse.Failure("executables", "1", new RuntimeException()));
79+
var stackframes1 = new MultiGetItemResponse(null, new MultiGetResponse.Failure("stackframes", "1", new RuntimeException()));
80+
var stackframes2 = new MultiGetItemResponse(null, new MultiGetResponse.Failure("stackframes", "2", new RuntimeException()));
81+
82+
var t1 = Thread.ofVirtual()
83+
.start(() -> handler.onExecutableDetailsResponse(new MultiGetResponse(new MultiGetItemResponse[] { executables1 })));
84+
var t2 = Thread.ofVirtual()
85+
.start(() -> handler.onExecutableDetailsResponse(new MultiGetResponse(new MultiGetItemResponse[] { stackframes1 })));
86+
var t3 = Thread.ofVirtual()
87+
.start(() -> handler.onStackFramesResponse(new MultiGetResponse(new MultiGetItemResponse[] { stackframes2 })));
88+
89+
t1.join();
90+
t2.join();
91+
t3.join();
92+
93+
verify(listener, times(1)).onFailure(assertArg(failure -> {
94+
assertThat(
95+
failure,
96+
anyOf(
97+
is(executables1.getFailure().getFailure()),
98+
is(stackframes1.getFailure().getFailure()),
99+
is(stackframes2.getFailure().getFailure())
100+
)
101+
);
102+
assertThat(failure.getSuppressed(), arrayWithSize(2));
103+
}));
104+
}
61105
}

0 commit comments

Comments
 (0)