Skip to content

Commit b667550

Browse files
authored
CNDB-14237-main-cherry-pick: register composite compaction observer as additional obverser on top of UCS (#1783)
### What is the issue UCS didn't clear pending compaction tasks in `BackgroundCompactions#compactions` for parallel background compaction ### What does this PR fix and why was it fixed Register both UCS and composite compaction observer for parallel compaction task: both UCS and CNDB are notified
1 parent c133fab commit b667550

File tree

4 files changed

+155
-26
lines changed

4 files changed

+155
-26
lines changed

src/java/org/apache/cassandra/db/compaction/SharedCompactionObserver.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,19 @@
1818

1919
package org.apache.cassandra.db.compaction;
2020

21+
import java.util.List;
2122
import java.util.UUID;
22-
import java.util.concurrent.atomic.AtomicBoolean;
2323
import java.util.concurrent.atomic.AtomicInteger;
2424
import java.util.concurrent.atomic.AtomicReference;
2525

26+
import javax.annotation.Nullable;
27+
28+
import com.google.common.collect.ImmutableList;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import org.apache.cassandra.utils.Throwables;
33+
2634
/// Utility class to share a compaction observer among multiple compaction tasks and only report start and completion
2735
/// once when the first task starts and completion when all tasks complete (successfully or not, where the passed
2836
/// `isSuccess` state is a logical and of the subtasks').
@@ -34,14 +42,25 @@
3442
/// if assertions are enabled.
3543
public class SharedCompactionObserver implements CompactionObserver
3644
{
45+
private static final Logger logger = LoggerFactory.getLogger(SharedCompactionObserver.class);
46+
3747
private final AtomicInteger toReportOnComplete = new AtomicInteger(0);
3848
private final AtomicReference<Throwable> onCompleteException = new AtomicReference(null);
3949
private final AtomicReference<CompactionProgress> inProgressReported = new AtomicReference<>(null);
40-
private final CompactionObserver observer;
50+
51+
private final List<CompactionObserver> compObservers;
4152

4253
public SharedCompactionObserver(CompactionObserver observer)
4354
{
44-
this.observer = observer;
55+
this(observer, null);
56+
}
57+
58+
public SharedCompactionObserver(CompactionObserver primary, @Nullable CompactionObserver secondary)
59+
{
60+
if (primary == null)
61+
throw new IllegalArgumentException("Primary observer cannot be null");
62+
63+
this.compObservers = secondary != null ? ImmutableList.of(primary, secondary) : ImmutableList.of(primary);
4564
}
4665

4766
public void registerExpectedSubtask()
@@ -55,7 +74,13 @@ public void registerExpectedSubtask()
5574
public void onInProgress(CompactionProgress progress)
5675
{
5776
if (inProgressReported.compareAndSet(null, progress))
58-
observer.onInProgress(progress);
77+
{
78+
Throwable err = null;
79+
for (CompactionObserver compObserver : compObservers)
80+
err = Throwables.perform(err, () -> compObserver.onInProgress(progress));
81+
82+
Throwables.maybeFail(err);
83+
}
5984
else
6085
assert inProgressReported.get() == progress; // progress object must also be shared
6186
}
@@ -69,6 +94,12 @@ public void onCompleted(UUID id, Throwable err)
6994
assert remainingToComplete >= 0 : "onCompleted called without corresponding registerExpectedSubtask";
7095
// The individual operation ID given here may be different from the shared ID. Pass on the shared one.
7196
if (remainingToComplete == 0)
72-
observer.onCompleted(inProgressReported.get().operationId(), onCompleteException.get());
97+
{
98+
Throwable error = null;
99+
for (CompactionObserver compObserver : compObservers)
100+
error = Throwables.perform(error, () -> compObserver.onCompleted(inProgressReported.get().operationId(), onCompleteException.get()));
101+
102+
Throwables.maybeFail(error);
103+
}
73104
}
74105
}

src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.util.regex.Matcher;
3333
import java.util.regex.Pattern;
3434

35+
import javax.annotation.Nullable;
36+
3537
import com.google.common.annotations.VisibleForTesting;
3638
import com.google.common.base.Preconditions;
3739
import com.google.common.collect.Iterables;
@@ -342,7 +344,7 @@ public synchronized Collection<AbstractCompactionTask> getNextBackgroundTasks(in
342344
if (expirationTasks != null)
343345
return expirationTasks;
344346

345-
return getNextBackgroundTasks(getNextCompactionAggregates(), gcBefore, this);
347+
return getNextBackgroundTasks(getNextCompactionAggregates(), gcBefore, null);
346348
}
347349

348350
/// Check for fully expired sstables and return a collection of expiration tasks if found.
@@ -403,20 +405,20 @@ public Set<CompactionSSTable> getFullyExpiredSSTables(int gcBefore)
403405
/// Used by CNDB where compaction aggregates come from etcd rather than the strategy.
404406
/// @return collection of `AbstractCompactionTask`, which could be either a `CompactionTask` or a `UnifiedCompactionTask`
405407
public synchronized Collection<AbstractCompactionTask> getNextBackgroundTasks(Collection<CompactionAggregate> aggregates, int gcBefore,
406-
CompactionObserver compositeCompactionObserver)
408+
@Nullable CompactionObserver additionalObserver)
407409
{
408410
controller.onStrategyBackgroundTaskRequest();
409-
return createCompactionTasks(aggregates, gcBefore, compositeCompactionObserver);
411+
return createCompactionTasks(aggregates, gcBefore, additionalObserver);
410412
}
411413

412414
private Collection<AbstractCompactionTask> createCompactionTasks(Collection<CompactionAggregate> aggregates, int gcBefore,
413-
CompactionObserver compositeCompactionObserver)
415+
@Nullable CompactionObserver additionalObserver)
414416
{
415417
Collection<AbstractCompactionTask> tasks = new ArrayList<>(aggregates.size());
416418
try
417419
{
418420
for (CompactionAggregate aggregate : aggregates)
419-
createAndAddTasks(gcBefore, (CompactionAggregate.UnifiedAggregate) aggregate, tasks, compositeCompactionObserver);
421+
createAndAddTasks(gcBefore, (CompactionAggregate.UnifiedAggregate) aggregate, tasks, additionalObserver);
420422

421423
return tasks;
422424
}
@@ -428,7 +430,7 @@ private Collection<AbstractCompactionTask> createCompactionTasks(Collection<Comp
428430

429431
/// Create compaction tasks for the given aggregate and add them to the given tasks list.
430432
public void createAndAddTasks(int gcBefore, CompactionAggregate.UnifiedAggregate aggregate,
431-
Collection<? super UnifiedCompactionTask> tasks, CompactionObserver compositeCompactionObserver)
433+
Collection<? super UnifiedCompactionTask> tasks, @Nullable CompactionObserver additionalObserver)
432434
{
433435
CompactionPick selected = aggregate.getSelected();
434436
int parallelism = aggregate.getPermittedParallelism();
@@ -442,7 +444,7 @@ public void createAndAddTasks(int gcBefore, CompactionAggregate.UnifiedAggregate
442444
{
443445
// This will ignore the range of the operation, which is fine.
444446
backgroundCompactions.setSubmitted(this, transaction.opId(), aggregate);
445-
createAndAddTasks(gcBefore, transaction, aggregate.operationRange(), aggregate.keepOriginals(), getShardingStats(aggregate), parallelism, tasks, compositeCompactionObserver);
447+
createAndAddTasks(gcBefore, transaction, aggregate.operationRange(), aggregate.keepOriginals(), getShardingStats(aggregate), parallelism, tasks, additionalObserver);
446448
}
447449
else
448450
{
@@ -451,6 +453,12 @@ public void createAndAddTasks(int gcBefore, CompactionAggregate.UnifiedAggregate
451453
}
452454
}
453455

456+
/// Return the num of in-progress compactions tracked by UCS
457+
public int getCompactionInProgress()
458+
{
459+
return backgroundCompactions.getCompactionsInProgress().size();
460+
}
461+
454462
private static RuntimeException rejectTasks(Iterable<? extends AbstractCompactionTask> tasks, Throwable error)
455463
{
456464
for (var task : tasks)
@@ -614,7 +622,7 @@ void createAndAddTasks(int gcBefore,
614622
int parallelism,
615623
Collection<? super CompactionTask> tasks)
616624
{
617-
createAndAddTasks(gcBefore, transaction, null, false, shardingStats, parallelism, tasks, this);
625+
createAndAddTasks(gcBefore, transaction, null, false, shardingStats, parallelism, tasks, null);
618626
}
619627

620628
@VisibleForTesting
@@ -625,12 +633,12 @@ void createAndAddTasks(int gcBefore,
625633
ShardingStats shardingStats,
626634
int parallelism,
627635
Collection<? super UnifiedCompactionTask> tasks,
628-
CompactionObserver compositeCompactionObserver)
636+
@Nullable CompactionObserver additionalObserver)
629637
{
630638
if (controller.parallelizeOutputShards() && parallelism > 1)
631-
tasks.addAll(createParallelCompactionTasks(transaction, operationRange, keepOriginals, shardingStats, gcBefore, parallelism, compositeCompactionObserver));
639+
tasks.addAll(createParallelCompactionTasks(transaction, operationRange, keepOriginals, shardingStats, gcBefore, parallelism, additionalObserver));
632640
else
633-
tasks.add(createCompactionTask(transaction, operationRange, keepOriginals, shardingStats, gcBefore));
641+
tasks.add(createCompactionTask(transaction, operationRange, keepOriginals, shardingStats, gcBefore, additionalObserver));
634642
}
635643

636644
/// Create the sstable writer used for flushing.
@@ -677,9 +685,13 @@ private UnifiedCompactionTask createCompactionTask(LifecycleTransaction transact
677685
/// where we produce outputs but cannot delete the input sstables until all components of the operation are complete.
678686
///
679687
/// @return a sharded compaction task that in turn will create a sharded compaction writer.
680-
private UnifiedCompactionTask createCompactionTask(LifecycleTransaction transaction, Range<Token> operationRange, boolean keepOriginals, ShardingStats shardingStats, int gcBefore)
688+
private UnifiedCompactionTask createCompactionTask(LifecycleTransaction transaction, Range<Token> operationRange, boolean keepOriginals, ShardingStats shardingStats, int gcBefore,
689+
@Nullable CompactionObserver additionalObserver)
681690
{
682-
return new UnifiedCompactionTask(realm, this, transaction, gcBefore, keepOriginals, getShardManager(), shardingStats, operationRange, transaction.originals(), null, null, null);
691+
UnifiedCompactionTask task = new UnifiedCompactionTask(realm, this, transaction, gcBefore, keepOriginals, getShardManager(), shardingStats, operationRange, transaction.originals(), null, null, null);
692+
if (additionalObserver != null)
693+
task.addObserver(additionalObserver);
694+
return task;
683695
}
684696

685697
@Override
@@ -701,7 +713,7 @@ private Collection<UnifiedCompactionTask> createParallelCompactionTasks(Lifecycl
701713
ShardingStats shardingStats,
702714
int gcBefore,
703715
int parallelism,
704-
CompactionObserver compositeCompactionObserver)
716+
@Nullable CompactionObserver additionalObserver)
705717
{
706718
final int coveredShardCount = shardingStats.coveredShardCount;
707719
assert parallelism > 1;
@@ -710,7 +722,8 @@ private Collection<UnifiedCompactionTask> createParallelCompactionTasks(Lifecycl
710722
ShardManager shardManager = getShardManager();
711723
CompositeLifecycleTransaction compositeTransaction = new CompositeLifecycleTransaction(transaction);
712724
SharedCompactionProgress sharedProgress = new SharedCompactionProgress(transaction.opId(), transaction.opType(), TableOperation.Unit.BYTES);
713-
SharedCompactionObserver sharedObserver = new SharedCompactionObserver(compositeCompactionObserver);
725+
SharedCompactionObserver sharedObserver = new SharedCompactionObserver(this, additionalObserver);
726+
714727
SharedTableOperation sharedOperation = new SharedTableOperation(sharedProgress);
715728
List<UnifiedCompactionTask> tasks = shardManager.splitSSTablesInShardsLimited(
716729
sstables,
@@ -741,7 +754,7 @@ private Collection<UnifiedCompactionTask> createParallelCompactionTasks(Lifecycl
741754
if (tasks.size() == 1) // if there's just one range, make it a non-ranged task (to apply early open etc.)
742755
{
743756
assert tasks.get(0).inputSSTables().equals(sstables);
744-
return Collections.singletonList(createCompactionTask(transaction, operationRange, keepOriginals, shardingStats, gcBefore));
757+
return Collections.singletonList(createCompactionTask(transaction, operationRange, keepOriginals, shardingStats, gcBefore, additionalObserver));
745758
}
746759
else
747760
return tasks;

test/unit/org/apache/cassandra/db/compaction/SharedCompactionObserverTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.Future;
3737
import java.util.concurrent.ThreadLocalRandom;
3838

39+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3940
import static org.mockito.Mockito.*;
4041

4142
public class SharedCompactionObserverTest
@@ -185,4 +186,57 @@ public void testErrorWrongProgress()
185186
when(mockProgress2.operationId()).thenReturn(UUID.randomUUID());
186187
Assert.assertThrows(AssertionError.class, () -> sharedCompactionObserver.onInProgress(mockProgress2));
187188
}
189+
190+
@Test
191+
public void testMultipleObserver()
192+
{
193+
CompactionObserver mockObserver1 = Mockito.mock(CompactionObserver.class);
194+
CompactionObserver mockObserver2 = Mockito.mock(CompactionObserver.class);
195+
SharedCompactionObserver sharedCompactionObserver = new SharedCompactionObserver(mockObserver1, mockObserver2);
196+
197+
sharedCompactionObserver.registerExpectedSubtask();
198+
sharedCompactionObserver.onInProgress(mockProgress);
199+
verify(mockObserver1, times(1)).onInProgress(mockProgress);
200+
verify(mockObserver2, times(1)).onInProgress(mockProgress);
201+
202+
sharedCompactionObserver.onCompleted(operationId, null);
203+
verify(mockObserver1, times(1)).onCompleted(operationId, null);
204+
verify(mockObserver2, times(1)).onCompleted(operationId, null);
205+
}
206+
207+
@Test
208+
public void testMultipleObserverWithOnInProgressError()
209+
{
210+
CompactionObserver mockObserver1 = Mockito.mock(CompactionObserver.class);
211+
CompactionObserver mockObserver2 = Mockito.mock(CompactionObserver.class);
212+
Mockito.doThrow(new RuntimeException("Injected Exception")).when(mockObserver1).onInProgress(any());
213+
214+
SharedCompactionObserver sharedCompactionObserver = new SharedCompactionObserver(mockObserver1, mockObserver2);
215+
216+
sharedCompactionObserver.registerExpectedSubtask();
217+
assertThatThrownBy(() -> sharedCompactionObserver.onInProgress(mockProgress)).isInstanceOf(RuntimeException.class);
218+
219+
// both onInProgress are called
220+
verify(mockObserver1, times(1)).onInProgress(mockProgress);
221+
verify(mockObserver2, times(1)).onInProgress(mockProgress);
222+
}
223+
224+
@Test
225+
public void testMultipleObserverWithOnCompleteError()
226+
{
227+
CompactionObserver mockObserver1 = Mockito.mock(CompactionObserver.class);
228+
CompactionObserver mockObserver2 = Mockito.mock(CompactionObserver.class);
229+
Mockito.doThrow(new RuntimeException("Injected Exception")).when(mockObserver1).onCompleted(any(), any());
230+
231+
SharedCompactionObserver sharedCompactionObserver = new SharedCompactionObserver(mockObserver1, mockObserver2);
232+
233+
sharedCompactionObserver.registerExpectedSubtask();
234+
sharedCompactionObserver.onInProgress(mockProgress);
235+
verify(mockObserver1, times(1)).onInProgress(mockProgress);
236+
verify(mockObserver2, times(1)).onInProgress(mockProgress);
237+
238+
assertThatThrownBy(() -> sharedCompactionObserver.onCompleted(operationId, null)).isInstanceOf(RuntimeException.class);
239+
verify(mockObserver1, times(1)).onCompleted(operationId, null);
240+
verify(mockObserver2, times(1)).onCompleted(operationId, null);
241+
}
188242
}

test/unit/org/apache/cassandra/db/compaction/UnifiedCompactionStrategyTest.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import static org.mockito.ArgumentMatchers.anyInt;
8282
import static org.mockito.ArgumentMatchers.anyList;
8383
import static org.mockito.ArgumentMatchers.anyLong;
84+
import static org.mockito.Mockito.times;
8485
import static org.mockito.Mockito.when;
8586

8687
/**
@@ -1758,9 +1759,25 @@ public void testDontCreateParallelTasks()
17581759
}
17591760

17601761
@Test
1761-
public void testCustomCompositeCompactionObserver()
1762+
public void testAdditionalCompactionObserverForParallelCompaction()
1763+
{
1764+
testAdditionalCompactionObserver(5, 1000);
1765+
}
1766+
1767+
@Test
1768+
public void testAdditionalCompactionObserverForSingleCompactionSingleShard()
1769+
{
1770+
testAdditionalCompactionObserver(1, 1000);
1771+
}
1772+
1773+
@Test
1774+
public void testAdditionalCompactionObserverForSingleCompactionLimitedParallelism()
1775+
{
1776+
testAdditionalCompactionObserver(5, 1);
1777+
}
1778+
1779+
private void testAdditionalCompactionObserver(int numShards, int parallelism)
17621780
{
1763-
int numShards = 5;
17641781
Set<SSTableReader> allSSTables = new HashSet<>();
17651782
allSSTables.addAll(mockNonOverlappingSSTables(10, 0, 100 << 20));
17661783
allSSTables.addAll(mockNonOverlappingSSTables(15, 1, 200 << 20));
@@ -1769,7 +1786,9 @@ public void testCustomCompositeCompactionObserver()
17691786
Controller controller = Mockito.mock(Controller.class);
17701787
when(controller.getNumShards(anyDouble())).thenReturn(numShards);
17711788
when(controller.parallelizeOutputShards()).thenReturn(true);
1772-
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(strategyFactory, controller);
1789+
1790+
BackgroundCompactions backgroundCompactions = Mockito.mock(BackgroundCompactions.class);
1791+
UnifiedCompactionStrategy strategy = new UnifiedCompactionStrategy(strategyFactory, backgroundCompactions, controller);
17731792
strategy.startup();
17741793
LifecycleTransaction txn = dataTracker.tryModify(allSSTables, OperationType.COMPACTION);
17751794
var tasks = new ArrayList<CompactionTask>();
@@ -1790,8 +1809,15 @@ public void onCompleted(UUID id, @Nullable Throwable error)
17901809
compositedCompleted.incrementAndGet();
17911810
}
17921811
};
1793-
strategy.createAndAddTasks(0, txn, null, false, strategy.makeShardingStats(txn), 1000, tasks, compositeCompactionObserver);
1794-
assertEquals(numShards, tasks.size());
1812+
strategy.createAndAddTasks(0, txn, null, false, strategy.makeShardingStats(txn), parallelism, tasks, compositeCompactionObserver);
1813+
if (parallelism > 1)
1814+
assertEquals(numShards, tasks.size());
1815+
else
1816+
assertEquals(1, tasks.size());
1817+
1818+
assertThat(compositedCompleted).hasValue(0);
1819+
Mockito.verify(backgroundCompactions, times(0)).onInProgress(Mockito.any());
1820+
Mockito.verify(backgroundCompactions, times(0)).onCompleted(Mockito.any(), Mockito.any());
17951821

17961822
// move all tasks to in-progress
17971823
CompactionProgress progress = mockProgress(strategy, txn.opId());
@@ -1800,15 +1826,20 @@ public void onCompleted(UUID id, @Nullable Throwable error)
18001826

18011827
assertThat(compositeInProgress).hasValue(1);
18021828
assertThat(compositedCompleted).hasValue(0);
1829+
Mockito.verify(backgroundCompactions, times(1)).onInProgress(Mockito.any());
1830+
Mockito.verify(backgroundCompactions, times(0)).onCompleted(Mockito.any(), Mockito.any());
18031831

18041832
// move all tasks to complete
18051833
for (CompactionTask task : tasks)
18061834
task.getCompObservers().forEach(o -> o.onCompleted(task.transaction.opId(), null));
18071835

18081836
assertThat(compositeInProgress).hasValue(1);
18091837
assertThat(compositedCompleted).hasValue(1);
1838+
Mockito.verify(backgroundCompactions, times(1)).onInProgress(Mockito.any());
1839+
Mockito.verify(backgroundCompactions, times(1)).onCompleted(Mockito.any(), Mockito.any());
18101840
}
18111841

1842+
18121843
@Test
18131844
public void testMaximalSelection()
18141845
{

0 commit comments

Comments
 (0)