Skip to content

Commit 74e0794

Browse files
committed
Merge branch 'cassandra-4.1' into cassandra-5.0
2 parents 82880b8 + 4824ecc commit 74e0794

File tree

6 files changed

+169
-0
lines changed

6 files changed

+169
-0
lines changed

CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
5.0.6
22
* Sort SSTable TOC entries for determinism (CASSANDRA-20494)
3+
Merged from 4.0:
4+
* Make secondary index implementations notified about rows in fully expired SSTables in compaction (CASSANDRA-20829)
35

46

57
5.0.5

src/java/org/apache/cassandra/db/compaction/CompactionTask.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.cassandra.db.compaction;
1919

2020
import java.time.Instant;
21+
import java.util.ArrayList;
2122
import java.util.Collection;
2223
import java.util.HashMap;
2324
import java.util.HashSet;
@@ -39,9 +40,16 @@
3940
import org.apache.cassandra.db.ColumnFamilyStore;
4041
import org.apache.cassandra.db.Directories;
4142
import org.apache.cassandra.db.SystemKeyspace;
43+
import org.apache.cassandra.db.WriteContext;
4244
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
4345
import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
4446
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
47+
import org.apache.cassandra.db.rows.Row;
48+
import org.apache.cassandra.db.rows.Unfiltered;
49+
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
50+
import org.apache.cassandra.index.Index;
51+
import org.apache.cassandra.index.transactions.IndexTransaction;
52+
import org.apache.cassandra.io.sstable.ISSTableScanner;
4553
import org.apache.cassandra.io.sstable.format.SSTableReader;
4654
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
4755
import org.apache.cassandra.io.util.File;
@@ -174,6 +182,8 @@ public boolean apply(SSTableReader sstable)
174182
long inputSizeBytes;
175183
long timeSpentWritingKeys;
176184

185+
maybeNotifyIndexersAboutRowsInFullyExpiredSSTables(fullyExpiredSSTables);
186+
177187
Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), fullyExpiredSSTables);
178188
Collection<SSTableReader> newSStables;
179189

@@ -465,4 +475,67 @@ public static long getMaxDataAge(Collection<SSTableReader> sstables)
465475
}
466476
return max;
467477
}
478+
479+
private void maybeNotifyIndexersAboutRowsInFullyExpiredSSTables(Set<SSTableReader> fullyExpiredSSTables)
480+
{
481+
if (fullyExpiredSSTables.isEmpty())
482+
return;
483+
484+
List<Index> indexes = new ArrayList<>();
485+
for (Index index : cfs.indexManager.listIndexes())
486+
{
487+
if (index.notifyIndexerAboutRowsInFullyExpiredSSTables())
488+
indexes.add(index);
489+
}
490+
491+
if (indexes.isEmpty())
492+
return;
493+
494+
for (SSTableReader expiredSSTable : fullyExpiredSSTables)
495+
{
496+
try (ISSTableScanner scanner = expiredSSTable.getScanner())
497+
{
498+
while (scanner.hasNext())
499+
{
500+
UnfilteredRowIterator partition = scanner.next();
501+
502+
try (WriteContext ctx = cfs.keyspace.getWriteHandler().createContextForIndexing())
503+
{
504+
List<Index.Indexer> indexers = new ArrayList<>();
505+
for (int i = 0; i < indexes.size(); i++)
506+
{
507+
Index.Indexer indexer = indexes.get(i).indexerFor(partition.partitionKey(),
508+
partition.columns(),
509+
FBUtilities.nowInSeconds(),
510+
ctx,
511+
IndexTransaction.Type.COMPACTION,
512+
null);
513+
514+
if (indexer != null)
515+
indexers.add(indexer);
516+
}
517+
518+
if (!indexers.isEmpty())
519+
{
520+
for (Index.Indexer indexer : indexers)
521+
indexer.begin();
522+
523+
while (partition.hasNext())
524+
{
525+
Unfiltered unfiltered = partition.next();
526+
if (unfiltered instanceof Row)
527+
{
528+
for (Index.Indexer indexer : indexers)
529+
indexer.removeRow((Row) unfiltered);
530+
}
531+
}
532+
533+
for (Index.Indexer indexer : indexers)
534+
indexer.finish();
535+
}
536+
}
537+
}
538+
}
539+
}
540+
}
468541
}

src/java/org/apache/cassandra/index/Index.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,28 @@ default Set<Component> getComponents()
515515
return Collections.emptySet();
516516
}
517517

518+
519+
/**
520+
* When a secondary index is created on a column for a table with e.g. TWCS strategy,
521+
* when this table contains SSTables which are evaluated as fully expired upon compaction,
522+
* they are by default filtered out as they can be dropped in their entirety. However, once dropped like that,
523+
* the index implementation is not notified about this fact via IndexGCTransaction as compaction on
524+
* non-fully expired tables would do. This in turn means that custom index will never know that some data have
525+
* been removed hence data custom index implementation is responsible for will grow beyond any limit.
526+
*
527+
* Override this method and return false in index implementation only in case you do not want to be notified about
528+
* dropped fully-expired data. This will eventually mean that {@link Indexer#removeRow(Row)} will not be called
529+
* for rows contained in fully expired table. Return true if you do want to be notified about that fact.
530+
*
531+
* This method returns true by default.
532+
*
533+
* @return true when fully expired tables should be included in compaction process, false otherwise.
534+
*/
535+
public default boolean notifyIndexerAboutRowsInFullyExpiredSSTables()
536+
{
537+
return true;
538+
}
539+
518540
/*
519541
* Update processing
520542
*/

src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,12 @@ public Set<Component> getComponents()
553553
.collect(Collectors.toSet());
554554
}
555555

556+
@Override
557+
public boolean notifyIndexerAboutRowsInFullyExpiredSSTables()
558+
{
559+
return false;
560+
}
561+
556562
@Override
557563
public Indexer indexerFor(DecoratedKey key,
558564
RegularAndStaticColumns columns,

src/java/org/apache/cassandra/index/sasi/SASIIndex.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,12 @@ public long getEstimatedResultRows()
271271
public void validate(PartitionUpdate update, ClientState state) throws InvalidRequestException
272272
{}
273273

274+
@Override
275+
public boolean notifyIndexerAboutRowsInFullyExpiredSSTables()
276+
{
277+
return false;
278+
}
279+
274280
@Override
275281
public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns columns, long nowInSec, WriteContext context, IndexTransaction.Type transactionType, Memtable memtable)
276282
{

test/unit/org/apache/cassandra/index/CustomIndexTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import com.google.common.collect.Maps;
3434
import com.google.common.collect.Sets;
3535
import org.junit.Assume;
36+
import com.google.common.util.concurrent.Uninterruptibles;
37+
import org.junit.Assert;
3638
import org.junit.Test;
3739

3840
import com.datastax.driver.core.exceptions.QueryValidationException;
@@ -649,6 +651,64 @@ public void notifyIndexersOfExpiredRowsDuringCompaction() throws Throwable
649651
assertEquals(0, deletedClustering.intValue());
650652
}
651653

654+
655+
// two stub indexes just to track number of row deletions
656+
// when we have fully-expired tables and indexes on different columns
657+
public static class StubIndex1 extends StubIndex
658+
{
659+
public StubIndex1(ColumnFamilyStore baseCfs, IndexMetadata metadata)
660+
{
661+
super(baseCfs, metadata);
662+
}
663+
}
664+
665+
public static class StubIndex2 extends StubIndex
666+
{
667+
668+
public StubIndex2(ColumnFamilyStore baseCfs, IndexMetadata metadata)
669+
{
670+
super(baseCfs, metadata);
671+
}
672+
}
673+
674+
@Test
675+
public void notifyIndexesOfFullyExpiredSSTablesDuringCompaction()
676+
{
677+
createTable("CREATE TABLE %s (id int primary key, col1 int, col2 int) " +
678+
"WITH compaction = {'class': 'TimeWindowCompactionStrategy', " +
679+
" 'compaction_window_size': 1," +
680+
" 'compaction_window_unit': 'MINUTES'," +
681+
" 'expired_sstable_check_frequency_seconds': 10} " +
682+
"AND gc_grace_seconds = 0");
683+
684+
createIndex(String.format("CREATE CUSTOM INDEX row_ttl_test_index_1 ON %%s(col1) USING '%s'", StubIndex1.class.getName()));
685+
createIndex(String.format("CREATE CUSTOM INDEX row_ttl_test_index_2 ON %%s(col2) USING '%s'", StubIndex2.class.getName()));
686+
687+
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
688+
StubIndex index1 = (StubIndex1)cfs.indexManager.getIndexByName("row_ttl_test_index_1");
689+
StubIndex index2 = (StubIndex2)cfs.indexManager.getIndexByName("row_ttl_test_index_2");
690+
691+
execute("INSERT INTO %s (id, col1) VALUES (?, ?) USING TTL 20", 0, 0);
692+
execute("INSERT INTO %s (id, col1) VALUES (?, ?) USING TTL 20", 1, 1);
693+
execute("INSERT INTO %s (id, col1) VALUES (?, ?) USING TTL 20", 2, 2);
694+
695+
execute("INSERT INTO %s (id, col2) VALUES (?, ?) USING TTL 20", 0, 0);
696+
execute("INSERT INTO %s (id, col2) VALUES (?, ?) USING TTL 20", 1, 1);
697+
execute("INSERT INTO %s (id, col2) VALUES (?, ?) USING TTL 20", 2, 2);
698+
699+
flush();
700+
701+
Uninterruptibles.sleepUninterruptibly(60, TimeUnit.SECONDS);
702+
703+
compact();
704+
705+
Assert.assertFalse(index1.rowsDeleted.isEmpty());
706+
Assert.assertEquals(3, index1.rowsDeleted.size());
707+
708+
Assert.assertFalse(index2.rowsDeleted.isEmpty());
709+
Assert.assertEquals(3, index2.rowsDeleted.size());
710+
}
711+
652712
@Test
653713
public void validateOptions()
654714
{

0 commit comments

Comments
 (0)