|
19 | 19 | package org.apache.cassandra.db.compaction;
|
20 | 20 |
|
21 | 21 | import java.io.IOError;
|
| 22 | +import java.io.IOException; |
22 | 23 | import java.util.ArrayList;
|
23 | 24 | import java.util.Collections;
|
24 | 25 | import java.util.List;
|
|
27 | 28 | import java.util.concurrent.CompletableFuture;
|
28 | 29 | import java.util.concurrent.Executor;
|
29 | 30 | import java.util.concurrent.RejectedExecutionException;
|
| 31 | +import java.util.concurrent.atomic.AtomicReference; |
| 32 | +import java.util.function.Consumer; |
30 | 33 |
|
31 | 34 | import com.google.common.collect.ImmutableList;
|
32 | 35 | import com.google.common.collect.ImmutableSet;
|
33 | 36 | import org.apache.cassandra.config.Config;
|
| 37 | +import org.apache.cassandra.io.FSWriteError; |
| 38 | +import org.apache.cassandra.io.sstable.CorruptSSTableException; |
34 | 39 | import org.apache.cassandra.utils.JVMKiller;
|
35 | 40 | import org.apache.cassandra.utils.JVMStabilityInspector;
|
36 | 41 | import org.apache.cassandra.utils.KillerForTests;
|
|
49 | 54 | import org.apache.cassandra.db.lifecycle.Tracker;
|
50 | 55 | import org.apache.cassandra.io.sstable.format.SSTableReader;
|
51 | 56 | import org.assertj.core.util.Lists;
|
| 57 | +import org.assertj.core.util.Preconditions; |
52 | 58 | import org.mockito.ArgumentCaptor;
|
53 | 59 | import org.mockito.ArgumentMatchers;
|
54 | 60 | import org.mockito.Mockito;
|
@@ -512,19 +518,115 @@ public void handleTaskFailure() throws Exception
|
512 | 518 | public void handleOOMError()
|
513 | 519 | {
|
514 | 520 | JVMKiller originalKiller = JVMStabilityInspector.replaceKiller(new KillerForTests());
|
515 |
| - Config.DiskFailurePolicy originalPolicy = DatabaseDescriptor.getDiskFailurePolicy(); |
| 521 | + Consumer<Throwable> originalGlobalErrorHandler = JVMStabilityInspector.getGlobalErrorHandler(); |
516 | 522 | try
|
517 | 523 | {
|
518 |
| - DatabaseDescriptor.setDiskFailurePolicy(Config.DiskFailurePolicy.die); |
| 524 | + AtomicReference<Throwable> globalErrorEncountered = new AtomicReference<>(); |
| 525 | + JVMStabilityInspector.setGlobalErrorHandler(err -> { |
| 526 | + Preconditions.checkArgument(globalErrorEncountered.get() == null, "Expect only one exception"); |
| 527 | + globalErrorEncountered.set(err); |
| 528 | + }); |
| 529 | + |
| 530 | + long before = CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount(); |
519 | 531 |
|
520 | 532 | OutOfMemoryError oomError = new OutOfMemoryError("oom");
|
521 |
| - assertThatThrownBy(() -> BackgroundCompactionRunner.handleCompactionError(oomError, cfs)) |
522 |
| - .isInstanceOf(OutOfMemoryError.class); |
| 533 | + BackgroundCompactionRunner.handleCompactionError(oomError, cfs); |
| 534 | + |
| 535 | + assertThat(globalErrorEncountered.get()).isSameAs(oomError); |
| 536 | + assertThat(CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount()).isEqualTo(before + 1); |
| 537 | + } |
| 538 | + finally |
| 539 | + { |
| 540 | + JVMStabilityInspector.replaceKiller(originalKiller); |
| 541 | + JVMStabilityInspector.setGlobalErrorHandler(originalGlobalErrorHandler); |
| 542 | + } |
| 543 | + } |
| 544 | + |
| 545 | + @Test |
| 546 | + public void handleCorruptionException() |
| 547 | + { |
| 548 | + JVMKiller originalKiller = JVMStabilityInspector.replaceKiller(new KillerForTests()); |
| 549 | + Config.DiskFailurePolicy originalPolicy = DatabaseDescriptor.getDiskFailurePolicy(); |
| 550 | + Consumer<Throwable> originalDiskErrorHandler = JVMStabilityInspector.getDiskErrorHandler(); |
| 551 | + try |
| 552 | + { |
| 553 | + AtomicReference<Throwable> diskErrorEncountered = new AtomicReference<>(); |
| 554 | + JVMStabilityInspector.setDiskErrorHandler(err -> { |
| 555 | + Preconditions.checkArgument(diskErrorEncountered.get() == null, "Expect only one exception"); |
| 556 | + diskErrorEncountered.set(err); |
| 557 | + }); |
| 558 | + |
| 559 | + long before = CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount(); |
| 560 | + |
| 561 | + CorruptSSTableException corruptSSTableException = new CorruptSSTableException(null, "corrupted"); |
| 562 | + BackgroundCompactionRunner.handleCompactionError(corruptSSTableException, cfs); |
| 563 | + |
| 564 | + assertThat(diskErrorEncountered.get()).isSameAs(corruptSSTableException); |
| 565 | + assertThat(CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount()).isEqualTo(before + 1); |
| 566 | + } |
| 567 | + finally |
| 568 | + { |
| 569 | + DatabaseDescriptor.setDiskFailurePolicy(originalPolicy); |
| 570 | + JVMStabilityInspector.replaceKiller(originalKiller); |
| 571 | + JVMStabilityInspector.setDiskErrorHandler(originalDiskErrorHandler); |
| 572 | + } |
| 573 | + } |
| 574 | + |
| 575 | + @Test |
| 576 | + public void handleFSWriteError() |
| 577 | + { |
| 578 | + JVMKiller originalKiller = JVMStabilityInspector.replaceKiller(new KillerForTests()); |
| 579 | + Config.DiskFailurePolicy originalPolicy = DatabaseDescriptor.getDiskFailurePolicy(); |
| 580 | + Consumer<Throwable> originalDiskErrorHandler = JVMStabilityInspector.getDiskErrorHandler(); |
| 581 | + try |
| 582 | + { |
| 583 | + AtomicReference<Throwable> diskErrorEncountered = new AtomicReference<>(); |
| 584 | + JVMStabilityInspector.setDiskErrorHandler(err -> { |
| 585 | + Preconditions.checkArgument(diskErrorEncountered.get() == null, "Expect only one exception"); |
| 586 | + diskErrorEncountered.set(err); |
| 587 | + }); |
| 588 | + |
| 589 | + long before = CompactionManager.instance.getMetrics().compactionsAborted.getCount(); |
| 590 | + |
| 591 | + FSWriteError fsWriteError = new FSWriteError(null, "file"); |
| 592 | + BackgroundCompactionRunner.handleCompactionError(fsWriteError, cfs); |
| 593 | + |
| 594 | + assertThat(diskErrorEncountered.get()).isSameAs(fsWriteError); |
| 595 | + assertThat(CompactionManager.instance.getMetrics().compactionsAborted.getCount()).isEqualTo(before + 1); |
| 596 | + } |
| 597 | + finally |
| 598 | + { |
| 599 | + DatabaseDescriptor.setDiskFailurePolicy(originalPolicy); |
| 600 | + JVMStabilityInspector.replaceKiller(originalKiller); |
| 601 | + JVMStabilityInspector.setDiskErrorHandler(originalDiskErrorHandler); |
| 602 | + } |
| 603 | + } |
| 604 | + |
| 605 | + @Test |
| 606 | + public void handleNoSpaceLeftException() |
| 607 | + { |
| 608 | + JVMKiller originalKiller = JVMStabilityInspector.replaceKiller(new KillerForTests()); |
| 609 | + Config.DiskFailurePolicy originalPolicy = DatabaseDescriptor.getDiskFailurePolicy(); |
| 610 | + Consumer<Throwable> originalDiskErrorHandler = JVMStabilityInspector.getDiskErrorHandler(); |
| 611 | + try |
| 612 | + { |
| 613 | + // it's wrapped in FSWriteError |
| 614 | + AtomicReference<Throwable> diskErrorEncountered = new AtomicReference<>(); |
| 615 | + JVMStabilityInspector.setDiskErrorHandler(err -> diskErrorEncountered.set(err.getCause() != null ? err.getCause() : err)); |
| 616 | + |
| 617 | + long before = CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount(); |
| 618 | + |
| 619 | + IOException noSpaceLeftException = new IOException("No space left on device"); |
| 620 | + BackgroundCompactionRunner.handleCompactionError(noSpaceLeftException, cfs); |
| 621 | + |
| 622 | + assertThat(diskErrorEncountered.get()).isSameAs(noSpaceLeftException); |
| 623 | + assertThat(CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount()).isEqualTo(before + 1); |
523 | 624 | }
|
524 | 625 | finally
|
525 | 626 | {
|
526 | 627 | DatabaseDescriptor.setDiskFailurePolicy(originalPolicy);
|
527 | 628 | JVMStabilityInspector.replaceKiller(originalKiller);
|
| 629 | + JVMStabilityInspector.setDiskErrorHandler(originalDiskErrorHandler); |
528 | 630 | }
|
529 | 631 | }
|
530 | 632 |
|
|
0 commit comments