Skip to content

Commit cba4208

Browse files
committed
Standardise Replay logic to ensure SafeCommandStore.update is called
Also Fix:
 - Initialise home state when calling waiting() if not already initialised
 - Don't reportClosed/reportRetired for epochs that are already closed/retired

Also Improve:
 - Implement waitForQuiescence in AccordExecutor
 - Permit replay parallelism

patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20750
1 parent a866f92 commit cba4208

File tree

9 files changed

+103
-85
lines changed

9 files changed

+103
-85
lines changed

src/java/org/apache/cassandra/journal/Journal.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ public StaticSegmentKeyIterator staticSegmentKeyIterator()
946946
*/
947947
public static class KeyRefs<K>
948948
{
949-
long segments[];
949+
long[] segments;
950950
K key;
951951
int size;
952952

@@ -966,6 +966,11 @@ public void segments(LongConsumer consumer)
966966
consumer.accept(segments[i]);
967967
}
968968

969+
/**
 * Returns a copy of the segments currently referenced by this key.
 *
 * @return a newly allocated array containing the first {@code size} entries of
 *         {@code segments}, or an empty array when {@code segments} is {@code null}
 */
public long[] copyOfSegments()
970+
{
971+
// Only the first `size` slots are copied, so the result is trimmed to `size` entries.
// NOTE(review): presumably a copy is handed out so callers can keep using it after this
// KeyRefs instance is reused or mutated by its iterator - confirm against the iterator.
return segments == null ? new long[0] : Arrays.copyOf(segments, size);
972+
}
973+
969974
public K key()
970975
{
971976
return key;

src/java/org/apache/cassandra/service/accord/AccordCommandStore.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import accord.local.Command;
4646
import accord.local.CommandStore;
4747
import accord.local.CommandStores;
48-
import accord.local.Commands;
4948
import accord.local.NodeCommandStoreService;
5049
import accord.local.PreLoadContext;
5150
import accord.local.RedundantBefore;
@@ -55,6 +54,7 @@
5554
import accord.primitives.RangeDeps;
5655
import accord.primitives.Ranges;
5756
import accord.primitives.RoutableKey;
57+
import accord.primitives.Route;
5858
import accord.primitives.Status;
5959
import accord.primitives.Timestamp;
6060
import accord.primitives.TxnId;
@@ -71,7 +71,6 @@
7171
import static accord.api.Journal.CommandUpdate;
7272
import static accord.api.Journal.FieldUpdates;
7373
import static accord.api.Journal.Load.MINIMAL;
74-
import static accord.api.Journal.Loader;
7574
import static accord.utils.Invariants.require;
7675

7776
public class AccordCommandStore extends CommandStore
@@ -153,7 +152,7 @@ public void close()
153152

154153
private AccordSafeCommandStore current;
155154

156-
private final CommandStoreLoader loader;
155+
private final AccordCommandStoreLoader loader;
157156

158157
public AccordCommandStore(int id,
159158
NodeCommandStoreService node,
@@ -182,7 +181,7 @@ public AccordCommandStore(int id,
182181

183182
this.exclusiveExecutor = sharedExecutor.executor();
184183
this.commandsForRanges = new CommandsForRanges.Manager(this);
185-
this.loader = new CommandStoreLoader(this);
184+
this.loader = new AccordCommandStoreLoader(this);
186185

187186
maybeLoadRedundantBefore(journal.loadRedundantBefore(id()));
188187
maybeLoadBootstrapBeganAt(journal.loadBootstrapBeganAt(id()));
@@ -485,7 +484,7 @@ public RangeSearcher rangeSearcher()
485484
return rangeSearcher;
486485
}
487486

488-
public Loader loader()
487+
public AccordCommandStoreLoader loader()
489488
{
490489
return loader;
491490
}
@@ -496,23 +495,21 @@ public void unsafeUpsertRedundantBefore(RedundantBefore addRedundantBefore)
496495
super.unsafeUpsertRedundantBefore(addRedundantBefore);
497496
}
498497

499-
private static class CommandStoreLoader extends AbstractLoader
498+
public static class AccordCommandStoreLoader extends AbstractLoader
500499
{
501500
private final AccordCommandStore store;
502501

503-
private CommandStoreLoader(AccordCommandStore store)
502+
private AccordCommandStoreLoader(AccordCommandStore store)
504503
{
505504
this.store = store;
506505
}
507506

508507
@Override
509-
public AsyncChain<Command> load(TxnId txnId)
508+
public AsyncChain<Route> load(TxnId txnId)
510509
{
511510
return store.submit(txnId, safeStore -> {
512-
maybeApplyWrites(txnId, safeStore, (safeCommand, cmd) -> {
513-
Commands.applyWrites(safeStore, txnId, cmd).begin(store.agent);
514-
});
515-
return safeStore.unsafeGet(txnId).current();
511+
maybeApplyWrites(safeStore, txnId);
512+
return safeStore.unsafeGet(txnId).current().route();
516513
});
517514
}
518515
}

src/java/org/apache/cassandra/service/accord/AccordCommandStores.java

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,8 @@
1717
*/
1818
package org.apache.cassandra.service.accord;
1919

20-
import java.util.ArrayList;
2120
import java.util.Arrays;
2221
import java.util.List;
23-
import java.util.concurrent.ExecutionException;
24-
import java.util.concurrent.Future;
2522
import java.util.concurrent.TimeUnit;
2623

2724
import accord.api.Agent;
@@ -46,7 +43,6 @@
4643
import org.apache.cassandra.schema.TableId;
4744
import org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory;
4845
import org.apache.cassandra.service.accord.api.TokenKey;
49-
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
5046

5147
import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD;
5248
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount;
@@ -229,38 +225,8 @@ public List<AccordExecutor> executors()
229225

230226
public void waitForQuiescense()
231227
{
232-
boolean runAgain = true;
233-
try
234-
{
235-
while (true)
236-
{
237-
boolean hasTasks = false;
238-
List<Future<?>> futures = new ArrayList<>();
239-
for (AccordExecutor executor : this.executors)
240-
{
241-
hasTasks |= executor.hasTasks();
242-
hasTasks |= Stage.MUTATION.executor().getPendingTaskCount() > 0;
243-
hasTasks |= Stage.MUTATION.executor().getActiveTaskCount() > 0;
244-
futures.add(executor.submit(() -> {}));
245-
}
246-
for (Future<?> future : futures)
247-
future.get();
248-
futures.clear();
249-
250-
if (!runAgain)
251-
return;
252-
253-
runAgain = hasTasks;
254-
}
255-
}
256-
catch (ExecutionException e)
257-
{
258-
throw new IllegalStateException("Should have never been thrown", e);
259-
}
260-
catch (InterruptedException e)
261-
{
262-
throw new UncheckedInterruptedException(e);
263-
}
228+
for (AccordExecutor executor : this.executors)
229+
executor.waitForQuiescence();
264230
}
265231

266232
@Override

src/java/org/apache/cassandra/service/accord/AccordExecutor.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.cassandra.service.accord;
2020

21+
import java.util.ArrayList;
22+
import java.util.List;
2123
import java.util.concurrent.Callable;
2224
import java.util.concurrent.CancellationException;
2325
import java.util.concurrent.TimeUnit;
@@ -62,6 +64,7 @@
6264
import org.apache.cassandra.metrics.AccordCacheMetrics;
6365
import org.apache.cassandra.utils.MonotonicClock;
6466
import org.apache.cassandra.utils.concurrent.AsyncPromise;
67+
import org.apache.cassandra.utils.concurrent.Condition;
6568
import org.apache.cassandra.utils.concurrent.Future;
6669

6770
import static org.apache.cassandra.service.accord.AccordCache.CommandAdapter.COMMAND_ADAPTER;
@@ -138,6 +141,8 @@ public GlobalCaches(AccordCache global, AccordCache.Type<TxnId, Command, AccordS
138141
private final AccordCacheEntry.OnLoaded onRangeLoaded = this::onRangeLoaded;
139142
private final ExclusiveGlobalCaches caches;
140143

144+
private List<Condition> waitingForQuiescence;
145+
141146
/**
142147
* The maximum total number of loads we can queue at once - this includes loads for range transactions,
143148
* which are subject to this limit as well as that imposed by {@link #maxQueuedRangeLoads}
@@ -224,6 +229,36 @@ public Stream<? extends DebuggableTaskRunner> active()
224229
return Stream.of();
225230
}
226231

232+
/**
 * Blocks the calling thread until this executor is observed to be quiescent.
 *
 * Under {@code lock}, returns immediately if both {@code tasks} and {@code running} are zero.
 * Otherwise registers a one-time {@link Condition} in {@code waitingForQuiescence}, releases the
 * lock, and waits on that condition; registered conditions are signalled by
 * {@link #signalQuiescentExclusive()}.
 *
 * NOTE(review): judging by its name, {@code awaitThrowUncheckedOnInterrupt} surfaces an interrupt
 * as an unchecked exception - confirm against the Condition API.
 */
public void waitForQuiescence()
233+
{
234+
Condition condition;
235+
lock.lock();
236+
try
237+
{
238+
// fast path: nothing queued and nothing running, so there is nothing to wait for
if (tasks == 0 && running == 0)
239+
return;
240+
241+
// the waiter list is allocated lazily, only when a caller actually has to wait
if (waitingForQuiescence == null)
242+
waitingForQuiescence = new ArrayList<>();
243+
condition = Condition.newOneTimeCondition();
244+
waitingForQuiescence.add(condition);
245+
}
246+
finally
247+
{
248+
lock.unlock();
249+
}
250+
// wait outside the lock; the condition was registered while the lock was still held
condition.awaitThrowUncheckedOnInterrupt();
251+
}
252+
253+
/**
 * Wakes every caller currently blocked in {@link #waitForQuiescence()}.
 *
 * Signals each registered condition and then drops the list, so each waiter is signalled once
 * and later waiters register against a fresh list. Does nothing if no waiter is registered.
 *
 * NOTE(review): the "Exclusive" suffix suggests the caller must already hold {@code lock};
 * this method does not acquire it itself - confirm at the call sites.
 */
protected void signalQuiescentExclusive()
254+
{
255+
if (waitingForQuiescence != null)
256+
{
257+
waitingForQuiescence.forEach(Condition::signalAll);
258+
// reset to null so the next waitForQuiescence() call allocates a new list
waitingForQuiescence = null;
259+
}
260+
}
261+
227262
void maybeUnpauseLoading()
228263
{
229264
if (!hasPausedLoading)

src/java/org/apache/cassandra/service/accord/AccordExecutorAbstractLockLoop.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ private void exitLockExclusive()
125125
private void pauseExclusive()
126126
{
127127
isHeldByExecutor = false;
128-
--running;
128+
if (--running == 0 && tasks == 0)
129+
signalQuiescentExclusive();
129130
}
130131

131132
private void resumeExclusive()
@@ -234,14 +235,15 @@ public void run()
234235
break;
235236
}
236237

238+
pauseExclusive();
239+
237240
if (shutdown)
238241
{
239242
exitLockExclusive();
240243
notifyWorkExclusive();
241244
return;
242245
}
243246

244-
pauseExclusive();
245247
awaitExclusive();
246248
resumeExclusive();
247249
}

src/java/org/apache/cassandra/service/accord/AccordExecutorSimple.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ public AccordExecutorSimple(ReentrantLock lock, int executorId, Mode mode, int t
7070
Invariants.requireArgument(threads == 1);
7171
this.lock = lock;
7272
this.executor = executorFactory().sequential(name.apply(0));
73-
7473
}
7574

7675
@Override
@@ -89,7 +88,11 @@ protected void run()
8988
{
9089
Task task = pollWaitingToRunExclusive();
9190
if (task == null)
91+
{
92+
running = 0;
93+
signalQuiescentExclusive();
9294
return;
95+
}
9396

9497
--tasks;
9598
try { task.preRunExclusive(null); task.run(); }

src/java/org/apache/cassandra/service/accord/AccordJournal.java

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.NavigableMap;
2828
import java.util.concurrent.TimeUnit;
2929
import java.util.concurrent.TimeoutException;
30+
import java.util.concurrent.atomic.AtomicBoolean;
3031
import java.util.function.Consumer;
3132
import java.util.function.Supplier;
3233
import javax.annotation.Nullable;
@@ -52,7 +53,6 @@
5253
import accord.utils.Invariants;
5354
import accord.utils.PersistentField;
5455
import accord.utils.UnhandledEnum;
55-
import accord.utils.async.AsyncChains;
5656
import accord.utils.async.AsyncResult;
5757
import accord.utils.async.AsyncResults;
5858
import org.apache.cassandra.concurrent.Shutdownable;
@@ -70,6 +70,7 @@
7070
import org.apache.cassandra.journal.SegmentCompactor;
7171
import org.apache.cassandra.journal.StaticSegment;
7272
import org.apache.cassandra.journal.ValueSerializer;
73+
import org.apache.cassandra.service.accord.AccordCommandStore.AccordCommandStoreLoader;
7374
import org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightImage;
7475
import org.apache.cassandra.service.accord.AccordJournalValueSerializers.IdentityAccumulator;
7576
import org.apache.cassandra.service.accord.JournalKey.JournalKeySupport;
@@ -82,6 +83,8 @@
8283
import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
8384
import org.apache.cassandra.utils.CloseableIterator;
8485
import org.apache.cassandra.utils.ExecutorUtils;
86+
import org.apache.cassandra.utils.FBUtilities;
87+
import org.apache.cassandra.utils.concurrent.Semaphore;
8588

8689
import static accord.impl.CommandChange.Field.CLEANUP;
8790
import static accord.impl.CommandChange.anyFieldChanged;
@@ -484,43 +487,52 @@ public void forEach(Consumer<JournalKey> consumer)
484487
@Override
485488
public void replay(CommandStores commandStores)
486489
{
490+
final Semaphore concurrency = Semaphore.newSemaphore(FBUtilities.getAvailableProcessors());
491+
final AtomicBoolean abort = new AtomicBoolean();
492+
487493
try (CloseableIterator<Journal.KeyRefs<JournalKey>> iter = journalTable.keyIterator())
488494
{
489495
JournalKey prev = null;
490496
while (iter.hasNext())
491497
{
492-
Journal.KeyRefs<JournalKey> ref = iter.next();
498+
if (abort.get())
499+
break;
493500

494-
if (ref.key().type != JournalKey.Type.COMMAND_DIFF)
495-
continue;
496-
CommandStore commandStore = commandStores.forId(ref.key().commandStoreId);
497-
Loader loader = commandStore.loader();
498-
TxnId txnId = ref.key().id;
499-
try
501+
JournalKey key;
502+
long[] segments;
500503
{
501-
Invariants.require(prev == null ||
502-
ref.key().commandStoreId != prev.commandStoreId ||
503-
ref.key().id.compareTo(prev.id) != 0,
504-
"duplicate key detected %s == %s", ref.key(), prev);
505-
prev = ref.key();
506-
AsyncChains.getUnchecked(loader.load(txnId)
507-
.map(command -> {
508-
if (journalTable.shouldIndex(ref.key())
509-
&& command.participants() != null
510-
&& command.participants().route() != null)
511-
{
512-
ref.segments(segment -> {
513-
journalTable.safeNotify(index -> index.update(segment, ref.key().commandStoreId, txnId, command.participants().route()));
514-
});
515-
}
516-
return command;
517-
})
518-
.beginAsResult());
519-
}
520-
catch (Throwable t)
521-
{
522-
journal.handleError("Could not replay command " + ref.key().id, t);
504+
Journal.KeyRefs<JournalKey> ref = iter.next();
505+
key = ref.key();
506+
if (key.type != JournalKey.Type.COMMAND_DIFF)
507+
continue;
508+
509+
segments = journalTable.shouldIndex(key) ? ref.copyOfSegments() : null;
523510
}
511+
512+
CommandStore commandStore = commandStores.forId(key.commandStoreId);
513+
AccordCommandStoreLoader loader = (AccordCommandStoreLoader) commandStore.loader();
514+
515+
TxnId txnId = key.id;
516+
Invariants.require(prev == null ||
517+
key.commandStoreId != prev.commandStoreId ||
518+
key.id.compareTo(prev.id) != 0,
519+
"duplicate key detected %s == %s", key, prev);
520+
prev = key;
521+
522+
concurrency.acquireThrowUncheckedOnInterrupt(1);
523+
loader.load(txnId)
524+
.map(route -> {
525+
if (segments != null)
526+
{
527+
for (long segment : segments)
528+
journalTable.safeNotify(index -> index.update(segment, key.commandStoreId, txnId, route));
529+
}
530+
return null;
531+
}).begin((success, fail) -> {
532+
concurrency.release(1);
533+
if (fail != null && !journal.handleError("Could not replay command " + txnId, fail))
534+
abort.set(true);
535+
});
524536
}
525537
}
526538
}

0 commit comments

Comments
 (0)