diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 2bcf423c7ef3..f482fedbcb63 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -232,6 +232,7 @@ public enum CassandraRelevantProperties
DTEST_IS_IN_JVM_DTEST("org.apache.cassandra.dtest.is_in_jvm_dtest"),
/** In_JVM dtest property indicating that the test should use "latest" configuration */
DTEST_JVM_DTESTS_USE_LATEST("jvm_dtests.latest"),
+ ENABLE_CURSOR_COMPACTION("cassandra.enable_cursor_compaction", "true"),
ENABLE_DC_LOCAL_COMMIT("cassandra.enable_dc_local_commit", "true"),
/**
* Whether {@link org.apache.cassandra.db.ConsistencyLevel#NODE_LOCAL} should be allowed.
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index e931b5a9d9dc..f611ea311a22 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -49,6 +49,7 @@
import static org.apache.cassandra.config.CassandraRelevantProperties.AUTOCOMPACTION_ON_STARTUP_ENABLED;
import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_AVAILABLE_PROCESSORS;
+import static org.apache.cassandra.config.CassandraRelevantProperties.ENABLE_CURSOR_COMPACTION;
import static org.apache.cassandra.config.CassandraRelevantProperties.FILE_CACHE_ENABLED;
import static org.apache.cassandra.config.CassandraRelevantProperties.SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE;
import static org.apache.cassandra.config.CassandraRelevantProperties.SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_KEYSPACES;
@@ -644,6 +645,8 @@ public static class SSTableConfig
@Replaces(oldName = "enable_drop_compact_storage", converter = Converters.IDENTITY, deprecated = true)
public volatile boolean drop_compact_storage_enabled = false;
+ public boolean enable_cursor_compaction = ENABLE_CURSOR_COMPACTION.getBoolean();
+
public volatile boolean use_statements_enabled = true;
/**
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ec76193e1046..a856ca02652f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -4642,6 +4642,17 @@ public static void setTransientReplicationEnabledUnsafe(boolean enabled)
conf.transient_replication_enabled = enabled;
}
+ public static boolean enableCursorCompaction()
+ {
+ return conf.enable_cursor_compaction;
+ }
+
+ @VisibleForTesting
+ public static void setEnableCursorCompaction(boolean enable_cursor_compaction)
+ {
+ conf.enable_cursor_compaction = enable_cursor_compaction;
+ }
+
public static boolean enableDropCompactStorage()
{
return conf.drop_compact_storage_enabled;
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index 3e42e4a361b7..efd59291a5f3 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -133,16 +133,16 @@ public String toString(TableMetadata metadata)
/**
* Serializer for Clustering object.
*
- * Because every clustering in a given table must have the same size (ant that size cannot actually change once the table
+ * Because every clustering in a given table must have the same size (and that size cannot actually change once the table
* has been defined), we don't record that size.
*/
public static class Serializer
{
-        public void serialize(Clustering<?> clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+        public void serialize(Clustering<?> clustering, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
assert clustering != STATIC_CLUSTERING : "We should never serialize a static clustering";
assert clustering.size() == types.size() : "Invalid clustering for the table: " + clustering;
- ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, version, types);
+ ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, unused, types);
}
         public ByteBuffer serialize(Clustering<?> clustering, int version, List<AbstractType<?>> types)
@@ -158,9 +158,9 @@ public ByteBuffer serialize(Clustering<?> clustering, int version, List<AbstractType<?>> types)
-        public long serializedSize(Clustering<?> clustering, int version, List<AbstractType<?>> types)
+        public long serializedSize(Clustering<?> clustering, int unused, List<AbstractType<?>> types)
{
- return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types);
+ return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, unused, types);
}
         public void skip(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
diff --git a/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java b/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java
index 14a9158681e4..2dfac87b3110 100644
--- a/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java
+++ b/src/java/org/apache/cassandra/db/ClusteringBoundOrBoundary.java
@@ -100,11 +100,11 @@ default String toString(ClusteringComparator comparator)
public static class Serializer
{
-        public void serialize(ClusteringBoundOrBoundary<?> bound, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+        public void serialize(ClusteringBoundOrBoundary<?> bound, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
out.writeByte(bound.kind().ordinal());
out.writeShort(bound.size());
- ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types);
+ ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, unused, types);
}
         public long serializedSize(ClusteringBoundOrBoundary<?> bound, int version, List<AbstractType<?>> types)
diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java
index 2949130707fe..3ae31e30a056 100644
--- a/src/java/org/apache/cassandra/db/ClusteringComparator.java
+++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java
@@ -26,14 +26,17 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
+import org.apache.cassandra.io.sstable.ClusteringDescriptor;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.serializers.MarshalException;
-
import org.apache.cassandra.io.sstable.IndexInfo;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.vint.VIntCoding;
import static org.apache.cassandra.utils.bytecomparable.ByteSource.EXCLUDED;
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT;
@@ -156,6 +159,126 @@ public int compare(ClusteringPrefix c1, ClusteringPrefix c2)
return s1 < s2 ? c1.kind().comparedToClustering : -c2.kind().comparedToClustering;
}
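+    /**
+     * Compare two clustering prefixes directly in their serialized form via their {@link ClusteringDescriptor}s:
+     * the clustering values are compared buffer-to-buffer over the common prefix of clustering columns, and ties
+     * are broken by the prefix kinds and sizes, mirroring the object-based comparison above.
+     */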
+ public static int compare(ClusteringDescriptor c1, ClusteringDescriptor c2)
+ {
+ final int c1Size = c1.clusteringColumnsBound();
+ final int c2Size = c2.clusteringColumnsBound();
+ final int minColumns = Math.min(c1Size, c2Size);
+
+ final int cmp = compare(c1.clusteringTypes(), c1.clusteringBuffer(), c2.clusteringBuffer(), minColumns);
+ if (cmp != 0)
+ return cmp;
+
+ final ClusteringPrefix.Kind c1Kind = c1.clusteringKind();
+ final ClusteringPrefix.Kind c2Kind = c2.clusteringKind();
+ if (c1Size == c2Size)
+ {
+ return ClusteringPrefix.Kind.compare(c1Kind, c2Kind);
+ }
+
+ return c1Size < c2Size ? c1Kind.comparedToClustering : -c2Kind.comparedToClustering;
+ }
+
+    public static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2)
+    {
+ return compare(types, c1, c2, types.length);
+ }
+
+    private static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2, int size)
+ {
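+        // Layout assumed here (matching the serializer): each group of up to 32 clustering values is preceded by
+        // an unsigned vint header carrying 2 bits per value (present, null or empty); present values follow as
+        // bytes, length-prefixed with a vint unless the column type has a fixed value length.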
+ long clusteringBlock1 = 0;
+ long clusteringBlock2 = 0;
+ final int position1 = c1.position();
+ final int position2 = c2.position();
+ final int limit1 = c1.limit();
+ final int limit2 = c2.limit();
+ try
+ {
+ int ofst1 = position1;
+ int ofst2 = position2;
+ for (int clusteringIndex = 0; clusteringIndex < size; clusteringIndex++)
+ {
+ if (clusteringIndex % 32 == 0)
+ {
+ clusteringBlock1 = VIntCoding.getUnsignedVInt(c1, ofst1, limit1);
+ ofst1 += VIntCoding.computeUnsignedVIntSize(clusteringBlock1);
+ clusteringBlock2 = VIntCoding.getUnsignedVInt(c2, ofst2, limit2);
+ ofst2 += VIntCoding.computeUnsignedVIntSize(clusteringBlock2);
+ }
+
+                AbstractType<?> type = types[clusteringIndex];
+
+ boolean v1Present = (clusteringBlock1 & 0b11) == 0;
+ boolean v2Present = (clusteringBlock2 & 0b11) == 0;
+
+ if (v1Present && v2Present)
+ {
+ boolean isByteOrderComparable = type.isByteOrderComparable;
+ int vlen1,vlen2;
+ if (type.isValueLengthFixed())
+ {
+ vlen1 = vlen2 = type.valueLengthIfFixed();
+ }
+ else
+ {
+ vlen1 = VIntCoding.getUnsignedVInt32(c1, ofst1, limit1);
+ ofst1 += VIntCoding.computeUnsignedVIntSize(vlen1);
+ vlen2 = VIntCoding.getUnsignedVInt32(c2, ofst2, limit2);
+ ofst2 += VIntCoding.computeUnsignedVIntSize(vlen2);
+ }
+ int v1Limit = ofst1 + vlen1;
+ if (v1Limit > limit1)
+ throw new IllegalArgumentException("Value limit exceeds buffer limit.");
+ c1.position(ofst1).limit(v1Limit);
+ int v2Limit = ofst2 + vlen2;
+ if (v2Limit > limit2)
+ throw new IllegalArgumentException("Value limit exceeds buffer limit.");
+ c2.position(ofst2).limit(v2Limit);
+ int cmp = isByteOrderComparable ?
+ ByteBufferUtil.compareUnsigned(c1, c2) :
+ type.compareCustom(c1, ByteBufferAccessor.instance, c2, ByteBufferAccessor.instance);
+ if (cmp != 0)
+ return cmp;
+ c1.limit(limit1);
+ c2.limit(limit2);
+ ofst1 += vlen1;
+ ofst2 += vlen2;
+ }
+ // present > not present
+ else if (v1Present && !v2Present)
+ {
+ return 1;
+ }
+ else if (!v1Present && v2Present)
+ {
+ return -1;
+ }
+ else
+ {
+ boolean v1Null = (clusteringBlock1 & 0b10) == 0;
+ boolean v2Null = (clusteringBlock2 & 0b10) == 0;
+ // empty > null
+ if (!v1Null && v2Null)
+ {
+ return 1;
+ }
+ else if (v1Null && !v2Null)
+ {
+ return -1;
+ }
+ // empty == empty, continue...
+ }
+ clusteringBlock1 = clusteringBlock1 >>> 2;
+ clusteringBlock2 = clusteringBlock2 >>> 2;
+ }
+ }
+ finally
+ {
+ c1.position(position1).limit(limit1);
+ c2.position(position2).limit(limit2);
+ }
+ return 0;
+ }
+
    public int compare(Clustering<?> c1, Clustering<?> c2)
{
return compare(c1, c2, size());
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index c7687bb80f3e..88a400c21340 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -425,18 +425,18 @@ public default String clusteringString(List<AbstractType<?>> types)
public static class Serializer
{
-        public void serialize(ClusteringPrefix<?> clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+        public void serialize(ClusteringPrefix<?> clustering, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
// We shouldn't serialize static clusterings
assert clustering.kind() != Kind.STATIC_CLUSTERING;
if (clustering.kind() == Kind.CLUSTERING)
{
out.writeByte(clustering.kind().ordinal());
-                Clustering.serializer.serialize((Clustering<?>)clustering, out, version, types);
+                Clustering.serializer.serialize((Clustering<?>)clustering, out, unused, types);
}
else
{
-                ClusteringBoundOrBoundary.serializer.serialize((ClusteringBoundOrBoundary<?>)clustering, out, version, types);
+                ClusteringBoundOrBoundary.serializer.serialize((ClusteringBoundOrBoundary<?>)clustering, out, unused, types);
}
}
@@ -462,17 +462,17 @@ public ClusteringPrefix deserialize(DataInputPlus in, int version, List<
return ClusteringBoundOrBoundary.serializer.deserializeValues(in, kind, version, types);
}
-        public long serializedSize(ClusteringPrefix<?> clustering, int version, List<AbstractType<?>> types)
+        public long serializedSize(ClusteringPrefix<?> clustering, int unused, List<AbstractType<?>> types)
{
// We shouldn't serialize static clusterings
assert clustering.kind() != Kind.STATIC_CLUSTERING;
if (clustering.kind() == Kind.CLUSTERING)
-                return 1 + Clustering.serializer.serializedSize((Clustering<?>)clustering, version, types);
+                return 1 + Clustering.serializer.serializedSize((Clustering<?>)clustering, unused, types);
else
-                return ClusteringBoundOrBoundary.serializer.serializedSize((ClusteringBoundOrBoundary<?>)clustering, version, types);
+                return ClusteringBoundOrBoundary.serializer.serializedSize((ClusteringBoundOrBoundary<?>)clustering, unused, types);
}
-        <T> void serializeValuesWithoutSize(ClusteringPrefix<T> clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+        <T> void serializeValuesWithoutSize(ClusteringPrefix<T> clustering, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
int offset = 0;
int clusteringSize = clustering.size();
@@ -496,7 +496,7 @@ void serializeValuesWithoutSize(ClusteringPrefix clustering, DataOutputPl
}
}
-        <T> long valuesWithoutSizeSerializedSize(ClusteringPrefix<T> clustering, int version, List<AbstractType<?>> types)
+        <T> long valuesWithoutSizeSerializedSize(ClusteringPrefix<T> clustering, int unused, List<AbstractType<?>> types)
{
long result = 0;
int offset = 0;
@@ -519,7 +519,7 @@ long valuesWithoutSizeSerializedSize(ClusteringPrefix clustering, int ver
return result;
}
-        byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
+        public byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
{
// Callers of this method should handle the case where size = 0 (in all case we want to return a special value anyway).
assert size > 0;
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0d6cd2eb9be2..33a1554401df 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2323,6 +2323,11 @@ public boolean shouldIgnoreGcGraceForKey(DecoratedKey dk)
return partitionKeySetIgnoreGcGrace.contains(dk);
}
+ public boolean shouldIgnoreGcGraceForAnyKey()
+ {
+ return !partitionKeySetIgnoreGcGrace.isEmpty();
+ }
+
    public static Iterable<ColumnFamilyStore> all()
{
        List<Iterable<ColumnFamilyStore>> stores = new ArrayList<>(Schema.instance.getKeyspaces().size());
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index 3e014d3254f7..79437ef5b18c 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -33,6 +33,8 @@
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
@@ -529,6 +531,7 @@ public void serializeSubset(Collection columns, Columns superset
int supersetCount = superset.size();
if (columnCount == supersetCount)
{
+ /** This is prevented by caller for row serialization: {@link UnfilteredSerializer#serializeRowBody(Row, int, SerializationHelper, DataOutputPlus)}*/
out.writeUnsignedVInt32(0);
}
else if (supersetCount < 64)
diff --git a/src/java/org/apache/cassandra/db/DecoratedKey.java b/src/java/org/apache/cassandra/db/DecoratedKey.java
index 309f764a9619..36612040d000 100644
--- a/src/java/org/apache/cassandra/db/DecoratedKey.java
+++ b/src/java/org/apache/cassandra/db/DecoratedKey.java
@@ -114,7 +114,7 @@ public ByteSource asComparableBytes(Version version)
// The OSS50 version avoids this by adding a terminator.
return ByteSource.withTerminatorMaybeLegacy(version,
ByteSource.END_OF_STREAM,
- token.asComparableBytes(version),
+ getToken().asComparableBytes(version),
keyComparableBytes(version));
}
@@ -127,7 +127,7 @@ public ByteComparable asComparableBound(boolean before)
return ByteSource.withTerminator(
before ? ByteSource.LT_NEXT_COMPONENT : ByteSource.GT_NEXT_COMPONENT,
- token.asComparableBytes(version),
+ getToken().asComparableBytes(version),
keyComparableBytes(version));
};
}
diff --git a/src/java/org/apache/cassandra/db/DeletionPurger.java b/src/java/org/apache/cassandra/db/DeletionPurger.java
index 795817fd3deb..2c3f69a7cbc4 100644
--- a/src/java/org/apache/cassandra/db/DeletionPurger.java
+++ b/src/java/org/apache/cassandra/db/DeletionPurger.java
@@ -19,16 +19,16 @@
public interface DeletionPurger
{
- public static final DeletionPurger PURGE_ALL = (ts, ldt) -> true;
+ DeletionPurger PURGE_ALL = (ts, ldt) -> true;
- public boolean shouldPurge(long timestamp, long localDeletionTime);
+ boolean shouldPurge(long timestamp, long localDeletionTime);
- public default boolean shouldPurge(DeletionTime dt)
+ default boolean shouldPurge(DeletionTime dt)
{
return !dt.isLive() && shouldPurge(dt.markedForDeleteAt(), dt.localDeletionTime());
}
- public default boolean shouldPurge(LivenessInfo liveness, long nowInSec)
+ default boolean shouldPurge(LivenessInfo liveness, long nowInSec)
{
return !liveness.isLive(nowInSec) && shouldPurge(liveness.timestamp(), liveness.localExpirationTime());
}
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 5970fbb042a4..0e3147796a43 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -38,18 +38,20 @@
*/
public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
{
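+    // Field values of the LIVE sentinel, kept as named constants so resetLive() can restore the live state in place.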
+ private static final int LOCAL_DELETION_TIME_LIVE = Cell.deletionTimeLongToUnsignedInteger(Long.MAX_VALUE);
+ private static final long MARKED_FOR_DELETE_AT_LIVE = Long.MIN_VALUE;
public static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
/**
* A special DeletionTime that signifies that there is no top-level (row) tombstone.
*/
- public static final DeletionTime LIVE = new DeletionTime(Long.MIN_VALUE, Long.MAX_VALUE);
+ public static final DeletionTime LIVE = new DeletionTime(MARKED_FOR_DELETE_AT_LIVE, LOCAL_DELETION_TIME_LIVE);
private static final Serializer serializer = new Serializer();
private static final Serializer legacySerializer = new LegacySerializer();
- private final long markedForDeleteAt;
- final int localDeletionTimeUnsignedInteger;
+ private long markedForDeleteAt;
+ private int localDeletionTimeUnsignedInteger;
public static DeletionTime build(long markedForDeleteAt, long localDeletionTime)
{
@@ -59,6 +61,12 @@ public static DeletionTime build(long markedForDeleteAt, long localDeletionTime)
: new DeletionTime(markedForDeleteAt, localDeletionTime);
}
+ public static DeletionTime copy(DeletionTime original)
+ {
+ // Negative ldts can only be a result of a corruption or when scrubbing legacy sstables with overflown int ldts
+ return new DeletionTime(original.markedForDeleteAt, original.localDeletionTimeUnsignedInteger);
+ }
+
// Do not use. This is a perf optimization where some data structures known to hold valid uints are allowed to use it.
// You should use 'build' instead to not workaround validations, corruption detections, etc
static DeletionTime buildUnsafeWithUnsignedInteger(long markedForDeleteAt, int localDeletionTimeUnsignedInteger)
@@ -79,6 +87,27 @@ private DeletionTime(long markedForDeleteAt, int localDeletionTimeUnsignedIntege
this.localDeletionTimeUnsignedInteger = localDeletionTimeUnsignedInteger;
}
+ /**
+ * TODO: Seems like a bad idea to make this public
+ */
+ public void resetLive()
+ {
+ markedForDeleteAt = MARKED_FOR_DELETE_AT_LIVE;
+ localDeletionTimeUnsignedInteger = LOCAL_DELETION_TIME_LIVE;
+ }
+
+ void reset(long markedForDeleteAt, int localDeletionTimeUnsignedInteger)
+ {
+ this.markedForDeleteAt = markedForDeleteAt;
+ this.localDeletionTimeUnsignedInteger = localDeletionTimeUnsignedInteger;
+ }
+
+ public void reset(long markedForDeleteAt, long localDeletionTime)
+ {
+ this.markedForDeleteAt = markedForDeleteAt;
+ this.localDeletionTimeUnsignedInteger = Cell.deletionTimeLongToUnsignedInteger(localDeletionTime);
+ }
+
/**
* A timestamp (typically in microseconds since the unix epoch, although this is not enforced) after which
* data should be considered deleted. If set to Long.MIN_VALUE, this implies that the data has not been marked
@@ -98,6 +127,11 @@ public long localDeletionTime()
return Cell.deletionTimeUnsignedIntegerToLong(localDeletionTimeUnsignedInteger);
}
+ public int localDeletionTimeUnsignedInteger()
+ {
+ return localDeletionTimeUnsignedInteger;
+ }
+
/**
* Returns whether this DeletionTime is live, that is deletes no columns.
*/
@@ -143,7 +177,7 @@ public final int hashCode()
@Override
public String toString()
{
- return String.format("deletedAt=%d, localDeletion=%d", markedForDeleteAt(), localDeletionTime());
+ return isLive() ? "LIVE" : String.format("deletedAt=%d, localDeletion=%d", markedForDeleteAt(), localDeletionTime());
}
public int compareTo(DeletionTime dt)
@@ -155,6 +189,10 @@ else if (markedForDeleteAt() > dt.markedForDeleteAt())
else return CassandraUInt.compare(localDeletionTimeUnsignedInteger, dt.localDeletionTimeUnsignedInteger);
}
+ /**
+ * supersedes: supplants, replaces, in this case: "is more recent"
+ * @return true if dt is deleted BEFORE this (markedForDeleteAt > dt.markedForDeleteAt || (markedForDeleteAt == dt.markedForDeleteAt && localDeletionTime > dt.localDeletionTime))
+ */
public boolean supersedes(DeletionTime dt)
{
return markedForDeleteAt() > dt.markedForDeleteAt() || (markedForDeleteAt() == dt.markedForDeleteAt() && localDeletionTime() > dt.localDeletionTime());
@@ -196,6 +234,11 @@ public static Serializer getSerializer(Version version)
return legacySerializer;
}
+ public void reset(DeletionTime deletionTime)
+ {
+ reset(deletionTime.markedForDeleteAt, deletionTime.localDeletionTimeUnsignedInteger);
+ }
+
/* Serializer for Usigned Integer ldt
*
* ldt is encoded as a uint in seconds since unix epoch, it can go up o 2106-02-07T06:28:13+00:00 only.
@@ -209,7 +252,7 @@ public static class Serializer implements ISerializer
public void serialize(DeletionTime delTime, DataOutputPlus out) throws IOException
{
- if (delTime == LIVE)
+ if (delTime == LIVE || delTime.isLive())
out.writeByte(IS_LIVE_DELETION);
else
{
@@ -242,6 +285,29 @@ public DeletionTime deserialize(DataInputPlus in) throws IOException
}
}
+ public void deserialize(DataInputPlus in, DeletionTime reuse) throws IOException
+ {
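+            // Same wire format as deserialize(DataInputPlus), but the result is written into the caller-provided
+            // instance, so hot paths can reuse a single DeletionTime instead of allocating one per deserialization.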
+ int flags = in.readByte();
+ if ((flags & IS_LIVE_DELETION) != 0)
+ {
+ if ((flags & 0xFF) != IS_LIVE_DELETION)
+ throw new IOException("Corrupted sstable. Invalid flags found deserializing DeletionTime: " + Integer.toBinaryString(flags & 0xFF));
+ reuse.resetLive();
+ }
+ else
+ {
+ // Read the remaining 7 bytes
+ int bytes1 = in.readByte();
+ int bytes2 = in.readShort();
+ int bytes4 = in.readInt();
+
+ long mfda = readBytesToMFDA(flags, bytes1, bytes2, bytes4);
+ int localDeletionTimeUnsignedInteger = in.readInt();
+
+ reuse.reset(mfda, localDeletionTimeUnsignedInteger);
+ }
+ }
+
public DeletionTime deserialize(ByteBuffer buf, int offset) throws IOException
{
int flags = buf.get(offset);
@@ -315,6 +381,20 @@ public DeletionTime deserialize(DataInputPlus in) throws IOException
: DeletionTime.build(mfda, ldt);
}
+ public void deserialize(DataInputPlus in, DeletionTime reuse) throws IOException
+ {
+ int ldt = in.readInt();
+ long mfda = in.readLong();
+ if (mfda == Long.MIN_VALUE && ldt == Integer.MAX_VALUE) {
+ reuse.resetLive();
+ }
+ else
+ {
+ reuse.reset(mfda, ldt);
+ }
+
+ }
+
public DeletionTime deserialize(ByteBuffer buf, int offset)
{
int ldt = buf.getInt(offset);
diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java
index 168473add552..ce30216936c7 100644
--- a/src/java/org/apache/cassandra/db/LivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@ -53,7 +53,7 @@ public class LivenessInfo implements IMeasurableMemory
public static final LivenessInfo EMPTY = new LivenessInfo(NO_TIMESTAMP);
private static final long UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY);
- protected final long timestamp;
+ protected long timestamp;
protected LivenessInfo(long timestamp)
{
@@ -107,7 +107,7 @@ public static LivenessInfo withExpirationTime(long timestamp, int ttl, long loca
*
* @return whether this liveness info is empty or not.
*/
- public boolean isEmpty()
+ public final boolean isEmpty()
{
return timestamp == NO_TIMESTAMP;
}
@@ -117,7 +117,7 @@ public boolean isEmpty()
*
* @return the liveness info timestamp (or {@link #NO_TIMESTAMP} if the info is empty).
*/
- public long timestamp()
+ public final long timestamp()
{
return timestamp;
}
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index 963985788a9e..cf85d728cfb3 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -145,7 +145,7 @@ public void add(RangeTombstone tombstone)
add(tombstone.deletedSlice().start(),
tombstone.deletedSlice().end(),
tombstone.deletionTime().markedForDeleteAt(),
- tombstone.deletionTime().localDeletionTimeUnsignedInteger);
+ tombstone.deletionTime().localDeletionTimeUnsignedInteger());
}
/**
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionPipeline.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionPipeline.java
new file mode 100644
index 000000000000..baae0e3d2832
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionPipeline.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.AbstractCompactionController;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.TimeUUID;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+abstract class AbstractCompactionPipeline extends CompactionInfo.Holder implements AutoCloseable {
+ static AbstractCompactionPipeline create(CompactionTask task, OperationType type, AbstractCompactionStrategy.ScannerList scanners, AbstractCompactionController controller, long nowInSec, TimeUUID compactionId)
+ {
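+        // Use the cursor-based pipeline only when it is enabled and the inputs are supported by CompactionCursor;
+        // otherwise fall back to the existing iterator-based pipeline.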
+ if (DatabaseDescriptor.enableCursorCompaction()) {
+ if (CompactionCursor.isSupported(scanners, controller))
+ {
+ return new CursorCompactionPipeline(task, type, scanners, controller, nowInSec, compactionId);
+ }
+ }
+ return new IteratorCompactionPipeline(task, type, scanners, controller, nowInSec, compactionId);
+ }
+
+ abstract boolean processNextPartitionKey() throws IOException;
+
+ public abstract long[] getMergedRowCounts();
+
+ public abstract long getTotalSourceCQLRows();
+
+ public abstract long getTotalKeysWritten();
+
+ public abstract long getTotalBytesScanned();
+
+ public abstract AutoCloseable openWriterResource(ColumnFamilyStore cfs,
+ Directories directories,
+ ILifecycleTransaction transaction,
+                                                     Set<SSTableReader> nonExpiredSSTables);
+
+ @Override
+ public abstract void close() throws IOException;
+
+    public abstract Collection<SSTableReader> finishWriting();
+
+ public abstract long estimatedKeys();
+
+ public abstract void stop();
+}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionCursor.java b/src/java/org/apache/cassandra/db/compaction/CompactionCursor.java
new file mode 100644
index 000000000000..375813a3e8ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionCursor.java
@@ -0,0 +1,1599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.LongPredicate;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.UnmodifiableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.ElementDescriptor;
+import org.apache.cassandra.io.sstable.PartitionDescriptor;
+import org.apache.cassandra.io.sstable.ReusableLivenessInfo;
+import org.apache.cassandra.io.sstable.SSTableCursorReader;
+import org.apache.cassandra.io.sstable.SSTableCursorWriter;
+import org.apache.cassandra.io.util.ReusableDecoratedKey;
+import org.apache.cassandra.io.util.ReusableLongToken;
+import org.apache.cassandra.db.AbstractCompactionController;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Cells;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.db.transform.DuplicateRowChecker;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.SortedTableWriter;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.TopPartitionTracker;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.COMPARE;
+import static org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.LEFT;
+import static org.apache.cassandra.db.compaction.CompactionCursor.CellRosolution.RIGHT;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_END;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_HEADER_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.CELL_VALUE_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.DONE;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.ELEMENT_END;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.PARTITION_END;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.PARTITION_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.ROW_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.STATIC_ROW_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.TOMBSTONE_START;
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.State.isState;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_START_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_START_BOUND;
+
+/**
+ * Merge multiple iterators over the content of sstable into a "compacted" iterator.
+ *
+ * On top of the actual merging of the source iterators, this class:
+ * <ul>
+ *   <li>purges gc-able tombstones if possible (see PurgeFunction below);</li>
+ *   <li>invalidates cached partitions that are empty post-compaction. This avoids keeping partitions with
+ *       only purgeable tombstones in the row cache;</li>
+ *   <li>keeps track of the compaction progress.</li>
+ * </ul>
+ *
+ * This compaction implementation does not support secondary indexes or trie indexes at this time.
+ *
+ * This compaction implementation avoids garbage creation per partition/row/cell by utilizing reader/writer code
+ * which supports reusable copies of sstable entry components. The implementation consolidates and duplicates code
+ * from various classes to support the use of these reusable structures.
+ */
+public class CompactionCursor extends CompactionInfo.Holder
+{
+ public static boolean isSupported(AbstractCompactionStrategy.ScannerList scanners, AbstractCompactionController controller)
+ {
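+        // Cursor compaction currently only handles the common case: non-system, Murmur3-partitioned tables with no
+        // indexes and no complex or counter columns, read through full-range scanners over latest-version BIG-format
+        // sstables, and with no tombstone garbage-collection option configured.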
+ TableMetadata metadata = controller.cfs.metadata();
+ if (metadata.getTableDirectoryName().contains("system") ||
+ !(metadata.partitioner instanceof Murmur3Partitioner) ||
+ metadata.indexes.size() != 0)
+ {
+ return false;
+ }
+
+ for (ColumnMetadata column : metadata.columns())
+ {
+ if (column.isComplex())
+ {
+ return false;
+ }
+ else if (column.isCounterColumn())
+ {
+ return false;
+ }
+ }
+
+ for (ISSTableScanner scanner : scanners.scanners)
+ {
+ // TODO: implement partial range reader
+ if (!scanner.isFullRange())
+ return false;
+
+ for (SSTableReader reader : scanner.getBackingSSTables()) {
+ Version version = reader.descriptor.version;
+ if (!(version.format instanceof BigFormat))
+ return false;
+ if (!version.isLatestVersion())
+ return false;
+ }
+ }
+
+ // TODO: Implement CompactionIterator.GarbageSkipper like functionality
+ if (controller.tombstoneOption != CompactionParams.TombstoneOption.NONE)
+ return false;
+
+ return true;
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CompactionCursor.class.getName());
+
+ private final OperationType type;
+ private final AbstractCompactionController controller;
+ private final ActiveCompactionsTracker activeCompactions;
+    private final ImmutableSet<SSTableReader> sstables;
+ private final long nowInSec;
+ private final TimeUUID compactionId;
+ private final long totalInputBytes;
+ private final StatefulCursor[] sstableCursors;
+ private final boolean[] sstableCursorsEqualsNext;
+ private final boolean hasStaticColumns;
+ private final boolean enforceStrictLiveness;
+
+ // Keep targetDirectory for compactions, needed for `nodetool compactionstats`
+ private volatile String targetDirectory;
+
+ private SSTableCursorWriter ssTableCursorWriter;
+ private boolean finished = false;
+
+ /*
+ * counters for merged partitions/rows/cells.
+ * array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row),
+ * index 1 is counter for 2 rows merged, and so on.
+ */
+ private final long[] partitionMergeCounters;
+ private final long[] staticRowMergeCounters;
+ private final long[] rowMergeCounters;
+ private final long[] rangeTombstonesMergeCounters;
+ private final long[] cellMergeCounters;
+
+ // Progress accounting
+ private long totalBytesRead = 0;
+ private long totalSourceCQLRows;
+ private long totalDataBytesWritten;
+
+ // state
+ final Purger purger;
+
+ private ReusableDecoratedKey prevKey = null;
+ // Partition state. Writes can be delayed if the deletion is purged, or live and partition is empty -> LIVE deletion.
+ ReusableDecoratedKey partitionKey;
+ PartitionDescriptor partitionDescriptor;
+ DeletionTime partitionDeletion;
+ // This will be 0 if we haven't written partition header.
+ int partitionHeaderLength = 0;
+ private CompactionAwareWriter compactionAwareWriter;
+
+    public CompactionCursor(OperationType type, List<ISSTableScanner> scanners, AbstractCompactionController controller, long nowInSec, TimeUUID compactionId)
+ {
+ this(type, scanners, controller, nowInSec, compactionId, ActiveCompactionsTracker.NOOP, null);
+ }
+
+ public CompactionCursor(OperationType type,
+                            List<ISSTableScanner> scanners,
+ AbstractCompactionController controller,
+ long nowInSec,
+ TimeUUID compactionId,
+ ActiveCompactionsTracker activeCompactions,
+ TopPartitionTracker.Collector topPartitionCollector)
+ {
+ this.controller = controller;
+ this.type = type;
+ this.nowInSec = nowInSec;
+ this.compactionId = compactionId;
+
+ long inputBytes = 0;
+ for (ISSTableScanner scanner : scanners)
+ inputBytes += scanner.getLengthInBytes();
+ this.totalInputBytes = inputBytes;
+ this.partitionMergeCounters = new long[scanners.size()];
+ this.staticRowMergeCounters = new long[partitionMergeCounters.length];
+ this.rowMergeCounters = new long[partitionMergeCounters.length];
+ this.rangeTombstonesMergeCounters = new long[partitionMergeCounters.length];
+ this.cellMergeCounters = new long[partitionMergeCounters.length];
+ // note that we leak `this` from the constructor when calling beginCompaction below, this means we have to get the sstables before
+ // calling that to avoid a NPE.
+ this.sstables = scanners.stream().map(ISSTableScanner::getBackingSSTables).flatMap(Collection::stream).collect(ImmutableSet.toImmutableSet());
+ this.activeCompactions = activeCompactions == null ? ActiveCompactionsTracker.NOOP : activeCompactions;
+ this.activeCompactions.beginCompaction(this); // note that CompactionTask also calls this, but CT only creates CompactionIterator with a NOOP ActiveCompactions
+
+ TableMetadata metadata = metadata();
+ this.hasStaticColumns = metadata.hasStaticColumns();
+ /**
+ * Pipeline should end up similar to the one in {@link CompactionIterator}:
+ * [MERGED -> ?TopPartitionTracker -> GarbageSkipper -> Purger -> DuplicateRowChecker -> Abortable] -> next()
+ * V - Merge - This is drawing on code all over the place to iterate through the data and merge partitions/rows/cells
+ * * {@link org.apache.cassandra.db.transform.Transformation}s, applied to above iterator:
+ * X - TODO: We can leave for now? - {@link TopPartitionTracker.TombstoneCounter} - Hooked into CFS metadata, tracks tombstone counts per pk.
+ * X - TODO: We can leave for now? - {@link CompactionIterator.GarbageSkipper} - filters out, or "skips" data shadowed by the provided "tombstone source".
+ * V * {@link CompactionIterator.Purger} - filters out, or "purges" gc-able tombstones. Also updates bytes read on every row % 100.
+ * X - TODO: We can leave for now? - {@link DuplicateRowChecker} - reports duplicate rows across replicas.
+ * X - TODO: We can leave for now? - Abortable - aborts the compaction if the user has requested it (at a certain granularity).
+ * {@link CompactionIterator#CompactionIterator(OperationType, List, AbstractCompactionController, long, TimeUUID, ActiveCompactionsTracker, TopPartitionTracker.Collector)}
+ */
+
+ // Convert Readers to Cursors
+ this.sstableCursors = new StatefulCursor[sstables.size()];
+ this.sstableCursorsEqualsNext = new boolean[sstables.size()];
+        UnmodifiableIterator<SSTableReader> iterator = sstables.iterator();
+ for (int i = 0; i < this.sstableCursors.length; i++)
+ {
+ SSTableReader ssTableReader = iterator.next();
+ this.sstableCursors[i] = new StatefulCursor(ssTableReader);
+ }
+ this.enforceStrictLiveness = controller.cfs.metadata.get().enforceStrictLiveness();
+
+ purger = new Purger(type, controller, nowInSec);
+ }
+
+ /**
+ * @return false if finished, true if partition is written (which might require multiple partition reads)
+ */
+ public boolean writeNextPartition(CompactionAwareWriter compactionAwareWriter) throws IOException {
+ while (!finished) {
+ if (tryWriteNextPartition(compactionAwareWriter)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @return true if a partition was written
+ */
+ private boolean tryWriteNextPartition(CompactionAwareWriter compactionAwareWriter) throws IOException
+ {
+ if (!prepareForPartitionMerge())
+ {
+ finish();
+ return false;
+ }
+
+ // Top reader is on the current key/header
+ partitionDescriptor = sstableCursors[0].pHeader;
+ partitionKey = sstableCursors[0].currentDecoratedKey;
+
+ // possibly reached boundary of the current writer
+ try
+ {
+ // TODO: Potentially redundant validation... Can be done on the writer level?
+ if (prevKey != null && prevKey.compareTo(partitionKey) >= 0)
+ throw new RuntimeException(String.format("Last written key %s >= current key %s", prevKey, partitionKey));
+ // NOTE: We now have prevKey == partitionKey, and sstableCursors[0].currentDecoratedKey == prevKey. Which is confusing in a debugger.
+ retainPrevKeyForValidation();
+
+ int partitionMergeLimit = findPartitionMergeLimit();
+ // needed if we actually write a partition, not used otherwise
+ this.compactionAwareWriter = compactionAwareWriter;
+
+ purger.resetOnNewPartition(partitionKey);
+ boolean written = mergePartitions(partitionMergeLimit);
+ if (!written)
+ purger.onEmptyPartitionPostPurge();
+ return written;
+ }
+ finally
+ {
+ partitionKey = null;
+ partitionDescriptor = null;
+ partitionHeaderLength = 0;
+ }
+ }
+
+
+ /**
+ * See {@link UnfilteredPartitionIterators#merge(List, UnfilteredPartitionIterators.MergeListener)}
+ */
+ private boolean mergePartitions(int partitionMergeLimit) throws IOException
+ {
+ partitionMergeCounters[partitionMergeLimit - 1]++;
+ // p-key is the same for all the merged
+ DeletionTime effectivePartitionDeletion;
+
+ // Pick "max" pDeletion
+ if (partitionMergeLimit > 1)
+ {
+ /** {@link UnfilteredRowIterators.UnfilteredRowMergeIterator#collectPartitionLevelDeletion(List, UnfilteredRowIterators.MergeListener)}*/
+ effectivePartitionDeletion = sstableCursors[0].pHeader.deletionTime();
+ for (int i = 1; i < partitionMergeLimit; i++)
+ {
+ DeletionTime otherDeletionTime = sstableCursors[i].pHeader.deletionTime();
+ if (!effectivePartitionDeletion.supersedes(otherDeletionTime))
+ effectivePartitionDeletion = otherDeletionTime;
+ }
+ }
+ else
+ {
+ effectivePartitionDeletion = partitionDescriptor.deletionTime();
+ }
+ partitionDeletion = effectivePartitionDeletion;
+ // maybe purge?
+ if (!effectivePartitionDeletion.isLive())
+ {
+ boolean shouldPurge = purger.shouldPurge(effectivePartitionDeletion);
+ if (!shouldPurge)
+ {
+ maybeSwitchWriter(compactionAwareWriter);
+ partitionHeaderLength = ssTableCursorWriter.writePartitionStart(partitionDescriptor.keyBytes(), partitionDescriptor.keyLength(), effectivePartitionDeletion);
+ }
+ else {
+ effectivePartitionDeletion = DeletionTime.LIVE;
+ }
+ }
+
+ // Merge any common static rows
+ DeletionTime partitionDeletion = this.partitionDeletion;
+ if (hasStaticColumns)
+ {
+ sortForStaticRow(partitionMergeLimit);
+ // move cursors that need to move past the row header
+ int staticRowMergeLimit = findStaticRowMergeLimit(partitionMergeLimit);
+
+ mergeRows(staticRowMergeLimit, partitionDeletion, true, false);
+ if (isPartitionStarted())
+ partitionHeaderLength = (int) (ssTableCursorWriter.getPosition() - ssTableCursorWriter.getPartitionStart());
+ }
+
+ // Merge any common normal rows
+ int elementMergeLimit = partitionMergeLimit;
+ DeletionTime activeDeletion = partitionDeletion;
+ boolean isFirstElement = true;
+ int elementCount = 0;
+ ElementDescriptor lastClustering = null;
+ while (true)
+ {
+            // move cursors that need to move past the row header
+ prepareCursorsForNextElement(elementMergeLimit);
+
+ // Sort rows by their clustering
+ sortForElementMerge(elementMergeLimit, partitionMergeLimit);
+ int readerState = sstableCursors[0].state();
+ if (readerState == PARTITION_END)
+ break;
+
+ // At least one partition has not ended
+ elementMergeLimit = findElementMergeLimit(partitionMergeLimit);
+ int flags = sstableCursors[0].rHeader.flags();
+ if (UnfilteredSerializer.isRow(flags))
+ {
+ if (mergeRows(elementMergeLimit, activeDeletion, false, isFirstElement))
+ {
+ isFirstElement = false;
+ elementCount++;
+ lastClustering = sstableCursors[0].rHeader;
+ }
+ }
+ else if (UnfilteredSerializer.isTombstoneMarker(flags)) {
+ // the tombstone processing *maybe* writes a marker, and *maybe* changes the `activeOpenRangeDeletion`
+ if (mergeRangeTombstones(elementMergeLimit, partitionDeletion, isFirstElement))
+ {
+ isFirstElement = false;
+ elementCount++;
+ lastClustering = sstableCursors[0].rHeader;
+ }
+ if (activeOpenRangeDeletion == DeletionTime.LIVE) {
+ activeDeletion = partitionDeletion;
+ }
+ else {
+ activeDeletion = activeOpenRangeDeletion;
+ }
+ }
+ else {
+ throw new IllegalStateException("Unexpected element type (not row or tombstone):" + flags);
+ }
+ // move along
+ continueReadingAfterMerge(elementMergeLimit, ELEMENT_END);
+ }
+
+ boolean partitionWritten = isPartitionStarted();
+ if (partitionWritten)
+ {
+ ssTableCursorWriter.writePartitionEnd(partitionDescriptor.keyBytes(), partitionDescriptor.keyLength(), effectivePartitionDeletion, partitionHeaderLength);
+ // update metadata tracking of min/max clustering on last element
+ if (elementCount > 1) {
+ ssTableCursorWriter.updateClusteringMetadata(lastClustering);
+ }
+ }
+ // move along
+ continueReadingAfterMerge(partitionMergeLimit, PARTITION_END);
+ return partitionWritten;
+ }
+
+ /**
+ * We have a common clustering and need to merge data. Cells might be different in different rows, but collision is
+ * likely at this stage (probably).
+ * {@link Row.Merger#merge(DeletionTime)}
+ */
+ private boolean mergeRows(int rowMergeLimit, DeletionTime partitionDeletion, boolean isStatic, boolean isFirstElement) throws IOException
+ {
+ if (rowMergeLimit == 0)
+ {
+ if (isStatic && isPartitionStarted())
+ ssTableCursorWriter.writeEmptyStaticRow();
+
+ return false;
+ }
+
+ if (isStatic)
+ {
+ staticRowMergeCounters[rowMergeLimit - 1]++;
+ }
+ else
+ {
+ rowMergeCounters[rowMergeLimit - 1]++;
+ }
+
+ // merge deletion/liveness
+ /** {@link Row.Merger#merge(DeletionTime)}*/
+ ElementDescriptor row = sstableCursors[0].rHeader;
+
+ LivenessInfo rowInfo = row.livenessInfo();
+ DeletionTime rowDeletion = row.deletionTime();
+
+ for (int i = 1; i < rowMergeLimit; i++)
+ {
+ // TODO: can validate state here
+ row = sstableCursors[i].rHeader;
+            // TODO: maybe using flags is more optimal (avoid ref loads and comparisons, etc.)
+ if (row.livenessInfo().supersedes(rowInfo))
+ rowInfo = row.livenessInfo();
+ if (row.deletionTime().supersedes(rowDeletion))
+ rowDeletion = row.deletionTime();
+ }
+
+ /**
+ * See: {@link BTreeRow#purge(DeletionPurger, long, boolean)}
+ */
+ DeletionTime activeDeletion = partitionDeletion;
+ if (rowDeletion.supersedes(activeDeletion))
+ {
+ activeDeletion = rowDeletion; // deletion is in effect before purge takes effect
+ rowDeletion = purger.shouldPurge(rowDeletion) ? DeletionTime.LIVE : rowDeletion;
+ }
+ else
+ {
+ rowDeletion = DeletionTime.LIVE;
+ }
+
+ if (activeDeletion.deletes(rowInfo) || purger.shouldPurge(rowInfo, nowInSec))
+ {
+ rowInfo = LivenessInfo.EMPTY;
+ }
+
+ boolean isRowDropped = rowDeletion.isLive() && rowInfo.isEmpty();
+
+ if (!isRowDropped)
+ {
+ lateStartRow(rowInfo, rowDeletion, isStatic);
+ }
+
+ if (isRowDropped && enforceStrictLiveness)
+ {
+ skipRowsOnStrictLiveness(rowMergeLimit, isStatic);
+ }
+ else
+ {
+ int cellMergeLimit = rowMergeLimit;
+ // loop through the columns and copy/merge each cell
+ while (true)
+ {
+ // advance cursors that need to read the cell header
+ for (int i = 0; i < cellMergeLimit; i++)
+ {
+ int readerState = sstableCursors[i].state();
+ if (readerState == CELL_HEADER_START)
+ {
+ sstableCursors[i].readCellHeader();
+ }
+ }
+ // Sort rows by cells
+ sortForCellMerge(cellMergeLimit, rowMergeLimit);
+ final StatefulCursor sstableCursor = sstableCursors[0];
+ int readerState = sstableCursor.state();
+ // next row/partition/done
+ if (readerState == ELEMENT_END)
+ break;
+
+ cellMergeLimit = findCellMergeLimit(rowMergeLimit);
+
+ isRowDropped = mergeCells(cellMergeLimit, activeDeletion, rowInfo, isRowDropped, isStatic);
+ // move along
+ continueReadingAfterMerge(cellMergeLimit, CELL_END);
+ }
+ if (!isRowDropped)
+ ssTableCursorWriter.writeRowEnd(sstableCursors[0].rHeader, isFirstElement);
+ }
+ if (isRowDropped && isStatic &&
+ isPartitionStarted()) // if the partition write has not started, keep delaying it, might be an empty partition(purged+no data)
+ {
+ ssTableCursorWriter.writeEmptyStaticRow();
+ }
+ return !isRowDropped;
+ }
+
+ private void skipRowsOnStrictLiveness(int rowMergeLimit, boolean isStatic) throws IOException
+ {
+ for (int i = 0; i < rowMergeLimit; i++)
+ {
+ if (sstableCursors[i].state() != ELEMENT_END){
+ if (isStatic)
+ sstableCursors[i].skipStaticRow();
+ else
+ sstableCursors[i].skipUnfiltered();
+ }
+ }
+ }
+
+ private DataOutputBuffer tempCellBuffer1 = new DataOutputBuffer();
+ private DataOutputBuffer tempCellBuffer2 = new DataOutputBuffer();
+
+ /**
+ * {@link Row.Merger.ColumnDataReducer#getReduced()} <-- applied the delete before reconcile, should not make a difference?
+ * {@link Cells#reconcile(Cell, Cell)}
+ */
+ private boolean mergeCells(int cellMergeLimit, DeletionTime activeDeletion, LivenessInfo rowLiveness, boolean isRowDropped, boolean isStatic) throws IOException
+ {
+ cellMergeCounters[cellMergeLimit - 1]++;
+ // Nothing to sort, we basically need to pick the correct data to copy.
+ // -> the latest data.
+ // TODO: handle value based merge & counters/complex cells
+ StatefulCursor cellSource = sstableCursors[0];
+ SSTableCursorReader.CellCursor cellCursor = cellSource.cellCursor;
+ ReusableLivenessInfo cellLiveness = cellCursor.cellLiveness;
+ DataOutputBuffer tempCellBuffer = null;
+
+ if (cellCursor.cellColumn.isComplex())
+ throw new UnsupportedOperationException("TODO: Not ready for complex cells.");
+ if (cellCursor.cellColumn.isCounterColumn())
+ throw new UnsupportedOperationException("TODO: Not ready for counter cells.");
+
+ /** See: {@link Cells#reconcile(Cell, Cell)} */
+ // Find latest cell value/delete info, only one cell can win(for now... same timestamp handling awaits)!
+ for (int i = 1; i < cellMergeLimit; i++)
+ {
+ StatefulCursor oCellSource = sstableCursors[i];
+ SSTableCursorReader.CellCursor oCellCursor = oCellSource.cellCursor;
+ ReusableLivenessInfo oCellLiveness = oCellCursor.cellLiveness;
+
+ CellRosolution cellRosolution = resolveRegular(cellLiveness, oCellLiveness);
+ if (cellRosolution == LEFT) {
+ if (oCellSource.state() == CELL_VALUE_START) oCellSource.skipCellValue();
+ }
+ else if (cellRosolution == RIGHT) {
+ if (cellSource.state() == CELL_VALUE_START) cellSource.skipCellValue();
+ cellSource = oCellSource;
+ cellCursor = oCellCursor;
+ cellLiveness = oCellLiveness;
+ tempCellBuffer = null;
+ }
+ else { // COMPARE
+ if (activeDeletion.deletes(oCellLiveness)) {
+ if (oCellSource.state() == CELL_VALUE_START) oCellSource.skipCellValue();
+ }
+ else {
+ // copy out the values for comparison
+ if (cellSource.state() == CELL_VALUE_START)
+ {
+ if (tempCellBuffer != null)
+ throw new IllegalStateException("tempCellBuffer should be null if cellSource has a value to be read.");
+ tempCellBuffer1.clear();
+ ssTableCursorWriter.copyCellValue(cellSource, tempCellBuffer1);
+ tempCellBuffer = tempCellBuffer1; // assume cell1 is going to be bigger
+ }
+ else if (tempCellBuffer == null) {
+ // potential trash value in buffer1
+ tempCellBuffer1.clear();
+ }
+ else if (tempCellBuffer != tempCellBuffer1) {
+ throw new IllegalStateException("tempCellBuffer should be tempCellBuffer1 if cellSource has been read.");
+ }
+ tempCellBuffer2.clear();
+ if (oCellSource.state() == CELL_VALUE_START) ssTableCursorWriter.copyCellValue(oCellSource, tempCellBuffer2);
+
+ int compare = Arrays.compareUnsigned(tempCellBuffer1.getData(), 0, tempCellBuffer1.getLength(), tempCellBuffer2.getData(), 0, tempCellBuffer2.getLength());
+ if (compare >= 0) {
+ // swap the buffers
+ tempCellBuffer = tempCellBuffer1;
+ tempCellBuffer1 = tempCellBuffer2;
+ tempCellBuffer2 = tempCellBuffer;
+
+ // tempCellBuffer != null -> tempCellBuffer == tempCellBuffer1
+ tempCellBuffer = tempCellBuffer1;
+
+ cellSource = oCellSource;
+ cellCursor = oCellCursor;
+ cellLiveness = oCellLiveness;
+ }
+ }
+ }
+ }
+
+
+ /**
+ * {@link Cell.Serializer#serialize}
+ */
+ int cellFlags = cellCursor.cellFlags;
+
+ /** {@link org.apache.cassandra.db.rows.AbstractCell#purge(org.apache.cassandra.db.DeletionPurger, long)} */
+ // if `isExpiring` => has ttl, and TTL has lapsed, convert the TTL to a tombstone
+ if (Cell.Serializer.isExpiring(cellFlags) && cellLiveness.isExpired(nowInSec)) {
+ cellLiveness.ttlToTombstone();
+ // remove the value, this is a tombstone now
+ if (Cell.Serializer.hasValue(cellFlags))
+ {
+ cellFlags = cellFlags | Cell.Serializer.HAS_EMPTY_VALUE_MASK;
+ if (cellSource.state() == CELL_VALUE_START)
+ {
+ if (tempCellBuffer != null) throw new IllegalStateException("Either copied buffer or ready to copy reader, not both.");
+ cellSource.skipCellValue();
+ }
+ else if (tempCellBuffer != null) {
+ tempCellBuffer = null;
+ }
+ else
+ {
+ throw new IllegalStateException("Flags and state contradict");
+ }
+ }
+ }
+
+ if (activeDeletion.deletes(cellLiveness) || purger.shouldPurge(cellLiveness, nowInSec))
+ {
+ if (Cell.Serializer.hasValue(cellFlags))
+ {
+ // we're dropping the cell, but could do: cellFlags = cellFlags | Cell.Serializer.HAS_EMPTY_VALUE_MASK;
+ if (cellSource.state() == CELL_VALUE_START)
+ {
+ if (tempCellBuffer != null) throw new IllegalStateException("Either copied buffer or ready to copy reader, not both.");
+ cellSource.skipCellValue();
+ }
+ else if (tempCellBuffer != null) {
+ // we're dropping the cell, but could do: tempCellBuffer = null;
+ }
+ else
+ {
+ throw new IllegalStateException("Flags and state contradict");
+ }
+ }
+ }
+ else
+ {
+ if (isRowDropped)
+ {
+ isRowDropped = false;
+ lateStartRow(isStatic);
+ }
+ /** {@link org.apache.cassandra.db.rows.Cell.Serializer#serialize(Cell, ColumnMetadata, DataOutputPlus, LivenessInfo, SerializationHeader)} */
+ boolean isDeleted = cellLiveness.isTombstone();
+ boolean isExpiring = cellLiveness.isExpiring();
+ boolean useRowTimestamp = !rowLiveness.isEmpty() && cellLiveness.timestamp() == rowLiveness.timestamp();
+ boolean useRowTTL = isExpiring && rowLiveness.isExpiring() &&
+ cellLiveness.ttl() == rowLiveness.ttl() &&
+ cellLiveness.localExpirationTime() == rowLiveness.localExpirationTime();
+ // Re-write cell flags to reflect resulting contents
+ cellFlags &= Cell.Serializer.HAS_EMPTY_VALUE_MASK;
+ if (isDeleted) cellFlags |= Cell.Serializer.IS_DELETED_MASK;
+ if (isExpiring) cellFlags |= Cell.Serializer.IS_EXPIRING_MASK;
+ if (useRowTimestamp) cellFlags |= Cell.Serializer.USE_ROW_TIMESTAMP_MASK;
+ if (useRowTTL) cellFlags |= Cell.Serializer.USE_ROW_TTL_MASK;
+ ssTableCursorWriter.writeCellHeader(cellFlags, cellLiveness, cellSource.cellCursor.cellColumn);
+ if (Cell.Serializer.hasValue(cellFlags)) {
+ if (cellSource.state() == CELL_VALUE_START)
+ {
+ if (tempCellBuffer != null) throw new IllegalStateException("Either copied buffer or ready to copy reader, not both.");
+ ssTableCursorWriter.writeCellValue(cellSource);
+ }
+ else if (tempCellBuffer != null)
+ {
+ ssTableCursorWriter.writeCellValue(tempCellBuffer);
+ }
+ else
+ {
+ throw new IllegalStateException("Flags and state contradict");
+ }
+ }
+
+ }
+ return isRowDropped;
+ }
+
+ enum CellRosolution {
+ LEFT, RIGHT, COMPARE
+ }
+
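+ /**
+ * Decides which of two versions of the same cell wins reconciliation using only their liveness info
+ * (timestamp, tombstone/expiring status, local deletion time). Returns COMPARE when the liveness info
+ * ties and the caller must fall back to comparing the value bytes.
+ */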
+ private static CellRosolution resolveRegular(ReusableLivenessInfo left, ReusableLivenessInfo right)
+ {
+ long leftTimestamp = left.timestamp();
+ long rightTimestamp = right.timestamp();
+ if (leftTimestamp != rightTimestamp)
+ return leftTimestamp > rightTimestamp ? LEFT : RIGHT;
+
+ long leftLocalDeletionTime = left.localExpirationTime();
+ long rightLocalDeletionTime = right.localExpirationTime();
+
+ boolean leftIsExpiringOrTombstone = leftLocalDeletionTime != Cell.NO_DELETION_TIME;
+ boolean rightIsExpiringOrTombstone = rightLocalDeletionTime != Cell.NO_DELETION_TIME;
+
+ if (leftIsExpiringOrTombstone | rightIsExpiringOrTombstone)
+ {
+ // Tombstones always win reconciliation with live cells of the same timestamp
+ // CASSANDRA-14592: for consistency of reconciliation, regardless of system clock at time of reconciliation
+ // this requires us to treat expiring cells (which will become tombstones at some future date) the same wrt regular cells
+ if (leftIsExpiringOrTombstone != rightIsExpiringOrTombstone)
+ return leftIsExpiringOrTombstone ? LEFT : RIGHT;
+
+ // for most historical consistency, we still prefer tombstones over expiring cells.
+ // While this leads to an inconsistency over which is chosen
+ // (i.e. before expiry, the pure tombstone; after expiry, whichever is more recent)
+ // this inconsistency has no user-visible distinction, as at this point they are both logically tombstones
+ // (the only possible difference is the time at which the cells become purgeable)
+ boolean leftIsTombstone = !left.isExpiring(); // !isExpiring() == isTombstone(), but does not need to consider localDeletionTime()
+ boolean rightIsTombstone = !right.isExpiring();
+ if (leftIsTombstone != rightIsTombstone)
+ return leftIsTombstone ? LEFT : RIGHT;
+
+ // ==> (leftIsExpiring && rightIsExpiring) or (leftIsTombstone && rightIsTombstone)
+ // if both are expiring, we do not want to consult the value bytes if we can avoid it, as like with C-14592
+ // the value bytes implicitly depend on the system time at reconciliation: a cell that has not yet expired
+ // would otherwise always win (unless it had an empty value), until it expired and was translated to a tombstone
+ if (leftLocalDeletionTime != rightLocalDeletionTime)
+ return leftLocalDeletionTime > rightLocalDeletionTime ? LEFT : RIGHT;
+ }
+ return COMPARE;
+ }
+
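+ // Range tombstone merge state: activeOpenRangeDeletion is the strongest deletion among all currently open
+ // range tombstones across the merging cursors (LIVE when none is open, null while a recalculation is pending),
+ // openMarkers holds the currently open deletion times, and reusableMarkersPool recycles DeletionTime copies
+ // to avoid allocating per marker.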
+ DeletionTime activeOpenRangeDeletion = DeletionTime.LIVE;
+ final List<DeletionTime> openMarkers = new ArrayList<>();
+ final ArrayDeque<DeletionTime> reusableMarkersPool = new ArrayDeque<>();
+
+ /**
+ * We have a common clustering and need to merge tombstones. Alternatively, we have a series of range tombstones
+ * whose intersections mutate from bounds into boundary (a combination of 2 bounds). We also need to purge any GC'ed
+ * deletes.
+ *
+ * {@link RangeTombstoneMarker.Merger#merge()}
+ */
+ private boolean mergeRangeTombstones(int rangeTombstoneMergeLimit, DeletionTime partitionDeletion, boolean isFirstElement) throws IOException
+ {
+ if (rangeTombstoneMergeLimit == 0)
+ {
+ throw new IllegalStateException();
+ }
+ rangeTombstonesMergeCounters[rangeTombstoneMergeLimit - 1]++;
+ DeletionTime previousDeletionTimeInMerged = DeletionTime.LIVE;
+ if (activeOpenRangeDeletion != DeletionTime.LIVE) {
+ previousDeletionTimeInMerged = getDeletionTimeReusableCopy(activeOpenRangeDeletion);
+ }
+ try
+ {
+ updateOpenMarkers(rangeTombstoneMergeLimit, partitionDeletion);
+
+ DeletionTime newDeletionTimeInMerged = activeOpenRangeDeletion;
+ if (previousDeletionTimeInMerged.equals(newDeletionTimeInMerged))
+ return false;
+
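+ // The active range deletion changed: emit an open bound (LIVE -> deletion), a close bound (deletion -> LIVE),
+ // or a boundary (deletion -> different deletion), dropping whichever side the purger allows us to purge.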
+ // we will stomp on the element descriptor and write it out
+ ElementDescriptor rangeTombstone = sstableCursors[0].rHeader;
+ boolean isBeforeClustering = rangeTombstone.clusteringKind().comparedToClustering < 0;
+
+ // Combining the merge and purge code
+ if (previousDeletionTimeInMerged == DeletionTime.LIVE)
+ {
+ if (purger.shouldPurge(newDeletionTimeInMerged))
+ {
+ return false;
+ }
+ else
+ {
+ rangeTombstone.clusteringKind(isBeforeClustering ? INCL_START_BOUND : EXCL_START_BOUND);
+ rangeTombstone.deletionTime().reset(newDeletionTimeInMerged);
+ }
+ }
+ else if (newDeletionTimeInMerged == DeletionTime.LIVE)
+ {
+ if (purger.shouldPurge(previousDeletionTimeInMerged))
+ {
+ return false;
+ }
+ else
+ {
+ rangeTombstone.clusteringKind(isBeforeClustering ? EXCL_END_BOUND : INCL_END_BOUND);
+ rangeTombstone.deletionTime().reset(previousDeletionTimeInMerged);
+ }
+ }
+ else
+ {
+ boolean shouldPurgeClose = purger.shouldPurge(previousDeletionTimeInMerged);
+ boolean shouldPurgeOpen = purger.shouldPurge(newDeletionTimeInMerged);
+
+ if (shouldPurgeClose && shouldPurgeOpen)
+ return false;
+
+ if (shouldPurgeClose)
+ {
+ rangeTombstone.clusteringKind(isBeforeClustering ? INCL_START_BOUND : EXCL_START_BOUND);
+ rangeTombstone.deletionTime().reset(newDeletionTimeInMerged);
+ }
+ else if (shouldPurgeOpen)
+ {
+ rangeTombstone.clusteringKind(isBeforeClustering ? EXCL_END_BOUND : INCL_END_BOUND);
+ rangeTombstone.deletionTime().reset(previousDeletionTimeInMerged);
+ }
+ else {
+ // Boundary
+ rangeTombstone.clusteringKind(isBeforeClustering ? EXCL_END_INCL_START_BOUNDARY : INCL_END_EXCL_START_BOUNDARY);
+ rangeTombstone.deletionTime().reset(previousDeletionTimeInMerged); // close
+ rangeTombstone.deletionTime2().reset(newDeletionTimeInMerged); // open
+ }
+ }
+
+ if (isPartitionStartDelayed())
+ {
+ lateStartPartition(false);
+ ssTableCursorWriter.writeRangeTombstone(rangeTombstone, true);
+ }
+ else {
+ ssTableCursorWriter.writeRangeTombstone(rangeTombstone, isFirstElement);
+ }
+ return true;
+ }
+ finally
+ {
+ if (previousDeletionTimeInMerged != DeletionTime.LIVE)
+ {
+ reusableMarkersPool.offer(previousDeletionTimeInMerged);
+ }
+ }
+ }
+
+ private void updateOpenMarkers(int rangeTombstoneMergeLimit, DeletionTime partitionDeletion)
+ {
+ /** Similar to {@link RangeTombstoneMarker.Merger#updateOpenMarkers()} but we validate a close exists for every open.*/
+ for (int i = 0; i < rangeTombstoneMergeLimit; i++)
+ {
+ ElementDescriptor rangeTombstone = sstableCursors[i].rHeader;
+ if (rangeTombstone.isStartBound())
+ {
+ DeletionTime openRangeDeletion = rangeTombstone.deletionTime();
+ addOpenRangeDeletion(partitionDeletion, openRangeDeletion);
+ }
+ else if (rangeTombstone.isEndBound())
+ {
+ DeletionTime closeRangeDeletion = rangeTombstone.deletionTime();
+ removeOpenRangeDeletion(partitionDeletion, closeRangeDeletion, rangeTombstone);
+ }
+ else if (rangeTombstone.isBoundary())
+ {
+ DeletionTime closeRangeDeletion = rangeTombstone.deletionTime();
+ removeOpenRangeDeletion(partitionDeletion, closeRangeDeletion, rangeTombstone);
+ DeletionTime openRangeDeletion = rangeTombstone.deletionTime2();
+ addOpenRangeDeletion(partitionDeletion, openRangeDeletion);
+ }
+ else
+ throw new IllegalStateException("Unexpected bound type:" + rangeTombstone.clusteringKind());
+ }
+
+ if (activeOpenRangeDeletion == null)
+ {
+ recalculateActiveOpen();
+ }
+ }
+
+ private void recalculateActiveOpen()
+ {
+ // active open has been invalidated by a close bound matching it, need to scan the list for new max
+ int size = openMarkers.size();
+ if (size == 0)
+ {
+ activeOpenRangeDeletion = DeletionTime.LIVE;
+ return;
+ }
+ // find max open marker
+ DeletionTime maxOpenDeletion = openMarkers.get(0);
+ for (int i = 1; i < size; i++)
+ {
+ DeletionTime openDeletionTime = openMarkers.get(i);
+ if (openDeletionTime.supersedes(maxOpenDeletion))
+ maxOpenDeletion = openDeletionTime;
+ }
+ activeOpenRangeDeletion = maxOpenDeletion;
+ }
+
+ private void removeOpenRangeDeletion(DeletionTime partitionDeletion, DeletionTime closeRangeDeletion, ElementDescriptor rangeTombstone)
+ {
+ // filter out markers that are deleted by the `partitionDelete`
+ if (partitionDeletion != DeletionTime.LIVE && !closeRangeDeletion.supersedes(partitionDeletion))
+ {
+ return;
+ }
+ // a close marker should have a matching open in the list
+ int j = 0;
+ int size = openMarkers.size();
+ DeletionTime reusableOpenMarker = null;
+ for (; j < size; j++) {
+ reusableOpenMarker = openMarkers.get(j);
+ if (reusableOpenMarker.equals(closeRangeDeletion))
+ break;
+ }
+ if (j == size)
+ throw new IllegalStateException("Expected an open marker for this closing marker:" + rangeTombstone);
+ reusableMarkersPool.offer(reusableOpenMarker);
+ if (activeOpenRangeDeletion == reusableOpenMarker) {
+ // trigger recalculation
+ activeOpenRangeDeletion = null;
+ }
+ if (size == 1) {
+ openMarkers.clear();
+ }
+ else {
+ // avoid expensive array copy
+ DeletionTime deletionTime = openMarkers.remove(size - 1);
+ openMarkers.set(j, deletionTime);
+ }
+ }
+
+ private void addOpenRangeDeletion(DeletionTime partitionDeletion, DeletionTime openRangeDeletion)
+ {
+ // filter out markers that are deleted by the `partitionDelete`
+ if (partitionDeletion != DeletionTime.LIVE && !openRangeDeletion.supersedes(partitionDeletion))
+ {
+ return;
+ }
+
+ DeletionTime reusable = getDeletionTimeReusableCopy(openRangeDeletion);
+ openMarkers.add(reusable);
+ if (activeOpenRangeDeletion != null && // invalidated by remove, so full scan is required
+ (activeOpenRangeDeletion == DeletionTime.LIVE || reusable.supersedes(activeOpenRangeDeletion))) {
+ activeOpenRangeDeletion = reusable;
+ }
+ }
+
+ private DeletionTime getDeletionTimeReusableCopy(DeletionTime openRangeDeletion)
+ {
+ DeletionTime reusable = reusableMarkersPool.pollLast();
+ if (reusable == null) {
+ reusable = DeletionTime.copy(openRangeDeletion);
+ }
+ else {
+ reusable.reset(openRangeDeletion);
+ }
+ return reusable;
+ }
+
+ private boolean isPartitionStarted()
+ {
+ return partitionHeaderLength != 0;
+ }
+
+ private boolean isPartitionStartDelayed()
+ {
+ return !isPartitionStarted();
+ }
+
+ private void continueReadingAfterMerge(int mergeLimit, int endState)
+ {
+ for (int i = 0; i < mergeLimit; i++)
+ {
+ if (sstableCursors[i].state() == endState){
+ sstableCursors[i].continueReading();
+ }
+ }
+ }
+
+ private void lateStartRow(boolean isStatic) throws IOException
+ {
+ lateStartRow(LivenessInfo.EMPTY, DeletionTime.LIVE, isStatic);
+ }
+
+ private void lateStartRow(LivenessInfo livenessInfo, DeletionTime deletionTime, boolean isStatic) throws IOException
+ {
+ if (isPartitionStartDelayed())
+ {
+ lateStartPartition(isStatic);
+ }
+ ssTableCursorWriter.writeRowStart(livenessInfo, deletionTime, isStatic);
+ }
+
+ private void lateStartPartition(boolean isStatic) throws IOException
+ {
+ maybeSwitchWriter(compactionAwareWriter);
+ partitionHeaderLength = ssTableCursorWriter.writePartitionStart(partitionDescriptor.keyBytes(), partitionDescriptor.keyLength(), DeletionTime.LIVE);
+ // Did we miss writing an empty static row?
+ if (!isStatic)
+ {
+ if (ssTableCursorWriter.writeEmptyStaticRow())
+ partitionHeaderLength = (int) (ssTableCursorWriter.getPosition() - ssTableCursorWriter.getPartitionStart());
+ }
+ }
+
+ private void finish()
+ {
+ // only finish writing once
+ if (!finished)
+ {
+ finished = true;
+ if (ssTableCursorWriter != null)
+ ssTableCursorWriter.setLast(prevKey);
+ }
+ }
+
+ private void maybeSwitchWriter(CompactionAwareWriter writer)
+ {
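+ // Ask the CompactionAwareWriter whether this partition starts a new output sstable (new disk location or
+ // size threshold reached); if so, wrap the fresh SSTableWriter in a cursor writer and carry the accounting over.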
+ // Set last key, so this is ready to be opened.
+ if (ssTableCursorWriter != null)
+ {
+ ssTableCursorWriter.setLast(prevKey);
+ }
+
+ SSTableWriter ssTableWriter = writer.maybeSwitchWriter(partitionKey);
+ if (ssTableWriter != null)
+ {
+ if (ssTableCursorWriter != null) {
+ totalDataBytesWritten += ssTableCursorWriter.getPosition();
+ }
+
+ SSTableCursorWriter nextWriter = new SSTableCursorWriter((SortedTableWriter) ssTableWriter);
+
+ ssTableCursorWriter = nextWriter;
+ ssTableCursorWriter.setFirst(partitionKey.getKey());
+ prevKey = null;
+ }
+ }
+
+ // SORT AND COMPARE
+
+ /**
+ * Sorts the cursors array in preparation for partition merge. This assumes cursors are in one of 3 states:
+ *
+ * <ul>
+ * <li> PARTITION_START - Partition header is loaded in preparation for merge</li>
+ * <li> beginning of unfiltered/end of partition - header is loaded, list is sorted after this point</li>
+ * <li> DONE - need to be reset</li>
+ * </ul>
+ *
+ * Once the bounds of the sorting are known we insert sort the freshly read cursors into the pre-sorted list.
+ *
+ * @return false if there are no cursors moved as a result of this operation, or if the top most reader is DONE,
+ * indicating the work of the compaction cursor is finished
+ */
+ private boolean prepareForPartitionMerge() throws IOException
+ {
+ // start by loading in new partition keys from any readers for which we just merged partitions => are
+ // on partition edge. Exhausted cursors are at the bottom. Mid-read partitions are in the middle.
+ int progressedCursorsIndex = 0;
+ for (; progressedCursorsIndex < sstableCursors.length; progressedCursorsIndex++)
+ {
+ StatefulCursor sstableCursor = sstableCursors[progressedCursorsIndex];
+ int sstableCursorState = sstableCursor.state();
+
+ if (sstableCursorState == PARTITION_START)
+ {
+ sstableCursor.readPartitionHeader(sstableCursor.pHeader);
+ updateCursorBytesRead(sstableCursor);
+ }
+ else if (isState(sstableCursorState, STATIC_ROW_START | ROW_START | TOMBSTONE_START | PARTITION_END))
+ {
+ // The cursors after this point are sorted, and unmoved
+ break;
+ }
+ else if (sstableCursorState == DONE)
+ {
+ sstableCursor.currentDecoratedKey.reset();
+ sstableCursor.pHeader.resetPartition();
+ sstableCursor.rHeader.resetElement();
+ updateCursorBytesRead(sstableCursor);
+ }
+ else
+ {
+ throw new IllegalStateException("Cursor is in an unexpected state:" + sstableCursor.toString());
+ }
+ }
+
+ // no cursors were moved => all done
+ if (progressedCursorsIndex == 0)
+ {
+ return false;
+ }
+
+ sortPerturbedCursors(progressedCursorsIndex, sstableCursors.length, CompactionCursor::compareByPartitionKey);
+ return sstableCursors[0].state() != DONE;
+ }
+
+ private int findPartitionMergeLimit()
+ {
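+ // Cursors [0, limit) share the same partition key (equality was recorded in sstableCursorsEqualsNext during
+ // the partition sort) and are merged together; exhausted cursors sort to the bottom and never participate.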
+ int partitionMergeLimit = 1;
+ for (; partitionMergeLimit < sstableCursors.length; partitionMergeLimit++)
+ {
+ if (sstableCursors[partitionMergeLimit].state() == DONE ||
+ !sstableCursorsEqualsNext[partitionMergeLimit-1])
+ break;
+ }
+ return partitionMergeLimit;
+ }
+
+ private void prepareCursorsForNextElement(int elementMergeLimit) throws IOException
+ {
+ for (int i = 0; i < elementMergeLimit; i++)
+ {
+ int readerState = sstableCursors[i].state();
+ if (readerState == ROW_START)
+ {
+ totalSourceCQLRows++;
+ sstableCursors[i].readRowHeader(sstableCursors[i].rHeader);
+ }
+ if (readerState == TOMBSTONE_START)
+ sstableCursors[i].readTombstoneMarker(sstableCursors[i].rHeader);
+ if (readerState == STATIC_ROW_START)
+ throw new IllegalStateException("Unexpected static row after static row merge");
+ }
+ }
+
+ private int findStaticRowMergeLimit(int partitionMergeLimit) throws IOException
+ {
+ int staticRowMergeLimit = 0;
+ for (; staticRowMergeLimit < partitionMergeLimit; staticRowMergeLimit++)
+ {
+ if (sstableCursors[staticRowMergeLimit].state() == STATIC_ROW_START)
+ {
+ totalSourceCQLRows++;
+ sstableCursors[staticRowMergeLimit].readStaticRowHeader(sstableCursors[staticRowMergeLimit].rHeader);
+ }
+ else
+ break;
+ }
+ return staticRowMergeLimit;
+ }
+
+ private void sortForStaticRow(int partitionMergeLimit)
+ {
+ sortPerturbedCursors(partitionMergeLimit, partitionMergeLimit, CompactionCursor::compareByStatic);
+ }
+
+ private void sortForElementMerge(int perturbedLimit, int partitionMergeLimit)
+ {
+ sortPerturbedCursors(perturbedLimit, partitionMergeLimit, CompactionCursor::compareByRowClustering);
+ }
+
+ private int findElementMergeLimit(int partitionMergeLimit)
+ {
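+ // Cursors [0, limit) are positioned on elements with the same clustering (per the element sort) and are
+ // merged together; a cursor that is not inside an element, or whose clustering differs, ends the range.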
+ int rowMergeLimit = 1;
+ for (; rowMergeLimit < partitionMergeLimit; rowMergeLimit++)
+ {
+ int state = sstableCursors[rowMergeLimit].state();
+ boolean isInRow = isState(state, ELEMENT_END | CELL_HEADER_START);
+ if (!isInRow)
+ break;
+ if (!sstableCursorsEqualsNext[rowMergeLimit-1])
+ break;
+ }
+ return rowMergeLimit;
+ }
+
+ private void sortForCellMerge(int perturbedLimit, int rowMergeLimit)
+ {
+ sortPerturbedCursors(perturbedLimit, rowMergeLimit, CompactionCursor::compareByColumn);
+ }
+
+ private int findCellMergeLimit(int rowMergeLimit)
+ {
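+ // Cursors [0, limit) have a pending cell for the same column (per the column sort) and have that cell merged;
+ // a cursor with no cell ready to merge, or whose column differs from the previous cursor's, ends the range.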
+ int cellMergeLimit = 0;
+ for (; cellMergeLimit < rowMergeLimit; cellMergeLimit++)
+ {
+
+ int state = sstableCursors[cellMergeLimit].state();
+ if (isState(state, ELEMENT_END | CELL_HEADER_START))
+ break;
+
+ if (cellMergeLimit > 0 &&
+ (isState(state, CELL_VALUE_START | CELL_END)) &&
+ !sstableCursorsEqualsNext[cellMergeLimit - 1])
+ break;
+ }
+ return cellMergeLimit;
+ }
+
+ private static int compareByPartitionKey(StatefulCursor c1, StatefulCursor c2)
+ {
+ if (c1 == c2) return 0;
+ int tint = c1.state();
+ int oint = c2.state();
+ if (tint == DONE && oint == DONE) return 0;
+ if (tint == DONE) return 1;
+ if (oint == DONE) return -1;
+ return c1.currentDecoratedKey.compareTo(c2.currentDecoratedKey);
+ }
+
+ private static int compareByStatic(StatefulCursor c1, StatefulCursor c2)
+ {
+ if (c1 == c2) return 0;
+ int tState = c1.state();
+ int oState = c2.state();
+
+ if (tState == PARTITION_END && oState == PARTITION_END) return 0;
+ if (tState == PARTITION_END) return 1;
+ if (oState == PARTITION_END) return -1;
+
+ // Also push the static rows to the top while we're here
+ return -Boolean.compare(tState == STATIC_ROW_START, oState == STATIC_ROW_START);
+ }
+
+ private static int compareByRowClustering(StatefulCursor c1, StatefulCursor c2)
+ {
+ if (c1 == c2) return 0;
+ int tState = c1.state();
+ int oState = c2.state();
+
+ if (tState == PARTITION_END && oState == PARTITION_END) return 0;
+ if (tState == PARTITION_END) return 1;
+ if (oState == PARTITION_END) return -1;
+ // Either have cells, or an empty row
+ boolean tIsAfterHeader = isState(tState, CELL_HEADER_START | ELEMENT_END);
+ boolean oIsAfterHeader = isState(oState, CELL_HEADER_START | ELEMENT_END);
+ if (tIsAfterHeader && oIsAfterHeader)
+ return ClusteringComparator.compare(c1.rHeader, c2.rHeader);
+ else
+ throw new IllegalStateException("We only sort through rows ready to be merged/copied. c1 = " + c1 + ", c2 = " + c2);
+ }
+
+ private static int compareByColumn(StatefulCursor c1, StatefulCursor c2)
+ {
+ if (c1 == c2) return 0;
+ int tState = c1.state();
+ int oState = c2.state();
+ if (tState == ELEMENT_END && oState == ELEMENT_END) return 0;
+ if (tState == ELEMENT_END) return 1;
+ if (oState == ELEMENT_END) return -1;
+
+ boolean tIsAfterHeader = isState(tState, CELL_VALUE_START | CELL_END);
+ boolean oIsAfterHeader = isState(oState, CELL_VALUE_START | CELL_END);
+ if (tIsAfterHeader && oIsAfterHeader)
+ return c1.cellCursor.cellColumn.compareTo(c2.cellCursor.cellColumn);
+ else
+ throw new IllegalStateException("We only sort through cells ready to be merged/copied. c1 = " + c1 + ", c2 = " + c2);
+ }
+
+ // Cursor state
+ static class StatefulCursor extends SSTableCursorReader
+ {
+ long bytesReadPositionSnapshot = 0;
+ final PartitionDescriptor pHeader = new PartitionDescriptor();
+ final ElementDescriptor rHeader = new ElementDescriptor();
+ // Only works for murmur
+ ReusableDecoratedKey currentDecoratedKey = new ReusableDecoratedKey(new ReusableLongToken());
+
+ public StatefulCursor(SSTableReader reader)
+ {
+ super(reader);
+ }
+
+ @Override
+ public int readPartitionHeader(PartitionDescriptor pHeader) throws IOException
+ {
+ int state = super.readPartitionHeader(pHeader);
+ // TODO: workout a way to shadow rather than copy
+ currentDecoratedKey.copyKey(pHeader.keyBuffer());
+
+ return state;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StatefulCursor{" +
+ "pHeader=" + pHeader +
+ ", rHeader=" + rHeader +
+ ", currentDecoratedKey=" + currentDecoratedKey +
+ ", state=" + state() +
+ '}';
+ }
+ }
+
+ // Purge
+
+ /**
+ * We are combining code from:
+ * - {@link org.apache.cassandra.db.compaction.CompactionIterator.Purger}
+ * - {@link org.apache.cassandra.db.partitions.PurgeFunction}
+ * - {@link DeletionPurger}
+ * The original code leans on the {@link org.apache.cassandra.db.transform.Transformation} abstraction and the
+ * iterator infrastructure which is not fit for purpose here.
+ */
+ static class Purger implements DeletionPurger
+ {
+ private final long nowInSec;
+
+ private final long oldestUnrepairedTombstone;
+ private final boolean onlyPurgeRepairedTombstones;
+ private final boolean shouldIgnoreGcGraceForAnyKey;
+ private final OperationType type;
+
+ private boolean ignoreGcGraceSeconds;
+ private final AbstractCompactionController controller;
+
+ private ReusableDecoratedKey partitionKey;
+ private LongPredicate purgeEvaluator;
+
+ private long compactedUnfiltered;
+
+ Purger(OperationType type, AbstractCompactionController controller, long nowInSec)
+ {
+ oldestUnrepairedTombstone = controller.compactingRepaired() ? Long.MAX_VALUE : Integer.MIN_VALUE;
+ onlyPurgeRepairedTombstones = controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones();
+ shouldIgnoreGcGraceForAnyKey = controller.cfs.shouldIgnoreGcGraceForAnyKey();
+ this.nowInSec = nowInSec;
+ this.controller = controller;
+ this.type = type;
+ }
+
+ void resetOnNewPartition(ReusableDecoratedKey key)
+ {
+ partitionKey = key;
+ purgeEvaluator = null;
+ ignoreGcGraceSeconds = shouldIgnoreGcGraceForAnyKey && controller.cfs.shouldIgnoreGcGraceForKey(partitionKey);
+ }
+
+ void onEmptyPartitionPostPurge()
+ {
+ if (type == OperationType.COMPACTION)
+ controller.cfs.invalidateCachedPartition(partitionKey);
+ }
+
+ @Override
+ public boolean shouldPurge(long timestamp, long localDeletionTime)
+ {
+ return !(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
+ && (localDeletionTime < controller.gcBefore || ignoreGcGraceSeconds)
+ && getPurgeEvaluator().test(timestamp);
+ }
+
+ /*
+ * Evaluates whether a tombstone with the given deletion timestamp can be purged. This is the minimum
+ * timestamp for any sstable containing `currentKey` outside of the set of sstables involved in this compaction.
+ * This is computed lazily on demand, as we only need it if there are tombstones and it is a bit expensive
+ * (see #8914).
+ */
+ private LongPredicate getPurgeEvaluator()
+ {
+ if (purgeEvaluator == null)
+ {
+ purgeEvaluator = controller.getPurgeEvaluator(partitionKey);
+ }
+ return purgeEvaluator;
+ }
+ }
+
+ // ACCOUNTING CODE
+ public TableMetadata metadata()
+ {
+ return controller.cfs.metadata();
+ }
+
+ public CompactionInfo getCompactionInfo()
+ {
+ return new CompactionInfo(controller.cfs.metadata(),
+ type,
+ getBytesRead(),
+ totalInputBytes,
+ compactionId,
+ sstables,
+ targetDirectory);
+ }
+
+ public boolean isGlobal()
+ {
+ return false;
+ }
+
+ public void setTargetDirectory(final String targetDirectory)
+ {
+ this.targetDirectory = targetDirectory;
+ }
+
+ public long[] getMergedParitionsCounts()
+ {
+ return partitionMergeCounters;
+ }
+
+ public long[] getMergedRowsCounts()
+ {
+ return rowMergeCounters;
+ }
+
+ public long[] getMergedCellsCounts()
+ {
+ return cellMergeCounters;
+ }
+
+ public long getTotalSourceCQLRows()
+ {
+ return totalSourceCQLRows;
+ }
+
+ public long getBytesRead()
+ {
+ return totalBytesRead;
+ }
+
+ private void updateCursorBytesRead(StatefulCursor cursor)
+ {
+ long latestByteReadPosition = cursor.isEOF() ? cursor.ssTableReader.uncompressedLength() : cursor.position();
+ long cursorBytesRead = latestByteReadPosition - cursor.bytesReadPositionSnapshot;
+ cursor.bytesReadPositionSnapshot = latestByteReadPosition;
+ totalBytesRead += cursorBytesRead;
+ }
+
+ public String toString()
+ {
+ return this.getCompactionInfo().toString();
+ }
+
+ public long getTotalBytesScanned()
+ {
+ return getBytesRead();
+ }
+
+ private static boolean isPaxos(ColumnFamilyStore cfs)
+ {
+ return cfs.name.equals(SystemKeyspace.PAXOS) && cfs.getKeyspaceName().equals(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+ }
+
+ private long sumHistogram(long[] histogram)
+ {
+ long sum = 0;
+ for (long count : histogram)
+ {
+ sum += count;
+ }
+ return sum;
+ }
+
+ private static String mergeHistogramToString(long[] histogram)
+ {
+ StringBuilder sb = new StringBuilder();
+ long sum = 0;
+ sb.append("[");
+ for (int i = 0; i < histogram.length; i++)
+ {
+ if (histogram[i] != 0)
+ {
+ sb.append(i + 1).append(":").append(histogram[i]).append(", ");
+ sum += (i + 1) * histogram[i];
+ }
+ }
+ if (sb.length() > 1)
+ sb.setLength(sb.length() - 2); // trim the trailing ", "
+ sb.append("] = ").append(sum);
+ return sb.toString();
+ }
+
+ private void retainPrevKeyForValidation()
+ {
+ // swap the reusable keys
+ if (prevKey == null)
+ {
+ prevKey = new ReusableDecoratedKey(new ReusableLongToken());
+ }
+ ReusableDecoratedKey temp = prevKey;
+ prevKey = partitionKey;
+ sstableCursors[0].currentDecoratedKey = temp;
+ }
+
+ public void close()
+ {
+ finish();
+ try
+ {
+ for (SSTableCursorReader reader : sstableCursors)
+ {
+ reader.close();
+ }
+ }
+ finally
+ {
+ activeCompactions.finishCompaction(this);
+ }
+
+ if (LOGGER.isInfoEnabled())
+ {
+ long position = ssTableCursorWriter == null ? 0 : ssTableCursorWriter.getPosition();
+ LOGGER.info("Compaction ended {}: { data bytes read = {}, data bytes written = {}, " +
+ " input (keys = {}, rows = {}, cells = {}), " +
+ " output (keys = {}, rows = {}, cells = {})}",
+ this.compactionId, getTotalBytesScanned(), position + totalDataBytesWritten,
+ mergeHistogramToString(partitionMergeCounters), mergeHistogramToString(rowMergeCounters), mergeHistogramToString(cellMergeCounters),
+ sumHistogram(partitionMergeCounters), sumHistogram(rowMergeCounters), sumHistogram(cellMergeCounters));
+ }
+ }
+
+ private void sortPerturbedCursors(int perturbedLimit, int mergeLimit, Comparator<? super StatefulCursor> comparator) {
+ for (; perturbedLimit > 0; perturbedLimit--) {
+ bubbleInsertElementToPreSorted(sstableCursors, sstableCursorsEqualsNext, perturbedLimit, mergeLimit, comparator);
+ }
+ }
+
+ /**
+ * Use bubble sort to insert the sortedFrom - 1 element into a pre-sorted array, and track element
+ * equality to next element to help in finding merge ranges.
+ *
+ * We use this method to sort the cursor array on 3 levels:
+ *
+ * <ul>
+ * <li> Partition - insert sort the newly read partitions into the full list, comparing on pKey</li>
+ * <li> Unfiltered - insert sort the newly read rows into the sub-list of merging partitions, comparing on clustering</li>
+ * <li> Cell - insert sort the newly read cells into the sub-list of merging rows, comparing on column</li>
+ * </ul>
+ *
+ * @param preSortedArray partially pre-sorted array of elements to be sorted in place
+ * @param equalsNext tracking the equality between each element and the next in the sorted array
+ * @param sortedFrom elements from sortedFrom are assumed sorted
+ * @param sortedTo the limit of our sort effort
+ * @param comparator comparing elements in the array
+ * @param <T> element type
+ */
+ public static <T> void bubbleInsertElementToPreSorted(T[] preSortedArray, boolean[] equalsNext, int sortedFrom, int sortedTo, Comparator<? super T> comparator) {
+ T insert = preSortedArray[sortedFrom - 1];
+
+ for (int j = sortedFrom - 1; j < sortedTo - 1; j++) {
+ int cmp = comparator.compare(insert, preSortedArray[j + 1]);
+ if (cmp < 0)
+ {
+ equalsNext[j] = false;
+ break;
+ }
+ else if (cmp == 0) {
+ equalsNext[j] = true;
+ break;
+ }
+ else
+ {
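+ // insert sorts after the element at j + 1: shift that element, and any run of equal elements following it,
+ // one slot towards the front, then place insert just after the run and keep bubbling against the next
+ // distinct element.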
+ for (; j < sortedTo - 1; j++) {
+ if (!equalsNext[j+1]) {
+ break;
+ }
+ preSortedArray[j] = preSortedArray[j + 1];
+ equalsNext[j] = equalsNext[j+1];
+ }
+ preSortedArray[j] = preSortedArray[j + 1];
+ equalsNext[j] = false;
+ preSortedArray[j + 1] = insert;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index b8eaa5bd812c..c622582a0719 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -820,7 +820,7 @@ ImmutableList getHolders()
*
* lives in matches the list index of the holder that's responsible for it
*/
- public List groupSSTables(Iterable sstables)
+ public final List groupSSTables(Iterable sstables)
{
List classified = new ArrayList<>(holders.size());
for (AbstractStrategyHolder holder : holders)
@@ -970,7 +970,7 @@ public void disable()
* @param ranges
* @return
*/
- public AbstractCompactionStrategy.ScannerList maybeGetScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
+ public final AbstractCompactionStrategy.ScannerList maybeGetScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
maybeReloadDiskBoundaries();
List scanners = new ArrayList<>(sstables.size());
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c9af97fe95bf..a413446f9a68 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.compaction;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -70,7 +71,9 @@
public class CompactionTask extends AbstractCompactionTask
{
+ private static final int MEGABYTE = 1024 * 1024;
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
+
protected final long gcBefore;
protected final boolean keepOriginals;
protected static long totalBytesCompacted = 0;
@@ -151,9 +154,11 @@ protected boolean shouldReduceScopeForSpace()
* For internal use and testing only. The rest of the system should go through the submit* methods,
* which are properly serialized.
* Caller is in charge of marking/unmarking the sstables as compacting.
+ *
+ * NOTE: this method is a Byteman hook location
*/
@Override
- protected void runMayThrow() throws Exception
+ protected final void runMayThrow() throws Exception
{
// The collection of sstables passed may be empty (but not null); even if
// it is not empty, it may compact down to nothing if all rows are deleted.
@@ -245,7 +250,7 @@ public boolean apply(SSTableReader sstable)
long nowInSec = FBUtilities.nowInSeconds();
try (Refs refs = Refs.ref(actuallyCompact);
AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact, rangeList);
- CompactionIterator ci = new CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, taskId))
+ AbstractCompactionPipeline ci = AbstractCompactionPipeline.create(this, compactionType, scanners, controller, nowInSec, taskId))
{
long lastCheckObsoletion = start;
inputSizeBytes = scanners.getTotalCompressedSize();
@@ -256,7 +261,7 @@ public boolean apply(SSTableReader sstable)
long lastBytesScanned = 0;
activeCompactions.beginCompaction(ci);
- try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
+ try (AutoCloseable resource = getCompactionAwareWriter(actuallyCompact, ci))
{
// Note that we need to re-check this flag after calling beginCompaction above to avoid a window
// where the compaction does not exist in activeCompactions but the CSM gets paused.
@@ -264,19 +269,19 @@ public boolean apply(SSTableReader sstable)
// block until the below exception is thrown and the transaction is cancelled.
if (!controller.cfs.getCompactionStrategyManager().isActive())
throw new CompactionInterruptedException(ci.getCompactionInfo());
- estimatedKeys = writer.estimatedKeys();
- while (ci.hasNext())
+ estimatedKeys = ci.estimatedKeys();
+ while (ci.processNextPartitionKey())
{
- if (writer.append(ci.next()))
- totalKeysWritten++;
-
- ci.setTargetDirectory(writer.getSStableDirectory().path());
- long bytesScanned = scanners.getTotalBytesScanned();
+ long bytesScanned = ci.getTotalBytesScanned();
- // Rate limit the scanners, and account for compression
- CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
+ // If we ingested less than a MB since the last check, skip the rate limiter and keep going
+ if (bytesScanned - lastBytesScanned > MEGABYTE)
+ {
+ // Rate limit the scanners, and account for compression
+ CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
- lastBytesScanned = bytesScanned;
+ lastBytesScanned = bytesScanned;
+ }
if (nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
{
@@ -287,16 +292,28 @@ public boolean apply(SSTableReader sstable)
timeSpentWritingKeys = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start);
// point of no return
- newSStables = writer.finish();
+ newSStables = finish(ci);
+ }
+ catch (Exception e)
+ {
+ if (e instanceof IOException)
+ throw (IOException) e;
+ else if (e instanceof CompactionInterruptedException)
+ throw (CompactionInterruptedException) e;
+ else
+ throw new IllegalStateException(e);
}
finally
{
activeCompactions.finishCompaction(ci);
mergedRowCounts = ci.getMergedRowCounts();
totalSourceCQLRows = ci.getTotalSourceCQLRows();
+
+ totalKeysWritten = ci.getTotalKeysWritten();
}
}
+
if (transaction.isOffline())
return;
@@ -345,6 +362,22 @@ public boolean apply(SSTableReader sstable)
}
}
+ /**
+ * NOTE: a Byteman hook
+ */
+ protected Collection<SSTableReader> finish(AbstractCompactionPipeline pipeline)
+ {
+ return pipeline.finishWriting();
+ }
+
+ /**
+ * NOTE: a Byteman hook
+ */
+ protected AutoCloseable getCompactionAwareWriter(Set<SSTableReader> actuallyCompact, AbstractCompactionPipeline pipeline)
+ {
+ return pipeline.openWriterResource(cfs, getDirectories(), transaction, actuallyCompact);
+ }
+
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
Directories directories,
ILifecycleTransaction transaction,
diff --git a/src/java/org/apache/cassandra/db/compaction/CursorCompactionPipeline.java b/src/java/org/apache/cassandra/db/compaction/CursorCompactionPipeline.java
new file mode 100644
index 000000000000..b56310068509
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/CursorCompactionPipeline.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import org.apache.cassandra.db.AbstractCompactionController;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.TimeUUID;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+class CursorCompactionPipeline extends AbstractCompactionPipeline {
+ final CompactionCursor compactionCursor;
+ final CompactionTask task;
+ long totalKeysWritten;
+ CompactionAwareWriter writer;
+
+ CursorCompactionPipeline(CompactionTask task, OperationType type, AbstractCompactionStrategy.ScannerList scanners, AbstractCompactionController controller, long nowInSec, TimeUUID compactionId) {
+ this.task = task;
+ compactionCursor = new CompactionCursor(type, scanners.scanners, controller, nowInSec, compactionId);
+ }
+
+ public AutoCloseable openWriterResource(ColumnFamilyStore cfs,
+ Directories directories,
+ ILifecycleTransaction transaction,
+ Set<SSTableReader> nonExpiredSSTables) {
+ this.writer = task.getCompactionAwareWriter(cfs, directories, transaction, nonExpiredSSTables);
+ return writer;
+ }
+
+
+ @Override
+ public Collection<SSTableReader> finishWriting() {
+ return writer.finish();
+ }
+
+ @Override
+ public long estimatedKeys() {
+ return writer.estimatedKeys();
+ }
+
+ @Override
+ public CompactionInfo getCompactionInfo() {
+ return compactionCursor.getCompactionInfo();
+ }
+
+ @Override
+ public boolean isGlobal() {
+ return compactionCursor.isGlobal();
+ }
+
+ @Override
+ boolean processNextPartitionKey() throws IOException {
+ if (compactionCursor.writeNextPartition(writer)) {
+ totalKeysWritten++;
+ compactionCursor.setTargetDirectory(writer.getSStableDirectoryPath());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public long[] getMergedRowCounts() {
+ return compactionCursor.getMergedRowsCounts();
+ }
+
+ @Override
+ public long getTotalSourceCQLRows() {
+ return compactionCursor.getTotalSourceCQLRows();
+ }
+
+ @Override
+ public long getTotalKeysWritten() {
+ return totalKeysWritten;
+ }
+
+ @Override
+ public long getTotalBytesScanned() {
+ return compactionCursor.getTotalBytesScanned();
+ }
+
+ @Override
+ public void close() throws IOException {
+ compactionCursor.close();
+ }
+
+ @Override
+ public void stop() {
+ compactionCursor.stop();
+ }
+}
diff --git a/src/java/org/apache/cassandra/db/compaction/IteratorCompactionPipeline.java b/src/java/org/apache/cassandra/db/compaction/IteratorCompactionPipeline.java
new file mode 100644
index 000000000000..f049e9bab4d8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/IteratorCompactionPipeline.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import org.apache.cassandra.db.AbstractCompactionController;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.TimeUUID;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+class IteratorCompactionPipeline extends AbstractCompactionPipeline {
+ final CompactionIterator ci;
+ final AbstractCompactionStrategy.ScannerList scanners;
+ final CompactionTask task;
+ long totalKeysWritten;
+ CompactionAwareWriter writer;
+
+ IteratorCompactionPipeline(CompactionTask task, OperationType type, AbstractCompactionStrategy.ScannerList scanners, AbstractCompactionController controller, long nowInSec, TimeUUID compactionId) {
+ this.task = task;
+ this.scanners = scanners;
+ ci = new CompactionIterator(type, this.scanners.scanners, controller, nowInSec, compactionId);
+ }
+
+ public AutoCloseable openWriterResource(ColumnFamilyStore cfs,
+ Directories directories,
+ ILifecycleTransaction transaction,
+ Set<SSTableReader> nonExpiredSSTables) {
+ this.writer = task.getCompactionAwareWriter(cfs, directories, transaction, nonExpiredSSTables);
+ return writer;
+ }
+
+
+ @Override
+ public Collection<SSTableReader> finishWriting() {
+ return writer.finish();
+ }
+
+ @Override
+ public long estimatedKeys() {
+ return writer.estimatedKeys();
+ }
+
+ @Override
+ public CompactionInfo getCompactionInfo() {
+ return ci.getCompactionInfo();
+ }
+
+ @Override
+ public boolean isGlobal() {
+ return ci.isGlobal();
+ }
+
+ @Override
+ boolean processNextPartitionKey() throws IOException {
+ if (ci.hasNext()) {
+ if (writer.append(ci.next()))
+ totalKeysWritten++;
+ ci.setTargetDirectory(writer.getSStableDirectoryPath());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public long[] getMergedRowCounts() {
+ return ci.getMergedRowCounts();
+ }
+
+ @Override
+ public long getTotalSourceCQLRows() {
+ return ci.getTotalSourceCQLRows();
+ }
+
+ @Override
+ public long getTotalKeysWritten() {
+ return totalKeysWritten;
+ }
+
+ @Override
+ public long getTotalBytesScanned() {
+ return scanners.getTotalBytesScanned();
+ }
+
+ @Override
+ public void close() throws IOException {
+ ci.close();
+ }
+
+ @Override
+ public void stop() {
+ ci.stop();
+ }
+}
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 1509aa2e0371..9a8677cbd0e8 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -295,6 +295,7 @@ public int getLevelFanoutSize()
return levelFanoutSize;
}
+ @Override
public ScannerList getScanners(Collection sstables, Collection> ranges)
{
Set[] sstablesPerLevel = manifest.getSStablesPerLevelSnapshot();
@@ -430,7 +431,12 @@ public LeveledScanner(TableMetadata metadata, Collection sstables
assert sstableIterator.hasNext(); // caller should check intersecting first
SSTableReader currentSSTable = sstableIterator.next();
currentScanner = currentSSTable.getScanner(ranges);
+ }
+ @Override
+ public boolean isFullRange()
+ {
+ return ranges == null;
}
public static Collection intersecting(Collection sstables, Collection> ranges)
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index ea21f7be57e0..8fa030aeeb3c 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -40,7 +40,6 @@
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.File;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.Transactional;
@@ -69,7 +68,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
private final List diskBoundaries;
private int locationIndex;
protected Directories.DataDirectory currentDirectory;
-
+ protected String sstableDirectoryPath;
public CompactionAwareWriter(ColumnFamilyStore cfs,
Directories directories,
ILifecycleTransaction txn,
@@ -151,9 +150,10 @@ public final boolean append(UnfilteredRowIterator partition)
return realAppend(partition);
}
- public final File getSStableDirectory() throws IOException
+ // hot path, called per partition
+ public final String getSStableDirectoryPath() throws IOException
{
- return getDirectories().getLocationForDisk(currentDirectory);
+ return sstableDirectoryPath;
}
@Override
@@ -173,35 +173,36 @@ protected boolean realAppend(UnfilteredRowIterator partition)
* specific strategy has decided a new sstable is needed.
* Guaranteed to be called before the first call to realAppend.
*/
- protected void maybeSwitchWriter(DecoratedKey key)
+ public final SSTableWriter maybeSwitchWriter(DecoratedKey key)
{
- if (maybeSwitchLocation(key))
- return;
-
- if (shouldSwitchWriterInCurrentLocation(key))
- switchCompactionWriter(currentDirectory, key);
+ SSTableWriter newWriter = maybeSwitchLocation(key);
+ if (newWriter == null && shouldSwitchWriterInCurrentLocation(key))
+ {
+ newWriter = switchCompactionWriter(currentDirectory, key);
+ }
+ return newWriter;
}
/**
* Switches the file location and writer and returns true if the new key should be placed in a different data
* directory.
*/
- protected boolean maybeSwitchLocation(DecoratedKey key)
+ private SSTableWriter maybeSwitchLocation(DecoratedKey key)
{
if (diskBoundaries == null)
{
if (locationIndex < 0)
{
Directories.DataDirectory defaultLocation = getWriteDirectory(nonExpiredSSTables, getExpectedWriteSize());
- switchCompactionWriter(defaultLocation, key);
+ SSTableWriter writer = switchCompactionWriter(defaultLocation, key);
locationIndex = 0;
- return true;
+ return writer;
}
- return false;
+ return null;
}
if (locationIndex > -1 && key.compareTo(diskBoundaries.get(locationIndex)) < 0)
- return false;
+ return null;
int prevIdx = locationIndex;
while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0)
@@ -209,8 +210,7 @@ protected boolean maybeSwitchLocation(DecoratedKey key)
Directories.DataDirectory newLocation = locations.get(locationIndex);
if (prevIdx >= 0)
logger.debug("Switching write location from {} to {}", locations.get(prevIdx), newLocation);
- switchCompactionWriter(newLocation, key);
- return true;
+ return switchCompactionWriter(newLocation, key);
}
/**
@@ -223,14 +223,14 @@ protected boolean maybeSwitchLocation(DecoratedKey key)
* Implementations of this method should finish the current sstable writer and start writing to this directory.
*
* Called once before starting to append and then whenever we see a need to start writing to another directory.
- *
- * @param directory
- * @param nextKey
*/
- protected void switchCompactionWriter(Directories.DataDirectory directory, DecoratedKey nextKey)
+ protected SSTableWriter switchCompactionWriter(Directories.DataDirectory directory, DecoratedKey nextKey)
{
currentDirectory = directory;
- sstableWriter.switchWriter(sstableWriter(directory, nextKey));
+ sstableDirectoryPath = getDirectories().getLocationForDisk(currentDirectory).path();
+ SSTableWriter newWriter = sstableWriter(directory, nextKey);
+ sstableWriter.switchWriter(newWriter);
+ return newWriter;
}
protected SSTableWriter sstableWriter(Directories.DataDirectory directory, DecoratedKey nextKey)
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 2b124f4417ab..71a10158685e 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -49,7 +49,7 @@ public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, I
}
@Override
- protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey key)
+ protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey unused)
{
return false;
}
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index d0fb70587ca1..367c9d877d0c 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -26,6 +26,7 @@
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
public class MajorLeveledCompactionWriter extends CompactionAwareWriter
{
@@ -69,7 +70,7 @@ public boolean realAppend(UnfilteredRowIterator partition)
}
@Override
- protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey key)
+ protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey unused)
{
long totalWrittenInCurrentWriter = sstableWriter.currentWriter().getEstimatedOnDiskBytesWritten();
if (totalWrittenInCurrentWriter > maxSSTableSize)
@@ -87,12 +88,12 @@ protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey key)
}
@Override
- public void switchCompactionWriter(Directories.DataDirectory location, DecoratedKey nextKey)
+ public SSTableWriter switchCompactionWriter(Directories.DataDirectory location, DecoratedKey nextKey)
{
averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
partitionsWritten = 0;
sstablesWritten = 0;
- super.switchCompactionWriter(location, nextKey);
+ return super.switchCompactionWriter(location, nextKey);
}
protected int sstableLevel()
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 75f85b1e4da0..dceeebd0b63b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -73,7 +73,7 @@ private static long getTotalWriteSize(Iterable nonExpiredSSTables
}
@Override
- protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey key)
+ protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey unused)
{
return sstableWriter.currentWriter().getEstimatedOnDiskBytesWritten() > maxSSTableSize;
}
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 917fbb9cf05a..d6afb06fc68b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -79,7 +79,7 @@ public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories di
}
@Override
- protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey key)
+ protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey unused)
{
if (sstableWriter.currentWriter().getEstimatedOnDiskBytesWritten() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect
{
diff --git a/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java b/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java
index 70d73041de1b..73be8c0b1a8c 100644
--- a/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java
+++ b/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java
@@ -149,7 +149,7 @@ else if (accessorR == ByteBufferAccessor.instance)
int leftSize = left.nativeDataSize();
int rightSize = rightNative.nativeDataSize();
return FastByteOperations.compareMemoryUnsigned(left.getAddress(), leftSize, rightNative.getAddress(), rightSize);
- } else // just in case of new implementations of ValueAccessor appear
+ }else // just in case of new implementations of ValueAccessor appear
return ByteBufferUtil.compareUnsigned(left.asByteBuffer(), accessorR.toBuffer(right));
}
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index fd7880e367e4..aec80ba1cdb1 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -288,7 +288,7 @@ public void close()
}
/**
- * Digests the the provided iterator.
+ * Digests the provided iterator.
*
* Caller must close the provided iterator.
*
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index bf6b5b5061c1..8bccda8aa77d 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -80,13 +80,15 @@ public class BTreeRow extends AbstractRow
private static final Comparator COLUMN_COMPARATOR = (cd1, cd2) -> cd1.column.compareTo(cd2.column);
- // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstone, and then after reconciliation to remove
- // all tombstone since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstone at all, so
- // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones,
- // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allow at any given
- // time to know if we have any deleted information or not. If we any "true" tombstone (i.e. not an expiring cell), this value will be forced to
- // Long.MIN_VALUE, but if we don't and have expiring cells, this will the time at which the first expiring cell expires. If we have no tombstones and
- // no expiring cells, this will be Cell.MAX_DELETION_TIME;
+ // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstones,
+ // and then after reconciliation to remove all tombstones since we don't return them to the client) as well as on
+ // compaction. But it's likely that many rows won't have any tombstone at all, so we want to speed up that case
+ // by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have
+ // tombstones, but that doesn't work for expiring columns. So instead we keep the deletion time for the first
+ // thing in the row to be deleted. This allows at any given time to know if we have any deleted information or not.
+ // If we have any "true" tombstone (i.e. not an expiring cell), this value will be forced to Long.MIN_VALUE,
+ // but if we don't and have expiring cells, this will be the time at which the first expiring cell expires. If we
+ // have no tombstones and no expiring cells, this will be Cell.MAX_DELETION_TIME;
private final long minLocalDeletionTime;
private BTreeRow(Clustering clustering,
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
index 3ddfeae39a1f..c03cb6092d8a 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -248,7 +248,7 @@ public static long decodeLocalDeletionTime(long localDeletionTime, int ttl, Dese
* where not all field are always present (in fact, only the [ flags ] are guaranteed to be present). The fields have the following
* meaning:
* - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants)
- * - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_TIMESTAMP_MASK.
+ * - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_ROW_TIMESTAMP_MASK.
* - [ deletion time]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK)
* or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK.
* - [ ttl ]: the ttl for the cell. Present if the row is expiring (IS_EXPIRING_MASK) but doesn't have the
@@ -259,13 +259,13 @@ public static long decodeLocalDeletionTime(long localDeletionTime, int ttl, Dese
* - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK.
* - [ path ]: the cell path if the column this is a cell of is complex.
*/
- static class Serializer
+ public static class Serializer
{
- private final static int IS_DELETED_MASK = 0x01; // Whether the cell is a tombstone or not.
- private final static int IS_EXPIRING_MASK = 0x02; // Whether the cell is expiring.
- private final static int HAS_EMPTY_VALUE_MASK = 0x04; // Wether the cell has an empty value. This will be the case for tombstone in particular.
- private final static int USE_ROW_TIMESTAMP_MASK = 0x08; // Wether the cell has the same timestamp than the row this is a cell of.
- private final static int USE_ROW_TTL_MASK = 0x10; // Wether the cell has the same ttl than the row this is a cell of.
+ public final static int IS_DELETED_MASK = 0x01; // Whether the cell is a tombstone or not.
+ public final static int IS_EXPIRING_MASK = 0x02; // Whether the cell is expiring.
+ public final static int HAS_EMPTY_VALUE_MASK = 0x04; // Whether the cell has an empty value. This will be the case for tombstones in particular.
+ public final static int USE_ROW_TIMESTAMP_MASK = 0x08; // Whether the cell has the same timestamp as the row this is a cell of.
+ public final static int USE_ROW_TTL_MASK = 0x10; // Whether the cell has the same ttl as the row this is a cell of.
public <V> void serialize(Cell<V> cell, ColumnMetadata column, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
{
@@ -309,11 +309,11 @@ else if (isExpiring)
public <V> Cell<V> deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnMetadata column, SerializationHeader header, DeserializationHelper helper, ValueAccessor<V> accessor) throws IOException
{
int flags = in.readUnsignedByte();
- boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
- boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
- boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
- boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
- boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+ boolean hasValue = hasValue(flags);
+ boolean isDeleted = isDeleted(flags);
+ boolean isExpiring = isExpiring(flags);
+ boolean useRowTimestamp = useRowTimestamp(flags);
+ boolean useRowTTL = useRowTTL(flags);
long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in);
@@ -380,11 +380,11 @@ public long serializedSize(Cell cell, ColumnMetadata column, LivenessInfo
public boolean skip(DataInputPlus in, ColumnMetadata column, SerializationHeader header) throws IOException
{
int flags = in.readUnsignedByte();
- boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
- boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
- boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
- boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
- boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+ boolean hasValue = hasValue(flags);
+ boolean isDeleted = isDeleted(flags);
+ boolean isExpiring = isExpiring(flags);
+ boolean useRowTimestamp = useRowTimestamp(flags);
+ boolean useRowTTL = useRowTTL(flags);
if (!useRowTimestamp)
header.skipTimestamp(in);
@@ -403,5 +403,30 @@ public boolean skip(DataInputPlus in, ColumnMetadata column, SerializationHeader
return true;
}
+
+ public static boolean useRowTTL(int cellFlags)
+ {
+ return (cellFlags & USE_ROW_TTL_MASK) != 0;
+ }
+
+ public static boolean useRowTimestamp(int cellFlags)
+ {
+ return (cellFlags & USE_ROW_TIMESTAMP_MASK) != 0;
+ }
+
+ public static boolean isExpiring(int cellFlags)
+ {
+ return (cellFlags & IS_EXPIRING_MASK) != 0;
+ }
+
+ public static boolean isDeleted(int cellFlags)
+ {
+ return (cellFlags & IS_DELETED_MASK) != 0;
+ }
+
+ public static boolean hasValue(int cellFlags)
+ {
+ return (cellFlags & HAS_EMPTY_VALUE_MASK) == 0;
+ }
}
}
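With the flag masks and their predicates now public, callers that read raw cell bytes (such as an sstable cursor) can interpret a flags byte without duplicating the bit twiddling. A small sketch, assuming the flags byte has already been read from the stream; only the Cell.Serializer helpers above are real, the surrounding method is illustrative:

    // Sketch: interpret a raw cell flags byte via the public predicates.
    static void describeCellFlags(int flags)
    {
        boolean hasValue        = Cell.Serializer.hasValue(flags);        // HAS_EMPTY_VALUE_MASK is clear
        boolean isDeleted       = Cell.Serializer.isDeleted(flags);       // cell is a tombstone
        boolean isExpiring      = Cell.Serializer.isExpiring(flags);      // cell carries a ttl
        boolean useRowTimestamp = Cell.Serializer.useRowTimestamp(flags); // timestamp inherited from the row
        boolean useRowTTL       = Cell.Serializer.useRowTTL(flags);       // ttl inherited from the row
        System.out.printf("deleted=%s expiring=%s hasValue=%s rowTs=%s rowTtl=%s%n",
                          isDeleted, isExpiring, hasValue, useRowTimestamp, useRowTTL);
    }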
diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java
index 48331a73a655..621a91d19c00 100644
--- a/src/java/org/apache/cassandra/db/rows/Cells.java
+++ b/src/java/org/apache/cassandra/db/rows/Cells.java
@@ -36,7 +36,7 @@ public abstract class Cells
private Cells() {}
/**
- * Collect statistics ont a given cell.
+ * Collect statistics on a given cell.
*
* @param cell the cell for which to collect stats.
* @param collector the stats collector.
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
index 2db62044fe6b..377880ae1c6f 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
@@ -159,7 +159,7 @@ private DeletionTime currentOpenDeletionTimeInMerged()
return DeletionTime.LIVE;
DeletionTime biggestDeletionTime = openMarkers[biggestOpenMarker];
- // it's only open in the merged iterator if it doesn't supersedes the partition level deletion
+ // it's only open in the merged iterator if it doesn't supersede the partition level deletion
return !biggestDeletionTime.supersedes(partitionDeletion) ? DeletionTime.LIVE : biggestDeletionTime;
}
@@ -172,7 +172,7 @@ private void updateOpenMarkers()
continue;
// Note that we can have boundaries that are both open and close, but in that case all we care about
- // is what it the open deletion after the marker, so we favor the opening part in this case.
+ // is what the open deletion is after the marker, so we favor the opening part in this case.
if (marker.isOpen(reversed))
openMarkers[i] = marker.openDeletionTime(reversed);
else
@@ -192,7 +192,7 @@ public DeletionTime activeDeletion()
{
DeletionTime openMarker = currentOpenDeletionTimeInMerged();
// We only have an open marker in the merged stream if it's not shadowed by the partition deletion (which can be LIVE itself), so
- // if have an open marker, we know it's the "active" deletion for the merged stream.
+ // if we have an open marker, we know it's the "active" deletion for the merged stream.
return openMarker.isLive() ? partitionDeletion : openMarker;
}
}
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 2fcba1bce8ea..ca1edfdbaf22 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -99,19 +99,19 @@ public class UnfilteredSerializer
/*
* Unfiltered flags constants.
*/
- private final static int END_OF_PARTITION = 0x01; // Signal the end of the partition. Nothing follows a field with that flag.
- private final static int IS_MARKER = 0x02; // Whether the encoded unfiltered is a marker or a row. All following markers applies only to rows.
- private final static int HAS_TIMESTAMP = 0x04; // Whether the encoded row has a timestamp (i.e. if row.partitionKeyLivenessInfo().hasTimestamp() == true).
- private final static int HAS_TTL = 0x08; // Whether the encoded row has some expiration info (i.e. if row.partitionKeyLivenessInfo().hasTTL() == true).
- private final static int HAS_DELETION = 0x10; // Whether the encoded row has some deletion info.
- private final static int HAS_ALL_COLUMNS = 0x20; // Whether the encoded row has all of the columns from the header present.
- private final static int HAS_COMPLEX_DELETION = 0x40; // Whether the encoded row has some complex deletion for at least one of its columns.
- private final static int EXTENSION_FLAG = 0x80; // If present, another byte is read containing the "extended flags" above.
+ public final static int END_OF_PARTITION = 0x01; // Signal the end of the partition. Nothing follows a <flags> field with that flag.
+ public final static int IS_MARKER = 0x02; // Whether the encoded unfiltered is a marker or a row. All following flags apply only to rows.
+ public final static int HAS_TIMESTAMP = 0x04; // Whether the encoded row has a timestamp (i.e. if row.partitionKeyLivenessInfo().hasTimestamp() == true).
+ public final static int HAS_TTL = 0x08; // Whether the encoded row has some expiration info (i.e. if row.partitionKeyLivenessInfo().hasTTL() == true).
+ public final static int HAS_DELETION = 0x10; // Whether the encoded row has some deletion info.
+ public final static int HAS_ALL_COLUMNS = 0x20; // Whether the encoded row has all of the columns from the header present.
+ public final static int HAS_COMPLEX_DELETION = 0x40; // Whether the encoded row has some complex deletion for at least one of its columns.
+ public final static int EXTENSION_FLAG = 0x80; // If present, another byte is read containing the "extended flags" above.
/*
* Extended flags
*/
- private final static int IS_STATIC = 0x01; // Whether the encoded row is a static. If there is no extended flag, the row is assumed not static.
+ public final static int IS_STATIC = 0x01; // Whether the encoded row is static. If there is no extended flag, the row is assumed not static.
/**
* A shadowable tombstone cannot replace a previous row deletion otherwise it could resurrect a
* previously deleted cell not updated by a subsequent update, SEE CASSANDRA-11500
@@ -119,7 +119,7 @@ public class UnfilteredSerializer
* @deprecated See CASSANDRA-11500
*/
@Deprecated(since = "4.0")
- private final static int HAS_SHADOWABLE_DELETION = 0x02; // Whether the row deletion is shadowable. If there is no extended flag (or no row deletion), the deletion is assumed not shadowable.
+ public final static int HAS_SHADOWABLE_DELETION = 0x02; // Whether the row deletion is shadowable. If there is no extended flag (or no row deletion), the deletion is assumed not shadowable.
public void serialize(Unfiltered unfiltered, SerializationHelper helper, DataOutputPlus out, int version)
throws IOException
@@ -220,14 +220,14 @@ private void serializeRowBody(Row row, int flags, SerializationHelper helper, Da
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
Row.Deletion deletion = row.deletion();
- if ((flags & HAS_TIMESTAMP) != 0)
+ if (hasTimestamp(flags))
header.writeTimestamp(pkLiveness.timestamp(), out);
- if ((flags & HAS_TTL) != 0)
+ if (hasTTL(flags))
{
header.writeTTL(pkLiveness.ttl(), out);
header.writeLocalDeletionTime(pkLiveness.localExpirationTime(), out);
}
- if ((flags & HAS_DELETION) != 0)
+ if (hasDeletion(flags))
header.writeDeletionTime(deletion.time(), out);
if ((flags & HAS_ALL_COLUMNS) == 0)
@@ -251,7 +251,7 @@ private void serializeRowBody(Row row, int flags, SerializationHelper helper, Da
if (cd.column.isSimple())
Cell.serializer.serialize((Cell<?>) cd, column, out, pkLiveness, header);
else
- writeComplexColumn((ComplexColumnData) cd, column, (flags & HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out);
+ writeComplexColumn((ComplexColumnData) cd, column, hasComplexDeletion(flags), pkLiveness, header, out);
}
catch (IOException e)
{
@@ -412,7 +412,7 @@ private long serializedMarkerBodySize(RangeTombstoneMarker marker, Serialization
public void writeEndOfPartition(DataOutputPlus out) throws IOException
{
- out.writeByte((byte)1);
+ out.writeByte((byte)END_OF_PARTITION);
}
public long serializedSizeEndOfPartition()
@@ -502,12 +502,12 @@ public Unfiltered deserializeTombstonesOnly(FileDataInput in, SerializationHeade
else
{
assert !isStatic(extendedFlags); // deserializeStaticRow should be used for that.
- if ((flags & HAS_DELETION) != 0)
+ if (hasDeletion(flags))
{
assert header.isForSSTable();
- boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
- boolean hasTTL = (flags & HAS_TTL) != 0;
- boolean deletionIsShadowable = (extendedFlags & HAS_SHADOWABLE_DELETION) != 0;
+ boolean hasTimestamp = hasTimestamp(flags);
+ boolean hasTTL = hasTTL(flags);
+ boolean deletionIsShadowable = deletionIsShadowable(extendedFlags);
Clustering clustering = Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes());
long nextPosition = in.readUnsignedVInt() + in.getFilePointer();
in.readUnsignedVInt(); // skip previous unfiltered size
@@ -572,12 +572,12 @@ public Row deserializeRowBody(DataInputPlus in,
try
{
boolean isStatic = isStatic(extendedFlags);
- boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
- boolean hasTTL = (flags & HAS_TTL) != 0;
- boolean hasDeletion = (flags & HAS_DELETION) != 0;
- boolean deletionIsShadowable = (extendedFlags & HAS_SHADOWABLE_DELETION) != 0;
- boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
- boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
+ boolean hasTimestamp = hasTimestamp(flags);
+ boolean hasTTL = hasTTL(flags);
+ boolean hasDeletion = hasDeletion(flags);
+ boolean deletionIsShadowable = deletionIsShadowable(extendedFlags);
+ boolean hasComplexDeletion = hasComplexDeletion(flags);
+ boolean hasAllColumns = hasAllColumns(flags);
Columns headerColumns = header.columns(isStatic);
if (header.isForSSTable())
@@ -734,7 +734,17 @@ public static boolean isEndOfPartition(int flags)
public static Unfiltered.Kind kind(int flags)
{
- return (flags & IS_MARKER) != 0 ? Unfiltered.Kind.RANGE_TOMBSTONE_MARKER : Unfiltered.Kind.ROW;
+ return isTombstoneMarker(flags) ? Unfiltered.Kind.RANGE_TOMBSTONE_MARKER : Unfiltered.Kind.ROW;
+ }
+
+ public static boolean isTombstoneMarker(int flags)
+ {
+ return (flags & IS_MARKER) != 0;
+ }
+
+ public static boolean isRow(int flags)
+ {
+ return (flags & IS_MARKER) == 0;
}
public static boolean isStatic(int extendedFlags)
@@ -742,7 +752,12 @@ public static boolean isStatic(int extendedFlags)
return (extendedFlags & IS_STATIC) != 0;
}
- private static boolean isExtended(int flags)
+ public static boolean deletionIsShadowable(int extendedFlags)
+ {
+ return (extendedFlags & HAS_SHADOWABLE_DELETION) != 0;
+ }
+
+ public static boolean isExtended(int flags)
{
return (flags & EXTENSION_FLAG) != 0;
}
@@ -756,4 +771,29 @@ public static boolean hasExtendedFlags(Row row)
{
return row.isStatic() || row.deletion().isShadowable();
}
+
+ public static boolean hasTTL(int flags)
+ {
+ return (flags & HAS_TTL) != 0;
+ }
+
+ public static boolean hasTimestamp(int flags)
+ {
+ return (flags & HAS_TIMESTAMP) != 0;
+ }
+
+ public static boolean hasAllColumns(int flags)
+ {
+ return (flags & HAS_ALL_COLUMNS) != 0;
+ }
+
+ public static boolean hasComplexDeletion(int flags)
+ {
+ return (flags & HAS_COMPLEX_DELETION) != 0;
+ }
+
+ public static boolean hasDeletion(int flags)
+ {
+ return (flags & HAS_DELETION) != 0;
+ }
}
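The unfiltered flag helpers serve the same purpose one level up: a reader that only has the raw flag bytes (for example a cursor scanning an sstable data file) can classify an entry without deserializing it into an Unfiltered. A rough sketch using only the public helpers above; the method itself is illustrative:

    // Sketch: classify an entry of the unfiltered stream from its flag bytes.
    static String classify(int flags, int extendedFlags)
    {
        if (UnfilteredSerializer.isEndOfPartition(flags))
            return "end of partition";
        if (UnfilteredSerializer.isTombstoneMarker(flags))
            return "range tombstone marker";
        // From here on it is a row; the extended flags say whether it is the static row.
        if (UnfilteredSerializer.isExtended(flags) && UnfilteredSerializer.isStatic(extendedFlags))
            return "static row";
        return UnfilteredSerializer.hasDeletion(flags) ? "row with row deletion" : "row";
    }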
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index 74a605946a18..73766bc06fd3 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -177,7 +177,7 @@ public static class LongToken extends Token
{
static final long serialVersionUID = -5833580143318243006L;
- public final long token;
+ public long token;
public LongToken(long token)
{
@@ -320,6 +320,18 @@ private LongToken getToken(ByteBuffer key, long[] hash)
return new LongToken(normalize(hash[0]));
}
+ public long getTokenValue(ByteBuffer key, long[] hash)
+ {
+ if (key.remaining() == 0)
+ {
+ hash[0] = MINIMUM.token;
+ hash[1] = 0;
+ return MINIMUM.token;
+ }
+ populateHash(key, hash);
+ return normalize(hash[0]);
+ }
+
@Override
public boolean isFixedLength()
{
@@ -386,10 +398,15 @@ private static long flip(long value)
private long[] getHash(ByteBuffer key)
{
long[] hash = new long[2];
- MurmurHash.hash3_x64_128(key, key.position(), key.remaining(), 0, hash);
+ populateHash(key, hash);
return hash;
}
+ private void populateHash(ByteBuffer key, long[] hash)
+ {
+ MurmurHash.hash3_x64_128(key, key.position(), key.remaining(), 0, hash);
+ }
+
public LongToken getRandomToken()
{
return getRandomToken(ThreadLocalRandom.current());
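getTokenValue plus the shared populateHash let callers hash many keys without allocating a LongToken per key, presumably also why the token field is no longer final. A usage sketch; the key collection and the process callback are assumptions:

    // Sketch: hash partition keys into raw long tokens, reusing one scratch array.
    Murmur3Partitioner partitioner = Murmur3Partitioner.instance;
    long[] scratch = new long[2];                    // reused for every key
    for (ByteBuffer key : keys)                      // 'keys' is a hypothetical Iterable<ByteBuffer>
    {
        long token = partitioner.getTokenValue(key, scratch);
        process(key, token);                         // hypothetical consumer of the raw token value
    }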
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index c12a76bc842e..18567483f635 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -46,7 +46,7 @@
/**
* Base class for the sstable writers used by CQLSSTableWriter.
*/
-abstract class AbstractSSTableSimpleWriter implements Closeable
+public abstract class AbstractSSTableSimpleWriter implements Closeable
{
protected final File directory;
protected final TableMetadataRef metadata;
@@ -150,7 +150,7 @@ private static SSTableId getNextId(File directory, final String columnFamily) th
}
}
- PartitionUpdate.Builder getUpdateFor(ByteBuffer key) throws IOException
+ public PartitionUpdate.Builder getUpdateFor(ByteBuffer key) throws IOException
{
return getUpdateFor(metadata.get().partitioner.decorateKey(key));
}
diff --git a/src/java/org/apache/cassandra/io/sstable/ClusteringDescriptor.java b/src/java/org/apache/cassandra/io/sstable/ClusteringDescriptor.java
new file mode 100644
index 000000000000..4339132dee4a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/ClusteringDescriptor.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.readUnfilteredClustering;
+
+public class ClusteringDescriptor extends ResizableByteBuffer
+{
+ public static final byte EXCL_END_BOUND_CLUSTERING_TYPE = (byte) ClusteringPrefix.Kind.EXCL_END_BOUND.ordinal();
+ public static final byte INCL_START_BOUND_CLUSTERING_TYPE = (byte) ClusteringPrefix.Kind.INCL_START_BOUND.ordinal();
+ public static final byte INCL_END_EXCL_START_BOUNDARY_CLUSTERING_TYPE = (byte) ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY.ordinal();
+
+ public static final byte STATIC_CLUSTERING_TYPE = (byte)ClusteringPrefix.Kind.STATIC_CLUSTERING.ordinal();
+ public static final byte ROW_CLUSTERING_TYPE = (byte) ClusteringPrefix.Kind.CLUSTERING.ordinal();
+
+ public static final byte EXCL_END_INCL_START_BOUNDARY_CLUSTERING_TYPE = (byte) ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY.ordinal();
+ public static final byte INCL_END_BOUND_CLUSTERING_TYPE = (byte) ClusteringPrefix.Kind.INCL_END_BOUND.ordinal();
+ public static final byte EXCL_START_BOUND_CLUSTERING_TYPE = (byte) ClusteringPrefix.Kind.EXCL_START_BOUND.ordinal();
+
+ protected AbstractType<?>[] clusteringTypes;
+ protected ClusteringPrefix.Kind clusteringKind;
+ protected byte clusteringKindEncoded;
+ protected int clusteringColumnsBound;
+
+ protected void loadClustering(RandomAccessReader dataReader, AbstractType<?>[] clusteringColumnTypes, byte clusteringKind, int clusteringColumnsBound) throws IOException
+ {
+ clusteringTypes = clusteringColumnTypes;
+ this.clusteringKindEncoded = clusteringKind;
+ this.clusteringKind = ClusteringPrefix.Kind.values()[clusteringKind];
+ this.clusteringColumnsBound = clusteringColumnsBound;
+ if (clusteringKind != STATIC_CLUSTERING_TYPE)
+ readUnfilteredClustering(dataReader, clusteringTypes, this.clusteringColumnsBound, this);
+ else
+ resetBuffer();
+ }
+
+ public ClusteringDescriptor resetMin() {
+ set(null, ClusteringBound.MIN_END.kind(), 0);
+ resetBuffer();
+ return this;
+ }
+
+ public ClusteringDescriptor resetMax() {
+ set(null, ClusteringBound.MAX_START.kind(), 0);
+ resetBuffer();
+ return this;
+ }
+
+ public final void resetClustering()
+ {
+ set(null, ClusteringPrefix.Kind.CLUSTERING, 0);
+
+ resetBuffer();
+ }
+
+ public void copy(ClusteringDescriptor newClustering)
+ {
+ set(newClustering.clusteringTypes, newClustering.clusteringKind, newClustering.clusteringColumnsBound());
+ overwrite(newClustering.clusteringBytes(), newClustering.clusteringLength());
+ }
+
+ private void set(AbstractType<?>[] clusteringColumnTypes, ClusteringPrefix.Kind clusteringKind, int clusteringColumnsBound) {
+ clusteringTypes = clusteringColumnTypes;
+ this.clusteringKindEncoded = (byte) clusteringKind.ordinal();
+ this.clusteringKind = clusteringKind;
+ this.clusteringColumnsBound = clusteringColumnsBound;
+ }
+
+ // Expose and rename parent data
+ public ByteBuffer clusteringBuffer() {
+ return buffer();
+ }
+
+ public int clusteringLength() {
+ return length();
+ }
+
+ public byte[] clusteringBytes() {
+ return bytes();
+ }
+
+ public AbstractType<?>[] clusteringTypes()
+ {
+ return clusteringTypes;
+ }
+
+ public byte clusteringKindEncoded() {
+ return clusteringKindEncoded;
+ }
+
+ public ClusteringPrefix.Kind clusteringKind() {
+ return clusteringKind;
+ }
+
+ public void clusteringKind(ClusteringPrefix.Kind kind)
+ {
+ clusteringKind = kind;
+ clusteringKindEncoded = (byte)kind.ordinal();
+ }
+
+ public int clusteringColumnsBound() {
+ return clusteringColumnsBound;
+ }
+
+ public boolean isStartBound()
+ {
+ return (clusteringKindEncoded == INCL_START_BOUND_CLUSTERING_TYPE || clusteringKindEncoded == EXCL_START_BOUND_CLUSTERING_TYPE);
+ }
+
+ public boolean isEndBound()
+ {
+ return (clusteringKindEncoded == INCL_END_BOUND_CLUSTERING_TYPE || clusteringKindEncoded == EXCL_END_BOUND_CLUSTERING_TYPE);
+ }
+
+ public boolean isBoundary()
+ {
+ return (clusteringKindEncoded == EXCL_END_INCL_START_BOUNDARY_CLUSTERING_TYPE || clusteringKindEncoded == INCL_END_EXCL_START_BOUNDARY_CLUSTERING_TYPE);
+ }
+
+ public ClusteringPrefix<?> toClusteringPrefix(List<AbstractType<?>> clusteringTypesList) {
+ if (clusteringKindEncoded == ROW_CLUSTERING_TYPE) {
+ return Clustering.serializer.deserialize(clusteringBuffer(), 0, clusteringTypesList);
+ }
+ else if (clusteringColumnsBound == 0) {
+ return ByteArrayAccessor.factory.bound(clusteringKind);
+ }
+ else {
+ byte[][] values;
+ try (DataInputBuffer buffer = new DataInputBuffer(clusteringBuffer(), true))
+ {
+ values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(buffer, clusteringColumnsBound, 0, clusteringTypesList);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Reading from an in-memory buffer shouldn't trigger an IOException", e);
+ }
+ return ByteArrayAccessor.factory.boundOrBoundary(clusteringKind, values);
+ }
+ }
+
+ public boolean clusteringEquals(ClusteringDescriptor clusteringDescriptor)
+ {
+ if (this == clusteringDescriptor)
+ return true;
+ int length = this.length();
+ if (length != clusteringDescriptor.length())
+ return false;
+ if (this.clusteringColumnsBound != clusteringDescriptor.clusteringColumnsBound)
+ return false;
+ if(!Arrays.equals(this.bytes(), 0, length, clusteringDescriptor.bytes(), 0, length))
+ return false;
+ return ClusteringPrefix.Kind.compare(this.clusteringKind, clusteringDescriptor.clusteringKind) == 0;
+ }
+}
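ClusteringDescriptor keeps the serialized clustering bytes plus the clustering kind, so a cursor can compare and copy positions without materializing ClusteringPrefix objects; toClusteringPrefix is the escape hatch for handing a position back to the regular row machinery. A rough sketch of that intended usage (the helper itself is illustrative, the methods on ClusteringDescriptor are the ones above):

    // Sketch: stay in the byte-comparison world while positions repeat, and only
    // materialize a ClusteringPrefix when the position actually changes.
    static ClusteringPrefix<?> materializeIfNew(ClusteringDescriptor previous,
                                                ClusteringDescriptor current,
                                                List<AbstractType<?>> clusteringTypes)
    {
        // clusteringEquals compares kind and raw serialized bytes, so the common
        // "same clustering as last time" case allocates nothing.
        if (current.clusteringEquals(previous))
            return null;
        previous.copy(current);                          // remember the new position
        return current.toClusteringPrefix(clusteringTypes);
    }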
diff --git a/src/java/org/apache/cassandra/io/sstable/ElementDescriptor.java b/src/java/org/apache/cassandra/io/sstable/ElementDescriptor.java
new file mode 100644
index 000000000000..3574c6ce336c
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/ElementDescriptor.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+import static org.apache.cassandra.io.sstable.SSTableCursorReader.readUnfilteredDeletionTime;
+
+public class ElementDescriptor extends ClusteringDescriptor
+{
+ private final ReusableLivenessInfo rowLivenessInfo = new ReusableLivenessInfo();
+ private final DeletionTime deletionTime = DeletionTime.build(0, 0);
+ private final DeletionTime deletionTime2 = DeletionTime.build(0, 0);
+
+ private long position;
+ private int flags;
+ private int extendedFlags;
+
+ private long unfilteredSize;
+ private long unfilteredDataStart;
+// private long prevUnfilteredSize;
+ private long unfilteredCellStart;
+ Columns rowColumns;
+
+ void loadTombstone(RandomAccessReader dataReader,
+ SerializationHeader serializationHeader,
+ DeserializationHelper deserializationHelper,
+ AbstractType<?>[] clusteringColumnTypes,
+ int flags) throws IOException
+ {
+ this.flags = flags;
+ this.extendedFlags = 0;
+ rowColumns = null;
+ byte clusteringKind = dataReader.readByte();
+ if (clusteringKind == STATIC_CLUSTERING_TYPE || clusteringKind == ROW_CLUSTERING_TYPE) {
+ // STATIC_CLUSTERING or CLUSTERING -> no deletion info, should not happen
+ throw new IllegalStateException();
+ }
+
+ int columnsBound = dataReader.readUnsignedShort();
+ loadClustering(dataReader, clusteringColumnTypes, clusteringKind, columnsBound);
+ this.unfilteredSize = dataReader.readUnsignedVInt();
+ dataReader.readUnsignedVInt(); // Unused: prevUnfilteredSize
+ if (clusteringKind == EXCL_END_INCL_START_BOUNDARY_CLUSTERING_TYPE || clusteringKind == INCL_END_EXCL_START_BOUNDARY_CLUSTERING_TYPE)
+ {
+ // boundary
+ readUnfilteredDeletionTime(dataReader, serializationHeader, deletionTime); // CLOSE
+ readUnfilteredDeletionTime(dataReader, serializationHeader, deletionTime2); // OPEN
+ }
+ else
+ {
+ // bound
+ readUnfilteredDeletionTime(dataReader, serializationHeader, deletionTime); // CLOSE|OPEN
+ }
+ }
+
+ void loadRow(RandomAccessReader dataReader,
+ SerializationHeader serializationHeader,
+ DeserializationHelper deserializationHelper,
+ AbstractType<?>[] clusteringTypes,
+ int flags) throws IOException {
+ // body = whatever is covered by size, so inclusive of the prev_row_size and of the flags
+ position = dataReader.getPosition() - 1;
+ this.flags = flags;
+ this.extendedFlags = 0;
+
+ loadClustering(dataReader, clusteringTypes, ROW_CLUSTERING_TYPE, clusteringTypes.length);
+
+ rowColumns = serializationHeader.columns(false);
+
+ loadCommonRowFields(dataReader, serializationHeader, deserializationHelper, flags);
+ }
+
+ void loadStaticRow(RandomAccessReader dataReader,
+ SerializationHeader serializationHeader,
+ DeserializationHelper deserializationHelper,
+ int flags,
+ int extendedFlags) throws IOException {
+ // body = whatever is covered by size, so inclusive of the prev_row_size and of the flags
+ position = dataReader.getPosition() - 2;
+ this.flags = flags;
+ this.extendedFlags = extendedFlags;
+ // no clustering
+ loadClustering(dataReader, null, STATIC_CLUSTERING_TYPE, 0);
+ rowColumns = serializationHeader.columns(true);
+
+ loadCommonRowFields(dataReader, serializationHeader, deserializationHelper, flags);
+ }
+
+ private void loadCommonRowFields(RandomAccessReader dataReader, SerializationHeader serializationHeader, DeserializationHelper deserializationHelper, int flags) throws IOException
+ {
+ unfilteredSize = dataReader.readUnsignedVInt();
+ unfilteredDataStart = dataReader.getPosition();
+ // prevUnfilteredSize = ;
+ dataReader.readUnsignedVInt(); // unused
+
+ SSTableCursorReader.readLivenessInfo(dataReader, serializationHeader, deserializationHelper, flags, rowLivenessInfo);
+ if (UnfilteredSerializer.hasDeletion(flags))
+ {
+ // struct delta_deletion_time {
+ // varint delta_marked_for_delete_at;
+ // varint delta_local_deletion_time;
+ //};
+ readUnfilteredDeletionTime(dataReader, serializationHeader, deletionTime);
+ }
+ else
+ {
+ deletionTime.resetLive();
+ }
+ if (!UnfilteredSerializer.hasAllColumns(flags))
+ {
+ // TODO: re-implement GC free
+ rowColumns = Columns.serializer.deserializeSubset(rowColumns, dataReader);
+ }
+ unfilteredCellStart = dataReader.getPosition();
+ }
+
+ public void resetElement()
+ {
+ resetClustering();
+ position = 0;
+ flags = 0;
+ extendedFlags = 0;
+ unfilteredSize = 0;
+ unfilteredDataStart = 0;
+// prevUnfilteredSize = 0;
+ unfilteredCellStart = 0;
+ rowColumns = null;
+ }
+
+ public long position()
+ {
+ return position;
+ }
+
+ public ReusableLivenessInfo livenessInfo()
+ {
+ return rowLivenessInfo;
+ }
+
+ public DeletionTime deletionTime()
+ {
+ return deletionTime;
+ }
+
+ public DeletionTime openDeletionTime()
+ {
+ return isBoundary() ? deletionTime2 : isEndBound() ? DeletionTime.LIVE : deletionTime;
+ }
+
+
+ public DeletionTime deletionTime2()
+ {
+ return deletionTime2;
+ }
+
+ public int flags()
+ {
+ return flags;
+ }
+
+ public int extendedFlags()
+ {
+ return extendedFlags;
+ }
+
+ public long size()
+ {
+ return unfilteredSize;
+ }
+
+ public long dataStart()
+ {
+ return unfilteredDataStart;
+ }
+
+ public Columns rowColumns()
+ {
+ return rowColumns;
+ }
+
+ public long unfilteredCellStart()
+ {
+ return unfilteredCellStart;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "RowHeader{" +
+ "rowLivenessInfo=" + rowLivenessInfo +
+ ", deletionTime=" + deletionTime +
+ ", position=" + position +
+ ", flags=" + flags +
+ ", extFlags=" + extendedFlags +
+ ", unfilteredSize=" + unfilteredSize +
+ ", unfilteredDataStart=" + unfilteredDataStart +
+// ", prevUnfilteredSize=" + prevUnfilteredSize +
+ ", unfilteredCellStart=" + unfilteredCellStart +
+ ", rowColumns=" + rowColumns +
+ ", clusteringTypes=" + Arrays.toString(clusteringTypes()) +
+ '}';
+ }
+}
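ElementDescriptor is the per-unfiltered counterpart: one mutable holder that the cursor reloads for every row or marker it visits. A sketch of how a reader loop might dispatch on the flag byte and reuse a single descriptor; the reader, header, helper, clustering types and the consume callback are assumptions, and the load* methods are the package-private ones above:

    // Sketch: drive one reusable ElementDescriptor through a partition's unfiltereds.
    ElementDescriptor element = new ElementDescriptor();
    while (true)
    {
        int flags = dataReader.readUnsignedByte();
        if (UnfilteredSerializer.isEndOfPartition(flags))
            break;                                                  // 0x01: nothing follows
        if (UnfilteredSerializer.isTombstoneMarker(flags))
            element.loadTombstone(dataReader, header, helper, clusteringTypes, flags);
        else
            element.loadRow(dataReader, header, helper, clusteringTypes, flags);
        consume(element);                                           // element is reused on the next iteration
    }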
diff --git a/src/java/org/apache/cassandra/io/sstable/EmptySSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/EmptySSTableScanner.java
index 8976ed413072..fecb6940a40f 100644
--- a/src/java/org/apache/cassandra/io/sstable/EmptySSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/EmptySSTableScanner.java
@@ -56,6 +56,12 @@ public Set getBackingSSTables()
return ImmutableSet.of(sstable);
}
+ @Override
+ public boolean isFullRange()
+ {
+ return false;
+ }
+
public long getCurrentPosition()
{
return 0;
diff --git a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
index 671bccb824b5..2cf628046990 100644
--- a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
@@ -39,6 +39,7 @@ public interface ISSTableScanner extends UnfilteredPartitionIterator
public long getCurrentPosition();
public long getBytesScanned();
public Set<SSTableReader> getBackingSSTables();
+ public boolean isFullRange();
public static void closeAllAndPropagate(Collection<ISSTableScanner> scanners, Throwable throwable)
{
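isFullRange lets consumers tell a scanner covering the whole sstable apart from one restricted to specific token ranges (EmptySSTableScanner above simply answers false). A minimal sketch of what an implementation might look like; the field name is an assumption:

    // Sketch: report full-range only when the scanner was built without range restrictions.
    @Override
    public boolean isFullRange()
    {
        return coversEntireSSTable;   // hypothetical flag recorded at construction time
    }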
diff --git a/src/java/org/apache/cassandra/io/sstable/PartitionDescriptor.java b/src/java/org/apache/cassandra/io/sstable/PartitionDescriptor.java
new file mode 100644
index 000000000000..c3fdc8e85cd3
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/PartitionDescriptor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.util.ResizableByteBuffer;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+public class PartitionDescriptor extends ResizableByteBuffer
+{
+ private long position;
+ private final DeletionTime deletionTime = DeletionTime.build(0, 0);
+
+ /**
+ * Loads the following structure:
+ *