
Commit c6a2d79

berengdriftx authored and committed
HCD-130 incremental repair failure during compaction (#1728)
### What is the issue

Concurrent and incremental repairs would spin-fail or deadlock.

### What does this PR fix and why was it fixed

Concurrent and incremental repairs would spin-fail. This patch:

- Removes an optimization that failed to observe max parallelism
- Provides an improved algorithm to enforce max parallelism
- Closes transactions on exceptions that previously went uncaught
- Removes a deadlock between the cfs and the compaction strategy during long-running sequential operations
1 parent 37372a7 commit c6a2d79
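
The deadlock in the last bullet is presumably a lock-order inversion: one thread holds the store's own monitor while waiting on compaction-strategy state, while a strategy thread holds its own monitor and calls back into the store. The following is a minimal, deliberately deadlocking illustration of that shape — the class and method names are invented for the example and this is an assumed mechanism, not the exact call paths from the patch:

```java
// Hypothetical sketch of a lock-order inversion between two monitors.
// Run it and both threads block forever.
class DeadlockShape
{
    static final Object storeMonitor = new Object();
    static final Object strategyMonitor = new Object();

    public static void main(String[] args)
    {
        Thread longRunningOp = new Thread(() -> {
            synchronized (storeMonitor)            // e.g. a long-running serialized operation holding the cfs monitor
            {
                pause();
                synchronized (strategyMonitor) { } // ...then touching strategy state
            }
        });
        Thread strategyWork = new Thread(() -> {
            synchronized (strategyMonitor)         // e.g. a synchronized strategy method
            {
                pause();
                synchronized (storeMonitor) { }    // ...calling back into the store
            }
        });
        longRunningOp.start();
        strategyWork.start();
    }

    static void pause()
    {
        try { Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}
```

The ColumnFamilyStore diff below breaks one edge of this cycle by moving the long-running path off the store's own monitor.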

File tree

6 files changed: +293 −62 lines changed

src/java/org/apache/cassandra/db/ColumnFamilyStore.java

Lines changed: 28 additions & 11 deletions

@@ -42,6 +42,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -435,6 +436,8 @@ TablePaxosRepairHistory get()
 
     private final RequestTracker requestTracker = RequestTracker.instance;
 
+    private final ReentrantLock longRunningSerializedOperationsLock = new ReentrantLock();
+
     public static void shutdownPostFlushExecutor() throws InterruptedException
     {
         postFlushExecutor.shutdown();
@@ -3126,7 +3129,8 @@ public <V> V runWithCompactionsDisabled(Callable<V> callable,
     {
         // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
         // and so we only run one major compaction at a time
-        synchronized (this)
+        longRunningSerializedOperationsLock.lock();
+        try
         {
             logger.debug("Cancelling in-progress compactions for {}", metadata.name);
             Iterable<ColumnFamilyStore> toInterruptFor = concatWith(interruptIndexes, interruptViews);
@@ -3154,16 +3158,9 @@ public <V> V runWithCompactionsDisabled(Callable<V> callable,
             CompactionManager.instance.waitForCessation(toInterruptFor, sstablesPredicate);
 
             // doublecheck that we finished, instead of timing out
-            for (ColumnFamilyStore cfs : toInterruptFor)
-            {
-                if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate))
-                {
-                    logger.warn("Unable to cancel in-progress compactions for {}. " +
-                                "Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.",
-                                metadata.name);
-                    return null;
-                }
-            }
+            if (!allCompactionsFinished(toInterruptFor, sstablesPredicate))
+                return null;
+
             logger.trace("Compactions successfully cancelled");
 
             // run our task
@@ -3177,6 +3174,26 @@ public <V> V runWithCompactionsDisabled(Callable<V> callable,
                 }
             }
         }
+        finally
+        {
+            longRunningSerializedOperationsLock.unlock();
+        }
+    }
+
+    private boolean allCompactionsFinished(Iterable<ColumnFamilyStore> cfss, Predicate<SSTableReader> sstablesPredicate)
+    {
+        for (ColumnFamilyStore cfs : cfss)
+        {
+            if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate))
+            {
+                logger.warn("Unable to cancel in-progress compactions for {}.{}. Perhaps there is an unusually " +
+                            "large row in progress somewhere, or the system is simply overloaded.", metadata.keyspace, metadata.name);
+                logger.debug("In-flight compactions: {}", Arrays.toString(cfs.getTracker().getCompacting().toArray()));
+                return false;
+            }
+        }
+
+        return true;
     }
 
     private static CompactionManager.CompactionPauser pauseCompactionStrategies(Iterable<ColumnFamilyStore> toPause)
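
The core change here swaps `synchronized (this)` for a dedicated `ReentrantLock`, so long-running serialized operations no longer hold the monitor that other code paths (including the compaction strategy) may also need. A minimal sketch of the pattern under assumed names — `StoreLike` and `runSerialized` are illustrative, not from the patch:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative stand-in for the ColumnFamilyStore change: serialize long
// operations on a private lock instead of the instance's own monitor.
class StoreLike
{
    private final ReentrantLock longRunningSerializedOperationsLock = new ReentrantLock();

    <V> V runSerialized(Callable<V> task) throws Exception
    {
        longRunningSerializedOperationsLock.lock();
        try
        {
            return task.call();  // at most one long-running operation at a time
        }
        finally
        {
            longRunningSerializedOperationsLock.unlock();  // released even if task throws
        }
    }
}
```

Unlike `synchronized (this)`, nothing else can accidentally contend on this private lock, and the explicit `try`/`finally` mirrors the shape of the patched `runWithCompactionsDisabled`.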

src/java/org/apache/cassandra/db/compaction/BackgroundCompactions.java

Lines changed: 1 addition & 1 deletion

@@ -171,7 +171,7 @@ void setSubmitted(CompactionStrategy strategy, TimeUUID id, CompactionAggregate
         if (id == null || aggregate == null)
             throw new IllegalArgumentException("arguments cannot be null");
 
-        logger.debug("Submitting background compaction {}", id);
+        logger.debug("Submitting background compaction {} for {}.{}", id, metadata.keyspace, metadata.name);
         CompactionPick compaction = aggregate.getSelected();
 
         CompactionPick prev = compactions.put(id, compaction);

src/java/org/apache/cassandra/db/compaction/ShardManager.java

Lines changed: 65 additions & 44 deletions

@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Set;
@@ -267,68 +268,88 @@ default <T, R extends CompactionSSTable> List<T> splitSSTablesInShardsLimited(Co
     {
         if (coveredShards <= maxParallelism)
             return splitSSTablesInShards(sstables, operationRange, numShardsForDensity, maker);
-        // We may be in a simple case where we can reduce the number of shards by some power of 2.
-        int multiple = Integer.highestOneBit(coveredShards / maxParallelism);
-        if (maxParallelism * multiple == coveredShards)
-            return splitSSTablesInShards(sstables, operationRange, numShardsForDensity / multiple, maker);
 
         var shards = splitSSTablesInShards(sstables,
                                            operationRange,
                                            numShardsForDensity,
                                            (rangeSSTables, range) -> Pair.create(Set.copyOf(rangeSSTables), range));
+
         return applyMaxParallelism(maxParallelism, maker, shards);
     }
 
-    private static <T, R extends CompactionSSTable> List<T> applyMaxParallelism(int maxParallelism, BiFunction<Collection<R>, Range<Token>, T> maker, List<Pair<Set<R>, Range<Token>>> shards)
+    private static <T, R extends CompactionSSTable> List<T> applyMaxParallelism(int maxParallelism,
+                                                                                BiFunction<Collection<R>, Range<Token>, T> maker,
+                                                                                List<Pair<Set<R>, Range<Token>>> shards)
     {
-        int actualParallelism = shards.size();
-        if (maxParallelism >= actualParallelism)
-        {
-            // We can fit within the parallelism limit without grouping, because some ranges are empty.
-            // This is not expected to happen often, but if it does, take advantage.
-            List<T> tasks = new ArrayList<>();
-            for (Pair<Set<R>, Range<Token>> pair : shards)
-                tasks.add(maker.apply(pair.left, pair.right));
-            return tasks;
-        }
-
-        // Otherwise we have to group shards together. Define a target token span per task and greedily group
-        // to be as close to it as possible.
-        double spanPerTask = shards.stream().map(Pair::right).mapToDouble(t -> t.left.size(t.right)).sum() / maxParallelism;
-        double currentSpan = 0;
-        Set<R> currentSSTables = new HashSet<>();
-        Token rangeStart = null;
-        Token prevEnd = null;
+        Iterator<Pair<Set<R>, Range<Token>>> iter = shards.iterator();
         List<T> tasks = new ArrayList<>(maxParallelism);
-        for (var pair : shards)
+        int shardsRemaining = shards.size();
+        int tasksRemaining = maxParallelism;
+
+        if (shardsRemaining > tasksRemaining)
         {
-            final Token currentEnd = pair.right.right;
-            final Token currentStart = pair.right.left;
-            double span = currentStart.size(currentEnd);
-            if (rangeStart == null)
-                rangeStart = currentStart;
-            if (currentSpan + span >= spanPerTask - 0.001) // rounding error safety
+            double totalSpan = shards.stream().map(Pair::right).mapToDouble(r -> r.left.size(r.right)).sum();
+            double spanPerTask = totalSpan / maxParallelism;
+
+            Set<R> currentSSTables = new HashSet<>();
+            Token rangeStart = null;
+            double currentSpan = 0;
+
+            // While we have more shards to process than there are tasks, we need to bunch shards up into tasks.
+            while (shardsRemaining > tasksRemaining)
             {
-                boolean includeCurrent = currentSpan + span - spanPerTask <= spanPerTask - currentSpan;
-                if (includeCurrent)
-                    currentSSTables.addAll(pair.left);
-                tasks.add(maker.apply(currentSSTables, new Range<>(rangeStart, includeCurrent ? currentEnd : prevEnd)));
-                currentSpan -= spanPerTask;
-                rangeStart = null;
-                currentSSTables.clear();
-                if (!includeCurrent)
-                {
-                    currentSSTables.addAll(pair.left);
+                Pair<Set<R>, Range<Token>> pair = iter.next(); // shardsRemaining counts the shards so iter can't be exhausted at this point
+                Token currentStart = pair.right.left;
+                Token currentEnd = pair.right.right;
+                double span = currentStart.size(currentEnd);
+
+                if (rangeStart == null)
                     rangeStart = currentStart;
+
+                currentSSTables.addAll(pair.left);
+                currentSpan += span;
+
+                // If there is only one task remaining, we should not issue it until we are processing the last shard.
+                // The latter condition is normally guaranteed, but floating-point rounding has a very small chance of making the calculations wrong.
+                if (currentSpan >= spanPerTask && tasksRemaining > 1)
+                {
+                    tasks.add(maker.apply(currentSSTables, new Range<>(rangeStart, currentEnd)));
+                    --tasksRemaining;
+                    currentSSTables = new HashSet<>();
+                    rangeStart = null;
+                    currentSpan = 0;
                 }
+                --shardsRemaining;
             }
-            else
+
+            // At this point there are as many tasks remaining as there are shards
+            // (this includes the case of issuing a task for the last shard when only one task remains).
+
+            // Add any already collected sstables to the next task.
+            if (!currentSSTables.isEmpty())
+            {
+                assert shardsRemaining > 0;
+                Pair<Set<R>, Range<Token>> pair = iter.next(); // shardsRemaining counts the shards so iter can't be exhausted at this point
                 currentSSTables.addAll(pair.left);
+                Token currentEnd = pair.right.right;
+                tasks.add(maker.apply(currentSSTables, new Range<>(rangeStart, currentEnd)));
+                --tasksRemaining;
+                --shardsRemaining;
+            }
+            assert shardsRemaining == tasksRemaining : shardsRemaining + " != " + tasksRemaining;
+        }
 
-            currentSpan += span;
-            prevEnd = currentEnd;
+        // If we still have tasks and shards to process, produce one task for each shard.
+        while (iter.hasNext())
+        {
+            Pair<Set<R>, Range<Token>> pair = iter.next();
+            tasks.add(maker.apply(pair.left, pair.right));
+            --tasksRemaining;
+            --shardsRemaining;
         }
-        assert currentSSTables.isEmpty();
+
+        assert tasks.size() == Math.min(maxParallelism, shards.size()) : tasks.size() + " != " + maxParallelism;
+        assert shardsRemaining == 0 : shardsRemaining + " != 0";
        return tasks;
     }
 
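
The removed shortcut ("reduce the number of shards by some power of 2") is the optimization the commit message says failed to observe max parallelism; the rewrite enforces the limit structurally. Its invariant: bunch shards into tasks while shards outnumber the remaining tasks, then emit exactly one task per remaining shard. Below is a simplified, self-contained sketch of that scheme operating on plain span weights instead of sstables and token ranges — all names here are illustrative, not from the patch:

```java
import java.util.ArrayList;
import java.util.List;

class ShardGrouping
{
    // Group shard spans into at most maxParallelism tasks (maxParallelism >= 1 assumed).
    static List<List<Double>> group(double[] shardSpans, int maxParallelism)
    {
        List<List<Double>> tasks = new ArrayList<>();
        int shardsRemaining = shardSpans.length;
        int tasksRemaining = maxParallelism;

        double totalSpan = 0;
        for (double s : shardSpans)
            totalSpan += s;
        double spanPerTask = totalSpan / maxParallelism;

        List<Double> current = new ArrayList<>();
        double currentSpan = 0;
        int i = 0;

        // Phase 1: while shards outnumber the remaining tasks, bunch them up.
        while (shardsRemaining > tasksRemaining)
        {
            current.add(shardSpans[i]);
            currentSpan += shardSpans[i];
            i++;
            --shardsRemaining;
            // Never spend the last task before reaching the last shard.
            if (currentSpan >= spanPerTask && tasksRemaining > 1)
            {
                tasks.add(current);
                --tasksRemaining;
                current = new ArrayList<>();
                currentSpan = 0;
            }
        }

        // Attach any partially filled group to the next shard and emit it.
        if (!current.isEmpty())
        {
            current.add(shardSpans[i++]);
            tasks.add(current);
            --tasksRemaining;
            --shardsRemaining;
        }

        // Phase 2: exactly one task per remaining shard.
        while (i < shardSpans.length)
        {
            List<Double> single = new ArrayList<>();
            single.add(shardSpans[i++]);
            tasks.add(single);
            --tasksRemaining;
            --shardsRemaining;
        }

        assert tasks.size() == Math.min(maxParallelism, shardSpans.length);
        return tasks;
    }

    public static void main(String[] args)
    {
        // Seven equal shards, at most three tasks: three groups covering all seven shards.
        System.out.println(group(new double[]{ 1, 1, 1, 1, 1, 1, 1 }, 3));
    }
}
```

Each phase-1 iteration consumes exactly one shard and the loop exits as soon as the two counts are equal, so the result always has exactly min(maxParallelism, shardCount) groups — the property the patched code asserts at the end.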

src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java

Lines changed: 17 additions & 6 deletions

@@ -271,12 +271,13 @@ public synchronized CompactionTasks getMaximalTasks(long gcBefore, boolean split
             permittedParallelism = Integer.MAX_VALUE;
 
         List<AbstractCompactionTask> tasks = new ArrayList<>();
+        LifecycleTransaction txn = null;
         try
         {
             // Split the space into independently compactable groups.
             for (var aggregate : getMaximalAggregates())
             {
-                LifecycleTransaction txn = realm.tryModify(aggregate.getSelected().sstables(),
+                txn = realm.tryModify(aggregate.getSelected().sstables(),
                                                            OperationType.COMPACTION,
                                                            aggregate.getSelected().id());
 
@@ -296,6 +297,8 @@ public synchronized CompactionTasks getMaximalTasks(long gcBefore, boolean split
         }
         catch (Throwable t)
         {
+            if (txn != null)
+                txn.close();
             throw rejectTasks(tasks, t);
         }
     }
@@ -442,9 +445,17 @@ public void createAndAddTasks(long gcBefore, CompactionAggregate.UnifiedAggregat
                                                              selected.id());
         if (transaction != null)
         {
-            // This will ignore the range of the operation, which is fine.
-            backgroundCompactions.setSubmitted(this, transaction.opId(), aggregate);
-            createAndAddTasks(gcBefore, transaction, aggregate.operationRange(), aggregate.keepOriginals(), getShardingStats(aggregate), parallelism, tasks, additionalObserver);
+            try
+            {
+                // This will ignore the range of the operation, which is fine.
+                backgroundCompactions.setSubmitted(this, transaction.opId(), aggregate);
+                createAndAddTasks(gcBefore, transaction, aggregate.operationRange(), aggregate.keepOriginals(), getShardingStats(aggregate), parallelism, tasks, additionalObserver);
+            }
+            catch (Throwable e)
+            {
+                transaction.close();
+                throw e;
+            }
         }
         else
         {
@@ -744,8 +755,8 @@ private Collection<UnifiedCompactionTask> createParallelCompactionTasks(Lifecycl
                                                sharedOperation)
         );
         compositeTransaction.completeInitialization();
-        assert tasks.size() <= parallelism;
-        assert tasks.size() <= coveredShardCount;
+        assert tasks.size() <= parallelism : "Task size: " + tasks.size() + " vs parallelism of: " + parallelism;
+        assert tasks.size() <= coveredShardCount : "Task size: " + tasks.size() + " vs covered shard count: " + coveredShardCount;
 
         if (tasks.isEmpty())
             transaction.close(); // this should not be reachable normally, close the transaction for safety
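
Both hunks apply one rule: once a `LifecycleTransaction` is open, any exception raised before a task takes ownership must close it before propagating — per the commit message, these were the "transactions on some exceptions failing to be caught", whose leaked sstable reservations left later operations spinning. A minimal sketch of the guard, with a hypothetical `Transaction` type standing in for `LifecycleTransaction`:

```java
// Hypothetical Transaction type; LifecycleTransaction itself has a richer API.
class TransactionGuard
{
    interface Transaction
    {
        void close(); // releases the sstables the transaction reserved
    }

    // Run task creation; on any failure, release the transaction before rethrowing.
    static void createTasksSafely(Transaction transaction, Runnable taskCreation)
    {
        try
        {
            taskCreation.run();
        }
        catch (Throwable t)
        {
            transaction.close(); // without this, the reservation leaks
            throw t;             // precise rethrow: only unchecked throwables can reach here
        }
    }
}
```

In `getMaximalTasks` the same guard required hoisting `txn` out of the loop so the `catch (Throwable t)` block can close whichever transaction was in flight when the failure occurred.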

test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Lines changed: 54 additions & 0 deletions

@@ -31,6 +31,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -53,6 +54,7 @@
 import org.apache.cassandra.db.ColumnFamilyStore.FlushReason;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.compaction.TableOperation;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
@@ -137,6 +139,58 @@ public void truncateCFS()
         Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
     }
 
+    @Test
+    public void testRWCDLocking() throws InterruptedException
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+        CountDownLatch task1StartedLatch = new CountDownLatch(1);
+        CountDownLatch task1FinishLatch = new CountDownLatch(1);
+        CountDownLatch task2StartedLatch = new CountDownLatch(1);
+
+        Thread task1 = new Thread(() -> {
+            cfs.runWithCompactionsDisabled(() -> {
+                task1StartedLatch.countDown();
+                try
+                {
+                    task1FinishLatch.await();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                return null;
+            },
+            OperationType.P0,
+            true,
+            true,
+            TableOperation.StopTrigger.UNIT_TESTS);
+        });
+        task1.start();
+
+        Thread task2 = new Thread(() -> {
+            cfs.runWithCompactionsDisabled(() -> {
+                task2StartedLatch.countDown();
+                return null;
+            },
+            OperationType.P0,
+            true,
+            true,
+            TableOperation.StopTrigger.UNIT_TESTS);
+        });
+        task2.start();
+
+        // Check that task1 started but task2 is waiting
+        assertTrue(task1StartedLatch.await(30, TimeUnit.SECONDS));
+        assertEquals(1, task2StartedLatch.getCount());
+
+        // Allow task1 to complete and check that task2 proceeds next
+        task1FinishLatch.countDown();
+        assertTrue(task2StartedLatch.await(30, TimeUnit.SECONDS));
+
+        task1.join();
+        task2.join();
+    }
+
     @Test
     public void testMemtableTimestamp() throws Throwable
     {
