Skip to content

Commit 1200040

Browse files
committed
Follow-up to CASSANDRA-20906: Fix Simulator
1 parent 0b20128 commit 1200040

File tree

10 files changed

+24
-44
lines changed

10 files changed

+24
-44
lines changed

src/java/org/apache/cassandra/service/accord/AccordExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@
4040
import org.slf4j.LoggerFactory;
4141

4242
import accord.api.Agent;
43-
import accord.api.AsyncExecutor;
4443
import accord.api.RoutingKey;
44+
import accord.impl.AbstractAsyncExecutor;
4545
import accord.local.Command;
4646
import accord.local.PreLoadContext;
4747
import accord.local.SequentialAsyncExecutor;
@@ -95,7 +95,7 @@
9595
* NOTE: We assume that NO BLOCKING TASKS are submitted to this executor AND WAITED ON by another task executing on this executor.
9696
* (as we do not immediately schedule additional threads for submitted tasks, but schedule new threads only if necessary when the submitting execution completes)
9797
*/
98-
public abstract class AccordExecutor implements CacheSize, LoadExecutor<AccordTask<?>, Boolean>, SaveExecutor, Shutdownable, AsyncExecutor
98+
public abstract class AccordExecutor implements CacheSize, LoadExecutor<AccordTask<?>, Boolean>, SaveExecutor, Shutdownable, AbstractAsyncExecutor
9999
{
100100
private static final Logger logger = LoggerFactory.getLogger(AccordExecutor.class);
101101
public interface AccordExecutorFactory

src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,15 @@
1818

1919
package org.apache.cassandra.service.accord;
2020

21-
import java.util.concurrent.Callable;
22-
import java.util.function.BiConsumer;
23-
2421
import javax.annotation.Nonnull;
2522

26-
import accord.api.AsyncExecutor;
27-
import accord.utils.async.AsyncChain;
28-
import accord.utils.async.AsyncChains;
23+
import accord.impl.AbstractAsyncExecutor;
2924
import org.apache.cassandra.service.accord.api.AccordAgent;
3025

31-
public class ImmediateAsyncExecutor implements AsyncExecutor, BiConsumer<Object, Throwable>
26+
public class ImmediateAsyncExecutor implements AbstractAsyncExecutor
3227
{
3328
public static final ImmediateAsyncExecutor INSTANCE = new ImmediateAsyncExecutor();
3429

35-
@Override
36-
public <T> AsyncChain<T> chain(Callable<T> call)
37-
{
38-
try
39-
{
40-
return AsyncChains.success(call.call());
41-
}
42-
catch (Throwable t)
43-
{
44-
AccordAgent.handleUncaughtException(t);
45-
return AsyncChains.failure(t);
46-
}
47-
}
48-
4930
@Override
5031
public void execute(@Nonnull Runnable command)
5132
{
@@ -58,11 +39,4 @@ public void execute(@Nonnull Runnable command)
5839
AccordAgent.handleUncaughtException(t);
5940
}
6041
}
61-
62-
@Override
63-
public void accept(Object o, Throwable throwable)
64-
{
65-
if (throwable != null)
66-
AccordAgent.handleUncaughtException(throwable);
67-
}
6842
}

src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ private List<AsyncChain<Data>> keyReadChains(Txn.InMemory txn, Dispatcher.Reques
258258
}
259259

260260
Group group = Group.one(command);
261-
results.add(AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
261+
results.add(AsyncChains.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
262262
TxnData result = new TxnData();
263263
// Enforcing limits is redundant since we only have a group of size 1, but checking anyways
264264
// documents the requirement here
@@ -294,7 +294,7 @@ private List<AsyncChain<Data>> rangeReadChains(Txn.InMemory txn, Dispatcher.Requ
294294

295295
// TODO (required): To make migration work we need to validate that the range is all on Accord
296296

297-
results.add(AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
297+
results.add(AsyncChains.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
298298
TxnData result = new TxnData();
299299
try (PartitionIterator iterator = StorageProxy.getRangeSlice(command, consistencyLevel, this, requestTime))
300300
{
@@ -396,7 +396,7 @@ public void start()
396396

397397
private AsyncChain<Data> executeUnrecoverableRepairUpdate()
398398
{
399-
return AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
399+
return AsyncChains.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
400400
UnrecoverableRepairUpdate repairUpdate = (UnrecoverableRepairUpdate)txn.update();
401401
// TODO (expected): We should send the read in the same message as the commit. This requires refactor ReadData.Kind so that it doesn't specify the ordinal encoding
402402
// and can be extended similar to MessageType which allows additional types not from Accord to be added

src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp execu
283283
return AsyncChains.success(new LocalReadData(new ArrayList<>(), readCommand));
284284

285285
ReadCommand submit = readCommand.withTransactionalSettings(TxnNamedRead.readsWithoutReconciliation(txnRead.cassandraConsistencyLevel()), nowInSeconds);
286-
return AsyncExecutor.chain(Stage.READ.executor(), () -> new LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false), command));
286+
return AsyncChains.chain(Stage.READ.executor(), () -> new LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false), command));
287287
}
288288

289289
// This path can have a subrange we have never seen before provided by short read protection or read repair so we need to
@@ -298,7 +298,7 @@ protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp execu
298298
continue;
299299
ReadCommand submit = TxnNamedRead.commandForSubrange((PartitionRangeReadCommand) command, intersection, txnRead.cassandraConsistencyLevel(), nowInSeconds);
300300
TokenKey routingKey = ((TokenRange)r).start();
301-
chains.add(AsyncExecutor.chain(Stage.READ.executor(), () -> new LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit, false), command)));
301+
chains.add(AsyncChains.chain(Stage.READ.executor(), () -> new LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit, false), command)));
302302
}
303303

304304
if (chains.isEmpty())

src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import accord.primitives.TxnId;
3636
import accord.topology.Topologies;
3737
import accord.utils.async.AsyncChain;
38+
import accord.utils.async.AsyncChains;
3839
import org.apache.cassandra.db.Mutation;
3940
import org.apache.cassandra.db.ReadRepairVerbHandler;
4041
import org.apache.cassandra.db.TypeSizes;
@@ -145,10 +146,10 @@ public ReadType kind()
145146
protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Participants<?> execute)
146147
{
147148
// TODO (required): subtract unavailable ranges, either from read or from response (or on coordinator)
148-
return AsyncExecutor.chain(Verb.READ_REPAIR_REQ.stage.executor(), () -> {
149-
ReadRepairVerbHandler.instance.applyMutation(mutation);
150-
return Data.NOOP_DATA;
151-
});
149+
return AsyncChains.chain(Verb.READ_REPAIR_REQ.stage.executor(), () -> {
150+
ReadRepairVerbHandler.instance.applyMutation(mutation);
151+
return Data.NOOP_DATA;
152+
});
152153
}
153154

154155
@Override

src/java/org/apache/cassandra/utils/concurrent/Future.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* A Future that integrates several different (but equivalent) APIs used within Cassandra into a single concept,
4343
* integrating also with our {@link Awaitable} abstraction, to overall improve coherency and clarity in the codebase.
4444
*/
45-
@Shared(scope = SIMULATION, ancestors = INTERFACES)
45+
@Shared(scope = SIMULATION, ancestors = INTERFACES, members = INTERFACES)
4646
public interface Future<V> extends io.netty.util.concurrent.Future<V>, ListenableFuture<V>, Awaitable, AsyncResult<V>
4747
{
4848
/**

test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,6 +1344,11 @@ private static void forEach(Consumer<Class<?>> forEach, SharedParams shared, Cla
13441344
return;
13451345

13461346
forEach.accept(cur);
1347+
{
1348+
Shared overrideShared = cur.getAnnotation(Shared.class);
1349+
if (overrideShared != null)
1350+
shared = new SharedParams(overrideShared);
1351+
}
13471352

13481353
switch (shared.ancestors)
13491354
{

test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
4444

4545
@SuppressWarnings("unused")
46-
@Shared(scope = SIMULATION, inner = INTERFACES)
46+
@Shared(scope = SIMULATION, ancestors = INTERFACES)
4747
public interface InterceptorOfGlobalMethods extends InterceptorOfSystemMethods, Closeable
4848
{
4949
Semaphore newSemaphore(int count);

test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.junit.Before;
3333
import org.junit.BeforeClass;
3434

35-
import accord.api.AsyncExecutor;
3635
import accord.api.RoutingKey;
3736
import accord.impl.SizeOfIntersectionSorter;
3837
import accord.local.Node;
@@ -57,6 +56,7 @@
5756
import accord.utils.Gen;
5857
import accord.utils.Gens;
5958
import accord.utils.Invariants;
59+
import accord.utils.async.AsyncChains;
6060
import accord.utils.async.AsyncResult;
6161
import org.apache.cassandra.ServerTestUtils;
6262
import org.apache.cassandra.config.CassandraRelevantProperties;
@@ -286,7 +286,7 @@ protected static Pair<TxnId, AsyncResult<?>> assertBeginRecoveryAfterPreAcceptAs
286286
assertDeps(success.txnId, success.deps, cloneKeyConflicts, cloneRangeConflicts);
287287
return success;
288288
});
289-
var delay = preAcceptAsync.flatMap(ignore -> AsyncExecutor.chain(instance.unorderedScheduled, () -> {
289+
var delay = preAcceptAsync.flatMap(ignore -> AsyncChains.chain(instance.unorderedScheduled, () -> {
290290
Ballot ballot = Ballot.fromValues(instance.storeService.epoch(), instance.storeService.now(), nodeId);
291291
return new BeginRecovery(nodeId, new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, instance.topology), txnId, null, false, txn, route, ballot);
292292
}).beginAsResult());

0 commit comments

Comments
 (0)