Skip to content

Commit 0b20128

Browse files
committed
Split AsyncChain and AsyncResult; normalise AsyncResult with C* Future
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20906
1 parent c7de33c commit 0b20128

39 files changed

+752
-336
lines changed

.gitmodules

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[submodule "modules/accord"]
22
path = modules/accord
3-
url = https://github.com/belliottsmith/cassandra-accord.git
4-
branch = 20905-ssverifier
3+
url = https://github.com/apache/cassandra-accord.git
4+
branch = trunk

modules/accord

Submodule accord updated 80 files

src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ public void finished()
229229
long deadlineNanos = startedAtNanos + timeoutNanos;
230230
// TODO (expected): use the source bounds for the streams to avoid waiting unnecessarily long
231231
AccordService.getBlocking(accordService.maxConflict(accordRanges)
232-
.flatMap(min -> accordService.sync("[Stream #" + session.planId() + ']', min, accordRanges, null, Self, NoRemote, timeoutNanos, NANOSECONDS))
232+
.flatMap(min -> accordService.sync("[Stream #" + session.planId() + ']', min, accordRanges, null, Self, NoRemote, timeoutNanos, NANOSECONDS).chain())
233233
, accordRanges, new LatencyRequestBookkeeping(cfs.metric.accordPostStreamRepair), startedAtNanos, deadlineNanos);
234234
}
235235

src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@
146146
import static accord.local.RedundantStatus.Property.QUORUM_APPLIED;
147147
import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
148148
import static accord.local.RedundantStatus.Property.SHARD_APPLIED;
149-
import static accord.utils.async.AsyncChains.getBlockingAndRethrow;
150149
import static com.google.common.collect.ImmutableList.toImmutableList;
151150
import static java.lang.String.format;
152151
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -368,14 +367,14 @@ public Partition getPartition(DecoratedKey partitionKey)
368367
List<Entry> cfks = new CopyOnWriteArrayList<>();
369368
PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key table query");
370369
CommandStores commandStores = AccordService.instance().node().commandStores();
371-
getBlockingAndRethrow(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
370+
AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
372371
SafeCommandsForKey safeCfk = safeStore.get(key);
373372
CommandsForKey cfk = safeCfk.current();
374373
if (cfk == null)
375374
return;
376375

377376
cfks.add(new Entry(safeStore.commandStore().id(), cfk));
378-
}).beginAsResult());
377+
}));
379378

380379
if (cfks.isEmpty())
381380
return null;
@@ -478,7 +477,7 @@ public Partition getPartition(DecoratedKey partitionKey)
478477
List<Entry> cfks = new CopyOnWriteArrayList<>();
479478
PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key_unmanaged table query");
480479
CommandStores commandStores = AccordService.instance().node().commandStores();
481-
getBlockingAndRethrow(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
480+
AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
482481
SafeCommandsForKey safeCfk = safeStore.get(key);
483482
CommandsForKey cfk = safeCfk.current();
484483
if (cfk == null)
@@ -1469,7 +1468,7 @@ private void runWithRoute(TxnId txnId, int commandStoreId, Function<Command, BiC
14691468
{
14701469
consumer.accept(command.route(), result);
14711470
}
1472-
return result;
1471+
return result.chain();
14731472
});
14741473
}
14751474

@@ -1504,12 +1503,10 @@ private void recover(TxnId txnId, @Nullable Route<?> route, AsyncResult.Settable
15041503
private void run(TxnId txnId, int commandStoreId, Function<SafeCommandStore, AsyncChain<Void>> apply)
15051504
{
15061505
AccordService accord = (AccordService) AccordService.instance();
1507-
AsyncChains.awaitUninterruptibly(accord.node()
1508-
.commandStores()
1509-
.forId(commandStoreId)
1510-
.submit(PreLoadContext.contextFor(txnId, TXN_OPS), apply)
1511-
.flatMap(i -> i)
1512-
.beginAsResult());
1506+
AccordService.getBlocking(accord.node()
1507+
.commandStores()
1508+
.forId(commandStoreId)
1509+
.chain(PreLoadContext.contextFor(txnId, TXN_OPS), apply));
15131510
}
15141511

15151512
private void cleanup(TxnId txnId, int commandStoreId, Cleanup cleanup)

src/java/org/apache/cassandra/service/accord/AccordCommandStore.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.NavigableMap;
2323
import java.util.Objects;
2424
import java.util.concurrent.Callable;
25-
import java.util.concurrent.Executor;
2625
import java.util.concurrent.TimeUnit;
2726
import java.util.concurrent.atomic.AtomicLong;
2827
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -38,6 +37,7 @@
3837
import org.slf4j.LoggerFactory;
3938

4039
import accord.api.Agent;
40+
import accord.api.AsyncExecutor;
4141
import accord.api.DataStore;
4242
import accord.api.Journal;
4343
import accord.api.LocalListeners;
@@ -247,7 +247,7 @@ public AccordExecutor executor()
247247
// TODO (desired): we use this for executing callbacks with mutual exclusivity,
248248
// but we don't need to block the actual CommandStore - could quite easily
249249
// inflate a separate queue dynamically in AccordExecutor
250-
public Executor taskExecutor()
250+
public AsyncExecutor taskExecutor()
251251
{
252252
return exclusiveExecutor;
253253
}
@@ -336,25 +336,30 @@ public long nextSystemTimestampMicros()
336336
return lastSystemTimestampMicros;
337337
}
338338
@Override
339-
public <T> AsyncChain<T> build(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
339+
public <T> AsyncChain<T> chain(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
340340
{
341341
return AccordTask.create(this, loadCtx, function).chain();
342342
}
343343

344344
@Override
345-
public <T> AsyncChain<T> build(Callable<T> task)
345+
public AsyncChain<Void> chain(PreLoadContext preLoadContext, Consumer<? super SafeCommandStore> consumer)
346346
{
347-
return AsyncChains.ofCallable(taskExecutor(), task);
347+
return AccordTask.create(this, preLoadContext, consumer).chain();
348348
}
349349

350350
@Override
351-
public AsyncChain<Void> build(PreLoadContext preLoadContext, Consumer<? super SafeCommandStore> consumer)
351+
public <T> AsyncChain<T> chain(Callable<T> call)
352352
{
353-
return AccordTask.create(this, preLoadContext, consumer).chain();
353+
return taskExecutor().chain(call);
354+
}
355+
356+
@Override
357+
public void execute(Runnable run)
358+
{
359+
taskExecutor().execute(run);
354360
}
355361

356-
public AccordSafeCommandStore begin(AccordTask<?> operation,
357-
@Nullable CommandsForRanges commandsForRanges)
362+
public AccordSafeCommandStore begin(AccordTask<?> operation, @Nullable CommandsForRanges commandsForRanges)
358363
{
359364
require(current == null);
360365
current = AccordSafeCommandStore.create(operation, commandsForRanges, this);
@@ -495,7 +500,7 @@ class Ready extends CountingResult implements Runnable
495500
}
496501
}
497502

498-
ready.begin((success, fail) -> {
503+
ready.invoke((success, fail) -> {
499504
if (fail != null)
500505
{
501506
logger.error("{}: failed to ensure durability of {} ({})", this, ranges, reportId, fail);
@@ -541,7 +546,7 @@ public AsyncChain<Route> replay(TxnId txnId)
541546
if (onlyNonDurable && !maybeShouldReplay(txnId))
542547
return AsyncChains.success(null);
543548

544-
return store.submit(PreLoadContext.contextFor(txnId, "Replay"), safeStore -> {
549+
return store.chain(PreLoadContext.contextFor(txnId, "Replay"), safeStore -> {
545550
if (onlyNonDurable && !shouldReplay(txnId, safeStore.unsafeGet(txnId).current().participants()))
546551
return null;
547552

src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ public long maxEpoch()
652652
public Future<Void> unsafeLocalSyncNotified(long epoch)
653653
{
654654
AsyncPromise<Void> promise = new AsyncPromise<>();
655-
getOrCreateEpochState(epoch).localSyncNotified().begin((result, failure) -> {
655+
getOrCreateEpochState(epoch).localSyncNotified().invoke((result, failure) -> {
656656
if (failure != null) promise.tryFailure(failure);
657657
else promise.trySuccess(result);
658658
});

0 commit comments

Comments
 (0)