diff --git a/core/src/main/java/com/scalar/db/api/ConditionBuilder.java b/core/src/main/java/com/scalar/db/api/ConditionBuilder.java
index 6a8d0c4933..b809d6be4b 100644
--- a/core/src/main/java/com/scalar/db/api/ConditionBuilder.java
+++ b/core/src/main/java/com/scalar/db/api/ConditionBuilder.java
@@ -135,6 +135,23 @@ public static ConditionalExpression buildConditionalExpression(
return new ConditionalExpression(column, operator);
}
+ /**
+ * Builds a like expression with the specified column, operator, and escape character.
+ *
+ *
This method is primarily for internal use. Breaking changes can and will be introduced to
+ * this method. Users should not depend on it.
+ *
+ * @param column a target text column used to compare
+ * @param operator an operator used to compare the target column. The operator must be either LIKE
+ * or NOT_LIKE.
+ * @param escape an escape character for the like operator
+ * @return a conditional expression
+ */
+ public static ConditionalExpression buildLikeExpression(
+ TextColumn column, Operator operator, String escape) {
+ return new LikeExpression(column, operator, escape);
+ }
+
/**
* Returns a builder object for a condition expression for PutIf/DeleteIf
*
@@ -352,6 +369,7 @@ public ConditionalExpression isNotEqualToBlob(byte[] value) {
public ConditionalExpression isNotEqualToBlob(ByteBuffer value) {
return new ConditionalExpression(columnName, value, Operator.NE);
}
+
/**
* Creates a 'not equal' conditional expression for a DATE value.
*
@@ -391,6 +409,7 @@ public ConditionalExpression isNotEqualToTimestamp(LocalDateTime value) {
public ConditionalExpression isNotEqualToTimestampTZ(Instant value) {
return new ConditionalExpression(TimestampTZColumn.of(columnName, value), Operator.NE);
}
+
/**
* Creates a 'greater than' conditional expression for a BOOLEAN value.
*
@@ -590,6 +609,7 @@ public ConditionalExpression isGreaterThanOrEqualToBlob(byte[] value) {
public ConditionalExpression isGreaterThanOrEqualToBlob(ByteBuffer value) {
return new ConditionalExpression(columnName, value, Operator.GTE);
}
+
/**
* Creates a 'greater than or equal' conditional expression for a DATE value.
*
@@ -709,6 +729,7 @@ public ConditionalExpression isLessThanBlob(byte[] value) {
public ConditionalExpression isLessThanBlob(ByteBuffer value) {
return new ConditionalExpression(columnName, value, Operator.LT);
}
+
/**
* Creates a 'less than' conditional expression for a DATE value.
*
@@ -748,6 +769,7 @@ public ConditionalExpression isLessThanTimestamp(LocalDateTime value) {
public ConditionalExpression isLessThanTimestampTZ(Instant value) {
return new ConditionalExpression(TimestampTZColumn.of(columnName, value), Operator.LT);
}
+
/**
* Creates a 'less than or equal' conditional expression for a BOOLEAN value.
*
@@ -1029,6 +1051,7 @@ public ConditionalExpression isNotNullText() {
public ConditionalExpression isNotNullBlob() {
return new ConditionalExpression(BlobColumn.ofNull(columnName), Operator.IS_NOT_NULL);
}
+
/**
* Creates an 'is not null' conditional expression for a DATE value.
*
diff --git a/core/src/main/java/com/scalar/db/api/LikeExpression.java b/core/src/main/java/com/scalar/db/api/LikeExpression.java
index f3df7d2c5e..084fc20687 100644
--- a/core/src/main/java/com/scalar/db/api/LikeExpression.java
+++ b/core/src/main/java/com/scalar/db/api/LikeExpression.java
@@ -16,8 +16,9 @@ public class LikeExpression extends ConditionalExpression {
* Constructs a {@code LikeExpression} with the specified column and operator. For the escape
* character, the default one ("\", i.e., backslash) is used.
*
- * @param column a target column used to compare
- * @param operator an operator used to compare the target column
+ * @param column a target text column used to compare
+ * @param operator an operator used to compare the target text column. The operator must be either
+ * LIKE or NOT_LIKE.
*/
LikeExpression(TextColumn column, Operator operator) {
this(column, operator, DEFAULT_ESCAPE_CHAR);
@@ -28,8 +29,9 @@ public class LikeExpression extends ConditionalExpression {
* The escape character must be a string of a single character or an empty string. If an empty
* string is specified, the escape character is disabled.
*
- * @param column a target column used to compare
- * @param operator an operator used to compare the target column
+ * @param column a target text column used to compare
+ * @param operator an operator used to compare the target text column. The operator must be either
+ * LIKE or NOT_LIKE.
* @param escape an escape character for the like operator
*/
LikeExpression(TextColumn column, Operator operator, String escape) {
@@ -75,6 +77,11 @@ private void check(String pattern, Operator operator, String escape) {
}
}
+ @Override
+ public TextColumn getColumn() {
+ return (TextColumn) super.getColumn();
+ }
+
/**
* Returns the escape character for LIKE operator.
*
diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java
index 66d213c3bc..9ce5ec289c 100644
--- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java
+++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java
@@ -4,11 +4,16 @@
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitOperationAttributes.isImplicitPreReadEnabled;
import com.google.common.annotations.VisibleForTesting;
+import com.scalar.db.api.AndConditionSet;
+import com.scalar.db.api.ConditionBuilder;
+import com.scalar.db.api.ConditionSetBuilder;
+import com.scalar.db.api.ConditionalExpression;
import com.scalar.db.api.Consistency;
import com.scalar.db.api.Delete;
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.Get;
import com.scalar.db.api.GetBuilder;
+import com.scalar.db.api.LikeExpression;
import com.scalar.db.api.Operation;
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
@@ -25,13 +30,16 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
@@ -137,34 +145,45 @@ void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException {
@VisibleForTesting
Optional read(@Nullable Snapshot.Key key, Get get) throws CrudException {
Optional result = getFromStorage(get);
- if (!result.isPresent() || result.get().isCommitted()) {
- if (result.isPresent() || get.getConjunctions().isEmpty()) {
- // Keep the read set latest to create before image by using the latest record (result)
- // because another conflicting transaction might have updated the record after this
- // transaction read it first. However, we update it only if a get operation has no
- // conjunction or the result exists. This is because we don’t know whether the record
- // actually exists or not due to the conjunction.
- if (key != null) {
- putIntoReadSetInSnapshot(key, result);
- } else {
- // Only for a Get with index, the argument `key` is null
+ if (result.isPresent() && !result.get().isCommitted()) {
+ throw new UncommittedRecordException(
+ get,
+ result.get(),
+ CoreError.CONSENSUS_COMMIT_READ_UNCOMMITTED_RECORD.buildMessage(),
+ snapshot.getId());
+ }
- if (result.isPresent()) {
- // Only when we can get the record with the Get with index, we can put it into the read
- // set
- key = new Snapshot.Key(get, result.get());
- putIntoReadSetInSnapshot(key, result);
- }
+ if (!get.getConjunctions().isEmpty()) {
+ // Because we also get records whose before images match the conjunctions, we need to check if
+ // the current status of the records actually match the conjunctions.
+ result =
+ result.filter(
+ r ->
+ ScalarDbUtils.columnsMatchAnyOfConjunctions(
+ r.getColumns(), get.getConjunctions()));
+ }
+
+ if (result.isPresent() || get.getConjunctions().isEmpty()) {
+ // Keep the read set latest to create before image by using the latest record (result)
+ // because another conflicting transaction might have updated the record after this
+ // transaction read it first. However, we update it only if a get operation has no
+ // conjunction or the result exists. This is because we don’t know whether the record
+ // actually exists or not due to the conjunction.
+ if (key != null) {
+ putIntoReadSetInSnapshot(key, result);
+ } else {
+ // Only for a Get with index, the argument `key` is null
+
+ if (result.isPresent()) {
+ // Only when we can get the record with the Get with index, we can put it into the read
+ // set
+ key = new Snapshot.Key(get, result.get());
+ putIntoReadSetInSnapshot(key, result);
}
}
- putIntoGetSetInSnapshot(get, result);
- return result;
}
- throw new UncommittedRecordException(
- get,
- result.get(),
- CoreError.CONSENSUS_COMMIT_READ_UNCOMMITTED_RECORD.buildMessage(),
- snapshot.getId());
+ putIntoGetSetInSnapshot(get, result);
+ return result;
}
public List scan(Scan originalScan) throws CrudException {
@@ -191,12 +210,24 @@ private LinkedHashMap scanInternal(Scan scan)
Scanner scanner = null;
try {
- scanner = scanFromStorage(scan);
+ if (scan.getLimit() > 0) {
+ // Since the conjunctions may delete some records from the scan result, it is necessary to
+ // perform the scan without a limit.
+ scanner = scanFromStorage(Scan.newBuilder(scan).limit(0).build());
+ } else {
+ scanner = scanFromStorage(scan);
+ }
+
for (Result r : scanner) {
TransactionResult result = new TransactionResult(r);
Snapshot.Key key = new Snapshot.Key(scan, r);
- processScanResult(key, scan, result);
- results.put(key, result);
+ Optional processedScanResult = processScanResult(key, scan, result);
+ processedScanResult.ifPresent(res -> results.put(key, res));
+
+ if (scan.getLimit() > 0 && results.size() >= scan.getLimit()) {
+ // If the scan has a limit, we stop scanning when we reach the limit.
+ break;
+ }
}
} catch (RuntimeException e) {
Exception exception;
@@ -224,8 +255,8 @@ private LinkedHashMap scanInternal(Scan scan)
return results;
}
- private void processScanResult(Snapshot.Key key, Scan scan, TransactionResult result)
- throws CrudException {
+ private Optional processScanResult(
+ Snapshot.Key key, Scan scan, TransactionResult result) throws CrudException {
if (!result.isCommitted()) {
throw new UncommittedRecordException(
scan,
@@ -234,10 +265,25 @@ private void processScanResult(Snapshot.Key key, Scan scan, TransactionResult re
snapshot.getId());
}
- // We always update the read set to create before image by using the latest record (result)
- // because another conflicting transaction might have updated the record after this
- // transaction read it first.
- putIntoReadSetInSnapshot(key, Optional.of(result));
+ Optional ret = Optional.of(result);
+ if (!scan.getConjunctions().isEmpty()) {
+ // Because we also get records whose before images match the conjunctions, we need to check if
+ // the current status of the records actually match the conjunctions.
+ ret =
+ ret.filter(
+ r ->
+ ScalarDbUtils.columnsMatchAnyOfConjunctions(
+ r.getColumns(), scan.getConjunctions()));
+ }
+
+ if (ret.isPresent()) {
+ // We always update the read set to create before image by using the latest record (result)
+ // because another conflicting transaction might have updated the record after this
+ // transaction read it first.
+ putIntoReadSetInSnapshot(key, ret);
+ }
+
+ return ret;
}
public TransactionCrudOperable.Scanner getScanner(Scan originalScan) throws CrudException {
@@ -397,7 +443,16 @@ private Get createGet(Snapshot.Key key) throws CrudException {
@VisibleForTesting
Optional getFromStorage(Get get) throws CrudException {
try {
- return storage.get(get).map(TransactionResult::new);
+ if (get.getConjunctions().isEmpty()) {
+ // If there are no conjunctions, we can read the record directly
+ return storage.get(get).map(TransactionResult::new);
+ } else {
+ // If there are conjunctions, we need to convert them to include conditions on the before
+ // image
+ Set converted = convertConjunctions(get, get.getConjunctions());
+ Get convertedGet = Get.newBuilder(get).clearConditions().whereOr(converted).build();
+ return storage.get(convertedGet).map(TransactionResult::new);
+ }
} catch (ExecutionException e) {
throw new CrudException(
CoreError.CONSENSUS_COMMIT_READING_RECORD_FROM_STORAGE_FAILED.buildMessage(),
@@ -408,7 +463,16 @@ Optional getFromStorage(Get get) throws CrudException {
private Scanner scanFromStorage(Scan scan) throws CrudException {
try {
- return storage.scan(scan);
+ if (scan.getConjunctions().isEmpty()) {
+ // If there are no conjunctions, we can read the record directly
+ return storage.scan(scan);
+ } else {
+ // If there are conjunctions, we need to convert them to include conditions on the before
+ // image
+ Set converted = convertConjunctions(scan, scan.getConjunctions());
+ Scan convertedScan = Scan.newBuilder(scan).clearConditions().whereOr(converted).build();
+ return storage.scan(convertedScan);
+ }
} catch (ExecutionException e) {
throw new CrudException(
CoreError.CONSENSUS_COMMIT_SCANNING_RECORDS_FROM_STORAGE_FAILED.buildMessage(),
@@ -417,6 +481,119 @@ private Scanner scanFromStorage(Scan scan) throws CrudException {
}
}
+ /**
+ * Converts the given conjunctions to include conditions on before images.
+ *
+ * This is necessary because we might miss prepared records whose before images match the
+ * original conditions when reading from storage. For example, suppose we have the following
+ * records in storage:
+ *
+ *
+ * | partition_key | clustering_key | column | status | before_column | before_status |
+ * |---------------|----------------|--------|-----------|---------------|----------------|
+ * | 0 | 0 | 1000 | COMMITTED | | |
+ * | 0 | 1 | 200 | PREPARED | 1000 | COMMITTED |
+ *
+ *
+ * If we scan records with the condition "column = 1000" without converting the condition
+ * (conjunction), we only get the first record, not the second one, because the condition does not
+ * match. However, the second record has not been committed yet, so we should still retrieve it,
+ * considering the possibility that the record will be rolled back.
+ *
+ * To handle such cases, we convert the conjunctions to include conditions on the before image.
+ * For example, if the original condition is:
+ *
+ *
+ * column = 1000
+ *
+ *
+ * We convert it to:
+ *
+ *
+ * column = 1000 OR before_column = 1000
+ *
+ *
+ * Here are more examples:
+ *
+ *
Example 1:
+ *
+ *
+ * {@code column >= 500 AND column < 1000}
+ *
+ *
+ * becomes:
+ *
+ *
+ * {@code (column >= 500 AND column < 1000) OR (before_column >= 500 AND before_column < 1000)}
+ *
+ *
+ * Example 2:
+ *
+ *
+ * {@code column1 = 500 OR column2 != 1000}
+ *
+ *
+ * becomes:
+ *
+ *
+ * {@code column1 = 500 OR column2 != 1000 OR before_column1 = 500 OR before_column2 != 1000}
+ *
+ *
+ * This way, we can ensure that prepared records whose before images satisfy the original scan
+ * conditions are not missed during the scan.
+ *
+ * @param selection the selection to convert
+ * @param conjunctions the conjunctions to convert
+ * @return the converted conjunctions
+ */
+ private Set convertConjunctions(
+ Selection selection, Set conjunctions) throws CrudException {
+ TableMetadata metadata = getTableMetadata(selection);
+
+ Set converted = new HashSet<>(conjunctions.size() * 2);
+
+ // Keep the original conjunctions
+ conjunctions.forEach(
+ c -> converted.add(ConditionSetBuilder.andConditionSet(c.getConditions()).build()));
+
+ // Add conditions on the before image
+ for (Selection.Conjunction conjunction : conjunctions) {
+ Set conditions = new HashSet<>(conjunction.getConditions().size());
+ for (ConditionalExpression condition : conjunction.getConditions()) {
+ String columnName = condition.getColumn().getName();
+
+ if (metadata.getPartitionKeyNames().contains(columnName)
+ || metadata.getClusteringKeyNames().contains(columnName)) {
+ // If the condition is on the primary key, we don't need to convert it
+ conditions.add(condition);
+ continue;
+ }
+
+ // Convert the condition to use the before image column
+ ConditionalExpression convertedCondition;
+ if (condition instanceof LikeExpression) {
+ LikeExpression likeExpression = (LikeExpression) condition;
+ convertedCondition =
+ ConditionBuilder.buildLikeExpression(
+ likeExpression.getColumn().copyWith(Attribute.BEFORE_PREFIX + columnName),
+ likeExpression.getOperator(),
+ likeExpression.getEscape());
+ } else {
+ convertedCondition =
+ ConditionBuilder.buildConditionalExpression(
+ condition.getColumn().copyWith(Attribute.BEFORE_PREFIX + columnName),
+ condition.getOperator());
+ }
+
+ conditions.add(convertedCondition);
+ }
+
+ converted.add(ConditionSetBuilder.andConditionSet(conditions).build());
+ }
+
+ return converted;
+ }
+
private Selection prepareStorageSelection(Selection selection) throws CrudException {
selection.clearProjections();
// Retrieve only the after images columns when including the metadata is disabled, otherwise
@@ -486,6 +663,7 @@ private class ConsensusCommitStorageScanner extends AbstractTransactionCrudOpera
private final Scanner scanner;
@Nullable private final LinkedHashMap results;
+ private final AtomicInteger scanCount = new AtomicInteger();
private final AtomicBoolean fullyScanned = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
@@ -493,7 +671,14 @@ public ConsensusCommitStorageScanner(Scan scan, List originalProjections
throws CrudException {
this.scan = scan;
this.originalProjections = originalProjections;
- scanner = scanFromStorage(scan);
+
+ if (scan.getLimit() > 0) {
+ // Since the conjunctions may delete some records from the scan result, it is necessary to
+ // perform the scan without a limit.
+ scanner = scanFromStorage(Scan.newBuilder(scan).limit(0).build());
+ } else {
+ scanner = scanFromStorage(scan);
+ }
if (isValidationOrSnapshotReadRequired()) {
results = new LinkedHashMap<>();
@@ -506,25 +691,45 @@ public ConsensusCommitStorageScanner(Scan scan, List originalProjections
@Override
public Optional one() throws CrudException {
+ if (fullyScanned.get()) {
+ return Optional.empty();
+ }
+
try {
- Optional r = scanner.one();
+ while (true) {
+ Optional r = scanner.one();
- if (!r.isPresent()) {
- fullyScanned.set(true);
- return Optional.empty();
- }
+ if (!r.isPresent()) {
+ fullyScanned.set(true);
+ return Optional.empty();
+ }
- Snapshot.Key key = new Snapshot.Key(scan, r.get());
- TransactionResult result = new TransactionResult(r.get());
- processScanResult(key, scan, result);
+ Snapshot.Key key = new Snapshot.Key(scan, r.get());
+ TransactionResult result = new TransactionResult(r.get());
- if (results != null) {
- results.put(key, result);
- }
+ Optional processedScanResult = processScanResult(key, scan, result);
+ if (!processedScanResult.isPresent()) {
+ continue;
+ }
+
+ if (results != null) {
+ results.put(key, processedScanResult.get());
+ }
+ scanCount.incrementAndGet();
- TableMetadata metadata = getTableMetadata(scan);
- return Optional.of(
- new FilteredResult(result, originalProjections, metadata, isIncludeMetadataEnabled));
+ if (scan.getLimit() > 0 && scanCount.get() >= scan.getLimit()) {
+ // If the scan has a limit, we stop scanning when we reach the limit.
+ fullyScanned.set(true);
+ }
+
+ TableMetadata metadata = getTableMetadata(scan);
+ return Optional.of(
+ new FilteredResult(
+ processedScanResult.get(),
+ originalProjections,
+ metadata,
+ isIncludeMetadataEnabled));
+ }
} catch (ExecutionException e) {
closeScanner();
throw new CrudException(
diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java
index b9c1ca8eba..5fbf1bf9a5 100644
--- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java
+++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java
@@ -1,8 +1,13 @@
package com.scalar.db.transaction.consensuscommit;
+import static com.scalar.db.api.ConditionBuilder.column;
+import static com.scalar.db.api.ConditionBuilder.deleteIfExists;
+import static com.scalar.db.api.ConditionBuilder.putIfExists;
+import static com.scalar.db.api.ConditionSetBuilder.condition;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -12,7 +17,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import com.scalar.db.api.ConditionBuilder;
import com.scalar.db.api.ConditionalExpression;
import com.scalar.db.api.Consistency;
import com.scalar.db.api.Delete;
@@ -42,6 +46,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -63,6 +68,7 @@ public class CrudHandlerTest {
private static final String ANY_NAME_1 = "name1";
private static final String ANY_NAME_2 = "name2";
private static final String ANY_NAME_3 = "name3";
+ private static final String ANY_NAME_4 = "name4";
private static final String ANY_TEXT_1 = "text1";
private static final String ANY_TEXT_2 = "text2";
private static final String ANY_TEXT_3 = "text3";
@@ -76,6 +82,7 @@ public class CrudHandlerTest {
.addColumn(ANY_NAME_1, DataType.TEXT)
.addColumn(ANY_NAME_2, DataType.TEXT)
.addColumn(ANY_NAME_3, DataType.TEXT)
+ .addColumn(ANY_NAME_4, DataType.INT)
.addPartitionKey(ANY_NAME_1)
.addClusteringKey(ANY_NAME_2)
.addSecondaryIndex(ANY_NAME_3)
@@ -139,7 +146,7 @@ private Scan prepareCrossPartitionScan() {
.namespace(ANY_NAMESPACE_NAME)
.table(ANY_TABLE_NAME)
.all()
- .where(ConditionBuilder.column("column").isEqualToInt(10))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
.build();
}
@@ -152,10 +159,16 @@ private Scan toScanForStorageFrom(Scan scan) {
}
private TransactionResult prepareResult(TransactionState state) {
+ return prepareResult(ANY_TEXT_1, ANY_TEXT_2, state);
+ }
+
+ private TransactionResult prepareResult(
+ String partitionKeyColumnValue, String clusteringKeyColumnValue, TransactionState state) {
ImmutableMap> columns =
ImmutableMap.>builder()
- .put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, ANY_TEXT_1))
- .put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, ANY_TEXT_2))
+ .put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, partitionKeyColumnValue))
+ .put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, clusteringKeyColumnValue))
+ .put(ANY_NAME_3, TextColumn.of(ANY_NAME_3, ANY_TEXT_3))
.put(Attribute.ID, ScalarDbUtils.toColumn(Attribute.toIdValue(ANY_ID_2)))
.put(Attribute.STATE, ScalarDbUtils.toColumn(Attribute.toStateValue(state)))
.put(Attribute.VERSION, ScalarDbUtils.toColumn(Attribute.toVersionValue(2)))
@@ -356,14 +369,15 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept
true);
when(snapshot.isValidationRequired()).thenReturn(true);
- ConditionalExpression condition = mock(ConditionalExpression.class);
+ ConditionalExpression condition = column(ANY_NAME_3).isEqualToText(ANY_TEXT_3);
Get get = Get.newBuilder(prepareGet()).where(condition).build();
Get getForStorage = toGetForStorageFrom(get);
+
Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED));
Optional transactionResult = expected.map(e -> (TransactionResult) e);
Snapshot.Key key = new Snapshot.Key(getForStorage);
when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false);
- when(storage.get(getForStorage)).thenReturn(expected);
+ when(storage.get(any())).thenReturn(expected);
when(snapshot.mergeResult(
key, transactionResult, Collections.singleton(Selection.Conjunction.of(condition))))
.thenReturn(transactionResult);
@@ -377,7 +391,13 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept
Optional.of(
new FilteredResult(
expected.get(), Collections.emptyList(), TABLE_METADATA, false)));
- verify(storage).get(getForStorage);
+ verify(storage)
+ .get(
+ Get.newBuilder(getForStorage)
+ .clearConditions()
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .build());
verify(snapshot, never()).putIntoReadSet(any(), any());
verify(snapshot).putIntoGetSet(get, Optional.of((TransactionResult) expected.get()));
}
@@ -537,7 +557,6 @@ void scanOrGetScanner_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn(ScanT
when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty());
}
when(storage.scan(scanForStorage)).thenReturn(scanner);
- when(snapshot.getResult(any())).thenReturn(Optional.of(expected));
// Act
List results = scanOrGetScanner(scan, scanType);
@@ -579,7 +598,6 @@ void scanOrGetScanner_ResultGivenFromStorage_InReadOnlyMode_ShouldUpdateSnapshot
when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty());
}
when(storage.scan(scanForStorage)).thenReturn(scanner);
- when(snapshot.getResult(any())).thenReturn(Optional.of(expected));
// Act
List results = scanOrGetScanner(scan, scanType);
@@ -621,7 +639,6 @@ void scanOrGetScanner_ResultGivenFromStorage_InReadOnlyMode_ShouldUpdateSnapshot
when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty());
}
when(storage.scan(scanForStorage)).thenReturn(scanner);
- when(snapshot.getResult(any())).thenReturn(Optional.of(expected));
// Act
List results = scanOrGetScanner(scan, scanType);
@@ -664,7 +681,6 @@ void scanOrGetScanner_ResultGivenFromStorage_InReadOnlyMode_ShouldUpdateSnapshot
when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty());
}
when(storage.scan(scanForStorage)).thenReturn(scanner);
- when(snapshot.getResult(any())).thenReturn(Optional.of(expected));
// Act
List results = scanOrGetScanner(scan, scanType);
@@ -731,7 +747,6 @@ void scanOrGetScanner_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot(Scan
when(snapshot.getResults(scanForStorage))
.thenReturn(Optional.empty())
.thenReturn(Optional.of(Maps.newLinkedHashMap(ImmutableMap.of(key, expected))));
- when(snapshot.getResult(key)).thenReturn(Optional.of(expected));
// Act
List results1 = scanOrGetScanner(scan1, scanType);
@@ -945,7 +960,6 @@ void scanOrGetScanner_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgum
}
when(storage.scan(any(ScanAll.class))).thenReturn(scanner);
TransactionResult transactionResult = new TransactionResult(result);
- when(snapshot.getResult(key)).thenReturn(Optional.of(transactionResult));
// Act
List results = scanOrGetScanner(scan, scanType);
@@ -994,6 +1008,120 @@ void scanOrGetScanner_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgum
verify(snapshot, never()).verifyNoOverlap(any(), any());
}
+ @ParameterizedTest
+ @EnumSource(ScanType.class)
+ void scanOrGetScanner_WithLimit_ShouldReturnLimitedResults(ScanType scanType)
+ throws CrudException, ExecutionException, IOException {
+ // Arrange
+ Scan scanWithoutLimit = prepareScan();
+ Scan scanWithLimit = Scan.newBuilder(scanWithoutLimit).limit(2).build();
+ Scan scanForStorage = toScanForStorageFrom(scanWithoutLimit);
+
+ Result result1 = prepareResult(ANY_TEXT_1, ANY_TEXT_2, TransactionState.COMMITTED);
+ Result result2 = prepareResult(ANY_TEXT_1, ANY_TEXT_3, TransactionState.COMMITTED);
+
+ Snapshot.Key key1 = new Snapshot.Key(scanWithLimit, result1);
+ Snapshot.Key key2 = new Snapshot.Key(scanWithLimit, result2);
+
+ TransactionResult transactionResult1 = new TransactionResult(result1);
+ TransactionResult transactionResult2 = new TransactionResult(result2);
+
+ // Set up mock scanner to return two results
+ if (scanType == ScanType.SCAN) {
+ when(scanner.iterator()).thenReturn(Arrays.asList(result1, result2).iterator());
+ } else {
+ when(scanner.one())
+ .thenReturn(Optional.of(result1))
+ .thenReturn(Optional.of(result2))
+ .thenReturn(Optional.empty());
+ }
+ when(storage.scan(scanForStorage)).thenReturn(scanner);
+
+ // Act
+ List results = scanOrGetScanner(scanWithLimit, scanType);
+
+ // Assert
+ assertThat(results).hasSize(2);
+ assertThat(results.get(0))
+ .isEqualTo(
+ new FilteredResult(transactionResult1, Collections.emptyList(), TABLE_METADATA, false));
+ assertThat(results.get(1))
+ .isEqualTo(
+ new FilteredResult(transactionResult2, Collections.emptyList(), TABLE_METADATA, false));
+
+ verify(scanner).close();
+ verify(snapshot).putIntoReadSet(key1, Optional.of(transactionResult1));
+ verify(snapshot).putIntoReadSet(key2, Optional.of(transactionResult2));
+
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor> resultsCaptor =
+ ArgumentCaptor.forClass(LinkedHashMap.class);
+ verify(snapshot).putIntoScanSet(eq(scanWithLimit), resultsCaptor.capture());
+
+ LinkedHashMap capturedResults = resultsCaptor.getValue();
+ assertThat(capturedResults).hasSize(2);
+ assertThat(capturedResults).containsKeys(key1, key2);
+ }
+
+ @ParameterizedTest
+ @EnumSource(ScanType.class)
+ void scanOrGetScanner_WithLimitExceedingAvailableResults_ShouldReturnAllAvailableResults(
+ ScanType scanType) throws CrudException, ExecutionException, IOException {
+ // Arrange
+ Scan scanWithoutLimit = prepareScan();
+ Scan scanWithLimit =
+ Scan.newBuilder(scanWithoutLimit).limit(5).build(); // Limit higher than available results
+ Scan scanForStorage = toScanForStorageFrom(scanWithoutLimit);
+
+ Result result = prepareResult(TransactionState.COMMITTED);
+ Snapshot.Key key1 = new Snapshot.Key(scanWithLimit, result);
+ TransactionResult transactionResult1 = new TransactionResult(result);
+
+ // Set up mock scanner to return one result (less than limit)
+ if (scanType == ScanType.SCAN) {
+ when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator());
+ } else {
+ when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty());
+ }
+ when(storage.scan(scanForStorage)).thenReturn(scanner);
+
+ // Act
+ List results = scanOrGetScanner(scanWithLimit, scanType);
+
+ // Assert
+ assertThat(results).hasSize(1);
+ verify(scanner).close();
+ verify(snapshot).putIntoReadSet(key1, Optional.of(transactionResult1));
+ }
+
+ @ParameterizedTest
+ @EnumSource(ScanType.class)
+ void scanOrGetScanner_WithLimit_UncommittedResult_ShouldThrowUncommittedRecordException(
+ ScanType scanType) throws ExecutionException, IOException {
+ // Arrange
+ Scan scanWithoutLimit = prepareScan();
+ Scan scanWithLimit = Scan.newBuilder(scanWithoutLimit).limit(3).build();
+ Scan scanForStorage = toScanForStorageFrom(scanWithoutLimit);
+
+ Result uncommittedResult = prepareResult(ANY_TEXT_1, ANY_TEXT_3, TransactionState.PREPARED);
+
+ // Set up mock scanner to return one committed and one uncommitted result
+ if (scanType == ScanType.SCAN) {
+ when(scanner.iterator()).thenReturn(Collections.singletonList(uncommittedResult).iterator());
+ } else {
+ when(scanner.one()).thenReturn(Optional.of(uncommittedResult)).thenReturn(Optional.empty());
+ }
+ when(storage.scan(scanForStorage)).thenReturn(scanner);
+
+ // Act & Assert
+ assertThatThrownBy(() -> scanOrGetScanner(scanWithLimit, scanType))
+ .isInstanceOf(UncommittedRecordException.class);
+
+ verify(scanner).close();
+ verify(snapshot, never()).putIntoReadSet(any(), any());
+ verify(snapshot, never()).putIntoScanSet(any(), any());
+ }
+
@Test
public void
scan_RuntimeExceptionCausedByExecutionExceptionThrownByIteratorHasNext_ShouldThrowCrudException()
@@ -1207,7 +1335,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C
.namespace("ns")
.table("tbl")
.partitionKey(Key.ofText("c1", "foo"))
- .condition(ConditionBuilder.putIfExists())
+ .condition(putIfExists())
.enableImplicitPreRead()
.build();
Snapshot.Key key = new Snapshot.Key(put);
@@ -1245,7 +1373,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C
.namespace("ns")
.table("tbl")
.partitionKey(Key.ofText("c1", "foo"))
- .condition(ConditionBuilder.putIfExists())
+ .condition(putIfExists())
.enableImplicitPreRead()
.build();
Snapshot.Key key = new Snapshot.Key(put);
@@ -1285,7 +1413,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C
.namespace("ns")
.table("tbl")
.partitionKey(Key.ofText("c1", "foo"))
- .condition(ConditionBuilder.putIfExists())
+ .condition(putIfExists())
.build();
Snapshot.Key key = new Snapshot.Key(put);
when(snapshot.containsKeyInReadSet(key)).thenReturn(true);
@@ -1321,7 +1449,7 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C
.namespace("ns")
.table("tbl")
.partitionKey(Key.ofText("c1", "foo"))
- .condition(ConditionBuilder.putIfExists())
+ .condition(putIfExists())
.build();
// Act Assert
@@ -1361,7 +1489,7 @@ public void delete_DeleteWithConditionGiven_WithResultInReadSet_ShouldCallApprop
.namespace("ns")
.table("tbl")
.partitionKey(Key.ofText("c1", "foo"))
- .condition(ConditionBuilder.deleteIfExists())
+ .condition(deleteIfExists())
.build();
Snapshot.Key key = new Snapshot.Key(delete);
when(snapshot.containsKeyInReadSet(key)).thenReturn(true);
@@ -1397,7 +1525,7 @@ public void delete_DeleteWithConditionGiven_WithoutResultInReadSet_ShouldCallApp
.namespace("ns")
.table("tbl")
.partitionKey(Key.ofText("c1", "foo"))
- .condition(ConditionBuilder.deleteIfExists())
+ .condition(deleteIfExists())
.build();
Snapshot.Key key = new Snapshot.Key(delete);
when(snapshot.containsKeyInReadSet(key)).thenReturn(false);
@@ -1491,7 +1619,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
.namespace(key.getNamespace())
.table(key.getTable())
.partitionKey(key.getPartitionKey())
- .where(mock(ConditionalExpression.class))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_1))
.build();
when(snapshot.containsKeyInGetSet(getForKey)).thenReturn(false);
when(storage.get(any())).thenReturn(Optional.empty());
@@ -1500,7 +1628,15 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
handler.readUnread(key, getForKey);
// Assert
- verify(storage).get(any());
+ verify(storage)
+ .get(
+ Get.newBuilder()
+ .namespace(key.getNamespace())
+ .table(key.getTable())
+ .partitionKey(key.getPartitionKey())
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_1))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_1))
+ .build());
verify(snapshot, never()).putIntoReadSet(key, Optional.empty());
verify(snapshot).putIntoGetSet(getForKey, Optional.empty());
}
@@ -1515,7 +1651,6 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
when(key.getTable()).thenReturn(ANY_TABLE_NAME);
when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1));
- Result result = mock(Result.class);
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get());
when(storage.get(any())).thenReturn(Optional.of(result));
@@ -1546,7 +1681,6 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
when(key.getTable()).thenReturn(ANY_TABLE_NAME);
when(key.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1));
- Result result = mock(Result.class);
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.PREPARED.get());
when(storage.get(any())).thenReturn(Optional.of(result));
@@ -1598,7 +1732,6 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_CommittedRecordReturnedByStorage_ShouldCallAppropriateMethods()
throws CrudException, ExecutionException {
// Arrange
- Result result = mock(Result.class);
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get());
when(result.getPartitionKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_1, ANY_TEXT_1)));
when(result.getClusteringKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_2, ANY_TEXT_2)));
@@ -1628,7 +1761,6 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods()
readUnread_NullKeyAndGetWithIndexNotContainedInGetSet_UncommittedRecordReturnedByStorage_ShouldThrowUncommittedRecordException()
throws ExecutionException {
// Arrange
- Result result = mock(Result.class);
when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.PREPARED.get());
when(storage.get(any())).thenReturn(Optional.of(result));
@@ -1794,6 +1926,471 @@ public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods()
assertThat(transactionIdCaptor.getValue()).isEqualTo(ANY_TX_ID);
}
+ @Test
+ public void get_WithConjunctions_ShouldConvertConjunctions()
+ throws CrudException, ExecutionException {
+ // Arrange
+ when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get());
+ when(storage.get(any())).thenReturn(Optional.of(result));
+
+ // Act
+ handler.get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .build());
+ handler.get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_4).isEqualToInt(10))
+ .build());
+ handler.get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .or(column(ANY_NAME_4).isEqualToInt(20))
+ .build());
+ handler.get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(
+ condition(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_4))
+ .build())
+ .or(
+ condition(column(ANY_NAME_4).isGreaterThanInt(30))
+ .and(column(ANY_NAME_4).isLessThanOrEqualToInt(40))
+ .build())
+ .build());
+ handler.get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .or(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .build())
+ .and(
+ condition(column(ANY_NAME_4).isLessThanOrEqualToInt(50))
+ .or(column(ANY_NAME_4).isGreaterThanInt(60))
+ .build())
+ .build());
+ handler.get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(column(ANY_NAME_3).isLikeText(ANY_TEXT_3))
+ .or(column(ANY_NAME_3).isLikeText(ANY_TEXT_4))
+ .build());
+
+ // Assert
+ verify(storage)
+ .get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_4).isEqualToInt(10))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isEqualToInt(10))
+ .build())
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .or(column(ANY_NAME_4).isEqualToInt(20))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isEqualToInt(20))
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(
+ condition(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_4))
+ .build())
+ .or(
+ condition(column(ANY_NAME_4).isGreaterThanInt(30))
+ .and(column(ANY_NAME_4).isLessThanOrEqualToInt(40))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3)
+ .isNotEqualToText(ANY_TEXT_3))
+ .and(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3)
+ .isNotEqualToText(ANY_TEXT_4))
+ .build())
+ .or(
+ condition(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(30))
+ .and(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(40))
+ .build())
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_4).isLessThanOrEqualToInt(50))
+ .build())
+ .or(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_4).isGreaterThanInt(60))
+ .build())
+ .or(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .and(column(ANY_NAME_4).isLessThanOrEqualToInt(50))
+ .build())
+ .or(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .and(column(ANY_NAME_4).isGreaterThanInt(60))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(50))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(60))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .and(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(50))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(60))
+ .build())
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .get(
+ Get.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2))
+ .where(column(ANY_NAME_3).isLikeText(ANY_TEXT_3))
+ .or(column(ANY_NAME_3).isLikeText(ANY_TEXT_4))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isLikeText(ANY_TEXT_3))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isLikeText(ANY_TEXT_4))
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ }
+
+ @Test
+ public void scan_WithConjunctions_ShouldConvertConjunctions()
+ throws CrudException, ExecutionException {
+ // Arrange
+ when(result.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get());
+ when(result.getPartitionKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_1, ANY_TEXT_1)));
+ when(result.getClusteringKey()).thenReturn(Optional.of(Key.ofText(ANY_NAME_2, ANY_TEXT_2)));
+ when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator());
+ when(storage.scan(any())).thenReturn(scanner);
+
+ // Act
+ handler.scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .build());
+ handler.scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_4).isEqualToInt(10))
+ .build());
+ handler.scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .or(column(ANY_NAME_4).isEqualToInt(20))
+ .build());
+ handler.scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(
+ condition(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_4))
+ .build())
+ .or(
+ condition(column(ANY_NAME_4).isGreaterThanInt(30))
+ .and(column(ANY_NAME_4).isLessThanOrEqualToInt(40))
+ .build())
+ .build());
+ handler.scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .or(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .build())
+ .and(
+ condition(column(ANY_NAME_4).isLessThanOrEqualToInt(50))
+ .or(column(ANY_NAME_4).isGreaterThanInt(60))
+ .build())
+ .build());
+ handler.scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(column(ANY_NAME_3).isLikeText(ANY_TEXT_3))
+ .or(column(ANY_NAME_3).isLikeText(ANY_TEXT_4))
+ .build());
+ handler.scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .all()
+ .where(column(ANY_NAME_1).isGreaterThanText(ANY_TEXT_3))
+ .and(column(ANY_NAME_2).isLessThanOrEqualToText(ANY_TEXT_4))
+ .build());
+ handler.scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .all()
+ .where(column(ANY_NAME_1).isGreaterThanText(ANY_TEXT_3))
+ .and(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .build());
+
+ // Assert
+ verify(storage)
+ .scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_4).isEqualToInt(10))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isEqualToInt(10))
+ .build())
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .or(column(ANY_NAME_4).isEqualToInt(20))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isEqualToInt(20))
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(
+ condition(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_3).isNotEqualToText(ANY_TEXT_4))
+ .build())
+ .or(
+ condition(column(ANY_NAME_4).isGreaterThanInt(30))
+ .and(column(ANY_NAME_4).isLessThanOrEqualToInt(40))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3)
+ .isNotEqualToText(ANY_TEXT_3))
+ .and(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3)
+ .isNotEqualToText(ANY_TEXT_4))
+ .build())
+ .or(
+ condition(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(30))
+ .and(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(40))
+ .build())
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_4).isLessThanOrEqualToInt(50))
+ .build())
+ .or(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(ANY_NAME_4).isGreaterThanInt(60))
+ .build())
+ .or(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .and(column(ANY_NAME_4).isLessThanOrEqualToInt(50))
+ .build())
+ .or(
+ condition(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .and(column(ANY_NAME_4).isGreaterThanInt(60))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(50))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_3))
+ .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(60))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .and(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isLessThanOrEqualToInt(50))
+ .build())
+ .or(
+ condition(
+ column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_4).isGreaterThanInt(60))
+ .build())
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .partitionKey(Key.ofText(ANY_NAME_1, ANY_TEXT_1))
+ .where(column(ANY_NAME_3).isLikeText(ANY_TEXT_3))
+ .or(column(ANY_NAME_3).isLikeText(ANY_TEXT_4))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isLikeText(ANY_TEXT_3))
+ .or(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isLikeText(ANY_TEXT_4))
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .all()
+ .where(column(ANY_NAME_1).isGreaterThanText(ANY_TEXT_3))
+ .and(column(ANY_NAME_2).isLessThanOrEqualToText(ANY_TEXT_4))
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ verify(storage)
+ .scan(
+ Scan.newBuilder()
+ .namespace(ANY_NAMESPACE_NAME)
+ .table(ANY_TABLE_NAME)
+ .all()
+ .where(
+ condition(column(ANY_NAME_1).isGreaterThanText(ANY_TEXT_3))
+ .and(column(ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .build())
+ .or(
+ condition(column(ANY_NAME_1).isGreaterThanText(ANY_TEXT_3))
+ .and(column(Attribute.BEFORE_PREFIX + ANY_NAME_3).isEqualToText(ANY_TEXT_4))
+ .build())
+ .projections(TRANSACTION_TABLE_METADATA.getAfterImageColumnNames())
+ .consistency(Consistency.LINEARIZABLE)
+ .build());
+ }
+
private List scanOrGetScanner(Scan scan, ScanType scanType) throws CrudException {
if (scanType == ScanType.SCAN) {
return handler.scan(scan);
diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java
index 13ca8389c2..3624886cad 100644
--- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java
+++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java
@@ -1,5 +1,6 @@
package com.scalar.db.transaction.consensuscommit;
+import static com.scalar.db.api.ConditionBuilder.column;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -18,7 +19,6 @@
import static org.mockito.Mockito.verify;
import com.google.common.collect.Sets;
-import com.scalar.db.api.ConditionBuilder;
import com.scalar.db.api.Consistency;
import com.scalar.db.api.Delete;
import com.scalar.db.api.DistributedStorage;
@@ -43,8 +43,10 @@
import com.scalar.db.exception.transaction.CommitConflictException;
import com.scalar.db.exception.transaction.CommitException;
import com.scalar.db.exception.transaction.CrudConflictException;
+import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.exception.transaction.PreparationConflictException;
import com.scalar.db.exception.transaction.TransactionException;
+import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.io.DataType;
import com.scalar.db.io.IntValue;
import com.scalar.db.io.Key;
@@ -62,6 +64,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.junit.jupiter.api.AfterAll;
@@ -2473,7 +2476,7 @@ public void get_PutThenGetWithoutConjunctionReturnEmptyFromStorage_ShouldReturnR
transaction.put(preparePut(0, 0, namespace1, TABLE_1).withValue(BALANCE, 1));
Get get =
Get.newBuilder(prepareGet(0, 0, namespace1, TABLE_1))
- .where(ConditionBuilder.column(BALANCE).isEqualToInt(1))
+ .where(column(BALANCE).isEqualToInt(1))
.build();
Optional result = transaction.get(get);
assertThatCode(transaction::commit).doesNotThrowAnyException();
@@ -2494,7 +2497,7 @@ public void get_PutThenGetWithoutConjunctionReturnEmptyFromStorage_ShouldReturnR
transaction.put(preparePut(0, 0, namespace1, TABLE_1).withValue(BALANCE, 1));
Get get =
Get.newBuilder(prepareGet(0, 0, namespace1, TABLE_1))
- .where(ConditionBuilder.column(BALANCE).isEqualToInt(0))
+ .where(column(BALANCE).isEqualToInt(0))
.build();
Optional result = transaction.get(get);
assertThatCode(transaction::commit).doesNotThrowAnyException();
@@ -2513,7 +2516,7 @@ public void get_PutThenGetWithoutConjunctionReturnEmptyFromStorage_ShouldReturnR
Put put = preparePut(0, 0, namespace1, TABLE_1).withValue(BALANCE, 1);
Get get =
Get.newBuilder(prepareGet(0, 0, namespace1, TABLE_1))
- .where(ConditionBuilder.column(BALANCE).isLessThanOrEqualToInt(INITIAL_BALANCE))
+ .where(column(BALANCE).isLessThanOrEqualToInt(INITIAL_BALANCE))
.build();
// Act
@@ -2536,7 +2539,7 @@ public void get_PutThenGetWithoutConjunctionReturnEmptyFromStorage_ShouldReturnR
Put put = preparePut(0, 0, namespace1, TABLE_1).withValue(BALANCE, 1);
Get get =
Get.newBuilder(prepareGet(0, 0, namespace1, TABLE_1))
- .where(ConditionBuilder.column(BALANCE).isEqualToInt(0))
+ .where(column(BALANCE).isEqualToInt(0))
.build();
// Act
@@ -2658,7 +2661,7 @@ public void scan_NonOverlappingPutGivenBefore_ShouldScan() throws TransactionExc
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.start(Key.ofInt(ACCOUNT_TYPE, 0))
- .where(ConditionBuilder.column(BALANCE).isNotEqualToInt(1))
+ .where(column(BALANCE).isNotEqualToInt(1))
.build();
// Act
@@ -2679,7 +2682,7 @@ public void scan_NonOverlappingPutGivenBefore_ShouldScan() throws TransactionExc
transaction.put(preparePut(0, 1, namespace1, TABLE_1).withValue(BALANCE, 9999));
Scan scan =
Scan.newBuilder(prepareScan(0, 1, 1, namespace1, TABLE_1))
- .where(ConditionBuilder.column(BALANCE).isLessThanOrEqualToInt(INITIAL_BALANCE))
+ .where(column(BALANCE).isLessThanOrEqualToInt(INITIAL_BALANCE))
.build();
// Act
@@ -2701,8 +2704,8 @@ public void scan_OverlappingPutWithConjunctionsGivenBefore_ShouldThrowIllegalArg
.withValue(SOME_COLUMN, "aaa"));
Scan scan =
Scan.newBuilder(prepareScan(0, namespace1, TABLE_1))
- .where(ConditionBuilder.column(BALANCE).isLessThanInt(1000))
- .and(ConditionBuilder.column(SOME_COLUMN).isEqualToText("aaa"))
+ .where(column(BALANCE).isLessThanInt(1000))
+ .and(column(SOME_COLUMN).isEqualToText("aaa"))
.build();
// Act
@@ -2726,7 +2729,7 @@ public void scan_OverlappingPutWithConjunctionsGivenBefore_ShouldThrowIllegalArg
.build());
Scan scan =
Scan.newBuilder(prepareScanWithIndex(namespace1, TABLE_1, 1))
- .where(ConditionBuilder.column(SOME_COLUMN).isGreaterThanText("aaa"))
+ .where(column(SOME_COLUMN).isGreaterThanText("aaa"))
.build();
// Act
@@ -2808,8 +2811,8 @@ public void scan_OverlappingPutWithConjunctionsGivenBefore_ShouldThrowIllegalArg
.withValue(SOME_COLUMN, "aaa"));
Scan scan =
Scan.newBuilder(prepareScanWithIndex(namespace1, TABLE_1, 999))
- .where(ConditionBuilder.column(BALANCE).isLessThanInt(1000))
- .and(ConditionBuilder.column(SOME_COLUMN).isEqualToText("aaa"))
+ .where(column(BALANCE).isLessThanInt(1000))
+ .and(column(SOME_COLUMN).isEqualToText("aaa"))
.build();
// Act
@@ -3139,8 +3142,8 @@ public void get_GetWithMatchedConjunctionsGivenForCommittedRecord_ShouldReturnRe
DistributedTransaction transaction = manager.begin(Isolation.SERIALIZABLE);
Get get =
Get.newBuilder(prepareGet(1, 1, namespace1, TABLE_1))
- .where(ConditionBuilder.column(BALANCE).isEqualToInt(INITIAL_BALANCE))
- .and(ConditionBuilder.column(SOME_COLUMN).isNullText())
+ .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE))
+ .and(column(SOME_COLUMN).isNullText())
.build();
// Act
@@ -3163,8 +3166,8 @@ public void get_GetWithUnmatchedConjunctionsGivenForCommittedRecord_ShouldReturn
DistributedTransaction transaction = manager.begin(Isolation.SERIALIZABLE);
Get get =
Get.newBuilder(prepareGet(1, 1, namespace1, TABLE_1))
- .where(ConditionBuilder.column(BALANCE).isEqualToInt(INITIAL_BALANCE))
- .and(ConditionBuilder.column(SOME_COLUMN).isEqualToText("aaa"))
+ .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE))
+ .and(column(SOME_COLUMN).isEqualToText("aaa"))
.build();
// Act
@@ -3208,7 +3211,7 @@ public void scan_CalledTwiceWithSameConditionsAndUpdateForHappenedInBetween_Shou
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.start(Key.ofInt(ACCOUNT_TYPE, 0))
- .where(ConditionBuilder.column(BALANCE).isEqualToInt(1))
+ .where(column(BALANCE).isEqualToInt(1))
.build();
List result1 = transaction1.scan(scan);
@@ -3243,7 +3246,7 @@ public void scan_CalledTwiceWithSameConditionsAndUpdateForHappenedInBetween_Shou
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.start(Key.ofInt(ACCOUNT_TYPE, 0))
- .where(ConditionBuilder.column(BALANCE).isEqualToInt(1))
+ .where(column(BALANCE).isEqualToInt(1))
.build();
Scan scan2 =
Scan.newBuilder()
@@ -3251,7 +3254,7 @@ public void scan_CalledTwiceWithSameConditionsAndUpdateForHappenedInBetween_Shou
.table(TABLE_1)
.partitionKey(Key.ofInt(ACCOUNT_ID, 0))
.start(Key.ofInt(ACCOUNT_TYPE, 0))
- .where(ConditionBuilder.column(BALANCE).isGreaterThanInt(1))
+ .where(column(BALANCE).isGreaterThanInt(1))
.build();
List result1 = transaction1.scan(scan1);
@@ -5453,6 +5456,623 @@ public void getScanner_InReadOnlyMode_WithSerializable_ShouldNotThrowAnyExceptio
assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class);
}
+ @Test
+ public void
+ get_WithConjunction_ForPreparedRecordWhoseBeforeImageMatchesConjunction_ShouldReturnRecordAfterLazyRecovery()
+ throws UnknownTransactionStatusException, CrudException, ExecutionException {
+ // Arrange
+ manager.insert(
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build());
+
+ // Create a prepared record without before image
+ Put put =
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, 100)
+ .build();
+ Optional result =
+ originalStorage.get(
+ Get.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .build());
+ String transactionId = UUID.randomUUID().toString();
+ PrepareMutationComposer prepareMutationComposer =
+ new PrepareMutationComposer(
+ transactionId,
+ System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1),
+ new TransactionTableMetadataManager(admin, 0));
+ prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null));
+ originalStorage.mutate(prepareMutationComposer.get());
+
+ // Act Assert
+ Optional actual;
+ while (true) {
+ try {
+ actual =
+ manager.get(
+ Get.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE))
+ .build());
+ break;
+ } catch (CrudConflictException e) {
+ // Retry on conflict
+ }
+ }
+
+ assertThat(actual).isPresent();
+ assertThat(actual.get().getInt(ACCOUNT_ID)).isEqualTo(0);
+ assertThat(actual.get().getInt(ACCOUNT_TYPE)).isEqualTo(0);
+ assertThat(actual.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
+ }
+
+ @Test
+ public void
+ get_WithConjunction_ForCommittedRecordWhoseBeforeImageMatchesConjunction_ShouldNotReturnRecord()
+ throws UnknownTransactionStatusException, CrudException, ExecutionException {
+ // Arrange
+ manager.insert(
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build());
+
+ // Create a committed record with before image to simulate an old committed record that has both
+ // after and before images
+ Put put =
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, 100)
+ .build();
+ Optional result =
+ originalStorage.get(
+ Get.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .build());
+ String transactionId = UUID.randomUUID().toString();
+ PrepareMutationComposer prepareMutationComposer =
+ new PrepareMutationComposer(
+ transactionId,
+ System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1),
+ new TransactionTableMetadataManager(admin, 0));
+ prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null));
+ originalStorage.mutate(prepareMutationComposer.get());
+ originalStorage.put(
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(Attribute.STATE, TransactionState.COMMITTED.get())
+ .bigIntValue(Attribute.COMMITTED_AT, System.currentTimeMillis())
+ .build());
+
+ // Act Assert
+ Optional actual =
+ manager.get(
+ Get.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE))
+ .build());
+
+ assertThat(actual).isNotPresent();
+ }
+
+ @Test
+ public void
+ scan_WithConjunction_ForPreparedRecordWhoseBeforeImageMatchesConjunction_ShouldReturnRecordAfterLazyRecovery()
+ throws UnknownTransactionStatusException, CrudException, ExecutionException {
+ // Arrange
+ manager.mutate(
+ Arrays.asList(
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build(),
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build()));
+
+ // Create a prepared record without before image
+ Optional result =
+ originalStorage.get(
+ Get.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .build());
+ Put put =
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, 100)
+ .build();
+ String transactionId = UUID.randomUUID().toString();
+ PrepareMutationComposer prepareMutationComposer =
+ new PrepareMutationComposer(
+ transactionId,
+ System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1),
+ new TransactionTableMetadataManager(admin, 0));
+ prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null));
+ originalStorage.mutate(prepareMutationComposer.get());
+
+ // Act Assert
+ List results;
+ while (true) {
+ try {
+ results =
+ manager.scan(
+ Scan.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE))
+ .build());
+ break;
+ } catch (CrudConflictException e) {
+ // Retry on conflict
+ }
+ }
+
+ assertThat(results).hasSize(2);
+ assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0);
+ assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0);
+ assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
+ assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0);
+ assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1);
+ assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
+ }
+
+ @Test
+ public void
+ scan_WithConjunction_ForCommittedRecordWhoseBeforeImageMatchesConjunction_ShouldNotReturnRecord()
+ throws UnknownTransactionStatusException, CrudException, ExecutionException {
+ // Arrange
+ manager.mutate(
+ Arrays.asList(
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build(),
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build()));
+
+ // Create a committed record with before image to simulate an old committed record that has both
+ // after and before images
+ Optional result =
+ originalStorage.get(
+ Get.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .build());
+ Put put =
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, 100)
+ .build();
+ String transactionId = UUID.randomUUID().toString();
+ PrepareMutationComposer prepareMutationComposer =
+ new PrepareMutationComposer(
+ transactionId,
+ System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1),
+ new TransactionTableMetadataManager(admin, 0));
+ prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null));
+ originalStorage.mutate(prepareMutationComposer.get());
+ originalStorage.put(
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(Attribute.STATE, TransactionState.COMMITTED.get())
+ .bigIntValue(Attribute.COMMITTED_AT, System.currentTimeMillis())
+ .build());
+
+ // Act Assert
+ List results =
+ manager.scan(
+ Scan.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE))
+ .build());
+
+ assertThat(results).hasSize(1);
+ assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0);
+ assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(1);
+ assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
+ }
+
+ @Test
+ public void
+ scan_WithConjunctionAndLimit_ForCommittedRecordWhoseBeforeImageMatchesConjunction_ShouldNotReturnRecord()
+ throws UnknownTransactionStatusException, CrudException, ExecutionException {
+ // Arrange
+ manager.mutate(
+ Arrays.asList(
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build(),
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build(),
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build(),
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 3))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build()));
+
+ // Create a committed record with before image to simulate an old committed record that has both
+ // after and before images
+ Optional result =
+ originalStorage.get(
+ Get.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .build());
+ Put put =
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, 100)
+ .build();
+ String transactionId = UUID.randomUUID().toString();
+ PrepareMutationComposer prepareMutationComposer =
+ new PrepareMutationComposer(
+ transactionId,
+ System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1),
+ new TransactionTableMetadataManager(admin, 0));
+ prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null));
+ originalStorage.mutate(prepareMutationComposer.get());
+ originalStorage.put(
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(Attribute.STATE, TransactionState.COMMITTED.get())
+ .bigIntValue(Attribute.COMMITTED_AT, System.currentTimeMillis())
+ .build());
+
+ // Act Assert
+ List results =
+ manager.scan(
+ Scan.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE))
+ .limit(2)
+ .build());
+
+ assertThat(results).hasSize(2);
+ assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0);
+ assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(1);
+ assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
+ assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0);
+ assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(2);
+ assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
+ }
+
+ @Test
+ public void
+ getScanner_WithConjunction_ForPreparedRecordWhoseBeforeImageMatchesConjunction_ShouldReturnRecordAfterLazyRecovery()
+ throws UnknownTransactionStatusException, CrudException, ExecutionException {
+ // Arrange
+ manager.mutate(
+ Arrays.asList(
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build(),
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build()));
+
+ // Create a prepared record without before image
+ Optional result =
+ originalStorage.get(
+ Get.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .build());
+ Put put =
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, 100)
+ .build();
+ String transactionId = UUID.randomUUID().toString();
+ PrepareMutationComposer prepareMutationComposer =
+ new PrepareMutationComposer(
+ transactionId,
+ System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1),
+ new TransactionTableMetadataManager(admin, 0));
+ prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null));
+ originalStorage.mutate(prepareMutationComposer.get());
+
+ // Act Assert
+ List results;
+ while (true) {
+ try (TransactionManagerCrudOperable.Scanner scanner =
+ manager.getScanner(
+ Scan.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE))
+ .build())) {
+ results = scanner.all();
+ break;
+ } catch (CrudConflictException e) {
+ // Retry on conflict
+ }
+ }
+
+ assertThat(results).hasSize(2);
+ assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0);
+ assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0);
+ assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
+ assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0);
+ assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1);
+ assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
+ }
+
+ @Test
+ public void
+ getScanner_WithConjunction_ForCommittedRecordWhoseBeforeImageMatchesConjunction_ShouldNotReturnRecord()
+ throws UnknownTransactionStatusException, CrudException, ExecutionException {
+ // Arrange
+ manager.mutate(
+ Arrays.asList(
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build(),
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build()));
+
+ // Create a committed record with before image to simulate an old committed record that has both
+ // after and before images
+ Optional result =
+ originalStorage.get(
+ Get.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .build());
+ Put put =
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, 100)
+ .build();
+ String transactionId = UUID.randomUUID().toString();
+ PrepareMutationComposer prepareMutationComposer =
+ new PrepareMutationComposer(
+ transactionId,
+ System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1),
+ new TransactionTableMetadataManager(admin, 0));
+ prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null));
+ originalStorage.mutate(prepareMutationComposer.get());
+ originalStorage.put(
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(Attribute.STATE, TransactionState.COMMITTED.get())
+ .bigIntValue(Attribute.COMMITTED_AT, System.currentTimeMillis())
+ .build());
+
+ // Act Assert
+ List results;
+ try (TransactionManagerCrudOperable.Scanner scanner =
+ manager.getScanner(
+ Scan.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE))
+ .build())) {
+ results = scanner.all();
+ }
+
+ assertThat(results).hasSize(1);
+ assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0);
+ assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(1);
+ assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
+ }
+
+ @Test
+ public void
+ getScanner_WithConjunctionAndLimit_ForCommittedRecordWhoseBeforeImageMatchesConjunction_ShouldNotReturnRecord()
+ throws UnknownTransactionStatusException, CrudException, ExecutionException {
+ // Arrange
+ manager.mutate(
+ Arrays.asList(
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build(),
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build(),
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build(),
+ Insert.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 3))
+ .intValue(BALANCE, INITIAL_BALANCE)
+ .build()));
+
+ // Create a committed record with before image to simulate an old committed record that has both
+ // after and before images
+ Optional result =
+ originalStorage.get(
+ Get.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .build());
+ Put put =
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(BALANCE, 100)
+ .build();
+ String transactionId = UUID.randomUUID().toString();
+ PrepareMutationComposer prepareMutationComposer =
+ new PrepareMutationComposer(
+ transactionId,
+ System.currentTimeMillis() - (RecoveryHandler.TRANSACTION_LIFETIME_MILLIS + 1),
+ new TransactionTableMetadataManager(admin, 0));
+ prepareMutationComposer.add(put, result.map(TransactionResult::new).orElse(null));
+ originalStorage.mutate(prepareMutationComposer.get());
+ originalStorage.put(
+ Put.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0))
+ .intValue(Attribute.STATE, TransactionState.COMMITTED.get())
+ .bigIntValue(Attribute.COMMITTED_AT, System.currentTimeMillis())
+ .build());
+
+ // Act Assert
+ List results;
+ try (TransactionManagerCrudOperable.Scanner scanner =
+ manager.getScanner(
+ Scan.newBuilder()
+ .namespace(namespace1)
+ .table(TABLE_1)
+ .partitionKey(Key.ofInt(ACCOUNT_ID, 0))
+ .where(column(BALANCE).isEqualToInt(INITIAL_BALANCE))
+ .limit(2)
+ .build())) {
+ results = scanner.all();
+ }
+
+ assertThat(results).hasSize(2);
+ assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0);
+ assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(1);
+ assertThat(results.get(0).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
+ assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0);
+ assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(2);
+ assertThat(results.get(1).getInt(BALANCE)).isEqualTo(INITIAL_BALANCE);
+ }
+
@Test
public void manager_get_GetGivenForCommittedRecord_WithSerializable_ShouldReturnRecord()
throws TransactionException {