Skip to content

Commit 4824ecc

Browse files
committed
Merge branch 'cassandra-4.0' into cassandra-4.1
2 parents 62150b0 + eb9586d commit 4824ecc

File tree

5 files changed

+162
-0
lines changed

5 files changed

+162
-0
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* IntrusiveStack.accumulate is not accumulating correctly (CASSANDRA-20670)
66
* Add nodetool get/setguardrailsconfig commands (CASSANDRA-19552)
77
Merged from 4.0:
8+
* Make secondary index implementations notified about rows in fully expired SSTables in compaction (CASSANDRA-20829)
89
* Ensure prepared_statement INSERT timestamp precedes eviction DELETE (CASSANDRA-19703)
910

1011

src/java/org/apache/cassandra/db/compaction/CompactionTask.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
package org.apache.cassandra.db.compaction;
1919

2020
import java.time.Instant;
21+
import java.util.ArrayList;
2122
import java.util.Collection;
2223
import java.util.HashMap;
2324
import java.util.HashSet;
25+
import java.util.List;
2426
import java.util.Map;
2527
import java.util.Set;
2628
import java.util.concurrent.TimeUnit;
@@ -37,9 +39,16 @@
3739
import org.apache.cassandra.db.ColumnFamilyStore;
3840
import org.apache.cassandra.db.Directories;
3941
import org.apache.cassandra.db.SystemKeyspace;
42+
import org.apache.cassandra.db.WriteContext;
4043
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
4144
import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
4245
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
46+
import org.apache.cassandra.db.rows.Row;
47+
import org.apache.cassandra.db.rows.Unfiltered;
48+
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
49+
import org.apache.cassandra.index.Index;
50+
import org.apache.cassandra.index.transactions.IndexTransaction;
51+
import org.apache.cassandra.io.sstable.ISSTableScanner;
4352
import org.apache.cassandra.io.sstable.format.SSTableReader;
4453
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
4554
import org.apache.cassandra.service.ActiveRepairService;
@@ -171,6 +180,8 @@ public boolean apply(SSTableReader sstable)
171180
long inputSizeBytes;
172181
long timeSpentWritingKeys;
173182

183+
maybeNotifyIndexersAboutRowsInFullyExpiredSSTables(fullyExpiredSSTables);
184+
174185
Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), fullyExpiredSSTables);
175186
Collection<SSTableReader> newSStables;
176187

@@ -440,4 +451,66 @@ public static long getMaxDataAge(Collection<SSTableReader> sstables)
440451
}
441452
return max;
442453
}
454+
455+
private void maybeNotifyIndexersAboutRowsInFullyExpiredSSTables(Set<SSTableReader> fullyExpiredSSTables)
456+
{
457+
if (fullyExpiredSSTables.isEmpty())
458+
return;
459+
460+
List<Index> indexes = new ArrayList<>();
461+
for (Index index : cfs.indexManager.listIndexes())
462+
{
463+
if (index.notifyIndexerAboutRowsInFullyExpiredSSTables())
464+
indexes.add(index);
465+
}
466+
467+
if (indexes.isEmpty())
468+
return;
469+
470+
for (SSTableReader expiredSSTable : fullyExpiredSSTables)
471+
{
472+
try (ISSTableScanner scanner = expiredSSTable.getScanner())
473+
{
474+
while (scanner.hasNext())
475+
{
476+
UnfilteredRowIterator partition = scanner.next();
477+
478+
try (WriteContext ctx = cfs.keyspace.getWriteHandler().createContextForIndexing())
479+
{
480+
List<Index.Indexer> indexers = new ArrayList<>();
481+
for (int i = 0; i < indexes.size(); i++)
482+
{
483+
Index.Indexer indexer = indexes.get(i).indexerFor(partition.partitionKey(),
484+
partition.columns(),
485+
FBUtilities.nowInSeconds(),
486+
ctx,
487+
IndexTransaction.Type.COMPACTION);
488+
489+
if (indexer != null)
490+
indexers.add(indexer);
491+
}
492+
493+
if (!indexers.isEmpty())
494+
{
495+
for (Index.Indexer indexer : indexers)
496+
indexer.begin();
497+
498+
while (partition.hasNext())
499+
{
500+
Unfiltered unfiltered = partition.next();
501+
if (unfiltered instanceof Row)
502+
{
503+
for (Index.Indexer indexer : indexers)
504+
indexer.removeRow((Row) unfiltered);
505+
}
506+
}
507+
508+
for (Index.Indexer indexer : indexers)
509+
indexer.finish();
510+
}
511+
}
512+
}
513+
}
514+
}
515+
}
443516
}

src/java/org/apache/cassandra/index/Index.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,28 @@ default SSTableFlushObserver getFlushObserver(Descriptor descriptor, OperationTy
422422
*/
423423
public void validate(PartitionUpdate update) throws InvalidRequestException;
424424

425+
426+
/**
427+
* When a secondary index is created on a column for a table with e.g. TWCS strategy,
428+
* when this table contains SSTables which are evaluated as fully expired upon compaction,
429+
* they are by default filtered out as they can be dropped in their entirety. However, once dropped like that,
430+
* the index implementation is not notified about this fact via IndexGCTransaction as compaction on
431+
* non-fully expired tables would do. This in turn means that custom index will never know that some data have
432+
* been removed hence data custom index implementation is responsible for will grow beyond any limit.
433+
*
434+
* Override this method and return false in index implementation only in case you do not want to be notified about
435+
* dropped fully-expired data. This will eventually mean that {@link Indexer#removeRow(Row)} will not be called
436+
* for rows contained in fully expired table. Return true if you do want to be notified about that fact.
437+
*
438+
* This method returns true by default.
439+
*
440+
* @return true when fully expired tables should be included in compaction process, false otherwise.
441+
*/
442+
public default boolean notifyIndexerAboutRowsInFullyExpiredSSTables()
443+
{
444+
return true;
445+
}
446+
425447
/*
426448
* Update processing
427449
*/

src/java/org/apache/cassandra/index/sasi/SASIIndex.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,12 @@ public boolean supportsReplicaFilteringProtection(RowFilter rowFilter)
252252
return false;
253253
}
254254

255+
// SASI opts out of per-row removal notifications for fully expired SSTables
// (see Index#notifyIndexerAboutRowsInFullyExpiredSSTables), so compaction will
// not scan such SSTables on this index's behalf.
// NOTE(review): presumably SASI's on-disk index components are removed together
// with the sstable itself, making per-row notification unnecessary — confirm.
@Override
public boolean notifyIndexerAboutRowsInFullyExpiredSSTables()
{
    return false;
}
260+
255261
public Indexer indexerFor(DecoratedKey key, RegularAndStaticColumns columns, int nowInSec, WriteContext context, IndexTransaction.Type transactionType)
256262
{
257263
return new Indexer()

test/unit/org/apache/cassandra/index/CustomIndexTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
import com.google.common.collect.ImmutableList;
3030
import com.google.common.collect.Sets;
31+
import com.google.common.util.concurrent.Uninterruptibles;
32+
import org.junit.Assert;
3133
import org.junit.Test;
3234

3335
import com.datastax.driver.core.exceptions.QueryValidationException;
@@ -606,6 +608,64 @@ public void notifyIndexersOfExpiredRowsDuringCompaction() throws Throwable
606608
assertEquals(0, deletedClustering.intValue());
607609
}
608610

611+
612+
// Two stub indexes used only to track the number of row deletions when compaction
// drops fully expired sstables and indexes exist on different columns. Distinct
// subclasses are used so each CREATE CUSTOM INDEX statement instantiates a separate
// index class, giving each index its own deletion counter.
public static class StubIndex1 extends StubIndex
{
    public StubIndex1(ColumnFamilyStore baseCfs, IndexMetadata metadata)
    {
        super(baseCfs, metadata);
    }
}
621+
622+
// Second stub index class; tracks row deletions for the index created on col2
// independently of the stub index created on col1.
public static class StubIndex2 extends StubIndex
{
    public StubIndex2(ColumnFamilyStore baseCfs, IndexMetadata metadata)
    {
        super(baseCfs, metadata);
    }
}
630+
631+
@Test
632+
public void notifyIndexesOfFullyExpiredSSTablesDuringCompaction() throws Throwable
633+
{
634+
createTable("CREATE TABLE %s (id int primary key, col1 int, col2 int) " +
635+
"WITH compaction = {'class': 'TimeWindowCompactionStrategy', " +
636+
" 'compaction_window_size': 1," +
637+
" 'compaction_window_unit': 'MINUTES'," +
638+
" 'expired_sstable_check_frequency_seconds': 10} " +
639+
"AND gc_grace_seconds = 0");
640+
641+
createIndex(String.format("CREATE CUSTOM INDEX row_ttl_test_index_1 ON %%s(col1) USING '%s'", StubIndex1.class.getName()));
642+
createIndex(String.format("CREATE CUSTOM INDEX row_ttl_test_index_2 ON %%s(col2) USING '%s'", StubIndex2.class.getName()));
643+
644+
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
645+
StubIndex index1 = (StubIndex1)cfs.indexManager.getIndexByName("row_ttl_test_index_1");
646+
StubIndex index2 = (StubIndex2)cfs.indexManager.getIndexByName("row_ttl_test_index_2");
647+
648+
execute("INSERT INTO %s (id, col1) VALUES (?, ?) USING TTL 20", 0, 0);
649+
execute("INSERT INTO %s (id, col1) VALUES (?, ?) USING TTL 20", 1, 1);
650+
execute("INSERT INTO %s (id, col1) VALUES (?, ?) USING TTL 20", 2, 2);
651+
652+
execute("INSERT INTO %s (id, col2) VALUES (?, ?) USING TTL 20", 0, 0);
653+
execute("INSERT INTO %s (id, col2) VALUES (?, ?) USING TTL 20", 1, 1);
654+
execute("INSERT INTO %s (id, col2) VALUES (?, ?) USING TTL 20", 2, 2);
655+
656+
flush();
657+
658+
Uninterruptibles.sleepUninterruptibly(60, TimeUnit.SECONDS);
659+
660+
compact();
661+
662+
Assert.assertFalse(index1.rowsDeleted.isEmpty());
663+
Assert.assertEquals(3, index1.rowsDeleted.size());
664+
665+
Assert.assertFalse(index2.rowsDeleted.isEmpty());
666+
Assert.assertEquals(3, index2.rowsDeleted.size());
667+
}
668+
609669
@Test
610670
public void validateOptions() throws Throwable
611671
{

0 commit comments

Comments
 (0)