Skip to content

Commit e6cf213

Browse files
committed
Add testing of consensus live migration to simulator
Patch by Ariel Weisberg; Reviewed by Benedict Elliott Smith for CASSANDRA-20587
1 parent 54a9c40 commit e6cf213

File tree

106 files changed

+2221
-713
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

106 files changed

+2221
-713
lines changed

.gitmodules

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[submodule "modules/accord"]
22
path = modules/accord
3-
url = https://github.com/apache/cassandra-accord.git
4-
branch = trunk
3+
url = https://github.com/aweisberg/cassandra-accord.git
4+
branch = 20587

ide/idea/workspace.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@
196196
-Dcassandra.test.messagingService.nonGracefulShutdown=true
197197
-Dcassandra.test.simulator.determinismcheck=strict
198198
-Dcassandra.test.sstableformatdevelopment=true
199+
-Dcassandra.test.accord.allow_test_modes=true
199200
-Dcassandra.tolerate_sstable_size=true
200201
-Dcassandra.use_nix_recursive_delete=true
201202
-Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables

src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.cassandra.db.Columns;
4242
import org.apache.cassandra.db.ConsistencyLevel;
4343
import org.apache.cassandra.db.DecoratedKey;
44+
import org.apache.cassandra.db.ReadCommand.PotentialTxnConflicts;
4445
import org.apache.cassandra.db.RegularAndStaticColumns;
4546
import org.apache.cassandra.db.SinglePartitionReadCommand;
4647
import org.apache.cassandra.db.Slice;
@@ -248,12 +249,13 @@ public SinglePartitionReadCommand readCommand(long nowInSec)
248249
// that exists (has some live data) but has not static content. So we query the first live row of the partition.
249250
if (conditions.isEmpty())
250251
return SinglePartitionReadCommand.create(metadata,
251-
nowInSec,
252-
columnFilter,
253-
RowFilter.none(),
254-
DataLimits.cqlLimits(1),
255-
key,
256-
new ClusteringIndexSliceFilter(Slices.ALL, false));
252+
nowInSec,
253+
columnFilter,
254+
RowFilter.none(),
255+
DataLimits.cqlLimits(1),
256+
key,
257+
new ClusteringIndexSliceFilter(Slices.ALL, false),
258+
PotentialTxnConflicts.ALLOW);
257259

258260
ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(conditions.navigableKeySet(), false);
259261
return SinglePartitionReadCommand.create(metadata, nowInSec, key, columnFilter, filter);

src/java/org/apache/cassandra/db/ReadCommand.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,12 @@ public void maybeValidateIndex()
488488
*/
489489
// iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
490490
public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
491+
{
492+
return executeLocally(executionController, null);
493+
}
494+
495+
// ClusterMetadata is null on startup when there are local reads from system tables before it's initialized
496+
public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, @Nullable ClusterMetadata cm)
491497
{
492498
long startTimeNanos = nanoTime();
493499

@@ -496,7 +502,7 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut
496502
{
497503
ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
498504
if (!potentialTxnConflicts.allowed)
499-
ConsensusRequestRouter.validateSafeToReadNonTransactionally(this);
505+
ConsensusRequestRouter.validateSafeToReadNonTransactionally(this, cm);
500506
Index.QueryPlan indexQueryPlan = indexQueryPlan();
501507

502508
Index.Searcher searcher = null;

src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ public static SinglePartitionReadCommand create(TableMetadata metadata,
221221
* @param limits the limits to use for the query.
222222
* @param partitionKey the partition key for the partition to query.
223223
* @param clusteringIndexFilter the clustering index filter to use for the query.
224+
* @param potentialTxnConflicts Whether to generate an error if this read could potentially conflict with a txn
224225
*
225226
* @return a newly created read command.
226227
*/

src/java/org/apache/cassandra/db/SystemKeyspace.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@
102102
import org.apache.cassandra.schema.UserFunctions;
103103
import org.apache.cassandra.schema.Views;
104104
import org.apache.cassandra.service.StorageService;
105+
import org.apache.cassandra.service.consensus.migration.ConsensusMigratedAt;
106+
import org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget;
105107
import org.apache.cassandra.service.paxos.Ballot;
106108
import org.apache.cassandra.service.paxos.Commit;
107109
import org.apache.cassandra.service.paxos.Commit.Accepted;
@@ -148,8 +150,6 @@
148150
import static org.apache.cassandra.gms.ApplicationState.RELEASE_VERSION;
149151
import static org.apache.cassandra.gms.ApplicationState.STATUS_WITH_PORT;
150152
import static org.apache.cassandra.gms.ApplicationState.TOKENS;
151-
import org.apache.cassandra.service.consensus.migration.ConsensusMigratedAt;
152-
import org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget;
153153
import static org.apache.cassandra.service.paxos.Commit.latest;
154154
import static org.apache.cassandra.service.snapshot.SnapshotOptions.systemSnapshot;
155155
import static org.apache.cassandra.utils.CassandraVersion.NULL_VERSION;
@@ -270,6 +270,7 @@ private SystemKeyspace()
270270
+ "row_key blob, "
271271
+ "cf_id UUID, "
272272
+ "consensus_migrated_at_epoch bigint, "
273+
+ "consensus_max_hlc bigint, "
273274
+ "consensus_target tinyint, "
274275
+ "PRIMARY KEY ((row_key), cf_id, consensus_migrated_at_epoch)) "
275276
+ "WITH CLUSTERING ORDER BY (cf_id ASC, consensus_migrated_at_epoch DESC)")
@@ -1628,13 +1629,13 @@ public static PaxosRepairHistory loadPaxosRepairHistory(String keyspace, String
16281629

16291630
public static void saveConsensusKeyMigrationState(ByteBuffer partitionKey, UUID cfId, ConsensusMigratedAt consensusMigratedAt)
16301631
{
1631-
String cql = "UPDATE system." + CONSENSUS_MIGRATION_STATE + " SET consensus_target = ? WHERE row_key = ? AND cf_id = ? AND consensus_migrated_at_epoch = ?";
1632-
executeInternal(cql, consensusMigratedAt.migratedAtTarget.value, partitionKey, cfId, consensusMigratedAt.migratedAtEpoch.getEpoch());
1632+
String cql = "UPDATE system." + CONSENSUS_MIGRATION_STATE + " SET consensus_target = ?, consensus_max_hlc = ? WHERE row_key = ? AND cf_id = ? AND consensus_migrated_at_epoch = ?";
1633+
executeInternal(cql, consensusMigratedAt.migratedAtTarget.value, consensusMigratedAt.maxHLC, partitionKey, cfId, consensusMigratedAt.migratedAtEpoch.getEpoch());
16331634
}
16341635

16351636
public static ConsensusMigratedAt loadConsensusKeyMigrationState(ByteBuffer partitionKey, UUID cfId)
16361637
{
1637-
String cql = "SELECT consensus_migrated_at_epoch, consensus_target FROM system." + CONSENSUS_MIGRATION_STATE + " WHERE row_key = ? AND cf_id = ? LIMIT 1";
1638+
String cql = "SELECT consensus_migrated_at_epoch, consensus_target, consensus_max_hlc FROM system." + CONSENSUS_MIGRATION_STATE + " WHERE row_key = ? AND cf_id = ? LIMIT 1";
16381639
UntypedResultSet results = executeInternal(cql, partitionKey, cfId);
16391640

16401641
if (results.isEmpty())
@@ -1644,7 +1645,8 @@ public static ConsensusMigratedAt loadConsensusKeyMigrationState(ByteBuffer part
16441645
// TODO Period won't be necessary eventually
16451646
Epoch migratedAtEpoch = Epoch.create(row.getLong("consensus_migrated_at_epoch"));
16461647
ConsensusMigrationTarget target = ConsensusMigrationTarget.fromValue(row.getByte("consensus_target"));
1647-
return new ConsensusMigratedAt(migratedAtEpoch, target);
1648+
long maxHLC = row.getLong("consensus_max_hlc");
1649+
return new ConsensusMigratedAt(migratedAtEpoch, maxHLC, target);
16481650
}
16491651

16501652
/**

src/java/org/apache/cassandra/exceptions/ReadFailureException.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,16 @@ protected ReadFailureException(String msg, ConsistencyLevel consistency, int rec
3939
super(ExceptionCode.READ_FAILURE, msg, consistency, received, blockFor, failureReasonByEndpoint);
4040
this.dataPresent = dataPresent;
4141
}
42+
43+
public ReadFailureException(String msg, ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint, Throwable cause)
44+
{
45+
super(ExceptionCode.READ_FAILURE, msg, consistency, received, blockFor, failureReasonByEndpoint, cause);
46+
this.dataPresent = dataPresent;
47+
}
48+
49+
public ReadFailureException(ReadFailureException rfe)
50+
{
51+
super(ExceptionCode.READ_FAILURE, rfe.getMessage(), rfe.consistency, rfe.received, rfe.blockFor, rfe.failureReasonByEndpoint, rfe);
52+
this.dataPresent = rfe.dataPresent;
53+
}
4254
}

src/java/org/apache/cassandra/exceptions/ReadTimeoutException.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,16 @@ public ReadTimeoutException(ConsistencyLevel consistency, int received, int bloc
3434
super(ExceptionCode.READ_TIMEOUT, consistency, received, blockFor, msg);
3535
this.dataPresent = dataPresent;
3636
}
37+
38+
public ReadTimeoutException(ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Throwable cause)
39+
{
40+
super(ExceptionCode.READ_TIMEOUT, consistency, received, blockFor, cause);
41+
this.dataPresent = dataPresent;
42+
}
43+
44+
public ReadTimeoutException(ReadFailureException rfe)
45+
{
46+
super(ExceptionCode.READ_TIMEOUT, rfe.consistency, rfe.received, rfe.blockFor, rfe);
47+
this.dataPresent = rfe.dataPresent;
48+
}
3749
}

src/java/org/apache/cassandra/exceptions/RequestFailureException.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ public RequestFailureException(ExceptionCode code, String msg, ConsistencyLevel
4444
this.failureReasonByEndpoint = failureReasonByEndpoint;
4545
}
4646

47+
public RequestFailureException(ExceptionCode code, String msg, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint, Throwable cause)
48+
{
49+
super(code, buildErrorMessage(msg, failureReasonByEndpoint), cause);
50+
this.consistency = consistency;
51+
this.received = received;
52+
this.blockFor = blockFor;
53+
this.failureReasonByEndpoint = failureReasonByEndpoint;
54+
}
55+
4756
private static String buildErrorMessage(int received, Map<InetAddressAndPort, RequestFailureReason> failures)
4857
{
4958
return String.format("received %d responses and %d failures", received, failures.size());

0 commit comments

Comments
 (0)