Skip to content

CNDB-9697: notify JVMStabilityInspector when encountered corruption exception on compaction task #1901

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@
import org.apache.cassandra.io.FSDiskFullWriteError;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CorruptBlockException;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.CorruptFileException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Throwables;
Expand Down Expand Up @@ -460,6 +463,7 @@ public static void handleCompactionError(Throwable t, ColumnFamilyStore cfs)
// we might have to rely on error message parsing...
t = t instanceof FSError ? t : new FSWriteError(t);
JVMStabilityInspector.inspectThrowable(t);
CompactionManager.instance.incrementFailed();
}
// No-Space-Left IO exception is thrown by JDK when disk has reached its capacity. The key difference between this
// and the earlier case with `FSDiskFullWriteError` is that here we have definitively run out of disk space, and
Expand All @@ -471,15 +475,26 @@ else if (Throwables.isCausedBy(t, IOException.class) && t.toString().contains(NO
// wrap it with FSWriteError so that JVMStabilityInspector can properly stop or die
t = t instanceof FSError ? t : new FSWriteError(t);
JVMStabilityInspector.inspectThrowable(t);
CompactionManager.instance.incrementFailed();
}
else if (Throwables.isCausedBy(t, OutOfMemoryError.class))
{
logger.error("Encountered out of memory error on {}", cfs, t);
JVMStabilityInspector.inspectThrowable(t);
CompactionManager.instance.incrementFailed();
}
else if (Throwables.anyCauseMatches(t, err -> err instanceof CorruptBlockException
|| err instanceof CorruptFileException
|| err instanceof CorruptSSTableException))
{
logger.error("Encountered corruption exception on {}", cfs, t);
JVMStabilityInspector.inspectThrowable(t);
CompactionManager.instance.incrementFailed();
}
else if (t instanceof CompactionInterruptedException)
{
logger.warn(String.format("Aborting background compaction of %s due to interruption", cfs), Throwables.unwrapped(t));
CompactionManager.instance.incrementAborted();
}
else
{
Expand Down
9 changes: 9 additions & 0 deletions src/java/org/apache/cassandra/io/FSDiskFullWriteError.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,20 @@

public class FSDiskFullWriteError extends FSWriteError
{
private final String keyspace;

public FSDiskFullWriteError(String keyspace, long mutationSize)
{
super(new IOException(String.format("Insufficient disk space to write %d bytes into the %s keyspace",
mutationSize,
keyspace)));

this.keyspace = keyspace;
}

public String getKeyspace()
{
return keyspace;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,19 @@
*/
public class FSNoDiskAvailableForWriteError extends FSWriteError
{
private final String keyspace;

public FSNoDiskAvailableForWriteError(String keyspace)
{
super(new IOException(String.format("The data directories for the %s keyspace have been marked as unwritable",
keyspace)));

this.keyspace = keyspace;
}

public String getKeyspace()
{
return keyspace;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ private synchronized void doResetAndTruncate(DataPosition mark)
}
catch (IOException e)
{
throw new CorruptBlockException(getFile().toString(), chunkOffset, chunkSize, e);
throw new CorruptBlockException(getFile(), chunkOffset, chunkSize, e);
}

CRC32 checksum = new CRC32();
Expand All @@ -313,15 +313,15 @@ private synchronized void doResetAndTruncate(DataPosition mark)
int storedChecksum = crcCheckBuffer.getInt();
int computedChecksum = (int) checksum.getValue();
if (storedChecksum != computedChecksum)
throw new CorruptBlockException(getFile().toString(), chunkOffset, chunkSize, storedChecksum, computedChecksum);
throw new CorruptBlockException(getFile(), chunkOffset, chunkSize, storedChecksum, computedChecksum);
}
catch (CorruptBlockException e)
{
throw new CorruptSSTableException(e, getFile());
}
catch (EOFException e)
{
throw new CorruptSSTableException(new CorruptBlockException(getFile().toString(), chunkOffset, chunkSize), getFile());
throw new CorruptSSTableException(new CorruptBlockException(getFile(), chunkOffset, chunkSize), getFile());
}
catch (IOException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,47 @@

import java.io.IOException;

import org.apache.cassandra.io.util.File;

public class CorruptBlockException extends IOException
{
public CorruptBlockException(String filePath, CompressionMetadata.Chunk chunk)
private final File file;

public CorruptBlockException(File file, CompressionMetadata.Chunk chunk)
{
this(filePath, chunk, null);
this(file, chunk, null);
}

public CorruptBlockException(String filePath, CompressionMetadata.Chunk chunk, Throwable cause)
public CorruptBlockException(File file, CompressionMetadata.Chunk chunk, Throwable cause)
{
this(filePath, chunk.offset, chunk.length, cause);
this(file, chunk.offset, chunk.length, cause);
}

public CorruptBlockException(String filePath, long offset, int length)
public CorruptBlockException(File file, long offset, int length)
{
this(filePath, offset, length, null);
this(file, offset, length, null);
}

public CorruptBlockException(String filePath, long offset, int length, Throwable cause)
public CorruptBlockException(File file, long offset, int length, Throwable cause)
{
super(String.format("(%s): corruption detected, chunk at %d of length %d.", filePath, offset, length), cause);
super(String.format("(%s): corruption detected, chunk at %d of length %d.", file.toString(), offset, length), cause);
this.file = file;
}

public CorruptBlockException(String filePath, CompressionMetadata.Chunk chunk, int storedChecksum, int calculatedChecksum)
public CorruptBlockException(File file, CompressionMetadata.Chunk chunk, int storedChecksum, int calculatedChecksum)
{
this(filePath, chunk.offset, chunk.length, storedChecksum, calculatedChecksum);
this(file, chunk.offset, chunk.length, storedChecksum, calculatedChecksum);
}

public CorruptBlockException(String filePath, long offset, int length, int storedChecksum, int calculatedChecksum)
public CorruptBlockException(File file, long offset, int length, int storedChecksum, int calculatedChecksum)
{
super(String.format("(%s): corruption detected, chunk at %d of length %d has mismatched checksums. Expected %d, but calculated %d",
filePath, offset, length, storedChecksum, calculatedChecksum));
file.toString(), offset, length, storedChecksum, calculatedChecksum));
this.file = file;
}

public File getFile()
{
return file;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public synchronized void resetAndTruncate(DataPosition mark)
encrypted.limit(CHUNK_SIZE);

if (encrypted.getInt(CHUNK_SIZE - 4) != (int) checksum.getValue())
throw new CorruptBlockException(getFile().toString(), truncateChunk, CHUNK_SIZE);
throw new CorruptBlockException(getFile(), truncateChunk, CHUNK_SIZE);

try
{
Expand All @@ -233,7 +233,7 @@ public synchronized void resetAndTruncate(DataPosition mark)
}
catch (IOException e)
{
throw new CorruptBlockException(getFile().toString(), truncateChunk, CHUNK_SIZE, e);
throw new CorruptBlockException(getFile(), truncateChunk, CHUNK_SIZE, e);
}
}
catch (CorruptBlockException e)
Expand All @@ -242,7 +242,7 @@ public synchronized void resetAndTruncate(DataPosition mark)
}
catch (EOFException e)
{
throw new CorruptSSTableException(new CorruptBlockException(getFile().toString(), truncateChunk, CHUNK_SIZE), getFile());
throw new CorruptSSTableException(new CorruptBlockException(getFile(), truncateChunk, CHUNK_SIZE), getFile());
}
catch (IOException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public BufferHolder rebuffer(long desiredPosition)
}
catch (IOException e)
{
throw new CorruptFileException(e, channel().filePath());
throw new CorruptFileException(e, channel().getFile());
}

return this;
Expand Down
12 changes: 6 additions & 6 deletions src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void readChunk(long position, ByteBuffer uncompressed)
{
compressed.limit(length);
if (channel.read(compressed, chunkOffset) != length)
throw new CorruptBlockException(channel.filePath(), chunk);
throw new CorruptBlockException(channel.getFile(), chunk);

if (shouldCheckCrc)
{
Expand All @@ -151,7 +151,7 @@ public void readChunk(long position, ByteBuffer uncompressed)
compressed.limit(length);
int storedChecksum = compressed.getInt();
if (storedChecksum != checksum)
throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum);
throw new CorruptBlockException(channel.getFile(), chunk, storedChecksum, checksum);
}

compressed.position(0).limit(chunk.length);
Expand All @@ -171,7 +171,7 @@ public void readChunk(long position, ByteBuffer uncompressed)
}
catch (IOException e)
{
throw new CorruptBlockException(channel.filePath(), chunk, e);
throw new CorruptBlockException(channel.getFile(), chunk, e);
}
}
finally
Expand All @@ -183,7 +183,7 @@ public void readChunk(long position, ByteBuffer uncompressed)
{
uncompressed.position(0).limit(chunk.length);
if (channel.read(uncompressed, chunkOffset) != chunk.length)
throw new CorruptBlockException(channel.filePath(), chunk);
throw new CorruptBlockException(channel.getFile(), chunk);
}
uncompressed.flip();
}
Expand Down Expand Up @@ -239,7 +239,7 @@ public void readChunk(long position, ByteBuffer uncompressed)
compressedChunk.limit(compressedChunk.capacity());
int storedChecksum = compressedChunk.getInt();
if (storedChecksum != checksum)
throw new CorruptBlockException(channel.filePath(), chunk, storedChecksum, checksum);
throw new CorruptBlockException(channel.getFile(), chunk, storedChecksum, checksum);
}

compressedChunk.position(chunkOffsetInSegment).limit(chunkOffsetInSegment + chunk.length);
Expand All @@ -257,7 +257,7 @@ public void readChunk(long position, ByteBuffer uncompressed)
}
catch (IOException e)
{
throw new CorruptBlockException(channel.filePath(), chunk, e);
throw new CorruptBlockException(channel.getFile(), chunk, e);
}
uncompressed.flip();
}
Expand Down
11 changes: 8 additions & 3 deletions src/java/org/apache/cassandra/io/util/CorruptFileException.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@
@SuppressWarnings("serial")
public class CorruptFileException extends RuntimeException
{
public final String filePath;
public final File file;

public CorruptFileException(Exception cause, String filePath)
public CorruptFileException(Exception cause, File file)
{
super(cause);
this.filePath = filePath;
this.file = file;
}

public File getFile()
{
return file;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected ByteBuffer decrypt(ByteBuffer input, int start, ByteBuffer output, lon
//Change the limit to include the checksum
input.limit(start + CHUNK_SIZE);
if (input.getInt() != checksum)
throw new CorruptBlockException(channel.filePath(), position, CHUNK_SIZE);
throw new CorruptBlockException(channel.getFile(), position, CHUNK_SIZE);
}

int length = input.getInt(start + CHUNK_SIZE - FOOTER_LENGTH);
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/repair/ValidationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.NonThrowingCloseable;
Expand Down Expand Up @@ -176,6 +177,7 @@ public Object call() throws IOException
// we need to inform the remote end of our failure, otherwise it will hang on repair forever
validator.fail();
logger.error("Validation failed.", e);
JVMStabilityInspector.inspectThrowable(e);
throw e;
}
return this;
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ public static void setCommitLogErrorHandler(Function<String, Consumer<Throwable>
commitLogHandler = errorHandler;
}

public static Consumer<Throwable> getGlobalErrorHandler()
{
return globalHandler;
}

public static Consumer<Throwable> getDiskErrorHandler()
{
return diskHandler;
}

/**
* Certain Throwables and Exceptions represent "Die" conditions for the server.
* This recursively checks the input Throwable's cause hierarchy until null.
Expand Down
Loading