Skip to content

Commit 7b5fce3

Browse files
jasonstackdriftx
authored andcommitted
CNDB-14353: fetch non-compacting sstables instead of live sstables for UCS (#1861)
This is to fix a race in UCS: 1. sstable-1 is compacting 2. 2nd compaction thread fetches live sstable which includes sstable-1 3. sstable-1 is now compacted, no longer as compacting in Tracker 4. 2nd compaction thread continues: UCS#getCompactionArenas fetches list of compacting sstables which doesn't include sstable-1 5. sstable-1 is now included for next compaction but failed to create txn because it's already compacted use non-compacting sstables instead of live sstables, so there won't be race
1 parent e1c587f commit 7b5fce3

File tree

2 files changed

+51
-5
lines changed

2 files changed

+51
-5
lines changed

src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
5757
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
5858
import org.apache.cassandra.db.lifecycle.PartialLifecycleTransaction;
59+
import org.apache.cassandra.db.lifecycle.SSTableSet;
5960
import org.apache.cassandra.dht.Range;
6061
import org.apache.cassandra.dht.Token;
6162
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -217,7 +218,7 @@ public synchronized CompactionTasks getUserDefinedTasks(Collection<? extends Com
217218
/// same effect as compacting all of the sstables in the arena together in one operation.
218219
public synchronized List<CompactionAggregate.UnifiedAggregate> getMaximalAggregates()
219220
{
220-
return getMaximalAggregates(realm.getLiveSSTables());
221+
return getMaximalAggregates(Sets.newHashSet(realm.getSSTables(SSTableSet.NONCOMPACTING)));
221222
}
222223

223224
public synchronized List<CompactionAggregate.UnifiedAggregate> getMaximalAggregates(Collection<? extends CompactionSSTable> sstables)
@@ -1208,7 +1209,7 @@ public Level getLevel(int index, double min, double max)
12081209
@VisibleForTesting
12091210
Map<Arena, List<Level>> getLevels()
12101211
{
1211-
return getLevels(realm.getLiveSSTables(), UnifiedCompactionStrategy::isSuitableForCompaction);
1212+
return getLevels(Sets.newHashSet(realm.getSSTables(SSTableSet.NONCOMPACTING)), UnifiedCompactionStrategy::isSuitableForCompaction);
12121213
}
12131214

12141215
private static boolean isSuitableForCompaction(CompactionSSTable sstable, boolean isCompacting)
@@ -1223,8 +1224,7 @@ Iterable<? extends CompactionSSTable> getSuitableSSTables()
12231224

12241225
Iterable<? extends CompactionSSTable> getFilteredSSTables(BiPredicate<CompactionSSTable, Boolean> predicate)
12251226
{
1226-
Set<? extends CompactionSSTable> compacting = realm.getCompactingSSTables();
1227-
return Iterables.filter(realm.getLiveSSTables(), s -> predicate.test(s, compacting.contains(s)));
1227+
return Iterables.filter(realm.getSSTables(SSTableSet.NONCOMPACTING), s -> predicate.test(s, false));
12281228
}
12291229

12301230
/// Groups the sstables passed in into arenas and buckets. This is used by the strategy to determine
@@ -1313,7 +1313,7 @@ public Map<Arena, List<Level>> getLevels(Collection<? extends CompactionSSTable>
13131313
@Override
13141314
public Map<String, String> getMaxOverlapsMap()
13151315
{
1316-
final Set<? extends CompactionSSTable> liveSSTables = realm.getLiveSSTables();
1316+
final Set<? extends CompactionSSTable> liveSSTables = Sets.newHashSet(realm.getSSTables(SSTableSet.NONCOMPACTING));
13171317
Map<UnifiedCompactionStrategy.Arena, List<UnifiedCompactionStrategy.Level>> arenas =
13181318
getLevels(liveSSTables, (i1, i2) -> true); // take all sstables
13191319

test/unit/org/apache/cassandra/db/compaction/UnifiedCompactionStrategyTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,52 @@ private void testGetMultipleBucketsOneArenaNonOverlappingAggregates(Map<Integer,
402402
System.out.println(strategy.getMaxOverlapsMap());
403403
}
404404

405+
@Test
406+
public void testGetLevelsWithCompactingSSTables()
407+
{
408+
final int m = 2; // minimal sorted run size in MB m
409+
long minimalSizeBytes = m << 20;
410+
411+
Controller controller = Mockito.mock(Controller.class);
412+
when(controller.getMinSstableSizeBytes()).thenReturn(minimalSizeBytes);
413+
when(controller.getNumShards(anyDouble())).thenReturn(1);
414+
when(controller.getBaseSstableSize(anyInt())).thenReturn((double) minimalSizeBytes);
415+
when(controller.maxConcurrentCompactions()).thenReturn(1000); // let it generate as many candidates as it can
416+
when(controller.maxThroughput()).thenReturn(Double.MAX_VALUE);
417+
when(controller.maxSSTablesToCompact()).thenReturn(1000);
418+
when(controller.prioritize(anyList())).thenAnswer(answ -> answ.getArgument(0));
419+
when(controller.getReservedThreads()).thenReturn(Integer.MAX_VALUE);
420+
when(controller.getReservationsType()).thenReturn(Reservations.Type.PER_LEVEL);
421+
when(controller.parallelizeOutputShards()).thenReturn(true);
422+
when(controller.getScalingParameter(anyInt())).thenReturn(2);
423+
when(controller.getFanout(anyInt())).thenCallRealMethod();
424+
when(controller.getThreshold(anyInt())).thenCallRealMethod();
425+
when(controller.getMaxLevelDensity(anyInt(), anyDouble())).thenCallRealMethod();
426+
when(controller.getSurvivalFactor(anyInt())).thenReturn(1.0);
427+
when(controller.random()).thenCallRealMethod();
428+
429+
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(strategyFactory, controller);
430+
431+
int sstableCount = 4;
432+
List<SSTableReader> sstables = mockSSTables(sstableCount, 100, 0, System.currentTimeMillis(), 0, true, null);
433+
dataTracker.addInitialSSTables(sstables);
434+
435+
// all sstables are non-compacting
436+
assertEquals(sstableCount, strategy.getLevels().values().stream().flatMap(Collection::stream).mapToInt(l -> l.sstables.size()).sum());
437+
438+
// mark one sstable as compacting
439+
try (LifecycleTransaction txn = dataTracker.tryModify(List.of(sstables.get(0)), OperationType.COMPACTION))
440+
{
441+
assertEquals(sstableCount - 1, strategy.getLevels().values().stream().flatMap(Collection::stream).mapToInt(l -> l.sstables.size()).sum());
442+
443+
txn.obsoleteOriginals();
444+
txn.abort(); // unmark compacting
445+
}
446+
447+
// all sstables are non-compacting
448+
assertEquals(sstableCount, strategy.getLevels().values().stream().flatMap(Collection::stream).mapToInt(l -> l.sstables.size()).sum());
449+
}
450+
405451
private BufferDecoratedKey key(long token)
406452
{
407453
return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(token), ByteBuffer.allocate(0));

0 commit comments

Comments
 (0)