|
9 | 9 |
|
10 | 10 | package org.elasticsearch.snapshots; |
11 | 11 |
|
| 12 | +import org.apache.logging.log4j.Level; |
| 13 | +import org.apache.logging.log4j.core.LogEvent; |
| 14 | +import org.elasticsearch.ElasticsearchException; |
| 15 | +import org.elasticsearch.action.ActionListener; |
| 16 | +import org.elasticsearch.action.support.ActionTestUtils; |
| 17 | +import org.elasticsearch.action.support.SubscribableListener; |
| 18 | +import org.elasticsearch.cluster.coordination.Coordinator; |
12 | 19 | import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; |
13 | 20 | import org.elasticsearch.plugins.Plugin; |
14 | 21 | import org.elasticsearch.snapshots.mockstore.MockRepository; |
15 | 22 | import org.elasticsearch.test.ESIntegTestCase; |
| 23 | +import org.elasticsearch.test.MockLog; |
16 | 24 | import org.elasticsearch.test.disruption.NetworkDisruption; |
| 25 | +import org.elasticsearch.test.junit.annotations.TestLogging; |
17 | 26 | import org.elasticsearch.test.transport.MockTransportService; |
| 27 | +import org.elasticsearch.transport.TestTransportChannel; |
| 28 | +import org.elasticsearch.transport.TransportResponse; |
18 | 29 |
|
19 | 30 | import java.util.Arrays; |
20 | 31 | import java.util.Collection; |
21 | 32 | import java.util.List; |
22 | 33 | import java.util.concurrent.TimeUnit; |
| 34 | +import java.util.concurrent.atomic.AtomicBoolean; |
23 | 35 |
|
24 | 36 | import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; |
25 | 37 | import static org.hamcrest.Matchers.equalTo; |
@@ -89,4 +101,71 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception { |
89 | 101 | assertThat(snapshotInfo.successfulShards(), equalTo(shards)); |
90 | 102 | }, 30L, TimeUnit.SECONDS); |
91 | 103 | } |
| 104 | + |
// Integration test: checks that SnapshotShardsService.ShardStatusConsistencyChecker emits its DEBUG
// "unexpectedly still in state [...INIT...] after notifying master" message. The test sets this up by
// withholding cluster state commits from the data node once the master has received a shard snapshot
// status update. The @TestLogging annotation below raises that logger to DEBUG for this test.
| 105 | + @TestLogging( |
| 106 | + reason = "verifying debug logging", |
| 107 | + value = "org.elasticsearch.snapshots.SnapshotShardsService.ShardStatusConsistencyChecker:DEBUG" |
| 108 | + ) |
| 109 | + public void testConsistencyCheckerLogging() { |
// One dedicated master-only node and one data-only node, so the two transport interceptors installed
// below each target a distinct node.
| 110 | + final var masterNode = internalCluster().startMasterOnlyNode(); |
| 111 | + final var dataNode = internalCluster().startDataOnlyNode(); |
| 112 | + |
// Randomly named repository of type "fs".
| 113 | + final var repoName = randomIdentifier(); |
| 114 | + createRepository(repoName, "fs"); |
| 115 | + |
// Randomly named index populated with random documents.
// NOTE(review): presumably between(1, 5) is the shard count passed to indexSettingsNoReplicas, and
// scaledRandomIntBetween(50, 100) the number of documents - confirm against the helper signatures.
| 116 | + final var indexName = randomIdentifier(); |
| 117 | + assertAcked(prepareCreate(indexName, 0, indexSettingsNoReplicas(between(1, 5)))); |
| 118 | + indexRandomDocs(indexName, scaledRandomIntBetween(50, 100)); |
| 119 | + |
| 120 | + // allow cluster states to go through normally until we see a shard snapshot update |
// The master-side interceptor only records that an update-snapshot-status request arrived and then
// delegates to the real handler unchanged.
| 121 | + final var shardUpdateSeen = new AtomicBoolean(); |
| 122 | + MockTransportService.getInstance(masterNode) |
| 123 | + .addRequestHandlingBehavior(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, (handler, request, channel, task) -> { |
| 124 | + shardUpdateSeen.set(true); |
| 125 | + handler.messageReceived(request, channel, task); |
| 126 | + }); |
| 127 | + |
| 128 | + // then, after the shard snapshot update, delay cluster state update commits on the data node until we see the |
| 129 | + // ShardStatusConsistencyChecker log an inconsistency. |
// logMessageSeenListener is completed from the log expectation at the bottom of this method.
| 130 | + final var logMessageSeenListener = new SubscribableListener<Void>(); |
| 131 | + MockTransportService.getInstance(dataNode) |
| 132 | + .addRequestHandlingBehavior(Coordinator.COMMIT_STATE_ACTION_NAME, (handler, request, channel, task) -> { |
// Pass commits straight through before any shard status update has been seen, and again once the
// expected log message has been observed.
| 133 | + if (shardUpdateSeen.get() == false || logMessageSeenListener.isDone()) { |
| 134 | + handler.messageReceived(request, channel, task); |
| 135 | + } else { |
| 136 | + // delay the commit ... |
// When the listener completes, the real handler is invoked with a TestTransportChannel wrapping a
// no-op listener, because the original channel is answered immediately below.
// NOTE(review): assertNoFailureListener presumably fails the test if the listener completes
// exceptionally - confirm in ActionTestUtils.
| 137 | + logMessageSeenListener.addListener( |
| 138 | + ActionTestUtils.assertNoFailureListener( |
| 139 | + ignored -> ActionListener.run( |
| 140 | + ActionListener.<TransportResponse>noop(), |
| 141 | + l -> handler.messageReceived(request, new TestTransportChannel(l), task) |
| 142 | + ) |
| 143 | + ) |
| 144 | + ); |
| 145 | + // ... and send a failure straight back to the master so it applies and acks the state update anyway |
| 146 | + channel.sendResponse(new ElasticsearchException("simulated")); |
| 147 | + } |
| 148 | + }); |
| 149 | + |
// Take a snapshot of the index while watching the checker's logger for the DEBUG message.
// NOTE(review): presumably MockLog.awaitLogger runs the supplied action and then blocks until the
// expectation has matched - confirm in MockLog.
| 150 | + MockLog.awaitLogger( |
| 151 | + () -> createSnapshot(repoName, randomIdentifier(), List.of(indexName)), |
| 152 | + SnapshotShardsService.ShardStatusConsistencyChecker.class, |
| 153 | + new MockLog.SeenEventExpectation( |
| 154 | + "debug log", |
| 155 | + SnapshotShardsService.ShardStatusConsistencyChecker.class.getCanonicalName(), |
| 156 | + Level.DEBUG, |
| 157 | + "*unexpectedly still in state [*INIT*] after notifying master" |
| 158 | + ) { |
// This override is used purely for its side effect: it always returns true, so it never rejects an
// event itself.
| 159 | + @Override |
| 160 | + public boolean innerMatch(LogEvent event) { |
| 161 | + if (event.getMessage().getFormattedMessage().contains("after notifying master")) { |
| 162 | + // now that the inconsistency was logged, release the cluster state commits on the data node |
| 163 | + logMessageSeenListener.onResponse(null); |
| 164 | + } |
| 165 | + return true; |
| 166 | + } |
| 167 | + } |
| 168 | + ); |
| 169 | + |
| 170 | + } |
92 | 171 | } |
0 commit comments