Skip to content

Commit eb9586d

Browse files
committed
Make secondary index implementations notified about rows in fully expired SSTables in compaction
patch by Stefan Miklosovic; reviewed by Branimir Lambov for CASSANDRA-20829
1 parent f4eb550 commit eb9586d

File tree

5 files changed

+162
-0
lines changed

5 files changed

+162
-0
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
4.0.19
2+
* Make secondary index implementations notified about rows in fully expired SSTables in compaction (CASSANDRA-20829)
23
* Ensure prepared_statement INSERT timestamp precedes eviction DELETE (CASSANDRA-19703)
34
* Gossip doesn't converge due to race condition when updating EndpointStates multiple fields (CASSANDRA-20659)
45

src/java/org/apache/cassandra/db/compaction/CompactionTask.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
*/
1818
package org.apache.cassandra.db.compaction;
1919

20+
import java.util.ArrayList;
2021
import java.util.Collection;
2122
import java.util.HashMap;
2223
import java.util.HashSet;
24+
import java.util.List;
2325
import java.util.Map;
2426
import java.util.Set;
2527
import java.util.UUID;
@@ -37,9 +39,16 @@
3739
import org.apache.cassandra.db.ColumnFamilyStore;
3840
import org.apache.cassandra.db.Directories;
3941
import org.apache.cassandra.db.SystemKeyspace;
42+
import org.apache.cassandra.db.WriteContext;
4043
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
4144
import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
4245
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
46+
import org.apache.cassandra.db.rows.Row;
47+
import org.apache.cassandra.db.rows.Unfiltered;
48+
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
49+
import org.apache.cassandra.index.Index;
50+
import org.apache.cassandra.index.transactions.IndexTransaction;
51+
import org.apache.cassandra.io.sstable.ISSTableScanner;
4352
import org.apache.cassandra.io.sstable.format.SSTableReader;
4453
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
4554
import org.apache.cassandra.service.ActiveRepairService;
@@ -163,6 +172,8 @@ public boolean apply(SSTableReader sstable)
163172
long estimatedKeys = 0;
164173
long inputSizeBytes;
165174

175+
maybeNotifyIndexersAboutRowsInFullyExpiredSSTables(fullyExpiredSSTables);
176+
166177
Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), fullyExpiredSSTables);
167178
Collection<SSTableReader> newSStables;
168179

@@ -436,4 +447,66 @@ public static long getMaxDataAge(Collection<SSTableReader> sstables)
436447
}
437448
return max;
438449
}
450+
451+
private void maybeNotifyIndexersAboutRowsInFullyExpiredSSTables(Set<SSTableReader> fullyExpiredSSTables)
452+
{
453+
if (fullyExpiredSSTables.isEmpty())
454+
return;
455+
456+
List<Index> indexes = new ArrayList<>();
457+
for (Index index : cfs.indexManager.listIndexes())
458+
{
459+
if (index.notifyIndexerAboutRowsInFullyExpiredSSTables())
460+
indexes.add(index);
461+
}
462+
463+
if (indexes.isEmpty())
464+
return;
465+
466+
for (SSTableReader expiredSSTable : fullyExpiredSSTables)
467+
{
468+
try (ISSTableScanner scanner = expiredSSTable.getScanner())
469+
{
470+
while (scanner.hasNext())
471+
{
472+
UnfilteredRowIterator partition = scanner.next();
473+
474+
try (WriteContext ctx = cfs.keyspace.getWriteHandler().createContextForIndexing())
475+
{
476+
List<Index.Indexer> indexers = new ArrayList<>();
477+
for (int i = 0; i < indexes.size(); i++)
478+
{
479+
Index.Indexer indexer = indexes.get(i).indexerFor(partition.partitionKey(),
480+
partition.columns(),
481+
FBUtilities.nowInSeconds(),
482+
ctx,
483+
IndexTransaction.Type.COMPACTION);
484+
485+
if (indexer != null)
486+
indexers.add(indexer);
487+
}
488+
489+
if (!indexers.isEmpty())
490+
{
491+
for (Index.Indexer indexer : indexers)
492+
indexer.begin();
493+
494+
while (partition.hasNext())
495+
{
496+
Unfiltered unfiltered = partition.next();
497+
if (unfiltered instanceof Row)
498+
{
499+
for (Index.Indexer indexer : indexers)
500+
indexer.removeRow((Row) unfiltered);
501+
}
502+
}
503+
504+
for (Index.Indexer indexer : indexers)
505+
indexer.finish();
506+
}
507+
}
508+
}
509+
}
510+
}
511+
}
439512
}

src/java/org/apache/cassandra/index/Index.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,28 @@ default SSTableFlushObserver getFlushObserver(Descriptor descriptor, OperationTy
401401
*/
402402
public void validate(PartitionUpdate update) throws InvalidRequestException;
403403

404+
405+
/**
406+
* When a secondary index is created on a column for a table with e.g. TWCS strategy,
407+
* when this table contains SSTables which are evaluated as fully expired upon compaction,
408+
* they are by default filtered out as they can be dropped in their entirety. However, once dropped like that,
409+
* the index implementation is not notified about this fact via IndexGCTransaction as compaction on
410+
* non-fully expired tables would do. This in turn means that custom index will never know that some data have
411+
* been removed hence data custom index implementation is responsible for will grow beyond any limit.
412+
*
413+
* Override this method and return false in index implementation only in case you do not want to be notified about
414+
* dropped fully-expired data. This will eventually mean that {@link Indexer#removeRow(Row)} will not be called
415+
* for rows contained in fully expired table. Return true if you do want to be notified about that fact.
416+
*
417+
* This method returns true by default.
418+
*
419+
* @return true when fully expired tables should be included in compaction process, false otherwise.
420+
*/
421+
public default boolean notifyIndexerAboutRowsInFullyExpiredSSTables()
422+
{
423+
return true;
424+
}
425+
404426
/*
405427
* Update processing
406428
*/

src/java/org/apache/cassandra/index/sasi/SASIIndex.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,12 @@ public boolean supportsReplicaFilteringProtection(RowFilter rowFilter)
255255
return false;
256256
}
257257

258+
@Override
259+
public boolean notifyIndexerAboutRowsInFullyExpiredSSTables()
260+
{
261+
return false;
262+
}
263+
258264
public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns columns, int nowInSec, WriteContext context, IndexTransaction.Type transactionType)
259265
{
260266
return new Indexer()

test/unit/org/apache/cassandra/index/CustomIndexTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
import com.google.common.collect.ImmutableList;
3030
import com.google.common.collect.Sets;
31+
import com.google.common.util.concurrent.Uninterruptibles;
32+
import org.junit.Assert;
3133
import org.junit.Test;
3234

3335
import com.datastax.driver.core.exceptions.QueryValidationException;
@@ -606,6 +608,64 @@ public void notifyIndexersOfExpiredRowsDuringCompaction() throws Throwable
606608
assertEquals(0, deletedClustering.intValue());
607609
}
608610

611+
612+
// two stub indexes just to track number of row deletions
613+
// when we have fully-expired tables and indexes on different columns
614+
public static class StubIndex1 extends StubIndex
615+
{
616+
public StubIndex1(ColumnFamilyStore baseCfs, IndexMetadata metadata)
617+
{
618+
super(baseCfs, metadata);
619+
}
620+
}
621+
622+
public static class StubIndex2 extends StubIndex
623+
{
624+
625+
public StubIndex2(ColumnFamilyStore baseCfs, IndexMetadata metadata)
626+
{
627+
super(baseCfs, metadata);
628+
}
629+
}
630+
631+
@Test
632+
public void notifyIndexesOfFullyExpiredSSTablesDuringCompaction() throws Throwable
633+
{
634+
createTable("CREATE TABLE %s (id int primary key, col1 int, col2 int) " +
635+
"WITH compaction = {'class': 'TimeWindowCompactionStrategy', " +
636+
" 'compaction_window_size': 1," +
637+
" 'compaction_window_unit': 'MINUTES'," +
638+
" 'expired_sstable_check_frequency_seconds': 10} " +
639+
"AND gc_grace_seconds = 0");
640+
641+
createIndex(String.format("CREATE CUSTOM INDEX row_ttl_test_index_1 ON %%s(col1) USING '%s'", StubIndex1.class.getName()));
642+
createIndex(String.format("CREATE CUSTOM INDEX row_ttl_test_index_2 ON %%s(col2) USING '%s'", StubIndex2.class.getName()));
643+
644+
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
645+
StubIndex index1 = (StubIndex1)cfs.indexManager.getIndexByName("row_ttl_test_index_1");
646+
StubIndex index2 = (StubIndex2)cfs.indexManager.getIndexByName("row_ttl_test_index_2");
647+
648+
execute("INSERT INTO %s (id, col1) VALUES (?, ?) USING TTL 20", 0, 0);
649+
execute("INSERT INTO %s (id, col1) VALUES (?, ?) USING TTL 20", 1, 1);
650+
execute("INSERT INTO %s (id, col1) VALUES (?, ?) USING TTL 20", 2, 2);
651+
652+
execute("INSERT INTO %s (id, col2) VALUES (?, ?) USING TTL 20", 0, 0);
653+
execute("INSERT INTO %s (id, col2) VALUES (?, ?) USING TTL 20", 1, 1);
654+
execute("INSERT INTO %s (id, col2) VALUES (?, ?) USING TTL 20", 2, 2);
655+
656+
flush();
657+
658+
Uninterruptibles.sleepUninterruptibly(60, TimeUnit.SECONDS);
659+
660+
compact();
661+
662+
Assert.assertFalse(index1.rowsDeleted.isEmpty());
663+
Assert.assertEquals(3, index1.rowsDeleted.size());
664+
665+
Assert.assertFalse(index2.rowsDeleted.isEmpty());
666+
Assert.assertEquals(3, index2.rowsDeleted.size());
667+
}
668+
609669
@Test
610670
public void validateOptions() throws Throwable
611671
{

0 commit comments

Comments
 (0)