Skip to content

Commit ff78780

Browse files
Nitsan Wakart authored and jmckenzie-dev committed
Add cursor based optimized compaction path
Adds a compaction implementation utilizing new fixed allocation SSTable reader/writer implementations, and other purpose built code, leading to improved efficiencies. patch by Nitsan Wakart; reviewed by Branimir Lambov, Dmitry Konstantinov for CASSANDRA-20918
1 parent 66a7a36 commit ff78780

File tree

137 files changed

+13127
-434
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

137 files changed

+13127
-434
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Add cursor based optimized compaction path (CASSANDRA-20918)
23
* Ensure peers with LEFT status are expired from gossip state (CASSANDRA-21035)
34
* Optimize UTF8Validator.validate for ASCII prefixed Strings (CASSANDRA-21075)
45
* Switch LatencyMetrics to use ThreadLocalTimer/ThreadLocalCounter (CASSANDRA-21080)

build.xml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1414,6 +1414,7 @@
14141414
<jvmarg value="-Dcassandra.config=file:///${scm_none_yaml}"/>
14151415
<jvmarg value="-Dcassandra.test.storage_compatibility_mode=NONE"/>
14161416
<jvmarg value="-Dcassandra.skip_sync=true" />
1417+
<jvmarg value="-Dcassandra.cursor_compaction_enabled=false" />
14171418
</testmacrohelper>
14181419
</sequential>
14191420
</macrodef>

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -193,6 +193,7 @@ public enum CassandraRelevantProperties
193193
CONSISTENT_RANGE_MOVEMENT("cassandra.consistent.rangemovement", "true"),
194194
CONSISTENT_SIMULTANEOUS_MOVES_ALLOW("cassandra.consistent.simultaneousmoves.allow"),
195195
CRYPTO_PROVIDER_CLASS_NAME("cassandra.crypto_provider_class_name"),
196+
CURSOR_COMPACTION_ENABLED("cassandra.cursor_compaction_enabled", "true"),
196197
CUSTOM_DISK_ERROR_HANDLER("cassandra.custom_disk_error_handler"),
197198
CUSTOM_GUARDRAILS_CONFIG_PROVIDER_CLASS("cassandra.custom_guardrails_config_provider_class"),
198199
CUSTOM_QUERY_HANDLER_CLASS("cassandra.custom_query_handler_class"),

src/java/org/apache/cassandra/config/Config.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@
4949

5050
import static org.apache.cassandra.config.CassandraRelevantProperties.AUTOCOMPACTION_ON_STARTUP_ENABLED;
5151
import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_AVAILABLE_PROCESSORS;
52+
import static org.apache.cassandra.config.CassandraRelevantProperties.CURSOR_COMPACTION_ENABLED;
5253
import static org.apache.cassandra.config.CassandraRelevantProperties.FILE_CACHE_ENABLED;
5354
import static org.apache.cassandra.config.CassandraRelevantProperties.SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE;
5455
import static org.apache.cassandra.config.CassandraRelevantProperties.SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_KEYSPACES;
@@ -661,6 +662,8 @@ public static class SSTableConfig
661662
@Replaces(oldName = "enable_drop_compact_storage", converter = Converters.IDENTITY, deprecated = true)
662663
public volatile boolean drop_compact_storage_enabled = false;
663664

665+
public boolean cursor_compaction_enabled = CURSOR_COMPACTION_ENABLED.getBoolean();
666+
664667
public volatile boolean use_statements_enabled = true;
665668

666669
/**

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4725,6 +4725,17 @@ public static void setTransientReplicationEnabledUnsafe(boolean enabled)
47254725
conf.transient_replication_enabled = enabled;
47264726
}
47274727

4728+
public static boolean cursorCompactionEnabled()
4729+
{
4730+
return conf.cursor_compaction_enabled;
4731+
}
4732+
4733+
@VisibleForTesting
4734+
public static void setCursorCompactionEnabled(boolean cursor_compaction_enabled)
4735+
{
4736+
conf.cursor_compaction_enabled = cursor_compaction_enabled;
4737+
}
4738+
47284739
public static boolean enableDropCompactStorage()
47294740
{
47304741
return conf.drop_compact_storage_enabled;

src/java/org/apache/cassandra/db/BufferDecoratedKey.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@
2525

2626
public class BufferDecoratedKey extends DecoratedKey
2727
{
28-
private final ByteBuffer key;
28+
protected ByteBuffer key;
2929

3030
public BufferDecoratedKey(Token token, ByteBuffer key)
3131
{

src/java/org/apache/cassandra/db/Clustering.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -133,7 +133,7 @@ public String toString(TableMetadata metadata)
133133
/**
134134
* Serializer for Clustering object.
135135
* <p>
136-
* Because every clustering in a given table must have the same size (ant that size cannot actually change once the table
136+
* Because every clustering in a given table must have the same size (and that size cannot actually change once the table
137137
* has been defined), we don't record that size.
138138
*/
139139
public static class Serializer

src/java/org/apache/cassandra/db/ClusteringComparator.java

Lines changed: 107 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -26,14 +26,17 @@
2626
import com.google.common.base.Joiner;
2727
import com.google.common.collect.ImmutableList;
2828

29+
import org.apache.cassandra.io.sstable.ClusteringDescriptor;
30+
import org.apache.cassandra.db.marshal.AbstractType;
31+
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
2932
import org.apache.cassandra.db.marshal.ValueAccessor;
3033
import org.apache.cassandra.db.rows.Row;
31-
import org.apache.cassandra.db.marshal.AbstractType;
32-
import org.apache.cassandra.serializers.MarshalException;
33-
3434
import org.apache.cassandra.io.sstable.IndexInfo;
35+
import org.apache.cassandra.serializers.MarshalException;
36+
import org.apache.cassandra.utils.ByteBufferUtil;
3537
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
3638
import org.apache.cassandra.utils.bytecomparable.ByteSource;
39+
import org.apache.cassandra.utils.vint.VIntCoding;
3740

3841
import static org.apache.cassandra.utils.bytecomparable.ByteSource.EXCLUDED;
3942
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT;
@@ -156,6 +159,107 @@ public <V1, V2> int compare(ClusteringPrefix<V1> c1, ClusteringPrefix<V2> c2)
156159
return s1 < s2 ? c1.kind().comparedToClustering : -c2.kind().comparedToClustering;
157160
}
158161

162+
public static int compare(ClusteringDescriptor c1, ClusteringDescriptor c2)
163+
{
164+
final int c1Size = c1.clusteringColumnsBound();
165+
final int c2Size = c2.clusteringColumnsBound();
166+
final int minColumns = Math.min(c1Size, c2Size);
167+
168+
final int cmp = compare(c1.clusteringTypes(), c1.clusteringBuffer(), c2.clusteringBuffer(), minColumns);
169+
if (cmp != 0)
170+
return cmp;
171+
172+
final ClusteringPrefix.Kind c1Kind = c1.clusteringKind();
173+
final ClusteringPrefix.Kind c2Kind = c2.clusteringKind();
174+
if (c1Size == c2Size)
175+
{
176+
return ClusteringPrefix.Kind.compare(c1Kind, c2Kind);
177+
}
178+
179+
return c1Size < c2Size ? c1Kind.comparedToClustering : -c2Kind.comparedToClustering;
180+
}
181+
182+
public static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2) {
183+
return compare(types, c1, c2, types.length);
184+
}
185+
186+
private static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2, int size)
187+
{
188+
long clusteringBlock1 = 0;
189+
long clusteringBlock2 = 0;
190+
final int position1 = c1.position();
191+
final int position2 = c2.position();
192+
final int limit1 = c1.limit();
193+
final int limit2 = c2.limit();
194+
try
195+
{
196+
for (int clusteringIndex = 0; clusteringIndex < size; clusteringIndex++)
197+
{
198+
if (clusteringIndex % 32 == 0)
199+
{
200+
clusteringBlock1 = VIntCoding.readUnsignedVInt(c1);
201+
clusteringBlock2 = VIntCoding.readUnsignedVInt(c2);
202+
}
203+
204+
AbstractType<?> type = types[clusteringIndex];
205+
206+
byte v1Flags = (byte) (clusteringBlock1 & 0b11);
207+
byte v2Flags = (byte) (clusteringBlock2 & 0b11);
208+
209+
// both values are present
210+
if ((v1Flags|v2Flags) == 0)
211+
{
212+
boolean isByteOrderComparable = type.isByteOrderComparable;
213+
int vlen1,vlen2;
214+
if (type.isValueLengthFixed())
215+
{
216+
vlen1 = vlen2 = type.valueLengthIfFixed();
217+
}
218+
else
219+
{
220+
vlen1 = VIntCoding.readUnsignedVInt32(c1);
221+
vlen2 = VIntCoding.readUnsignedVInt32(c2);
222+
}
223+
int v1Limit = c1.position() + vlen1;
224+
if (v1Limit > limit1)
225+
throw new IllegalArgumentException("Value limit exceeds buffer limit.");
226+
c1.limit(v1Limit);
227+
int v2Limit = c2.position() + vlen2;
228+
if (v2Limit > limit2)
229+
throw new IllegalArgumentException("Value limit exceeds buffer limit.");
230+
c2.limit(v2Limit);
231+
int cmp = isByteOrderComparable ?
232+
ByteBufferUtil.compareUnsigned(c1, c2) :
233+
type.compareCustom(c1, ByteBufferAccessor.instance, c2, ByteBufferAccessor.instance);
234+
if (cmp != 0)
235+
return cmp;
236+
c1.position(v1Limit);
237+
c2.position(v2Limit);
238+
c1.limit(limit1);
239+
c2.limit(limit2);
240+
}
241+
// present > not present
242+
else
243+
{
244+
// null (0b10) is smaller than empty (0b01) which is smaller than valued (0b00);
245+
// compare swapped arguments to reverse the order
246+
int cmp = Long.compare(v2Flags, v1Flags);
247+
if (cmp != 0)
248+
return cmp;
249+
// null/empty == null/empty, continue...
250+
}
251+
clusteringBlock1 = clusteringBlock1 >>> 2;
252+
clusteringBlock2 = clusteringBlock2 >>> 2;
253+
}
254+
}
255+
finally
256+
{
257+
c1.position(position1).limit(limit1);
258+
c2.position(position2).limit(limit2);
259+
}
260+
return 0;
261+
}
262+
159263
public <V1, V2> int compare(Clustering<V1> c1, Clustering<V2> c2)
160264
{
161265
return compare(c1, c2, size());

src/java/org/apache/cassandra/db/ClusteringPrefix.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -519,7 +519,7 @@ <V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int ver
519519
return result;
520520
}
521521

522-
byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
522+
public byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
523523
{
524524
// Callers of this method should handle the case where size = 0 (in all case we want to return a special value anyway).
525525
assert size > 0;

src/java/org/apache/cassandra/db/ColumnFamilyStore.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2330,6 +2330,11 @@ public boolean shouldIgnoreGcGraceForKey(DecoratedKey dk)
23302330
return partitionKeySetIgnoreGcGrace.contains(dk);
23312331
}
23322332

2333+
public boolean shouldIgnoreGcGraceForAnyKey()
2334+
{
2335+
return !partitionKeySetIgnoreGcGrace.isEmpty();
2336+
}
2337+
23332338
public static Iterable<ColumnFamilyStore> all()
23342339
{
23352340
List<Iterable<ColumnFamilyStore>> stores = new ArrayList<>(Schema.instance.getKeyspaces().size());

0 commit comments

Comments
 (0)