30
30
import java .util .concurrent .RejectedExecutionException ;
31
31
import java .util .concurrent .atomic .AtomicReference ;
32
32
import java .util .function .Consumer ;
33
+ import java .util .function .Supplier ;
33
34
34
35
import com .google .common .collect .ImmutableList ;
35
36
import com .google .common .collect .ImmutableSet ;
36
37
import org .apache .cassandra .config .Config ;
37
38
import org .apache .cassandra .io .FSWriteError ;
39
+ import org .apache .cassandra .io .compress .CompressionMetadata ;
40
+ import org .apache .cassandra .io .compress .CorruptBlockException ;
38
41
import org .apache .cassandra .io .sstable .CorruptSSTableException ;
42
+ import org .apache .cassandra .io .util .CorruptFileException ;
43
+ import org .apache .cassandra .io .util .File ;
39
44
import org .apache .cassandra .utils .JVMKiller ;
40
45
import org .apache .cassandra .utils .JVMStabilityInspector ;
41
46
import org .apache .cassandra .utils .KillerForTests ;
58
63
import org .mockito .ArgumentCaptor ;
59
64
import org .mockito .ArgumentMatchers ;
60
65
import org .mockito .Mockito ;
66
+ import org .openjdk .jmh .util .FileUtils ;
61
67
62
68
import static org .assertj .core .api .Assertions .*;
63
69
import static org .mockito .ArgumentMatchers .anyInt ;
@@ -543,7 +549,50 @@ public void handleOOMError()
543
549
}
544
550
545
551
@ Test
546
- public void handleCorruptionException ()
552
+ public void handleCorruptionSSTableException ()
553
+ {
554
+ handleCorruptionException (() -> new CorruptSSTableException (null , "corrupted" ));
555
+ }
556
+
557
+ @ Test
558
+ public void handleCorruptionBlockException ()
559
+ {
560
+ handleCorruptionException (() -> {
561
+ try
562
+ {
563
+ CompressionMetadata .Chunk chunk = new CompressionMetadata .Chunk (1L , 1 );
564
+ File file = new File (FileUtils .tempFile ("temp" ));
565
+ CorruptBlockException corruptBlockException = new CorruptBlockException (file , chunk );
566
+ assertThat (corruptBlockException .getFile ()).isEqualTo (file );
567
+ return corruptBlockException ;
568
+ }
569
+ catch (IOException e )
570
+ {
571
+ throw new RuntimeException (e );
572
+ }
573
+ });
574
+ }
575
+
576
+ @ Test
577
+ public void handleCorruptionFileException ()
578
+ {
579
+ handleCorruptionException (() -> {
580
+ try
581
+ {
582
+ File file = new File (FileUtils .tempFile ("temp" ));
583
+ CorruptFileException corruptFileException = new CorruptFileException (null , file );
584
+ assertThat (corruptFileException .getFile ()).isEqualTo (file );
585
+ return corruptFileException ;
586
+ }
587
+ catch (IOException e )
588
+ {
589
+ throw new RuntimeException (e );
590
+ }
591
+ });
592
+ }
593
+
594
+
595
+ private void handleCorruptionException (Supplier <Exception > provider )
547
596
{
548
597
JVMKiller originalKiller = JVMStabilityInspector .replaceKiller (new KillerForTests ());
549
598
Config .DiskFailurePolicy originalPolicy = DatabaseDescriptor .getDiskFailurePolicy ();
@@ -558,10 +607,10 @@ public void handleCorruptionException()
558
607
559
608
long before = CompactionManager .instance .getMetrics ().totalCompactionsFailed .getCount ();
560
609
561
- CorruptSSTableException corruptSSTableException = new CorruptSSTableException ( null , "corrupted" );
562
- BackgroundCompactionRunner .handleCompactionError (corruptSSTableException , cfs );
610
+ Exception exception = provider . get ( );
611
+ BackgroundCompactionRunner .handleCompactionError (exception , cfs );
563
612
564
- assertThat (diskErrorEncountered .get ()).isSameAs (corruptSSTableException );
613
+ assertThat (diskErrorEncountered .get ()).isSameAs (exception );
565
614
assertThat (CompactionManager .instance .getMetrics ().totalCompactionsFailed .getCount ()).isEqualTo (before + 1 );
566
615
}
567
616
finally
@@ -572,6 +621,7 @@ public void handleCorruptionException()
572
621
}
573
622
}
574
623
624
+
575
625
@ Test
576
626
public void handleFSWriteError ()
577
627
{
0 commit comments