Skip to content

Commit 9c8ab88

Browse files
committed
Accord: write rejections would be returned to users as server errors rather than INVALID and TxnReferenceOperation didn't handle all collections prperly
patch by David Capwell; reviewed by Benedict Elliott Smith, Caleb Rackliffe, Jyothsna Konisa for CASSANDRA-21061
1 parent 9d89b47 commit 9c8ab88

File tree

16 files changed

+518
-45
lines changed

16 files changed

+518
-45
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Accord: write rejections would be returned to users as server errors rather than INVALID and TxnReferenceOperation didn't handle all collections prperly (CASSANDRA-21061)
23
* Use byte[] directly in QueryOptions instead of ByteBuffer and convert them to ArrayCell instead of BufferCell to reduce allocations (CASSANDRA-20166)
34
* Log queries scanning too many SSTables per read (CASSANDRA-21048)
45
* Extend nodetool verify to (optionally) validate SAI files (CASSANDRA-20949)

src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.cassandra.service.accord.txn.TxnReference;
7575
import org.apache.cassandra.service.accord.txn.TxnResult;
7676
import org.apache.cassandra.service.accord.txn.TxnUpdate;
77+
import org.apache.cassandra.service.accord.txn.TxnValidationRejection;
7778
import org.apache.cassandra.service.accord.txn.TxnWrite;
7879
import org.apache.cassandra.service.paxos.Ballot;
7980
import org.apache.cassandra.tcm.ClusterMetadata;
@@ -574,6 +575,7 @@ public ConsensusAttemptResult toCasResult(TxnResult txnResult)
574575
{
575576
if (txnResult.kind() == retry_new_protocol)
576577
return RETRY_NEW_PROTOCOL;
578+
TxnValidationRejection.maybeThrow(txnResult);
577579
TxnData txnData = (TxnData)txnResult;
578580
TxnDataKeyValue partition = (TxnDataKeyValue)txnData.get(txnDataName(CAS_READ));
579581
return casResult(partition != null ? partition.rowIterator(false) : null);

src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.apache.cassandra.service.accord.txn.TxnReference;
8686
import org.apache.cassandra.service.accord.txn.TxnResult;
8787
import org.apache.cassandra.service.accord.txn.TxnUpdate;
88+
import org.apache.cassandra.service.accord.txn.TxnValidationRejection;
8889
import org.apache.cassandra.service.accord.txn.TxnWrite;
8990
import org.apache.cassandra.service.consensus.TransactionalMode;
9091
import org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
@@ -250,7 +251,7 @@ TxnNamedRead createNamedRead(NamedSelect namedSelect, QueryOptions options, Tabl
250251
SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) select.getQuery(options, 0);
251252

252253
if (selectQuery.queries.size() != 1)
253-
throw new IllegalArgumentException("Within a transaction, SELECT statements must select a single partition; found " + selectQuery.queries.size() + " partitions");
254+
throw invalidRequest("Within a transaction, SELECT statements must select a single partition; found " + selectQuery.queries.size() + " partitions");
254255

255256
SinglePartitionReadCommand command = Iterables.getOnlyElement(selectQuery.queries);
256257
return new TxnNamedRead(namedSelect.name, keyCollector.collect(command.metadata(), command.partitionKey()), command, keyCollector.tables);
@@ -560,6 +561,8 @@ public ResultMessage execute(QueryState state, QueryOptions options, Dispatcher.
560561
TxnResult txnResult = AccordService.instance().coordinate(minEpoch, txn, options.getConsistency(), requestTime);
561562
if (txnResult.kind() == retry_new_protocol)
562563
throw new InvalidRequestException(UNSUPPORTED_MIGRATION);
564+
TxnValidationRejection.maybeThrow(txnResult);
565+
563566
TxnData data = (TxnData)txnResult;
564567

565568
if (returningSelect != null)

src/java/org/apache/cassandra/service/StorageProxy.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@
148148
import org.apache.cassandra.service.accord.txn.TxnRangeReadResult;
149149
import org.apache.cassandra.service.accord.txn.TxnRead;
150150
import org.apache.cassandra.service.accord.txn.TxnResult;
151+
import org.apache.cassandra.service.accord.txn.TxnValidationRejection;
151152
import org.apache.cassandra.service.consensus.TransactionalMode;
152153
import org.apache.cassandra.service.consensus.UnsupportedTransactionConsistencyLevel;
153154
import org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitConsumer;
@@ -1316,13 +1317,16 @@ public static void dispatchMutationsWithRetryOnDifferentSystem(List<? extends IM
13161317
{
13171318
if (accordResult != null)
13181319
{
1319-
TxnResult.Kind kind = accordResult.awaitAndGet().kind();
1320+
TxnResult result = accordResult.awaitAndGet();
1321+
TxnResult.Kind kind = result.kind();
13201322
if (kind == retry_new_protocol && failure == null)
13211323
{
13221324
Tracing.trace("Accord returned retry new protocol");
13231325
logger.debug("Retrying mutations on different system because some mutations were misrouted according to Accord");
13241326
continue;
13251327
}
1328+
TxnValidationRejection.maybeThrow(result);
1329+
13261330
Tracing.trace("Successfully wrote Accord mutations");
13271331
}
13281332
}
@@ -1549,9 +1553,11 @@ public static void mutateAtomically(List<Mutation> mutations,
15491553
// the batch log.
15501554
if (accordResult != null)
15511555
{
1552-
TxnResult.Kind kind = accordResult.awaitAndGet().kind();
1556+
TxnResult result = accordResult.awaitAndGet();
1557+
TxnResult.Kind kind = result.kind();
15531558
if (kind == retry_new_protocol && failure == null)
15541559
continue;
1560+
TxnValidationRejection.maybeThrow(result);
15551561
Tracing.trace("Successfully wrote Accord mutations");
15561562
cleanup.ackMutation();
15571563
}

src/java/org/apache/cassandra/service/accord/txn/TxnDataValue.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,23 @@ enum Kind
4242
{
4343
key(0),
4444
range(1);
45+
4546
int id;
4647

4748
Kind(int id)
4849
{
4950
this.id = id;
5051
}
5152

52-
public TxnDataValueSerializer serializer()
53+
@SuppressWarnings("unchecked")
54+
public <T extends TxnDataValue> IVersionedSerializer<T> serializer()
5355
{
5456
switch (this)
5557
{
5658
case key:
57-
return TxnDataKeyValue.serializer;
59+
return (IVersionedSerializer<T>) TxnDataKeyValue.serializer;
5860
case range:
59-
return TxnDataRangeValue.serializer;
61+
return (IVersionedSerializer<T>) TxnDataRangeValue.serializer;
6062
default:
6163
throw new IllegalStateException("Unrecognized kind " + this);
6264
}
@@ -71,7 +73,7 @@ public TxnDataValueSerializer serializer()
7173

7274
long estimatedSizeOnHeap();
7375

74-
IVersionedSerializer<TxnDataValue> serializer = new IVersionedSerializer<TxnDataValue>()
76+
IVersionedSerializer<TxnDataValue> serializer = new IVersionedSerializer<>()
7577
{
7678
@SuppressWarnings("unchecked")
7779
@Override

src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ protected byte type()
154154
@Override
155155
public Result compute(TxnId txnId, Timestamp executeAt, Seekables<?, ?> keys, @Nullable Data data, @Nullable Read read, @Nullable Update update)
156156
{
157+
TxnValidationRejection rejection = TxnUpdate.validationRejection(update);
158+
if (rejection != null) return rejection;
159+
157160
// Skip the migration checks in the base class for empty transactions, we don't
158161
// want/need the RetryWithNewProtocolResult
159162
return new TxnData();
@@ -207,6 +210,9 @@ private TxnQuery() {}
207210
@Override
208211
public Result compute(TxnId txnId, Timestamp executeAt, Seekables<?, ?> keys, @Nullable Data data, @Nullable Read read, @Nullable Update update)
209212
{
213+
TxnValidationRejection rejection = TxnUpdate.validationRejection(update);
214+
if (rejection != null) return rejection;
215+
210216
// TODO (required): This is not the cluster metadata of the current transaction
211217
ClusterMetadata clusterMetadata = ClusterMetadata.current();
212218
checkState(clusterMetadata.epoch.getEpoch() >= executeAt.epoch(), "TCM epoch %d is < executeAt epoch %d", clusterMetadata.epoch.getEpoch(), executeAt.epoch());

src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java

Lines changed: 75 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
import java.util.Map;
2525
import java.util.Objects;
2626

27+
import javax.annotation.Nullable;
28+
29+
import com.google.common.annotations.VisibleForTesting;
30+
2731
import org.apache.cassandra.cql3.FieldIdentifier;
2832
import org.apache.cassandra.cql3.Operation;
2933
import org.apache.cassandra.cql3.UpdateParameters;
@@ -35,8 +39,10 @@
3539
import org.apache.cassandra.cql3.terms.Term;
3640
import org.apache.cassandra.cql3.terms.UserTypes;
3741
import org.apache.cassandra.db.DecoratedKey;
42+
import org.apache.cassandra.db.TypeSizes;
3843
import org.apache.cassandra.db.marshal.AbstractType;
3944
import org.apache.cassandra.db.marshal.CollectionType;
45+
import org.apache.cassandra.db.marshal.Int32Type;
4046
import org.apache.cassandra.db.marshal.ListType;
4147
import org.apache.cassandra.db.marshal.MapType;
4248
import org.apache.cassandra.db.marshal.SetType;
@@ -58,25 +64,29 @@
5864
public class TxnReferenceOperation
5965
{
6066
private static final Map<Class<? extends Operation>, Kind> operationKindMap = initOperationKindMap();
61-
62-
private static Map<Class<? extends Operation>, Kind> initOperationKindMap()
67+
68+
@VisibleForTesting
69+
static Map<Class<? extends Operation>, Kind> initOperationKindMap()
6370
{
6471
Map<Class<? extends Operation>, Kind> temp = new HashMap<>();
65-
temp.put(Sets.Adder.class, Kind.SetAdder);
6672
temp.put(Constants.Adder.class, Kind.ConstantAdder);
73+
temp.put(Constants.Setter.class, Kind.ConstantSetter);
74+
temp.put(Constants.Substracter.class, Kind.ConstantSubtracter);
6775
temp.put(Lists.Appender.class, Kind.ListAppender);
68-
temp.put(Sets.Discarder.class, Kind.SetDiscarder);
6976
temp.put(Lists.Discarder.class, Kind.ListDiscarder);
77+
temp.put(Lists.DiscarderByIndex.class, Kind.ListDiscarderByIndex);
7078
temp.put(Lists.Prepender.class, Kind.ListPrepender);
71-
temp.put(Maps.Putter.class, Kind.MapPutter);
7279
temp.put(Lists.Setter.class, Kind.ListSetter);
73-
temp.put(Sets.Setter.class, Kind.SetSetter);
80+
temp.put(Lists.SetterByIndex.class, Kind.ListSetterByIndex);
81+
temp.put(Maps.DiscarderByKey.class, Kind.MapDiscarderByKey);
82+
temp.put(Maps.Putter.class, Kind.MapPutter);
7483
temp.put(Maps.Setter.class, Kind.MapSetter);
75-
temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
76-
temp.put(Constants.Setter.class, Kind.ConstantSetter);
77-
temp.put(Constants.Substracter.class, Kind.ConstantSubtracter);
7884
temp.put(Maps.SetterByKey.class, Kind.MapSetterByKey);
79-
temp.put(Lists.SetterByIndex.class, Kind.ListSetterByIndex);
85+
temp.put(Sets.Adder.class, Kind.SetAdder);
86+
temp.put(Sets.Discarder.class, Kind.SetDiscarder);
87+
temp.put(Sets.ElementDiscarder.class, Kind.SetElementDiscarder);
88+
temp.put(Sets.Setter.class, Kind.SetSetter);
89+
temp.put(UserTypes.Setter.class, Kind.UserTypeSetter);
8090
temp.put(UserTypes.SetterByField.class, Kind.UserTypeSetterByField);
8191
return temp;
8292
}
@@ -103,7 +113,10 @@ public enum Kind
103113
ConstantSubtracter((byte) 13, (column, keyOrIndex, field, value) -> new Constants.Substracter(column, value)),
104114
MapSetterByKey((byte) 14, (column, keyOrIndex, field, value) -> new Maps.SetterByKey(column, keyOrIndex, value)),
105115
ListSetterByIndex((byte) 15, (column, keyOrIndex, field, value) -> new Lists.SetterByIndex(column, keyOrIndex, value)),
106-
UserTypeSetterByField((byte) 16, (column, keyOrIndex, field, value) -> new UserTypes.SetterByField(column, field, value));
116+
UserTypeSetterByField((byte) 16, (column, keyOrIndex, field, value) -> new UserTypes.SetterByField(column, field, value)),
117+
ListDiscarderByIndex((byte) 17, (column, keyOrIndex, field, value) -> new Lists.DiscarderByIndex(column, value)),
118+
MapDiscarderByKey((byte) 18, (column, keyOrIndex, field, value) -> new Maps.DiscarderByKey(column, value)),
119+
SetElementDiscarder((byte) 19, (column, keyOrIndex, field, value) -> new Sets.ElementDiscarder(column, value));
107120

108121
private final byte id;
109122
private final ToOperation toOperation;
@@ -154,18 +167,20 @@ public Operation toOperation(ColumnMetadata column, Term keyOrIndex, FieldIdenti
154167

155168
private final Kind kind;
156169
private final ColumnMetadata receiver;
157-
private final TableMetadata table;
158-
private final ByteBuffer key;
159-
private final ByteBuffer field;
170+
public final TableMetadata table;
171+
private final @Nullable ByteBuffer keyOrIndex;
172+
private final @Nullable ByteBuffer field;
160173
private final TxnReferenceValue value;
174+
private final @Nullable AbstractType<?> keyOrIndexType;
161175
private final AbstractType<?> valueType;
162176

163-
public TxnReferenceOperation(Kind kind, ColumnMetadata receiver, TableMetadata table, ByteBuffer key, ByteBuffer field, TxnReferenceValue value)
177+
public TxnReferenceOperation(Kind kind, ColumnMetadata receiver, TableMetadata table,
178+
@Nullable ByteBuffer keyOrIndex, @Nullable ByteBuffer field, TxnReferenceValue value)
164179
{
165180
this.kind = kind;
166181
this.receiver = receiver;
167182
this.table = table;
168-
this.key = key;
183+
this.keyOrIndex = keyOrIndex;
169184
this.field = field;
170185

171186
// We don't expect operators on clustering keys, but unwrap just in case.
@@ -175,20 +190,36 @@ public TxnReferenceOperation(Kind kind, ColumnMetadata receiver, TableMetadata t
175190
{
176191
// The value for a map subtraction is actually a set (see Operation.Substraction)
177192
this.valueType = SetType.getInstance(((MapType<?, ?>) receiverType).getKeysType(), true);
193+
this.keyOrIndexType = null;
194+
}
195+
else if (kind == Kind.MapDiscarderByKey || kind == Kind.SetElementDiscarder)
196+
{
197+
CollectionType<?> ct = (CollectionType<?>) receiverType;
198+
this.keyOrIndexType = null;
199+
this.valueType = ct.nameComparator();
178200
}
179201
else if (kind == Kind.MapSetterByKey || kind == Kind.ListSetterByIndex)
180202
{
181-
this.valueType = ((CollectionType<?>) receiverType).valueComparator();
203+
CollectionType<?> ct = (CollectionType<?>) receiverType;
204+
this.keyOrIndexType = ct.nameComparator();
205+
this.valueType = ct.valueComparator();
206+
}
207+
else if (kind == Kind.ListDiscarderByIndex)
208+
{
209+
this.valueType = Int32Type.instance;
210+
this.keyOrIndexType = null;
182211
}
183212
else if (kind == Kind.UserTypeSetterByField)
184213
{
185214
UserType userType = (UserType) receiverType;
186215
CellPath fieldPath = userType.cellPathForField(new FieldIdentifier(field));
187216
this.valueType = userType.fieldType(fieldPath);
217+
this.keyOrIndexType = null;
188218
}
189219
else
190220
{
191221
this.valueType = receiverType;
222+
this.keyOrIndexType = null;
192223
}
193224

194225
this.value = value;
@@ -202,27 +233,35 @@ public boolean equals(Object o)
202233
TxnReferenceOperation that = (TxnReferenceOperation) o;
203234
return Objects.equals(receiver, that.receiver)
204235
&& kind == that.kind
205-
&& Objects.equals(key, that.key)
236+
&& Objects.equals(keyOrIndex, that.keyOrIndex)
206237
&& Objects.equals(field, that.field)
207238
&& Objects.equals(value, that.value);
208239
}
209240

210-
public void collect(TableMetadatas.Collector collector)
211-
{
212-
collector.add(table);
213-
value.collect(collector);
214-
}
215-
216241
@Override
217242
public int hashCode()
218243
{
219-
return Objects.hash(receiver, kind, key, field, value);
244+
return Objects.hash(receiver, kind, keyOrIndex, field, value);
220245
}
221246

247+
222248
@Override
223249
public String toString()
224250
{
225-
return receiver + " = " + value;
251+
return "TxnReferenceOperation{" +
252+
"kind=" + kind +
253+
", receiver=" + receiver +
254+
", table=" + table +
255+
", key=" + keyOrIndex +
256+
", field=" + field +
257+
", value=" + value +
258+
'}';
259+
}
260+
261+
public void collect(TableMetadatas.Collector collector)
262+
{
263+
collector.add(table);
264+
value.collect(collector);
226265
}
227266

228267
public ColumnMetadata receiver()
@@ -236,11 +275,12 @@ public void apply(TxnData data, DecoratedKey key, UpdateParameters up)
236275
operation.execute(key, up);
237276
}
238277

239-
private Operation toOperation(TxnData data)
278+
@VisibleForTesting
279+
Operation toOperation(TxnData data)
240280
{
241281
FieldIdentifier fieldIdentifier = field == null ? null : new FieldIdentifier(field);
242282
Term valueTerm = toTerm(data, valueType);
243-
Term keyorIndexTerm = key == null ? null : toTerm(key, valueType);
283+
Term keyorIndexTerm = keyOrIndex == null ? null : toTerm(keyOrIndex, keyOrIndexType);
244284
return kind.toOperation(receiver, keyorIndexTerm, fieldIdentifier, valueTerm);
245285
}
246286

@@ -274,9 +314,9 @@ public void serialize(TxnReferenceOperation operation, TableMetadatas tables, Da
274314
columnMetadataSerializer.serialize(operation.receiver, operation.table, out);
275315
TxnReferenceValue.serializer.serialize(operation.value, tables, out);
276316

277-
out.writeBoolean(operation.key != null);
278-
if (operation.key != null)
279-
ByteBufferUtil.writeWithVIntLength(operation.key, out);
317+
out.writeBoolean(operation.keyOrIndex != null);
318+
if (operation.keyOrIndex != null)
319+
ByteBufferUtil.writeWithVIntLength(operation.keyOrIndex, out);
280320

281321
out.writeBoolean(operation.field != null);
282322
if (operation.field != null)
@@ -303,9 +343,11 @@ public long serializedSize(TxnReferenceOperation operation, TableMetadatas table
303343
size += columnMetadataSerializer.serializedSize(operation.receiver, operation.table);
304344
size += TxnReferenceValue.serializer.serializedSize(operation.value, tables);
305345

306-
if (operation.key != null)
307-
size += ByteBufferUtil.serializedSizeWithVIntLength(operation.key);
346+
size += TypeSizes.sizeof(operation.keyOrIndex != null);
347+
if (operation.keyOrIndex != null)
348+
size += ByteBufferUtil.serializedSizeWithVIntLength(operation.keyOrIndex);
308349

350+
size += TypeSizes.sizeof(operation.field != null);
309351
if (operation.field != null)
310352
size += ByteBufferUtil.serializedSizeWithVIntLength(operation.field);
311353

src/java/org/apache/cassandra/service/accord/txn/TxnResult.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ enum Kind
2626
{
2727
txn_data(0),
2828
retry_new_protocol(1),
29-
range_read(2);
29+
range_read(2),
30+
validation_rejection(3);
3031

3132
int id;
3233

0 commit comments

Comments
 (0)