Skip to content

Commit 761851f

Browse files
adelapena authored and driftx committed
CNDB-14602: Fix bytes-based paging for partition deletions (#1835)
Only preserve the original data size or rows in case of purging. Fixes DBPE-16935.
1 parent 959676d commit 761851f

File tree

6 files changed

+312
-63
lines changed

6 files changed

+312
-63
lines changed

src/java/org/apache/cassandra/db/filter/DataLimits.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -628,14 +628,14 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
628628
{
629629
rowsInCurrentPartition = 0;
630630
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
631-
staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.originalDataSize() : 0;
631+
staticRowBytes = hasLiveStaticRow && bytesLimit != NO_LIMIT ? staticRow.dataSizeBeforePurge() : 0;
632632
}
633633

634634
@Override
635635
public Row applyToRow(Row row)
636636
{
637637
if (isLive(row))
638-
incrementRowCount(bytesLimit != NO_LIMIT ? row.originalDataSize() : 0);
638+
incrementRowCount(bytesLimit != NO_LIMIT ? row.dataSizeBeforePurge() : 0);
639639
return row;
640640
}
641641

@@ -1110,7 +1110,7 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
11101110
}
11111111
hasReturnedRowsFromCurrentPartition = false;
11121112
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
1113-
staticRowBytes = hasLiveStaticRow ? staticRow.originalDataSize() : 0;
1113+
staticRowBytes = hasLiveStaticRow ? staticRow.dataSizeBeforePurge() : 0;
11141114
}
11151115
currentPartitionKey = partitionKey;
11161116
// If we are done we need to preserve the groupInCurrentPartition and rowsCountedInCurrentPartition
@@ -1176,7 +1176,7 @@ public Row applyToRow(Row row)
11761176
if (isLive(row))
11771177
{
11781178
hasUnfinishedGroup = true;
1179-
incrementRowCount(bytesLimit != NO_LIMIT ? row.originalDataSize() : 0);
1179+
incrementRowCount(bytesLimit != NO_LIMIT ? row.dataSizeBeforePurge() : 0);
11801180
hasReturnedRowsFromCurrentPartition = true;
11811181
}
11821182

src/java/org/apache/cassandra/db/rows/BTreeRow.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -93,22 +93,22 @@ public class BTreeRow extends AbstractRow
9393
/**
9494
* The original data size of this row before purging it, or -1 if it hasn't been purged.
9595
*/
96-
private final int originalDataSize;
96+
private final int dataSizeBeforePurge;
9797

9898
private BTreeRow(Clustering<?> clustering,
9999
LivenessInfo primaryKeyLivenessInfo,
100100
Deletion deletion,
101101
Object[] btree,
102102
long minLocalDeletionTime,
103-
int originalDataSize)
103+
int dataSizeBeforePurge)
104104
{
105105
assert !deletion.isShadowedBy(primaryKeyLivenessInfo);
106106
this.clustering = clustering;
107107
this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
108108
this.deletion = deletion;
109109
this.btree = btree;
110110
this.minLocalDeletionTime = minLocalDeletionTime;
111-
this.originalDataSize = originalDataSize;
111+
this.dataSizeBeforePurge = dataSizeBeforePurge;
112112
}
113113

114114
private BTreeRow(Clustering<?> clustering,
@@ -146,9 +146,9 @@ public static BTreeRow create(Clustering<?> clustering,
146146
Deletion deletion,
147147
Object[] btree,
148148
long minDeletionTime,
149-
int originalDataSize)
149+
int dataSizeBeforePurge)
150150
{
151-
return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, originalDataSize);
151+
return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime, dataSizeBeforePurge);
152152
}
153153

154154
public static BTreeRow create(Clustering<?> clustering,
@@ -522,7 +522,8 @@ public Row purge(DeletionPurger purger, long nowInSec, boolean enforceStrictLive
522522
if (enforceStrictLiveness && newDeletion.isLive() && newInfo.isEmpty())
523523
return null;
524524

525-
return transformAndFilter(newInfo, newDeletion, (cd) -> cd.purge(purger, nowInSec));
525+
Function<ColumnData, ColumnData> columnDataPurger = (cd) -> cd.purge(purger, nowInSec);
526+
return update(newInfo, newDeletion, BTree.transformAndFilter(btree, columnDataPurger), true);
526527
}
527528

528529
public Row purgeDataOlderThan(long timestamp, boolean enforceStrictLiveness)
@@ -540,10 +541,10 @@ public Row purgeDataOlderThan(long timestamp, boolean enforceStrictLiveness)
540541
@Override
541542
public Row transformAndFilter(LivenessInfo info, Deletion deletion, Function<ColumnData, ColumnData> function)
542543
{
543-
return update(info, deletion, BTree.transformAndFilter(btree, function));
544+
return update(info, deletion, BTree.transformAndFilter(btree, function), false);
544545
}
545546

546-
private Row update(LivenessInfo info, Deletion deletion, Object[] newTree)
547+
private Row update(LivenessInfo info, Deletion deletion, Object[] newTree, boolean preserveDataSizeBeforePurge)
547548
{
548549
if (btree == newTree && info == this.primaryKeyLivenessInfo && deletion == this.deletion)
549550
return this;
@@ -552,7 +553,9 @@ private Row update(LivenessInfo info, Deletion deletion, Object[] newTree)
552553
return null;
553554

554555
long minDeletionTime = minDeletionTime(newTree, info, deletion.time());
555-
return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime, originalDataSize());
556+
557+
int dataSizeBeforePurge = preserveDataSizeBeforePurge ? dataSizeBeforePurge() : -1;
558+
return BTreeRow.create(clustering, info, deletion, newTree, minDeletionTime, dataSizeBeforePurge);
556559
}
557560

558561
@Override
@@ -563,7 +566,7 @@ public Row transformAndFilter(Function<ColumnData, ColumnData> function)
563566

564567
public Row transform(Function<ColumnData, ColumnData> function)
565568
{
566-
return update(primaryKeyLivenessInfo, deletion, BTree.transform(btree, function));
569+
return update(primaryKeyLivenessInfo, deletion, BTree.transform(btree, function), false);
567570
}
568571

569572
@Override
@@ -595,9 +598,9 @@ public long unsharedHeapSize()
595598
}
596599

597600
@Override
598-
public int originalDataSize()
601+
public int dataSizeBeforePurge()
599602
{
600-
return originalDataSize >= 0 ? originalDataSize : dataSize();
603+
return dataSizeBeforePurge >= 0 ? dataSizeBeforePurge : dataSize();
601604
}
602605

603606
public long unsharedHeapSizeExcludingData()

src/java/org/apache/cassandra/db/rows/Row.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>, IMeasurableMemory
331331
*
332332
* @return the original data size of this row in bytes before purging
333333
*/
334-
int originalDataSize();
334+
int dataSizeBeforePurge();
335335

336336
public long unsharedHeapSizeExcludingData();
337337

src/java/org/apache/cassandra/index/sai/utils/RowWithSourceTable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,9 @@ public int dataSize()
286286
}
287287

288288
@Override
289-
public int originalDataSize()
289+
public int dataSizeBeforePurge()
290290
{
291-
return row.originalDataSize();
291+
return row.dataSizeBeforePurge();
292292
}
293293

294294
@Override

0 commit comments

Comments (0)