|
1 | 1 | package com.scalar.db.transaction.consensuscommit; |
2 | 2 |
|
3 | 3 | import static com.scalar.db.api.ConditionalExpression.Operator; |
| 4 | +import static com.scalar.db.transaction.consensuscommit.Attribute.COMMITTED_AT; |
4 | 5 | import static com.scalar.db.transaction.consensuscommit.Attribute.ID; |
5 | 6 | import static com.scalar.db.transaction.consensuscommit.Attribute.STATE; |
6 | 7 | import static com.scalar.db.transaction.consensuscommit.Attribute.toIdValue; |
7 | 8 | import static com.scalar.db.transaction.consensuscommit.Attribute.toStateValue; |
8 | 9 |
|
9 | 10 | import com.google.common.annotations.VisibleForTesting; |
| 11 | +import com.scalar.db.api.ConditionBuilder; |
10 | 12 | import com.scalar.db.api.ConditionalExpression; |
11 | 13 | import com.scalar.db.api.Consistency; |
12 | 14 | import com.scalar.db.api.Delete; |
13 | 15 | import com.scalar.db.api.DeleteIf; |
14 | 16 | import com.scalar.db.api.Mutation; |
15 | 17 | import com.scalar.db.api.Operation; |
16 | 18 | import com.scalar.db.api.Put; |
17 | | -import com.scalar.db.api.PutIf; |
| 19 | +import com.scalar.db.api.PutBuilder; |
18 | 20 | import com.scalar.db.api.Selection; |
| 21 | +import com.scalar.db.api.TableMetadata; |
19 | 22 | import com.scalar.db.api.TransactionState; |
20 | 23 | import com.scalar.db.exception.storage.ExecutionException; |
21 | 24 | import com.scalar.db.io.Key; |
22 | 25 | import com.scalar.db.util.ScalarDbUtils; |
| 26 | +import java.util.LinkedHashSet; |
23 | 27 | import java.util.Optional; |
24 | 28 | import javax.annotation.Nullable; |
25 | 29 | import javax.annotation.concurrent.NotThreadSafe; |
@@ -82,17 +86,33 @@ private void add(Selection base, @Nullable TransactionResult result) throws Exec |
82 | 86 |
|
83 | 87 | private Put composePut(Operation base, @Nullable TransactionResult result) |
84 | 88 | throws ExecutionException { |
85 | | - return new Put(getPartitionKey(base, result), getClusteringKey(base, result).orElse(null)) |
86 | | - .forNamespace(base.forNamespace().get()) |
87 | | - .forTable(base.forTable().get()) |
88 | | - .withConsistency(Consistency.LINEARIZABLE) |
89 | | - .withCondition( |
90 | | - new PutIf( |
91 | | - new ConditionalExpression(ID, toIdValue(id), Operator.EQ), |
92 | | - new ConditionalExpression( |
93 | | - STATE, toStateValue(TransactionState.PREPARED), Operator.EQ))) |
94 | | - .withValue(Attribute.toCommittedAtValue(current)) |
95 | | - .withValue(Attribute.toStateValue(TransactionState.COMMITTED)); |
| 89 | + PutBuilder.Buildable putBuilder = |
| 90 | + Put.newBuilder() |
| 91 | + .namespace(base.forNamespace().get()) |
| 92 | + .table(base.forTable().get()) |
| 93 | + .partitionKey(getPartitionKey(base, result)) |
| 94 | + .condition( |
| 95 | + ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(id)) |
| 96 | + .and( |
| 97 | + ConditionBuilder.column(STATE) |
| 98 | + .isEqualToInt(TransactionState.PREPARED.get())) |
| 99 | + .build()) |
| 100 | + .bigIntValue(COMMITTED_AT, current) |
| 101 | + .intValue(STATE, TransactionState.COMMITTED.get()) |
| 102 | + .consistency(Consistency.LINEARIZABLE); |
| 103 | + getClusteringKey(base, result).ifPresent(putBuilder::clusteringKey); |
| 104 | + |
| 105 | + // Set null to before image columns |
| 106 | + if (result != null) { |
| 107 | + TransactionTableMetadata transactionTableMetadata = |
| 108 | + tableMetadataManager.getTransactionTableMetadata(base); |
| 109 | + LinkedHashSet<String> beforeImageColumnNames = |
| 110 | + transactionTableMetadata.getBeforeImageColumnNames(); |
| 111 | + TableMetadata tableMetadata = transactionTableMetadata.getTableMetadata(); |
| 112 | + setNullToBeforeImageColumns(putBuilder, beforeImageColumnNames, tableMetadata); |
| 113 | + } |
| 114 | + |
| 115 | + return putBuilder.build(); |
96 | 116 | } |
97 | 117 |
|
98 | 118 | private Delete composeDelete(Operation base, @Nullable TransactionResult result) |
|
0 commit comments