From 3207eaaf85946628da69a0259c0e5714723a2844 Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Sun, 25 May 2025 00:51:37 +0900 Subject: [PATCH] Remove before images after committing or rolling back records --- .../AbstractMutationComposer.java | 50 ++ .../CommitMutationComposer.java | 44 +- .../RollbackMutationComposer.java | 16 +- .../CommitMutationComposerTest.java | 144 +++-- .../RollbackMutationComposerTest.java | 586 ++++++++++++++---- 5 files changed, 648 insertions(+), 192 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/AbstractMutationComposer.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/AbstractMutationComposer.java index e086ecd565..ade536d0f6 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/AbstractMutationComposer.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/AbstractMutationComposer.java @@ -3,8 +3,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.scalar.db.api.Mutation; +import com.scalar.db.api.PutBuilder; +import com.scalar.db.api.TableMetadata; +import com.scalar.db.io.DataType; import java.util.ArrayList; import java.util.List; +import java.util.Set; import javax.annotation.concurrent.NotThreadSafe; @NotThreadSafe @@ -34,4 +38,50 @@ public AbstractMutationComposer(String id, TransactionTableMetadataManager table public List get() { return ImmutableList.copyOf(mutations); } + + static void setBeforeImageColumnsToNull( + PutBuilder.Buildable putBuilder, + Set beforeImageColumnNames, + TableMetadata tableMetadata) { + for (String beforeImageColumnName : beforeImageColumnNames) { + DataType columnDataType = tableMetadata.getColumnDataType(beforeImageColumnName); + switch (columnDataType) { + case BOOLEAN: + putBuilder.booleanValue(beforeImageColumnName, null); + break; + case INT: + putBuilder.intValue(beforeImageColumnName, null); + break; + case BIGINT: + putBuilder.bigIntValue(beforeImageColumnName, null); + break; + case FLOAT: + putBuilder.floatValue(beforeImageColumnName, null); + break; + case DOUBLE: + putBuilder.doubleValue(beforeImageColumnName, null); + break; + case TEXT: + putBuilder.textValue(beforeImageColumnName, null); + break; + case BLOB: + putBuilder.blobValue(beforeImageColumnName, (byte[]) null); + break; + case DATE: + putBuilder.dateValue(beforeImageColumnName, null); + break; + case TIME: + putBuilder.timeValue(beforeImageColumnName, null); + break; + case TIMESTAMP: + putBuilder.timestampValue(beforeImageColumnName, null); + break; + case TIMESTAMPTZ: + putBuilder.timestampTZValue(beforeImageColumnName, null); + break; + default: + throw new AssertionError("Unknown data type: " + columnDataType); + } + } + } } diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposer.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposer.java index a9dc8f4fb0..757503f258 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposer.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposer.java @@ -1,12 +1,14 @@ package com.scalar.db.transaction.consensuscommit; import static com.scalar.db.api.ConditionalExpression.Operator; +import static com.scalar.db.transaction.consensuscommit.Attribute.COMMITTED_AT; import static com.scalar.db.transaction.consensuscommit.Attribute.ID; import static com.scalar.db.transaction.consensuscommit.Attribute.STATE; import static com.scalar.db.transaction.consensuscommit.Attribute.toIdValue; import static com.scalar.db.transaction.consensuscommit.Attribute.toStateValue; import com.google.common.annotations.VisibleForTesting; +import com.scalar.db.api.ConditionBuilder; import com.scalar.db.api.ConditionalExpression; import com.scalar.db.api.Consistency; import com.scalar.db.api.Delete; @@ -14,12 +16,14 @@ import com.scalar.db.api.Mutation; import com.scalar.db.api.Operation; import com.scalar.db.api.Put; -import com.scalar.db.api.PutIf; +import com.scalar.db.api.PutBuilder; import com.scalar.db.api.Selection; +import com.scalar.db.api.TableMetadata; import com.scalar.db.api.TransactionState; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.io.Key; import com.scalar.db.util.ScalarDbUtils; +import java.util.LinkedHashSet; import java.util.Optional; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -82,17 +86,33 @@ private void add(Selection base, @Nullable TransactionResult result) throws Exec private Put composePut(Operation base, @Nullable TransactionResult result) throws ExecutionException { - return new Put(getPartitionKey(base, result), getClusteringKey(base, result).orElse(null)) - .forNamespace(base.forNamespace().get()) - .forTable(base.forTable().get()) - .withConsistency(Consistency.LINEARIZABLE) - .withCondition( - new PutIf( - new ConditionalExpression(ID, toIdValue(id), Operator.EQ), - new ConditionalExpression( - STATE, toStateValue(TransactionState.PREPARED), Operator.EQ))) - .withValue(Attribute.toCommittedAtValue(current)) - .withValue(Attribute.toStateValue(TransactionState.COMMITTED)); + PutBuilder.Buildable putBuilder = + Put.newBuilder() + .namespace(base.forNamespace().get()) + .table(base.forTable().get()) + .partitionKey(getPartitionKey(base, result)) + .condition( + ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(id)) + .and( + ConditionBuilder.column(STATE) + .isEqualToInt(TransactionState.PREPARED.get())) + .build()) + .bigIntValue(COMMITTED_AT, current) + .intValue(STATE, TransactionState.COMMITTED.get()) + .consistency(Consistency.LINEARIZABLE); + getClusteringKey(base, result).ifPresent(putBuilder::clusteringKey); + + // Set before image columns to null + if (result != null) { + TransactionTableMetadata transactionTableMetadata = + tableMetadataManager.getTransactionTableMetadata(base); + LinkedHashSet beforeImageColumnNames = + transactionTableMetadata.getBeforeImageColumnNames(); + TableMetadata tableMetadata = transactionTableMetadata.getTableMetadata(); + setBeforeImageColumnsToNull(putBuilder, beforeImageColumnNames, tableMetadata); + } + + return putBuilder.build(); } private Delete composeDelete(Operation base, @Nullable TransactionResult result) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/RollbackMutationComposer.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/RollbackMutationComposer.java index bc42f9f153..13d6d94652 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/RollbackMutationComposer.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/RollbackMutationComposer.java @@ -18,6 +18,7 @@ import com.scalar.db.api.Put; import com.scalar.db.api.PutBuilder; import com.scalar.db.api.Selection; +import com.scalar.db.api.TableMetadata; import com.scalar.db.api.TransactionState; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.io.Column; @@ -77,8 +78,11 @@ private Put composePut(Operation base, TransactionResult result) throws Executio && (result.getState().equals(TransactionState.PREPARED) || result.getState().equals(TransactionState.DELETED)); - TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(base); - LinkedHashSet beforeImageColumnNames = metadata.getBeforeImageColumnNames(); + TransactionTableMetadata transactionTableMetadata = + tableMetadataManager.getTransactionTableMetadata(base); + LinkedHashSet beforeImageColumnNames = + transactionTableMetadata.getBeforeImageColumnNames(); + TableMetadata tableMetadata = transactionTableMetadata.getTableMetadata(); List> columns = new ArrayList<>(); result @@ -98,9 +102,8 @@ private Put composePut(Operation base, TransactionResult result) throws Executio } }); - Key partitionKey = ScalarDbUtils.getPartitionKey(result, metadata.getTableMetadata()); - Optional clusteringKey = - ScalarDbUtils.getClusteringKey(result, metadata.getTableMetadata()); + Key partitionKey = ScalarDbUtils.getPartitionKey(result, tableMetadata); + Optional clusteringKey = ScalarDbUtils.getClusteringKey(result, tableMetadata); PutBuilder.Buildable putBuilder = Put.newBuilder() @@ -115,6 +118,9 @@ private Put composePut(Operation base, TransactionResult result) throws Executio clusteringKey.ifPresent(putBuilder::clusteringKey); columns.forEach(putBuilder::value); + // Set before image columns to null + setBeforeImageColumnsToNull(putBuilder, beforeImageColumnNames, tableMetadata); + return putBuilder.build(); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposerTest.java index 45e3ad33b9..7cf665d1d4 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposerTest.java @@ -1,8 +1,17 @@ package com.scalar.db.transaction.consensuscommit; import static com.scalar.db.api.ConditionalExpression.Operator; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_COMMITTED_AT; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_ID; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_PREFIX; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_PREPARED_AT; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_STATE; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_VERSION; +import static com.scalar.db.transaction.consensuscommit.Attribute.COMMITTED_AT; import static com.scalar.db.transaction.consensuscommit.Attribute.ID; +import static com.scalar.db.transaction.consensuscommit.Attribute.PREPARED_AT; import static com.scalar.db.transaction.consensuscommit.Attribute.STATE; +import static com.scalar.db.transaction.consensuscommit.Attribute.VERSION; import static com.scalar.db.transaction.consensuscommit.Attribute.toIdValue; import static com.scalar.db.transaction.consensuscommit.Attribute.toStateValue; import static org.assertj.core.api.Assertions.assertThat; @@ -10,24 +19,24 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import com.scalar.db.api.ConditionBuilder; import com.scalar.db.api.ConditionalExpression; import com.scalar.db.api.Consistency; import com.scalar.db.api.Delete; import com.scalar.db.api.DeleteIf; import com.scalar.db.api.Get; -import com.scalar.db.api.Operation; import com.scalar.db.api.Put; import com.scalar.db.api.PutIf; import com.scalar.db.api.TableMetadata; import com.scalar.db.api.TransactionState; import com.scalar.db.common.ResultImpl; import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.io.BigIntColumn; import com.scalar.db.io.Column; import com.scalar.db.io.DataType; import com.scalar.db.io.IntColumn; import com.scalar.db.io.Key; import com.scalar.db.io.TextColumn; -import com.scalar.db.util.ScalarDbUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -42,19 +51,40 @@ public class CommitMutationComposerTest { private static final String ANY_NAME_1 = "name1"; private static final String ANY_NAME_2 = "name2"; private static final String ANY_NAME_3 = "name3"; + private static final String ANY_NAME_4 = "name4"; + private static final String ANY_NAME_5 = "name5"; + private static final String ANY_NAME_6 = "name6"; + private static final String ANY_NAME_7 = "name7"; + private static final String ANY_NAME_8 = "name8"; + private static final String ANY_NAME_9 = "name9"; + private static final String ANY_NAME_10 = "name10"; + private static final String ANY_NAME_11 = "name11"; + private static final String ANY_NAME_12 = "name12"; + private static final String ANY_NAME_13 = "name13"; private static final String ANY_TEXT_1 = "text1"; private static final String ANY_TEXT_2 = "text2"; private static final int ANY_INT_1 = 100; private static final int ANY_INT_2 = 200; private static final TableMetadata TABLE_METADATA = - TableMetadata.newBuilder() - .addColumn(ANY_NAME_1, DataType.TEXT) - .addColumn(ANY_NAME_2, DataType.TEXT) - .addColumn(ANY_NAME_3, DataType.INT) - .addPartitionKey(ANY_NAME_1) - .addClusteringKey(ANY_NAME_2) - .build(); + ConsensusCommitUtils.buildTransactionTableMetadata( + TableMetadata.newBuilder() + .addColumn(ANY_NAME_1, DataType.TEXT) + .addColumn(ANY_NAME_2, DataType.TEXT) + .addColumn(ANY_NAME_3, DataType.INT) + .addColumn(ANY_NAME_4, DataType.BOOLEAN) + .addColumn(ANY_NAME_5, DataType.BIGINT) + .addColumn(ANY_NAME_6, DataType.FLOAT) + .addColumn(ANY_NAME_7, DataType.DOUBLE) + .addColumn(ANY_NAME_8, DataType.TEXT) + .addColumn(ANY_NAME_9, DataType.BLOB) + .addColumn(ANY_NAME_10, DataType.DATE) + .addColumn(ANY_NAME_11, DataType.TIME) + .addColumn(ANY_NAME_12, DataType.TIMESTAMP) + .addColumn(ANY_NAME_13, DataType.TIMESTAMPTZ) + .addPartitionKey(ANY_NAME_1) + .addClusteringKey(ANY_NAME_2) + .build()); @Mock private TransactionTableMetadataManager tableMetadataManager; @@ -67,7 +97,7 @@ public void setUp() throws Exception { // Arrange composer = new CommitMutationComposer(ANY_ID, ANY_TIME_2, tableMetadataManager); - when(tableMetadataManager.getTransactionTableMetadata(any(Operation.class))) + when(tableMetadataManager.getTransactionTableMetadata(any())) .thenReturn(new TransactionTableMetadata(TABLE_METADATA)); } @@ -102,12 +132,10 @@ private TransactionResult prepareResult(TransactionState state) { .put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, ANY_TEXT_1)) .put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, ANY_TEXT_2)) .put(ANY_NAME_3, IntColumn.of(ANY_NAME_3, ANY_INT_2)) - .put(Attribute.ID, ScalarDbUtils.toColumn(Attribute.toIdValue(ANY_ID))) - .put( - Attribute.PREPARED_AT, - ScalarDbUtils.toColumn(Attribute.toPreparedAtValue(ANY_TIME_1))) - .put(Attribute.STATE, ScalarDbUtils.toColumn(Attribute.toStateValue(state))) - .put(Attribute.VERSION, ScalarDbUtils.toColumn(Attribute.toVersionValue(2))) + .put(Attribute.ID, TextColumn.of(ID, ANY_ID)) + .put(Attribute.PREPARED_AT, BigIntColumn.of(PREPARED_AT, ANY_TIME_1)) + .put(Attribute.STATE, IntColumn.of(STATE, state.get())) + .put(Attribute.VERSION, IntColumn.of(VERSION, 2)) .build(); return new TransactionResult(new ResultImpl(columns, TABLE_METADATA)); } @@ -125,17 +153,37 @@ public void add_PutAndPreparedResultGiven_ShouldComposePutWithPutIfCondition() // Assert Put actual = (Put) composer.get().get(0); Put expected = - new Put(put.getPartitionKey(), put.getClusteringKey().orElse(null)) - .forNamespace(put.forNamespace().get()) - .forTable(put.forTable().get()); - expected.withConsistency(Consistency.LINEARIZABLE); - expected.withCondition( - new PutIf( - new ConditionalExpression(ID, toIdValue(ANY_ID), Operator.EQ), - new ConditionalExpression( - STATE, toStateValue(TransactionState.PREPARED), Operator.EQ))); - expected.withValue(Attribute.toCommittedAtValue(ANY_TIME_2)); - expected.withValue(Attribute.toStateValue(TransactionState.COMMITTED)); + Put.newBuilder() + .namespace(put.forNamespace().get()) + .table(put.forTable().get()) + .partitionKey(put.getPartitionKey()) + .clusteringKey(put.getClusteringKey().orElse(null)) + .consistency(Consistency.LINEARIZABLE) + .condition( + ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(ANY_ID)) + .and( + ConditionBuilder.column(STATE) + .isEqualToInt(toStateValue(TransactionState.PREPARED).getAsInt())) + .build()) + .bigIntValue(COMMITTED_AT, ANY_TIME_2) + .intValue(STATE, TransactionState.COMMITTED.get()) + .textValue(BEFORE_ID, null) + .intValue(BEFORE_STATE, null) + .intValue(BEFORE_VERSION, null) + .bigIntValue(BEFORE_PREPARED_AT, null) + .bigIntValue(BEFORE_COMMITTED_AT, null) + .intValue(BEFORE_PREFIX + ANY_NAME_3, null) + .booleanValue(BEFORE_PREFIX + ANY_NAME_4, null) + .bigIntValue(BEFORE_PREFIX + ANY_NAME_5, null) + .floatValue(BEFORE_PREFIX + ANY_NAME_6, null) + .doubleValue(BEFORE_PREFIX + ANY_NAME_7, null) + .textValue(BEFORE_PREFIX + ANY_NAME_8, null) + .blobValue(BEFORE_PREFIX + ANY_NAME_9, (byte[]) null) + .dateValue(BEFORE_PREFIX + ANY_NAME_10, null) + .timeValue(BEFORE_PREFIX + ANY_NAME_11, null) + .timestampValue(BEFORE_PREFIX + ANY_NAME_12, null) + .timestampTZValue(BEFORE_PREFIX + ANY_NAME_13, null) + .build(); assertThat(actual).isEqualTo(expected); } @@ -217,17 +265,37 @@ public void add_SelectionAndPreparedResultGiven_ShouldComposePutForRollforward() // Assert Put actual = (Put) composer.get().get(0); Put expected = - new Put(get.getPartitionKey(), get.getClusteringKey().orElse(null)) - .forNamespace(get.forNamespace().get()) - .forTable(get.forTable().get()); - expected.withConsistency(Consistency.LINEARIZABLE); - expected.withCondition( - new PutIf( - new ConditionalExpression(ID, toIdValue(ANY_ID), Operator.EQ), - new ConditionalExpression( - STATE, toStateValue(TransactionState.PREPARED), Operator.EQ))); - expected.withValue(Attribute.toCommittedAtValue(ANY_TIME_2)); - expected.withValue(Attribute.toStateValue(TransactionState.COMMITTED)); + Put.newBuilder() + .namespace(get.forNamespace().get()) + .table(get.forTable().get()) + .partitionKey(get.getPartitionKey()) + .clusteringKey(get.getClusteringKey().orElse(null)) + .consistency(Consistency.LINEARIZABLE) + .condition( + ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(ANY_ID)) + .and( + ConditionBuilder.column(STATE) + .isEqualToInt(TransactionState.PREPARED.get())) + .build()) + .bigIntValue(COMMITTED_AT, ANY_TIME_2) + .intValue(STATE, TransactionState.COMMITTED.get()) + .textValue(BEFORE_ID, null) + .intValue(BEFORE_STATE, null) + .intValue(BEFORE_VERSION, null) + .bigIntValue(BEFORE_PREPARED_AT, null) + .bigIntValue(BEFORE_COMMITTED_AT, null) + .intValue(BEFORE_PREFIX + ANY_NAME_3, null) + .booleanValue(BEFORE_PREFIX + ANY_NAME_4, null) + .bigIntValue(BEFORE_PREFIX + ANY_NAME_5, null) + .floatValue(BEFORE_PREFIX + ANY_NAME_6, null) + .doubleValue(BEFORE_PREFIX + ANY_NAME_7, null) + .textValue(BEFORE_PREFIX + ANY_NAME_8, null) + .blobValue(BEFORE_PREFIX + ANY_NAME_9, (byte[]) null) + .dateValue(BEFORE_PREFIX + ANY_NAME_10, null) + .timeValue(BEFORE_PREFIX + ANY_NAME_11, null) + .timestampValue(BEFORE_PREFIX + ANY_NAME_12, null) + .timestampTZValue(BEFORE_PREFIX + ANY_NAME_13, null) + .build(); assertThat(actual).isEqualTo(expected); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/RollbackMutationComposerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/RollbackMutationComposerTest.java index 3cba9a8999..a848cbe324 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/RollbackMutationComposerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/RollbackMutationComposerTest.java @@ -1,8 +1,17 @@ package com.scalar.db.transaction.consensuscommit; import static com.scalar.db.api.ConditionalExpression.Operator; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_COMMITTED_AT; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_ID; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_PREFIX; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_PREPARED_AT; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_STATE; +import static com.scalar.db.transaction.consensuscommit.Attribute.BEFORE_VERSION; +import static com.scalar.db.transaction.consensuscommit.Attribute.COMMITTED_AT; import static com.scalar.db.transaction.consensuscommit.Attribute.ID; +import static com.scalar.db.transaction.consensuscommit.Attribute.PREPARED_AT; import static com.scalar.db.transaction.consensuscommit.Attribute.STATE; +import static com.scalar.db.transaction.consensuscommit.Attribute.VERSION; import static com.scalar.db.transaction.consensuscommit.Attribute.toIdValue; import static com.scalar.db.transaction.consensuscommit.Attribute.toStateValue; import static org.assertj.core.api.Assertions.assertThat; @@ -20,20 +29,30 @@ import com.scalar.db.api.Get; import com.scalar.db.api.Put; import com.scalar.db.api.PutBuilder; -import com.scalar.db.api.PutIf; import com.scalar.db.api.Scan; import com.scalar.db.api.TableMetadata; import com.scalar.db.api.TransactionState; import com.scalar.db.common.ResultImpl; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.io.BigIntColumn; +import com.scalar.db.io.BlobColumn; +import com.scalar.db.io.BooleanColumn; import com.scalar.db.io.Column; import com.scalar.db.io.DataType; +import com.scalar.db.io.DateColumn; +import com.scalar.db.io.DoubleColumn; +import com.scalar.db.io.FloatColumn; import com.scalar.db.io.IntColumn; import com.scalar.db.io.Key; import com.scalar.db.io.TextColumn; -import com.scalar.db.io.Value; -import com.scalar.db.util.ScalarDbUtils; +import com.scalar.db.io.TimeColumn; +import com.scalar.db.io.TimestampColumn; +import com.scalar.db.io.TimestampTZColumn; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -47,17 +66,56 @@ public class RollbackMutationComposerTest { private static final String ANY_TABLE_NAME = "table"; private static final String ANY_ID_1 = "id1"; private static final String ANY_ID_2 = "id2"; - private static final long ANY_TIME_1 = 100; - private static final long ANY_TIME_2 = 200; - private static final long ANY_TIME_3 = 300; + private static final long ANY_TIME_MILLIS_1 = 100; + private static final long ANY_TIME_MILLIS_2 = 200; + private static final long ANY_TIME_MILLIS_3 = 300; private static final String ANY_NAME_1 = "name1"; private static final String ANY_NAME_2 = "name2"; private static final String ANY_NAME_3 = "name3"; + private static final String ANY_NAME_4 = "name4"; + private static final String ANY_NAME_5 = "name5"; + private static final String ANY_NAME_6 = "name6"; + private static final String ANY_NAME_7 = "name7"; + private static final String ANY_NAME_8 = "name8"; + private static final String ANY_NAME_9 = "name9"; + private static final String ANY_NAME_10 = "name10"; + private static final String ANY_NAME_11 = "name11"; + private static final String ANY_NAME_12 = "name12"; + private static final String ANY_NAME_13 = "name13"; private static final String ANY_TEXT_1 = "text1"; private static final String ANY_TEXT_2 = "text2"; + private static final String ANY_TEXT_3 = "text3"; + private static final String ANY_TEXT_4 = "text4"; private static final int ANY_INT_1 = 100; private static final int ANY_INT_2 = 200; private static final int ANY_INT_3 = 300; + private static final long ANY_BIGINT_1 = 1000L; + private static final long ANY_BIGINT_2 = 2000L; + private static final long ANY_BIGINT_3 = 3000L; + private static final float ANY_FLOAT_1 = 1.23f; + private static final float ANY_FLOAT_2 = 4.56f; + private static final float ANY_FLOAT_3 = 7.89f; + private static final double ANY_DOUBLE_1 = 7.89; + private static final double ANY_DOUBLE_2 = 0.12; + private static final double ANY_DOUBLE_3 = 3.45; + private static final byte[] ANY_BLOB_1 = new byte[] {1, 2, 3}; + private static final byte[] ANY_BLOB_2 = new byte[] {4, 5, 6}; + private static final byte[] ANY_BLOB_3 = new byte[] {7, 8, 9}; + private static final LocalDate ANY_DATE_1 = LocalDate.of(2020, 1, 1); + private static final LocalDate ANY_DATE_2 = LocalDate.of(2021, 1, 1); + private static final LocalDate ANY_DATE_3 = LocalDate.of(2022, 1, 1); + private static final LocalTime ANY_TIME_1 = LocalTime.of(12, 0, 0); + private static final LocalTime ANY_TIME_2 = LocalTime.of(13, 0, 0); + private static final LocalTime ANY_TIME_3 = LocalTime.of(14, 0, 0); + private static final LocalDateTime ANY_TIMESTAMP_1 = LocalDateTime.of(2020, 1, 1, 12, 0, 0); + private static final LocalDateTime ANY_TIMESTAMP_2 = LocalDateTime.of(2021, 1, 1, 13, 0, 0); + private static final LocalDateTime ANY_TIMESTAMP_3 = LocalDateTime.of(2022, 1, 1, 14, 0, 0); + private static final Instant ANY_TIMESTAMPTZ_1 = + LocalDateTime.of(2020, 1, 1, 12, 0, 0).toInstant(ZoneOffset.UTC); + private static final Instant ANY_TIMESTAMPTZ_2 = + LocalDateTime.of(2021, 1, 1, 13, 0, 0).toInstant(ZoneOffset.UTC); + private static final Instant ANY_TIMESTAMPTZ_3 = + LocalDateTime.of(2022, 1, 1, 14, 0, 0).toInstant(ZoneOffset.UTC); private static final TableMetadata TABLE_METADATA = ConsensusCommitUtils.buildTransactionTableMetadata( @@ -65,6 +123,16 @@ public class RollbackMutationComposerTest { .addColumn(ANY_NAME_1, DataType.TEXT) .addColumn(ANY_NAME_2, DataType.TEXT) .addColumn(ANY_NAME_3, DataType.INT) + .addColumn(ANY_NAME_4, DataType.BOOLEAN) + .addColumn(ANY_NAME_5, DataType.BIGINT) + .addColumn(ANY_NAME_6, DataType.FLOAT) + .addColumn(ANY_NAME_7, DataType.DOUBLE) + .addColumn(ANY_NAME_8, DataType.TEXT) + .addColumn(ANY_NAME_9, DataType.BLOB) + .addColumn(ANY_NAME_10, DataType.DATE) + .addColumn(ANY_NAME_11, DataType.TIME) + .addColumn(ANY_NAME_12, DataType.TIMESTAMP) + .addColumn(ANY_NAME_13, DataType.TIMESTAMPTZ) .addPartitionKey(ANY_NAME_1) .addClusteringKey(ANY_NAME_2) .build()); @@ -101,13 +169,23 @@ private Scan prepareScan() { } private Put preparePut() { - Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); - Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2); - return new Put(partitionKey, clusteringKey) - .withConsistency(Consistency.LINEARIZABLE) - .forNamespace(ANY_NAMESPACE_NAME) - .forTable(ANY_TABLE_NAME) - .withValue(ANY_NAME_3, ANY_INT_3); + return Put.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1)) + .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) + .intValue(ANY_NAME_3, ANY_INT_3) + .booleanValue(ANY_NAME_4, false) + .bigIntValue(ANY_NAME_5, ANY_BIGINT_3) + .floatValue(ANY_NAME_6, ANY_FLOAT_3) + .doubleValue(ANY_NAME_7, ANY_DOUBLE_3) + .textValue(ANY_NAME_8, ANY_TEXT_4) + .blobValue(ANY_NAME_9, ANY_BLOB_3) + .dateValue(ANY_NAME_10, ANY_DATE_3) + .timeValue(ANY_NAME_11, ANY_TIME_3) + .timestampValue(ANY_NAME_12, ANY_TIMESTAMP_3) + .timestampTZValue(ANY_NAME_13, ANY_TIMESTAMPTZ_3) + .build(); } private TransactionResult prepareResult(TransactionState state) { @@ -116,27 +194,47 @@ private TransactionResult prepareResult(TransactionState state) { .put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, ANY_TEXT_1)) .put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, ANY_TEXT_2)) .put(ANY_NAME_3, IntColumn.of(ANY_NAME_3, ANY_INT_2)) - .put(Attribute.ID, ScalarDbUtils.toColumn(Attribute.toIdValue(ANY_ID_2))) + .put(ANY_NAME_4, BooleanColumn.of(ANY_NAME_4, true)) + .put(ANY_NAME_5, BigIntColumn.of(ANY_NAME_5, ANY_BIGINT_2)) + .put(ANY_NAME_6, FloatColumn.of(ANY_NAME_6, ANY_FLOAT_2)) + .put(ANY_NAME_7, DoubleColumn.of(ANY_NAME_7, ANY_DOUBLE_2)) + .put(ANY_NAME_8, TextColumn.of(ANY_NAME_8, ANY_TEXT_4)) + .put(ANY_NAME_9, BlobColumn.of(ANY_NAME_9, ANY_BLOB_2)) + .put(ANY_NAME_10, DateColumn.of(ANY_NAME_10, ANY_DATE_2)) + .put(ANY_NAME_11, TimeColumn.of(ANY_NAME_11, ANY_TIME_2)) + .put(ANY_NAME_12, TimestampColumn.of(ANY_NAME_12, ANY_TIMESTAMP_2)) + .put(ANY_NAME_13, TimestampTZColumn.of(ANY_NAME_13, ANY_TIMESTAMPTZ_2)) + .put(ID, TextColumn.of(ID, ANY_ID_2)) + .put(PREPARED_AT, BigIntColumn.of(PREPARED_AT, ANY_TIME_MILLIS_3)) + .put(STATE, IntColumn.of(STATE, state.get())) + .put(VERSION, IntColumn.of(VERSION, 1)) + .put(BEFORE_PREFIX + ANY_NAME_3, IntColumn.of(BEFORE_PREFIX + ANY_NAME_3, ANY_INT_1)) + .put(BEFORE_PREFIX + ANY_NAME_4, BooleanColumn.of(BEFORE_PREFIX + ANY_NAME_4, false)) .put( - Attribute.PREPARED_AT, - ScalarDbUtils.toColumn(Attribute.toPreparedAtValue(ANY_TIME_3))) - .put(Attribute.STATE, ScalarDbUtils.toColumn(Attribute.toStateValue(state))) - .put(Attribute.VERSION, ScalarDbUtils.toColumn(Attribute.toVersionValue(2))) + BEFORE_PREFIX + ANY_NAME_5, + BigIntColumn.of(BEFORE_PREFIX + ANY_NAME_5, ANY_BIGINT_1)) .put( - Attribute.BEFORE_PREFIX + ANY_NAME_3, - IntColumn.of(Attribute.BEFORE_PREFIX + ANY_NAME_3, ANY_INT_1)) - .put(Attribute.BEFORE_ID, ScalarDbUtils.toColumn(Attribute.toBeforeIdValue(ANY_ID_1))) + BEFORE_PREFIX + ANY_NAME_6, FloatColumn.of(BEFORE_PREFIX + ANY_NAME_6, ANY_FLOAT_1)) .put( - Attribute.BEFORE_PREPARED_AT, - ScalarDbUtils.toColumn(Attribute.toBeforePreparedAtValue(ANY_TIME_1))) + BEFORE_PREFIX + ANY_NAME_7, + DoubleColumn.of(BEFORE_PREFIX + ANY_NAME_7, ANY_DOUBLE_1)) + .put(BEFORE_PREFIX + ANY_NAME_8, TextColumn.of(BEFORE_PREFIX + ANY_NAME_8, ANY_TEXT_3)) + .put(BEFORE_PREFIX + ANY_NAME_9, BlobColumn.of(BEFORE_PREFIX + ANY_NAME_9, ANY_BLOB_1)) .put( - Attribute.BEFORE_COMMITTED_AT, - ScalarDbUtils.toColumn(Attribute.toBeforeCommittedAtValue(ANY_TIME_2))) + BEFORE_PREFIX + ANY_NAME_10, DateColumn.of(BEFORE_PREFIX + ANY_NAME_10, ANY_DATE_1)) .put( - Attribute.BEFORE_STATE, - ScalarDbUtils.toColumn(Attribute.toBeforeStateValue(TransactionState.COMMITTED))) + BEFORE_PREFIX + ANY_NAME_11, TimeColumn.of(BEFORE_PREFIX + ANY_NAME_11, ANY_TIME_1)) .put( - Attribute.BEFORE_VERSION, ScalarDbUtils.toColumn(Attribute.toBeforeVersionValue(1))) + BEFORE_PREFIX + ANY_NAME_12, + TimestampColumn.of(BEFORE_PREFIX + ANY_NAME_12, ANY_TIMESTAMP_1)) + .put( + BEFORE_PREFIX + ANY_NAME_13, + TimestampTZColumn.of(BEFORE_PREFIX + ANY_NAME_13, ANY_TIMESTAMPTZ_1)) + .put(BEFORE_ID, TextColumn.of(BEFORE_ID, ANY_ID_1)) + .put(BEFORE_PREPARED_AT, BigIntColumn.of(BEFORE_PREPARED_AT, ANY_TIME_MILLIS_1)) + .put(BEFORE_COMMITTED_AT, BigIntColumn.of(BEFORE_COMMITTED_AT, ANY_TIME_MILLIS_2)) + .put(BEFORE_STATE, IntColumn.of(BEFORE_STATE, TransactionState.COMMITTED.get())) + .put(BEFORE_VERSION, IntColumn.of(BEFORE_VERSION, 1)) .build(); return new TransactionResult(new ResultImpl(columns, TABLE_METADATA)); } @@ -147,17 +245,38 @@ private TransactionResult prepareInitialResult(String id, TransactionState state .put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, ANY_TEXT_1)) .put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, ANY_TEXT_2)) .put(ANY_NAME_3, IntColumn.of(ANY_NAME_3, ANY_INT_1)) - .put(Attribute.ID, ScalarDbUtils.toColumn(Attribute.toIdValue(id))) - .put( - Attribute.PREPARED_AT, - ScalarDbUtils.toColumn(Attribute.toPreparedAtValue(ANY_TIME_1))) - .put(Attribute.STATE, ScalarDbUtils.toColumn(Attribute.toStateValue(state))) - .put(Attribute.VERSION, ScalarDbUtils.toColumn(Attribute.toVersionValue(1))) - .put(Attribute.BEFORE_ID, ScalarDbUtils.toColumn(Attribute.toBeforeIdValue(null))) - .put(Attribute.BEFORE_VERSION, IntColumn.ofNull(Attribute.BEFORE_VERSION)); + .put(ANY_NAME_4, BooleanColumn.of(ANY_NAME_4, false)) + .put(ANY_NAME_5, BigIntColumn.of(ANY_NAME_5, ANY_BIGINT_1)) + .put(ANY_NAME_6, FloatColumn.of(ANY_NAME_6, ANY_FLOAT_1)) + .put(ANY_NAME_7, DoubleColumn.of(ANY_NAME_7, ANY_DOUBLE_1)) + .put(ANY_NAME_8, TextColumn.of(ANY_NAME_8, ANY_TEXT_3)) + .put(ANY_NAME_9, BlobColumn.of(ANY_NAME_9, ANY_BLOB_1)) + .put(ANY_NAME_10, DateColumn.of(ANY_NAME_10, ANY_DATE_1)) + .put(ANY_NAME_11, TimeColumn.of(ANY_NAME_11, ANY_TIME_1)) + .put(ANY_NAME_12, TimestampColumn.of(ANY_NAME_12, ANY_TIMESTAMP_1)) + .put(ANY_NAME_13, TimestampTZColumn.of(ANY_NAME_13, ANY_TIMESTAMPTZ_1)) + .put(ID, TextColumn.of(ID, id)) + .put(PREPARED_AT, BigIntColumn.of(PREPARED_AT, ANY_TIME_MILLIS_1)) + .put(STATE, IntColumn.of(STATE, state.get())) + .put(VERSION, IntColumn.of(VERSION, 1)) + .put(BEFORE_PREFIX + ANY_NAME_3, IntColumn.ofNull(BEFORE_PREFIX + ANY_NAME_3)) + .put(BEFORE_PREFIX + ANY_NAME_4, BooleanColumn.ofNull(BEFORE_PREFIX + ANY_NAME_4)) + .put(BEFORE_PREFIX + ANY_NAME_5, BigIntColumn.ofNull(BEFORE_PREFIX + ANY_NAME_5)) + .put(BEFORE_PREFIX + ANY_NAME_6, FloatColumn.ofNull(BEFORE_PREFIX + ANY_NAME_6)) + .put(BEFORE_PREFIX + ANY_NAME_7, DoubleColumn.ofNull(BEFORE_PREFIX + ANY_NAME_7)) + .put(BEFORE_PREFIX + ANY_NAME_8, TextColumn.ofNull(BEFORE_PREFIX + ANY_NAME_8)) + .put(BEFORE_PREFIX + ANY_NAME_9, BlobColumn.ofNull(BEFORE_PREFIX + ANY_NAME_9)) + .put(BEFORE_PREFIX + ANY_NAME_10, DateColumn.ofNull(BEFORE_PREFIX + ANY_NAME_10)) + .put(BEFORE_PREFIX + ANY_NAME_11, TimeColumn.ofNull(BEFORE_PREFIX + ANY_NAME_11)) + .put(BEFORE_PREFIX + ANY_NAME_12, TimestampColumn.ofNull(BEFORE_PREFIX + ANY_NAME_12)) + .put(BEFORE_PREFIX + ANY_NAME_13, TimestampTZColumn.ofNull(BEFORE_PREFIX + ANY_NAME_13)) + .put(BEFORE_ID, TextColumn.ofNull(BEFORE_ID)) + .put(BEFORE_PREPARED_AT, BigIntColumn.ofNull(BEFORE_PREPARED_AT)) + .put(BEFORE_COMMITTED_AT, BigIntColumn.ofNull(BEFORE_COMMITTED_AT)) + .put(BEFORE_STATE, IntColumn.ofNull(BEFORE_STATE)) + .put(BEFORE_VERSION, IntColumn.ofNull(BEFORE_VERSION)); if (state.equals(TransactionState.COMMITTED)) { - builder.put( - Attribute.COMMITTED_AT, ScalarDbUtils.toColumn(Attribute.toCommittedAtValue(ANY_TIME_2))); + builder.put(COMMITTED_AT, BigIntColumn.of(COMMITTED_AT, ANY_TIME_MILLIS_2)); } return new TransactionResult(new ResultImpl(builder.build(), TABLE_METADATA)); } @@ -168,19 +287,48 @@ private TransactionResult prepareResultWithNullMetadata(TransactionState state) .put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, ANY_TEXT_1)) .put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, ANY_TEXT_2)) .put(ANY_NAME_3, IntColumn.of(ANY_NAME_3, ANY_INT_2)) - .put(Attribute.ID, TextColumn.of(Attribute.ID, ANY_ID_2)) - .put(Attribute.PREPARED_AT, BigIntColumn.of(Attribute.PREPARED_AT, ANY_TIME_1)) - .put(Attribute.COMMITTED_AT, BigIntColumn.of(Attribute.COMMITTED_AT, ANY_TIME_1)) - .put(Attribute.STATE, IntColumn.of(Attribute.STATE, state.get())) - .put(Attribute.VERSION, IntColumn.of(Attribute.VERSION, 1)) + .put(ANY_NAME_4, BooleanColumn.of(ANY_NAME_4, true)) + .put(ANY_NAME_5, BigIntColumn.of(ANY_NAME_5, ANY_BIGINT_2)) + .put(ANY_NAME_6, FloatColumn.of(ANY_NAME_6, ANY_FLOAT_2)) + .put(ANY_NAME_7, DoubleColumn.of(ANY_NAME_7, ANY_DOUBLE_2)) + .put(ANY_NAME_8, TextColumn.of(ANY_NAME_8, ANY_TEXT_4)) + .put(ANY_NAME_9, BlobColumn.of(ANY_NAME_9, ANY_BLOB_2)) + .put(ANY_NAME_10, DateColumn.of(ANY_NAME_10, ANY_DATE_2)) + .put(ANY_NAME_11, TimeColumn.of(ANY_NAME_11, ANY_TIME_2)) + .put(ANY_NAME_12, TimestampColumn.of(ANY_NAME_12, ANY_TIMESTAMP_2)) + .put(ANY_NAME_13, TimestampTZColumn.of(ANY_NAME_13, ANY_TIMESTAMPTZ_2)) + .put(ID, TextColumn.of(ID, ANY_ID_2)) + .put(PREPARED_AT, BigIntColumn.of(PREPARED_AT, ANY_TIME_MILLIS_1)) + .put(COMMITTED_AT, BigIntColumn.of(COMMITTED_AT, ANY_TIME_MILLIS_1)) + .put(STATE, IntColumn.of(STATE, state.get())) + .put(VERSION, IntColumn.of(VERSION, 1)) + .put(BEFORE_PREFIX + ANY_NAME_3, IntColumn.of(BEFORE_PREFIX + ANY_NAME_3, ANY_INT_1)) + .put(BEFORE_PREFIX + ANY_NAME_4, BooleanColumn.of(BEFORE_PREFIX + ANY_NAME_4, false)) + .put( + BEFORE_PREFIX + ANY_NAME_5, + BigIntColumn.of(BEFORE_PREFIX + ANY_NAME_5, ANY_BIGINT_1)) + .put( + BEFORE_PREFIX + ANY_NAME_6, FloatColumn.of(BEFORE_PREFIX + ANY_NAME_6, ANY_FLOAT_1)) .put( - Attribute.BEFORE_PREFIX + ANY_NAME_3, - IntColumn.of(Attribute.BEFORE_PREFIX + ANY_NAME_3, ANY_INT_1)) - .put(Attribute.BEFORE_ID, TextColumn.ofNull(Attribute.BEFORE_ID)) - .put(Attribute.BEFORE_PREPARED_AT, BigIntColumn.ofNull(Attribute.BEFORE_PREPARED_AT)) - .put(Attribute.BEFORE_COMMITTED_AT, BigIntColumn.ofNull(Attribute.BEFORE_COMMITTED_AT)) - .put(Attribute.BEFORE_STATE, IntColumn.ofNull(Attribute.BEFORE_STATE)) - .put(Attribute.BEFORE_VERSION, IntColumn.of(Attribute.BEFORE_VERSION, 0)) + BEFORE_PREFIX + ANY_NAME_7, + DoubleColumn.of(BEFORE_PREFIX + ANY_NAME_7, ANY_DOUBLE_1)) + .put(BEFORE_PREFIX + ANY_NAME_8, TextColumn.of(BEFORE_PREFIX + ANY_NAME_8, ANY_TEXT_3)) + .put(BEFORE_PREFIX + ANY_NAME_9, BlobColumn.of(BEFORE_PREFIX + ANY_NAME_9, ANY_BLOB_1)) + .put( + BEFORE_PREFIX + ANY_NAME_10, DateColumn.of(BEFORE_PREFIX + ANY_NAME_10, ANY_DATE_1)) + .put( + BEFORE_PREFIX + ANY_NAME_11, TimeColumn.of(BEFORE_PREFIX + ANY_NAME_11, ANY_TIME_1)) + .put( + BEFORE_PREFIX + ANY_NAME_12, + TimestampColumn.of(BEFORE_PREFIX + ANY_NAME_12, ANY_TIMESTAMP_1)) + .put( + BEFORE_PREFIX + ANY_NAME_13, + TimestampTZColumn.of(BEFORE_PREFIX + ANY_NAME_13, ANY_TIMESTAMPTZ_1)) + .put(BEFORE_ID, TextColumn.ofNull(BEFORE_ID)) + .put(BEFORE_PREPARED_AT, BigIntColumn.ofNull(BEFORE_PREPARED_AT)) + .put(BEFORE_COMMITTED_AT, BigIntColumn.ofNull(BEFORE_COMMITTED_AT)) + .put(BEFORE_STATE, IntColumn.ofNull(BEFORE_STATE)) + .put(BEFORE_VERSION, IntColumn.of(BEFORE_VERSION, 0)) .build(); return new TransactionResult(new ResultImpl(columns, TABLE_METADATA)); } @@ -191,35 +339,41 @@ private TransactionResult prepareInitialResultWithNullMetadata() { .put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, ANY_TEXT_1)) .put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, ANY_TEXT_2)) .put(ANY_NAME_3, IntColumn.of(ANY_NAME_3, ANY_INT_1)) - .put(Attribute.ID, TextColumn.ofNull(Attribute.ID)) - .put(Attribute.PREPARED_AT, BigIntColumn.ofNull(Attribute.PREPARED_AT)) - .put(Attribute.COMMITTED_AT, BigIntColumn.ofNull(Attribute.COMMITTED_AT)) - .put(Attribute.STATE, IntColumn.ofNull(Attribute.STATE)) - .put(Attribute.VERSION, IntColumn.ofNull(Attribute.VERSION)) - .put(Attribute.BEFORE_ID, TextColumn.ofNull(Attribute.BEFORE_ID)) - .put(Attribute.BEFORE_PREPARED_AT, BigIntColumn.ofNull(Attribute.BEFORE_PREPARED_AT)) - .put(Attribute.BEFORE_COMMITTED_AT, BigIntColumn.ofNull(Attribute.BEFORE_COMMITTED_AT)) - .put(Attribute.BEFORE_STATE, IntColumn.ofNull(Attribute.BEFORE_STATE)) - .put(Attribute.BEFORE_VERSION, IntColumn.ofNull(Attribute.BEFORE_VERSION)) + .put(ANY_NAME_4, BooleanColumn.of(ANY_NAME_4, false)) + .put(ANY_NAME_5, BigIntColumn.of(ANY_NAME_5, ANY_BIGINT_1)) + .put(ANY_NAME_6, FloatColumn.of(ANY_NAME_6, ANY_FLOAT_1)) + .put(ANY_NAME_7, DoubleColumn.of(ANY_NAME_7, ANY_DOUBLE_1)) + .put(ANY_NAME_8, TextColumn.of(ANY_NAME_8, ANY_TEXT_3)) + .put(ANY_NAME_9, BlobColumn.of(ANY_NAME_9, ANY_BLOB_1)) + .put(ANY_NAME_10, DateColumn.of(ANY_NAME_10, ANY_DATE_1)) + .put(ANY_NAME_11, TimeColumn.of(ANY_NAME_11, ANY_TIME_1)) + .put(ANY_NAME_12, TimestampColumn.of(ANY_NAME_12, ANY_TIMESTAMP_1)) + .put(ANY_NAME_13, TimestampTZColumn.of(ANY_NAME_13, ANY_TIMESTAMPTZ_1)) + .put(ID, TextColumn.ofNull(ID)) + .put(PREPARED_AT, BigIntColumn.ofNull(PREPARED_AT)) + .put(COMMITTED_AT, BigIntColumn.ofNull(COMMITTED_AT)) + .put(STATE, IntColumn.ofNull(STATE)) + .put(VERSION, IntColumn.ofNull(VERSION)) + .put(BEFORE_PREFIX + ANY_NAME_3, IntColumn.ofNull(BEFORE_PREFIX + ANY_NAME_3)) + .put(BEFORE_PREFIX + ANY_NAME_4, BooleanColumn.ofNull(BEFORE_PREFIX + ANY_NAME_4)) + .put(BEFORE_PREFIX + ANY_NAME_5, BigIntColumn.ofNull(BEFORE_PREFIX + ANY_NAME_5)) + .put(BEFORE_PREFIX + ANY_NAME_6, FloatColumn.ofNull(BEFORE_PREFIX + ANY_NAME_6)) + .put(BEFORE_PREFIX + ANY_NAME_7, DoubleColumn.ofNull(BEFORE_PREFIX + ANY_NAME_7)) + .put(BEFORE_PREFIX + ANY_NAME_8, TextColumn.ofNull(BEFORE_PREFIX + ANY_NAME_8)) + .put(BEFORE_PREFIX + ANY_NAME_9, BlobColumn.ofNull(BEFORE_PREFIX + ANY_NAME_9)) + .put(BEFORE_PREFIX + ANY_NAME_10, DateColumn.ofNull(BEFORE_PREFIX + ANY_NAME_10)) + .put(BEFORE_PREFIX + ANY_NAME_11, TimeColumn.ofNull(BEFORE_PREFIX + ANY_NAME_11)) + .put(BEFORE_PREFIX + ANY_NAME_12, TimestampColumn.ofNull(BEFORE_PREFIX + ANY_NAME_12)) + .put(BEFORE_PREFIX + ANY_NAME_13, TimestampTZColumn.ofNull(BEFORE_PREFIX + ANY_NAME_13)) + .put(BEFORE_ID, TextColumn.ofNull(BEFORE_ID)) + .put(BEFORE_PREPARED_AT, BigIntColumn.ofNull(BEFORE_PREPARED_AT)) + .put(BEFORE_COMMITTED_AT, BigIntColumn.ofNull(BEFORE_COMMITTED_AT)) + .put(BEFORE_STATE, IntColumn.ofNull(BEFORE_STATE)) + .put(BEFORE_VERSION, IntColumn.ofNull(BEFORE_VERSION)) .build(); return new TransactionResult(new ResultImpl(columns, TABLE_METADATA)); } - private List> extractAfterValues(TransactionResult result) { - List> values = new ArrayList<>(); - result - .getValues() - .forEach( - (k, v) -> { - if (ConsensusCommitUtils.isAfterImageColumn(k, TABLE_METADATA) - && !TABLE_METADATA.getPartitionKeyNames().contains(k) - && !TABLE_METADATA.getClusteringKeyNames().contains(k)) { - values.add(v); - } - }); - return values; - } - private List> extractAfterColumns(TransactionResult result) { List> columns = new ArrayList<>(); result @@ -247,19 +401,42 @@ public void add_GetAndPreparedResultByThisGiven_ShouldComposePut() throws Execut // Assert Put actual = (Put) composer.get().get(0); + PutBuilder.Buildable builder = + Put.newBuilder() + .namespace(get.forNamespace().get()) + .table(get.forTable().get()) + .partitionKey(get.getPartitionKey()) + .clusteringKey(get.getClusteringKey().orElse(null)) + .consistency(Consistency.LINEARIZABLE) + .condition( + ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(ANY_ID_2)) + .and( + ConditionBuilder.column(STATE) + .isEqualToInt(TransactionState.PREPARED.get())) + .build()); + extractAfterColumns(prepareInitialResult(ANY_ID_1, TransactionState.COMMITTED)) + .forEach(builder::value); Put expected = - new Put(get.getPartitionKey(), get.getClusteringKey().orElse(null)) - .forNamespace(get.forNamespace().get()) - .forTable(get.forTable().get()); - expected.withConsistency(Consistency.LINEARIZABLE); - expected.withCondition( - new PutIf( - new ConditionalExpression(ID, toIdValue(ANY_ID_2), Operator.EQ), - new ConditionalExpression( - STATE, toStateValue(TransactionState.PREPARED), Operator.EQ))); - expected.withValues( - extractAfterValues(prepareInitialResult(ANY_ID_1, TransactionState.COMMITTED))); + builder + .textValue(BEFORE_ID, null) + .intValue(BEFORE_STATE, null) + .intValue(BEFORE_VERSION, null) + .bigIntValue(BEFORE_PREPARED_AT, null) + .bigIntValue(BEFORE_COMMITTED_AT, null) + .intValue(BEFORE_PREFIX + ANY_NAME_3, null) + .booleanValue(BEFORE_PREFIX + ANY_NAME_4, null) + .bigIntValue(BEFORE_PREFIX + ANY_NAME_5, null) + .floatValue(BEFORE_PREFIX + ANY_NAME_6, null) + .doubleValue(BEFORE_PREFIX + ANY_NAME_7, null) + .textValue(BEFORE_PREFIX + ANY_NAME_8, null) + .blobValue(BEFORE_PREFIX + ANY_NAME_9, (byte[]) null) + .dateValue(BEFORE_PREFIX + ANY_NAME_10, null) + .timeValue(BEFORE_PREFIX + ANY_NAME_11, null) + .timestampValue(BEFORE_PREFIX + ANY_NAME_12, null) + .timestampTZValue(BEFORE_PREFIX + ANY_NAME_13, null) + .build(); assertThat(actual).isEqualTo(expected); + verify(storage).get(any(Get.class)); } @Test @@ -274,18 +451,41 @@ public void add_GetAndDeletedResultByThisGiven_ShouldComposePut() throws Executi // Assert Put actual = (Put) composer.get().get(0); + PutBuilder.Buildable builder = + Put.newBuilder() + .namespace(get.forNamespace().get()) + .table(get.forTable().get()) + .partitionKey(get.getPartitionKey()) + .clusteringKey(get.getClusteringKey().orElse(null)) + .consistency(Consistency.LINEARIZABLE) + .condition( + ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(ANY_ID_2)) + .and( + ConditionBuilder.column(STATE).isEqualToInt(TransactionState.DELETED.get())) + .build()); + extractAfterColumns(prepareInitialResult(ANY_ID_1, TransactionState.COMMITTED)) + .forEach(builder::value); Put expected = - new Put(get.getPartitionKey(), get.getClusteringKey().orElse(null)) - .forNamespace(get.forNamespace().get()) - .forTable(get.forTable().get()); - expected.withConsistency(Consistency.LINEARIZABLE); - expected.withCondition( - new PutIf( - new ConditionalExpression(ID, toIdValue(ANY_ID_2), Operator.EQ), - new ConditionalExpression(STATE, toStateValue(TransactionState.DELETED), Operator.EQ))); - expected.withValues( - extractAfterValues(prepareInitialResult(ANY_ID_1, TransactionState.COMMITTED))); + builder + .textValue(BEFORE_ID, null) + .intValue(BEFORE_STATE, null) + .intValue(BEFORE_VERSION, null) + .bigIntValue(BEFORE_PREPARED_AT, null) + .bigIntValue(BEFORE_COMMITTED_AT, null) + .intValue(BEFORE_PREFIX + ANY_NAME_3, null) + .booleanValue(BEFORE_PREFIX + ANY_NAME_4, null) + .bigIntValue(BEFORE_PREFIX + ANY_NAME_5, null) + .floatValue(BEFORE_PREFIX + ANY_NAME_6, null) + .doubleValue(BEFORE_PREFIX + ANY_NAME_7, null) + .textValue(BEFORE_PREFIX + ANY_NAME_8, null) + .blobValue(BEFORE_PREFIX + ANY_NAME_9, (byte[]) null) + .dateValue(BEFORE_PREFIX + ANY_NAME_10, null) + .timeValue(BEFORE_PREFIX + ANY_NAME_11, null) + .timestampValue(BEFORE_PREFIX + ANY_NAME_12, null) + .timestampTZValue(BEFORE_PREFIX + ANY_NAME_13, null) + .build(); assertThat(actual).isEqualTo(expected); + verify(storage).get(any(Get.class)); } @Test @@ -333,17 +533,40 @@ public void add_PutAndResultFromSnapshotGivenAndPreparedResultGivenFromStorage_S // Assert Put actual = (Put) composer.get().get(0); + PutBuilder.Buildable builder = + Put.newBuilder() + .namespace(put.forNamespace().get()) + .table(put.forTable().get()) + .partitionKey(put.getPartitionKey()) + .clusteringKey(put.getClusteringKey().orElse(null)) + .consistency(Consistency.LINEARIZABLE) + .condition( + ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(ANY_ID_2)) + .and( + ConditionBuilder.column(STATE) + .isEqualToInt(TransactionState.PREPARED.get())) + .build()); + extractAfterColumns(prepareInitialResult(ANY_ID_1, TransactionState.COMMITTED)) + .forEach(builder::value); Put expected = - new Put(put.getPartitionKey(), put.getClusteringKey().orElse(null)) - .forNamespace(put.forNamespace().get()) - .forTable(put.forTable().get()); - expected.withConsistency(Consistency.LINEARIZABLE); - expected.withCondition( - new PutIf( - new ConditionalExpression(ID, toIdValue(ANY_ID_2), Operator.EQ), - new ConditionalExpression( - STATE, toStateValue(TransactionState.PREPARED), Operator.EQ))); - expected.withValues(extractAfterValues(resultInSnapshot)); + builder + .textValue(BEFORE_ID, null) + .intValue(BEFORE_STATE, null) + .intValue(BEFORE_VERSION, null) + .bigIntValue(BEFORE_PREPARED_AT, null) + .bigIntValue(BEFORE_COMMITTED_AT, null) + .intValue(BEFORE_PREFIX + ANY_NAME_3, null) + .booleanValue(BEFORE_PREFIX + ANY_NAME_4, null) + .bigIntValue(BEFORE_PREFIX + ANY_NAME_5, null) + .floatValue(BEFORE_PREFIX + ANY_NAME_6, null) + .doubleValue(BEFORE_PREFIX + ANY_NAME_7, null) + .textValue(BEFORE_PREFIX + ANY_NAME_8, null) + .blobValue(BEFORE_PREFIX + ANY_NAME_9, (byte[]) null) + .dateValue(BEFORE_PREFIX + ANY_NAME_10, null) + .timeValue(BEFORE_PREFIX + ANY_NAME_11, null) + .timestampValue(BEFORE_PREFIX + ANY_NAME_12, null) + .timestampTZValue(BEFORE_PREFIX + ANY_NAME_13, null) + .build(); assertThat(actual).isEqualTo(expected); verify(storage).get(any(Get.class)); } @@ -362,16 +585,39 @@ public void add_PutAndResultFromSnapshotGivenAndDeletedResultGivenFromStorage_Sh // Assert Put actual = (Put) composer.get().get(0); + PutBuilder.Buildable builder = + Put.newBuilder() + .namespace(put.forNamespace().get()) + .table(put.forTable().get()) + .partitionKey(put.getPartitionKey()) + .clusteringKey(put.getClusteringKey().orElse(null)) + .consistency(Consistency.LINEARIZABLE) + .condition( + ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(ANY_ID_2)) + .and( + ConditionBuilder.column(STATE).isEqualToInt(TransactionState.DELETED.get())) + .build()); + extractAfterColumns(prepareInitialResult(ANY_ID_1, TransactionState.COMMITTED)) + .forEach(builder::value); Put expected = - new Put(put.getPartitionKey(), put.getClusteringKey().orElse(null)) - .forNamespace(put.forNamespace().get()) - .forTable(put.forTable().get()); - expected.withConsistency(Consistency.LINEARIZABLE); - expected.withCondition( - new PutIf( - new ConditionalExpression(ID, toIdValue(ANY_ID_2), Operator.EQ), - new ConditionalExpression(STATE, toStateValue(TransactionState.DELETED), Operator.EQ))); - expected.withValues(extractAfterValues(resultInSnapshot)); + builder + .textValue(BEFORE_ID, null) + .intValue(BEFORE_STATE, null) + .intValue(BEFORE_VERSION, null) + .bigIntValue(BEFORE_PREPARED_AT, null) + .bigIntValue(BEFORE_COMMITTED_AT, null) + .intValue(BEFORE_PREFIX + ANY_NAME_3, null) + .booleanValue(BEFORE_PREFIX + ANY_NAME_4, null) + .bigIntValue(BEFORE_PREFIX + ANY_NAME_5, null) + .floatValue(BEFORE_PREFIX + ANY_NAME_6, null) + .doubleValue(BEFORE_PREFIX + ANY_NAME_7, null) + .textValue(BEFORE_PREFIX + ANY_NAME_8, null) + .blobValue(BEFORE_PREFIX + ANY_NAME_9, (byte[]) null) + .dateValue(BEFORE_PREFIX + ANY_NAME_10, null) + .timeValue(BEFORE_PREFIX + ANY_NAME_11, null) + .timestampValue(BEFORE_PREFIX + ANY_NAME_12, null) + .timestampTZValue(BEFORE_PREFIX + ANY_NAME_13, null) + .build(); assertThat(actual).isEqualTo(expected); verify(storage).get(any(Get.class)); } @@ -420,19 +666,42 @@ public void add_ScanAndPreparedResultByThisGiven_ShouldComposePut() throws Execu // Assert Put actual = (Put) composer.get().get(0); + PutBuilder.Buildable builder = + Put.newBuilder() + .namespace(scan.forNamespace().get()) + .table(scan.forTable().get()) + .partitionKey(scan.getPartitionKey()) + .clusteringKey(result.getClusteringKey().orElse(null)) + .consistency(Consistency.LINEARIZABLE) + .condition( + ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(ANY_ID_2)) + .and( + ConditionBuilder.column(STATE) + .isEqualToInt(TransactionState.PREPARED.get())) + .build()); + extractAfterColumns(prepareInitialResult(ANY_ID_1, TransactionState.COMMITTED)) + .forEach(builder::value); Put expected = - new Put(scan.getPartitionKey(), result.getClusteringKey().orElse(null)) - .forNamespace(scan.forNamespace().get()) - .forTable(scan.forTable().get()); - expected.withConsistency(Consistency.LINEARIZABLE); - expected.withCondition( - new PutIf( - new ConditionalExpression(ID, toIdValue(ANY_ID_2), Operator.EQ), - new ConditionalExpression( - STATE, toStateValue(TransactionState.PREPARED), Operator.EQ))); - expected.withValues( - extractAfterValues(prepareInitialResult(ANY_ID_1, TransactionState.COMMITTED))); + builder + .textValue(BEFORE_ID, null) + .intValue(BEFORE_STATE, null) + .intValue(BEFORE_VERSION, null) + .bigIntValue(BEFORE_PREPARED_AT, null) + .bigIntValue(BEFORE_COMMITTED_AT, null) + .intValue(BEFORE_PREFIX + ANY_NAME_3, null) + .booleanValue(BEFORE_PREFIX + ANY_NAME_4, null) + .bigIntValue(BEFORE_PREFIX + ANY_NAME_5, null) + .floatValue(BEFORE_PREFIX + ANY_NAME_6, null) + .doubleValue(BEFORE_PREFIX + ANY_NAME_7, null) + .textValue(BEFORE_PREFIX + ANY_NAME_8, null) + .blobValue(BEFORE_PREFIX + ANY_NAME_9, (byte[]) null) + .dateValue(BEFORE_PREFIX + ANY_NAME_10, null) + .timeValue(BEFORE_PREFIX + ANY_NAME_11, null) + .timestampValue(BEFORE_PREFIX + ANY_NAME_12, null) + .timestampTZValue(BEFORE_PREFIX + ANY_NAME_13, null) + .build(); assertThat(actual).isEqualTo(expected); + verify(storage).get(any(Get.class)); } @Test @@ -447,18 +716,41 @@ public void add_ScanAndDeletedResultByThisGiven_ShouldComposePut() throws Execut // Assert Put actual = (Put) composer.get().get(0); + PutBuilder.Buildable builder = + Put.newBuilder() + .namespace(scan.forNamespace().get()) + .table(scan.forTable().get()) + .partitionKey(scan.getPartitionKey()) + .clusteringKey(result.getClusteringKey().orElse(null)) + .consistency(Consistency.LINEARIZABLE) + .condition( + ConditionBuilder.putIf(ConditionBuilder.column(ID).isEqualToText(ANY_ID_2)) + .and( + ConditionBuilder.column(STATE).isEqualToInt(TransactionState.DELETED.get())) + .build()); + extractAfterColumns(prepareInitialResult(ANY_ID_1, TransactionState.COMMITTED)) + .forEach(builder::value); Put expected = - new Put(scan.getPartitionKey(), result.getClusteringKey().orElse(null)) - .forNamespace(scan.forNamespace().get()) - .forTable(scan.forTable().get()); - expected.withConsistency(Consistency.LINEARIZABLE); - expected.withCondition( - new PutIf( - new ConditionalExpression(ID, toIdValue(ANY_ID_2), Operator.EQ), - new ConditionalExpression(STATE, toStateValue(TransactionState.DELETED), Operator.EQ))); - expected.withValues( - extractAfterValues(prepareInitialResult(ANY_ID_1, TransactionState.COMMITTED))); + builder + .textValue(BEFORE_ID, null) + .intValue(BEFORE_STATE, null) + .intValue(BEFORE_VERSION, null) + .bigIntValue(BEFORE_PREPARED_AT, null) + .bigIntValue(BEFORE_COMMITTED_AT, null) + .intValue(BEFORE_PREFIX + ANY_NAME_3, null) + .booleanValue(BEFORE_PREFIX + ANY_NAME_4, null) + .bigIntValue(BEFORE_PREFIX + ANY_NAME_5, null) + .floatValue(BEFORE_PREFIX + ANY_NAME_6, null) + .doubleValue(BEFORE_PREFIX + ANY_NAME_7, null) + .textValue(BEFORE_PREFIX + ANY_NAME_8, null) + .blobValue(BEFORE_PREFIX + ANY_NAME_9, (byte[]) null) + .dateValue(BEFORE_PREFIX + ANY_NAME_10, null) + .timeValue(BEFORE_PREFIX + ANY_NAME_11, null) + .timestampValue(BEFORE_PREFIX + ANY_NAME_12, null) + .timestampTZValue(BEFORE_PREFIX + ANY_NAME_13, null) + .build(); assertThat(actual).isEqualTo(expected); + verify(storage).get(any(Get.class)); } @Test @@ -485,6 +777,7 @@ public void add_ScanAndPreparedResultByThisGivenAndBeforeResultNotGiven_ShouldCo new ConditionalExpression( STATE, toStateValue(TransactionState.PREPARED), Operator.EQ))); assertThat(actual).isEqualTo(expected); + verify(storage).get(any(Get.class)); } @Test @@ -514,8 +807,27 @@ public void add_GetAndPreparedResultWithNullMetadataByThisGiven_ShouldComposePut .isEqualToInt(TransactionState.PREPARED.get())) .build()); extractAfterColumns(prepareInitialResultWithNullMetadata()).forEach(builder::value); - Put expected = builder.build(); + Put expected = + builder + .textValue(BEFORE_ID, null) + .intValue(BEFORE_STATE, null) + .intValue(BEFORE_VERSION, null) + .bigIntValue(BEFORE_PREPARED_AT, null) + .bigIntValue(BEFORE_COMMITTED_AT, null) + .intValue(BEFORE_PREFIX + ANY_NAME_3, null) + .booleanValue(BEFORE_PREFIX + ANY_NAME_4, null) + .bigIntValue(BEFORE_PREFIX + ANY_NAME_5, null) + .floatValue(BEFORE_PREFIX + ANY_NAME_6, null) + .doubleValue(BEFORE_PREFIX + ANY_NAME_7, null) + .textValue(BEFORE_PREFIX + ANY_NAME_8, null) + .blobValue(BEFORE_PREFIX + ANY_NAME_9, (byte[]) null) + .dateValue(BEFORE_PREFIX + ANY_NAME_10, null) + .timeValue(BEFORE_PREFIX + ANY_NAME_11, null) + .timestampValue(BEFORE_PREFIX + ANY_NAME_12, null) + .timestampTZValue(BEFORE_PREFIX + ANY_NAME_13, null) + .build(); assertThat(actual).isEqualTo(expected); + verify(storage).get(any(Get.class)); } @Test