Skip to content

Commit b4210ac

Browse files
Dmitry Konstantinovsmiklosovic
authored andcommitted
Improve CommitLogSegmentReader to skip SyncBlocks correctly in case of CRC errors
patch by Dmitry Konstantinov; reviewed by Stefan Miklosovic for CASSANDRA-20664
1 parent fcaa1b3 commit b4210ac

File tree

3 files changed

+152
-41
lines changed

3 files changed

+152
-41
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
4.1.10
2+
* Improve CommitLogSegmentReader to skip SyncBlocks correctly in case of CRC errors (CASSANDRA-20664)
23
* Do not crash on first boot with data_disk_usage_max_disk_size set when data directory is not created yet (CASSANDRA-20787)
34
* Rework / simplification of nodetool get/setguardrailsconfig commands (CASSANDRA-20778)
45
* IntrusiveStack.accumulate is not accumulating correctly (CASSANDRA-20670)

src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentReader.java

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -101,56 +101,69 @@ protected SyncSegment computeNext()
101101
{
102102
while (true)
103103
{
104+
final int currentStart = end;
104105
try
105106
{
106-
final int currentStart = end;
107107
end = readSyncMarker(descriptor, currentStart, reader);
108-
if (end == -1)
109-
{
110-
return endOfData();
111-
}
112-
if (end > reader.length())
113-
{
114-
// the CRC was good (meaning it was good when it was written and still looks legit), but the file is truncated now.
115-
// try to grab and use as much of the file as possible, which might be nothing if the end of the file truly is corrupt
116-
end = (int) reader.length();
117-
}
108+
}
109+
catch (CommitLogSegmentReader.SegmentReadException e)
110+
{
111+
handleUnrecoverableError(e, !e.invalidCrc && tolerateTruncation);
112+
end = -1; // skip the remaining part of the corrupted log segment
113+
}
114+
catch (IOException e)
115+
{
116+
boolean tolerateErrorsInSection = tolerateTruncation & segmenter.tolerateSegmentErrors(end, reader.length());
117+
handleUnrecoverableError(e, tolerateErrorsInSection);
118+
end = -1; // skip the remaining part of the corrupted log segment
119+
}
120+
121+
if (end == -1)
122+
{
123+
return endOfData();
124+
}
125+
if (end > reader.length())
126+
{
127+
// the CRC was good (meaning it was good when it was written and still looks legit), but the file is truncated now.
128+
// try to grab and use as much of the file as possible, which might be nothing if the end of the file truly is corrupt
129+
end = (int) reader.length();
130+
}
131+
132+
try
133+
{
118134
return segmenter.nextSegment(currentStart + SYNC_MARKER_SIZE, end);
119135
}
120-
catch(CommitLogSegmentReader.SegmentReadException e)
136+
catch (CommitLogSegmentReader.SegmentReadException e)
121137
{
122-
try
123-
{
124-
handler.handleUnrecoverableError(new CommitLogReadException(
125-
e.getMessage(),
126-
CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
127-
!e.invalidCrc && tolerateTruncation));
128-
}
129-
catch (IOException ioe)
130-
{
131-
throw new RuntimeException(ioe);
132-
}
138+
handleUnrecoverableError(e, !e.invalidCrc && tolerateTruncation);
139+
// if no exception is thrown, the while loop will continue
133140
}
134141
catch (IOException e)
135142
{
136-
try
137-
{
138-
boolean tolerateErrorsInSection = tolerateTruncation & segmenter.tolerateSegmentErrors(end, reader.length());
139-
// if no exception is thrown, the while loop will continue
140-
handler.handleUnrecoverableError(new CommitLogReadException(
141-
e.getMessage(),
142-
CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
143-
tolerateErrorsInSection));
144-
}
145-
catch (IOException ioe)
146-
{
147-
throw new RuntimeException(ioe);
148-
}
143+
boolean tolerateErrorsInSection = tolerateTruncation & segmenter.tolerateSegmentErrors(end, reader.length());
144+
handleUnrecoverableError(e, tolerateErrorsInSection);
145+
// if no exception is thrown, the while loop will continue
149146
}
150147
}
151148
}
152149
}
153150

151+
private void handleUnrecoverableError(Exception e, boolean permissible)
152+
{
153+
try
154+
{
155+
handler.handleUnrecoverableError(new CommitLogReadException(
156+
e.getMessage(),
157+
CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
158+
permissible)
159+
);
160+
}
161+
catch (IOException ioe)
162+
{
163+
throw new RuntimeException(ioe);
164+
}
165+
}
166+
154167
private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
155168
{
156169
if (offset > reader.length() - SYNC_MARKER_SIZE)

test/unit/org/apache/cassandra/db/commitlog/CommitLogReaderTest.java

Lines changed: 102 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
*/
1818
package org.apache.cassandra.db.commitlog;
1919

20+
import java.io.FileOutputStream;
2021
import java.io.IOException;
22+
import java.nio.ByteBuffer;
2123
import java.util.ArrayList;
2224
import java.util.List;
2325

26+
import org.apache.cassandra.distributed.shared.WithProperties;
2427
import org.apache.cassandra.io.util.File;
28+
import org.apache.cassandra.security.EncryptionContextGenerator;
2529
import org.junit.Assert;
2630
import org.junit.Before;
2731
import org.junit.BeforeClass;
@@ -40,15 +44,20 @@
4044
import org.apache.cassandra.db.rows.Row;
4145
import org.apache.cassandra.utils.JVMStabilityInspector;
4246
import org.apache.cassandra.utils.KillerForTests;
47+
import org.assertj.core.api.Assertions;
48+
49+
import static org.apache.cassandra.db.commitlog.CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY;
4350

4451
public class CommitLogReaderTest extends CQLTester
4552
{
53+
private static final long CORRUPTED_COMMIT_LOG_FILE_ID = 111L;
54+
private static final String CORRUPTED_COMMIT_LOG_FILE_NAME = "CommitLog-7-1234567.log";
55+
4656
@BeforeClass
4757
public static void setUpClass()
4858
{
4959
prePrepareServer();
5060

51-
DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
5261
JVMStabilityInspector.replaceKiller(new KillerForTests(false));
5362

5463
DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.batch);
@@ -60,7 +69,12 @@ public static void setUpClass()
6069
@Before
6170
public void before() throws IOException
6271
{
72+
clearCorruptedCommitLogFile();
6373
CommitLog.instance.resetUnsafe(true);
74+
75+
// always reset to what Cassandra's default is and let each test method
76+
// handle its expected failure policy itself for better test encapsulation.
77+
DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
6478
}
6579

6680
@Test
@@ -165,6 +179,58 @@ public void testReadCountFromMidpoint() throws Throwable
165179
confirmReadOrder(testHandler, samples / 2);
166180
}
167181

182+
@Test
183+
public void testSyncMarkerChecksumReadFailed_ignoreReplayErrorsDisabled() throws Throwable
184+
{
185+
File corruptedSegmentFile = createAndWriteCorruptedCommitLogFile();
186+
CommitLogReader reader = new CommitLogReader();
187+
// use real CLR handler to test actual behavior
188+
CommitLogReadHandler clrHandler =
189+
new CommitLogReplayer(new CommitLog(null), null, null, null);
190+
191+
// ignore.replay.errors disabled, so we expect the exception here
192+
Assertions.assertThatThrownBy(() ->
193+
reader.readCommitLogSegment(clrHandler,
194+
corruptedSegmentFile,
195+
CommitLogPosition.NONE,
196+
CommitLogReader.ALL_MUTATIONS,
197+
false)
198+
).isInstanceOf(CommitLogReplayer.CommitLogReplayException.class);
199+
}
200+
201+
@Test
202+
public void testSyncMarkerChecksumReadFailed_ignoreReplayErrorsEnabled() throws Throwable
203+
{
204+
try (WithProperties properties = new WithProperties(IGNORE_REPLAY_ERRORS_PROPERTY, "true"))
205+
{
206+
File corruptedSegmentFile = createAndWriteCorruptedCommitLogFile();
207+
208+
CommitLogReader reader = new CommitLogReader();
209+
// use real CLR handler to test actual behavior
210+
CommitLogReadHandler clrHandler =
211+
new CommitLogReplayer(new CommitLog(null), null, null, null);
212+
213+
// ignore.replay.errors enabled, so we don't expect any errors
214+
reader.readCommitLogSegment(clrHandler, corruptedSegmentFile, CommitLogPosition.NONE, CommitLogReader.ALL_MUTATIONS, false);
215+
}
216+
}
217+
218+
@Test
219+
public void testSyncMarkerChecksumReadFailed_ignoreReplayErrorsDisabled_commitFailurePolicyIgnore() throws Throwable
220+
{
221+
DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
222+
223+
File corruptedSegmentFile = createAndWriteCorruptedCommitLogFile();
224+
225+
CommitLogReader reader = new CommitLogReader();
226+
// use real CLR handler to test actual behavior
227+
CommitLogReadHandler clrHandler =
228+
new CommitLogReplayer(new CommitLog(null), null, null, null);
229+
230+
// commit.failure.policy=ignore, so we don't expect any errors
231+
reader.readCommitLogSegment(clrHandler, corruptedSegmentFile, CommitLogPosition.NONE, CommitLogReader.ALL_MUTATIONS, false);
232+
}
233+
168234
/**
169235
* Since we have both table and non mixed into the CL, we ignore updates that aren't for the table the test handler
170236
* is configured to check.
@@ -207,7 +273,7 @@ static ArrayList<File> getCommitLogs()
207273
continue;
208274
results.add(f);
209275
}
210-
Assert.assertTrue("Didn't find any commit log files.", 0 != results.size());
276+
Assert.assertFalse("Didn't find any commit log files.", results.isEmpty());
211277
return results;
212278
}
213279

@@ -229,20 +295,20 @@ public TestCLRHandler(TableMetadata metadata)
229295
this.metadata = metadata;
230296
}
231297

232-
public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException
298+
public boolean shouldSkipSegmentOnError(CommitLogReadException exception)
233299
{
234300
sawStopOnErrorCheck = true;
235301
return false;
236302
}
237303

238-
public void handleUnrecoverableError(CommitLogReadException exception) throws IOException
304+
public void handleUnrecoverableError(CommitLogReadException exception)
239305
{
240306
sawStopOnErrorCheck = true;
241307
}
242308

243309
public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
244310
{
245-
if ((metadata == null) || (metadata != null && m.getPartitionUpdate(metadata) != null)) {
311+
if (metadata == null || m.getPartitionUpdate(metadata) != null) {
246312
seenMutations.add(m);
247313
}
248314
}
@@ -274,4 +340,35 @@ CommitLogPosition populateData(int entryCount) throws Throwable
274340
.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
275341
return result;
276342
}
343+
344+
private static File createAndWriteCorruptedCommitLogFile() throws IOException
345+
{
346+
final ByteBuffer corruptedSegmentByteBuffer =
347+
ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize());
348+
349+
final CommitLogDescriptor commitLogDescriptor =
350+
new CommitLogDescriptor(CORRUPTED_COMMIT_LOG_FILE_ID, null, EncryptionContextGenerator.createDisabledContext());
351+
352+
CommitLogDescriptor.writeHeader(corruptedSegmentByteBuffer, commitLogDescriptor);
353+
354+
// write corrupted sync marker:
355+
// put wrong offset
356+
corruptedSegmentByteBuffer.putInt(42);
357+
// put wrong CRC
358+
corruptedSegmentByteBuffer.putInt(42);
359+
360+
final File corruptedLogFile = new File(DatabaseDescriptor.getCommitLogLocation(), CORRUPTED_COMMIT_LOG_FILE_NAME);
361+
try (FileOutputStream fos = new FileOutputStream(corruptedLogFile.toJavaIOFile()))
362+
{
363+
fos.write(corruptedSegmentByteBuffer.array());
364+
fos.flush();
365+
}
366+
367+
return corruptedLogFile;
368+
}
369+
370+
private static void clearCorruptedCommitLogFile()
371+
{
372+
new File(DatabaseDescriptor.getCommitLogLocation(), CORRUPTED_COMMIT_LOG_FILE_NAME).deleteIfExists();
373+
}
277374
}

0 commit comments

Comments
 (0)