1313import org .apache .lucene .index .IndexWriter ;
1414import org .apache .lucene .index .IndexWriterConfig ;
1515import org .apache .lucene .index .Term ;
16+ import org .apache .lucene .mockfile .ExtrasFS ;
1617import org .apache .lucene .store .Directory ;
1718import org .apache .lucene .store .FilterDirectory ;
1819import org .apache .lucene .store .IOContext ;
3839import org .elasticsearch .gateway .PersistedClusterStateService .Writer ;
3940import org .elasticsearch .index .Index ;
4041import org .elasticsearch .indices .breaker .NoneCircuitBreakerService ;
42+ import org .elasticsearch .test .CorruptionUtils ;
4143import org .elasticsearch .test .ESTestCase ;
4244import org .elasticsearch .test .MockLogAppender ;
4345import org .elasticsearch .test .junit .annotations .TestLogging ;
4446
4547import java .io .IOError ;
4648import java .io .IOException ;
49+ import java .nio .file .DirectoryStream ;
50+ import java .nio .file .Files ;
4751import java .nio .file .Path ;
4852import java .util .ArrayList ;
4953import java .util .Collection ;
5054import java .util .List ;
5155import java .util .concurrent .atomic .AtomicBoolean ;
5256import java .util .concurrent .atomic .AtomicLong ;
57+ import java .util .stream .Collectors ;
58+ import java .util .stream .StreamSupport ;
5359
60+ import static org .apache .lucene .index .IndexWriter .WRITE_LOCK_NAME ;
5461import static org .hamcrest .Matchers .allOf ;
5562import static org .hamcrest .Matchers .containsString ;
63+ import static org .hamcrest .Matchers .endsWith ;
5664import static org .hamcrest .Matchers .equalTo ;
5765import static org .hamcrest .Matchers .lessThan ;
5866import static org .hamcrest .Matchers .nullValue ;
67+ import static org .hamcrest .Matchers .startsWith ;
5968
6069public class PersistedClusterStateServiceTests extends ESTestCase {
6170
@@ -73,7 +82,7 @@ public void testPersistsAndReloadsTerm() throws IOException {
7382 assertThat (persistedClusterStateService .loadOnDiskState ().currentTerm , equalTo (0L ));
7483 try (Writer writer = persistedClusterStateService .createWriter ()) {
7584 writer .writeFullStateAndCommit (newTerm , ClusterState .EMPTY_STATE );
76- assertThat (persistedClusterStateService .loadOnDiskState ().currentTerm , equalTo (newTerm ));
85+ assertThat (persistedClusterStateService .loadOnDiskState (false ).currentTerm , equalTo (newTerm ));
7786 }
7887
7988 assertThat (persistedClusterStateService .loadOnDiskState ().currentTerm , equalTo (newTerm ));
@@ -638,6 +647,30 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
638647 }
639648 }
640649
650+ public void testFailsIfCorrupt () throws IOException {
651+ try (NodeEnvironment nodeEnvironment = newNodeEnvironment (createTempDir ())) {
652+ final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService (nodeEnvironment );
653+
654+ try (Writer writer = persistedClusterStateService .createWriter ()) {
655+ writer .writeFullStateAndCommit (1 , ClusterState .EMPTY_STATE );
656+ }
657+
658+ try (DirectoryStream <Path > directoryStream = Files .newDirectoryStream (nodeEnvironment .nodeDataPath ().resolve ("_state" ))) {
659+ CorruptionUtils .corruptFile (random (), randomFrom (StreamSupport
660+ .stream (directoryStream .spliterator (), false )
661+ .filter (p -> {
662+ final String filename = p .getFileName ().toString ();
663+ return ExtrasFS .isExtra (filename ) == false && filename .equals (WRITE_LOCK_NAME ) == false ;
664+ })
665+ .collect (Collectors .toList ())));
666+ }
667+
668+ assertThat (expectThrows (IllegalStateException .class , persistedClusterStateService ::loadOnDiskState ).getMessage (), allOf (
669+ startsWith ("the index containing the cluster metadata under the data path [" ),
670+ endsWith ("] has been changed by an external force after it was last written by Elasticsearch and is now unreadable" )));
671+ }
672+ }
673+
641674 private void assertExpectedLogs (long currentTerm , ClusterState previousState , ClusterState clusterState ,
642675 PersistedClusterStateService .Writer writer , MockLogAppender .LoggingExpectation expectation )
643676 throws IllegalAccessException , IOException {
@@ -675,7 +708,7 @@ private NodeEnvironment newNodeEnvironment(Path dataPath) throws IOException {
675708 }
676709
677710 private static ClusterState loadPersistedClusterState (PersistedClusterStateService persistedClusterStateService ) throws IOException {
678- final PersistedClusterStateService .OnDiskState onDiskState = persistedClusterStateService .loadOnDiskState ();
711+ final PersistedClusterStateService .OnDiskState onDiskState = persistedClusterStateService .loadOnDiskState (false );
679712 return clusterStateFromMetadata (onDiskState .lastAcceptedVersion , onDiskState .metadata );
680713 }
681714
0 commit comments