Skip to content

Commit 708b56d

Browse files
authored
cndb-14201: allow UCS#getNextBackgroundTasks to config custom compaction observer for composite compaction (#1767)
CNDB: riptano/cndb#14203
1 parent 172adfb commit 708b56d

File tree

3 files changed

+75
-15
lines changed

3 files changed

+75
-15
lines changed

src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.function.BiPredicate;
3232
import java.util.regex.Matcher;
3333
import java.util.regex.Pattern;
34-
import java.util.stream.Collectors;
3534

3635
import com.google.common.annotations.VisibleForTesting;
3736
import com.google.common.base.Preconditions;
@@ -343,7 +342,7 @@ public synchronized Collection<AbstractCompactionTask> getNextBackgroundTasks(in
343342
if (expirationTasks != null)
344343
return expirationTasks;
345344

346-
return getNextBackgroundTasks(getNextCompactionAggregates(), gcBefore);
345+
return getNextBackgroundTasks(getNextCompactionAggregates(), gcBefore, this);
347346
}
348347

349348
/// Check for fully expired sstables and return a collection of expiration tasks if found.
@@ -401,21 +400,23 @@ public Set<CompactionSSTable> getFullyExpiredSSTables(int gcBefore)
401400
controller.getIgnoreOverlapsInExpirationCheck());
402401
}
403402

404-
/// Used by CNDB where compaction aggregates come from etcd rather than the strategy
403+
/// Used by CNDB where compaction aggregates come from etcd rather than the strategy.
405404
/// @return collection of `AbstractCompactionTask`, which could be either a `CompactionTask` or a `UnifiedCompactionTask`
406-
public synchronized Collection<AbstractCompactionTask> getNextBackgroundTasks(Collection<CompactionAggregate> aggregates, int gcBefore)
405+
public synchronized Collection<AbstractCompactionTask> getNextBackgroundTasks(Collection<CompactionAggregate> aggregates, int gcBefore,
406+
CompactionObserver compositeCompactionObserver)
407407
{
408408
controller.onStrategyBackgroundTaskRequest();
409-
return createCompactionTasks(aggregates, gcBefore);
409+
return createCompactionTasks(aggregates, gcBefore, compositeCompactionObserver);
410410
}
411411

412-
private Collection<AbstractCompactionTask> createCompactionTasks(Collection<CompactionAggregate> aggregates, int gcBefore)
412+
private Collection<AbstractCompactionTask> createCompactionTasks(Collection<CompactionAggregate> aggregates, int gcBefore,
413+
CompactionObserver compositeCompactionObserver)
413414
{
414415
Collection<AbstractCompactionTask> tasks = new ArrayList<>(aggregates.size());
415416
try
416417
{
417418
for (CompactionAggregate aggregate : aggregates)
418-
createAndAddTasks(gcBefore, (CompactionAggregate.UnifiedAggregate) aggregate, tasks);
419+
createAndAddTasks(gcBefore, (CompactionAggregate.UnifiedAggregate) aggregate, tasks, compositeCompactionObserver);
419420

420421
return tasks;
421422
}
@@ -426,7 +427,8 @@ private Collection<AbstractCompactionTask> createCompactionTasks(Collection<Comp
426427
}
427428

428429
/// Create compaction tasks for the given aggregate and add them to the given tasks list.
429-
public void createAndAddTasks(int gcBefore, CompactionAggregate.UnifiedAggregate aggregate, Collection<? super UnifiedCompactionTask> tasks)
430+
public void createAndAddTasks(int gcBefore, CompactionAggregate.UnifiedAggregate aggregate,
431+
Collection<? super UnifiedCompactionTask> tasks, CompactionObserver compositeCompactionObserver)
430432
{
431433
CompactionPick selected = aggregate.getSelected();
432434
int parallelism = aggregate.getPermittedParallelism();
@@ -440,7 +442,7 @@ public void createAndAddTasks(int gcBefore, CompactionAggregate.UnifiedAggregate
440442
{
441443
// This will ignore the range of the operation, which is fine.
442444
backgroundCompactions.setSubmitted(this, transaction.opId(), aggregate);
443-
createAndAddTasks(gcBefore, transaction, aggregate.operationRange(), aggregate.keepOriginals(), getShardingStats(aggregate), parallelism, tasks);
445+
createAndAddTasks(gcBefore, transaction, aggregate.operationRange(), aggregate.keepOriginals(), getShardingStats(aggregate), parallelism, tasks, compositeCompactionObserver);
444446
}
445447
else
446448
{
@@ -612,7 +614,7 @@ void createAndAddTasks(int gcBefore,
612614
int parallelism,
613615
Collection<? super CompactionTask> tasks)
614616
{
615-
createAndAddTasks(gcBefore, transaction, null, false, shardingStats, parallelism, tasks);
617+
createAndAddTasks(gcBefore, transaction, null, false, shardingStats, parallelism, tasks, this);
616618
}
617619

618620
@VisibleForTesting
@@ -622,10 +624,11 @@ void createAndAddTasks(int gcBefore,
622624
boolean keepOriginals,
623625
ShardingStats shardingStats,
624626
int parallelism,
625-
Collection<? super UnifiedCompactionTask> tasks)
627+
Collection<? super UnifiedCompactionTask> tasks,
628+
CompactionObserver compositeCompactionObserver)
626629
{
627630
if (controller.parallelizeOutputShards() && parallelism > 1)
628-
tasks.addAll(createParallelCompactionTasks(transaction, operationRange, keepOriginals, shardingStats, gcBefore, parallelism));
631+
tasks.addAll(createParallelCompactionTasks(transaction, operationRange, keepOriginals, shardingStats, gcBefore, parallelism, compositeCompactionObserver));
629632
else
630633
tasks.add(createCompactionTask(transaction, operationRange, keepOriginals, shardingStats, gcBefore));
631634
}
@@ -697,7 +700,8 @@ private Collection<UnifiedCompactionTask> createParallelCompactionTasks(Lifecycl
697700
boolean keepOriginals,
698701
ShardingStats shardingStats,
699702
int gcBefore,
700-
int parallelism)
703+
int parallelism,
704+
CompactionObserver compositeCompactionObserver)
701705
{
702706
final int coveredShardCount = shardingStats.coveredShardCount;
703707
assert parallelism > 1;
@@ -706,7 +710,7 @@ private Collection<UnifiedCompactionTask> createParallelCompactionTasks(Lifecycl
706710
ShardManager shardManager = getShardManager();
707711
CompositeLifecycleTransaction compositeTransaction = new CompositeLifecycleTransaction(transaction);
708712
SharedCompactionProgress sharedProgress = new SharedCompactionProgress(transaction.opId(), transaction.opType(), TableOperation.Unit.BYTES);
709-
SharedCompactionObserver sharedObserver = new SharedCompactionObserver(this);
713+
SharedCompactionObserver sharedObserver = new SharedCompactionObserver(compositeCompactionObserver);
710714
SharedTableOperation sharedOperation = new SharedTableOperation(sharedProgress);
711715
List<UnifiedCompactionTask> tasks = shardManager.splitSSTablesInShardsLimited(
712716
sstables,

test/unit/org/apache/cassandra/db/compaction/UnifiedCompactionStrategyTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@
2929
import java.util.Set;
3030
import java.util.TreeMap;
3131
import java.util.UUID;
32+
import java.util.concurrent.atomic.AtomicInteger;
3233
import java.util.concurrent.atomic.AtomicLong;
3334
import java.util.function.Supplier;
3435
import java.util.stream.Collectors;
3536
import java.util.stream.Stream;
3637

38+
import javax.annotation.Nullable;
39+
3740
import com.google.common.collect.ImmutableList;
3841
import com.google.common.collect.ImmutableSet;
3942
import com.google.common.collect.Iterables;
@@ -1753,6 +1756,59 @@ public void testDontCreateParallelTasks()
17531756
assertEquals(1, tasks.size());
17541757
assertEquals(allSSTables, tasks.get(0).inputSSTables());
17551758
}
1759+
1760+
@Test
1761+
public void testCustomCompositeCompactionObserver()
1762+
{
1763+
int numShards = 5;
1764+
Set<SSTableReader> allSSTables = new HashSet<>();
1765+
allSSTables.addAll(mockNonOverlappingSSTables(10, 0, 100 << 20));
1766+
allSSTables.addAll(mockNonOverlappingSSTables(15, 1, 200 << 20));
1767+
allSSTables.addAll(mockNonOverlappingSSTables(25, 2, 400 << 20));
1768+
dataTracker.addInitialSSTables(allSSTables);
1769+
Controller controller = Mockito.mock(Controller.class);
1770+
when(controller.getNumShards(anyDouble())).thenReturn(numShards);
1771+
when(controller.parallelizeOutputShards()).thenReturn(true);
1772+
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(strategyFactory, controller);
1773+
strategy.startup();
1774+
LifecycleTransaction txn = dataTracker.tryModify(allSSTables, OperationType.COMPACTION);
1775+
var tasks = new ArrayList<CompactionTask>();
1776+
1777+
AtomicInteger compositeInProgress = new AtomicInteger(0);
1778+
AtomicInteger compositedCompleted = new AtomicInteger(0);
1779+
CompactionObserver compositeCompactionObserver = new CompactionObserver()
1780+
{
1781+
@Override
1782+
public void onInProgress(CompactionProgress progress)
1783+
{
1784+
compositeInProgress.incrementAndGet();
1785+
}
1786+
1787+
@Override
1788+
public void onCompleted(UUID id, @Nullable Throwable error)
1789+
{
1790+
compositedCompleted.incrementAndGet();
1791+
}
1792+
};
1793+
strategy.createAndAddTasks(0, txn, null, false, strategy.makeShardingStats(txn), 1000, tasks, compositeCompactionObserver);
1794+
assertEquals(numShards, tasks.size());
1795+
1796+
// move all tasks to in-progress
1797+
CompactionProgress progress = mockProgress(strategy, txn.opId());
1798+
for (CompactionTask task : tasks)
1799+
task.getCompObservers().forEach(o -> o.onInProgress(progress));
1800+
1801+
assertThat(compositeInProgress).hasValue(1);
1802+
assertThat(compositedCompleted).hasValue(0);
1803+
1804+
// move all tasks to complete
1805+
for (CompactionTask task : tasks)
1806+
task.getCompObservers().forEach(o -> o.onCompleted(task.transaction.opId(), null));
1807+
1808+
assertThat(compositeInProgress).hasValue(1);
1809+
assertThat(compositedCompleted).hasValue(1);
1810+
}
1811+
17561812
@Test
17571813
public void testMaximalSelection()
17581814
{

test/unit/org/apache/cassandra/db/compaction/unified/RangedAggregatesTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ private void testRangedAggregates(int numShards, int aggregateParallelism, int r
122122
{
123123
// execute each partial aggregate separately because we can only mark the inputs compacting once
124124
List<AbstractCompactionTask> tasks = new ArrayList<>();
125-
strategy.createAndAddTasks(0, maximal, tasks);
125+
strategy.createAndAddTasks(0, maximal, tasks, strategy);
126126
totalTaskCount += tasks.size();
127127
List<Future<?>> futures = tasks.stream()
128128
.map(t -> ForkJoinPool.commonPool()

0 commit comments

Comments
 (0)