Skip to content

Commit 8bd9d69

Browse files
committed
Facilitate queueing, execution and performance improvements
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20069
1 parent 4ec8d26 commit 8bd9d69

File tree

96 files changed

+1455
-818
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

96 files changed

+1455
-818
lines changed

accord-core/src/main/java/accord/api/Agent.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import accord.primitives.TxnId;
3333
import accord.topology.Topologies;
3434

35+
import static accord.utils.Invariants.illegalState;
36+
3537
/**
3638
* Facility for augmenting node behaviour at specific points
3739
*
@@ -122,8 +124,6 @@ default EventsListener metricsEventsListener()
122124
*/
123125
default Topologies selectPreferred(Node.Id from, Topologies to) { return to; }
124126

125-
long replyTimeout(ReplyContext replyContext, TimeUnit units);
126-
127127
/**
128128
* This method permits implementations to configure the time at which a local home shard will attempt
129129
* to coordinate a transaction to completion.
@@ -148,4 +148,8 @@ default EventsListener metricsEventsListener()
148148
* and re-query the local state.
149149
*/
150150
long retryAwaitTimeout(Node node, SafeCommandStore safeStore, TxnId txnId, int retryCount, BlockedUntil retrying, TimeUnit units);
151+
152+
long expiresAt(ReplyContext replyContext, TimeUnit unit);
153+
154+
default void onViolation(String message) { throw illegalState(message); }
151155
}

accord-core/src/main/java/accord/api/ConfigurationService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ class EpochReady
7474
public final AsyncResult<Void> metadata;
7575

7676
/**
77-
* The node has retrieved enough remote information to become a coordinator for the new epoch.
77+
* The node has retrieved enough remote information to become a fast path coordinator for the new epoch.
7878
*/
79-
public final AsyncResult<Void> coordination;
79+
public final AsyncResult<Void> fastPath;
8080

8181
/**
8282
* The node has successfully replicated the underlying DataStore information for the new epoch, but may need
@@ -90,11 +90,11 @@ class EpochReady
9090
*/
9191
public final AsyncResult<Void> reads;
9292

93-
public EpochReady(long epoch, AsyncResult<Void> metadata, AsyncResult<Void> coordination, AsyncResult<Void> data, AsyncResult<Void> reads)
93+
public EpochReady(long epoch, AsyncResult<Void> metadata, AsyncResult<Void> fastPath, AsyncResult<Void> data, AsyncResult<Void> reads)
9494
{
9595
this.epoch = epoch;
9696
this.metadata = metadata;
97-
this.coordination = coordination;
97+
this.fastPath = fastPath;
9898
this.data = data;
9999
this.reads = reads;
100100
}

accord-core/src/main/java/accord/api/RequestTimeouts.java renamed to accord-core/src/main/java/accord/api/Timeouts.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,22 @@
2020

2121
import java.util.concurrent.TimeUnit;
2222

23-
public interface RequestTimeouts
24-
{
25-
interface RegisteredTimeout
26-
{
27-
void cancel();
28-
}
23+
import accord.utils.async.Cancellable;
2924

25+
public interface Timeouts
26+
{
3027
interface Timeout
3128
{
3229
void timeout();
3330
int stripe();
3431
}
3532

36-
RegisteredTimeout register(Timeout timeout, long delay, TimeUnit units);
33+
interface RegisteredTimeout extends Cancellable
34+
{
35+
void cancel();
36+
}
37+
38+
RegisteredTimeout registerWithDelay(Timeout timeout, long delay, TimeUnit units);
39+
RegisteredTimeout registerAt(Timeout timeout, long deadline, TimeUnit units);
3740
void maybeNotify();
3841
}

accord-core/src/main/java/accord/coordinate/AbstractCoordinatePreAccept.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ final void onPreAcceptedOrNewEpoch()
208208
{
209209
long latestEpoch = executeAtEpoch();
210210
if (latestEpoch > topologies.currentEpoch()) node.withEpoch(latestEpoch, node.agent(), () -> onPreAcceptedInNewEpoch(topologies, latestEpoch));
211-
else onPreAcceptedInNewEpoch(topologies, latestEpoch);
211+
else onPreAccepted(topologies);
212212
return;
213213
}
214214

accord-core/src/main/java/accord/coordinate/Barrier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ private ExistingTransactionCheck checkForExistingTransaction()
233233
ExistingTransactionCheck check = new ExistingTransactionCheck();
234234
RoutingKey k = route.get(0).asRoutingKey();
235235
node.commandStores().mapReduceConsume(
236-
contextFor(k, KeyHistory.COMMANDS),
236+
contextFor(k, KeyHistory.SYNC),
237237
k.toUnseekable(),
238238
minEpoch,
239239
Long.MAX_VALUE,

accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
*/
5353
abstract class CoordinatePreAccept<T> extends AbstractCoordinatePreAccept<T, PreAcceptReply>
5454
{
55-
// TODO (required): do not use FastPathTracker for coordinating exclusive sync points - these always take the slow path
5655
final PreAcceptTracker<?> tracker;
5756
// TODO (expected): this can be cleared after preaccept
5857
// TODO (expected): back by SortedListMap; must handle additional preaccepts (but this is no longer ordinarily enabled)

accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ protected Action process(Id from, ReadReply reply)
9898
if (ok.futureEpoch > allTopologies.currentEpoch())
9999
{
100100
// TODO (expected): only submit new requests for the keys that execute in a later epoch
101-
new ExecuteEphemeralRead(node, node.topology().preciseEpochs(route, ok.futureEpoch, ok.futureEpoch), route, txnId, txn, ok.futureEpoch, deps, callback).start();
101+
new ExecuteEphemeralRead(node, node.topology().preciseEpochs(route, ok.futureEpoch, ok.futureEpoch), route, txnId.withEpoch(ok.futureEpoch), txn, ok.futureEpoch, deps, callback).start();
102102
return Aborted;
103103
}
104104

accord-core/src/main/java/accord/coordinate/ExecuteTxn.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
5454
{
5555
@SuppressWarnings("unused")
5656
private static final Logger logger = LoggerFactory.getLogger(ExecuteTxn.class);
57+
private static boolean SEND_MINIMUM_STABLE_MESSAGES = true;
58+
public static void setSendMinimumStableMessages(boolean sendMin) {SEND_MINIMUM_STABLE_MESSAGES = sendMin; }
5759

5860
final ExecutePath path;
5961
final Txn txn;
@@ -85,22 +87,25 @@ protected void start(Iterable<Id> to)
8587
{
8688
IntHashSet readSet = new IntHashSet();
8789
to.forEach(i -> readSet.add(i.id));
88-
Commit.Kind kind;
90+
Commit.stableAndRead(node, allTopologies, commitKind(), txnId, txn, route, readScope, executeAt, stableDeps, readSet, this, SEND_MINIMUM_STABLE_MESSAGES);
91+
}
92+
93+
private Commit.Kind commitKind()
94+
{
8995
switch (path)
9096
{
9197
default: throw new AssertionError("Unhandled path: " + path);
92-
case FAST: kind = StableFastPath; break;
93-
case SLOW: kind = StableSlowPath; break;
94-
case RECOVER: kind = StableWithTxnAndDeps; break;
98+
case FAST: return StableFastPath;
99+
case SLOW: return StableSlowPath;
100+
case RECOVER: return StableWithTxnAndDeps;
95101
}
96-
// we want to send to all topologies, but we only want to track responses from the readScope
97-
Commit.stableAndRead(node, allTopologies, kind, txnId, txn, route, readScope, executeAt, stableDeps, readSet, this);
98102
}
99103

100104
@Override
101105
public void contact(Id to)
102106
{
103-
node.send(to, new ReadTxnData(to, topologies(), txnId, readScope, executeAt.epoch()), this);
107+
if (SEND_MINIMUM_STABLE_MESSAGES) Commit.stableAndRead(to, node, allTopologies, commitKind(), txnId, txn, route, readScope, executeAt, stableDeps, this, SEND_MINIMUM_STABLE_MESSAGES);
108+
else node.send(to, new ReadTxnData(to, topologies(), txnId, readScope, executeAt.epoch()), this);
104109
}
105110

106111
@Override

accord-core/src/main/java/accord/coordinate/MaybeRecover.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,11 @@ public class MaybeRecover extends CheckShards<Route<?>>
4747
this.callback = callback;
4848
}
4949

50-
public static void maybeRecover(Node node, TxnId txnId, Route<?> someRoute, ProgressToken prevProgress, BiConsumer<Outcome, Throwable> callback)
50+
public static Object maybeRecover(Node node, TxnId txnId, Route<?> someRoute, ProgressToken prevProgress, BiConsumer<Outcome, Throwable> callback)
5151
{
5252
MaybeRecover maybeRecover = new MaybeRecover(node, txnId, someRoute, prevProgress, callback);
5353
maybeRecover.start();
54+
return maybeRecover;
5455
}
5556

5657
@Override

accord-core/src/main/java/accord/coordinate/Recover.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ else if (failure instanceof Redundant)
140140
node.agent().metricsEventsListener().onPreempted(txnId);
141141
else if (failure instanceof Timeout)
142142
node.agent().metricsEventsListener().onTimeout(txnId);
143-
else if (failure instanceof Invalidated)
143+
else if (failure instanceof Invalidated) // TODO (expected): should we tick this counter? we haven't invalidated anything
144144
node.agent().metricsEventsListener().onInvalidated(txnId);
145145
}
146146

0 commit comments

Comments
 (0)