30
30
import java .util .concurrent .RejectedExecutionException ;
31
31
import java .util .concurrent .atomic .AtomicReference ;
32
32
import java .util .function .Consumer ;
33
+ import java .util .function .Supplier ;
33
34
34
35
import com .google .common .collect .ImmutableList ;
35
36
import com .google .common .collect .ImmutableSet ;
36
37
import org .apache .cassandra .config .Config ;
37
38
import org .apache .cassandra .io .FSWriteError ;
39
+ import org .apache .cassandra .io .compress .CompressionMetadata ;
40
+ import org .apache .cassandra .io .compress .CorruptBlockException ;
38
41
import org .apache .cassandra .io .sstable .CorruptSSTableException ;
42
+ import org .apache .cassandra .io .util .CorruptFileException ;
43
+ import org .apache .cassandra .io .util .File ;
39
44
import org .apache .cassandra .utils .JVMKiller ;
40
45
import org .apache .cassandra .utils .JVMStabilityInspector ;
41
46
import org .apache .cassandra .utils .KillerForTests ;
58
63
import org .mockito .ArgumentCaptor ;
59
64
import org .mockito .ArgumentMatchers ;
60
65
import org .mockito .Mockito ;
66
+ import org .openjdk .jmh .util .FileUtils ;
61
67
62
68
import static org .assertj .core .api .Assertions .*;
63
69
import static org .mockito .ArgumentMatchers .anyInt ;
@@ -543,7 +549,47 @@ public void handleOOMError()
543
549
}
544
550
545
551
@ Test
546
- public void handleCorruptionException ()
552
+ public void handleCorruptionSSTableException ()
553
+ {
554
+ handleCorruptionException (() -> new CorruptSSTableException (null , "corrupted" ));
555
+ }
556
+
557
+ @ Test
558
+ public void handleCorruptionBlockException ()
559
+ {
560
+ handleCorruptionException (() -> {
561
+ try
562
+ {
563
+ CompressionMetadata .Chunk chunk = new CompressionMetadata .Chunk (1L , 1 );
564
+ return new CorruptBlockException (new File (FileUtils .tempFile ("temp" )), chunk );
565
+ }
566
+ catch (IOException e )
567
+ {
568
+ throw new RuntimeException (e );
569
+ }
570
+ });
571
+ }
572
+
573
+ @ Test
574
+ public void handleCorruptionFileException ()
575
+ {
576
+ handleCorruptionException (() -> {
577
+ try
578
+ {
579
+ File file = new File (FileUtils .tempFile ("temp" ));
580
+ CorruptFileException corruptFileException = new CorruptFileException (null , file );
581
+ assertThat (corruptFileException .getFile ()).isEqualTo (file );
582
+ return corruptFileException ;
583
+ }
584
+ catch (IOException e )
585
+ {
586
+ throw new RuntimeException (e );
587
+ }
588
+ });
589
+ }
590
+
591
+
592
+ private void handleCorruptionException (Supplier <Exception > provider )
547
593
{
548
594
JVMKiller originalKiller = JVMStabilityInspector .replaceKiller (new KillerForTests ());
549
595
Config .DiskFailurePolicy originalPolicy = DatabaseDescriptor .getDiskFailurePolicy ();
@@ -558,10 +604,10 @@ public void handleCorruptionException()
558
604
559
605
long before = CompactionManager .instance .getMetrics ().totalCompactionsFailed .getCount ();
560
606
561
- CorruptSSTableException corruptSSTableException = new CorruptSSTableException ( null , "corrupted" );
562
- BackgroundCompactionRunner .handleCompactionError (corruptSSTableException , cfs );
607
+ Exception exception = provider . get ( );
608
+ BackgroundCompactionRunner .handleCompactionError (exception , cfs );
563
609
564
- assertThat (diskErrorEncountered .get ()).isSameAs (corruptSSTableException );
610
+ assertThat (diskErrorEncountered .get ()).isSameAs (exception );
565
611
assertThat (CompactionManager .instance .getMetrics ().totalCompactionsFailed .getCount ()).isEqualTo (before + 1 );
566
612
}
567
613
finally
@@ -572,6 +618,7 @@ public void handleCorruptionException()
572
618
}
573
619
}
574
620
621
+
575
622
@ Test
576
623
public void handleFSWriteError ()
577
624
{
0 commit comments