Skip to content

Commit 291ae9c

Browse files
author
Nitsan Wakart
committed
Validate tombstones (if configured). The error is not the same as current for rows.
1 parent 621e39d commit 291ae9c

File tree

6 files changed

+123
-19
lines changed

6 files changed

+123
-19
lines changed

src/java/org/apache/cassandra/db/DeletionTime.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ public void reset(DeletionTime deletionTime)
459459
@Override
460460
public boolean validate()
461461
{
462-
return CassandraUInt.compare(Cell.MAX_DELETION_TIME_UNSIGNED_INTEGER, localDeletionTimeUnsignedInteger) < 0;
462+
return localDeletionTimeUnsignedInteger == LOCAL_DELETION_TIME_LIVE || CassandraUInt.compare(Cell.MAX_DELETION_TIME_UNSIGNED_INTEGER, localDeletionTimeUnsignedInteger) >= 0;
463463
}
464464

465465
public static ReusableDeletionTime copy(DeletionTime original)

src/java/org/apache/cassandra/db/SerializationHeader.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,14 @@ public DeletionTime readDeletionTime(DataInputPlus in) throws IOException
207207
return DeletionTime.build(markedAt, localDeletionTime);
208208
}
209209

210+
public void readDeletionTime(DataInputPlus in, DeletionTime.ReusableDeletionTime reuse) throws IOException
211+
{
212+
long markedAt = readTimestamp(in);
213+
long localDeletionTime = readLocalDeletionTime(in);
214+
reuse.reset(markedAt, localDeletionTime);
215+
}
216+
217+
210218
public long timestampSerializedSize(long timestamp)
211219
{
212220
return TypeSizes.sizeofUnsignedVInt(timestamp - stats.minTimestamp);

src/java/org/apache/cassandra/db/compaction/CursorCompactor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1226,11 +1226,11 @@ private int prepareAndSortUnfilteredForMerge(int partitionMergeLimit, int prevMe
12261226
if (readerState == ROW_START)
12271227
{
12281228
totalSourceCQLRows++;
1229-
sstableCursor.readRowHeader(sstableCursor.unfiltered());
1229+
sstableCursor.readRowHeader();
12301230
}
12311231
if (readerState == TOMBSTONE_START)
12321232
{
1233-
sstableCursor.readTombstoneMarker(sstableCursor.unfiltered());
1233+
sstableCursor.readTombstoneMarker();
12341234
}
12351235
if (readerState == STATIC_ROW_START)
12361236
throw new IllegalStateException("Unexpected static row after static row merge:" + sstableCursor);
@@ -1263,14 +1263,14 @@ private int prepareAndSortStaticForMerge(int partitionMergeLimit) throws IOExcep
12631263
return 0;
12641264
}
12651265
totalSourceCQLRows++;
1266-
sstableCursors[0].readStaticRowHeader(sstableCursors[0].unfiltered());
1266+
sstableCursors[0].readStaticRowHeader();
12671267
int staticRowMergeLimit = 1;
12681268
for (; staticRowMergeLimit < partitionMergeLimit; staticRowMergeLimit++)
12691269
{
12701270
if (sstableCursorsEqualsNext[staticRowMergeLimit - 1])
12711271
{
12721272
totalSourceCQLRows++;
1273-
sstableCursors[staticRowMergeLimit].readStaticRowHeader(sstableCursors[staticRowMergeLimit].unfiltered());
1273+
sstableCursors[staticRowMergeLimit].readStaticRowHeader();
12741274
}
12751275
else
12761276
break;

src/java/org/apache/cassandra/db/compaction/StatefulCursor.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,18 @@
1818

1919
package org.apache.cassandra.db.compaction;
2020

21+
import org.apache.cassandra.config.Config;
22+
import org.apache.cassandra.config.DatabaseDescriptor;
2123
import org.apache.cassandra.db.DecoratedKey;
24+
import org.apache.cassandra.db.UnfilteredValidation;
2225
import org.apache.cassandra.io.sstable.PartitionDescriptor;
26+
import org.apache.cassandra.io.sstable.ReusableLivenessInfo;
2327
import org.apache.cassandra.io.sstable.SSTableCursorReader;
2428
import org.apache.cassandra.io.sstable.UnfilteredDescriptor;
2529
import org.apache.cassandra.io.sstable.format.SSTableReader;
2630

31+
import static org.apache.cassandra.db.rows.Cell.INVALID_DELETION_TIME;
32+
import static org.apache.cassandra.db.rows.Cell.NO_DELETION_TIME;
2733
import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_END;
2834
import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_HEADER_START;
2935
import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_VALUE_START;
@@ -32,6 +38,9 @@
3238
// Cursor state
3339
class StatefulCursor extends SSTableCursorReader
3440
{
41+
private final Config.CorruptedTombstoneStrategy corruptedTombstoneStrategy = DatabaseDescriptor.getCorruptedTombstoneStrategy();
42+
private final boolean corruptedTombstoneValidationEnabled = corruptedTombstoneStrategy != Config.CorruptedTombstoneStrategy.disabled;
43+
3544
private PartitionDescriptor currPartition;
3645
/**
3746
* used for tracking reader and writer order, as well as last pk
@@ -57,6 +66,8 @@ public int readPartitionHeader()
5766

5867
if (prevPartition.keyLength() != 0 && prevPartition.key().compareTo(currPartition.key()) >= 0)
5968
corruptSSTableKeysOOO();
69+
if (corruptedTombstoneValidationEnabled)
70+
validateInvalidPartitionDeletion();
6071
return state;
6172
}
6273

@@ -154,4 +165,96 @@ private String keyToString(DecoratedKey key)
154165
}
155166
return keyString;
156167
}
168+
169+
public void readRowHeader()
170+
{
171+
super.readRowHeader(unfiltered);
172+
if (corruptedTombstoneValidationEnabled)
173+
validateInvalidRowDeletion();
174+
}
175+
176+
public void readTombstoneMarker()
177+
{
178+
super.readTombstoneMarker(unfiltered);
179+
if (corruptedTombstoneValidationEnabled)
180+
validateInvalidTombstoneDeletion();
181+
}
182+
183+
public void readStaticRowHeader()
184+
{
185+
super.readStaticRowHeader(unfiltered);
186+
if (corruptedTombstoneValidationEnabled)
187+
validateInvalidRowDeletion();
188+
}
189+
190+
@Override
191+
public int readCellHeader()
192+
{
193+
int state = super.readCellHeader();
194+
if (corruptedTombstoneValidationEnabled)
195+
validateInvalidCellDeletion();
196+
return state;
197+
}
198+
199+
private void validateInvalidTombstoneDeletion()
200+
{
201+
if (!unfiltered.deletionTime().validate()) {
202+
UnfilteredValidation.handleInvalid(
203+
ssTableReader().metadata(),
204+
currPartition.key(),
205+
ssTableReader(),
206+
"rowDeletion="+currPartition.deletionTime().toString());
207+
}
208+
if (unfiltered.isBoundary() && !unfiltered.deletionTime2().validate()) {
209+
UnfilteredValidation.handleInvalid(
210+
ssTableReader().metadata(),
211+
currPartition.key(),
212+
ssTableReader(),
213+
"rowDeletion2="+currPartition.deletionTime().toString());
214+
}
215+
}
216+
217+
private void validateInvalidCellDeletion()
218+
{
219+
ReusableLivenessInfo cellLiveness = cellCursor().cellLiveness;
220+
long ldt = cellLiveness.localExpirationTime();
221+
if (cellLiveness.ttl() < 0 || ldt == INVALID_DELETION_TIME || ldt < 0 || (cellLiveness.isExpiring() && ldt == NO_DELETION_TIME)) {
222+
UnfilteredValidation.handleInvalid(
223+
ssTableReader().metadata(),
224+
currPartition.key(),
225+
ssTableReader(),
226+
"cellLiveness="+cellLiveness);
227+
}
228+
}
229+
230+
private void validateInvalidRowDeletion()
231+
{
232+
if (!unfiltered.deletionTime().validate()) {
233+
UnfilteredValidation.handleInvalid(
234+
ssTableReader().metadata(),
235+
currPartition.key(),
236+
ssTableReader(),
237+
"rowDeletion="+currPartition.deletionTime().toString());
238+
}
239+
ReusableLivenessInfo livenessInfo = unfiltered.livenessInfo();
240+
if (livenessInfo.isExpiring() && (livenessInfo.ttl() < 0 || livenessInfo.localExpirationTime() < 0)) {
241+
UnfilteredValidation.handleInvalid(
242+
ssTableReader().metadata(),
243+
currPartition.key(),
244+
ssTableReader(),
245+
"rowLiveness="+livenessInfo.toString());
246+
}
247+
248+
}
249+
250+
private void validateInvalidPartitionDeletion()
251+
{
252+
if (!currPartition.deletionTime().validate()) {
253+
UnfilteredValidation.handleInvalid(
254+
ssTableReader().metadata(),
255+
currPartition.key(),
256+
ssTableReader(),
257+
"partitionLevelDeletion="+currPartition.deletionTime().toString());
258+
}
259+
}
157260
}

src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import com.google.common.collect.ImmutableList;
2424

25-
import org.apache.cassandra.db.DeletionTime.ReusableDeletionTime;
2625
import org.apache.cassandra.io.util.ResizableByteBuffer;
2726
import org.apache.cassandra.db.ClusteringPrefix;
2827
import org.apache.cassandra.db.Columns;
@@ -860,13 +859,6 @@ else if (UnfilteredSerializer.isRow(basicUnfilteredFlags))
860859
}
861860
}
862861

863-
static void readUnfilteredDeletionTime(RandomAccessReader dataReader, SerializationHeader serializationHeader, ReusableDeletionTime reuse) throws IOException
864-
{
865-
long markedAt = serializationHeader.readTimestamp(dataReader);
866-
long localDeletionTime = serializationHeader.readLocalDeletionTime(dataReader);
867-
reuse.reset(markedAt, localDeletionTime);
868-
}
869-
870862
public boolean isEOF() {
871863
return state == DONE || dataReader.isEOF();
872864
}

src/java/org/apache/cassandra/io/sstable/UnfilteredDescriptor.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import org.apache.cassandra.db.rows.UnfilteredSerializer;
3030
import org.apache.cassandra.io.util.RandomAccessReader;
3131

32-
import static org.apache.cassandra.io.sstable.SSTableCursorReader.readUnfilteredDeletionTime;
33-
3432
public class UnfilteredDescriptor extends ClusteringDescriptor
3533
{
3634
private final ReusableLivenessInfo rowLivenessInfo = new ReusableLivenessInfo();
@@ -69,13 +67,16 @@ void loadTombstone(RandomAccessReader dataReader,
6967
clusteringKind == INCL_END_EXCL_START_BOUNDARY_CLUSTERING_KIND)
7068
{
7169
// boundary
72-
readUnfilteredDeletionTime(dataReader, serializationHeader, deletionTime); // CLOSE
73-
readUnfilteredDeletionTime(dataReader, serializationHeader, deletionTime2); // OPEN
70+
// CLOSE
71+
serializationHeader.readDeletionTime(dataReader, deletionTime);
72+
// OPEN
73+
serializationHeader.readDeletionTime(dataReader, deletionTime2);
7474
}
7575
else
7676
{
7777
// bound
78-
readUnfilteredDeletionTime(dataReader, serializationHeader, deletionTime); // CLOSE|OPEN
78+
// CLOSE|OPEN
79+
serializationHeader.readDeletionTime(dataReader, deletionTime);
7980
}
8081
}
8182

@@ -134,7 +135,7 @@ private void loadCommonRowFields(RandomAccessReader dataReader,
134135
// varint delta_marked_for_delete_at;
135136
// varint delta_local_deletion_time;
136137
//};
137-
readUnfilteredDeletionTime(dataReader, serializationHeader, deletionTime);
138+
serializationHeader.readDeletionTime(dataReader, deletionTime);
138139
}
139140
else
140141
{

0 commit comments

Comments
 (0)