Skip to content

Commit f43d9d0

Browse files
committed
Add support in the binary protocol to allow transactions to have multiple conditions
patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-20883
1 parent f6fd495 commit f43d9d0

18 files changed

+1333
-258
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Add support in the binary protocol to allow transactions to have multiple conditions (CASSANDRA-20883)
23
* Enable CQLSSTableWriter to create SSTables compressed with a dictionary (CASSANDRA-20938)
34
* Support ZSTD dictionary compression (CASSANDRA-17021)
45
* Fix ExceptionsTable when stacktrace has zero elements (CASSANDRA-20992)

src/java/org/apache/cassandra/io/AsymmetricParameterisedUnversionedSerializer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,10 @@ default Out deserializeUnchecked(P p, ByteBuffer buffer)
7474
}
7575
}
7676

77+
default void skip(P p, DataInputPlus in) throws IOException
78+
{
79+
deserialize(p, in);
80+
}
81+
7782
long serializedSize(In t, P p);
7883
}

src/java/org/apache/cassandra/service/accord/TokenRange.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ public static TokenRange create(TokenKey start, TokenKey end)
5454
return new TokenRange(start, end);
5555
}
5656

57+
public static TokenRange create(TableId tableId, Token start, Token end)
58+
{
59+
return new TokenRange(new TokenKey(tableId, start), new TokenKey(tableId, end));
60+
}
61+
5762
public static TokenRange createUnsafe(TokenKey start, TokenKey end)
5863
{
5964
return new TokenRange(start, end);

src/java/org/apache/cassandra/service/accord/serializers/SerializePacked.java

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,99 @@
2323
import accord.utils.BitUtils;
2424
import accord.utils.Invariants;
2525
import net.nicoulaj.compilecommand.annotations.Inline;
26+
import org.apache.cassandra.db.TypeSizes;
2627
import org.apache.cassandra.io.util.DataInputPlus;
2728
import org.apache.cassandra.io.util.DataOutputPlus;
2829

2930
/**
3031
* A set of simple utilities to quickly serialize/deserialize arrays/lists of values that each require <= 64 bits to represent.
3132
* These are packed into an "array" of fixed bit width, so that the total size consumed is ceil((bits*elements)/8).
3233
* This can (in future) be read directly without deserialization, by indexing into the byte stream directly.
34+
* <p/>
35+
* The serialized value is optimized for values in the range 0 to 256 (negative will be rejected), and should produce
36+
* output smaller or equal to vint serialization; when values are larger than 256, then the packing can produce 1 extra
37+
* serialized byte. Serialization is safe in these cases, and faster to skip.
3338
*/
3439
public class SerializePacked
3540
{
36-
public static void serializePackedInts(int[] vs, int from, int to, int max, DataOutputPlus out) throws IOException
41+
public static void serializePackedSortedIntsAndLength(int[] vs, DataOutputPlus out) throws IOException
42+
{
43+
out.writeUnsignedVInt32(vs.length);
44+
serializePackedSortedInts(vs, out);
45+
}
46+
47+
public static void serializePackedSortedInts(int[] vs, DataOutputPlus out) throws IOException
48+
{
49+
if (vs.length == 0)
50+
return;
51+
52+
int last = vs[vs.length - 1];
53+
Invariants.require(last >= 0,
54+
() -> String.format("Found a negative value at offset %d; value %d", (Object) (vs.length - 1), (Object) last));
55+
out.writeUnsignedVInt32(last);
56+
serializePackedInts(vs, 0, vs.length - 1, last, out);
57+
}
58+
59+
public static int[] deserializePackedSortedIntsAndLength(DataInputPlus in) throws IOException
60+
{
61+
return deserializePackedSortedInts(in.readUnsignedVInt32(), in);
62+
}
63+
64+
public static int[] deserializePackedSortedInts(int length, DataInputPlus in) throws IOException
65+
{
66+
if (length == 0)
67+
return new int[0];
68+
69+
int last = in.readUnsignedVInt32();
70+
int[] vs = new int[length];
71+
deserializePackedInts(vs, 0, length - 1, last, in);
72+
vs[length - 1] = last;
73+
return vs;
74+
}
75+
76+
public static void skipPackedSortedIntsAndLength(DataInputPlus in) throws IOException
77+
{
78+
skipPackedSortedInts(in.readUnsignedVInt32(), in);
79+
}
80+
81+
public static void skipPackedSortedInts(int length, DataInputPlus in) throws IOException
82+
{
83+
if (length > 0)
84+
{
85+
int last = in.readUnsignedVInt32();
86+
skipPackedInts(0, length - 1, last, in);
87+
}
88+
}
89+
90+
public static long serializedSizeOfPackedSortedIntsAndLength(int[] vs)
91+
{
92+
return TypeSizes.sizeofUnsignedVInt(vs.length) + serializedSizeOfPackedSortedInts(vs);
93+
}
94+
95+
public static long serializedSizeOfPackedSortedInts(int[] vs)
96+
{
97+
if (vs.length == 0)
98+
return 0;
99+
int last = vs[vs.length - 1];
100+
return TypeSizes.sizeofUnsignedVInt(last) + serializedPackedSize(vs.length - 1, last);
101+
}
102+
103+
public static void serializePackedInts(int[] vs, int from, int to, long max, DataOutputPlus out) throws IOException
37104
{
38105
serializePacked((in, i) -> in[i], vs, from, to, max, out);
39106
}
40107

41-
public static void deserializePackedInts(int[] vs, int from, int to, int max, DataInputPlus in) throws IOException
108+
public static void deserializePackedInts(int[] vs, int from, int to, long max, DataInputPlus in) throws IOException
42109
{
43110
deserializePacked((out, i, v) -> out[i] = (int)v, vs, from, to, max, in);
44111
}
45112

46-
public static long serializedPackedIntsSize(int[] vs, int from, int to, int max)
113+
public static void skipPackedInts(int from, int to, long max, DataInputPlus in) throws IOException
114+
{
115+
in.skipBytesFully(serializedPackedSize(to - from, max));
116+
}
117+
118+
public static long serializedPackedIntsSize(int[] vs, int from, int to, long max)
47119
{
48120
return serializedPackedSize(to - from, max);
49121
}
@@ -60,12 +132,15 @@ public static <In> void serializePacked(SerializeAdapter<In> adapter, In in, int
60132
if (bitsPerEntry == 0)
61133
return;
62134

135+
long outOfRange = -1L << bitsPerEntry;
63136
long buffer = 0L;
64137
int bufferCount = 0;
65138
for (int i = from; i < to; i++)
66139
{
67140
long v = adapter.get(in, i);
68-
Invariants.require(v <= max);
141+
int finalI = i;
142+
Invariants.require(v >= 0 && (v & outOfRange) == 0,
143+
() -> String.format(v < 0 ? "Found a negative value at offset %d; value %d" : "Value out of range at offset %d; value %d", (Object) finalI, (Object) v));
69144
buffer |= v << bufferCount;
70145
bufferCount = bufferCount + bitsPerEntry;
71146
if (bufferCount >= 64)

src/java/org/apache/cassandra/service/accord/serializers/TableMetadatas.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import java.util.AbstractList;
2323
import java.util.Arrays;
2424
import java.util.Comparator;
25+
import java.util.List;
26+
27+
import com.google.common.annotations.VisibleForTesting;
2528

2629
import accord.utils.Invariants;
2730
import accord.utils.SortedArrays;
@@ -99,6 +102,14 @@ public static Complete of(TableMetadata metadata)
99102
return new One(metadata);
100103
}
101104

105+
@VisibleForTesting
106+
public static Complete of(List<TableMetadata> values)
107+
{
108+
Collector collector = new Collector();
109+
collector.addAll(values);
110+
return collector.build();
111+
}
112+
102113
public static Complete ofSortedUnique(TableMetadata ... metadatas)
103114
{
104115
if (metadatas.length == 0)

src/java/org/apache/cassandra/service/accord/txn/AccordUpdate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void failCondition()
7272

7373
}
7474

75-
public boolean checkCondition(Data data)
75+
public boolean checkAnyConditionMatch(Data data)
7676
{
7777
throw new UnsupportedOperationException();
7878
}

src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public Result doCompute(TxnId txnId, Timestamp executeAt, Seekables<?, ?> keys,
119119

120120
AccordUpdate accordUpdate = (AccordUpdate)update;
121121
TxnData txnData = (TxnData)data;
122-
boolean conditionCheck = accordUpdate.checkCondition(data);
122+
boolean conditionCheck = accordUpdate.checkAnyConditionMatch(data);
123123
// If the condition applied an empty result indicates success
124124
if (conditionCheck)
125125
return new TxnData();

0 commit comments

Comments
 (0)