diff --git a/.gitmodules b/.gitmodules index 616dacf610a7..510c7c19e633 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = https://github.com/belliottsmith/cassandra-accord.git + branch = rebootstrap-sq diff --git a/modules/accord b/modules/accord index 520818a004a8..5b35a301d160 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 520818a004a89217cf86efa6c8fa2968401968ec +Subproject commit 5b35a301d160ede05e69ac1907cd10a4df1799c1 diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java index 87029aa94fd2..87c69688eb72 100644 --- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java @@ -47,6 +47,7 @@ import accord.coordinate.Coordinations; import accord.coordinate.PrepareRecovery; import accord.coordinate.tracking.AbstractTracker; +import accord.primitives.RoutingKeys; import accord.utils.SortedListMap; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.EmptyIterators; @@ -74,8 +75,6 @@ import accord.local.CommandStores.LatentStoreSelector; import accord.local.Commands; import accord.local.DurableBefore; -import accord.local.LoadKeys; -import accord.local.LoadKeysFor; import accord.local.MaxConflicts; import accord.local.Node; import accord.local.PreLoadContext; @@ -143,8 +142,9 @@ import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT; import static accord.local.RedundantStatus.Property.LOCALLY_SYNCED; import static accord.local.RedundantStatus.Property.LOCALLY_WITNESSED; +import static accord.local.RedundantStatus.Property.LOG_UNAVAILABLE; import static accord.local.RedundantStatus.Property.QUORUM_APPLIED; -import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP; +import static accord.local.RedundantStatus.Property.UNREADY; import static accord.local.RedundantStatus.Property.SHARD_APPLIED; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.lang.String.format; @@ -365,9 +365,8 @@ public Partition getPartition(DecoratedKey partitionKey) TokenKey key = TokenKey.parse(keyStr, DatabaseDescriptor.getPartitioner()); List cfks = new CopyOnWriteArrayList<>(); - PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key table query"); CommandStores commandStores = AccordService.instance().node().commandStores(); - AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { + AccordService.getBlocking(commandStores.forEach("commands_for_key table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { SafeCommandsForKey safeCfk = safeStore.get(key); CommandsForKey cfk = safeCfk.current(); if (cfk == null) @@ -475,9 +474,8 @@ public Partition getPartition(DecoratedKey partitionKey) TokenKey key = TokenKey.parse(keyStr, DatabaseDescriptor.getPartitioner()); List cfks = new CopyOnWriteArrayList<>(); - PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key_unmanaged table query"); CommandStores commandStores = AccordService.instance().node().commandStores(); - AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { + AccordService.getBlocking(commandStores.forEach("commands_for_key_unmanaged table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { SafeCommandsForKey safeCfk = safeStore.get(key); CommandsForKey cfk = safeCfk.current(); if (cfk == null) @@ -888,8 +886,9 @@ private RedundantBeforeTable() " locally_redundant 'TxnIdUtf8Type',\n" + " locally_synced 'TxnIdUtf8Type',\n" + " locally_witnessed 'TxnIdUtf8Type',\n" + - " pre_bootstrap 'TxnIdUtf8Type',\n" + - " stale_until_at_least 'TxnIdUtf8Type',\n" + + " log_unavailable 'TxnIdUtf8Type',\n" + + " unready 'TxnIdUtf8Type',\n" + + " stale_until 'TxnIdUtf8Type',\n" + " PRIMARY KEY (keyspace_name, table_name, table_id, command_store_id, token_start)" + ')', UTF8Type.instance)); } @@ -923,8 +922,9 @@ public DataSet data() .column("locally_redundant", entry.maxBound(LOCALLY_REDUNDANT).toString()) .column("locally_synced", entry.maxBound(LOCALLY_SYNCED).toString()) .column("locally_witnessed", entry.maxBound(LOCALLY_WITNESSED).toString()) - .column("pre_bootstrap", entry.maxBound(PRE_BOOTSTRAP).toString()) - .column("stale_until_at_least", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null); + .column("log_unavailable", entry.maxBound(LOG_UNAVAILABLE).toString()) + .column("unready", entry.maxBound(UNREADY).toString()) + .column("stale_until", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null); return ds; }, dataSet, @@ -1188,7 +1188,7 @@ public Partition getPartition(DecoratedKey partitionKey) TxnId txnId = TxnId.parse(txnIdStr); List commands = new CopyOnWriteArrayList<>(); - AccordService.instance().node().commandStores().forEachCommandStore(store -> { + AccordService.instance().node().commandStores().forAllUnsafe(store -> { Command command = ((AccordCommandStore)store).loadCommand(txnId); if (command != null) commands.add(new Entry(store.id(), command)); @@ -1293,7 +1293,7 @@ public Partition getPartition(DecoratedKey partitionKey) TxnId txnId = TxnId.parse(txnIdStr); List entries = new ArrayList<>(); - AccordService.instance().node().commandStores().forEachCommandStore(store -> { + AccordService.instance().node().commandStores().forAllUnsafe(store -> { for (AccordJournal.DebugEntry e : ((AccordCommandStore)store).debugCommand(txnId)) entries.add(new Entry(store.id(), e.segment, e.position, e.builder)); }); diff --git a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java index a47fff2302b3..c9f813022537 100644 --- a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java +++ b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java @@ -175,7 +175,7 @@ private synchronized void refreshHistograms() int nowSeconds = (int) (Clock.Global.currentTimeMillis() / 1000); SnapshotBuilder builder = new SnapshotBuilder(); - service.node().commandStores().forEachCommandStore(commandStore -> { + service.node().commandStores().forAllUnsafe(commandStore -> { DefaultProgressLog.ImmutableView view = ((DefaultProgressLog)commandStore.unsafeProgressLog()).immutableView(); builder.progressLogActive += view.activeCount(); builder.progressLogSize.increment(view.size()); diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index c297ab191297..27e50157a3b5 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -582,8 +582,7 @@ else if (range.intersects(toRepair)) { throw new IllegalArgumentException(String.format("Requested range %s intersects a local range (%s) " + "but is not fully contained in one; this would lead to " + - "imprecise repair. keyspace: %s", toRepair.toString(), - range.toString(), keyspaceName)); + "imprecise repair. keyspace: %s", toRepair, range, keyspaceName)); } } if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet)) diff --git a/src/java/org/apache/cassandra/service/Rebuild.java b/src/java/org/apache/cassandra/service/Rebuild.java index c7d40f08bf5b..56ad1f47ba88 100644 --- a/src/java/org/apache/cassandra/service/Rebuild.java +++ b/src/java/org/apache/cassandra/service/Rebuild.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.ConfigurationService.EpochReady; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; @@ -160,7 +161,7 @@ else if (tokens == null) StreamResultFuture streamResult = streamer.fetchAsync(); - Future accordReady = AccordService.instance().epochReadyFor(metadata); + Future accordReady = AccordService.instance().epochReadyFor(metadata, EpochReady::reads); Future ready = FutureCombiner.allOf(streamResult, accordReady); // wait for result diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 8d861618a58f..84a61f6b2646 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -71,9 +71,6 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; - -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.repair.autorepair.AutoRepair; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,6 +131,7 @@ import org.apache.cassandra.io.sstable.IScrubber; import org.apache.cassandra.io.sstable.IVerifier; import org.apache.cassandra.io.sstable.SSTableLoader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; @@ -158,7 +156,9 @@ import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.RepairCoordinator; +import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.SharedContext; +import org.apache.cassandra.repair.autorepair.AutoRepair; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -183,6 +183,7 @@ import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator; import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; import org.apache.cassandra.service.snapshot.SnapshotManager; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamManager; import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.StreamState; @@ -206,9 +207,9 @@ import org.apache.cassandra.tcm.sequences.BootstrapAndReplace; import org.apache.cassandra.tcm.sequences.InProgressSequences; import org.apache.cassandra.tcm.sequences.SingleNodeSequences; +import org.apache.cassandra.tcm.transformations.AlterTopology; import org.apache.cassandra.tcm.transformations.Assassinate; import org.apache.cassandra.tcm.transformations.CancelInProgressSequence; -import org.apache.cassandra.tcm.transformations.AlterTopology; import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.tcm.transformations.Startup; import org.apache.cassandra.tcm.transformations.Unregister; @@ -3145,6 +3146,29 @@ private FutureTask createRepairTask(final int cmd, final String keyspace return new FutureTask<>(task); } + public RepairCoordinator repairAccordKeyspace(String keyspace, Collection> ranges) + { + int cmd = nextRepairCommand.incrementAndGet(); + RepairOption options = new RepairOption(RepairParallelism.PARALLEL, // parallelism + false, // primaryRange + false, // incremental + false, // trace + 5, // jobThreads + ranges, // ranges + true, // pullRepair + true, // forceRepair + PreviewKind.NONE, // previewKind + false, // optimiseStreams + true, // ignoreUnreplicatedKeyspaces + true, // repairData + false, // repairPaxos + true, // dontPurgeTombstones + false // repairAccord + ); + + return new RepairCoordinator(this, cmd, options, keyspace); + } + private void tryRepairPaxosForTopologyChange(String reason) { try diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java index cbf1fe49dc64..68fcaa8e4aec 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java @@ -37,9 +37,9 @@ import org.apache.cassandra.cache.CacheSize; import org.apache.cassandra.config.AccordSpec.QueueShardModel; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory; -import org.apache.cassandra.service.accord.api.TokenKey; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeState; import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD; import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount; @@ -112,15 +112,6 @@ static Factory factory() }; } - @Override - protected boolean shouldBootstrap(Node node, Topology previous, Topology updated, Range range) - { - if (!super.shouldBootstrap(node, previous, updated, range)) - return false; - // we see new ranges when a new keyspace is added, so avoid bootstrap in these cases - return contains(previous, ((TokenKey) range.start()).table()); - } - @Override public SequentialAsyncExecutor someSequentialExecutor() { @@ -128,17 +119,6 @@ public SequentialAsyncExecutor someSequentialExecutor() return executors[idx].newSequentialExecutor(); } - private static boolean contains(Topology previous, TableId searchTable) - { - for (Range range : previous.ranges()) - { - TableId table = ((TokenKey) range.start()).table(); - if (table.equals(searchTable)) - return true; - } - return false; - } - public synchronized void setCapacity(long bytes) { cacheSize = bytes; diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index 5dc0d19d0969..80e3296f5695 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -113,7 +113,7 @@ AsyncResult acknowledged() @Nullable AsyncResult reads() { - return reads; + return ready == null ? null : ready.reads; } AsyncResult.Settable localSyncNotified() @@ -449,7 +449,7 @@ public void reportTopology(Topology topology, boolean isLoad, boolean startSync) } @Override - protected void localSyncComplete(Topology topology, boolean startSync) + protected void onReadyToCoordinate(Topology topology, boolean startSync) { long epoch = topology.epoch(); EpochState epochState = getOrCreateEpochState(epoch); diff --git a/src/java/org/apache/cassandra/service/accord/AccordDataStore.java b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java index 0234184a383b..7934aebf769f 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordDataStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java @@ -30,6 +30,7 @@ import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.SyncPoint; +import accord.utils.UnhandledEnum; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.schema.Schema; @@ -40,14 +41,6 @@ public class AccordDataStore implements DataStore private static final Logger logger = LoggerFactory.getLogger(AccordDataStore.class); enum FlushListenerKey { KEY } - @Override - public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback) - { - AccordFetchCoordinator coordinator = new AccordFetchCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore()); - coordinator.start(); - return coordinator.result(); - } - /** * Ensures data for the intersecting ranges is flushed to sstable before calling back with reportOnSuccess. * This is used to gate journal cleanup, since we skip the CommitLog for applying to the data table. @@ -95,4 +88,23 @@ public void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBef prev = cfs; } } + + @Override + public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback, FetchKind kind) + { + switch (kind) + { + default: throw new UnhandledEnum(kind); + case Image: + { + AccordFetchCoordinator coordinator = new AccordFetchCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore()); + coordinator.start(); + return coordinator.result(); + } + case Sync: + { + throw new UnsupportedOperationException(); + } + } + } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 045acea8dfab..cfa224ab4945 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -585,7 +585,7 @@ public void forEach(Consumer consumer) @SuppressWarnings("unchecked") @Override - public void replay(CommandStores commandStores) + public boolean replay(CommandStores commandStores) { // TODO (expected): make the parallelisms configurable // Replay is performed in parallel, where at most X commands can be in flight, accross at most Y commands stores. @@ -716,6 +716,7 @@ public void close() ++cur; } + return true; } catch (Throwable t) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 058b9171ac16..34b7dd28c4b0 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -39,6 +39,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; +import accord.api.ConfigurationService.EpochReady; import org.apache.cassandra.metrics.AccordReplicaMetrics; import org.apache.cassandra.service.accord.api.AccordViolationHandler; import org.apache.cassandra.utils.Clock; @@ -327,7 +328,7 @@ public synchronized static AccordService startup(NodeId tcmId) } @VisibleForTesting - public static void replayJournal(AccordService as) + public static boolean replayJournal(AccordService as) { logger.info("Starting journal replay."); long before = Clock.Global.nanoTime(); @@ -337,12 +338,12 @@ public static void replayJournal(AccordService as) if (as.journalConfiguration().replayMode() == RESET) AccordKeyspace.truncateCommandsForKey(); - as.node.commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().stop()); + as.node.commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().stop()); as.journal().replay(as.node().commandStores()); logger.info("Waiting for command stores to quiesce."); ((AccordCommandStores)as.node.commandStores()).waitForQuiescense(); as.journal.unsafeSetStarted(); - as.node.commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().start()); + as.node.commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().start()); } finally { @@ -351,14 +352,7 @@ public static void replayJournal(AccordService as) long after = Clock.Global.nanoTime(); logger.info("Finished journal replay. {}ms elapsed", NANOSECONDS.toMillis(after - before)); - } - - public static void shutdownServiceAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException - { - IAccordService i = instance; - if (i == null) - return; - i.shutdownAndWait(timeout, unit); + return true; } @Override @@ -565,7 +559,7 @@ public AsyncChain sync(Timestamp minBound, Keys keys, DurabilityService.Sy if (keys.size() != 1) return syncInternal(minBound, keys, syncLocal, syncRemote); - return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), syncLocal, syncRemote) + return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), syncLocal, syncRemote).chain() .flatMap(found -> KeyBarriers.await(node, node.someSequentialExecutor(), found, syncLocal, syncRemote)) .flatMap(success -> { if (success) @@ -799,8 +793,8 @@ class Ready extends AsyncResults.CountingResult implements Runnable } Ready ready = new Ready(); AccordCommandStores commandStores = (AccordCommandStores) node.commandStores(); - getBlocking(commandStores.forEach((PreLoadContext.Empty)() -> "Flush Caches", safeStore -> { - AccordCommandStore commandStore = (AccordCommandStore)safeStore.commandStore(); + commandStores.forAllUnsafe(unsafeStore -> { + AccordCommandStore commandStore = (AccordCommandStore)unsafeStore; try (AccordCommandStore.ExclusiveCaches caches = commandStore.lockCaches()) { caches.commandsForKeys().forEach(entry -> { @@ -811,7 +805,7 @@ class Ready extends AsyncResults.CountingResult implements Runnable } }); } - })); + }); ready.decrement(); AsyncPromise result = new AsyncPromise<>(); ready.invoke((success, fail) -> { @@ -1037,18 +1031,18 @@ public AccordJournal journal() } @Override - public Future epochReady(Epoch epoch) + public Future epochReady(Epoch epoch, Function> get) { - return toFuture(configService.epochReady(epoch.getEpoch())); + return toFuture(configService.epochReady(epoch.getEpoch(), get)); } @Override - public Future epochReadyFor(ClusterMetadata metadata) + public Future epochReadyFor(ClusterMetadata metadata, Function> get) { if (!metadata.schema.hasAccordKeyspaces()) return EPOCH_READY; - return epochReady(metadata.epoch); + return epochReady(metadata.epoch, get); } @Override @@ -1116,7 +1110,7 @@ public AccordConfigurationService configService() public AccordCompactionInfos getCompactionInfo() { AccordCompactionInfos compactionInfos = new AccordCompactionInfos(node.durableBefore(), node.topology().minEpoch()); - node.commandStores().forEachCommandStore(commandStore -> { + node.commandStores().forAllUnsafe(commandStore -> { compactionInfos.put(commandStore.id(), ((AccordCommandStore)commandStore).getCompactionInfo()); }); return compactionInfos; diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java b/src/java/org/apache/cassandra/service/accord/AccordTopology.java index ca817e647d57..8c60ecae5e3a 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java @@ -35,6 +35,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import accord.api.ConfigurationService.EpochReady; import accord.local.Node; import accord.local.Node.Id; import accord.primitives.Ranges; @@ -399,7 +400,7 @@ public static void awaitTopologyReadiness(Keyspaces.KeyspacesDiff keyspacesDiff, { ClusterMetadataService.instance().fetchLogFromCMS(epoch); IAccordService service = AccordService.instance(); - service.epochReady(epoch).get(service.agent().expireEpochWait(MILLISECONDS), MILLISECONDS); + service.epochReady(epoch, EpochReady::reads).get(service.agent().expireEpochWait(MILLISECONDS), MILLISECONDS); } catch (InterruptedException e) { diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java index 34453d22c30b..422155449bd9 100644 --- a/src/java/org/apache/cassandra/service/accord/IAccordService.java +++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java @@ -25,9 +25,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; +import java.util.function.Function; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import accord.api.ConfigurationService.EpochReady; import accord.utils.async.AsyncResult; import org.apache.cassandra.tcm.ClusterMetadata; import org.slf4j.Logger; @@ -127,9 +129,8 @@ interface IAccordResult * Return a future that will complete once the accord has completed it's local bootstrap process * for any ranges gained in the given epoch */ - Future epochReady(Epoch epoch); - - Future epochReadyFor(ClusterMetadata epoch); + Future epochReady(Epoch epoch, Function> f); + Future epochReadyFor(ClusterMetadata epoch, Function> f); void receive(Message message); @@ -308,13 +309,13 @@ public AccordScheduler scheduler() } @Override - public Future epochReady(Epoch epoch) + public Future epochReady(Epoch epoch, Function> get) { return BOOTSTRAP_SUCCESS; } @Override - public Future epochReadyFor(ClusterMetadata epoch) + public Future epochReadyFor(ClusterMetadata epoch, Function> get) { return BOOTSTRAP_SUCCESS; } @@ -515,15 +516,15 @@ public AccordScheduler scheduler() } @Override - public Future epochReady(Epoch epoch) + public Future epochReady(Epoch epoch, Function> get) { - return delegate.epochReady(epoch); + return delegate.epochReady(epoch, get); } @Override - public Future epochReadyFor(ClusterMetadata epoch) + public Future epochReadyFor(ClusterMetadata epoch, Function> get) { - return delegate.epochReadyFor(epoch); + return delegate.epochReadyFor(epoch, get); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java index be7153fc2c0c..e09a565c4130 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java @@ -30,6 +30,7 @@ import accord.api.Agent; import accord.api.CoordinatorEventListener; +import accord.api.OwnershipEventListener; import accord.api.ReplicaEventListener; import accord.api.ProgressLog.BlockedUntil; import accord.api.RoutingKey; @@ -93,7 +94,7 @@ import static org.apache.cassandra.utils.Clock.Global.nanoTime; // TODO (expected): merge with AccordService -public class AccordAgent implements Agent +public class AccordAgent implements Agent, OwnershipEventListener { private static final Logger logger = LoggerFactory.getLogger(AccordAgent.class); private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, MINUTES); @@ -125,6 +126,12 @@ public AccordTracing tracing() return tracing.trace(txnId, eventType); } + @Override + public OwnershipEventListener ownershipEvents() + { + return this; + } + public void setNodeId(Node.Id id) { self = id; diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java b/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java index 67ec0111a8fe..c40278932942 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java @@ -18,6 +18,8 @@ package org.apache.cassandra.service.accord.api; +import javax.annotation.Nullable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,8 +31,6 @@ import accord.primitives.Timestamp; import accord.primitives.TxnId; -import static accord.utils.Invariants.illegalState; - public class AccordViolationHandler implements ViolationHandler { private static final Logger logger = LoggerFactory.getLogger(AccordViolationHandler.class); @@ -40,13 +40,11 @@ public static void setup() ViolationHandlerHolder.set(AccordViolationHandler::new); } - @Override - public void onTimestampViolation(SafeCommandStore safeStore, Command command, Participants otherParticipants, Route otherRoute, Timestamp otherExecuteAt) + public void onTimestampViolation(@Nullable SafeCommandStore safeStore, Command command, Participants otherParticipants, @Nullable Route otherRoute, Timestamp otherExecuteAt) { - throw illegalState(ViolationHandler.timestampViolationMessage(safeStore, command, otherParticipants, otherRoute, otherExecuteAt)); + logger.error(ViolationHandler.timestampViolationMessage(safeStore, command, otherParticipants, otherRoute, otherExecuteAt)); } - @Override public void onDependencyViolation(Participants participants, TxnId notWitnessed, Timestamp notWitnessedExecuteAt, TxnId by, Timestamp byExecuteAt) { logger.error(ViolationHandler.dependencyViolationMessage(participants, notWitnessed, notWitnessedExecuteAt, by, byExecuteAt)); diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java index 5dcbb4552fc7..468f5ff41373 100644 --- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java +++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java @@ -144,13 +144,13 @@ public AccordInteropStableThenRead(TxnId txnId, Participants scope, Commit.Ki } @Override - public CommitOrReadNack apply(SafeCommandStore safeStore) + public CommitOrReadNack applyInternal(SafeCommandStore safeStore) { Route route = this.route == null ? (Route)scope : this.route; StoreParticipants participants = StoreParticipants.execute(safeStore, route, txnId, minEpoch(), executeAtEpoch); SafeCommand safeCommand = safeStore.get(txnId, participants); Commands.commit(safeStore, safeCommand, participants, kind.saveStatus, Ballot.ZERO, txnId, route, partialTxn, executeAt, partialDeps, kind); - return super.apply(safeStore, safeCommand, participants); + return super.applyInternal(safeStore, safeCommand, participants); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java index cd967961424d..53391cd322c5 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java @@ -88,7 +88,7 @@ public void serialize(Accept.NotAccept invalidate, DataOutputPlus out) throws IO CommandSerializers.status.serialize(invalidate.status, out); CommandSerializers.ballot.serialize(invalidate.ballot, out); CommandSerializers.txnId.serialize(invalidate.txnId, out); - KeySerializers.participants.serialize(invalidate.participants, out); + KeySerializers.participants.serialize(invalidate.scope, out); } @Override @@ -106,7 +106,7 @@ public long serializedSize(Accept.NotAccept invalidate) return CommandSerializers.status.serializedSize(invalidate.status) + CommandSerializers.ballot.serializedSize(invalidate.ballot) + CommandSerializers.txnId.serializedSize(invalidate.txnId) - + KeySerializers.participants.serializedSize(invalidate.participants); + + KeySerializers.participants.serializedSize(invalidate.scope); } }; diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java index af67c05b0482..0e017184fb53 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java @@ -122,7 +122,7 @@ public long serializedSize(A await) public void serialize(AsyncAwaitComplete ok, DataOutputPlus out) throws IOException { CommandSerializers.txnId.serialize(ok.txnId, out); - KeySerializers.route.serialize(ok.route, out); + KeySerializers.route.serialize(ok.scope, out); out.writeByte(ok.newStatus.ordinal()); out.writeUnsignedVInt32(ok.callbackId); } @@ -141,7 +141,7 @@ public AsyncAwaitComplete deserialize(DataInputPlus in) throws IOException public long serializedSize(AsyncAwaitComplete ok) { return CommandSerializers.txnId.serializedSize(ok.txnId) - + KeySerializers.route.serializedSize(ok.route) + + KeySerializers.route.serializedSize(ok.scope) + TypeSizes.BYTE_SIZE + VIntCoding.computeVIntSize(ok.callbackId); } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java index 25a40a6a418c..231c93287836 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java @@ -39,7 +39,7 @@ public class BeginInvalidationSerializers public void serialize(BeginInvalidation begin, DataOutputPlus out) throws IOException { CommandSerializers.txnId.serialize(begin.txnId, out); - KeySerializers.participants.serialize(begin.participants, out); + KeySerializers.participants.serialize(begin.scope, out); CommandSerializers.ballot.serialize(begin.ballot, out); } @@ -55,7 +55,7 @@ public BeginInvalidation deserialize(DataInputPlus in) throws IOException public long serializedSize(BeginInvalidation begin) { return CommandSerializers.txnId.serializedSize(begin.txnId) - + KeySerializers.participants.serializedSize(begin.participants) + + KeySerializers.participants.serializedSize(begin.scope) + CommandSerializers.ballot.serializedSize(begin.ballot); } }; diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java index d1f81512b975..9747d9f72f4c 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java @@ -128,8 +128,8 @@ public long serializedSize(KnownMap knownMap) public void serialize(CheckStatus check, DataOutputPlus out) throws IOException { CommandSerializers.txnId.serialize(check.txnId, out); - KeySerializers.participants.serialize(check.query, out); - out.writeUnsignedVInt(check.sourceEpoch); + KeySerializers.participants.serialize(check.scope, out); + out.writeUnsignedVInt(check.waitForEpoch); out.writeByte(check.includeInfo.ordinal()); CommandSerializers.ballot.serialize(check.bumpBallot, out); } @@ -149,8 +149,8 @@ public CheckStatus deserialize(DataInputPlus in) throws IOException public long serializedSize(CheckStatus check) { return CommandSerializers.txnId.serializedSize(check.txnId) - + KeySerializers.participants.serializedSize(check.query) - + TypeSizes.sizeofUnsignedVInt(check.sourceEpoch) + + KeySerializers.participants.serializedSize(check.scope) + + TypeSizes.sizeofUnsignedVInt(check.waitForEpoch) + TypeSizes.BYTE_SIZE + CommandSerializers.ballot.serializedSize(check.bumpBallot); } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java index a1cb244b13c6..a30ea5927056 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java @@ -155,11 +155,18 @@ public void serialize(RedundantBefore.Bounds b, DataOutputPlus out) throws IOExc } for (int i = 0 ; i < b.bounds.length ; ++i) { - out.writeShort(b.status(i * 2)); - out.writeShort(b.status(i * 2 + 1)); + out.writeShort(cast(b.status(i * 2))); + out.writeShort(cast(b.status(i * 2 + 1))); } } + private short cast(long v) + { + if ((v & ~0xFFFF) != 0) + throw new IllegalStateException("Cannot serialize RedundantStatus larger than 0xFFFF. Requires serialization version bump."); + return (short)v; + } + @Override public RedundantBefore.Bounds deserialize(DataInputPlus in) throws IOException { @@ -174,7 +181,7 @@ public RedundantBefore.Bounds deserialize(DataInputPlus in) throws IOException TxnId[] bounds = new TxnId[count]; for (int i = 0 ; i < bounds.length ; ++i) bounds[i] = CommandSerializers.txnId.deserialize(in); - short[] statuses = new short[count * 2]; + int[] statuses = new int[count * 2]; for (int i = 0 ; i < statuses.length ; ++i) statuses[i] = in.readShort(); diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java index 31d879ce7c87..2d2592eee820 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java @@ -52,9 +52,9 @@ public void serializeBody(Commit msg, DataOutputPlus out, Version version) throw kind.serialize(msg.kind, out); CommandSerializers.ballot.serialize(msg.ballot, out); ExecuteAtSerializer.serialize(msg.txnId, msg.executeAt, out); - CommandSerializers.nullablePartialTxn.serialize(msg.partialTxn, out, version); + CommandSerializers.nullablePartialTxn.serialize(msg.partialTxn(), out, version); if (msg.kind.withDeps == Commit.WithDeps.HasDeps) - DepsSerializers.partialDeps.serialize(msg.partialDeps, out); + DepsSerializers.partialDeps.serialize(msg.partialDeps(), out); serializeNullable(msg.route, out, KeySerializers.fullRoute); } @@ -78,10 +78,10 @@ public long serializedBodySize(Commit msg, Version version) long size = kind.serializedSize(msg.kind) + CommandSerializers.ballot.serializedSize(msg.ballot) + ExecuteAtSerializer.serializedSize(msg.txnId, msg.executeAt) - + CommandSerializers.nullablePartialTxn.serializedSize(msg.partialTxn, version); + + CommandSerializers.nullablePartialTxn.serializedSize(msg.partialTxn(), version); if (msg.kind.withDeps == Commit.WithDeps.HasDeps) - size += DepsSerializers.partialDeps.serializedSize(msg.partialDeps); + size += DepsSerializers.partialDeps.serializedSize(msg.partialDeps()); size += serializedNullableSize(msg.route, KeySerializers.fullRoute); return size; diff --git a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java index 3eb2fa498454..b3f81bec281c 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java @@ -56,7 +56,7 @@ public class RecoverySerializers static final int HAS_EXECUTE_AT_EPOCH = 0x2; static final int IS_FAST_PATH_DECIDED = 0x4; static final int SIZE_OF_FLAGS = VIntCoding.computeUnsignedVIntSize(HAS_ROUTE | HAS_EXECUTE_AT_EPOCH | IS_FAST_PATH_DECIDED); - public static final IVersionedSerializer request = new WithUnsyncedSerializer() + public static final IVersionedSerializer request = new WithUnsyncedSerializer<>() { @Override public void serializeBody(BeginRecovery recover, DataOutputPlus out, Version version) throws IOException diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java index fe2cbe26136b..37e5efdb8f13 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java @@ -20,14 +20,14 @@ import java.io.IOException; -import accord.messages.TxnRequest; +import accord.messages.RouteRequest; import accord.primitives.Route; import accord.primitives.TxnId; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -public abstract class TxnRequestSerializer> implements IVersionedSerializer +public abstract class TxnRequestSerializer> implements IVersionedSerializer { void serializeHeader(T msg, DataOutputPlus out, Version version) throws IOException { @@ -72,7 +72,7 @@ public final long serializedSize(T msg, Version version) return serializedHeaderSize(msg, version) + serializedBodySize(msg, version); } - public static abstract class WithUnsyncedSerializer> extends TxnRequestSerializer + public static abstract class WithUnsyncedSerializer> extends TxnRequestSerializer { @Override void serializeHeader(T msg, DataOutputPlus out, Version version) throws IOException diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java index 341eff845bcc..a40a42dded9c 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java +++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.ConfigurationService.EpochReady; import com.googlecode.concurrenttrees.common.Iterables; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.ColumnFamilyStore; @@ -362,7 +363,7 @@ public static boolean bootstrap(final Collection tokens, StorageService.instance.repairPaxosForTopologyChange("bootstrap"); Future bootstrapStream = StorageService.instance.startBootstrap(metadata, beingReplaced, movements, strictMovements); - Future accordReady = AccordService.instance().epochReadyFor(metadata); + Future accordReady = AccordService.instance().epochReadyFor(metadata, EpochReady::reads); Future ready = FutureCombiner.allOf(bootstrapStream, accordReady); try diff --git a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java index 68fc2471ac59..8a121fd2c2a2 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java +++ b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.ConfigurationService.EpochReady; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.schema.Keyspaces; @@ -168,7 +169,7 @@ private SequenceState awaitSafeFromAccord() throws ExecutionException, Interrupt return error(new IllegalStateException(String.format("Table %s is in an invalid state to be dropped", table))); long startNanos = nanoTime(); - AccordService.instance().epochReady(metadata.epoch).get(); + AccordService.instance().epochReady(metadata.epoch, EpochReady::reads).get(); long epochEndNanos = nanoTime(); // As of this writing this logic is based off ExclusiveSyncPoints which is a bit heavy weight for what is needed, this could cause timeouts for clusters that have a lot of data. diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index b54b796749fa..25058e20f2ae 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.ConfigurationService.EpochReady; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.TypeSizes; @@ -258,7 +259,7 @@ else if (destination.isSelf()) StreamResultFuture streamResult = streamPlan.execute(); - Future accordReady = AccordService.instance().epochReadyFor(metadata); + Future accordReady = AccordService.instance().epochReadyFor(metadata, EpochReady::reads); Future ready = FutureCombiner.allOf(streamResult, accordReady); ready.get(); StorageService.instance.repairPaxosForTopologyChange("move"); diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java index 51a551a19abd..a2d0429c3f66 100644 --- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java @@ -49,6 +49,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; +import accord.api.ConfigurationService.EpochReady; import org.agrona.collections.IntArrayList; import org.apache.cassandra.tcm.compatibility.TokenRingUtils; import org.apache.cassandra.utils.FBUtilities; @@ -1698,7 +1699,7 @@ public static void awaitAccordEpochReady(Cluster cluster, long epoch) i.runOnInstance(() -> { try { - AccordService.instance().epochReady(Epoch.create(epoch)).get(); + AccordService.instance().epochReady(Epoch.create(epoch), EpochReady::reads).get(); } catch (InterruptedException | ExecutionException e) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java index 7d4320aa01a0..6d91a592379a 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java +++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java @@ -44,7 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.messages.AbstractRequest; +import accord.messages.NoWaitRequest; import net.openhft.chronicle.core.util.SerializablePredicate; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.CQLTester; @@ -109,8 +109,8 @@ protected static class MessageCountingSink implements IMessageSink // This isn't perfect at excluding messages so make sure it excludes the ones you care about in your test public static final SerializablePredicate> EXCLUDE_SYNC_POINT_MESSAGES = message -> { - if (message.payload instanceof AbstractRequest) - return !((AbstractRequest)message.payload).txnId.isSyncPoint(); + if (message.payload instanceof NoWaitRequest) + return !((NoWaitRequest)message.payload).txnId.isSyncPoint(); return true; }; @@ -251,7 +251,7 @@ public static String batch(boolean logged, String... queries) return sb.toString(); } - protected void bootstrapAndJoinNode(Cluster cluster) + protected IInvokableInstance bootstrapAndJoinNode(Cluster cluster) { IInstanceConfig config = cluster.newInstanceConfig(); config.set("auto_bootstrap", true); @@ -261,6 +261,7 @@ protected void bootstrapAndJoinNode(Cluster cluster) () -> newInstance.startup(cluster)); newInstance.nodetoolResult("join").asserts().success(); newInstance.nodetoolResult("cms", "describe").asserts().success(); // just make sure we're joined, remove later + return newInstance; } @SuppressWarnings("unchecked") diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java index d30c3d887a39..49632ed1535d 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java @@ -24,13 +24,16 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; import org.junit.Assert; import org.junit.Test; -import accord.local.PreLoadContext; +import accord.api.ConfigurationService.EpochReady; +import accord.primitives.RoutingKeys; import accord.primitives.Timestamp; import accord.topology.TopologyManager; +import accord.utils.async.AsyncResult; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; @@ -41,14 +44,15 @@ import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableFunction; import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.AccordCommandStore; import org.apache.cassandra.service.accord.AccordConfigurationService; -import org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot; import org.apache.cassandra.service.accord.AccordSafeCommandStore; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.api.PartitionKey; @@ -66,6 +70,8 @@ import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.apache.cassandra.service.accord.AccordService.getBlocking; +import static org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot.ResultStatus.SUCCESS; +import static org.apache.cassandra.service.accord.AccordConfigurationService.SyncStatus.COMPLETED; public class AccordBootstrapTest extends TestBaseImpl { @@ -81,7 +87,7 @@ private static PartitionKey pk(int key, String keyspace, String table) return new PartitionKey(tid, dk(key)); } - protected void bootstrapAndJoinNode(Cluster cluster) + protected IInvokableInstance bootstrapAndJoinNode(Cluster cluster) { IInstanceConfig config = cluster.newInstanceConfig(); config.set("auto_bootstrap", true); @@ -94,6 +100,7 @@ protected void bootstrapAndJoinNode(Cluster cluster) // () -> withProperty("cassandra.join_ring", false, () -> newInstance.startup(cluster))); // newInstance.nodetoolResult("join").asserts().success(); newInstance.nodetoolResult("cms", "describe").asserts().success(); // just make sure we're joined, remove later + return newInstance; } private static AccordService service() @@ -101,11 +108,11 @@ private static AccordService service() return (AccordService) AccordService.instance(); } - private static void awaitEpoch(long epoch) + private static void awaitEpoch(long epoch, Function> await) { try { - boolean completed = service().epochReady(Epoch.create(epoch)).await(60, TimeUnit.SECONDS); + boolean completed = service().epochReady(Epoch.create(epoch), await).await(60, TimeUnit.SECONDS); Assertions.assertThat(completed) .describedAs("Epoch %s did not become ready within timeout on %s -> %s", epoch, FBUtilities.getBroadcastAddressAndPort(), @@ -168,6 +175,14 @@ public synchronized void forSession(Consumer consumer) @Test public void bootstrapTest() throws Throwable + { + bootstrapTest(Function.identity(), cluster -> { + bootstrapAndJoinNode(cluster); + awaitMaxEpochReadyToRead(cluster); + }); + } + + public void bootstrapTest(Function setup, Consumer bootstrapAndJoinNode) throws Throwable { int originalNodeCount = 2; int expandedNodeCount = originalNodeCount + 1; @@ -188,49 +203,10 @@ public void bootstrapTest() throws Throwable cluster.schemaChange("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor':2}"); cluster.schemaChange("CREATE TABLE ks.tbl (k int, c int, v int, primary key(k, c)) WITH transactional_mode='full'"); - long initialMax = maxEpoch(cluster); - + awaitMaxEpochReadyToRead(cluster); for (IInvokableInstance node : cluster) - { - node.runOnInstance(() -> { - Assert.assertEquals(initialMax, ClusterMetadata.current().epoch.getEpoch()); - awaitEpoch(initialMax); - AccordConfigurationService configService = service().configService(); - long minEpoch = configService.minEpoch(); - - Assert.assertEquals(initialMax, configService.maxEpoch()); - - for (long epoch = minEpoch; epoch < initialMax; epoch++) - { - awaitEpoch(epoch); - Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch)); - } - - awaitLocalSyncNotification(initialMax); - Assert.assertEquals(EpochSnapshot.completed(initialMax), configService.getEpochSnapshot(initialMax)); - }); - } - - for (IInvokableInstance node : cluster) - { node.runOnInstance(StreamListener::register); - } - - long schemaChangeMax = maxEpoch(cluster); - for (IInvokableInstance node : cluster) - { - node.runOnInstance(() -> { - ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(schemaChangeMax)); - awaitEpoch(schemaChangeMax); - AccordConfigurationService configService = service().configService(); - - for (long epoch = initialMax + 1; epoch <= schemaChangeMax; epoch++) - { - awaitLocalSyncNotification(epoch); - Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch)); - } - }); - } + awaitMaxEpochReadyToRead(cluster); for (int key = 0; key < 100; key++) { @@ -251,21 +227,7 @@ public void bootstrapTest() throws Throwable }); } - bootstrapAndJoinNode(cluster); - long bootstrapMax = maxEpoch(cluster); - for (IInvokableInstance node : cluster) - { - node.runOnInstance(() -> { - ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(bootstrapMax)); - Assert.assertEquals(bootstrapMax, ClusterMetadata.current().epoch.getEpoch()); - AccordService service = (AccordService) AccordService.instance(); - awaitEpoch(bootstrapMax); - AccordConfigurationService configService = service.configService(); - - awaitLocalSyncNotification(bootstrapMax); - Assert.assertEquals(EpochSnapshot.completed(bootstrapMax), configService.getEpochSnapshot(bootstrapMax)); - }); - } + bootstrapAndJoinNode.accept(cluster); InetAddress node3Addr = cluster.get(3).broadcastAddress().getAddress(); for (IInvokableInstance node : cluster.get(1, 2)) @@ -278,15 +240,11 @@ public void bootstrapTest() throws Throwable Assert.assertTrue(session.getNumKeyspaceTransfers() > 0); }); - getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> { - AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore; - Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.bootstrapBeganAt().keySet())); - Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.safeToReadAt().keySet())); -// -// Assert.assertTrue(commandStore.maxBootstrapEpoch() > 0); -// Assert.assertTrue(commandStore.bootstrapBeganAt().isEmpty()); -// Assert.assertTrue(commandStore.safeToRead().isEmpty()); - })); + service().node().commandStores().forAllUnsafe(unsafeStore -> { + AccordCommandStore ss = (AccordCommandStore) unsafeStore; + Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.unsafeGetBootstrapBeganAt().keySet())); + Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.unsafeGetSafeToRead().keySet())); + }); }); } @@ -321,7 +279,7 @@ public void bootstrapTest() throws Throwable Assert.assertEquals(key, row.getInt("c")); Assert.assertEquals(key, row.getInt("v")); - getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> { + getBlocking(service().node().commandStores().forEach("Test", RoutingKeys.of(partitionKey.toUnseekable()), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { if (safeStore.ranges().currentRanges().contains(partitionKey)) { AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore; @@ -375,44 +333,7 @@ public void moveTest() throws Throwable tokens[i] = cluster.get(i+1).callOnInstance(() -> Long.valueOf(getOnlyElement(StorageService.instance.getTokens()))); } - for (IInvokableInstance node : cluster) - { - - node.runOnInstance(() -> { - Assert.assertEquals(initialMax, ClusterMetadata.current().epoch.getEpoch()); - awaitEpoch(initialMax); - AccordConfigurationService configService = service().configService(); - long minEpoch = configService.minEpoch(); - - Assert.assertEquals(initialMax, configService.maxEpoch()); - - for (long epoch = minEpoch; epoch < initialMax; epoch++) - { - awaitEpoch(epoch); - Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch)); - } - - awaitLocalSyncNotification(initialMax); - Assert.assertEquals(EpochSnapshot.completed(initialMax), configService.getEpochSnapshot(initialMax)); - }); - } - - long schemaChangeMax = maxEpoch(cluster); - for (IInvokableInstance node : cluster) - { - node.runOnInstance(() -> { - Assert.assertEquals(schemaChangeMax, ClusterMetadata.current().epoch.getEpoch()); - AccordService service = (AccordService) AccordService.instance(); - awaitEpoch(schemaChangeMax); - AccordConfigurationService configService = service.configService(); - - for (long epoch = initialMax + 1; epoch <= schemaChangeMax; epoch++) - { - awaitLocalSyncNotification(epoch); - Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch)); - } - }); - } + awaitMaxEpochReadyToRead(cluster); for (int key = 0; key < 100; key++) { @@ -431,20 +352,7 @@ public void moveTest() throws Throwable cluster.get(1).runOnInstance(() -> StorageService.instance.move(Long.toString(token))); - long moveMax = maxEpoch(cluster); - for (IInvokableInstance node : cluster) - { - node.runOnInstance(() -> { - ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(moveMax)); - Assert.assertEquals(moveMax, ClusterMetadata.current().epoch.getEpoch()); - AccordService service = (AccordService) AccordService.instance(); - awaitEpoch(moveMax); - AccordConfigurationService configService = service.configService(); - - awaitLocalSyncNotification(moveMax); - Assert.assertEquals(EpochSnapshot.completed(moveMax), configService.getEpochSnapshot(moveMax)); - }); - } + long moveMax = awaitMaxEpochReadyToRead(cluster); for (IInvokableInstance node : cluster) { @@ -464,9 +372,7 @@ public void moveTest() throws Throwable PartitionKey partitionKey = new PartitionKey(tableId, dk); - getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", - partitionKey.toUnseekable(), moveMax, moveMax, - safeStore -> { + getBlocking(service().node().commandStores().forEach("Test", RoutingKeys.of(partitionKey.toUnseekable()), moveMax, moveMax, safeStore -> { if (!safeStore.ranges().allAt(preMove).contains(partitionKey)) { AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore; @@ -493,4 +399,41 @@ public void moveTest() throws Throwable } } } + + private static long awaitMaxEpochReadyToRead(Cluster cluster) + { + return awaitMaxEpoch(cluster, EpochReady::reads, true); + } + + private static long awaitMaxEpochMetadataReady(Cluster cluster) + { + return awaitMaxEpoch(cluster, EpochReady::metadata, false); + } + + private static long awaitMaxEpoch(Cluster cluster, SerializableFunction> await, boolean expectReadyToRead) + { + long maxEpoch = maxEpoch(cluster); + for (IInvokableInstance node : cluster) + { + node.acceptOnInstance(aw -> { + ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(maxEpoch)); + Assert.assertEquals(maxEpoch, ClusterMetadata.current().epoch.getEpoch()); + AccordService service = (AccordService) AccordService.instance(); + awaitEpoch(maxEpoch, aw); + AccordConfigurationService configService = service.configService(); + + awaitLocalSyncNotification(maxEpoch); + for (long epoch = configService.minEpoch(); epoch <= maxEpoch; epoch++) + { + Assert.assertEquals(COMPLETED, configService.getEpochSnapshot(maxEpoch).syncStatus); + Assert.assertEquals(SUCCESS, configService.getEpochSnapshot(maxEpoch).acknowledged); + Assert.assertEquals(SUCCESS, configService.getEpochSnapshot(maxEpoch).received); + if (expectReadyToRead) + Assert.assertEquals(SUCCESS, configService.getEpochSnapshot(maxEpoch).reads); + } + }, node.transfer(await)); + } + return maxEpoch; + } + } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java index a5a38edadf74..bbb8a7f4cb4d 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import accord.local.Node; -import accord.local.PreLoadContext; import accord.local.SafeCommand; import accord.local.StoreParticipants; import accord.local.cfk.CommandsForKey; @@ -32,6 +31,7 @@ import accord.local.durability.DurabilityService; import accord.primitives.Keys; import accord.primitives.Ranges; +import accord.primitives.RoutingKeys; import accord.primitives.Status; import accord.primitives.Timestamp; import accord.primitives.TxnId; @@ -68,8 +68,6 @@ import org.slf4j.LoggerFactory; -import static accord.local.LoadKeys.SYNC; -import static accord.local.LoadKeysFor.READ_WRITE; import static java.lang.String.format; import static org.apache.cassandra.distributed.test.accord.AccordTestBase.executeWithRetry; import static org.apache.cassandra.service.accord.AccordService.getBlocking; @@ -158,7 +156,7 @@ private static void withCluster(ModelChecker.ThrowingConsumer run) thro { cluster.filters().reset(); for (IInvokableInstance instance : cluster) - instance.runOnInstance(() -> AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().start())); + instance.runOnInstance(() -> AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().start())); } } } @@ -207,7 +205,7 @@ private static TxnId awaitLocalApplyOnKey(TokenKey key) { Node node = accordService().node(); AtomicReference waitFor = new AtomicReference<>(null); - getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(key, SYNC, READ_WRITE, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> { + getBlocking(node.commandStores().forEach("Test", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { AccordSafeCommandStore store = (AccordSafeCommandStore) safeStore; SafeCommandsForKey safeCfk = store.ifLoadedAndInitialised(key); if (safeCfk == null) @@ -229,7 +227,7 @@ private static TxnId awaitLocalApplyOnKey(TokenKey key) long now = Clock.Global.currentTimeMillis(); if (now - start > TimeUnit.MINUTES.toMillis(1)) throw new AssertionError("Timeout"); - AccordService.getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> { + getBlocking(node.commandStores().forEach("Test", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { SafeCommand command = safeStore.get(txnId, StoreParticipants.empty(txnId)); Assert.assertNotNull(command.current()); if (command.current().status().hasBeen(Status.Applied)) @@ -291,7 +289,7 @@ public void txnRepairTest(Cluster cluster) throws Throwable // heal partition and wait for node 1 to see node 3 again for (IInvokableInstance instance : cluster) instance.runOnInstance(() -> { - AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().stop()); + AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().stop()); Assert.assertFalse(barrierRecordingService().executedBarriers); }); cluster.filters().reset(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java index c2b8f1e48618..a7354e17b7cf 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java @@ -125,6 +125,6 @@ public void testLostCommitReadTriggersFallbackRead() throws Exception private void pauseSimpleProgressLog() { for (IInvokableInstance instance : SHARED_CLUSTER) - instance.runOnInstance(() -> AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().stop())); + instance.runOnInstance(() -> AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().stop())); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java index b4c53b6159b9..cb91e9a160e1 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java @@ -166,7 +166,7 @@ public void tearDown() throws Exception { SHARED_CLUSTER.filters().reset(); for (IInvokableInstance instance : SHARED_CLUSTER) - instance.runOnInstance(() -> AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().start())); + instance.runOnInstance(() -> AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().start())); truncateSystemTables(); diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java index 1e311bf8a541..b6aeb6231d56 100644 --- a/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java +++ b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java @@ -150,9 +150,9 @@ public String get() // Command Stores should not be lost on bounce Map> before = cluster.get(1).callOnInstance(() -> { Map> m = new HashMap<>(); - AccordService.instance().node().commandStores().forEach((store, ranges) -> { + AccordService.instance().node().commandStores().forAllUnsafe((store) -> { Set set = new HashSet<>(); - for (Range range : ranges.all()) + for (Range range : store.unsafeGetRangesForEpoch().all()) set.add(range.toString()); m.put(store.id(), set); }); @@ -169,9 +169,9 @@ public String get() Map> after = cluster.get(1).callOnInstance(() -> { Map> m = new HashMap<>(); - AccordService.instance().node().commandStores().forEach((store, ranges) -> { + AccordService.instance().node().commandStores().forAllUnsafe(store -> { Set set = new HashSet<>(); - for (Range range : ranges.all()) + for (Range range : store.unsafeGetRangesForEpoch().all()) set.add(range.toString()); m.put(store.id(), set); }); diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java index 25b7b952c888..42db56fb9494 100644 --- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java @@ -354,11 +354,11 @@ private TreeMap read(CommandStores commandStores) } @Override - public void replay(CommandStores commandStores) + public boolean replay(CommandStores commandStores) { // Make sure to replay _only_ static segments this.closeCurrentSegmentForTestingIfNonEmpty(); - super.replay(commandStores); + return super.replay(commandStores); } @Override @@ -388,7 +388,7 @@ public PersistentField.Persister durableBeforePers public static IAccordService.AccordCompactionInfos getCompactionInfo(Node node, TableId tableId) { IAccordService.AccordCompactionInfos compactionInfos = new IAccordService.AccordCompactionInfos(node.durableBefore(), node.topology().minEpoch()); - node.commandStores().forEachCommandStore(commandStore -> { + node.commandStores().forAllUnsafe(commandStore -> { RedundantBefore redundantBefore = commandStore.unsafeGetRedundantBefore(); if (redundantBefore == null) redundantBefore = RedundantBefore.EMPTY; diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java index 7f7e44fd9f64..024e343ed7f7 100644 --- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java @@ -34,8 +34,7 @@ import org.slf4j.LoggerFactory; import accord.api.ProtocolModifiers; -import accord.local.PreLoadContext; -import accord.messages.TxnRequest; +import accord.messages.NoWaitRequest; import accord.primitives.Ranges; import accord.primitives.Routable; import accord.primitives.SaveStatus; @@ -212,7 +211,7 @@ public void redundantBefore() throws ExecutionException, InterruptedException TxnId syncId2 = new TxnId(101, 300, Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range, accord.nodeId()); Ranges ranges1 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(1)), new TokenKey(tableId, new LongToken(100)))); Ranges ranges2 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(100)), new TokenKey(tableId, new LongToken(200)))); - getBlocking(accord.node().commandStores().forEach((PreLoadContext.Empty)() -> "Test", safeStore -> { + getBlocking(accord.node().commandStores().forAll("Test", safeStore -> { safeStore.commandStore().markShardDurable(safeStore, syncId1, ranges1, HasOutcome.Universal); safeStore.commandStore().markShardDurable(safeStore, syncId2, ranges2, HasOutcome.Quorum); })); @@ -436,9 +435,9 @@ public boolean test(Message msg, InetAddressAndPort to) if (!msg.verb().name().startsWith("ACCORD_")) return true; TxnId txnId = null; - if (msg.payload instanceof TxnRequest) + if (msg.payload instanceof NoWaitRequest) { - txnId = ((TxnRequest) msg.payload).txnId; + txnId = ((NoWaitRequest) msg.payload).txnId; if (applyTo != null && !applyTo.contains(txnId)) return true; } diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java index d0c2d87f1eba..b467174c42be 100644 --- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java +++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java @@ -30,6 +30,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.Nullable; + +import accord.api.ConfigurationService.EpochReady; import accord.api.Journal; import accord.api.RoutingKey; import accord.local.CommandStores; @@ -238,7 +240,7 @@ private InsertTxn(int storeId, TxnId txnId, SaveStatus saveStatus, Route rout this.storeId = storeId; this.txnId = txnId; this.saveStatus = saveStatus; - this.participants = StoreParticipants.all(route); + this.participants = StoreParticipants.all(route, saveStatus); } @Override @@ -503,7 +505,7 @@ public State(RandomSource rs) storeRangesForEpochs.put(i, new RangesForEpoch(1, Ranges.of(TokenRange.fullRange(tableId, getPartitioner())))); accordService = startAccord(); - accordService.epochReady(ClusterMetadata.current().epoch).awaitUninterruptibly(); + accordService.epochReady(ClusterMetadata.current().epoch, EpochReady::reads).awaitUninterruptibly(); minDecidedIdNull = rs.nextFloat(); txnWriteFrequency = rs.pickInt(1, // every txn is a Write diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java index b714b833f1a8..ac322403d15d 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java @@ -176,7 +176,7 @@ public void basicCycleTest() throws Throwable Command before = safeStore.ifInitialised(txnId).current(); Assert.assertEquals(commit.executeAt, before.executeAt()); Assert.assertTrue(before.hasBeen(Status.Committed)); - Assert.assertEquals(commit.partialDeps, before.partialDeps()); + Assert.assertEquals(commit.partialDeps(), before.partialDeps()); CommandsForKey cfk = safeStore.get(key(1).toUnseekable()).current(); Assert.assertTrue(cfk.indexOf(txnId) >= 0); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java index adaff55578d4..081aaeec473e 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java @@ -41,7 +41,7 @@ import org.junit.Test; import accord.api.Agent; -import accord.impl.AbstractConfigurationService; +import accord.impl.AbstractTestConfigurationService; import accord.impl.TestAgent; import accord.impl.basic.Pending; import accord.impl.basic.PendingQueue; @@ -410,7 +410,7 @@ public void unregisterFailureDetectionEventListener(IFailureDetectionEventListen } } - private class ConfigService extends AbstractConfigurationService.Minimal implements AccordSyncPropagator.Listener + private class ConfigService extends AbstractTestConfigurationService implements AccordSyncPropagator.Listener { private final Map> syncCompletes = new HashMap<>(); private final Map> endpointAcks = new HashMap<>(); @@ -436,7 +436,7 @@ public void fetchTopologyForEpoch(long epoch) } @Override - protected void localSyncComplete(Topology topology, boolean startSync) + protected void onReadyToCoordinate(Topology topology, boolean startSync) { Set notify = topology.nodes().stream().filter(i -> !localId.equals(i)).collect(Collectors.toSet()); instances.get(localId).propagator.reportSyncComplete(topology.epoch(), notify, localId); diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java index b9602dec07fc..d73763ba5aac 100644 --- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java +++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java @@ -59,7 +59,7 @@ import accord.messages.BeginRecovery; import accord.messages.PreAccept; import accord.messages.Reply; -import accord.messages.TxnRequest; +import accord.messages.RouteRequest; import accord.primitives.AbstractUnseekableKeys; import accord.primitives.Ballot; import accord.primitives.EpochSupplier; @@ -421,9 +421,9 @@ public void checkFailures() throw error; } - public T process(TxnRequest request) throws ExecutionException, InterruptedException + public T process(RouteRequest request) throws ExecutionException, InterruptedException { - return process(request, request::apply); + return process(request, request); } public T process(PreLoadContext loadCtx, Function function) throws ExecutionException, InterruptedException @@ -433,9 +433,9 @@ public T process(PreLoadContext loadCtx, Function AsyncResult processAsync(TxnRequest request) + public AsyncResult processAsync(RouteRequest request) { - return processAsync(request, request::apply); + return processAsync(request, request); } public AsyncResult processAsync(PreLoadContext loadCtx, Function function) diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java index d54058b72703..f18065adc96f 100644 --- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java +++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java @@ -120,9 +120,9 @@ private static void test(RandomSource rs, int numSamples, TableMetadata tbl, Gen FullRoute route = txnWithRoute.right; PreAccept preAccept = new PreAccept(nodeId, instance.topologies, txnId, txn, null, false, route) { @Override - public PreAcceptReply apply(SafeCommandStore safeStore) + public PreAcceptReply applyInternal(SafeCommandStore safeStore) { - PreAcceptReply result = super.apply(safeStore); + PreAcceptReply result = super.applyInternal(safeStore); if (action == Action.FAILURE) throw new SimulatedFault("PreAccept failed for keys " + keys()); return result; diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java index 884638da98dc..edca7e1c6b0c 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java @@ -28,6 +28,7 @@ import java.util.Random; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -49,6 +50,7 @@ import accord.api.DataStore; import accord.api.Journal; import accord.api.Key; +import accord.api.OwnershipEventListener; import accord.api.ProgressLog; import accord.api.RoutingKey; import accord.api.Timeouts; @@ -657,8 +659,8 @@ protected TestCommandStore() @Override public Agent agent() { return this; } @Override public void execute(Runnable run) {} @Override public void shutdown() { } - @Override public void onFailedBootstrap(int attempts, String phase, Ranges ranges, Runnable retry, Throwable failure) { throw new UnsupportedOperationException(); } - @Override public void onStale(Timestamp staleSince, Ranges ranges) { throw new UnsupportedOperationException(); } + @Override public AsyncChain chain(Callable call) { throw new UnsupportedOperationException(); } + @Override public OwnershipEventListener ownershipEvents() { return null; } @Override public void onUncaughtException(Throwable t) { throw new UnsupportedOperationException(); } @Override public void onCaughtException(Throwable t, String context) { throw new UnsupportedOperationException(); } @Override public boolean rejectPreAccept(TimeService time, TxnId txnId) { throw new UnsupportedOperationException(); } diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java b/test/unit/org/apache/cassandra/utils/AccordGenerators.java index bd994fc27a10..2c8f292b6814 100644 --- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java +++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java @@ -90,7 +90,7 @@ import static accord.local.CommandStores.RangesForEpoch; import static accord.local.RedundantStatus.Property.GC_BEFORE; -import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP; +import static accord.local.RedundantStatus.Property.UNREADY; import static accord.local.RedundantStatus.SomeStatus.LOCALLY_APPLIED_ONLY; import static accord.local.RedundantStatus.SomeStatus.LOCALLY_WITNESSED_ONLY; import static accord.local.RedundantStatus.SomeStatus.SHARD_APPLIED_ONLY; @@ -276,7 +276,7 @@ private ICommand attributes(SaveStatus saveStatus) if (saveStatus.known.deps().hasPreAcceptedOrProposedOrDecidedDeps()) builder.partialDeps(partialDeps); - builder.setParticipants(StoreParticipants.all(route)); + builder.setParticipants(StoreParticipants.all(route, saveStatus)); builder.durability(NotDurable); if (saveStatus.compareTo(SaveStatus.PreAccepted) >= 0) builder.executeAt(executeAt); @@ -601,9 +601,9 @@ public static Gen redundantBeforeEntry(Gen emptyGen, Gen if (rs.nextBoolean()) bounds.add(Bounds.create(range, txnIdGen.next(rs).addFlag(SHARD_BOUND), oneSlow(GC_BEFORE), null )); if (rs.nextBoolean()) - bounds.add(Bounds.create(range, txnIdGen.next(rs), oneSlow(PRE_BOOTSTRAP), null )); + bounds.add(Bounds.create(range, txnIdGen.next(rs), oneSlow(UNREADY), null )); if (rs.nextBoolean()) - bounds.add(new Bounds(range, Long.MIN_VALUE, Long.MAX_VALUE, new TxnId[0], new short[0], txnIdGen.next(rs))); + bounds.add(new Bounds(range, Long.MIN_VALUE, Long.MAX_VALUE, new TxnId[0], new int[0], txnIdGen.next(rs))); Collections.shuffle(bounds); long endEpoch = emptyGen.next(rs) ? Long.MAX_VALUE : rs.nextLong(0, Long.MAX_VALUE); @@ -618,7 +618,7 @@ public static Gen redundantBeforeEntry(Gen emptyGen, Gen } long startEpoch = rs.nextLong(Math.min(minEpoch, endEpoch)); - Bounds epochBounds = new Bounds(range, startEpoch, endEpoch, new TxnId[0], new short[0], null); + Bounds epochBounds = new Bounds(range, startEpoch, endEpoch, new TxnId[0], new int[0], null); if (result == null) return epochBounds; return Bounds.reduce(result, epochBounds);