diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 2c11e65b29df..ac93be3e3732 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -905,7 +905,7 @@ public List importNewSSTables(Set srcPaths, boolean resetLevel, .build()); } - Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory) + public Descriptor getUniqueDescriptorFor(Descriptor descriptor, File targetDirectory) { Descriptor newDescriptor; do diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 67b87d14e97c..1c4a2478c974 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -73,6 +73,7 @@ import org.apache.cassandra.service.snapshot.SnapshotManifest; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.TimeUUID; import static org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized; @@ -115,6 +116,7 @@ public class Directories public static final String BACKUPS_SUBDIR = "backups"; public static final String SNAPSHOT_SUBDIR = "snapshots"; + public static final String PENDING_SUBDIR = "pending"; public static final String TMP_SUBDIR = "tmp"; public static final String SECONDARY_INDEX_NAME_SEPARATOR = "."; @@ -727,6 +729,20 @@ public static File getSnapshotSchemaFile(File snapshotDir) return new File(snapshotDir, "schema.cql"); } + public File getPendingLocationForDisk(DataDirectory dataDirectory, TimeUUID planId) + { + for (File dir : dataPaths) + { + // Note that we must compare absolute paths (not canonical) here since keyspace directories might be symlinks + Path dirPath = dir.toAbsolute().toPath(); + Path locationPath = dataDirectory.location.toAbsolute().toPath(); + if (!dirPath.startsWith(locationPath)) + continue; + return getOrCreate(dir, PENDING_SUBDIR, planId.toString()); + } + throw new RuntimeException("Could not find pending location"); + } + public static File getBackupsDirectory(Descriptor desc) { return getBackupsDirectory(desc.directory); diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 0c5386c3adba..798e21f80829 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -408,6 +408,8 @@ protected void recordLatency(TableMetrics metric, long latencyNanos) public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController controller) { ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange())); + if (cfs.metadata().replicationType().isTracked()) + controller.addActivationIds(view); Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType)); // fetch data from current memtable, historical memtables, and SSTables in the correct order. 
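The tracked-read hook above hands the ViewFragment to the ReadExecutionController; the controller changes in the next file collect the transfer activation ids from the selected sstables. As a rough illustration of how a downstream read-reconciliation step might consume those ids, here is a minimal sketch (not part of this patch; the reconcile step and the helper name reconcileTransfers are hypothetical, and the element type Iterator<ShortMutationId> is assumed from how the ids are built below):

import java.util.Iterator;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.replication.ShortMutationId;

final class TrackedReadExample
{
    // Sketch: walk the activation ids gathered by ReadExecutionController.addActivationIds(view).
    static void reconcileTransfers(ReadExecutionController controller)
    {
        // null when addActivationIds was never invoked (e.g. the table is not tracked)
        Iterator<ShortMutationId> ids = controller.getActivationIds();
        if (ids == null)
            return;
        while (ids.hasNext())
        {
            ShortMutationId id = ids.next();
            // hypothetical follow-up: check the id against the reconciled coordinator log offsets
            // before considering the read complete
        }
    }
}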
diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java b/src/java/org/apache/cassandra/db/ReadExecutionController.java index 85b17d469077..346e9bf0b6d2 100644 --- a/src/java/org/apache/cassandra/db/ReadExecutionController.java +++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java @@ -18,13 +18,22 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.index.Index; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.replication.ShortMutationId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -33,6 +42,8 @@ public class ReadExecutionController implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(ReadExecutionController.class); + private static final long NO_SAMPLING = Long.MIN_VALUE; // For every reads @@ -50,6 +61,13 @@ public class ReadExecutionController implements AutoCloseable private final RepairedDataInfo repairedDataInfo; private long oldestUnrepairedTombstone = Long.MAX_VALUE; + /* + * Track bulk transfers involved in the read, so we can do read reconciliation. + * These come from the ViewFragment, not the SSTable read path, so bloom filters + short-circuiting SSTable scans + * will still include the total set of relevant bulk transfers. + */ + private Set activationIds = null; + ReadExecutionController(ReadCommand command, OpOrder.Group baseOp, TableMetadata baseMetadata, @@ -243,4 +261,20 @@ private void addSample() if (cfs != null) cfs.metric.topLocalReadQueryTime.addSample(cql, timeMicros); } + + public void addActivationIds(ColumnFamilyStore.ViewFragment view) + { + activationIds = new HashSet<>(); + for (SSTableReader sstable : view.sstables) + { + Collection ids = sstable.getCoordinatorLogOffsets().transfers(); + logger.trace("Adding transfer IDs from SSTable {} {}", sstable, ids); + activationIds.addAll(ids); + } + } + + public Iterator getActivationIds() + { + return activationIds == null ? 
null : activationIds.iterator(); + } } diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java b/src/java/org/apache/cassandra/db/SSTableImporter.java index 66b023bee105..291563c2d3d1 100644 --- a/src/java/org/apache/cassandra/db/SSTableImporter.java +++ b/src/java/org/apache/cassandra/db/SSTableImporter.java @@ -44,6 +44,7 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat.Components; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.File; +import org.apache.cassandra.replication.MutationTrackingService; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; @@ -80,11 +81,8 @@ synchronized List importNewSSTables(Options options) UUID importID = UUID.randomUUID(); logger.info("[{}] Loading new SSTables for {}/{}: {}", importID, cfs.getKeyspaceName(), cfs.getTableName(), options); - // This will be supported in the future TableMetadata metadata = cfs.metadata(); - if (metadata.replicationType() != null && metadata.replicationType().isTracked()) - throw new IllegalStateException("Can't import into tables with mutation tracking enabled"); - + boolean isTracked = metadata.replicationType().isTracked(); List> listers = getSSTableListers(options.srcPaths); Set currentDescriptors = new HashSet<>(); @@ -183,7 +181,14 @@ synchronized List importNewSSTables(Options options) Descriptor newDescriptor = cfs.getUniqueDescriptorFor(entry.getKey(), targetDir); maybeMutateMetadata(entry.getKey(), options); movedSSTables.add(new MovedSSTable(newDescriptor, entry.getKey(), entry.getValue())); - SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue(), options.copyData); + SSTableReader sstable; + if (isTracked) + sstable = SSTableReader.open(cfs, oldDescriptor, metadata.ref); + else + { + // Don't move tracked SSTables, since that will move them to the live set on bounce + sstable = SSTableReader.moveAndOpenSSTable(cfs, oldDescriptor, newDescriptor, entry.getValue(), options.copyData); + } newSSTablesPerDirectory.add(sstable); } catch (Throwable t) @@ -233,7 +238,13 @@ synchronized List importNewSSTables(Options options) if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, false, options.validateIndexChecksum)) cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables); - cfs.getTracker().addSSTables(newSSTables); + if (isTracked) + { + TrackedBulkTransfer.execute(cfs.keyspace.getName(), newSSTables); + } + else + cfs.getTracker().addSSTables(newSSTables); + for (SSTableReader reader : newSSTables) { if (options.invalidateCaches && cfs.isRowCacheEnabled()) @@ -250,6 +261,17 @@ synchronized List importNewSSTables(Options options) return failedDirectories; } + /** + * TODO: Support user-defined consistency level for import, for import with replicas down + */ + private static class TrackedBulkTransfer + { + private static void execute(String keyspace, Set sstables) + { + MutationTrackingService.instance.executeTransfers(keyspace, sstables, ConsistencyLevel.ALL); + } + } + /** * Check the state of this node and throws an {@link InterruptedException} if it is currently draining * diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index ecd7cc7253ec..32ccc1e5dc0b 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -781,6 +781,8 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs Tracing.trace("Acquiring sstable references"); ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey())); + if (cfs.metadata().replicationType().isTracked()) + controller.addActivationIds(view); view.sstables.sort(SSTableReader.maxTimestampDescending); ClusteringIndexFilter filter = clusteringIndexFilter(); long minTimestamp = Long.MAX_VALUE; @@ -1022,6 +1024,8 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam { Tracing.trace("Acquiring sstable references"); ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey())); + if (cfs.metadata().replicationType().isTracked()) + controller.addActivationIds(view); ImmutableBTreePartition result = null; SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector(); @@ -1044,6 +1048,8 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam /* add the SSTables on disk */ view.sstables.sort(SSTableReader.maxTimestampDescending); + if (cfs.metadata().replicationType().isTracked()) + logger.trace("Executing read against SSTables {}", view.sstables); // read sorted sstables for (SSTableReader sstable : view.sstables) { diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java index 73c18328a3d6..f2d0046bc366 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; @@ -61,6 +62,7 @@ import org.apache.cassandra.notifications.TableDroppedNotification; import org.apache.cassandra.notifications.TablePreScrubNotification; import org.apache.cassandra.notifications.TruncationNotification; +import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.TimeUUID; @@ -270,6 +272,20 @@ public void updateInitialSSTableSize(Iterable sstables) public void addSSTables(Collection sstables) { + Preconditions.checkState(!cfstore.metadata().replicationType().isTracked()); + addSSTablesInternal(sstables, false, true, true); + } + + public void addSSTablesTracked(Collection sstables) + { + Preconditions.checkState(cfstore.metadata().replicationType().isTracked()); + for (SSTableReader sstable : sstables) + { + ImmutableCoordinatorLogOffsets logOffsets = sstable.getCoordinatorLogOffsets(); + Preconditions.checkState(logOffsets.isEmpty()); + Preconditions.checkState(!logOffsets.transfers().isEmpty()); + } + addSSTablesInternal(sstables, false, true, true); } diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java index c6fd7ed52af2..6772b77166b0 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@ -402,4 +402,4 @@ public boolean apply(T t) } }; } -} \ No newline at end of file +} diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java index f2f1784a9c0b..cfa88f6d2d20 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java @@ -48,6 +48,7 @@ import org.apache.cassandra.streaming.StreamReceiver; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; import static java.lang.String.format; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; @@ -159,9 +160,13 @@ public SSTableMultiWriter read(DataInputPlus in) throws IOException private File getDataDir(ColumnFamilyStore cfs, long totalSize) throws IOException { + boolean isTracked = cfs.metadata().replicationType().isTracked(); + Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); if (localDir == null) - throw new IOException(format("Insufficient disk space to store %s", prettyPrintMemory(totalSize))); + throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); + if (isTracked) + return cfs.getDirectories().getPendingLocationForDisk(localDir, session.planId()); File dir = cfs.getDirectories().getLocationForDisk(cfs.getDiskBoundaries().getCorrectDiskForKey(header.firstKey)); diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java index 84e71268b3cf..c19339eba746 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java @@ -46,19 +46,23 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.UnknownColumnException; +import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.RangeAwareSSTableWriter; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SSTableSimpleIterator; import org.apache.cassandra.io.sstable.SSTableTxnSingleStreamWriter; +import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.TrackedDataInputPlus; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.StreamReceivedOutOfTokenRangeException; @@ -180,6 +184,8 @@ protected SerializationHeader getHeader(TableMetadata metadata) throws UnknownCo } protected SSTableTxnSingleStreamWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, TimeUUID pendingRepair, ImmutableCoordinatorLogOffsets coordinatorLogOffsets, SSTableFormat format) throws IOException { + boolean isTracked = cfs.metadata().replicationType().isTracked(); + Directories.DataDirectory localDir 
= cfs.getDirectories().getWriteableLocation(totalSize); if (localDir == null) throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); @@ -187,8 +193,20 @@ protected SSTableTxnSingleStreamWriter createWriter(ColumnFamilyStore cfs, long StreamReceiver streamReceiver = session.getAggregator(tableId); Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver); ILifecycleTransaction txn = createTxn(); - RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, coordinatorLogOffsets, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata())); - return new SSTableTxnSingleStreamWriter(txn, writer); + if (isTracked) + { + File location = cfs.getDirectories().getPendingLocationForDisk(localDir, session.planId()); + Descriptor desc = cfs.newSSTableDescriptor(location, format); + SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(desc, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, false, + coordinatorLogOffsets, cfs.metadata, null, sstableLevel, getHeader(cfs.metadata()), + cfs.indexManager.listIndexGroups(), txn, cfs); + return new SSTableTxnSingleStreamWriter(txn, writer); + } + else + { + RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, coordinatorLogOffsets, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata())); + return new SSTableTxnSingleStreamWriter(txn, writer); + } } private ILifecycleTransaction createTxn() diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java index 73d599fe04fe..09bc3acfc056 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java @@ -52,6 +52,8 @@ import org.apache.cassandra.service.accord.AccordTopology; import org.apache.cassandra.service.accord.IAccordService; import org.apache.cassandra.service.accord.TimeOnlyRequestBookkeeping.LatencyRequestBookkeeping; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.replication.PendingLocalTransfer; import org.apache.cassandra.streaming.IncomingStream; import org.apache.cassandra.streaming.StreamReceiver; import org.apache.cassandra.streaming.StreamSession; @@ -256,6 +258,15 @@ public void finished() // add sstables (this will build non-SSTable-attached secondary indexes too, see CASSANDRA-10130) logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers); + + if (cfs.metadata().replicationType().isTracked()) + { + // Don't mark as live until activated by the stream coordinator + PendingLocalTransfer transfer = new PendingLocalTransfer(cfs.metadata().id, session.planId(), sstables); + MutationTrackingService.instance.received(transfer); + return; + } + cfs.addSSTables(readers); //invalidate row and counter cache diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 956ed801684e..8217d965a18f 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -95,6 +95,7 @@ import org.apache.cassandra.io.util.FileUtils.DuplicateHardlinkException; import 
org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.metrics.RestorableMeter; +import org.apache.cassandra.replication.CoordinatorLogOffsets; import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableMetadataRef; @@ -1382,6 +1383,18 @@ public void mutateRepairedAndReload(long newRepairedAt, TimeUUID newPendingRepai } } + /** + * Mutate the coordinator log offsets under a lock to avoid racing with entire-sstable streaming, then reload the sstable metadata + */ + public void mutateCoordinatorLogOffsetsAndReload(ImmutableCoordinatorLogOffsets logOffsets) throws IOException + { + synchronized (tidy.global) + { + descriptor.getMetadataSerializer().mutateCoordinatorLogOffsets(descriptor, logOffsets); + reloadSSTableMetadata(); + } + } + /** * Reloads the sstable metadata from disk. *

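The new mutateCoordinatorLogOffsetsAndReload hook pairs with the ImmutableCoordinatorLogOffsets.Builder#addTransfer method introduced later in this patch. Here is a minimal sketch of how a transfer activation could stamp its id onto a pending sstable's stats metadata (illustrative only, not part of the patch; the helper name stampActivationId is hypothetical):

import java.io.IOException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets;
import org.apache.cassandra.replication.ShortMutationId;

final class TransferMetadataExample
{
    // Sketch: record which bulk transfer produced this sstable so tracked reads can see it
    // via getCoordinatorLogOffsets().transfers().
    static void stampActivationId(SSTableReader sstable, ShortMutationId activationId) throws IOException
    {
        ImmutableCoordinatorLogOffsets offsets = new ImmutableCoordinatorLogOffsets.Builder()
                                                 .addTransfer(activationId)
                                                 .build();
        // Rewrites the -Statistics.db component under the tidy.global lock and reloads in-memory metadata.
        sstable.mutateCoordinatorLogOffsetsAndReload(offsets);
    }
}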
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java index ff1e604d04c5..46d737cbbc80 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java @@ -26,6 +26,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets; import org.apache.cassandra.utils.TimeUUID; /** @@ -96,6 +97,11 @@ public interface IMetadataSerializer */ public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, TimeUUID newPendingRepair, boolean isTransient) throws IOException; + /** + * Replace mutation tracking metadata. + */ + public void mutateCoordinatorLogOffsets(Descriptor descriptor, ImmutableCoordinatorLogOffsets logOffsets) throws IOException; + /** * Replace the sstable metadata file ({@code -Statistics.db}) with the given components. */ diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java index f2d11b816014..2e524936dd10 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java @@ -45,6 +45,7 @@ import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.replication.ImmutableCoordinatorLogOffsets; import org.apache.cassandra.utils.TimeUUID; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; @@ -248,6 +249,15 @@ public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, Time mutate(descriptor, stats -> stats.mutateRepairedMetadata(newRepairedAt, newPendingRepair, isTransient)); } + @Override + public void mutateCoordinatorLogOffsets(Descriptor descriptor, ImmutableCoordinatorLogOffsets logOffsets) throws IOException + { + if (logger.isTraceEnabled()) + logger.trace("Mutating {} to {}", descriptor.fileFor(Components.STATS), logOffsets); + + mutate(descriptor, stats -> stats.mutateCoordinatorLogOffsets(logOffsets)); + } + private void mutate(Descriptor descriptor, UnaryOperator transform) throws IOException { Map currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class)); diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index b43b86f9daa0..93bab7927ca2 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@ -247,6 +247,36 @@ public StatsMetadata mutateRepairedMetadata(long newRepairedAt, TimeUUID newPend lastKey); } + public StatsMetadata mutateCoordinatorLogOffsets(ImmutableCoordinatorLogOffsets newLogOffsets) + { + return new StatsMetadata(estimatedPartitionSize, + estimatedCellPerPartitionCount, + commitLogIntervals, + minTimestamp, + maxTimestamp, + minLocalDeletionTime, + maxLocalDeletionTime, + minTTL, + maxTTL, + compressionRatio, + estimatedTombstoneDropTime, + sstableLevel, + clusteringTypes, + coveredClustering, + hasLegacyCounterShards, + repairedAt, + totalColumnsSet, + totalRows, + 
tokenSpaceCoverage, + originatingHostId, + pendingRepair, + isTransient, + hasPartitionLevelDeletions, + newLogOffsets, + firstKey, + lastKey); + } + @Override public boolean equals(Object o) { diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index c26e3af09097..20536f709f88 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -78,6 +78,7 @@ import org.apache.cassandra.replication.ForwardedWrite; import org.apache.cassandra.replication.PullMutationsRequest; import org.apache.cassandra.replication.PushMutationRequest; +import org.apache.cassandra.replication.TransferActivation; import org.apache.cassandra.schema.SchemaMutationsSerializer; import org.apache.cassandra.schema.SchemaPullVerbHandler; import org.apache.cassandra.schema.SchemaPushVerbHandler; @@ -336,6 +337,8 @@ public enum Verb TRACKED_SUMMARY_RSP (910, P2, readTimeout, REQUEST_RESPONSE, () -> TrackedSummaryResponse.serializer, () -> TrackedSummaryResponse.verbHandler ), TRACKED_SUMMARY_REQ (911, P3, readTimeout, READ, () -> TrackedRead.SummaryRequest.serializer, () -> TrackedRead.verbHandler, TRACKED_SUMMARY_RSP ), + TRACKED_TRANSFER_ACTIVATE_RSP (912, P1, rpcTimeout, REQUEST_RESPONSE, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance), + TRACKED_TRANSFER_ACTIVATE_REQ (913, P1, rpcTimeout, ANTI_ENTROPY, () -> TransferActivation.serializer, () -> TransferActivation.verbHandler, TRACKED_TRANSFER_ACTIVATE_RSP), // accord ACCORD_SIMPLE_RSP (119, P2, writeTimeout, IMMEDIATE, () -> accordEmbedded(EnumSerializer.simpleReply), AccordService::responseHandlerOrNoop ), diff --git a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java index 54ded3766c75..193e6f2545e4 100644 --- a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java +++ b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java @@ -17,11 +17,15 @@ */ package org.apache.cassandra.replication; +import java.util.Collections; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue; import org.apache.cassandra.concurrent.Interruptible; import org.apache.cassandra.concurrent.Shutdownable; @@ -46,6 +50,8 @@ // TODO (expected): handle temporarily down nodes public final class ActiveLogReconciler implements Shutdownable { + private static final Logger logger = LoggerFactory.getLogger(ActiveLogReconciler.class); + public enum Priority { HIGH, REGULAR } // prioritised delivery of mutations that are needed by reads; @@ -71,7 +77,7 @@ public enum Priority { HIGH, REGULAR } */ void schedule(ShortMutationId mutationId, InetAddressAndPort toHost, Priority priority) { - queue(priority).offer(new Task(mutationId, toHost)); + queue(priority).offer(Task.from(mutationId, toHost)); haveWork.release(1); } @@ -82,7 +88,7 @@ void schedule(ShortMutationId mutationId, InetAddressAndPort toHost, Priority pr void schedule(Offsets offsets, InetAddressAndPort toHost, Priority priority) { ManyToOneConcurrentLinkedQueue queue = queue(priority); - offsets.forEach(id -> queue.offer(new Task(id, toHost))); + offsets.forEach(id -> queue.offer(Task.from(id, toHost))); haveWork.release(1); } @@ -114,12 +120,26 @@ public void run(Interruptible.State state) throws InterruptedException } } 
- private static final class Task implements RequestCallback + private static abstract class Task implements RequestCallback + { + private static Task from(ShortMutationId id, InetAddressAndPort toHost) + { + CoordinatedTransfer transfer = LocalTransfers.instance().getActivatedTransfer(id); + if (transfer != null) + return new TransferTask(transfer, toHost); + else + return new MutationTask(id, toHost); + } + + abstract void send(); + } + + private static final class MutationTask extends Task { private final ShortMutationId mutationId; private final InetAddressAndPort toHost; - Task(ShortMutationId mutationId, InetAddressAndPort toHost) + MutationTask(ShortMutationId mutationId, InetAddressAndPort toHost) { this.mutationId = mutationId; this.toHost = toHost; @@ -156,6 +176,34 @@ void send() } } + private static final class TransferTask extends Task + { + private final CoordinatedTransfer transfer; + private final InetAddressAndPort toHost; + + TransferTask(CoordinatedTransfer transfer, InetAddressAndPort toHost) + { + this.transfer = transfer; + this.toHost = toHost; + } + + @Override + public void onResponse(Message msg) + { + logger.debug("Received activation for TransferTask from {}", toHost); + MutationTrackingService.instance.receivedActivationAck(transfer, toHost); + } + + void send() + { + logger.debug("Sending activation to {}", toHost); + transfer.activateOn(Collections.singleton(toHost)).addCallback((ok, err) -> { + if (err == null) + onResponse(null); + }); + } + } + private volatile boolean isShutdown = false; private volatile boolean isPaused = false; diff --git a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java index b3fc118613b7..34527465e7f3 100644 --- a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java +++ b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java @@ -62,7 +62,7 @@ public String toString() public static final IVerbHandler verbHandler = message -> { BroadcastLogOffsets replicatedOffsets = message.payload; - logger.trace("Received replicated offsets {} from {}", replicatedOffsets, message.from()); + // logger.trace("Received replicated offsets {} from {}", replicatedOffsets, message.from()); MutationTrackingService.instance.updateReplicatedOffsets(replicatedOffsets.keyspace, replicatedOffsets.range, replicatedOffsets.replicatedOffsets, diff --git a/src/java/org/apache/cassandra/replication/CoordinatedTransfer.java b/src/java/org/apache/cassandra/replication/CoordinatedTransfer.java new file mode 100644 index 000000000000..002842b8a135 --- /dev/null +++ b/src/java/org/apache/cassandra/replication/CoordinatedTransfer.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.replication; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.streaming.CassandraOutgoingFile; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.net.RequestCallbackWithFailure; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.streaming.OutgoingStream; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.StreamResultFuture; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.AsyncFuture; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.FutureCombiner; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; + +/** + * A transfer for a single replica set. + *

+ * For simplicity, streaming from coordinator to itself instead of copying files. This has some perks: + * (1) it allows us to import out-of-range SSTables using the same paths, and + * (2) it uses the existing lifecycle management to handle crash-safety, so don't need to deal with atomic multi-file + * copy. + */ +public class CoordinatedTransfer +{ + private static final Logger logger = LoggerFactory.getLogger(CoordinatedTransfer.class); + + String logPrefix() + { + return String.format("[CoordinatedTransfer #%s]", transferId); + } + + final TimeUUID transferId = TimeUUID.Generator.nextTimeUUID(); + + // TODO(expected): Add epoch at time of creation + final String keyspace; + public final Range range; + + final ConcurrentMap streams; + + final Collection sstables; + + final ConsistencyLevel cl; + + final Supplier getActivationId; + volatile MutationId activationId = null; + + CoordinatedTransfer(String keyspace, Range range, Participants participants, Collection sstables, ConsistencyLevel cl, Supplier getActivationId) + { + this.keyspace = keyspace; + this.range = range; + this.sstables = sstables; + this.cl = cl; + this.getActivationId = getActivationId; + + ClusterMetadata cm = ClusterMetadata.current(); + this.streams = new ConcurrentHashMap<>(participants.size()); + for (int i = 0; i < participants.size(); i++) + { + InetAddressAndPort addr = cm.directory.getNodeAddresses(new NodeId(participants.get(i))).broadcastAddress; + this.streams.put(addr, SingleTransferResult.Unknown()); + } + } + + void execute() + { + logger.debug("Executing tracked bulk transfer {}", this); + LocalTransfers.instance().save(this); + stream(); + } + + private void stream() + { + // TODO: Don't stream multiple copies over the WAN, send one copy and indicate forwarding + List> streaming = new ArrayList<>(streams.size()); + for (InetAddressAndPort to : streams.keySet()) + streaming.add(stream(to)); + + Future> future = FutureCombiner.allOf(streaming); + future.awaitUninterruptibly(); + future.rethrowIfFailed(); + } + + private boolean sufficient() + { + AbstractReplicationStrategy ars = Keyspace.open(keyspace).getReplicationStrategy(); + int blockFor = cl.blockFor(ars); + int responses = 0; + for (Map.Entry entry : streams.entrySet()) + { + if (entry.getValue().state == SingleTransferResult.State.STREAM_COMPLETE) + responses++; + } + return responses >= blockFor; + } + + Future stream(InetAddressAndPort to) + { + return streamTask(to).andThenAsync(result -> streamComplete(to, result)); + } + + private Future streamComplete(InetAddressAndPort to, SingleTransferResult result) + { + streams.put(to, result); + logger.info("{} Completed streaming to {}, {}", logPrefix(), to, this); + return maybeActivate(); + } + + synchronized Future maybeActivate() + { + /* TODO + If topology has changed after streaming, need to ensure new topology doesn't break consistency of completed + streams. 
+ */ + + logger.debug("maybeActivate {}", streams); + + // If any activations have already been sent out, send new activations to any received plans that have not yet + // been activated + boolean anyActivated = false; + Set awaitingActivation = new HashSet<>(); + for (Map.Entry entry : streams.entrySet()) + { + InetAddressAndPort peer = entry.getKey(); + SingleTransferResult result = entry.getValue(); + if (result.state == SingleTransferResult.State.ACTIVATE_COMPLETE) + { + anyActivated = true; + } + else if (result.state == SingleTransferResult.State.STREAM_COMPLETE) + awaitingActivation.add(peer); + } + if (anyActivated && !awaitingActivation.isEmpty()) + { + logger.debug("{} Transfer already activated on peers, sending activations to {}", logPrefix(), awaitingActivation); + return activateOn(awaitingActivation); + } + + // If no activations have been sent out, check whether we have enough planIds back to meet the required CL + else if (sufficient()) + { + Set peers = new HashSet<>(); + for (Map.Entry entry : streams.entrySet()) + { + InetAddressAndPort peer = entry.getKey(); + SingleTransferResult result = entry.getValue(); + if (result.state == SingleTransferResult.State.STREAM_COMPLETE) + peers.add(peer); + } + logger.debug("{} Transfer meets consistency level {}, sending activations to {}", logPrefix(), cl, peers); + return activateOn(peers); + } + + logger.debug("Nothing to activate"); + return ImmediateFuture.success(null); + } + + synchronized Future activateOn(Collection peers) + { + Preconditions.checkState(!peers.isEmpty()); + + if (activationId == null) + { + activationId = getActivationId.get(); + logger.info("{} Assigned activationId {}", logPrefix(), activationId); + } + LocalTransfers.instance().activating(this); + + // First phase is dryRun to ensure data is present on disk, then second phase does the actual import. This + // ensures that if something goes wrong (like a topology change during import), we don't have divergence. + class DryRun extends AsyncFuture implements RequestCallbackWithFailure + { + final Set responses = ConcurrentHashMap.newKeySet(); + + public DryRun() + { + responses.addAll(peers); + } + + @Override + public void onResponse(Message msg) + { + logger.debug("{} Got response from: {}", logPrefix(), msg.from()); + responses.remove(msg.from()); + if (responses.isEmpty()) + trySuccess(null); + } + + @Override + public void onFailure(InetAddressAndPort from, RequestFailure failure) + { + logger.debug("{} Got failure {} from {}", logPrefix(), failure, from); + tryFailure(null); + } + } + + DryRun dryRun = new DryRun(); + for (InetAddressAndPort peer : peers) + { + TransferActivation activation = new TransferActivation(this, peer, true); + Message msg = Message.out(Verb.TRACKED_TRANSFER_ACTIVATE_REQ, activation); + logger.debug("{} Sending {} to peer {}", logPrefix(), activation, peer); + MessagingService.instance().sendWithCallback(msg, peer, dryRun); + SingleTransferResult result = CoordinatedTransfer.this.streams.get(msg.from()); + if (result != null) + result.sentActivation(); + } + dryRun.awaitUninterruptibly(); + logger.debug("{} Dry run complete for {}", logPrefix(), peers); + + // Acknowledgement of activation is equivalent to a remote write acknowledgement. The imported SSTables + // are now part of the live set, visible to reads. 
+ class EachActivate extends AsyncFuture implements RequestCallbackWithFailure + { + final Set responses = ConcurrentHashMap.newKeySet(); + + private EachActivate(Collection peers) + { + responses.addAll(peers); + } + + @Override + public void onResponse(Message msg) + { + logger.debug("Activation successfully applied on {}", msg.from()); + SingleTransferResult result = CoordinatedTransfer.this.streams.get(msg.from()); + if (result != null) + result.completedActivation(); + + MutationTrackingService.instance.receivedActivationAck(CoordinatedTransfer.this, msg.from()); + responses.remove(msg.from()); + if (responses.isEmpty()) + trySuccess(null); + } + + @Override + public void onFailure(InetAddressAndPort from, RequestFailure failure) + { + logger.error("Failed activation on {} due to {}", from, failure); + // TODO(expected): should only fail if we don't meet requested CL, individual failures are fine + responses.remove(from); + if (responses.isEmpty()) + trySuccess(null); + } + } + + EachActivate eachActivate = new EachActivate(peers); + for (InetAddressAndPort peer : peers) + { + TransferActivation activation = new TransferActivation(this, peer, false); + Message msg = Message.out(Verb.TRACKED_TRANSFER_ACTIVATE_REQ, activation); + + logger.debug("{} Sending {} to peer {}", logPrefix(), activation, peer); + MessagingService.instance().sendWithCallback(msg, peer, eachActivate); + } + + return eachActivate; + } + + static class SingleTransferResult + { + enum State + { + UNKNOWN, + STREAM_NOOP, + STREAM_COMPLETE, + ACTIVATE_START, + ACTIVATE_COMPLETE; + } + + private volatile State state; + private final TimeUUID planId; + + private SingleTransferResult(State state, TimeUUID planId) + { + this.state = state; + this.planId = planId; + } + + private static SingleTransferResult Complete(TimeUUID planId) + { + return new SingleTransferResult(State.STREAM_COMPLETE, planId); + } + + private static SingleTransferResult Noop() + { + return new SingleTransferResult(State.STREAM_NOOP, null); + } + + private static SingleTransferResult Unknown() + { + return new SingleTransferResult(State.UNKNOWN, null); + } + + public void sentActivation() + { + state = State.ACTIVATE_START; + } + + public void completedActivation() + { + state = State.ACTIVATE_COMPLETE; + } + + public TimeUUID planId() + { + Preconditions.checkState(planId != null); + return planId; + } + + @Override + public String toString() + { + return "SingleTransferResult{" + + "state=" + state + + ", planId=" + planId + + '}'; + } + } + + private Future streamTask(InetAddressAndPort to) + { + Callable callable = () -> { + StreamPlan plan = new StreamPlan(StreamOperation.IMPORT); + + // No need to flush, only using non-live SSTables already on disk + plan.flushBeforeTransfer(false); + + for (SSTableReader sstable : sstables) + { + List> ranges = Collections.singletonList(range); + List positions = sstable.getPositionsForRanges(ranges); + long estimatedKeys = sstable.estimatedKeysForRanges(ranges); + OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.IMPORT, sstable.ref(), positions, ranges, estimatedKeys); + plan.transferStreams(to, Collections.singleton(stream)); + } + + logger.info("{} Starting streaming transfer {} to peer {}", logPrefix(), this, to); + StreamResultFuture execute = plan.execute(); + StreamState state; + try + { + state = execute.get(); + logger.debug("{} Completed streaming transfer {} to peer {}", logPrefix(), this, to); + } + catch (InterruptedException | ExecutionException e) + { + logger.error("Stream 
session failed with error", e); + return SingleTransferResult.Unknown(); + } + + if (state.hasFailedSession() || state.hasAbortedSession()) + { + logger.error("Stream failed due to failed or aborted sessions: {}", state.sessions()); + return SingleTransferResult.Unknown(); + } + + // If the SSTable doesn't contain any rows in the provided range, no streams delivered, nothing to activate + if (state.sessions().isEmpty()) + return SingleTransferResult.Noop(); + + return SingleTransferResult.Complete(plan.planId()); + }; + return LocalTransfers.instance().executor.submit(callable); + } + + @Override + public String toString() + { + return "CoordinatedTransfer{" + + "transferId=" + transferId + + ", range=" + range + + ", streams=" + streams + + ", sstables=" + sstables + + ", activationId=" + activationId + + '}'; + } +} diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java index 13228256e284..5155e6e12bc9 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java @@ -48,12 +48,14 @@ public abstract class CoordinatorLog protected final Participants participants; protected final UnreconciledMutations unreconciledMutations; + protected final UnreconciledTransfers unreconciledTransfers; protected final Offsets.Mutable[] witnessedOffsets; protected final Offsets.Mutable reconciledOffsets; protected final ReadWriteLock lock; abstract void receivedWriteResponse(ShortMutationId mutationId, int fromHostId); + abstract void receivedActivationAck(ShortMutationId mutationId, int fromHostId); CoordinatorLog(int localHostId, CoordinatorLogId logId, Participants participants) { @@ -62,6 +64,7 @@ public abstract class CoordinatorLog this.participants = participants; this.unreconciledMutations = new UnreconciledMutations(); + this.unreconciledTransfers = new UnreconciledTransfers(); this.lock = new ReentrantReadWriteLock(); Offsets.Mutable[] ids = new Offsets.Mutable[participants.size()]; @@ -91,7 +94,9 @@ void updateReplicatedOffsets(Offsets offsets, int onHostId) if (othersWitnessed(offset, onHostId)) { reconciledOffsets.add(offset); - unreconciledMutations.remove(offset); + // A given offset is either a mutation or a transfer + if (!unreconciledTransfers.remove(offset)) + unreconciledMutations.remove(offset); } } }); @@ -180,13 +185,14 @@ protected boolean remoteReplicasWitnessed(int offset) * Look up unreconciled sequence ids of mutations witnessed by this host in this coordinataor log. * Adds the ids to the supplied collection, so it can be reused to aggregate lookups for multiple logs. */ - boolean collectOffsetsFor(Token token, TableId tableId, boolean includePending, Offsets.OffsetReciever unreconciledInto, Offsets.OffsetReciever reconciledInto) + void collectOffsetsFor(Token token, TableId tableId, boolean includePending, Offsets.OffsetReciever unreconciledInto, Offsets.OffsetReciever reconciledInto) { lock.readLock().lock(); try { reconciledInto.addAll(reconciledOffsets); - return unreconciledMutations.collect(token, tableId, includePending, unreconciledInto); + unreconciledMutations.collect(token, tableId, includePending, unreconciledInto); + unreconciledTransfers.collect(token, tableId, unreconciledInto); } finally { @@ -198,13 +204,14 @@ boolean collectOffsetsFor(Token token, TableId tableId, boolean includePending, * Look up unreconciled sequence ids of mutations witnessed by this host in this coordinataor log. 
* Adds the ids to the supplied collection, so it can be reused to aggregate lookups for multiple logs. */ - boolean collectOffsetsFor(AbstractBounds range, TableId tableId, boolean includePending, Offsets.OffsetReciever unreconciledInto, Offsets.OffsetReciever reconciledInto) + void collectOffsetsFor(AbstractBounds range, TableId tableId, boolean includePending, Offsets.OffsetReciever unreconciledInto, Offsets.OffsetReciever reconciledInto) { lock.readLock().lock(); try { reconciledInto.addAll(reconciledOffsets); - return unreconciledMutations.collect(range, tableId, includePending, unreconciledInto); + unreconciledMutations.collect(range, tableId, includePending, unreconciledInto); + unreconciledTransfers.collect(range, tableId, unreconciledInto); } finally { @@ -318,6 +325,38 @@ void receivedWriteResponse(ShortMutationId mutationId, int fromHostId) } } + @Override + void receivedActivationAck(ShortMutationId activationId, int onHostId) + { + logger.trace("witnessed transfer activation ack {} from {}", activationId, onHostId); + + Preconditions.checkArgument(!activationId.isNone()); + lock.writeLock().lock(); + try + { + if (onHostId == ClusterMetadata.current().myNodeId().id()) + unreconciledTransfers.activated(activationId.offset()); + + if (!get(onHostId).add(activationId.offset())) + return; // already witnessed; very uncommon but possible path + + if (!getLocal().contains(activationId.offset())) + return; // local host hasn't witnessed yet -> no cleanup needed + + if (remoteReplicasWitnessed(activationId.offset())) + { + logger.trace("marking transfer {} as fully reconciled", activationId); + // if all replicas have now witnessed the id, remove it from the index + unreconciledTransfers.remove(activationId.offset()); + reconciledOffsets.add(activationId.offset()); + } + } + finally + { + lock.writeLock().unlock(); + } + } + MutationId nextId() { return new MutationId(logId.asLong(), nextSequenceId()); @@ -353,6 +392,11 @@ void receivedWriteResponse(ShortMutationId mutationId, int fromHostId) { // no-op } - } + @Override + void receivedActivationAck(ShortMutationId activationId, int fromHostId) + { + // no-op + } + } } diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogOffsets.java b/src/java/org/apache/cassandra/replication/CoordinatorLogOffsets.java index dfe34f580050..55ec4d33aa67 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatorLogOffsets.java +++ b/src/java/org/apache/cassandra/replication/CoordinatorLogOffsets.java @@ -34,6 +34,10 @@ public interface CoordinatorLogOffsets extends Iterable { O offsets(long logId); int size(); + default boolean isEmpty() + { + return size() == 0; + } ImmutableCoordinatorLogOffsets NONE = new ImmutableCoordinatorLogOffsets.Builder(0).build(); } diff --git a/src/java/org/apache/cassandra/replication/ImmutableCoordinatorLogOffsets.java b/src/java/org/apache/cassandra/replication/ImmutableCoordinatorLogOffsets.java index 865abbda088c..c04d2f546a3b 100644 --- a/src/java/org/apache/cassandra/replication/ImmutableCoordinatorLogOffsets.java +++ b/src/java/org/apache/cassandra/replication/ImmutableCoordinatorLogOffsets.java @@ -19,7 +19,10 @@ package org.apache.cassandra.replication; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import javax.annotation.concurrent.NotThreadSafe; @@ -31,11 +34,13 @@ import org.apache.cassandra.io.util.DataInputPlus; import 
org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.CollectionSerializers; import org.apache.cassandra.utils.vint.VIntCoding; public class ImmutableCoordinatorLogOffsets implements CoordinatorLogOffsets { private final Long2ObjectHashMap ids; + private final List transfers; @Override public Offsets.Immutable offsets(long logId) @@ -46,6 +51,11 @@ public Offsets.Immutable offsets(long logId) return offsets; } + public Collection transfers() + { + return transfers; + } + @Override public int size() { @@ -63,16 +73,16 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; ImmutableCoordinatorLogOffsets longs = (ImmutableCoordinatorLogOffsets) o; - return Objects.equals(ids, longs.ids); + return Objects.equals(ids, longs.ids) && Objects.equals(transfers, longs.transfers); } @Override public int hashCode() { - return Objects.hashCode(ids); + return Objects.hash(ids, transfers); } - public ImmutableCoordinatorLogOffsets(Builder builder) + private ImmutableCoordinatorLogOffsets(Builder builder) { // Important to set shouldAvoidAllocation=false, otherwise iterators are cached and not thread safe, even when // immutable and read-only @@ -80,12 +90,15 @@ public ImmutableCoordinatorLogOffsets(Builder builder) for (Map.Entry entry : builder.ids.entrySet()) ids.put(entry.getKey(), entry.getValue().build()); + + this.transfers = builder.transfers; } @NotThreadSafe public static class Builder { private final Long2ObjectHashMap ids; + private final List transfers; public Builder() { @@ -95,6 +108,9 @@ public Builder() public Builder(int size) { this.ids = new Long2ObjectHashMap<>(size, 0.9f, false); + + // Transfers are very rare, opt to save memory + this.transfers = new ArrayList<>(1); } public Builder add(MutationId mutationId) @@ -124,6 +140,14 @@ public Builder addAll(Offsets.Immutable offsets) return this; } + public Builder addTransfer(ShortMutationId activationId) + { + if (activationId.isNone()) + return this; + transfers.add(activationId); + return this; + } + public ImmutableCoordinatorLogOffsets build() { return new ImmutableCoordinatorLogOffsets(this); @@ -140,6 +164,7 @@ public void serialize(ImmutableCoordinatorLogOffsets logOffsets, DataOutputPlus out.writeUnsignedVInt32(logOffsets.size()); for (long logId : logOffsets) Offsets.serializer.serialize(logOffsets.offsets(logId), out, version); + CollectionSerializers.serializeCollection(logOffsets.transfers, out, version, ShortMutationId.serializer); } @Override @@ -154,6 +179,9 @@ public ImmutableCoordinatorLogOffsets deserialize(DataInputPlus in, int version) Offsets.Immutable offsets = Offsets.serializer.deserialize(in, version); builder.addAll(offsets); } + List transfers = CollectionSerializers.deserializeList(in, version, ShortMutationId.serializer); + for (ShortMutationId transfer : transfers) + builder.addTransfer(transfer); return builder.build(); } @@ -166,6 +194,7 @@ public long serializedSize(ImmutableCoordinatorLogOffsets logOffsets, int versio size += VIntCoding.computeUnsignedVIntSize(logOffsets.size()); for (long logId : logOffsets) size += Offsets.serializer.serializedSize(logOffsets.offsets(logId), version); + size += CollectionSerializers.serializedCollectionSize(logOffsets.transfers, version, ShortMutationId.serializer); return size; } } diff --git a/src/java/org/apache/cassandra/replication/LocalTransfers.java b/src/java/org/apache/cassandra/replication/LocalTransfers.java new file mode 100644 index 
000000000000..58d089beea70 --- /dev/null +++ b/src/java/org/apache/cassandra/replication/LocalTransfers.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.replication; + +import java.util.HashMap; +import java.util.Map; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.utils.TimeUUID; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; + +/** + * Stores coordinated and received transfers. + * + * TODO: Make changes to pending set durable with SystemKeyspace.savePendingLocalTransfer(transfer)? + * TODO: GC + */ +class LocalTransfers +{ + private static final Logger logger = LoggerFactory.getLogger(LocalTransfers.class); + + private final Map coordinating = new HashMap<>(); + private final Map coordinatingActivated = new HashMap<>(); + private final Map received = new HashMap<>(); + + final ExecutorPlus executor = executorFactory().pooled("LocalTrackedTransfers", Integer.MAX_VALUE); + + private static final LocalTransfers instance = new LocalTransfers(); + static LocalTransfers instance() + { + return instance; + } + + synchronized void save(CoordinatedTransfer transfer) + { + CoordinatedTransfer existing = coordinating.put(transfer.transferId, transfer); + Preconditions.checkState(existing == null); + } + + synchronized void activating(CoordinatedTransfer transfer) + { + coordinatingActivated.put(transfer.activationId, transfer); + } + + synchronized void received(PendingLocalTransfer transfer) + { + logger.debug("received: {}", transfer); + Preconditions.checkState(!transfer.sstables.isEmpty()); + + PendingLocalTransfer existing = received.put(transfer.planId, transfer); + Preconditions.checkState(existing == null); + } + + PendingLocalTransfer getPendingTransfer(TimeUUID planId) + { + return checkNotNull(received.get(planId)); + } + + CoordinatedTransfer getActivatedTransfer(ShortMutationId activationId) + { + return checkNotNull(coordinatingActivated.get(activationId)); + } +} diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index 7ec675870744..e5c0764933f0 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -18,6 +18,9 @@ package org.apache.cassandra.replication; import java.util.Collections; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import 
java.util.Iterator; import java.util.List; @@ -41,15 +44,18 @@ import org.agrona.collections.IntHashSet; import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.apache.cassandra.concurrent.Shutdownable; +import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Splitter; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RequestFailure; import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; @@ -62,11 +68,13 @@ import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Interval; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static java.util.concurrent.TimeUnit.MINUTES; +import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; import static org.apache.cassandra.concurrent.ExecutorFactory.SimulatorSemantics.NORMAL; @@ -77,9 +85,12 @@ public class MutationTrackingService /** * Split ranges into this many shards. * + * REVIEW: Reset back to 1 because for transfers, replicas need to know each others' shards, since transfers are + * sliced to fit within shards. Can we achieve sharding via split range ownership, instead of it being local-only? 
+ * * TODO (expected): ability to rebalance / change this constant */ - private static final int SHARD_MULTIPLIER = 8; + private static final int SHARD_MULTIPLIER = 1; private static final Logger logger = LoggerFactory.getLogger(MutationTrackingService.class); public static final MutationTrackingService instance = new MutationTrackingService(); @@ -158,6 +169,16 @@ public void receivedWriteResponse(ShortMutationId mutationId, InetAddressAndPort // outgoingMutations.receivedWriteResponse(mutationId, ClusterMetadata.current().directory.peerId(fromHost).id()); } + public void receivedActivationAck(CoordinatedTransfer transfer, InetAddressAndPort fromHost) + { + logger.debug("{} receivedActivationAck from {}", transfer.logPrefix(), fromHost); + MutationId activationId = transfer.activationId; + Preconditions.checkArgument(!activationId.isNone()); + Shard shard = getShardNullable(activationId); + if (shard != null) + shard.receivedActivationAck(activationId, fromHost); + } + public void retryFailedWrite(ShortMutationId mutationId, InetAddressAndPort onHost, RequestFailure reason) { Preconditions.checkArgument(!mutationId.isNone()); @@ -192,6 +213,40 @@ public boolean registerMutationCallback(ShortMutationId mutationId, IncomingMuta return incomingMutations.subscribe(mutationId, callback); } + public void executeTransfers(String keyspace, Set sstables, ConsistencyLevel cl) + { + logger.info("Creating tracked bulk transfers for keyspace {} sstables {}", keyspace, sstables); + + KeyspaceShards shards = keyspaceShards.get(keyspace); + checkNotNull(shards); + + CoordinatedTransfers transfers = CoordinatedTransfers.create(shards, sstables, cl); + logger.info("Split input SSTables into transfers {}", transfers); + + for (CoordinatedTransfer transfer : transfers) + transfer.execute(); + } + + public void received(PendingLocalTransfer transfer) + { + logger.debug("Received pending transfer for tracked table {}", transfer); + LocalTransfers.instance().received(transfer); + } + + synchronized void activateLocal(TransferActivation activation) + { + logger.debug("activateLocal {}", activation); + + PendingLocalTransfer pending = LocalTransfers.instance().getPendingTransfer(activation.planId); + pending.activate(activation); + + if (!activation.dryRun) + { + keyspaceShards.get(pending.keyspace).lookUp(pending.range).receivedActivationAck(activation.activationId, FBUtilities.getBroadcastAddressAndPort()); + incomingMutations.invokeListeners(activation.activationId); + } + } + public MutationSummary createSummaryForKey(DecoratedKey key, TableId tableId, boolean includePending) { return getOrCreateShards(tableId).createSummaryForKey(key, tableId, includePending); @@ -310,7 +365,8 @@ private void onNewLog(Shard shard, CoordinatorLog log) private static class KeyspaceShards implements Shard.Subscriber { - private final String keyspace; + // TODO: private + final String keyspace; private final Map, Shard> shards; private final ReplicaGroups groups; private final BiConsumer onNewLog; @@ -430,6 +486,14 @@ Shard lookUp(Token token) return shards.get(groups.forRange(token).range()); } + Shard lookUp(Range range) + { + ClusterMetadata csm = ClusterMetadata.current(); + KeyspaceMetadata ksm = csm.schema.getKeyspaceMetadata(keyspace); + Range replicationRange = ClusterMetadata.current().placements.get(ksm.params.replication).writes.forRange(range).range(); + return shards.get(replicationRange); + } + @Override public void onLogCreation(CoordinatorLog log) { @@ -445,6 +509,68 @@ public void onSubscribe(CoordinatorLog 
currentLog) } } + private static class CoordinatedTransfers implements Iterable + { + private final Collection transfers; + + private CoordinatedTransfers(Collection transfers) + { + this.transfers = transfers; + } + + private static CoordinatedTransfers create(KeyspaceShards shards, Collection sstables, ConsistencyLevel cl) + { + // Clean up incoming SSTables to remove any existing CoordinatorLogOffsets, since they can't be trusted + for (SSTableReader sstable : sstables) + { + try + { + sstable.mutateCoordinatorLogOffsetsAndReload(ImmutableCoordinatorLogOffsets.NONE); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + // Expensive - add a metric? + // TODO(expected): Fail if incoming transfer is outside owned shard ranges + SSTableIntervalTree intervals = SSTableIntervalTree.buildSSTableIntervalTree(sstables); + List transfers = new ArrayList<>(); + + String keyspace = shards.keyspace; + shards.forEachShard(shard -> { + Range range = shard.tokenRange; + Collection sstablesForRange = intervals.search(Interval.create(range.left.minKeyBound(), range.right.maxKeyBound())); + + CoordinatedTransfer transfer = new CoordinatedTransfer(keyspace, range, shard.participants, sstablesForRange, cl, shard::nextId); + if (!transfer.sstables.isEmpty()) + transfers.add(transfer); + + /* REVIEW NOTES + For now, for simplicity, we stream from the coordinator to itself instead of copying files. This has some + perks: (1) it allows us to import out-of-range SSTables using the same paths, and (2) it reuses the + existing lifecycle management for crash-safety, so we don't need to deal with an atomic multi-file copy. + */ + }); + return new CoordinatedTransfers(transfers); + } + + @Override + public Iterator iterator() + { + return transfers.iterator(); + } + + @Override + public String toString() + { + return "CoordinatedTransfers{" + + "transfers=" + transfers + + '}'; + } + } + + // TODO (later): a more intelligent heuristic for offsets included in broadcasts + private static class ReplicatedOffsetsBroadcaster implements Runnable, Shutdownable + { diff --git a/src/java/org/apache/cassandra/replication/PendingLocalTransfer.java b/src/java/org/apache/cassandra/replication/PendingLocalTransfer.java new file mode 100644 index 000000000000..9faa9cf4d324 --- /dev/null +++ b/src/java/org/apache/cassandra/replication/PendingLocalTransfer.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
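To illustrate the shard-slicing step in CoordinatedTransfers.create above, here is a self-contained sketch that groups token-bounded items into per-shard ranges with a plain linear scan. It assumes half-open (left, right] shard ranges and bare long tokens; the patch uses SSTableIntervalTree and real bounds instead, so treat this as an illustration of the grouping, not of the production lookup. As in the patch, a shard with no intersecting items produces no transfer, and an item spanning a shard boundary lands in more than one bucket.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins: an item covering [minToken, maxToken], and a shard owning (left, right].
record ItemBounds(String name, long minToken, long maxToken) {}
record ShardRange(long left, long right) {}

final class ShardSlicer
{
    // Group every item that intersects a shard's range with that shard. An item spanning several
    // shards is attached to each of them, as the per-shard interval search above would also do.
    static Map<ShardRange, List<ItemBounds>> slice(List<ShardRange> shards, List<ItemBounds> items)
    {
        Map<ShardRange, List<ItemBounds>> transfers = new LinkedHashMap<>();
        for (ShardRange shard : shards)
        {
            List<ItemBounds> forShard = new ArrayList<>();
            for (ItemBounds item : items)
            {
                boolean intersects = item.minToken() <= shard.right() && item.maxToken() > shard.left();
                if (intersects)
                    forShard.add(item);
            }
            if (!forShard.isEmpty()) // like the patch: skip shards with no SSTables
                transfers.put(shard, forShard);
        }
        return transfers;
    }

    public static void main(String[] args)
    {
        List<ShardRange> shards = List.of(new ShardRange(Long.MIN_VALUE, 0), new ShardRange(0, Long.MAX_VALUE));
        List<ItemBounds> items = List.of(new ItemBounds("a", -10, -1), new ItemBounds("b", -5, 5));
        System.out.println(slice(shards, items)); // "a" -> first shard only, "b" -> both shards
    }
}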
+ */ + +package org.apache.cassandra.replication; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.streaming.CassandraStreamReceiver; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; +import org.apache.cassandra.utils.TimeUUID; + +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; + +/** + * A transfer on a replica, once present on disk. + */ +public class PendingLocalTransfer +{ + private static final Logger logger = LoggerFactory.getLogger(PendingLocalTransfer.class); + + private String logPrefix() + { + return String.format("[PendingLocalTransfer #%s]", planId); + } + + final TimeUUID planId; + final TableId tableId; + final Collection sstables; + final long createdAt = currentTimeMillis(); + transient String keyspace; + transient Range range; + + private volatile boolean activated = false; + + public PendingLocalTransfer(TableId tableId, TimeUUID planId, Collection sstables) + { + Preconditions.checkState(!sstables.isEmpty()); + this.tableId = tableId; + this.planId = planId; + this.sstables = sstables; + this.keyspace = Objects.requireNonNull(ColumnFamilyStore.getIfExists(tableId)).keyspace.getName(); + this.range = shardRange(keyspace, sstables); + } + + /** + * Pending transfers should fall within a single shard; shards are aligned to natural ranges. + * See {@link MutationTrackingService.KeyspaceShards#make}. + */ + private static Range shardRange(String keyspace, Collection sstables) + { + ClusterMetadata cm = ClusterMetadata.current(); + ReplicaGroups writes = cm.placements.get(Keyspace.open(keyspace).getMetadata().params.replication).writes; + Range range = null; + for (SSTableReader sstable : sstables) + { + if (range == null) + { + Token first = sstable.getFirst().getToken(); + range = writes.forRange(first).range(); + } + else + { + AbstractBounds bounds = sstable.getBounds(); + Preconditions.checkState(!range.isTrulyWrapAround()); + Preconditions.checkState(range.contains(bounds.left)); + Preconditions.checkState(range.contains(bounds.right)); + } + } + + Preconditions.checkNotNull(range); + return range; + } + + /** + * Safely move a transfer into the live set. This must be crash-safe, and the primary invariant we need to + * preserve is that a transfer is added to the live set only if the transfer ID is present in its mutation summaries. + * + * We don't validate checksums here, mostly because a transfer can be activated during a read if one replica + * missed the TransferActivation. Transfers should not be pending for very long, and should be protected by + * internode integrity checks provided by TLS. + * + * TODO: Clear out the row cache and counter cache, like {@link CassandraStreamReceiver#finished}.
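The activation path documented above reduces to a two-step ordering: make the activation ID durable on the artifact first, then make the artifact visible. A minimal sketch of that ordering follows, with a hypothetical Artifact type standing in for an SSTable plus the live set; none of these names come from the patch.

import java.io.IOException;

// Sketch of the crash-safety ordering: durably tag the artifact with its activation ID
// before publishing it, so a crash in between leaves a tagged-but-unpublished artifact.
final class ActivationOrderingSketch
{
    // Hypothetical artifact; in the patch this role is played by an SSTable and the live set.
    static final class Artifact
    {
        Long durableActivationId; // stand-in for metadata persisted with the file
        boolean published;        // stand-in for membership in the live set

        void tagDurably(long activationId) throws IOException
        {
            durableActivationId = activationId; // imagine an fsync'd metadata rewrite here
        }

        void publish()
        {
            // Invariant: never publish before the ID is durable.
            if (durableActivationId == null)
                throw new IllegalStateException("activation ID must be durable before publishing");
            published = true;
        }
    }

    static void activate(Artifact artifact, long activationId) throws IOException
    {
        artifact.tagDurably(activationId); // step 1: durable ID
        artifact.publish();                // step 2: visible to reads
    }

    public static void main(String[] args) throws IOException
    {
        Artifact artifact = new Artifact();
        activate(artifact, 42L);
        System.out.println("published=" + artifact.published + " id=" + artifact.durableActivationId);
    }
}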
+ * TODO: Don't add to the live set if coordinator and not an owner for the range + */ + public void activate(TransferActivation activation) + { + if (activated) + { + logger.warn("{} Received duplicate activation?", logPrefix()); + return; + } + + logger.info("{} Activating transfer {}, {} ms since pending", logPrefix(), this, currentTimeMillis() - createdAt); + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + Preconditions.checkNotNull(cfs); + Preconditions.checkState(!sstables.isEmpty()); + + if (activation.dryRun) + { + logger.info("{} Not adding SSTables to live set for dryRun {}", logPrefix(), activation); + return; + } + + // Ensure no lingering mutation IDs, only activation IDs + for (SSTableReader sstable : sstables) + { + Preconditions.checkState(sstable.getCoordinatorLogOffsets().isEmpty()); + + // Modify SSTables metadata to durably set transfer ID before importing + ImmutableCoordinatorLogOffsets logOffsets = new ImmutableCoordinatorLogOffsets.Builder() + .addTransfer(activation.activationId) + .build(); + try + { + sstable.mutateCoordinatorLogOffsetsAndReload(logOffsets); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + Preconditions.checkState(sstable.getCoordinatorLogOffsets().isEmpty()); + Preconditions.checkState(!sstable.getCoordinatorLogOffsets().transfers().isEmpty()); + } + + File dst = cfs.getDirectories().getDirectoryForNewSSTables(); + logger.debug("{} Moving pending SSTables for activation to {}", logPrefix(), dst); + dst.createFileIfNotExists(); + for (SSTableReader sstable : sstables) + { + SSTableReader moved = SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, cfs.getUniqueDescriptorFor(sstable.descriptor, dst), sstable.getComponents(), false); + cfs.getTracker().addSSTablesTracked(Collections.singleton(moved)); + } + + activated = true; + } + + @Override + public String toString() + { + return "PendingLocalTransfer{" + + "planId=" + planId + + ", tableId=" + tableId + + ", sstables=" + sstables + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + PendingLocalTransfer transfer = (PendingLocalTransfer) o; + return Objects.equals(planId, transfer.planId) && Objects.equals(tableId, transfer.tableId) && Objects.equals(sstables, transfer.sstables); + } + + @Override + public int hashCode() + { + return Objects.hash(planId, tableId, sstables); + } +} diff --git a/src/java/org/apache/cassandra/replication/PullMutationsRequest.java b/src/java/org/apache/cassandra/replication/PullMutationsRequest.java index ec5632ab5ba0..d8e6f0febb98 100644 --- a/src/java/org/apache/cassandra/replication/PullMutationsRequest.java +++ b/src/java/org/apache/cassandra/replication/PullMutationsRequest.java @@ -72,4 +72,12 @@ public void doVerb(Message message) MutationTrackingService.instance.requestMissingMutations(offsets, forHost); } }; + + @Override + public String toString() + { + return "PullMutationsRequest{" + + "offsets=" + offsets + + '}'; + } } diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java index 6588bede0932..e128b841eb61 100644 --- a/src/java/org/apache/cassandra/replication/Shard.java +++ b/src/java/org/apache/cassandra/replication/Shard.java @@ -27,8 +27,10 @@ import com.google.common.base.Preconditions; import org.agrona.collections.IntArrayList; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Mutation; import 
org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -41,9 +43,9 @@ public class Shard { private final String keyspace; - private final Range tokenRange; + final Range tokenRange; private final int localHostId; - private final Participants participants; + final Participants participants; private final Epoch sinceEpoch; private final BiConsumer onNewLog; private final NonBlockingHashMapLong logs; @@ -92,6 +94,12 @@ void receivedWriteResponse(ShortMutationId mutationId, InetAddressAndPort fromHo getOrCreate(mutationId).receivedWriteResponse(mutationId, fromHostId); } + void receivedActivationAck(MutationId activationId, InetAddressAndPort onHost) + { + int onHostId = ClusterMetadata.current().directory.peerId(onHost).id(); + getOrCreate(activationId).receivedActivationAck(activationId, onHostId); + } + void updateReplicatedOffsets(List offsets, InetAddressAndPort onHost) { int onHostId = ClusterMetadata.current().directory.peerId(onHost).id(); diff --git a/src/java/org/apache/cassandra/replication/TransferActivation.java b/src/java/org/apache/cassandra/replication/TransferActivation.java new file mode 100644 index 000000000000..9fc680aef502 --- /dev/null +++ b/src/java/org/apache/cassandra/replication/TransferActivation.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.replication; + +import java.io.IOException; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.utils.TimeUUID; + +public class TransferActivation +{ + public final TimeUUID transferId; + public final TimeUUID planId; + public final MutationId activationId; + public final boolean dryRun; + + public TransferActivation(CoordinatedTransfer transfer, InetAddressAndPort peer, boolean dryRun) + { + this(transfer.transferId, transfer.streams.get(peer).planId(), transfer.activationId, dryRun); + } + + TransferActivation(TimeUUID transferId, TimeUUID planId, MutationId activationId, boolean dryRun) + { + this.transferId = transferId; + Preconditions.checkArgument(!activationId.isNone()); + Preconditions.checkNotNull(planId); + this.planId = planId; + this.activationId = activationId; + this.dryRun = dryRun; + } + + public void apply() + { + MutationTrackingService.instance.activateLocal(this); + } + + public static final Serializer serializer = new Serializer(); + + public static class Serializer implements IVersionedSerializer + { + @Override + public void serialize(TransferActivation activate, DataOutputPlus out, int version) throws IOException + { + TimeUUID.Serializer.instance.serialize(activate.transferId, out, version); + TimeUUID.Serializer.instance.serialize(activate.planId, out, version); + MutationId.serializer.serialize(activate.activationId, out, version); + out.writeBoolean(activate.dryRun); + } + + @Override + public TransferActivation deserialize(DataInputPlus in, int version) throws IOException + { + TimeUUID transferId = TimeUUID.Serializer.instance.deserialize(in, version); + TimeUUID planId = TimeUUID.Serializer.instance.deserialize(in, version); + MutationId activationId = MutationId.serializer.deserialize(in, version); + boolean dryRun = in.readBoolean(); + return new TransferActivation(transferId, planId, activationId, dryRun); + } + + @Override + public long serializedSize(TransferActivation activate, int version) + { + long size = 0; + size += TimeUUID.Serializer.instance.serializedSize(activate.transferId, version); + size += TimeUUID.Serializer.instance.serializedSize(activate.planId, version); + size += MutationId.serializer.serializedSize(activate.activationId, version); + size += TypeSizes.BOOL_SIZE; + return size; + } + } + + public static class VerbHandler implements IVerbHandler + { + @Override + public void doVerb(Message msg) throws IOException + { + LocalTransfers.instance().executor.submit(() -> { + msg.payload.apply(); + MessagingService.instance().respond(NoPayload.noPayload, msg); + }).rethrowIfFailed(); + } + } + + public static final VerbHandler verbHandler = new VerbHandler(); + + @Override + public String toString() + { + return "Activate{" + + "transferId=" + transferId + + ", planId=" + planId + + ", activationId=" + activationId + + ", dryRun=" + dryRun + + '}'; + } +} diff --git a/src/java/org/apache/cassandra/replication/UnreconciledTransfers.java b/src/java/org/apache/cassandra/replication/UnreconciledTransfers.java new file mode 100644 index 
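The Serializer above follows the usual versioned-serializer contract: serializedSize must agree byte-for-byte with what serialize writes, or downstream framing breaks. Here is a standalone sketch of that contract check, using a toy payload and plain java.io streams rather than Cassandra's DataOutputPlus and IVersionedSerializer types.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Toy payload: two longs and a boolean, roughly mirroring the shape of an activation message.
record ToyActivation(long transferIdMsb, long planIdMsb, boolean dryRun)
{
    void serialize(DataOutputStream out) throws IOException
    {
        out.writeLong(transferIdMsb);
        out.writeLong(planIdMsb);
        out.writeBoolean(dryRun);
    }

    long serializedSize()
    {
        return Long.BYTES + Long.BYTES + 1; // must match serialize() exactly
    }

    public static void main(String[] args) throws IOException
    {
        ToyActivation msg = new ToyActivation(1L, 2L, true);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        msg.serialize(new DataOutputStream(bytes));
        if (bytes.size() != msg.serializedSize())
            throw new AssertionError("serializedSize out of sync with serialize()");
        System.out.println("size check passed: " + bytes.size() + " bytes");
    }
}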
000000000000..aee6ded537fd --- /dev/null +++ b/src/java/org/apache/cassandra/replication/UnreconciledTransfers.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.replication; + +import java.util.SortedSet; +import java.util.TreeSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.TableId; + +/** + * See {@link UnreconciledMutations}. + * + * For now, all reads intersect with all transfers, but we could be more discerning and only return transfers for the + * specific table and range. Transfers should be very rare. + */ +public class UnreconciledTransfers +{ + private static final Logger logger = LoggerFactory.getLogger(UnreconciledTransfers.class); + + private final SortedSet offsets = new TreeSet<>(); + + public void activated(int offset) + { + logger.trace("Activating {}", offset); + offsets.add(offset); + } + + public boolean remove(int offset) + { + logger.trace("Removing {}", offset); + return offsets.remove(offset); + } + + void collect(Token token, TableId tableId, Offsets.OffsetReciever into) + { + logger.trace("Collecting offsets {}", offsets); + offsets.forEach(into::add); + } + + void collect(AbstractBounds range, TableId tableId, Offsets.OffsetReciever into) + { + logger.trace("Collecting offsets {}", offsets); + offsets.forEach(into::add); + } +} diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java index 17851a24e31c..a2de67c47787 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java @@ -23,6 +23,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.agrona.collections.IntArrayList; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.gms.FailureDetector; @@ -126,6 +129,7 @@ public void acceptSyncAck(InetAddressAndPort from, TrackedRead.Id id) public void acceptMutation(TrackedRead.Id id, ShortMutationId mutationId) { + logger.debug("Accepted mutation {} {}", id, mutationId); Coordinator reconcile = coordinators.get(id); if (reconcile != null && reconcile.acceptMutation(mutationId)) // could be already timed out / expired coordinators.remove(id); @@ -155,6 +159,8 @@ public int expire(long nanoTime) private static final class Coordinator { + private static final Logger logger =
LoggerFactory.getLogger(Coordinator.class); + private static final AtomicLongFieldUpdater remainingUpdater = AtomicLongFieldUpdater.newUpdater(Coordinator.class, "remaining"); private volatile long remaining; // three values packed into one atomic long @@ -329,6 +335,7 @@ private long updateRemaining(int mutationsDelta, int summariesDelta, int syncAck int mutations = remainingMutations(prev) + mutationsDelta; int summaries = remainingSummaries(prev) + summariesDelta; int syncAcks = remainingSyncAcks(prev) + syncAcksDelta; + logger.trace("[Read {}] Still waiting for {} mutations, {} summaries, {} syncAcks", id, mutations, summaries, syncAcks); next = remaining(mutations, summaries, syncAcks); } while (!remainingUpdater.compareAndSet(this, prev, next)); return next; @@ -355,6 +362,8 @@ boolean isPurgeable(long nanoTime) } } + private static final Logger logger = LoggerFactory.getLogger(ReadReconciliations.class); + /** * @param node node id of the remote replica from which we got the summary * @param offsets offsets that we need to pull - from the coordinator, if alive, or from the @@ -375,6 +384,7 @@ private static void pull(int node, Offsets offsets, IncomingMutations.Callback c if (!toPull.isEmpty()) { PullMutationsRequest pull = new PullMutationsRequest(Offsets.Immutable.copy(toPull)); + logger.debug("Pulling {} from {}", pull, pullFrom); MessagingService.instance().send(Message.out(Verb.PULL_MUTATIONS_REQ, pull), pullFrom); } } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java index 4f8bca8f25b4..dc5fad51830b 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java @@ -18,6 +18,7 @@ package org.apache.cassandra.service.reads.tracked; import java.util.ArrayList; +import java.util.Iterator; import java.util.Map; import com.google.common.annotations.VisibleForTesting; @@ -128,6 +129,9 @@ private void beginReadInternal( // any mutations that may have arrived during initial read execution. 
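The Coordinator above packs its three outstanding counts into a single volatile long and adjusts them with a compare-and-swap loop. Below is a self-contained sketch of that pattern, assuming 16 bits per counter; the patch's exact bit layout is not shown in this hunk, so the packing here is purely illustrative.

import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Three small counters packed into one volatile long, updated lock-free with CAS.
final class PackedCounters
{
    private static final AtomicLongFieldUpdater<PackedCounters> UPDATER =
        AtomicLongFieldUpdater.newUpdater(PackedCounters.class, "remaining");

    private volatile long remaining;

    PackedCounters(int mutations, int summaries, int syncAcks)
    {
        remaining = pack(mutations, summaries, syncAcks);
    }

    private static long pack(int mutations, int summaries, int syncAcks)
    {
        // 16 bits per counter is plenty for this sketch; real layouts may differ.
        return ((long) (mutations & 0xFFFF) << 32) | ((long) (summaries & 0xFFFF) << 16) | (syncAcks & 0xFFFF);
    }

    static int mutations(long packed) { return (int) ((packed >>> 32) & 0xFFFF); }
    static int summaries(long packed) { return (int) ((packed >>> 16) & 0xFFFF); }
    static int syncAcks(long packed)  { return (int) (packed & 0xFFFF); }

    /** Atomically apply deltas to all three counters; returns the new packed value. */
    long update(int mutationsDelta, int summariesDelta, int syncAcksDelta)
    {
        long prev, next;
        do
        {
            prev = remaining;
            next = pack(mutations(prev) + mutationsDelta,
                        summaries(prev) + summariesDelta,
                        syncAcks(prev) + syncAcksDelta);
        }
        while (!UPDATER.compareAndSet(this, prev, next));
        return next;
    }

    public static void main(String[] args)
    {
        PackedCounters counters = new PackedCounters(2, 3, 1);
        long now = counters.update(-1, 0, -1);
        System.out.printf("mutations=%d summaries=%d syncAcks=%d%n",
                          mutations(now), summaries(now), syncAcks(now));
    }
}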
secondarySummary = command.createMutationSummary(true); processDelta(read, initialSummary, secondarySummary); + + // Include in summary any transfer IDs that were present for the read + secondarySummary = merge(controller.getActivationIds(), secondarySummary); } catch (Exception e) { @@ -151,6 +155,30 @@ private void beginReadInternal( ReadReconciliations.instance.acceptLocalSummary(readId, secondarySummary, summaryNodes); } + private static MutationSummary merge(Iterator activationIds, MutationSummary summary) + { + if (activationIds == null || !activationIds.hasNext()) + return summary; + + MutationSummary.Builder builder = new MutationSummary.Builder(summary.tableId()); + + // TODO: Make faster without a copy + for (int i = 0; i < summary.size(); i++) + { + MutationSummary.CoordinatorSummary coordinatorSummary = summary.get(i); + MutationSummary.CoordinatorSummary.Builder coordinatorSummaryBuilder = builder.builderForLog(coordinatorSummary.logId()); + coordinatorSummaryBuilder.unreconciled.addAll(coordinatorSummary.unreconciled); + coordinatorSummaryBuilder.reconciled.addAll(coordinatorSummary.reconciled); + } + + while (activationIds.hasNext()) + { + ShortMutationId id = activationIds.next(); + builder.builderForLog(id).unreconciled.add(id.offset()); + } + return builder.build(); + } + @VisibleForTesting public static void processDelta(PartialTrackedRead read, MutationSummary initialSummary, MutationSummary secondarySummary) { diff --git a/src/java/org/apache/cassandra/streaming/StreamOperation.java b/src/java/org/apache/cassandra/streaming/StreamOperation.java index b1c5908f7fe8..5b29674d273b 100644 --- a/src/java/org/apache/cassandra/streaming/StreamOperation.java +++ b/src/java/org/apache/cassandra/streaming/StreamOperation.java @@ -26,7 +26,8 @@ public enum StreamOperation BOOTSTRAP("Bootstrap", false, true, false), REBUILD("Rebuild", false, true, false), BULK_LOAD("Bulk Load", true, false, false), - REPAIR("Repair", true, false, true); + REPAIR("Repair", true, false, true), + IMPORT("Tracked Import", false, false, false); private final String description; private final boolean requiresViewBuild; diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 93b864d79b07..b6ed244b5663 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -67,8 +67,7 @@ public StreamPlan(StreamOperation streamOperation, int connectionsPerHost, boolean connectSequentially, TimeUUID pendingRepair, PreviewKind previewKind) { this.streamOperation = streamOperation; - this.coordinator = new StreamCoordinator(streamOperation, connectionsPerHost, streamingFactory(), - false, connectSequentially, pendingRepair, previewKind); + this.coordinator = new StreamCoordinator(streamOperation, connectionsPerHost, streamingFactory(), false, connectSequentially, pendingRepair, previewKind); } /** diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/BulkTransfersTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/BulkTransfersTest.java new file mode 100644 index 000000000000..04cad5cd7d99 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/BulkTransfersTest.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.tracking; + +import java.nio.file.Files; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.junit.Ignore; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstanceInitializer; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.AssertUtils; +import org.apache.cassandra.distributed.shared.Uninterruptibles; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.replication.MutationSummary; +import org.apache.cassandra.replication.MutationTrackingService; +import org.apache.cassandra.replication.TransferActivation; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.assertj.core.api.Assertions; + +import static net.bytebuddy.matcher.ElementMatchers.named; + +/** + * For now, tracked import with a replica down is not supported. The intention is to support this scenario by allowing + * users to provide a {@link ConsistencyLevel} for tracked import operations, where the import will complete if + * sufficient replicas acknowledge the transfer and activate it. + */ +public class BulkTransfersTest extends TestBaseImpl +{ + private static final Logger logger = LoggerFactory.getLogger(BulkTransfersTest.class); + private static final String TABLE = "tbl"; + + @Test + public void importHappyPath() throws Throwable + { + Hooks hooks = new Hooks() { + @Override + public void afterImport(Cluster cluster) + { + // Sleep for a while to make sure import completes + Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); + + for (IInvokableInstance instance : cluster) + { + logger.info("Checking propagation of imported SSTable to {}", instance.config().num()); + // SinglePartition + PartitionRange + { + Object[][] rows = instance.executeInternal(withKeyspace("SELECT * FROM %s." 
+ TABLE + " WHERE k = 1")); + AssertUtils.assertRows(rows, AssertUtils.row(1, 1)); + } + { + Object[][] rows = instance.executeInternal(withKeyspace("SELECT * FROM %s." + TABLE)); + AssertUtils.assertRows(rows, AssertUtils.row(1, 1)); + } + } + } + }; + testTrackedImport(hooks); + } + + @Test + @Ignore + public void importReplicaDown() throws Throwable + { + Hooks hooks = new Hooks() { + @Override + public void beforeImport(Cluster cluster) + { + try + { + cluster.get(3).shutdown().get(); + } + catch (InterruptedException | ExecutionException e) + { + throw new RuntimeException(e); + } + } + + @Override + public void afterImport(Cluster cluster) + { + cluster.get(3).startup(); + + // Sleep for a while to make sure import completes + Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); + + for (IInvokableInstance instance : cluster) + { + logger.info("Checking propagation of imported SSTable to {}", instance.config().num()); + // SinglePartition + PartitionRange + { + Object[][] rows = instance.executeInternal(withKeyspace("SELECT * FROM %s." + TABLE + " WHERE k = 1")); + AssertUtils.assertRows(rows, AssertUtils.row(1, 1)); + } + { + Object[][] rows = instance.executeInternal(withKeyspace("SELECT * FROM %s." + TABLE)); + AssertUtils.assertRows(rows, AssertUtils.row(1, 1)); + } + } + } + }; + testTrackedImport(hooks); + } + + @Test + public void importMissedActivation() throws Throwable + { + Hooks hooks = new Hooks() { + @Override + public IInstanceInitializer getInstanceInitializer() + { + return ByteBuddyInjections.SkipActivation.install(2); + } + + @Override + public void afterImport(Cluster cluster) + { + cluster.get(1).runOnInstance(() -> { + DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(1)); + TableId tableId = ColumnFamilyStore.getIfExists(KEYSPACE, TABLE).metadata().id; + MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(key, tableId, false); + Assertions.assertThat(summary).satisfies(s -> { + assert s.reconciledIds() == 0; + assert s.unreconciledIds() == 1; + }); + }); + + cluster.get(2).runOnInstance(() -> { + DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(1)); + TableId tableId = ColumnFamilyStore.getIfExists(KEYSPACE, TABLE).metadata().id; + MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(key, tableId, false); + Assertions.assertThat(summary).satisfies(s -> { + assert s.reconciledIds() == 0; + assert s.unreconciledIds() == 0; + }); + }); + + cluster.forEach(() -> ByteBuddyInjections.SkipActivation.skip = false); + + logger.debug("Checking read at ALL"); + + // Use coordinated query rather than executeInternal to confirm read reconciliation triggers activation + String cql = "SELECT * FROM %s." 
+ TABLE + " WHERE k = 1"; + Object[][] rows = cluster.get(1).coordinator().execute(withKeyspace(cql), ConsistencyLevel.ALL); + AssertUtils.assertRows(rows, AssertUtils.row(1, 1)); + + // Confirm instance2 gets activated + rows = cluster.get(2).executeInternal(withKeyspace(cql)); + AssertUtils.assertRows(rows, AssertUtils.row(1, 1)); + } + }; + testTrackedImport(hooks); + } + + public static class ByteBuddyInjections + { + // Only skips direct transfer activation, not activation as part of read reconciliation + public static class SkipActivation + { + public static volatile boolean skip = true; + + public static IInstanceInitializer install(int...nodes) + { + return (ClassLoader cl, ThreadGroup tg, int num, int generation) -> { + for (int node : nodes) + if (node == num) + new ByteBuddy().rebase(TransferActivation.VerbHandler.class) + .method(named("doVerb")) + .intercept(MethodDelegation.to(ByteBuddyInjections.SkipActivation.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + }; + } + + @SuppressWarnings("unused") + public static void doVerb(Message msg, @SuperCall Callable zuper) + { + if (skip && !msg.payload.dryRun) + { + logger.info("Skipping activation for test {}", msg.payload); + return; + } + + logger.info("Test running activation as usual {}", msg.payload); + + try + { + zuper.call(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + } + } + + @Test + public void importBounceAfterPending() throws Throwable + { + Hooks hooks = new Hooks() { + @Override + public IInstanceInitializer getInstanceInitializer() + { + // No activation, transfer stays pending everywhere + return ByteBuddyInjections.SkipActivation.install(1, 2, 3); + } + + @Override + public void afterImport(Cluster cluster) + { + String cql = "SELECT * FROM %s." + TABLE + " WHERE k = 1"; + Object[][] EMPTY = new Object[0][0]; + + for (IInvokableInstance instance : cluster) + { + Object[][] rows = instance.coordinator().execute(withKeyspace(cql), ConsistencyLevel.ALL); + AssertUtils.assertRows(rows, EMPTY); + } + + // When an import fails, bounce must not move the pending SSTables into the live set + bounce(cluster); + + for (IInvokableInstance instance : cluster) + { + Object[][] rows = instance.coordinator().execute(withKeyspace(cql), ConsistencyLevel.ALL); + AssertUtils.assertRows(rows, EMPTY); + } + } + }; + testTrackedImport(hooks); + } + + private interface Hooks + { + default IInstanceInitializer getInstanceInitializer() + { + return (classLoader, threadGroup, num, generation) -> {}; + } + + default void beforeImport(Cluster cluster) {}; + void afterImport(Cluster cluster); + } + + private void testTrackedImport(Hooks hooks) throws Throwable + { + try (Cluster cluster = Cluster.build(3) + .withConfig(cfg -> cfg.with(Feature.NETWORK) + .with(Feature.GOSSIP) + .set("mutation_tracking_enabled", "true") + .set("write_request_timeout", "1000ms")) + .withInstanceInitializer(hooks.getInstanceInitializer()) + .start()) + { + cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked';")); + String KEYSPACE_TABLE = String.format("%s.%s", KEYSPACE, TABLE); + String schema = String.format(withKeyspace("CREATE TABLE %s." 
+ TABLE + " (k int primary key, v int);")); + cluster.schemaChange(schema); + + // Hack: need to bounce for KeyspaceShards to be created for new table, schema changes not yet supported + bounce(cluster); + + // Needs to run outside of instance executor because creates schema + String file = Files.createTempDirectory(MutationTrackingTest.class.getSimpleName()).toString(); + + try (CQLSSTableWriter writer = CQLSSTableWriter.builder() + .forTable(schema) + .inDirectory(file) + .using("INSERT INTO " + KEYSPACE_TABLE + " (k, v) " + "VALUES (?, ?)") + .build()) + { + writer.addRow(1, 1); + } + + for (IInvokableInstance instance : cluster) + { + logger.info("Checking instance {} empty before import", instance.config().num()); + Object[][] rows = instance.executeInternal(withKeyspace("SELECT * FROM %s." + TABLE)); + AssertUtils.assertRows(rows); // empty + } + + hooks.beforeImport(cluster); + + cluster.get(1).runOnInstance(() -> { + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, TABLE); + Set paths = Set.of(file); + logger.info("Importing SSTables {}", paths); + cfs.importNewSSTables(paths, true, true, true, true, true, true, true); + }); + + hooks.afterImport(cluster); + } + } + + private static void bounce(Cluster cluster) + { + cluster.forEach(instance -> { + try + { + instance.shutdown().get(); + } + catch (InterruptedException | ExecutionException e) + { + throw new RuntimeException(e); + } + instance.startup(); + }); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java index 8e6a8e108fa7..c87dfcd3ccde 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java @@ -26,8 +26,6 @@ import org.apache.cassandra.replication.Offsets; import org.junit.Assert; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.dht.Murmur3Partitioner; @@ -50,8 +48,6 @@ public class MutationTrackingTest extends TestBaseImpl { - private static final Logger logger = LoggerFactory.getLogger(MutationTrackingTest.class); - @Test public void testBasicWritePath() throws Throwable {
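As a usage note for the Hooks template in BulkTransfersTest, one further scenario could assert on each node that activated SSTables actually carry a transfer ID in their metadata. The sketch below is hypothetical and would live inside BulkTransfersTest; it reuses the existing cluster plumbing and the sstable.getCoordinatorLogOffsets().transfers() accessor added by this patch, and is not part of the change itself.

// Hypothetical extra scenario for BulkTransfersTest: after a successful import, every live SSTable
// of the table should carry a transfer (activation) ID in its CoordinatorLogOffsets.
@Test
public void importTagsLiveSSTables() throws Throwable
{
    Hooks hooks = new Hooks()
    {
        @Override
        public void afterImport(Cluster cluster)
        {
            // Give activation a moment to propagate, mirroring the other scenarios
            Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);

            for (IInvokableInstance instance : cluster)
            {
                instance.runOnInstance(() -> {
                    ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, TABLE);
                    for (org.apache.cassandra.io.sstable.format.SSTableReader sstable : cfs.getLiveSSTables())
                    {
                        // transfers() was added to ImmutableCoordinatorLogOffsets in this patch
                        assert !sstable.getCoordinatorLogOffsets().transfers().isEmpty()
                            : "imported SSTable is missing its transfer ID: " + sstable;
                    }
                });
            }
        }
    };
    testTrackedImport(hooks);
}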