diff --git a/src/java/org/apache/cassandra/db/compaction/BackgroundCompactionRunner.java b/src/java/org/apache/cassandra/db/compaction/BackgroundCompactionRunner.java index ab0a5923ac5b..174817c593b6 100644 --- a/src/java/org/apache/cassandra/db/compaction/BackgroundCompactionRunner.java +++ b/src/java/org/apache/cassandra/db/compaction/BackgroundCompactionRunner.java @@ -47,7 +47,10 @@ import org.apache.cassandra.io.FSDiskFullWriteError; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.CorruptBlockException; +import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.CorruptFileException; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Throwables; @@ -460,6 +463,7 @@ public static void handleCompactionError(Throwable t, ColumnFamilyStore cfs) // we might have to rely on error message parsing... t = t instanceof FSError ? t : new FSWriteError(t); JVMStabilityInspector.inspectThrowable(t); + CompactionManager.instance.incrementFailed(); } // No-Space-Left IO exception is thrown by JDK when disk has reached its capacity. The key difference between this // and the earlier case with `FSDiskFullWriteError` is that here we have definitively run out of disk space, and @@ -471,15 +475,26 @@ else if (Throwables.isCausedBy(t, IOException.class) && t.toString().contains(NO // wrap it with FSWriteError so that JVMStabilityInspector can properly stop or die t = t instanceof FSError ? t : new FSWriteError(t); JVMStabilityInspector.inspectThrowable(t); + CompactionManager.instance.incrementFailed(); } else if (Throwables.isCausedBy(t, OutOfMemoryError.class)) { logger.error("Encountered out of memory error on {}", cfs, t); JVMStabilityInspector.inspectThrowable(t); + CompactionManager.instance.incrementFailed(); + } + else if (Throwables.anyCauseMatches(t, err -> err instanceof CorruptBlockException + || err instanceof CorruptFileException + || err instanceof CorruptSSTableException)) + { + logger.error("Encountered corruption exception on {}", cfs, t); + JVMStabilityInspector.inspectThrowable(t); + CompactionManager.instance.incrementFailed(); } else if (t instanceof CompactionInterruptedException) { logger.warn(String.format("Aborting background compaction of %s due to interruption", cfs), Throwables.unwrapped(t)); + CompactionManager.instance.incrementAborted(); } else { diff --git a/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java b/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java index ebb07e2c6936..515bd682a150 100644 --- a/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java +++ b/src/java/org/apache/cassandra/io/FSDiskFullWriteError.java @@ -22,11 +22,20 @@ public class FSDiskFullWriteError extends FSWriteError { + private final String keyspace; + public FSDiskFullWriteError(String keyspace, long mutationSize) { super(new IOException(String.format("Insufficient disk space to write %d bytes into the %s keyspace", mutationSize, keyspace))); + + this.keyspace = keyspace; + } + + public String getKeyspace() + { + return keyspace; } @Override diff --git a/src/java/org/apache/cassandra/io/FSNoDiskAvailableForWriteError.java b/src/java/org/apache/cassandra/io/FSNoDiskAvailableForWriteError.java index 14dcd38f2a62..59af52f42544 100644 --- a/src/java/org/apache/cassandra/io/FSNoDiskAvailableForWriteError.java +++ b/src/java/org/apache/cassandra/io/FSNoDiskAvailableForWriteError.java @@ -25,10 +25,19 @@ */ public class FSNoDiskAvailableForWriteError extends FSWriteError { + private final String keyspace; + public FSNoDiskAvailableForWriteError(String keyspace) { super(new IOException(String.format("The data directories for the %s keyspace have been marked as unwritable", keyspace))); + + this.keyspace = keyspace; + } + + public String getKeyspace() + { + return keyspace; } @Override diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 7aac9a3d5410..e9f2e91dd280 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -300,7 +300,7 @@ private synchronized void doResetAndTruncate(DataPosition mark) } catch (IOException e) { - throw new CorruptBlockException(getFile().toString(), chunkOffset, chunkSize, e); + throw new CorruptBlockException(getFile(), chunkOffset, chunkSize, e); } CRC32 checksum = new CRC32(); @@ -313,7 +313,7 @@ private synchronized void doResetAndTruncate(DataPosition mark) int storedChecksum = crcCheckBuffer.getInt(); int computedChecksum = (int) checksum.getValue(); if (storedChecksum != computedChecksum) - throw new CorruptBlockException(getFile().toString(), chunkOffset, chunkSize, storedChecksum, computedChecksum); + throw new CorruptBlockException(getFile(), chunkOffset, chunkSize, storedChecksum, computedChecksum); } catch (CorruptBlockException e) { @@ -321,7 +321,7 @@ private synchronized void doResetAndTruncate(DataPosition mark) } catch (EOFException e) { - throw new CorruptSSTableException(new CorruptBlockException(getFile().toString(), chunkOffset, chunkSize), getFile()); + throw new CorruptSSTableException(new CorruptBlockException(getFile(), chunkOffset, chunkSize), getFile()); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/io/compress/CorruptBlockException.java b/src/java/org/apache/cassandra/io/compress/CorruptBlockException.java index a41d8dfc05b7..9315fff66048 100644 --- a/src/java/org/apache/cassandra/io/compress/CorruptBlockException.java +++ b/src/java/org/apache/cassandra/io/compress/CorruptBlockException.java @@ -19,37 +19,47 @@ import java.io.IOException; +import org.apache.cassandra.io.util.File; + public class CorruptBlockException extends IOException { - public CorruptBlockException(String filePath, CompressionMetadata.Chunk chunk) + private final File file; + + public CorruptBlockException(File file, CompressionMetadata.Chunk chunk) { - this(filePath, chunk, null); + this(file, chunk, null); } - public CorruptBlockException(String filePath, CompressionMetadata.Chunk chunk, Throwable cause) + public CorruptBlockException(File file, CompressionMetadata.Chunk chunk, Throwable cause) { - this(filePath, chunk.offset, chunk.length, cause); + this(file, chunk.offset, chunk.length, cause); } - public CorruptBlockException(String filePath, long offset, int length) + public CorruptBlockException(File file, long offset, int length) { - this(filePath, offset, length, null); + this(file, offset, length, null); } - public CorruptBlockException(String filePath, long offset, int length, Throwable cause) + public CorruptBlockException(File file, long offset, int length, Throwable cause) { - super(String.format("(%s): corruption detected, chunk at %d of length %d.", filePath, offset, length), cause); + super(String.format("(%s): corruption detected, chunk at %d of length %d.", file.toString(), offset, length), cause); + this.file = file; } - public CorruptBlockException(String filePath, CompressionMetadata.Chunk chunk, int storedChecksum, int calculatedChecksum) + public CorruptBlockException(File file, CompressionMetadata.Chunk chunk, int storedChecksum, int calculatedChecksum) { - this(filePath, chunk.offset, chunk.length, storedChecksum, calculatedChecksum); + this(file, chunk.offset, chunk.length, storedChecksum, calculatedChecksum); } - public CorruptBlockException(String filePath, long offset, int length, int storedChecksum, int calculatedChecksum) + public CorruptBlockException(File file, long offset, int length, int storedChecksum, int calculatedChecksum) { super(String.format("(%s): corruption detected, chunk at %d of length %d has mismatched checksums. Expected %d, but calculated %d", - filePath, offset, length, storedChecksum, calculatedChecksum)); + file.toString(), offset, length, storedChecksum, calculatedChecksum)); + this.file = file; } + public File getFile() + { + return file; + } } diff --git a/src/java/org/apache/cassandra/io/compress/EncryptedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/EncryptedSequentialWriter.java index ba0a85ee0b28..68f49b7b25cb 100644 --- a/src/java/org/apache/cassandra/io/compress/EncryptedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/EncryptedSequentialWriter.java @@ -221,7 +221,7 @@ public synchronized void resetAndTruncate(DataPosition mark) encrypted.limit(CHUNK_SIZE); if (encrypted.getInt(CHUNK_SIZE - 4) != (int) checksum.getValue()) - throw new CorruptBlockException(getFile().toString(), truncateChunk, CHUNK_SIZE); + throw new CorruptBlockException(getFile(), truncateChunk, CHUNK_SIZE); try { @@ -233,7 +233,7 @@ public synchronized void resetAndTruncate(DataPosition mark) } catch (IOException e) { - throw new CorruptBlockException(getFile().toString(), truncateChunk, CHUNK_SIZE, e); + throw new CorruptBlockException(getFile(), truncateChunk, CHUNK_SIZE, e); } } catch (CorruptBlockException e) @@ -242,7 +242,7 @@ public synchronized void resetAndTruncate(DataPosition mark) } catch (EOFException e) { - throw new CorruptSSTableException(new CorruptBlockException(getFile().toString(), truncateChunk, CHUNK_SIZE), getFile()); + throw new CorruptSSTableException(new CorruptBlockException(getFile(), truncateChunk, CHUNK_SIZE), getFile()); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java b/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java index 6ede05bb4d12..728e05c9261a 100644 --- a/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java +++ b/src/java/org/apache/cassandra/io/util/ChecksummedRebufferer.java @@ -50,7 +50,7 @@ public BufferHolder rebuffer(long desiredPosition) } catch (IOException e) { - throw new CorruptFileException(e, channel().filePath()); + throw new CorruptFileException(e, channel().getFile()); } return this; diff --git a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java index 7ab034a7f877..48b27a140829 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java +++ b/src/java/org/apache/cassandra/io/util/CompressedChunkReader.java @@ -140,7 +140,7 @@ public void readChunk(long position, ByteBuffer uncompressed) { compressed.limit(length); if (channel.read(compressed, chunkOffset) != length) - throw new CorruptBlockException(channel.filePath(), chunk); + throw new CorruptBlockException(channel.getFile(), chunk); if (shouldCheckCrc) { @@ -151,7 +151,7 @@ public void readChunk(long position, ByteBuffer uncompressed) compressed.limit(length); int storedChecksum = compressed.getInt(); if (storedChecksum != checksum) - throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum); + throw new CorruptBlockException(channel.getFile(), chunk, storedChecksum, checksum); } compressed.position(0).limit(chunk.length); @@ -171,7 +171,7 @@ public void readChunk(long position, ByteBuffer uncompressed) } catch (IOException e) { - throw new CorruptBlockException(channel.filePath(), chunk, e); + throw new CorruptBlockException(channel.getFile(), chunk, e); } } finally @@ -183,7 +183,7 @@ public void readChunk(long position, ByteBuffer uncompressed) { uncompressed.position(0).limit(chunk.length); if (channel.read(uncompressed, chunkOffset) != chunk.length) - throw new CorruptBlockException(channel.filePath(), chunk); + throw new CorruptBlockException(channel.getFile(), chunk); } uncompressed.flip(); } @@ -239,7 +239,7 @@ public void readChunk(long position, ByteBuffer uncompressed) compressedChunk.limit(compressedChunk.capacity()); int storedChecksum = compressedChunk.getInt(); if (storedChecksum != checksum) - throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum); + throw new CorruptBlockException(channel.getFile(), chunk, storedChecksum, checksum); } compressedChunk.position(chunkOffsetInSegment).limit(chunkOffsetInSegment + chunk.length); @@ -257,7 +257,7 @@ public void readChunk(long position, ByteBuffer uncompressed) } catch (IOException e) { - throw new CorruptBlockException(channel.filePath(), chunk, e); + throw new CorruptBlockException(channel.getFile(), chunk, e); } uncompressed.flip(); } diff --git a/src/java/org/apache/cassandra/io/util/CorruptFileException.java b/src/java/org/apache/cassandra/io/util/CorruptFileException.java index 875d06f537c4..37e1cea46fc0 100644 --- a/src/java/org/apache/cassandra/io/util/CorruptFileException.java +++ b/src/java/org/apache/cassandra/io/util/CorruptFileException.java @@ -21,11 +21,16 @@ @SuppressWarnings("serial") public class CorruptFileException extends RuntimeException { - public final String filePath; + public final File file; - public CorruptFileException(Exception cause, String filePath) + public CorruptFileException(Exception cause, File file) { super(cause); - this.filePath = filePath; + this.file = file; + } + + public File getFile() + { + return file; } } diff --git a/src/java/org/apache/cassandra/io/util/EncryptedChunkReader.java b/src/java/org/apache/cassandra/io/util/EncryptedChunkReader.java index 9310ecac33fe..ed37c33ae32c 100644 --- a/src/java/org/apache/cassandra/io/util/EncryptedChunkReader.java +++ b/src/java/org/apache/cassandra/io/util/EncryptedChunkReader.java @@ -114,7 +114,7 @@ protected ByteBuffer decrypt(ByteBuffer input, int start, ByteBuffer output, lon //Change the limit to include the checksum input.limit(start + CHUNK_SIZE); if (input.getInt() != checksum) - throw new CorruptBlockException(channel.filePath(), position, CHUNK_SIZE); + throw new CorruptBlockException(channel.getFile(), position, CHUNK_SIZE); } int length = input.getInt(start + CHUNK_SIZE - FOOTER_LENGTH); diff --git a/src/java/org/apache/cassandra/repair/ValidationManager.java b/src/java/org/apache/cassandra/repair/ValidationManager.java index 600d1c953f48..c773768a8c94 100644 --- a/src/java/org/apache/cassandra/repair/ValidationManager.java +++ b/src/java/org/apache/cassandra/repair/ValidationManager.java @@ -38,6 +38,7 @@ import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.NonThrowingCloseable; @@ -176,6 +177,7 @@ public Object call() throws IOException // we need to inform the remote end of our failure, otherwise it will hang on repair forever validator.fail(); logger.error("Validation failed.", e); + JVMStabilityInspector.inspectThrowable(e); throw e; } return this; diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java index 4e3eb86375e0..d72d54ab9297 100644 --- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java +++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java @@ -87,6 +87,16 @@ public static void setCommitLogErrorHandler(Function commitLogHandler = errorHandler; } + public static Consumer getGlobalErrorHandler() + { + return globalHandler; + } + + public static Consumer getDiskErrorHandler() + { + return diskHandler; + } + /** * Certain Throwables and Exceptions represent "Die" conditions for the server. * This recursively checks the input Throwable's cause hierarchy until null. diff --git a/test/unit/org/apache/cassandra/db/compaction/BackgroundCompactionRunnerTest.java b/test/unit/org/apache/cassandra/db/compaction/BackgroundCompactionRunnerTest.java index df6aef75d3d9..a8e181f86cf7 100644 --- a/test/unit/org/apache/cassandra/db/compaction/BackgroundCompactionRunnerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/BackgroundCompactionRunnerTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction; import java.io.IOError; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -27,10 +28,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.cassandra.config.Config; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.utils.JVMKiller; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.KillerForTests; @@ -49,6 +54,7 @@ import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.assertj.core.util.Lists; +import org.assertj.core.util.Preconditions; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; @@ -512,19 +518,115 @@ public void handleTaskFailure() throws Exception public void handleOOMError() { JVMKiller originalKiller = JVMStabilityInspector.replaceKiller(new KillerForTests()); - Config.DiskFailurePolicy originalPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + Consumer originalGlobalErrorHandler = JVMStabilityInspector.getGlobalErrorHandler(); try { - DatabaseDescriptor.setDiskFailurePolicy(Config.DiskFailurePolicy.die); + AtomicReference globalErrorEncountered = new AtomicReference<>(); + JVMStabilityInspector.setGlobalErrorHandler(err -> { + Preconditions.checkArgument(globalErrorEncountered.get() == null, "Expect only one exception"); + globalErrorEncountered.set(err); + }); + + long before = CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount(); OutOfMemoryError oomError = new OutOfMemoryError("oom"); - assertThatThrownBy(() -> BackgroundCompactionRunner.handleCompactionError(oomError, cfs)) - .isInstanceOf(OutOfMemoryError.class); + BackgroundCompactionRunner.handleCompactionError(oomError, cfs); + + assertThat(globalErrorEncountered.get()).isSameAs(oomError); + assertThat(CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount()).isEqualTo(before + 1); + } + finally + { + JVMStabilityInspector.replaceKiller(originalKiller); + JVMStabilityInspector.setGlobalErrorHandler(originalGlobalErrorHandler); + } + } + + @Test + public void handleCorruptionException() + { + JVMKiller originalKiller = JVMStabilityInspector.replaceKiller(new KillerForTests()); + Config.DiskFailurePolicy originalPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + Consumer originalDiskErrorHandler = JVMStabilityInspector.getDiskErrorHandler(); + try + { + AtomicReference diskErrorEncountered = new AtomicReference<>(); + JVMStabilityInspector.setDiskErrorHandler(err -> { + Preconditions.checkArgument(diskErrorEncountered.get() == null, "Expect only one exception"); + diskErrorEncountered.set(err); + }); + + long before = CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount(); + + CorruptSSTableException corruptSSTableException = new CorruptSSTableException(null, "corrupted"); + BackgroundCompactionRunner.handleCompactionError(corruptSSTableException, cfs); + + assertThat(diskErrorEncountered.get()).isSameAs(corruptSSTableException); + assertThat(CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount()).isEqualTo(before + 1); + } + finally + { + DatabaseDescriptor.setDiskFailurePolicy(originalPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + JVMStabilityInspector.setDiskErrorHandler(originalDiskErrorHandler); + } + } + + @Test + public void handleFSWriteError() + { + JVMKiller originalKiller = JVMStabilityInspector.replaceKiller(new KillerForTests()); + Config.DiskFailurePolicy originalPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + Consumer originalDiskErrorHandler = JVMStabilityInspector.getDiskErrorHandler(); + try + { + AtomicReference diskErrorEncountered = new AtomicReference<>(); + JVMStabilityInspector.setDiskErrorHandler(err -> { + Preconditions.checkArgument(diskErrorEncountered.get() == null, "Expect only one exception"); + diskErrorEncountered.set(err); + }); + + long before = CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount(); + + FSWriteError fsWriteError = new FSWriteError(null, "file"); + BackgroundCompactionRunner.handleCompactionError(fsWriteError, cfs); + + assertThat(diskErrorEncountered.get()).isSameAs(fsWriteError); + assertThat(CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount()).isEqualTo(before + 1); + } + finally + { + DatabaseDescriptor.setDiskFailurePolicy(originalPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + JVMStabilityInspector.setDiskErrorHandler(originalDiskErrorHandler); + } + } + + @Test + public void handleNoSpaceLeftException() + { + JVMKiller originalKiller = JVMStabilityInspector.replaceKiller(new KillerForTests()); + Config.DiskFailurePolicy originalPolicy = DatabaseDescriptor.getDiskFailurePolicy(); + Consumer originalDiskErrorHandler = JVMStabilityInspector.getDiskErrorHandler(); + try + { + // it's wrapped in FSWriteError + AtomicReference diskErrorEncountered = new AtomicReference<>(); + JVMStabilityInspector.setDiskErrorHandler(err -> diskErrorEncountered.set(err.getCause() != null ? err.getCause() : err)); + + long before = CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount(); + + IOException noSpaceLeftException = new IOException("No space left on device"); + BackgroundCompactionRunner.handleCompactionError(noSpaceLeftException, cfs); + + assertThat(diskErrorEncountered.get()).isSameAs(noSpaceLeftException); + assertThat(CompactionManager.instance.getMetrics().totalCompactionsFailed.getCount()).isEqualTo(before + 1); } finally { DatabaseDescriptor.setDiskFailurePolicy(originalPolicy); JVMStabilityInspector.replaceKiller(originalKiller); + JVMStabilityInspector.setDiskErrorHandler(originalDiskErrorHandler); } }