diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index 1657611990..d9469f4d5c 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -144,7 +144,7 @@ public DistributedTransaction begin() { @Override public DistributedTransaction begin(String txId) { - return begin(txId, config.getIsolation(), false); + return begin(txId, config.getIsolation(), false, false); } @Override @@ -155,7 +155,7 @@ public DistributedTransaction beginReadOnly() { @Override public DistributedTransaction beginReadOnly(String txId) { - return begin(txId, config.getIsolation(), true); + return begin(txId, config.getIsolation(), true, false); } /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @@ -169,7 +169,7 @@ public DistributedTransaction start(com.scalar.db.api.Isolation isolation) { @Deprecated @Override public DistributedTransaction start(String txId, com.scalar.db.api.Isolation isolation) { - return begin(txId, Isolation.valueOf(isolation.name()), false); + return begin(txId, Isolation.valueOf(isolation.name()), false, false); } /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @@ -192,7 +192,7 @@ public DistributedTransaction start(com.scalar.db.api.SerializableStrategy strat @Override public DistributedTransaction start( String txId, com.scalar.db.api.SerializableStrategy strategy) { - return begin(txId, Isolation.SERIALIZABLE, false); + return begin(txId, Isolation.SERIALIZABLE, false, false); } /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @@ -202,23 +202,24 @@ public DistributedTransaction start( String txId, com.scalar.db.api.Isolation isolation, com.scalar.db.api.SerializableStrategy strategy) { - return begin(txId, Isolation.valueOf(isolation.name()), false); + return begin(txId, Isolation.valueOf(isolation.name()), false, false); } @VisibleForTesting DistributedTransaction begin(Isolation isolation) { String txId = UUID.randomUUID().toString(); - return begin(txId, isolation, false); + return begin(txId, isolation, false, false); } @VisibleForTesting DistributedTransaction beginReadOnly(Isolation isolation) { String txId = UUID.randomUUID().toString(); - return begin(txId, isolation, true); + return begin(txId, isolation, true, false); } @VisibleForTesting - DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) { + DistributedTransaction begin( + String txId, Isolation isolation, boolean readOnly, boolean oneOperation) { checkArgument(!Strings.isNullOrEmpty(txId)); checkNotNull(isolation); if (isGroupCommitEnabled()) { @@ -238,7 +239,8 @@ DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor, - readOnly); + readOnly, + oneOperation); DistributedTransaction transaction = new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter); if (readOnly) { @@ -249,6 +251,11 @@ DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) return transaction; } + private DistributedTransaction beginOneOperation(boolean readOnly) { + String txId = UUID.randomUUID().toString(); + return begin(txId, config.getIsolation(), readOnly, true); + } + @Override public Optional get(Get get) throws CrudException, UnknownTransactionStatusException { return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)), true); @@ -261,7 +268,7 @@ public List scan(Scan scan) throws CrudException, UnknownTransactionStat @Override public Scanner getScanner(Scan scan) throws CrudException { - DistributedTransaction transaction = beginReadOnly(); + DistributedTransaction transaction = beginOneOperation(true); TransactionCrudOperable.Scanner scanner; try { @@ -431,12 +438,7 @@ private R executeTransaction( ThrowableFunction throwableFunction, boolean readOnly) throws CrudException, UnknownTransactionStatusException { - DistributedTransaction transaction; - if (readOnly) { - transaction = beginReadOnly(); - } else { - transaction = begin(); - } + DistributedTransaction transaction = beginOneOperation(readOnly); try { R result = throwableFunction.apply(transaction); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index 73d420215f..76b44fb626 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -51,6 +51,11 @@ public class CrudHandler { // Whether the transaction is in read-only mode or not. private final boolean readOnly; + // Whether the transaction is in one-operation mode or not. One-operation mode refers to executing + // a CRUD operation directly through `DistributedTransactionManager` without explicitly beginning + // a transaction. + private final boolean oneOperation; + private final List scanners = new ArrayList<>(); @SuppressFBWarnings("EI_EXPOSE_REP2") @@ -60,7 +65,8 @@ public CrudHandler( TransactionTableMetadataManager tableMetadataManager, boolean isIncludeMetadataEnabled, ParallelExecutor parallelExecutor, - boolean readOnly) { + boolean readOnly, + boolean oneOperation) { this.storage = checkNotNull(storage); this.snapshot = checkNotNull(snapshot); this.tableMetadataManager = tableMetadataManager; @@ -68,6 +74,7 @@ public CrudHandler( this.mutationConditionsValidator = new MutationConditionsValidator(snapshot.getId()); this.parallelExecutor = parallelExecutor; this.readOnly = readOnly; + this.oneOperation = oneOperation; } @VisibleForTesting @@ -78,7 +85,8 @@ public CrudHandler( boolean isIncludeMetadataEnabled, MutationConditionsValidator mutationConditionsValidator, ParallelExecutor parallelExecutor, - boolean readOnly) { + boolean readOnly, + boolean oneOperation) { this.storage = checkNotNull(storage); this.snapshot = checkNotNull(snapshot); this.tableMetadataManager = tableMetadataManager; @@ -86,6 +94,7 @@ public CrudHandler( this.mutationConditionsValidator = mutationConditionsValidator; this.parallelExecutor = parallelExecutor; this.readOnly = readOnly; + this.oneOperation = oneOperation; } public Optional get(Get originalGet) throws CrudException { @@ -102,11 +111,17 @@ public Optional get(Get originalGet) throws CrudException { key = new Snapshot.Key(get); } - readUnread(key, get); - - return snapshot - .getResult(key, get) - .map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled)); + if (isSnapshotReadRequired()) { + readUnread(key, get); + return snapshot + .getResult(key, get) + .map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled)); + } else { + Optional result = read(key, get); + return snapshot + .mergeResult(key, result, get.getConjunctions()) + .map(r -> new FilteredResult(r, originalProjections, metadata, isIncludeMetadataEnabled)); + } } // Only for a Get with index, the argument `key` is null @@ -120,7 +135,7 @@ void readUnread(@Nullable Snapshot.Key key, Get get) throws CrudException { // Although this class is not thread-safe, this method is actually thread-safe, so we call it // concurrently in the implicit pre-read @VisibleForTesting - void read(@Nullable Snapshot.Key key, Get get) throws CrudException { + Optional read(@Nullable Snapshot.Key key, Get get) throws CrudException { Optional result = getFromStorage(get); if (!result.isPresent() || result.get().isCommitted()) { if (result.isPresent() || get.getConjunctions().isEmpty()) { @@ -142,8 +157,8 @@ void read(@Nullable Snapshot.Key key, Get get) throws CrudException { } } } - snapshot.putIntoGetSet(get, result); - return; + putIntoGetSetInSnapshot(get, result); + return result; } throw new UncommittedRecordException( get, @@ -204,7 +219,7 @@ private LinkedHashMap scanInternal(Scan scan) } } - snapshot.putIntoScanSet(scan, results); + putIntoScanSetInSnapshot(scan, results); return results; } @@ -263,9 +278,43 @@ private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional result) { + // If neither validation nor snapshot read is required, we don't need to put the result into + // the get set + if (isValidationOrSnapshotReadRequired()) { + snapshot.putIntoGetSet(get, result); + } + } + + private void putIntoScanSetInSnapshot( + Scan scan, LinkedHashMap results) { + // If neither validation nor snapshot read is required, we don't need to put the results into + // the scan set + if (isValidationOrSnapshotReadRequired()) { + snapshot.putIntoScanSet(scan, results); + } + } + + private void putIntoScannerSetInSnapshot( + Scan scan, LinkedHashMap results) { + // if validation is not required, we don't need to put the results into the scanner set + if (snapshot.isValidationRequired()) { + snapshot.putIntoScannerSet(scan, results); + } + } + private void verifyNoOverlap(Scan scan, Map results) { - // In read-only mode, we don't need to verify the overlap - if (!readOnly) { + // In either read-only mode or one-operation mode, we don't need to verify the overlap + if (!readOnly && !oneOperation) { snapshot.verifyNoOverlap(scan, results); } } @@ -432,7 +481,7 @@ private class ConsensusCommitStorageScanner extends AbstractTransactionCrudOpera private final List originalProjections; private final Scanner scanner; - private final LinkedHashMap results = new LinkedHashMap<>(); + @Nullable private final LinkedHashMap results; private final AtomicBoolean fullyScanned = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean(); @@ -441,6 +490,14 @@ public ConsensusCommitStorageScanner(Scan scan, List originalProjections this.scan = scan; this.originalProjections = originalProjections; scanner = scanFromStorage(scan); + + if (isValidationOrSnapshotReadRequired()) { + results = new LinkedHashMap<>(); + } else { + // If neither validation nor snapshot read is required, we don't need to put the results + // into the scan set + results = null; + } } @Override @@ -456,7 +513,10 @@ public Optional one() throws CrudException { Snapshot.Key key = new Snapshot.Key(scan, r.get()); TransactionResult result = new TransactionResult(r.get()); processScanResult(key, scan, result); - results.put(key, result); + + if (results != null) { + results.put(key, result); + } TableMetadata metadata = getTableMetadata(scan); return Optional.of( @@ -499,10 +559,10 @@ public void close() { if (fullyScanned.get()) { // If the scanner is fully scanned, we can treat it as a normal scan, and put the results // into the scan set - snapshot.putIntoScanSet(scan, results); + putIntoScanSetInSnapshot(scan, results); } else { // If the scanner is not fully scanned, put the results into the scanner set - snapshot.putIntoScannerSet(scan, results); + putIntoScannerSetInSnapshot(scan, results); } verifyNoOverlap(scan, results); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index ff0ac712f3..f9691249c2 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -249,18 +249,22 @@ private Optional mergeResult(Key key, Optional mergeResult( + public Optional mergeResult( Key key, Optional result, Set conjunctions) throws CrudException { - return mergeResult(key, result) - .filter( - r -> - // We need to apply conditions if it is a merged result because the transaction’s - // write makes the record no longer match the conditions. Of course, we can just - // return the result without checking the condition if there is no condition. - !r.isMergedResult() - || conjunctions.isEmpty() - || ScalarDbUtils.columnsMatchAnyOfConjunctions(r.getColumns(), conjunctions)); + Optional ret = mergeResult(key, result); + + if (conjunctions.isEmpty()) { + // We can just return the result without checking the condition if there is no condition. + return ret; + } + + return ret.filter( + r -> + // We need to apply conditions if it is a merged result because the transaction’s write + // makes the record no longer match the conditions. + !r.isMergedResult() + || ScalarDbUtils.columnsMatchAnyOfConjunctions(r.getColumns(), conjunctions)); } private TableMetadata getTableMetadata(Key key) throws CrudException { diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java index 35d8d74aa9..d606784b2a 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java @@ -148,10 +148,9 @@ TwoPhaseCommitTransaction begin(Isolation isolation) { return begin(txId, isolation); } - @VisibleForTesting - TwoPhaseCommitTransaction begin(String txId, Isolation isolation) { + private TwoPhaseCommitTransaction begin(String txId, Isolation isolation) { throwIfGroupCommitIsEnabled(); - return createNewTransaction(txId, isolation); + return begin(txId, isolation, false, false); } @Override @@ -163,10 +162,12 @@ public TwoPhaseCommitTransaction join(String txId) { @VisibleForTesting TwoPhaseCommitTransaction join(String txId, Isolation isolation) { throwIfGroupCommitIsEnabled(); - return createNewTransaction(txId, isolation); + return begin(txId, isolation, false, false); } - private TwoPhaseCommitTransaction createNewTransaction(String txId, Isolation isolation) { + @VisibleForTesting + TwoPhaseCommitTransaction begin( + String txId, Isolation isolation, boolean readOnly, boolean oneOperation) { Snapshot snapshot = new Snapshot(txId, isolation, tableMetadataManager, parallelExecutor); CrudHandler crud = new CrudHandler( @@ -175,8 +176,8 @@ private TwoPhaseCommitTransaction createNewTransaction(String txId, Isolation is tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor, - false); - + readOnly, + oneOperation); TwoPhaseConsensusCommit transaction = new TwoPhaseConsensusCommit(crud, commit, recovery, mutationOperationChecker); getNamespace().ifPresent(transaction::withNamespace); @@ -184,19 +185,24 @@ private TwoPhaseCommitTransaction createNewTransaction(String txId, Isolation is return transaction; } + private TwoPhaseCommitTransaction beginOneOperation(boolean readOnly) { + String txId = UUID.randomUUID().toString(); + return begin(txId, config.getIsolation(), readOnly, true); + } + @Override public Optional get(Get get) throws CrudException, UnknownTransactionStatusException { - return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get))); + return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)), true); } @Override public List scan(Scan scan) throws CrudException, UnknownTransactionStatusException { - return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan))); + return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan)), true); } @Override public Scanner getScanner(Scan scan) throws CrudException { - TwoPhaseCommitTransaction transaction = begin(); + TwoPhaseCommitTransaction transaction = beginOneOperation(true); TransactionCrudOperable.Scanner scanner; try { @@ -287,7 +293,8 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException t -> { t.put(copyAndSetTargetToIfNot(put)); return null; - }); + }, + false); } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @@ -298,7 +305,8 @@ public void put(List puts) throws CrudException, UnknownTransactionStatusEx t -> { t.put(copyAndSetTargetToIfNot(puts)); return null; - }); + }, + false); } @Override @@ -307,7 +315,8 @@ public void insert(Insert insert) throws CrudException, UnknownTransactionStatus t -> { t.insert(copyAndSetTargetToIfNot(insert)); return null; - }); + }, + false); } @Override @@ -316,7 +325,8 @@ public void upsert(Upsert upsert) throws CrudException, UnknownTransactionStatus t -> { t.upsert(copyAndSetTargetToIfNot(upsert)); return null; - }); + }, + false); } @Override @@ -325,7 +335,8 @@ public void update(Update update) throws CrudException, UnknownTransactionStatus t -> { t.update(copyAndSetTargetToIfNot(update)); return null; - }); + }, + false); } @Override @@ -334,7 +345,8 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus t -> { t.delete(copyAndSetTargetToIfNot(delete)); return null; - }); + }, + false); } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @@ -345,7 +357,8 @@ public void delete(List deletes) throws CrudException, UnknownTransactio t -> { t.delete(copyAndSetTargetToIfNot(deletes)); return null; - }); + }, + false); } @Override @@ -355,13 +368,15 @@ public void mutate(List mutations) t -> { t.mutate(copyAndSetTargetToIfNot(mutations)); return null; - }); + }, + false); } private R executeTransaction( - ThrowableFunction throwableFunction) + ThrowableFunction throwableFunction, + boolean readOnly) throws CrudException, UnknownTransactionStatusException { - TwoPhaseCommitTransaction transaction = begin(); + TwoPhaseCommitTransaction transaction = beginOneOperation(readOnly); try { R result = throwableFunction.apply(transaction); transaction.prepare(); diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java index e1d61e8253..a056ac7de1 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java @@ -186,7 +186,7 @@ public void begin_CalledTwiceWithSameTxId_ThrowTransactionException() DistributedTransaction transaction = spied.beginReadOnly(); // Assert - verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true)); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(false)); assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); } @@ -200,7 +200,7 @@ public void beginReadOnly_TxIdGiven_ReturnWithSpecifiedTxIdAndSnapshotIsolationI DistributedTransaction transaction = spied.beginReadOnly(ANY_TX_ID); // Assert - verify(spied).begin(eq(ANY_TX_ID), eq(Isolation.SNAPSHOT), eq(true)); + verify(spied).begin(eq(ANY_TX_ID), eq(Isolation.SNAPSHOT), eq(true), eq(false)); assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); } @@ -300,7 +300,7 @@ public void start_CalledTwiceWithSameTxId_ThrowTransactionException() DistributedTransaction transaction = spied.startReadOnly(); // Assert - verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true)); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(false)); assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); } @@ -315,7 +315,7 @@ public void startReadOnly_TxIdGiven_ReturnWithSpecifiedTxIdAndSnapshotIsolationI DistributedTransaction transaction = spied.startReadOnly(ANY_TX_ID); // Assert - verify(spied).begin(eq(ANY_TX_ID), eq(Isolation.SNAPSHOT), eq(true)); + verify(spied).begin(eq(ANY_TX_ID), eq(Isolation.SNAPSHOT), eq(true), eq(false)); assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); } @@ -603,7 +603,9 @@ public void get_ShouldGet() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -615,7 +617,7 @@ public void get_ShouldGet() throws TransactionException { Optional actual = spied.get(get); // Assert - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).commit(); assertThat(actual).isEqualTo(Optional.of(result)); @@ -628,7 +630,9 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -637,7 +641,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).rollback(); } @@ -650,7 +654,9 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -659,7 +665,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).rollback(); } @@ -672,7 +678,9 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -681,7 +689,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(UnknownTransactionStatusException.class); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).commit(); } @@ -693,7 +701,9 @@ public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudExcepti DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -702,7 +712,7 @@ public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudExcepti // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).commit(); } @@ -713,7 +723,9 @@ public void scan_ShouldScan() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -726,7 +738,7 @@ public void scan_ShouldScan() throws TransactionException { List actual = spied.scan(scan); // Assert - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).scan(scan); verify(transaction).commit(); assertThat(actual).isEqualTo(results); @@ -738,7 +750,9 @@ public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() t DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -768,7 +782,7 @@ public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() t assertThat(actual.one()).isEmpty(); actual.close(); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).commit(); verify(scanner).close(); } @@ -779,7 +793,9 @@ public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -806,7 +822,7 @@ public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() assertThat(actual.all()).isEmpty(); actual.close(); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).commit(); verify(scanner).close(); } @@ -818,7 +834,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -853,7 +871,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul assertThat(iterator.hasNext()).isFalse(); actual.close(); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).commit(); verify(scanner).close(); } @@ -866,7 +884,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -880,7 +900,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Act Assert assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudException.class); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).rollback(); } @@ -892,7 +912,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -910,7 +932,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::one).isInstanceOf(CrudException.class); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -922,7 +944,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Arrange DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -940,7 +964,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::all).isInstanceOf(CrudException.class); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -952,7 +976,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Arrange DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -970,7 +996,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -983,7 +1009,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); doThrow(CommitConflictException.class).when(transaction).commit(); Scan scan = @@ -1000,7 +1028,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -1013,7 +1041,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); doThrow(UnknownTransactionStatusException.class).when(transaction).commit(); Scan scan = @@ -1030,7 +1060,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(UnknownTransactionStatusException.class); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); } @@ -1042,7 +1072,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).beginReadOnly(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); doThrow(CommitException.class).when(transaction).commit(); Scan scan = @@ -1059,7 +1091,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); - verify(spied).beginReadOnly(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -1070,7 +1102,9 @@ public void put_ShouldPut() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); Put put = Put.newBuilder() @@ -1084,7 +1118,7 @@ public void put_ShouldPut() throws TransactionException { spied.put(put); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).put(put); verify(transaction).commit(); } @@ -1095,7 +1129,9 @@ public void put_MultiplePutsGiven_ShouldPut() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); List puts = Arrays.asList( @@ -1122,7 +1158,7 @@ public void put_MultiplePutsGiven_ShouldPut() throws TransactionException { spied.put(puts); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).put(puts); verify(transaction).commit(); } @@ -1133,7 +1169,9 @@ public void insert_ShouldInsert() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); Insert insert = Insert.newBuilder() @@ -1147,7 +1185,7 @@ public void insert_ShouldInsert() throws TransactionException { spied.insert(insert); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).insert(insert); verify(transaction).commit(); } @@ -1158,7 +1196,9 @@ public void upsert_ShouldUpsert() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); Upsert upsert = Upsert.newBuilder() @@ -1172,7 +1212,7 @@ public void upsert_ShouldUpsert() throws TransactionException { spied.upsert(upsert); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).upsert(upsert); verify(transaction).commit(); } @@ -1183,7 +1223,9 @@ public void update_ShouldUpdate() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); Update update = Update.newBuilder() @@ -1197,7 +1239,7 @@ public void update_ShouldUpdate() throws TransactionException { spied.update(update); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).update(update); verify(transaction).commit(); } @@ -1208,7 +1250,9 @@ public void delete_ShouldDelete() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); Delete delete = Delete.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -1217,7 +1261,7 @@ public void delete_ShouldDelete() throws TransactionException { spied.delete(delete); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).delete(delete); verify(transaction).commit(); } @@ -1228,7 +1272,9 @@ public void delete_MultipleDeletesGiven_ShouldDelete() throws TransactionExcepti DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); List deletes = Arrays.asList( @@ -1252,7 +1298,7 @@ public void delete_MultipleDeletesGiven_ShouldDelete() throws TransactionExcepti spied.delete(deletes); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).delete(deletes); verify(transaction).commit(); } @@ -1263,7 +1309,9 @@ public void mutate_ShouldMutate() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); List mutations = Arrays.asList( @@ -1301,7 +1349,7 @@ public void mutate_ShouldMutate() throws TransactionException { spied.mutate(mutations); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).mutate(mutations); verify(transaction).commit(); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index e524942791..b9c1ca8eba 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -23,12 +23,14 @@ import com.scalar.db.api.Scan; import com.scalar.db.api.ScanAll; import com.scalar.db.api.Scanner; +import com.scalar.db.api.Selection; import com.scalar.db.api.TableMetadata; import com.scalar.db.api.TransactionCrudOperable; import com.scalar.db.api.TransactionState; import com.scalar.db.common.ResultImpl; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CrudException; +import com.scalar.db.exception.transaction.ValidationConflictException; import com.scalar.db.io.Column; import com.scalar.db.io.DataType; import com.scalar.db.io.Key; @@ -64,6 +66,8 @@ public class CrudHandlerTest { private static final String ANY_TEXT_1 = "text1"; private static final String ANY_TEXT_2 = "text2"; private static final String ANY_TEXT_3 = "text3"; + private static final String ANY_TEXT_4 = "text4"; + private static final String ANY_TEXT_5 = "text5"; private static final String ANY_TX_ID = "tx_id"; private static final TableMetadata TABLE_METADATA = @@ -99,6 +103,7 @@ public void setUp() throws Exception { false, mutationConditionsValidator, parallelExecutor, + false, false); // Arrange @@ -226,7 +231,8 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept false, mutationConditionsValidator, parallelExecutor, - true); + true, + false); Get get = prepareGet(); Get getForStorage = toGetForStorageFrom(get); @@ -251,6 +257,131 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept verify(snapshot).putIntoGetSet(get, Optional.of((TransactionResult) expected.get())); } + @Test + public void + get_GetNotExistsInSnapshotAndRecordInStorageCommitted_InOneOperationMode_ValidationNotRequired_ShouldReturnFromStorageAndUpdateSnapshot() + throws CrudException, ExecutionException { + // Arrange + handler = + new CrudHandler( + storage, + snapshot, + tableMetadataManager, + false, + mutationConditionsValidator, + parallelExecutor, + true, + true); + when(snapshot.isValidationRequired()).thenReturn(false); + + Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); + Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED)); + Optional transactionResult = expected.map(e -> (TransactionResult) e); + Snapshot.Key key = new Snapshot.Key(getForStorage); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); + when(storage.get(getForStorage)).thenReturn(expected); + when(snapshot.mergeResult(key, transactionResult, getForStorage.getConjunctions())) + .thenReturn(transactionResult); + + // Act + Optional result = handler.get(get); + + // Assert + assertThat(result) + .isEqualTo( + Optional.of( + new FilteredResult( + expected.get(), Collections.emptyList(), TABLE_METADATA, false))); + verify(storage).get(getForStorage); + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot, never()).putIntoGetSet(any(), any()); + } + + @Test + public void + get_GetNotExistsInSnapshotAndRecordInStorageCommitted_InOneOperationMode_ValidationRequired_ShouldReturnFromStorageAndUpdateSnapshot() + throws CrudException, ExecutionException { + // Arrange + handler = + new CrudHandler( + storage, + snapshot, + tableMetadataManager, + false, + mutationConditionsValidator, + parallelExecutor, + true, + true); + when(snapshot.isValidationRequired()).thenReturn(true); + + Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); + Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED)); + Optional transactionResult = expected.map(e -> (TransactionResult) e); + Snapshot.Key key = new Snapshot.Key(getForStorage); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); + when(storage.get(getForStorage)).thenReturn(expected); + when(snapshot.mergeResult(key, transactionResult, getForStorage.getConjunctions())) + .thenReturn(transactionResult); + + // Act + Optional result = handler.get(get); + + // Assert + assertThat(result) + .isEqualTo( + Optional.of( + new FilteredResult( + expected.get(), Collections.emptyList(), TABLE_METADATA, false))); + verify(storage).get(getForStorage); + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot).putIntoGetSet(get, Optional.of((TransactionResult) expected.get())); + } + + @Test + public void + get_GetWithConjunction_GetNotExistsInSnapshotAndRecordInStorageCommitted_InOneOperationMode_ValidationRequired_ShouldReturnFromStorageAndUpdateSnapshot() + throws CrudException, ExecutionException { + // Arrange + handler = + new CrudHandler( + storage, + snapshot, + tableMetadataManager, + false, + mutationConditionsValidator, + parallelExecutor, + true, + true); + when(snapshot.isValidationRequired()).thenReturn(true); + + ConditionalExpression condition = mock(ConditionalExpression.class); + Get get = Get.newBuilder(prepareGet()).where(condition).build(); + Get getForStorage = toGetForStorageFrom(get); + Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED)); + Optional transactionResult = expected.map(e -> (TransactionResult) e); + Snapshot.Key key = new Snapshot.Key(getForStorage); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); + when(storage.get(getForStorage)).thenReturn(expected); + when(snapshot.mergeResult( + key, transactionResult, Collections.singleton(Selection.Conjunction.of(condition)))) + .thenReturn(transactionResult); + + // Act + Optional result = handler.get(get); + + // Assert + assertThat(result) + .isEqualTo( + Optional.of( + new FilteredResult( + expected.get(), Collections.emptyList(), TABLE_METADATA, false))); + verify(storage).get(getForStorage); + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot).putIntoGetSet(get, Optional.of((TransactionResult) expected.get())); + } + @Test public void get_GetNotExistsInSnapshotAndRecordInStorageNotCommitted_ShouldThrowUncommittedRecordException() @@ -351,7 +482,8 @@ public void get_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSn Optional expected = Optional.of(new TransactionResult(result)); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, tableMetadataManager, parallelExecutor); handler = - new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); + new CrudHandler( + storage, snapshot, tableMetadataManager, false, parallelExecutor, false, false); when(storage.get(getForStorage)).thenReturn(Optional.of(result)); // Act @@ -433,7 +565,93 @@ void scanOrGetScanner_ResultGivenFromStorage_InReadOnlyMode_ShouldUpdateSnapshot false, mutationConditionsValidator, parallelExecutor, + true, + false); + + Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); + result = prepareResult(TransactionState.COMMITTED); + Snapshot.Key key = new Snapshot.Key(scan, result); + TransactionResult expected = new TransactionResult(result); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } + when(storage.scan(scanForStorage)).thenReturn(scanner); + when(snapshot.getResult(any())).thenReturn(Optional.of(expected)); + + // Act + List results = scanOrGetScanner(scan, scanType); + + // Assert + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot).putIntoScanSet(scan, Maps.newLinkedHashMap(ImmutableMap.of(key, expected))); + verify(snapshot, never()).verifyNoOverlap(any(), any()); + assertThat(results.size()).isEqualTo(1); + assertThat(results.get(0)) + .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); + } + + @ParameterizedTest + @EnumSource(ScanType.class) + void + scanOrGetScanner_ResultGivenFromStorage_InOneOperationMode_ValidationNotRequired_ShouldUpdateSnapshotAndReturn( + ScanType scanType) throws ExecutionException, CrudException { + // Arrange + handler = + new CrudHandler( + storage, + snapshot, + tableMetadataManager, + false, + mutationConditionsValidator, + parallelExecutor, + true, true); + when(snapshot.isValidationRequired()).thenReturn(false); + + Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); + result = prepareResult(TransactionState.COMMITTED); + TransactionResult expected = new TransactionResult(result); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } + when(storage.scan(scanForStorage)).thenReturn(scanner); + when(snapshot.getResult(any())).thenReturn(Optional.of(expected)); + + // Act + List results = scanOrGetScanner(scan, scanType); + + // Assert + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot, never()).putIntoScanSet(any(), any()); + verify(snapshot, never()).verifyNoOverlap(any(), any()); + assertThat(results.size()).isEqualTo(1); + assertThat(results.get(0)) + .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); + } + + @ParameterizedTest + @EnumSource(ScanType.class) + void + scanOrGetScanner_ResultGivenFromStorage_InOneOperationMode_ValidationRequired_ShouldUpdateSnapshotAndReturn( + ScanType scanType) throws ExecutionException, CrudException { + // Arrange + handler = + new CrudHandler( + storage, + snapshot, + tableMetadataManager, + false, + mutationConditionsValidator, + parallelExecutor, + true, + true); + when(snapshot.isValidationRequired()).thenReturn(true); Scan scan = prepareScan(); Scan scanForStorage = toScanForStorageFrom(scan); @@ -545,7 +763,8 @@ void scan_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSnapshot TransactionResult expected = new TransactionResult(result); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, tableMetadataManager, parallelExecutor); handler = - new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); + new CrudHandler( + storage, snapshot, tableMetadataManager, false, parallelExecutor, false, false); if (scanType == ScanType.SCAN) { when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); } else { @@ -613,7 +832,8 @@ void scanOrGetScanner_GetCalledAfterScanUnderRealSnapshot_ShouldReturnFromStorag result = prepareResult(TransactionState.COMMITTED); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, tableMetadataManager, parallelExecutor); handler = - new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); + new CrudHandler( + storage, snapshot, tableMetadataManager, false, parallelExecutor, false, false); if (scanType == ScanType.SCAN) { when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); } else { @@ -679,7 +899,8 @@ void scanOrGetScanner_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgum deleteSet, new ArrayList<>()); handler = - new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); + new CrudHandler( + storage, snapshot, tableMetadataManager, false, parallelExecutor, false, false); if (scanType == ScanType.SCAN) { when(scanner.iterator()).thenReturn(Arrays.asList(result, result2).iterator()); } else { @@ -842,6 +1063,8 @@ public void getScanner_ExecutionExceptionThrownByScannerOne_ShouldThrowCrudExcep getScanner_ScannerNotFullyScanned_ShouldPutReadSetAndScannerSetInSnapshotAndVerifyScan() throws ExecutionException, CrudException, IOException { // Arrange + when(snapshot.isValidationRequired()).thenReturn(true); + Scan scan = prepareScan(); Scan scanForStorage = toScanForStorageFrom(scan); Result result1 = prepareResult(TransactionState.COMMITTED); @@ -870,6 +1093,92 @@ public void getScanner_ExecutionExceptionThrownByScannerOne_ShouldThrowCrudExcep .hasValue(new FilteredResult(txResult1, Collections.emptyList(), TABLE_METADATA, false)); } + @Test + public void + getScanner_ScannerNotFullyScanned_InOneOperationMode_ValidationNotRequired_ShouldUpdateSnapshotProperly() + throws ExecutionException, CrudException { + // Arrange + handler = + new CrudHandler( + storage, + snapshot, + tableMetadataManager, + false, + mutationConditionsValidator, + parallelExecutor, + true, + true); + when(snapshot.isValidationRequired()).thenReturn(false); + + Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); + Result result1 = prepareResult(TransactionState.COMMITTED); + Result result2 = prepareResult(TransactionState.COMMITTED); + TransactionResult txResult1 = new TransactionResult(result1); + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.empty()); + when(storage.scan(scanForStorage)).thenReturn(scanner); + + // Act + TransactionCrudOperable.Scanner actualScanner = handler.getScanner(scan); + Optional actualResult = actualScanner.one(); + actualScanner.close(); + + // Assert + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot, never()).putIntoScannerSet(any(), any()); + verify(snapshot, never()).verifyNoOverlap(any(), any()); + + assertThat(actualResult) + .hasValue(new FilteredResult(txResult1, Collections.emptyList(), TABLE_METADATA, false)); + } + + @Test + public void + getScanner_ScannerNotFullyScanned_InOneOperationMode_ValidationRequired_ShouldUpdateSnapshotProperly() + throws ExecutionException, CrudException { + // Arrange + handler = + new CrudHandler( + storage, + snapshot, + tableMetadataManager, + false, + mutationConditionsValidator, + parallelExecutor, + true, + true); + when(snapshot.isValidationRequired()).thenReturn(true); + + Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); + Result result1 = prepareResult(TransactionState.COMMITTED); + Result result2 = prepareResult(TransactionState.COMMITTED); + Snapshot.Key key1 = new Snapshot.Key(scan, result1); + TransactionResult txResult1 = new TransactionResult(result1); + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.empty()); + when(storage.scan(scanForStorage)).thenReturn(scanner); + + // Act + TransactionCrudOperable.Scanner actualScanner = handler.getScanner(scan); + Optional actualResult = actualScanner.one(); + actualScanner.close(); + + // Assert + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot) + .putIntoScannerSet(scan, Maps.newLinkedHashMap(ImmutableMap.of(key1, txResult1))); + verify(snapshot, never()).verifyNoOverlap(any(), any()); + + assertThat(actualResult) + .hasValue(new FilteredResult(txResult1, Collections.emptyList(), TABLE_METADATA, false)); + } + @Test public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws CrudException { // Arrange @@ -1344,41 +1653,105 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() } @Test - public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods() throws CrudException { + public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods() + throws CrudException, ExecutionException, ValidationConflictException { // Arrange + Key partitionKey1 = Key.ofText(ANY_NAME_1, ANY_TEXT_1); + Key partitionKey2 = Key.ofText(ANY_NAME_1, ANY_TEXT_2); + Key partitionKey3 = Key.ofText(ANY_NAME_1, ANY_TEXT_3); + Key partitionKey4 = Key.ofText(ANY_NAME_1, ANY_TEXT_4); + Key partitionKey5 = Key.ofText(ANY_NAME_1, ANY_TEXT_5); + Put put1 = mock(Put.class); when(put1.forNamespace()).thenReturn(Optional.of(ANY_NAMESPACE_NAME)); when(put1.forTable()).thenReturn(Optional.of(ANY_TABLE_NAME)); - when(put1.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); + when(put1.getPartitionKey()).thenReturn(partitionKey1); when(put1.getAttribute(ConsensusCommitOperationAttributes.IMPLICIT_PRE_READ_ENABLED)) .thenReturn(Optional.of("true")); Put put2 = mock(Put.class); when(put2.forNamespace()).thenReturn(Optional.of(ANY_NAMESPACE_NAME)); when(put2.forTable()).thenReturn(Optional.of(ANY_TABLE_NAME)); - when(put2.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_2)); + when(put2.getPartitionKey()).thenReturn(partitionKey2); when(put2.getAttribute(ConsensusCommitOperationAttributes.IMPLICIT_PRE_READ_ENABLED)) .thenReturn(Optional.of("true")); Put put3 = mock(Put.class); when(put3.forNamespace()).thenReturn(Optional.of(ANY_NAMESPACE_NAME)); when(put3.forTable()).thenReturn(Optional.of(ANY_TABLE_NAME)); - when(put3.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_3)); + when(put3.getPartitionKey()).thenReturn(partitionKey3); when(snapshot.getPutsInWriteSet()).thenReturn(Arrays.asList(put1, put2, put3)); Delete delete1 = mock(Delete.class); when(delete1.forNamespace()).thenReturn(Optional.of(ANY_NAMESPACE_NAME)); when(delete1.forTable()).thenReturn(Optional.of(ANY_TABLE_NAME)); - when(delete1.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_1)); + when(delete1.getPartitionKey()).thenReturn(partitionKey4); Delete delete2 = mock(Delete.class); when(delete2.forNamespace()).thenReturn(Optional.of(ANY_NAMESPACE_NAME)); when(delete2.forTable()).thenReturn(Optional.of(ANY_TABLE_NAME)); - when(delete2.getPartitionKey()).thenReturn(Key.ofText(ANY_NAME_1, ANY_TEXT_2)); + when(delete2.getPartitionKey()).thenReturn(partitionKey5); when(snapshot.getDeletesInDeleteSet()).thenReturn(Arrays.asList(delete1, delete2)); + Get get1 = + toGetForStorageFrom( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(partitionKey1) + .build()); + + Get get2 = + toGetForStorageFrom( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(partitionKey2) + .build()); + + Get get3 = + toGetForStorageFrom( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(partitionKey4) + .build()); + + Get get4 = + toGetForStorageFrom( + Get.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(partitionKey5) + .build()); + + Result result1 = mock(Result.class); + when(result1.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get()); + when(result1.getContainedColumnNames()).thenReturn(Collections.singleton(ANY_NAME_1)); + when(result1.getAsObject(ANY_NAME_1)).thenReturn(ANY_TEXT_1); + + Result result2 = mock(Result.class); + when(result2.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get()); + when(result2.getContainedColumnNames()).thenReturn(Collections.singleton(ANY_NAME_1)); + when(result2.getAsObject(ANY_NAME_1)).thenReturn(ANY_TEXT_2); + + Result result3 = mock(Result.class); + when(result3.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get()); + when(result3.getContainedColumnNames()).thenReturn(Collections.singleton(ANY_NAME_1)); + when(result3.getAsObject(ANY_NAME_1)).thenReturn(ANY_TEXT_3); + + Result result4 = mock(Result.class); + when(result4.getInt(Attribute.STATE)).thenReturn(TransactionState.COMMITTED.get()); + when(result4.getContainedColumnNames()).thenReturn(Collections.singleton(ANY_NAME_1)); + when(result4.getAsObject(ANY_NAME_1)).thenReturn(ANY_TEXT_4); + + when(storage.get(get1)).thenReturn(Optional.of(result1)); + when(storage.get(get2)).thenReturn(Optional.of(result2)); + when(storage.get(get3)).thenReturn(Optional.of(result3)); + when(storage.get(get4)).thenReturn(Optional.of(result4)); + when(snapshot.getId()).thenReturn(ANY_TX_ID); // Act @@ -1395,6 +1768,29 @@ public void readIfImplicitPreReadEnabled_ShouldCallAppropriateMethods() throws C List tasks = tasksCaptor.getValue(); assertThat(tasks.size()).isEqualTo(4); + for (ParallelExecutor.ParallelExecutorTask task : tasks) { + task.run(); + } + + verify(storage).get(get1); + verify(storage).get(get2); + verify(storage).get(get3); + verify(storage).get(get4); + + verify(snapshot) + .putIntoReadSet(new Snapshot.Key(get1), Optional.of(new TransactionResult(result1))); + verify(snapshot) + .putIntoReadSet(new Snapshot.Key(get2), Optional.of(new TransactionResult(result2))); + verify(snapshot) + .putIntoReadSet(new Snapshot.Key(get3), Optional.of(new TransactionResult(result3))); + verify(snapshot) + .putIntoReadSet(new Snapshot.Key(get4), Optional.of(new TransactionResult(result4))); + + verify(snapshot).putIntoGetSet(get1, Optional.of(new TransactionResult(result1))); + verify(snapshot).putIntoGetSet(get2, Optional.of(new TransactionResult(result2))); + verify(snapshot).putIntoGetSet(get3, Optional.of(new TransactionResult(result3))); + verify(snapshot).putIntoGetSet(get4, Optional.of(new TransactionResult(result4))); + assertThat(transactionIdCaptor.getValue()).isEqualTo(ANY_TX_ID); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java index 880e7466c6..a0c0b62992 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManagerTest.java @@ -3,6 +3,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -529,7 +531,9 @@ public void get_ShouldGet() throws TransactionException { TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -541,7 +545,7 @@ public void get_ShouldGet() throws TransactionException { Optional actual = spied.get(get); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).prepare(); verify(transaction).validate(); @@ -556,7 +560,9 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -565,7 +571,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).rollback(); } @@ -578,7 +584,9 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -587,7 +595,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).rollback(); } @@ -600,7 +608,9 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -609,7 +619,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).rollback(); } @@ -622,7 +632,9 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -631,7 +643,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).rollback(); } @@ -644,7 +656,9 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -653,7 +667,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(UnknownTransactionStatusException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).commit(); } @@ -665,7 +679,9 @@ public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudExcepti TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -674,7 +690,7 @@ public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudExcepti // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).get(get); verify(transaction).commit(); } @@ -685,7 +701,9 @@ public void scan_ShouldScan() throws TransactionException { TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -698,7 +716,7 @@ public void scan_ShouldScan() throws TransactionException { List actual = spied.scan(scan); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).scan(scan); verify(transaction).prepare(); verify(transaction).validate(); @@ -712,7 +730,9 @@ public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() t TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -742,7 +762,7 @@ public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() t assertThat(actual.one()).isEmpty(); actual.close(); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).prepare(); verify(transaction).validate(); verify(transaction).commit(); @@ -755,7 +775,9 @@ public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -782,7 +804,7 @@ public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() assertThat(actual.all()).isEmpty(); actual.close(); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).prepare(); verify(transaction).validate(); verify(transaction).commit(); @@ -796,7 +818,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -831,7 +855,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul assertThat(iterator.hasNext()).isFalse(); actual.close(); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).prepare(); verify(transaction).validate(); verify(transaction).commit(); @@ -846,7 +870,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -860,7 +886,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Act Assert assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(transaction).rollback(); } @@ -872,7 +898,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -890,7 +918,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::one).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -902,7 +930,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Arrange TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -920,7 +950,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::all).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -932,7 +962,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Arrange TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); Scan scan = Scan.newBuilder() @@ -950,7 +982,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -963,7 +995,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); doThrow(PreparationConflictException.class).when(transaction).prepare(); Scan scan = @@ -980,7 +1014,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -993,7 +1027,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); doThrow(ValidationConflictException.class).when(transaction).validate(); Scan scan = @@ -1010,7 +1046,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -1023,7 +1059,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); doThrow(CommitConflictException.class).when(transaction).commit(); Scan scan = @@ -1040,7 +1078,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -1053,7 +1091,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); doThrow(UnknownTransactionStatusException.class).when(transaction).commit(); Scan scan = @@ -1070,7 +1110,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(UnknownTransactionStatusException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); } @@ -1082,7 +1122,9 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); doThrow(CommitException.class).when(transaction).commit(); Scan scan = @@ -1099,7 +1141,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true), eq(true)); verify(scanner).close(); verify(transaction).rollback(); } @@ -1110,7 +1152,9 @@ public void put_ShouldPut() throws TransactionException { TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); Put put = Put.newBuilder() @@ -1124,7 +1168,7 @@ public void put_ShouldPut() throws TransactionException { spied.put(put); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).put(put); verify(transaction).prepare(); verify(transaction).validate(); @@ -1137,7 +1181,9 @@ public void put_MultiplePutsGiven_ShouldPut() throws TransactionException { TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); List puts = Arrays.asList( @@ -1163,7 +1209,7 @@ public void put_MultiplePutsGiven_ShouldPut() throws TransactionException { spied.put(puts); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).put(puts); verify(transaction).prepare(); verify(transaction).validate(); @@ -1176,7 +1222,9 @@ public void insert_ShouldInsert() throws TransactionException { TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); Insert insert = Insert.newBuilder() @@ -1190,7 +1238,7 @@ public void insert_ShouldInsert() throws TransactionException { spied.insert(insert); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).insert(insert); verify(transaction).prepare(); verify(transaction).validate(); @@ -1203,7 +1251,9 @@ public void upsert_ShouldUpsert() throws TransactionException { TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); Upsert upsert = Upsert.newBuilder() @@ -1217,7 +1267,7 @@ public void upsert_ShouldUpsert() throws TransactionException { spied.upsert(upsert); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).upsert(upsert); verify(transaction).prepare(); verify(transaction).validate(); @@ -1230,7 +1280,9 @@ public void update_ShouldUpdate() throws TransactionException { TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); Update update = Update.newBuilder() @@ -1244,7 +1296,7 @@ public void update_ShouldUpdate() throws TransactionException { spied.update(update); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).update(update); verify(transaction).prepare(); verify(transaction).validate(); @@ -1257,7 +1309,9 @@ public void delete_ShouldDelete() throws TransactionException { TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); Delete delete = Delete.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -1266,7 +1320,7 @@ public void delete_ShouldDelete() throws TransactionException { spied.delete(delete); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).delete(delete); verify(transaction).prepare(); verify(transaction).validate(); @@ -1279,7 +1333,9 @@ public void delete_MultipleDeletesGiven_ShouldDelete() throws TransactionExcepti TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); List deletes = Arrays.asList( @@ -1302,7 +1358,7 @@ public void delete_MultipleDeletesGiven_ShouldDelete() throws TransactionExcepti spied.delete(deletes); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).delete(deletes); verify(transaction).prepare(); verify(transaction).validate(); @@ -1315,7 +1371,9 @@ public void mutate_ShouldMutate() throws TransactionException { TwoPhaseCommitTransaction transaction = mock(TwoPhaseCommitTransaction.class); TwoPhaseConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction) + .when(spied) + .begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); List mutations = Arrays.asList( @@ -1353,7 +1411,7 @@ public void mutate_ShouldMutate() throws TransactionException { spied.mutate(mutations); // Assert - verify(spied).begin(); + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(false), eq(true)); verify(transaction).mutate(mutations); verify(transaction).prepare(); verify(transaction).validate(); diff --git a/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java index 1ab6938241..129820f0ab 100644 --- a/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java @@ -2352,6 +2352,49 @@ public void manager_delete_DeleteGivenForExisting_ShouldDeleteRecord() assertThat(result.isPresent()).isFalse(); } + @Test + public void manager_mutate_ShouldMutateRecords() throws TransactionException { + // Arrange + manager.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + manager.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + Update update = + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build(); + Delete delete = prepareDelete(1, 0); + + // Act + manager.mutate(Arrays.asList(update, delete)); + + // Assert + Optional result1 = manager.get(prepareGet(0, 0)); + Optional result2 = manager.get(prepareGet(1, 0)); + + assertThat(result1.isPresent()).isTrue(); + assertThat(getBalance(result1.get())).isEqualTo(1); + + assertThat(result2.isPresent()).isFalse(); + } + @Test public void manager_get_DefaultNamespaceGiven_ShouldWorkProperly() throws TransactionException { Properties properties = getProperties(getTestName()); diff --git a/integration-test/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionIntegrationTestBase.java index c81bfe92d2..3cc8f2f5bb 100644 --- a/integration-test/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/api/TwoPhaseCommitTransactionIntegrationTestBase.java @@ -2608,6 +2608,49 @@ public void manager_delete_DeleteGivenForExisting_ShouldDeleteRecord() assertThat(result.isPresent()).isFalse(); } + @Test + public void manager_mutate_ShouldMutateRecords() throws TransactionException { + // Arrange + manager1.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + manager1.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + Update update = + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build(); + Delete delete = prepareDelete(1, 0, namespace1, TABLE_1); + + // Act + manager1.mutate(Arrays.asList(update, delete)); + + // Assert + Optional result1 = manager1.get(prepareGet(0, 0, namespace1, TABLE_1)); + Optional result2 = manager1.get(prepareGet(1, 0, namespace1, TABLE_1)); + + assertThat(result1.isPresent()).isTrue(); + assertThat(getBalance(result1.get())).isEqualTo(1); + + assertThat(result2.isPresent()).isFalse(); + } + @Test public void manager_get_DefaultNamespaceGiven_ShouldWorkProperly() throws TransactionException { Properties properties = getProperties1(getTestName()); diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index 16c797a5fd..bccb946f4c 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -24,6 +24,7 @@ import com.scalar.db.api.DistributedStorage; import com.scalar.db.api.DistributedStorageAdmin; import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.DistributedTransactionManager; import com.scalar.db.api.Get; import com.scalar.db.api.Insert; import com.scalar.db.api.Put; @@ -33,12 +34,15 @@ import com.scalar.db.api.Selection; import com.scalar.db.api.TableMetadata; import com.scalar.db.api.TransactionCrudOperable; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.api.TransactionState; import com.scalar.db.api.Update; +import com.scalar.db.api.Upsert; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CommitException; +import com.scalar.db.exception.transaction.CrudConflictException; import com.scalar.db.exception.transaction.PreparationConflictException; import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.io.DataType; @@ -46,6 +50,7 @@ import com.scalar.db.io.Key; import com.scalar.db.io.Value; import com.scalar.db.service.StorageFactory; +import com.scalar.db.service.TransactionFactory; import com.scalar.db.transaction.consensuscommit.CoordinatorGroupCommitter.CoordinatorGroupCommitKeyManipulator; import com.scalar.db.util.groupcommit.GroupCommitKeyManipulator.Keys; import java.time.Duration; @@ -5268,6 +5273,374 @@ public void getScanner_InReadOnlyMode_WithSerializable_ShouldNotThrowAnyExceptio assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); } + @Test + public void manager_get_GetGivenForCommittedRecord_WithSerializable_ShouldReturnRecord() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + populateRecords(namespace1, TABLE_1); + Get get = prepareGet(0, 0, namespace1, TABLE_1); + + // Act + Optional result = managerWithSerializable.get(get); + + // Assert + assertThat(result.isPresent()).isTrue(); + assertThat(result.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(result.get())).isEqualTo(INITIAL_BALANCE); + } + } + + @Test + public void manager_scan_ScanGivenForCommittedRecord_WithSerializable_ShouldReturnRecords() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + populateRecords(namespace1, TABLE_1); + Scan scan = prepareScan(1, 0, 2, namespace1, TABLE_1); + + // Act + List results = managerWithSerializable.scan(scan); + + // Assert + assertThat(results.size()).isEqualTo(3); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + } + } + + @Test + public void manager_getScanner_ScanGivenForCommittedRecord_WithSerializable_ShouldReturnRecords() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + populateRecords(namespace1, TABLE_1); + Scan scan = prepareScan(1, 0, 2, namespace1, TABLE_1); + + // Act Assert + TransactionManagerCrudOperable.Scanner scanner = managerWithSerializable.getScanner(scan); + + Optional result1 = scanner.one(); + assertThat(result1).isPresent(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(result1.get())).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isPresent(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(result2.get())).isEqualTo(INITIAL_BALANCE); + + Optional result3 = scanner.one(); + assertThat(result3).isPresent(); + assertThat(result3.get().getInt(ACCOUNT_ID)).isEqualTo(1); + assertThat(result3.get().getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(result3.get())).isEqualTo(INITIAL_BALANCE); + + assertThat(scanner.one()).isNotPresent(); + + scanner.close(); + } + } + + @Test + public void manager_put_PutGivenForNonExisting_WithSerializable_ShouldCreateRecord() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + int expected = INITIAL_BALANCE; + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + // Act + managerWithSerializable.put(put); + + // Assert + Get get = prepareGet(0, 0, namespace1, TABLE_1); + Optional result = managerWithSerializable.get(get); + + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(expected); + } + } + + @Test + public void manager_put_PutGivenForExisting_WithSerializable_ShouldUpdateRecord() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + populateRecords(namespace1, TABLE_1); + + // Act + int expected = INITIAL_BALANCE + 100; + Put put = + Put.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .enableImplicitPreRead() + .build(); + managerWithSerializable.put(put); + + // Assert + Optional actual = managerWithSerializable.get(prepareGet(0, 0, namespace1, TABLE_1)); + + assertThat(actual.isPresent()).isTrue(); + assertThat(getBalance(actual.get())).isEqualTo(expected); + } + } + + @Test + public void manager_insert_InsertGivenForNonExisting_WithSerializable_ShouldCreateRecord() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + int expected = INITIAL_BALANCE; + Insert insert = + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + // Act + managerWithSerializable.insert(insert); + + // Assert + Get get = prepareGet(0, 0, namespace1, TABLE_1); + Optional result = managerWithSerializable.get(get); + + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(expected); + } + } + + @Test + public void + manager_insert_InsertGivenForExisting_WithSerializable_ShouldThrowCrudConflictException() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + populateRecords(namespace1, TABLE_1); + + // Act Assert + int expected = INITIAL_BALANCE + 100; + Insert insert = + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + assertThatThrownBy(() -> managerWithSerializable.insert(insert)) + .isInstanceOf(CrudConflictException.class); + } + } + + @Test + public void manager_upsert_UpsertGivenForNonExisting_WithSerializable_ShouldCreateRecord() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + int expected = INITIAL_BALANCE; + Upsert upsert = + Upsert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + + // Act + managerWithSerializable.upsert(upsert); + + // Assert + Get get = prepareGet(0, 0, namespace1, TABLE_1); + Optional result = managerWithSerializable.get(get); + + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(expected); + } + } + + @Test + public void manager_upsert_UpsertGivenForExisting_WithSerializable_ShouldUpdateRecord() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + populateRecords(namespace1, TABLE_1); + + // Act + int expected = INITIAL_BALANCE + 100; + Upsert upsert = + Upsert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + managerWithSerializable.upsert(upsert); + + // Assert + Optional actual = managerWithSerializable.get(prepareGet(0, 0, namespace1, TABLE_1)); + + assertThat(actual.isPresent()).isTrue(); + assertThat(getBalance(actual.get())).isEqualTo(expected); + } + } + + @Test + public void manager_update_UpdateGivenForNonExisting_WithSerializable_ShouldDoNothing() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + Update update = + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(); + + // Act + assertThatCode(() -> managerWithSerializable.update(update)).doesNotThrowAnyException(); + + // Assert + Optional actual = managerWithSerializable.get(prepareGet(0, 0, namespace1, TABLE_1)); + + assertThat(actual).isEmpty(); + } + } + + @Test + public void manager_update_UpdateGivenForExisting_WithSerializable_ShouldUpdateRecord() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + populateRecords(namespace1, TABLE_1); + + // Act + int expected = INITIAL_BALANCE + 100; + Update update = + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, expected) + .build(); + managerWithSerializable.update(update); + + // Assert + Optional actual = managerWithSerializable.get(prepareGet(0, 0, namespace1, TABLE_1)); + + assertThat(actual.isPresent()).isTrue(); + assertThat(getBalance(actual.get())).isEqualTo(expected); + } + } + + @Test + public void manager_delete_DeleteGivenForExisting_WithSerializable_ShouldDeleteRecord() + throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + populateRecords(namespace1, TABLE_1); + Delete delete = prepareDelete(0, 0, namespace1, TABLE_1); + + // Act + managerWithSerializable.delete(delete); + + // Assert + Optional result = managerWithSerializable.get(prepareGet(0, 0, namespace1, TABLE_1)); + + assertThat(result.isPresent()).isFalse(); + } + } + + @Test + public void manager_mutate_WithSerializable_ShouldMutateRecords() throws TransactionException { + try (DistributedTransactionManager managerWithSerializable = + getTransactionManagerWithSerializable()) { + // Arrange + manager.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + manager.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 1)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + Update update = + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build(); + Delete delete = prepareDelete(1, 0, namespace1, TABLE_1); + + // Act + managerWithSerializable.mutate(Arrays.asList(update, delete)); + + // Assert + Optional result1 = managerWithSerializable.get(prepareGet(0, 0, namespace1, TABLE_1)); + Optional result2 = managerWithSerializable.get(prepareGet(1, 0, namespace1, TABLE_1)); + + assertThat(result1.isPresent()).isTrue(); + assertThat(getBalance(result1.get())).isEqualTo(1); + + assertThat(result2.isPresent()).isFalse(); + } + } + private DistributedTransaction prepareTransfer( int fromId, String fromNamespace, @@ -5506,4 +5879,12 @@ private int getBalance(Result result) { assertThat(balance).isPresent(); return balance.get().getAsInt(); } + + private DistributedTransactionManager getTransactionManagerWithSerializable() { + Properties properties = getProperties(TEST_NAME); + // Add testName as a coordinator namespace suffix + ConsensusCommitTestUtils.addSuffixToCoordinatorNamespace(properties, TEST_NAME); + properties.put(ConsensusCommitConfig.ISOLATION_LEVEL, Isolation.SERIALIZABLE.name()); + return TransactionFactory.create(properties).getTransactionManager(); + } } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionIntegrationTestBase.java index 5cc864af39..0b712f994d 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionIntegrationTestBase.java @@ -449,6 +449,12 @@ public void abort_forOngoingTransaction_ShouldAbortCorrectly() {} @Test public void rollback_forOngoingTransaction_ShouldRollbackCorrectly() {} + @Disabled( + "Single CRUD operation transactions don't support executing multiple mutations in a transaction") + @Override + @Test + public void manager_mutate_ShouldMutateRecords() {} + @Disabled( "Single CRUD operation transactions don't support executing multiple mutations in a transaction") @Override