@@ -3,8 +3,12 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.scalar.db.api.Mutation;
import com.scalar.db.api.PutBuilder;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.io.DataType;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
@@ -34,4 +38,50 @@ public AbstractMutationComposer(String id, TransactionTableMetadataManager table
public List<Mutation> get() {
return ImmutableList.copyOf(mutations);
}

static void setBeforeImageColumnsToNull(
PutBuilder.Buildable putBuilder,
Set<String> beforeImageColumnNames,
TableMetadata tableMetadata) {
for (String beforeImageColumnName : beforeImageColumnNames) {
DataType columnDataType = tableMetadata.getColumnDataType(beforeImageColumnName);
switch (columnDataType) {
case BOOLEAN:
putBuilder.booleanValue(beforeImageColumnName, null);
break;
case INT:
putBuilder.intValue(beforeImageColumnName, null);
break;
case BIGINT:
putBuilder.bigIntValue(beforeImageColumnName, null);
break;
case FLOAT:
putBuilder.floatValue(beforeImageColumnName, null);
break;
case DOUBLE:
putBuilder.doubleValue(beforeImageColumnName, null);
break;
case TEXT:
putBuilder.textValue(beforeImageColumnName, null);
break;
case BLOB:
putBuilder.blobValue(beforeImageColumnName, (byte[]) null);
break;
case DATE:
putBuilder.dateValue(beforeImageColumnName, null);
break;
case TIME:
putBuilder.timeValue(beforeImageColumnName, null);
break;
case TIMESTAMP:
putBuilder.timestampValue(beforeImageColumnName, null);
break;
case TIMESTAMPTZ:
putBuilder.timestampTZValue(beforeImageColumnName, null);
break;
default:
throw new AssertionError("Unknown data type: " + columnDataType);
}
}
}
}
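
For reference, a minimal usage sketch of the new helper, not part of this change. The table, namespace, and column names are assumptions, the call assumes a caller in the same package (the helper is package-private), and standard ScalarDB and Guava imports (TableMetadata, DataType, Put, PutBuilder, Key, ImmutableSet) are assumed:

TableMetadata tableMetadata =
    TableMetadata.newBuilder()
        .addColumn("id", DataType.TEXT)
        .addColumn("balance", DataType.INT)
        .addColumn("before_balance", DataType.INT)
        .addColumn("before_version", DataType.BIGINT)
        .addPartitionKey("id")
        .build();

PutBuilder.Buildable putBuilder =
    Put.newBuilder()
        .namespace("ns")
        .table("accounts")
        .partitionKey(Key.ofText("id", "a"));

// Nulls each before-image column through the value setter that matches its declared type.
// The (byte[]) cast in the BLOB case above resolves the overload between the byte[] and
// ByteBuffer variants of blobValue; a bare null would be ambiguous.
AbstractMutationComposer.setBeforeImageColumnsToNull(
    putBuilder, ImmutableSet.of("before_balance", "before_version"), tableMetadata);

Put put = putBuilder.build();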
@@ -1,25 +1,29 @@
package com.scalar.db.transaction.consensuscommit;

import static com.scalar.db.api.ConditionalExpression.Operator;
import static com.scalar.db.transaction.consensuscommit.Attribute.COMMITTED_AT;
import static com.scalar.db.transaction.consensuscommit.Attribute.ID;
import static com.scalar.db.transaction.consensuscommit.Attribute.STATE;
import static com.scalar.db.transaction.consensuscommit.Attribute.toIdValue;
import static com.scalar.db.transaction.consensuscommit.Attribute.toStateValue;

import com.google.common.annotations.VisibleForTesting;
import com.scalar.db.api.ConditionBuilder;
import com.scalar.db.api.ConditionalExpression;
import com.scalar.db.api.Consistency;
import com.scalar.db.api.Delete;
import com.scalar.db.api.DeleteIf;
import com.scalar.db.api.Mutation;
import com.scalar.db.api.Operation;
import com.scalar.db.api.Put;
import com.scalar.db.api.PutIf;
import com.scalar.db.api.PutBuilder;
import com.scalar.db.api.Selection;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.api.TransactionState;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.Key;
import com.scalar.db.util.ScalarDbUtils;
import java.util.LinkedHashSet;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
@@ -82,17 +86,33 @@ private void add(Selection base, @Nullable TransactionResult result) throws Exec

private Put composePut(Operation base, @Nullable TransactionResult result)
throws ExecutionException {
return new Put(getPartitionKey(base, result), getClusteringKey(base, result).orElse(null))
.forNamespace(base.forNamespace().get())
.forTable(base.forTable().get())
.withConsistency(Consistency.LINEARIZABLE)
.withCondition(
new PutIf(
new ConditionalExpression(ID, toIdValue(id), Operator.EQ),
new ConditionalExpression(
STATE, toStateValue(TransactionState.PREPARED), Operator.EQ)))
.withValue(Attribute.toCommittedAtValue(current))
.withValue(Attribute.toStateValue(TransactionState.COMMITTED));
PutBuilder.Buildable putBuilder =
Put.newBuilder()
.namespace(base.forNamespace().get())
.table(base.forTable().get())
.partitionKey(getPartitionKey(base, result))
.condition(
ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(id))
.and(
ConditionBuilder.column(STATE)
.isEqualToInt(TransactionState.PREPARED.get()))
.build())
.bigIntValue(COMMITTED_AT, current)
.intValue(STATE, TransactionState.COMMITTED.get())
.consistency(Consistency.LINEARIZABLE);
getClusteringKey(base, result).ifPresent(putBuilder::clusteringKey);

// Set before image columns to null
if (result != null) {
TransactionTableMetadata transactionTableMetadata =
tableMetadataManager.getTransactionTableMetadata(base);
LinkedHashSet<String> beforeImageColumnNames =
transactionTableMetadata.getBeforeImageColumnNames();
TableMetadata tableMetadata = transactionTableMetadata.getTableMetadata();
setBeforeImageColumnsToNull(putBuilder, beforeImageColumnNames, tableMetadata);
}
Comment on lines +105 to +113 (Collaborator, Author):
Added logic to set the before image columns to null.

return putBuilder.build();
}

private Delete composeDelete(Operation base, @Nullable TransactionResult result)
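
The condition rewrite in composePut above expresses the same guard as before: the Put applies only when the record's ID column equals the transaction ID and its STATE column equals PREPARED. A side-by-side sketch, not in the PR; "tx1" is a placeholder transaction ID, and ID, STATE, Operator, toIdValue, and toStateValue are the static imports shown in the hunk above:

// Old style: PutIf over raw ConditionalExpressions.
PutIf oldCondition =
    new PutIf(
        new ConditionalExpression(ID, toIdValue("tx1"), Operator.EQ),
        new ConditionalExpression(
            STATE, toStateValue(TransactionState.PREPARED), Operator.EQ));

// New style: the same guard built with ConditionBuilder.
MutationCondition newCondition =
    ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText("tx1"))
        .and(ConditionBuilder.column(STATE).isEqualToInt(TransactionState.PREPARED.get()))
        .build();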
@@ -18,6 +18,7 @@
import com.scalar.db.api.Put;
import com.scalar.db.api.PutBuilder;
import com.scalar.db.api.Selection;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.api.TransactionState;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.Column;
@@ -77,8 +78,11 @@ private Put composePut(Operation base, TransactionResult result) throws Executio
&& (result.getState().equals(TransactionState.PREPARED)
|| result.getState().equals(TransactionState.DELETED));

TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(base);
LinkedHashSet<String> beforeImageColumnNames = metadata.getBeforeImageColumnNames();
TransactionTableMetadata transactionTableMetadata =
tableMetadataManager.getTransactionTableMetadata(base);
LinkedHashSet<String> beforeImageColumnNames =
transactionTableMetadata.getBeforeImageColumnNames();
TableMetadata tableMetadata = transactionTableMetadata.getTableMetadata();

List<Column<?>> columns = new ArrayList<>();
result
@@ -98,9 +102,8 @@ private Put composePut(Operation base, TransactionResult result) throws Executio
}
});

Key partitionKey = ScalarDbUtils.getPartitionKey(result, metadata.getTableMetadata());
Optional<Key> clusteringKey =
ScalarDbUtils.getClusteringKey(result, metadata.getTableMetadata());
Key partitionKey = ScalarDbUtils.getPartitionKey(result, tableMetadata);
Optional<Key> clusteringKey = ScalarDbUtils.getClusteringKey(result, tableMetadata);

PutBuilder.Buildable putBuilder =
Put.newBuilder()
@@ -115,6 +118,9 @@ private Put composePut(Operation base, TransactionResult result) throws Executio
clusteringKey.ifPresent(putBuilder::clusteringKey);
columns.forEach(putBuilder::value);

// Set before image columns to null
setBeforeImageColumnsToNull(putBuilder, beforeImageColumnNames, tableMetadata);
Comment on lines +121 to +122 (Collaborator, Author):
Ditto. Added logic to set the before image columns to null.

return putBuilder.build();
}

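
Presumably the point of nulling the before-image columns here (and in the commit path above) is that a rolled-back or committed record should not keep stale before-image values from the prepare phase. A quick check sketch, not in the PR; the column name "before_balance" is an assumption, as is the availability of Put#getColumns and Column#hasNullValue for inspecting the built mutation:

Put put = putBuilder.build();
// Expect an explicit null for each before-image column, so the write overwrites
// whatever before image was stored when the record was prepared.
assert put.getColumns().get("before_balance").hasNullValue();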