Skip to content

Commit 4f10825

Browse files
DaveCTurnerjfreden
authored andcommitted
Add consistency checker for stuck snapshot debugging (elastic#124616)
Checks the local cluster state after marking a shard snapshot as complete, and logs a message if the completion is not reflected in this cluster state.
1 parent 3bc7c8e commit 4f10825

File tree

2 files changed

+174
-0
lines changed

2 files changed

+174
-0
lines changed

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,29 @@
99

1010
package org.elasticsearch.snapshots;
1111

12+
import org.apache.logging.log4j.Level;
13+
import org.apache.logging.log4j.core.LogEvent;
14+
import org.elasticsearch.ElasticsearchException;
15+
import org.elasticsearch.action.ActionListener;
16+
import org.elasticsearch.action.support.ActionTestUtils;
17+
import org.elasticsearch.action.support.SubscribableListener;
18+
import org.elasticsearch.cluster.coordination.Coordinator;
1219
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
1320
import org.elasticsearch.plugins.Plugin;
1421
import org.elasticsearch.snapshots.mockstore.MockRepository;
1522
import org.elasticsearch.test.ESIntegTestCase;
23+
import org.elasticsearch.test.MockLog;
1624
import org.elasticsearch.test.disruption.NetworkDisruption;
25+
import org.elasticsearch.test.junit.annotations.TestLogging;
1726
import org.elasticsearch.test.transport.MockTransportService;
27+
import org.elasticsearch.transport.TestTransportChannel;
28+
import org.elasticsearch.transport.TransportResponse;
1829

1930
import java.util.Arrays;
2031
import java.util.Collection;
2132
import java.util.List;
2233
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.atomic.AtomicBoolean;
2335

2436
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
2537
import static org.hamcrest.Matchers.equalTo;
@@ -89,4 +101,71 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception {
89101
assertThat(snapshotInfo.successfulShards(), equalTo(shards));
90102
}, 30L, TimeUnit.SECONDS);
91103
}
104+
105+
@TestLogging(
106+
reason = "verifying debug logging",
107+
value = "org.elasticsearch.snapshots.SnapshotShardsService.ShardStatusConsistencyChecker:DEBUG"
108+
)
109+
public void testConsistencyCheckerLogging() {
110+
final var masterNode = internalCluster().startMasterOnlyNode();
111+
final var dataNode = internalCluster().startDataOnlyNode();
112+
113+
final var repoName = randomIdentifier();
114+
createRepository(repoName, "fs");
115+
116+
final var indexName = randomIdentifier();
117+
assertAcked(prepareCreate(indexName, 0, indexSettingsNoReplicas(between(1, 5))));
118+
indexRandomDocs(indexName, scaledRandomIntBetween(50, 100));
119+
120+
// allow cluster states to go through normally until we see a shard snapshot update
121+
final var shardUpdateSeen = new AtomicBoolean();
122+
MockTransportService.getInstance(masterNode)
123+
.addRequestHandlingBehavior(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, (handler, request, channel, task) -> {
124+
shardUpdateSeen.set(true);
125+
handler.messageReceived(request, channel, task);
126+
});
127+
128+
// then, after the shard snapshot update, delay cluster state update commits on the data node until we see the
129+
// ShardStatusConsistencyChecker log an inconsistency.
130+
final var logMessageSeenListener = new SubscribableListener<Void>();
131+
MockTransportService.getInstance(dataNode)
132+
.addRequestHandlingBehavior(Coordinator.COMMIT_STATE_ACTION_NAME, (handler, request, channel, task) -> {
133+
if (shardUpdateSeen.get() == false || logMessageSeenListener.isDone()) {
134+
handler.messageReceived(request, channel, task);
135+
} else {
136+
// delay the commit ...
137+
logMessageSeenListener.addListener(
138+
ActionTestUtils.assertNoFailureListener(
139+
ignored -> ActionListener.run(
140+
ActionListener.<TransportResponse>noop(),
141+
l -> handler.messageReceived(request, new TestTransportChannel(l), task)
142+
)
143+
)
144+
);
145+
// ... and send a failure straight back to the master so it applies and acks the state update anyway
146+
channel.sendResponse(new ElasticsearchException("simulated"));
147+
}
148+
});
149+
150+
MockLog.awaitLogger(
151+
() -> createSnapshot(repoName, randomIdentifier(), List.of(indexName)),
152+
SnapshotShardsService.ShardStatusConsistencyChecker.class,
153+
new MockLog.SeenEventExpectation(
154+
"debug log",
155+
SnapshotShardsService.ShardStatusConsistencyChecker.class.getCanonicalName(),
156+
Level.DEBUG,
157+
"*unexpectedly still in state [*INIT*] after notifying master"
158+
) {
159+
@Override
160+
public boolean innerMatch(LogEvent event) {
161+
if (event.getMessage().getFormattedMessage().contains("after notifying master")) {
162+
// now that the inconsistency was logged, release the cluster state commits on the data node
163+
logMessageSeenListener.onResponse(null);
164+
}
165+
return true;
166+
}
167+
}
168+
);
169+
170+
}
92171
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,14 @@
5858
import java.io.IOException;
5959
import java.util.ArrayList;
6060
import java.util.HashMap;
61+
import java.util.HashSet;
6162
import java.util.Iterator;
6263
import java.util.List;
6364
import java.util.Map;
65+
import java.util.Queue;
66+
import java.util.Set;
67+
import java.util.concurrent.ConcurrentLinkedQueue;
68+
import java.util.concurrent.atomic.AtomicInteger;
6469
import java.util.function.Consumer;
6570

6671
import static java.util.Collections.emptyMap;
@@ -841,6 +846,11 @@ public void onResponse(Void aVoid) {
841846
Strings.format("successfully sent shard snapshot state [%s] update to the master node", status.state())
842847
);
843848
logger.trace("[{}][{}] updated snapshot state to [{}]", shardId, snapshot, status);
849+
if (status.state().completed()) {
850+
shardStatusConsistencyChecker.ensureShardComplete(snapshot, shardId);
851+
} else {
852+
assert status.state() == ShardState.PAUSED_FOR_NODE_REMOVAL : status;
853+
}
844854
}
845855

846856
@Override
@@ -868,4 +878,89 @@ public void onFailure(Exception e) {
868878
)
869879
);
870880
}
881+
882+
private final ShardStatusConsistencyChecker shardStatusConsistencyChecker = new ShardStatusConsistencyChecker();
883+
884+
/**
885+
* After receiving an ack from the master confirming that it marked a shard snapshot as complete, checks the local cluster state to
886+
* confirm that it was updated to reflect this and logs a message if not. Not 100% watertight because this node might be lagging behind
887+
* and hasn't received the cluster state yet, or else it might have been removed from the cluster, but both of these things will also
888+
* be visible in the logs. See e.g. o.e.c.c.Publication#logIncompleteNodes and o.e.c.c.NodeLeftExecutor.
889+
*/
890+
// visible for testing
891+
class ShardStatusConsistencyChecker {
892+
893+
// dedicated logger so we can separate these logs from other SnapshotShardsService DEBUG logs
894+
private static final Logger CONSISTENCY_CHECKER_LOGGER = LogManager.getLogger(ShardStatusConsistencyChecker.class);
895+
896+
private record CheckTask(Snapshot snapshot, ShardId shardId) {}
897+
898+
private final AtomicInteger queuedTaskCount = new AtomicInteger(0); // atomic to ensure max one thread processing the queue
899+
private final Queue<CheckTask> queue = new ConcurrentLinkedQueue<>();
900+
901+
/**
902+
* Adds a {@link CheckTask} with the snapshot info to the queue. If the queue was previously empty, then {@link #runCheck()} will be
903+
* started on a GENERIC thread.
904+
*/
905+
void ensureShardComplete(Snapshot snapshot, ShardId shardId) {
906+
if (CONSISTENCY_CHECKER_LOGGER.isDebugEnabled() == false) {
907+
return;
908+
}
909+
910+
if (queuedTaskCount.get() > 1000) {
911+
// racy check, we only need an approximate limit
912+
return;
913+
}
914+
915+
queue.add(new CheckTask(snapshot, shardId));
916+
if (queuedTaskCount.getAndIncrement() == 0) {
917+
threadPool.generic().execute(this::runCheck);
918+
} // else a runCheck is already running somewhere and will pick up the task we just added
919+
}
920+
921+
/**
922+
* Empties the {@link #queue} and verifies that the cluster state shows a successful shard snapshot for each {@link CheckTask}.
923+
* A debug level message is logged if the cluster state does not show a completed shard snapshot: this could be a bug, or a race
924+
* with a cluster state update not being applied yet.
925+
*/
926+
private void runCheck() {
927+
while (true) {
928+
final var taskCount = queuedTaskCount.get();
929+
final var clusterStateSnapshotsInProgress = SnapshotsInProgress.get(clusterService.state());
930+
final var shardsBySnapshot = new HashMap<Snapshot, Set<ShardId>>();
931+
for (int i = 0; i < taskCount; i++) {
932+
final var task = queue.poll();
933+
assert task != null;
934+
shardsBySnapshot.computeIfAbsent(task.snapshot(), ignored -> new HashSet<>()).add(task.shardId());
935+
}
936+
for (final var snapshotShards : shardsBySnapshot.entrySet()) {
937+
final var snapshot = snapshotShards.getKey();
938+
final var clusterStateSnapshotEntry = clusterStateSnapshotsInProgress.snapshot(snapshot);
939+
if (clusterStateSnapshotEntry != null) {
940+
for (final var shardId : snapshotShards.getValue()) {
941+
final var clusterStateShardStatus = clusterStateSnapshotEntry.shards().get(shardId);
942+
if (clusterStateShardStatus == null) {
943+
CONSISTENCY_CHECKER_LOGGER.debug(
944+
"shard [{}] in snapshot [{}] unexpectedly not found (should be impossible)",
945+
shardId,
946+
snapshot
947+
);
948+
} else if (clusterStateShardStatus.state().completed() == false) {
949+
CONSISTENCY_CHECKER_LOGGER.debug(
950+
"shard [{}] in snapshot [{}] unexpectedly still in state [{}] after notifying master",
951+
shardId,
952+
snapshot,
953+
clusterStateShardStatus
954+
);
955+
} // else shard is marked complete as expected
956+
}
957+
} // else snapshot already completed & removed from cluster state
958+
}
959+
960+
if (queuedTaskCount.addAndGet(-taskCount) == 0) {
961+
return;
962+
} // else someone added some more tasks, so keep trying
963+
}
964+
}
965+
}
871966
}

0 commit comments

Comments
 (0)