Skip to content

Commit 0e9f0fa

Browse files
committed
Improve:
- PreLoadContext descriptions - Introduce LoadKeysFor so we can avoid loading range transactions except where necessary patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20758
1 parent 9df5481 commit 0e9f0fa

17 files changed

+79
-74
lines changed

modules/accord

Submodule accord updated 56 files

src/java/org/apache/cassandra/service/accord/AccordCommandStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ private AccordCommandStoreLoader(AccordCommandStore store)
507507
@Override
508508
public AsyncChain<Route> load(TxnId txnId)
509509
{
510-
return store.submit(txnId, safeStore -> {
510+
return store.submit(PreLoadContext.contextFor(txnId, "Replay"), safeStore -> {
511511
initialiseState(safeStore, txnId);
512512
return safeStore.unsafeGet(txnId).current().route();
513513
});

src/java/org/apache/cassandra/service/accord/AccordService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import accord.local.Command;
5757
import accord.local.CommandStore;
5858
import accord.local.CommandStores;
59-
import accord.local.KeyHistory;
6059
import accord.local.Node;
6160
import accord.local.Node.Id;
6261
import accord.local.PreLoadContext;
@@ -134,6 +133,8 @@
134133
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
135134

136135
import static accord.api.ProtocolModifiers.Toggles.FastExec.MAY_BYPASS_SAFESTORE;
136+
import static accord.local.LoadKeys.SYNC;
137+
import static accord.local.LoadKeysFor.READ_WRITE;
137138
import static accord.local.durability.DurabilityService.SyncLocal.Self;
138139
import static accord.local.durability.DurabilityService.SyncRemote.All;
139140
import static accord.messages.SimpleReply.Ok;
@@ -791,7 +792,7 @@ private static void populate(CommandStoreTxnBlockedGraph.Builder state, AccordSa
791792
private static void populateAsync(CommandStoreTxnBlockedGraph.Builder state, CommandStore store, TxnId txnId)
792793
{
793794
state.asyncTxns.incrementAndGet();
794-
store.execute(txnId, in -> {
795+
store.execute(PreLoadContext.contextFor(txnId, "Populate txn_blocked_by"), in -> {
795796
populateSync(state, (AccordSafeCommandStore) in, txnId);
796797
if (0 == state.asyncTxns.decrementAndGet() && 0 == state.asyncKeys.get())
797798
state.complete();
@@ -841,7 +842,7 @@ private static void populate(CommandStoreTxnBlockedGraph.Builder state, AccordSa
841842
private static void populateAsync(CommandStoreTxnBlockedGraph.Builder state, CommandStore commandStore, TokenKey blockedBy, TxnId txnId, Timestamp executeAt)
842843
{
843844
state.asyncKeys.incrementAndGet();
844-
commandStore.execute(PreLoadContext.contextFor(txnId, RoutingKeys.of(blockedBy.toUnseekable()), KeyHistory.SYNC), in -> {
845+
commandStore.execute(PreLoadContext.contextFor(txnId, RoutingKeys.of(blockedBy.toUnseekable()), SYNC, READ_WRITE, "Populate txn_blocked_by"), in -> {
845846
populateSync(state, (AccordSafeCommandStore) in, blockedBy, txnId, executeAt);
846847
if (0 == state.asyncKeys.decrementAndGet() && 0 == state.asyncTxns.get())
847848
state.complete();

src/java/org/apache/cassandra/service/accord/AccordTask.java

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import accord.api.RoutingKey;
4343
import accord.local.Command;
4444
import accord.local.CommandStore;
45+
import accord.local.LoadKeys;
4546
import accord.local.PreLoadContext;
4647
import accord.local.SafeCommandStore;
4748
import accord.local.cfk.CommandsForKey;
@@ -52,6 +53,7 @@
5253
import accord.primitives.TxnId;
5354
import accord.primitives.Unseekables;
5455
import accord.utils.Invariants;
56+
import accord.utils.UnhandledEnum;
5557
import accord.utils.async.AsyncChain;
5658
import accord.utils.async.AsyncChains;
5759
import accord.utils.async.Cancellable;
@@ -69,6 +71,9 @@
6971
import org.apache.cassandra.utils.NoSpamLogger;
7072
import org.apache.cassandra.utils.concurrent.Condition;
7173

74+
import static accord.local.LoadKeysFor.READ_WRITE;
75+
import static accord.local.LoadKeysFor.RECOVERY;
76+
import static accord.local.LoadKeysFor.WRITE;
7277
import static accord.primitives.Routable.Domain.Key;
7378
import static accord.primitives.Txn.Kind.EphemeralRead;
7479
import static accord.utils.Invariants.illegalState;
@@ -342,14 +347,13 @@ public void presetup(AccordTask<?> parent)
342347

343348
if (parent.commandsForKey == null) return;
344349
if (preLoadContext.keys().domain() != Key) return;
345-
switch (preLoadContext.keyHistory())
350+
switch (preLoadContext.loadKeys())
346351
{
347-
default: throw new AssertionError("Unhandled KeyHistory: " + preLoadContext.keyHistory());
352+
default: throw new UnhandledEnum(preLoadContext.loadKeys());
348353
case NONE:
349354
break;
350355

351356
case ASYNC:
352-
case RECOVER:
353357
case INCR:
354358
case SYNC:
355359
for (RoutingKey key : (AbstractUnseekableKeys)preLoadContext.keys())
@@ -396,47 +400,38 @@ private void setupInternal(Caches caches)
396400

397401
private void setupKeyLoadsExclusive(Caches caches, Iterable<? extends RoutingKey> keys, boolean isToCompleteRangeScan)
398402
{
399-
switch (preLoadContext.keyHistory())
400-
{
401-
default: throw new AssertionError("Unhandled KeyHistory: " + preLoadContext.keyHistory());
402-
case NONE:
403-
break;
403+
if (preLoadContext.loadKeys() == LoadKeys.NONE)
404+
return;
404405

405-
case RECOVER:
406-
if (!isToCompleteRangeScan)
407-
{
408-
Invariants.require(rangeScanner == null);
409-
rangeScanner = new RangeTxnScanner();
410-
}
406+
if (!isToCompleteRangeScan && preLoadContext.loadKeysFor() == RECOVERY)
407+
{
408+
Invariants.require(rangeScanner == null);
409+
rangeScanner = new RangeTxnScanner();
410+
}
411411

412-
case ASYNC:
413-
case INCR:
414-
case SYNC:
415-
{
416-
boolean hasPreSetup = commandsForKey != null;
417-
for (RoutingKey key : keys)
418-
{
419-
if (hasPreSetup && completePresetupExclusive(key, commandsForKey, caches.commandsForKeys())) continue;
420-
setupExclusive(key, AccordTask::ensureCommandsForKey, caches.commandsForKeys());
421-
}
422-
break;
423-
}
412+
boolean hasPreSetup = commandsForKey != null;
413+
for (RoutingKey key : keys)
414+
{
415+
if (hasPreSetup && completePresetupExclusive(key, commandsForKey, caches.commandsForKeys())) continue;
416+
setupExclusive(key, AccordTask::ensureCommandsForKey, caches.commandsForKeys());
424417
}
425418
}
426419

427420
private void setupRangeLoadsExclusive(Caches caches)
428421
{
429-
switch (preLoadContext.keyHistory())
422+
if (preLoadContext.loadKeysFor() == WRITE)
423+
return;
424+
425+
switch (preLoadContext.loadKeys())
430426
{
431-
default: throw new AssertionError("Unhandled KeyHistory: " + preLoadContext.keyHistory());
427+
default: throw new UnhandledEnum(preLoadContext.loadKeys());
432428
case NONE:
433429
case ASYNC:
434430
break;
435431

436432
case INCR:
437433
throw new AssertionError("Incremental mode should only be used with an explicit list of keys");
438434

439-
case RECOVER:
440435
case SYNC:
441436
hasRanges = true;
442437
rangeScanner = new RangeTxnAndKeyScanner(caches.commandsForKeys());
@@ -1091,7 +1086,7 @@ public void start(BiFunction<Task, Runnable, Cancellable> executor)
10911086

10921087
void startInternal(Caches caches)
10931088
{
1094-
summaryLoader = commandStore.commandsForRanges().loader(preLoadContext.primaryTxnId(), preLoadContext.keyHistory(), keysOrRanges);
1089+
summaryLoader = commandStore.commandsForRanges().loader(preLoadContext.primaryTxnId(), preLoadContext.loadKeysFor(), keysOrRanges);
10951090
summaryLoader.forEachInCache(keysOrRanges, summary -> summaries.put(summary.txnId, summary), caches);
10961091
caches.commands().register(commandWatcher);
10971092
}
@@ -1149,6 +1144,6 @@ public long startTimeNanos()
11491144
@Override
11501145
public String description()
11511146
{
1152-
return toString();
1147+
return preLoadContext.describe();
11531148
}
11541149
}

src/java/org/apache/cassandra/service/accord/CommandsForRanges.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
import accord.local.Command;
4040
import accord.local.CommandSummaries;
4141
import accord.local.CommandSummaries.Summary;
42-
import accord.local.KeyHistory;
42+
import accord.local.LoadKeys;
43+
import accord.local.LoadKeysFor;
4344
import accord.local.MaxDecidedRX;
4445
import accord.local.RedundantBefore;
4546
import accord.primitives.AbstractRanges;
@@ -320,10 +321,10 @@ static Object[] toMap(TxnId txnId, RangeRoute route)
320321
}
321322
}
322323

323-
public CommandsForRanges.Loader loader(@Nullable TxnId primaryTxnId, KeyHistory keyHistory, Unseekables<?> keysOrRanges)
324+
public CommandsForRanges.Loader loader(@Nullable TxnId primaryTxnId, LoadKeysFor loadKeysFor, Unseekables<?> keysOrRanges)
324325
{
325326
RedundantBefore redundantBefore = commandStore.unsafeGetRedundantBefore();
326-
return Loader.loader(redundantBefore, primaryTxnId, keyHistory, keysOrRanges, this::newLoader);
327+
return Loader.loader(redundantBefore, primaryTxnId, loadKeysFor, keysOrRanges, this::newLoader);
327328
}
328329

329330
private Loader newLoader(@Nullable TxnId primaryTxnId, Unseekables<?> searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep)

test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ public void bootstrapTest() throws Throwable
278278
Assert.assertTrue(session.getNumKeyspaceTransfers() > 0);
279279
});
280280

281-
awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach(safeStore -> {
281+
awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
282282
AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
283283
Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.bootstrapBeganAt().keySet()));
284284
Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.safeToReadAt().keySet()));
@@ -321,7 +321,7 @@ public void bootstrapTest() throws Throwable
321321
Assert.assertEquals(key, row.getInt("c"));
322322
Assert.assertEquals(key, row.getInt("v"));
323323

324-
awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach(safeStore -> {
324+
awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> {
325325
if (safeStore.ranges().currentRanges().contains(partitionKey))
326326
{
327327
AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore;
@@ -464,7 +464,7 @@ public void moveTest() throws Throwable
464464

465465
PartitionKey partitionKey = new PartitionKey(tableId, dk);
466466

467-
awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach(PreLoadContext.contextFor(partitionKey.toUnseekable()),
467+
awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test",
468468
partitionKey.toUnseekable(), moveMax, moveMax,
469469
safeStore -> {
470470
if (!safeStore.ranges().allAt(preMove).contains(partitionKey))

test/distributed/org/apache/cassandra/distributed/test/accord/AccordDropTableBase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import accord.api.RoutingKey;
2424
import accord.local.CommandStores;
25-
import accord.local.KeyHistory;
25+
import accord.local.LoadKeys;
2626
import accord.local.PreLoadContext;
2727
import accord.local.cfk.CommandsForKey;
2828
import accord.primitives.Ranges;
@@ -43,6 +43,7 @@
4343
import org.apache.cassandra.service.accord.TokenRange;
4444
import org.assertj.core.api.Assertions;
4545

46+
import static accord.local.LoadKeysFor.READ_WRITE;
4647
import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
4748

4849
public class AccordDropTableBase extends TestBaseImpl
@@ -128,7 +129,7 @@ protected static void validateAccord(Cluster cluster, TableId id)
128129
inst.runOnInstance(() -> {
129130
TableId tableId = TableId.fromString(s);
130131
AccordService accord = (AccordService) AccordService.instance();
131-
PreLoadContext ctx = PreLoadContext.contextFor(Ranges.single(TokenRange.fullRange(tableId, getPartitioner())), KeyHistory.SYNC);
132+
PreLoadContext ctx = PreLoadContext.contextFor(Ranges.single(TokenRange.fullRange(tableId, getPartitioner())), LoadKeys.SYNC, READ_WRITE, "Test");
132133
CommandStores stores = accord.node().commandStores();
133134
for (int storeId : stores.ids())
134135
{

test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@
7272
import org.apache.cassandra.utils.concurrent.Future;
7373
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
7474

75-
import static accord.local.KeyHistory.SYNC;
75+
import static accord.local.LoadKeys.SYNC;
76+
import static accord.local.LoadKeysFor.READ_WRITE;
7677
import static java.lang.String.format;
7778

7879
public class AccordIncrementalRepairTest extends AccordTestBase
@@ -214,7 +215,7 @@ private static TxnId awaitLocalApplyOnKey(TokenKey key)
214215
{
215216
Node node = accordService().node();
216217
AtomicReference<TxnId> waitFor = new AtomicReference<>(null);
217-
AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(key, SYNC), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
218+
AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(key, SYNC, READ_WRITE, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
218219
AccordSafeCommandStore store = (AccordSafeCommandStore) safeStore;
219220
SafeCommandsForKey safeCfk = store.ifLoadedAndInitialised(key);
220221
if (safeCfk == null)
@@ -236,7 +237,7 @@ private static TxnId awaitLocalApplyOnKey(TokenKey key)
236237
long now = Clock.Global.currentTimeMillis();
237238
if (now - start > TimeUnit.MINUTES.toMillis(1))
238239
throw new AssertionError("Timeout");
239-
AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(txnId, key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
240+
AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
240241
SafeCommand command = safeStore.get(txnId, StoreParticipants.empty(txnId));
241242
Assert.assertNotNull(command.current());
242243
if (command.current().status().hasBeen(Status.Applied))

test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@
8787
import org.apache.cassandra.service.accord.api.TokenKey;
8888
import org.apache.cassandra.utils.FBUtilities;
8989

90-
import static accord.local.KeyHistory.SYNC;
90+
import static accord.local.LoadKeys.SYNC;
91+
import static accord.local.LoadKeysFor.READ_WRITE;
9192
import static accord.local.PreLoadContext.contextFor;
9293
import static accord.local.RedundantStatus.SomeStatus.GC_BEFORE_AND_LOCALLY_APPLIED;
9394
import static accord.primitives.Routable.Domain.Range;
@@ -313,28 +314,28 @@ private void testWithCommandStoreInternal(TestWithCommandStore test, boolean add
313314
PartialDeps partialDeps = Deps.NONE.intersecting(AccordTestUtils.fullRange(txn));
314315
PartialTxn partialTxn = txn.slice(commandStore.unsafeGetRangesForEpoch().currentRanges(), true);
315316
Route<?> partialRoute = route.slice(commandStore.unsafeGetRangesForEpoch().currentRanges());
316-
getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC), safe -> {
317+
getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
317318
CheckedCommands.preaccept(safe, txnId, partialTxn, route, (a, b) -> {});
318319
}).beginAsResult());
319320
flush(commandStore);
320-
getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC), safe -> {
321+
getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
321322
CheckedCommands.accept(safe, txnId, Ballot.ZERO, partialRoute, txnId, partialDeps, (a, b) -> {});
322323
}).beginAsResult());
323324
flush(commandStore);
324-
getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC), safe -> {
325+
getUninterruptibly(commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
325326
CheckedCommands.commit(safe, SaveStatus.Stable, Ballot.ZERO, txnId, route, partialTxn, txnId, partialDeps, (a, b) -> {});
326327
}).beginAsResult());
327328
flush(commandStore);
328-
getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC), safe -> {
329+
getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
329330
return AccordTestUtils.processTxnResultDirect(safe, txnId, partialTxn, txnId);
330-
}).flatMap(i -> i).flatMap(result -> commandStore.execute(contextFor(txnId, route, SYNC), safe -> {
331+
}).flatMap(i -> i).flatMap(result -> commandStore.execute(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
331332
CheckedCommands.apply(safe, txnId, route, txnId, partialDeps, partialTxn, result.left, result.right, (a, b) -> {});
332333
})));
333334
flush(commandStore);
334335
// The apply chain is asychronous, so it is easiest to just spin until it is applied
335336
// in order to have the updated state in the system table
336337
spinAssertEquals(true, 5, () -> {
337-
return getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC), safe -> {
338+
return getUninterruptibly(commandStore.submit(contextFor(txnId, route, SYNC, READ_WRITE, "Test"), safe -> {
338339
StoreParticipants participants = StoreParticipants.all(route);
339340
Command command = safe.get(txnId, participants).current();
340341
return command.hasBeen(Status.Applied);

test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.slf4j.LoggerFactory;
3333

3434
import accord.api.ProtocolModifiers;
35+
import accord.local.PreLoadContext;
3536
import accord.messages.TxnRequest;
3637
import accord.primitives.Ranges;
3738
import accord.primitives.Routable;
@@ -193,7 +194,7 @@ public void redundantBefore() throws ExecutionException, InterruptedException
193194
TxnId syncId2 = new TxnId(101, 300, Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range, accord.nodeId());
194195
Ranges ranges1 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(1)), new TokenKey(tableId, new LongToken(100))));
195196
Ranges ranges2 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(100)), new TokenKey(tableId, new LongToken(200))));
196-
AsyncChains.getBlocking(accord.node().commandStores().forEach(safeStore -> {
197+
AsyncChains.getBlocking(accord.node().commandStores().forEach((PreLoadContext.Empty)() -> "Test", safeStore -> {
197198
safeStore.commandStore().markShardDurable(safeStore, syncId1, ranges1, Status.Durability.Universal);
198199
safeStore.commandStore().markShardDurable(safeStore, syncId2, ranges2, Status.Durability.Majority);
199200
}));

0 commit comments

Comments
 (0)