Skip to content

Commit eb75ba3

Browse files
Avoid stack overflow in IndicesClusterStateService applyClusterState (elastic#132536)
Every cluster state applied in the IndicesClusterStateService has the potential to chain a new RefCountingListener onto a chain of such listeners. If the chain is too long, the unlucky thread that decreases the ref count of the listener at the head of the chain to 0 ends up calling each listener in turn and, assuming all ref counts are thereby decreased to 0, traverses the whole chain on its own thread stack, possibly resulting in a stack overflow. This fix chains at most 8 RefCountingListeners; the 11th one is forked onto a generic thread when it gets to execution.
1 parent d392312 commit eb75ba3

File tree

3 files changed

+218
-3
lines changed

3 files changed

+218
-3
lines changed

docs/changelog/132536.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 132536
2+
summary: Avoid stack overflow in `IndicesClusterStateService` `applyClusterState`
3+
area: Cluster Coordination
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,15 +247,21 @@ protected void doClose() {}
247247
*/
248248
private volatile SubscribableListener<Void> lastClusterStateShardsClosedListener = SubscribableListener.nullSuccess();
249249

250+
// HACK used to avoid chaining too many ref counting listeners, hence avoiding stack overflow exceptions
251+
private int shardsClosedListenerChainLength = 0;
252+
private volatile boolean closingMoreShards;
253+
250254
@Nullable // if not currently applying a cluster state
251255
private RefCountingListener currentClusterStateShardsClosedListeners;
252256

253-
private ActionListener<Void> getShardsClosedListener() {
257+
// protected for tests
258+
protected ActionListener<Void> getShardsClosedListener() {
254259
assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
255260
if (currentClusterStateShardsClosedListeners == null) {
256261
assert false : "not currently applying cluster state";
257262
return ActionListener.noop();
258263
} else {
264+
closingMoreShards = true;
259265
return currentClusterStateShardsClosedListeners.acquire();
260266
}
261267
}
@@ -274,15 +280,44 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {
274280
lastClusterStateShardsClosedListener = new SubscribableListener<>();
275281
currentClusterStateShardsClosedListeners = new RefCountingListener(lastClusterStateShardsClosedListener);
276282
try {
277-
previousShardsClosedListener.addListener(currentClusterStateShardsClosedListeners.acquire());
283+
// HACK: chain listeners but avoid too deep of a stack
284+
{
285+
if (previousShardsClosedListener.isDone()) {
286+
shardsClosedListenerChainLength = 0;
287+
}
288+
previousShardsClosedListener.addListener(
289+
currentClusterStateShardsClosedListeners.acquire(),
290+
// Sometimes fork the listener on a different thread.
291+
// Chaining too many listeners might trigger a stackoverflow exception on the thread that eventually gets to
292+
// execute them all (because the last thread that decreases the ref count to 0 of a {@link RefCountingListener}
293+
// also executes its listeners, which in turn might decrease the ref count to 0 of another
294+
// {@link RefCountingListener}, again executing its listeners, etc...).
295+
shardsClosedListenerChainLength++ < 8 ? EsExecutors.DIRECT_EXECUTOR_SERVICE : threadPool.generic(),
296+
null
297+
);
298+
if (shardsClosedListenerChainLength >= 8) {
299+
shardsClosedListenerChainLength = 0;
300+
}
301+
// reset the variable before applying the cluster state
302+
closingMoreShards = false;
303+
}
278304
doApplyClusterState(event);
279305
} finally {
280306
currentClusterStateShardsClosedListeners.close();
281307
currentClusterStateShardsClosedListeners = null;
308+
// HACK
309+
if (closingMoreShards == false) {
310+
// avoids chaining when no shard has been closed after applying this cluster state
311+
lastClusterStateShardsClosedListener = previousShardsClosedListener;
312+
if (shardsClosedListenerChainLength > 0) {
313+
shardsClosedListenerChainLength--;
314+
}
315+
}
282316
}
283317
}
284318

285-
private void doApplyClusterState(final ClusterChangedEvent event) {
319+
// protected for tests
320+
protected void doApplyClusterState(final ClusterChangedEvent event) {
286321
if (lifecycle.started() == false) {
287322
return;
288323
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.indices.cluster;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.support.SubscribableListener;
14+
import org.elasticsearch.client.internal.node.NodeClient;
15+
import org.elasticsearch.cluster.ClusterChangedEvent;
16+
import org.elasticsearch.cluster.action.shard.ShardStateAction;
17+
import org.elasticsearch.cluster.service.ClusterService;
18+
import org.elasticsearch.common.settings.ClusterSettings;
19+
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
21+
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
22+
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
23+
import org.elasticsearch.indices.IndicesService;
24+
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
25+
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
26+
import org.elasticsearch.repositories.RepositoriesService;
27+
import org.elasticsearch.search.SearchService;
28+
import org.elasticsearch.snapshots.SnapshotShardsService;
29+
import org.elasticsearch.test.transport.MockTransport;
30+
import org.elasticsearch.test.transport.MockTransportService;
31+
import org.elasticsearch.threadpool.ThreadPool;
32+
33+
import java.util.ArrayList;
34+
import java.util.Collections;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.ConcurrentHashMap;
38+
import java.util.concurrent.atomic.AtomicInteger;
39+
import java.util.function.BiConsumer;
40+
41+
import static org.mockito.Mockito.mock;
42+
import static org.mockito.Mockito.times;
43+
import static org.mockito.Mockito.verify;
44+
45+
public class IndicesClusterStateServiceShardsClosedListenersTests extends AbstractIndicesClusterStateServiceTestCase {
46+
47+
public void testRunnablesAreExecutedOnlyAfterAllPreviousListenersComplete() {
48+
AtomicInteger clusterStateAppliedRound = new AtomicInteger();
49+
int totalClusterStateAppliedRounds = randomIntBetween(10, 100);
50+
Map<Integer, List<Runnable>> runnablesOnShardsClosedForRoundMap = new ConcurrentHashMap<>();
51+
Map<Integer, List<ActionListener<Void>>> shardsClosedListenersForRoundMap = new ConcurrentHashMap<>();
52+
List<ActionListener<Void>> allShardsClosedListeners = Collections.synchronizedList(new ArrayList<>());
53+
DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue();
54+
try (
55+
TestIndicesClusterStateService testIndicesClusterStateService = new TestIndicesClusterStateService(
56+
deterministicTaskQueue.getThreadPool(),
57+
// the apply cluster state hook
58+
(indicesClusterStateService, clusterChangedEvent) -> {
59+
final int round = clusterStateAppliedRound.get();
60+
// maybe register runnable for when all the shards in the currently applied cluster states are closed
61+
if (randomBoolean()) {
62+
Runnable mockRunnable = mock(Runnable.class);
63+
indicesClusterStateService.onClusterStateShardsClosed(mockRunnable);
64+
runnablesOnShardsClosedForRoundMap.get(round).add(mockRunnable);
65+
}
66+
// maybe get some listeners as if asynchronously closing some shards
67+
int listenersCount = randomIntBetween(0, 2);
68+
for (int i = 0; i < listenersCount; i++) {
69+
var shardsClosedListener = new SubscribableListener<Void>();
70+
shardsClosedListener.addListener(indicesClusterStateService.getShardsClosedListener());
71+
shardsClosedListenersForRoundMap.get(round).add(shardsClosedListener);
72+
allShardsClosedListeners.add(shardsClosedListener);
73+
shardsClosedListener.andThen(l -> {
74+
// the listener auto-removes itself from the map, for testing purposes
75+
shardsClosedListenersForRoundMap.get(round).remove(shardsClosedListener);
76+
allShardsClosedListeners.remove(shardsClosedListener);
77+
});
78+
}
79+
// maybe register runnable for when all the shards in the currently applied cluster states are closed
80+
if (randomBoolean()) {
81+
Runnable mockRunnable = mock(Runnable.class);
82+
indicesClusterStateService.onClusterStateShardsClosed(mockRunnable);
83+
runnablesOnShardsClosedForRoundMap.get(round).add(mockRunnable);
84+
}
85+
}
86+
)
87+
) {
88+
int round = clusterStateAppliedRound.get();
89+
int runnablesDoneUpToRound = 0;
90+
while (round < totalClusterStateAppliedRounds || allShardsClosedListeners.isEmpty() == false) {
91+
if (round < totalClusterStateAppliedRounds) {
92+
runnablesOnShardsClosedForRoundMap.put(round, Collections.synchronizedList(new ArrayList<>()));
93+
shardsClosedListenersForRoundMap.put(round, Collections.synchronizedList(new ArrayList<>()));
94+
95+
// apply cluster state this round
96+
testIndicesClusterStateService.applyClusterState(mock(ClusterChangedEvent.class));
97+
98+
// maybe register runnable for when all the shards in the previously applied cluster states are closed
99+
runnablesOnShardsClosedForRoundMap.get(round).addAll(randomList(0, 2, () -> {
100+
Runnable mockRunnable = mock(Runnable.class);
101+
testIndicesClusterStateService.onClusterStateShardsClosed(mockRunnable);
102+
return mockRunnable;
103+
}));
104+
}
105+
106+
// complete one random listener
107+
if ((round >= totalClusterStateAppliedRounds || randomBoolean()) && allShardsClosedListeners.isEmpty() == false) {
108+
randomFrom(allShardsClosedListeners).onResponse(null);
109+
deterministicTaskQueue.runAllTasksInTimeOrder();
110+
}
111+
112+
// find the "oldest" applied cluster state that still has unfinished listeners
113+
for (int i = runnablesDoneUpToRound; i < totalClusterStateAppliedRounds; i++) {
114+
if (shardsClosedListenersForRoundMap.get(i) != null && shardsClosedListenersForRoundMap.get(i).isEmpty()) {
115+
runnablesDoneUpToRound = i + 1;
116+
} else {
117+
break;
118+
}
119+
}
120+
// assert older runnables executed
121+
for (int i = 0; i < runnablesDoneUpToRound; i++) {
122+
for (var runnable : runnablesOnShardsClosedForRoundMap.get(i)) {
123+
verify(runnable, times(1)).run();
124+
}
125+
}
126+
// assert any newer runnables not yet executed
127+
for (int i = runnablesDoneUpToRound; i < totalClusterStateAppliedRounds; i++) {
128+
if (runnablesOnShardsClosedForRoundMap.get(i) != null) {
129+
for (var runable : runnablesOnShardsClosedForRoundMap.get(i)) {
130+
verify(runable, times(0)).run();
131+
}
132+
}
133+
}
134+
round = clusterStateAppliedRound.incrementAndGet();
135+
}
136+
}
137+
}
138+
139+
class TestIndicesClusterStateService extends IndicesClusterStateService {
140+
BiConsumer<IndicesClusterStateService, ClusterChangedEvent> doApplyClusterStateHook;
141+
142+
TestIndicesClusterStateService(
143+
ThreadPool threadPool,
144+
BiConsumer<IndicesClusterStateService, ClusterChangedEvent> doApplyClusterStateHook
145+
) {
146+
super(
147+
Settings.EMPTY,
148+
new MockIndicesService(),
149+
new ClusterService(Settings.EMPTY, ClusterSettings.createBuiltInClusterSettings(), threadPool, null),
150+
threadPool,
151+
mock(PeerRecoveryTargetService.class),
152+
mock(ShardStateAction.class),
153+
mock(RepositoriesService.class),
154+
mock(SearchService.class),
155+
mock(PeerRecoverySourceService.class),
156+
new SnapshotShardsService(
157+
Settings.EMPTY,
158+
new ClusterService(Settings.EMPTY, ClusterSettings.createBuiltInClusterSettings(), threadPool, null),
159+
mock(RepositoriesService.class),
160+
MockTransportService.createMockTransportService(new MockTransport(), threadPool),
161+
mock(IndicesService.class)
162+
),
163+
mock(PrimaryReplicaSyncer.class),
164+
RetentionLeaseSyncer.EMPTY,
165+
mock(NodeClient.class)
166+
);
167+
this.doApplyClusterStateHook = doApplyClusterStateHook;
168+
}
169+
170+
@Override
171+
protected void doApplyClusterState(final ClusterChangedEvent event) {
172+
doApplyClusterStateHook.accept(this, event);
173+
}
174+
}
175+
}

0 commit comments

Comments
 (0)