30
30
import java .util .concurrent .RejectedExecutionException ;
31
31
import java .util .concurrent .atomic .AtomicReference ;
32
32
import java .util .function .Consumer ;
33
+ import java .util .function .Supplier ;
33
34
34
35
import com .google .common .collect .ImmutableList ;
35
36
import com .google .common .collect .ImmutableSet ;
36
37
import org .apache .cassandra .config .Config ;
38
+ import org .apache .cassandra .io .FSDiskFullWriteError ;
39
+ import org .apache .cassandra .io .FSNoDiskAvailableForWriteError ;
37
40
import org .apache .cassandra .io .FSWriteError ;
41
+ import org .apache .cassandra .io .compress .CompressionMetadata ;
42
+ import org .apache .cassandra .io .compress .CorruptBlockException ;
38
43
import org .apache .cassandra .io .sstable .CorruptSSTableException ;
44
+ import org .apache .cassandra .io .util .CorruptFileException ;
45
+ import org .apache .cassandra .io .util .File ;
39
46
import org .apache .cassandra .utils .JVMKiller ;
40
47
import org .apache .cassandra .utils .JVMStabilityInspector ;
41
48
import org .apache .cassandra .utils .KillerForTests ;
58
65
import org .mockito .ArgumentCaptor ;
59
66
import org .mockito .ArgumentMatchers ;
60
67
import org .mockito .Mockito ;
68
+ import org .openjdk .jmh .util .FileUtils ;
61
69
62
70
import static org .assertj .core .api .Assertions .*;
63
71
import static org .mockito .ArgumentMatchers .anyInt ;
@@ -543,7 +551,50 @@ public void handleOOMError()
543
551
}
544
552
545
553
@ Test
546
- public void handleCorruptionException ()
554
+ public void handleCorruptionSSTableException ()
555
+ {
556
+ handleCorruptionException (() -> new CorruptSSTableException (null , "corrupted" ));
557
+ }
558
+
559
+ @ Test
560
+ public void handleCorruptionBlockException ()
561
+ {
562
+ handleCorruptionException (() -> {
563
+ try
564
+ {
565
+ CompressionMetadata .Chunk chunk = new CompressionMetadata .Chunk (1L , 1 );
566
+ File file = new File (FileUtils .tempFile ("temp" ));
567
+ CorruptBlockException corruptBlockException = new CorruptBlockException (file , chunk );
568
+ assertThat (corruptBlockException .getFile ()).isEqualTo (file );
569
+ return corruptBlockException ;
570
+ }
571
+ catch (IOException e )
572
+ {
573
+ throw new RuntimeException (e );
574
+ }
575
+ });
576
+ }
577
+
578
+ @ Test
579
+ public void handleCorruptionFileException ()
580
+ {
581
+ handleCorruptionException (() -> {
582
+ try
583
+ {
584
+ File file = new File (FileUtils .tempFile ("temp" ));
585
+ CorruptFileException corruptFileException = new CorruptFileException (null , file );
586
+ assertThat (corruptFileException .getFile ()).isEqualTo (file );
587
+ return corruptFileException ;
588
+ }
589
+ catch (IOException e )
590
+ {
591
+ throw new RuntimeException (e );
592
+ }
593
+ });
594
+ }
595
+
596
+
597
+ private void handleCorruptionException (Supplier <Exception > provider )
547
598
{
548
599
JVMKiller originalKiller = JVMStabilityInspector .replaceKiller (new KillerForTests ());
549
600
Config .DiskFailurePolicy originalPolicy = DatabaseDescriptor .getDiskFailurePolicy ();
@@ -558,10 +609,10 @@ public void handleCorruptionException()
558
609
559
610
long before = CompactionManager .instance .getMetrics ().totalCompactionsFailed .getCount ();
560
611
561
- CorruptSSTableException corruptSSTableException = new CorruptSSTableException ( null , "corrupted" );
562
- BackgroundCompactionRunner .handleCompactionError (corruptSSTableException , cfs );
612
+ Exception exception = provider . get ( );
613
+ BackgroundCompactionRunner .handleCompactionError (exception , cfs );
563
614
564
- assertThat (diskErrorEncountered .get ()).isSameAs (corruptSSTableException );
615
+ assertThat (diskErrorEncountered .get ()).isSameAs (exception );
565
616
assertThat (CompactionManager .instance .getMetrics ().totalCompactionsFailed .getCount ()).isEqualTo (before + 1 );
566
617
}
567
618
finally
@@ -572,6 +623,7 @@ public void handleCorruptionException()
572
623
}
573
624
}
574
625
626
+
575
627
@ Test
576
628
public void handleFSWriteError ()
577
629
{
@@ -630,6 +682,33 @@ public void handleNoSpaceLeftException()
630
682
}
631
683
}
632
684
685
+ @ Test
686
+ public void handDiskFullErrors ()
687
+ {
688
+ JVMKiller originalKiller = JVMStabilityInspector .replaceKiller (new KillerForTests ());
689
+ Consumer <Throwable > originalGlobalErrorHandler = JVMStabilityInspector .getGlobalErrorHandler ();
690
+ try
691
+ {
692
+ long before = CompactionManager .instance .getMetrics ().totalCompactionsFailed .getCount ();
693
+
694
+ FSNoDiskAvailableForWriteError noDiskAvailableForWriteError = new FSNoDiskAvailableForWriteError ("ks" );
695
+ assertThat (noDiskAvailableForWriteError .getKeyspace ()).isEqualTo ("ks" );
696
+ BackgroundCompactionRunner .handleCompactionError (noDiskAvailableForWriteError , cfs );
697
+ assertThat (CompactionManager .instance .getMetrics ().totalCompactionsFailed .getCount ()).isEqualTo (before + 1 );
698
+
699
+
700
+ FSDiskFullWriteError diskFullWriteError = new FSDiskFullWriteError ("ks2" , 100 );
701
+ assertThat (diskFullWriteError .getKeyspace ()).isEqualTo ("ks2" );
702
+ BackgroundCompactionRunner .handleCompactionError (diskFullWriteError , cfs );
703
+ assertThat (CompactionManager .instance .getMetrics ().totalCompactionsFailed .getCount ()).isEqualTo (before + 2 );
704
+ }
705
+ finally
706
+ {
707
+ JVMStabilityInspector .replaceKiller (originalKiller );
708
+ JVMStabilityInspector .setGlobalErrorHandler (originalGlobalErrorHandler );
709
+ }
710
+ }
711
+
633
712
private void verifyTaskScheduled (Executor executor )
634
713
{
635
714
verify (executor ).execute (notNull ());
0 commit comments