From b18084767cf1310330d546c6a1aabe148bf2c339 Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Thu, 5 Jun 2025 14:27:53 +0900 Subject: [PATCH 1/3] Add beginReadOnly() method to transaction abstraction (#2734) --- .../jdbc/JdbcTransactionIntegrationTest.java | 14 ++++ .../db/api/DistributedTransactionManager.java | 61 +++++++++++++++ ...nManagedDistributedTransactionManager.java | 2 + ...nagedTwoPhaseCommitTransactionManager.java | 2 + ...ecoratedDistributedTransactionManager.java | 20 +++++ .../ReadOnlyDistributedTransaction.java | 75 +++++++++++++++++++ ...eManagedDistributedTransactionManager.java | 2 + ...nagedTwoPhaseCommitTransactionManager.java | 2 + .../com/scalar/db/common/error/CoreError.java | 6 ++ .../scalar/db/service/TransactionService.java | 20 +++++ .../ConsensusCommitManager.java | 10 +++ .../jdbc/JdbcTransactionManager.java | 10 +++ ...SingleCrudOperationTransactionManager.java | 14 ++++ ...ributedTransactionIntegrationTestBase.java | 36 +++++++++ .../ConsensusCommitIntegrationTestBase.java | 15 ++++ ...erationTransactionIntegrationTestBase.java | 12 +++ 16 files changed, 301 insertions(+) create mode 100644 core/src/main/java/com/scalar/db/common/ReadOnlyDistributedTransaction.java diff --git a/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java index 37bebaf726..fa67a2d870 100644 --- a/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java +++ b/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java @@ -7,6 +7,8 @@ import java.util.Properties; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; public class JdbcTransactionIntegrationTest extends DistributedTransactionIntegrationTestBase { @@ -42,4 +44,16 @@ public void abort_forOngoingTransaction_ShouldAbortCorrectly() {} @Override @Test public void rollback_forOngoingTransaction_ShouldRollbackCorrectly() {} + + @Disabled("Implement later") + @Override + @Test + public void get_GetGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecord() {} + + @Disabled("Implement later") + @Override + @ParameterizedTest + @EnumSource(ScanType.class) + public void scanOrGetScanner_ScanGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecords( + ScanType scanType) {} } diff --git a/core/src/main/java/com/scalar/db/api/DistributedTransactionManager.java b/core/src/main/java/com/scalar/db/api/DistributedTransactionManager.java index f9c84cfacf..c6262432fd 100644 --- a/core/src/main/java/com/scalar/db/api/DistributedTransactionManager.java +++ b/core/src/main/java/com/scalar/db/api/DistributedTransactionManager.java @@ -81,6 +81,34 @@ public interface DistributedTransactionManager DistributedTransaction begin(String txId) throws TransactionNotFoundException, TransactionException; + /** + * Begins a new transaction in read-only mode. + * + * @return {@link DistributedTransaction} + * @throws TransactionNotFoundException if the transaction fails to begin due to transient faults. + * You can retry the transaction + * @throws TransactionException if the transaction fails to begin due to transient or nontransient + * faults. You can try retrying the transaction, but you may not be able to begin the + * transaction due to nontransient faults + */ + DistributedTransaction beginReadOnly() throws TransactionNotFoundException, TransactionException; + + /** + * Begins a new transaction with the specified transaction ID in read-only mode. It is users' + * responsibility to guarantee uniqueness of the ID, so it is not recommended to use this method + * unless you know exactly what you are doing. + * + * @param txId an user-provided unique transaction ID + * @return {@link DistributedTransaction} + * @throws TransactionNotFoundException if the transaction fails to begin due to transient faults. + * You can retry the transaction + * @throws TransactionException if the transaction fails to begin due to transient or nontransient + * faults. You can try retrying the transaction, but you may not be able to begin the + * transaction due to nontransient faults + */ + DistributedTransaction beginReadOnly(String txId) + throws TransactionNotFoundException, TransactionException; + /** * Starts a new transaction. This method is an alias of {@link #begin()}. * @@ -112,6 +140,39 @@ default DistributedTransaction start(String txId) return begin(txId); } + /** + * Starts a new transaction in read-only mode. This method is an alias of {@link + * #beginReadOnly()}. + * + * @return {@link DistributedTransaction} + * @throws TransactionNotFoundException if the transaction fails to start due to transient faults. + * You can retry the transaction + * @throws TransactionException if the transaction fails to start due to transient or nontransient + * faults. You can try retrying the transaction, but you may not be able to start the + * transaction due to nontransient faults + */ + default DistributedTransaction startReadOnly() + throws TransactionNotFoundException, TransactionException { + return beginReadOnly(); + } + + /** + * Starts a new transaction with the specified transaction ID in read-only mode. This method is an + * alias of {@link #beginReadOnly(String)}. + * + * @param txId an user-provided unique transaction ID + * @return {@link DistributedTransaction} + * @throws TransactionNotFoundException if the transaction fails to start due to transient faults. + * You can retry the transaction + * @throws TransactionException if the transaction fails to start due to transient or nontransient + * faults. You can try retrying the transaction, but you may not be able to start the + * transaction due to nontransient faults + */ + default DistributedTransaction startReadOnly(String txId) + throws TransactionNotFoundException, TransactionException { + return beginReadOnly(txId); + } + /** * Starts a new transaction with the specified {@link Isolation} level. * diff --git a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java index ea592e5b41..5cce5603e4 100644 --- a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedDistributedTransactionManager.java @@ -26,9 +26,11 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@ThreadSafe public class ActiveTransactionManagedDistributedTransactionManager extends DecoratedDistributedTransactionManager { diff --git a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java index b0543433d3..f1d52f897d 100644 --- a/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/ActiveTransactionManagedTwoPhaseCommitTransactionManager.java @@ -28,9 +28,11 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@ThreadSafe public class ActiveTransactionManagedTwoPhaseCommitTransactionManager extends DecoratedTwoPhaseCommitTransactionManager { diff --git a/core/src/main/java/com/scalar/db/common/DecoratedDistributedTransactionManager.java b/core/src/main/java/com/scalar/db/common/DecoratedDistributedTransactionManager.java index dac3cfa2c7..9ade75d900 100644 --- a/core/src/main/java/com/scalar/db/common/DecoratedDistributedTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/DecoratedDistributedTransactionManager.java @@ -76,6 +76,16 @@ public DistributedTransaction begin(String txId) throws TransactionException { return decorateTransactionOnBeginOrStart(transactionManager.begin(txId)); } + @Override + public DistributedTransaction beginReadOnly() throws TransactionException { + return decorateTransactionOnBeginOrStart(transactionManager.beginReadOnly()); + } + + @Override + public DistributedTransaction beginReadOnly(String txId) throws TransactionException { + return decorateTransactionOnBeginOrStart(transactionManager.beginReadOnly(txId)); + } + @Override public DistributedTransaction start() throws TransactionException { return decorateTransactionOnBeginOrStart(transactionManager.start()); @@ -86,6 +96,16 @@ public DistributedTransaction start(String txId) throws TransactionException { return decorateTransactionOnBeginOrStart(transactionManager.start(txId)); } + @Override + public DistributedTransaction startReadOnly(String txId) throws TransactionException { + return decorateTransactionOnBeginOrStart(transactionManager.startReadOnly(txId)); + } + + @Override + public DistributedTransaction startReadOnly() throws TransactionException { + return decorateTransactionOnBeginOrStart(transactionManager.startReadOnly()); + } + /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @Deprecated @Override diff --git a/core/src/main/java/com/scalar/db/common/ReadOnlyDistributedTransaction.java b/core/src/main/java/com/scalar/db/common/ReadOnlyDistributedTransaction.java new file mode 100644 index 0000000000..ffff08acaf --- /dev/null +++ b/core/src/main/java/com/scalar/db/common/ReadOnlyDistributedTransaction.java @@ -0,0 +1,75 @@ +package com.scalar.db.common; + +import com.scalar.db.api.Delete; +import com.scalar.db.api.DistributedTransaction; +import com.scalar.db.api.Insert; +import com.scalar.db.api.Mutation; +import com.scalar.db.api.Put; +import com.scalar.db.api.Update; +import com.scalar.db.api.Upsert; +import com.scalar.db.common.error.CoreError; +import com.scalar.db.exception.transaction.CrudException; +import java.util.List; +import javax.annotation.concurrent.NotThreadSafe; + +@NotThreadSafe +public class ReadOnlyDistributedTransaction extends DecoratedDistributedTransaction { + + public ReadOnlyDistributedTransaction(DistributedTransaction transaction) { + super(transaction); + } + + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ + @Deprecated + @Override + public void put(Put put) throws CrudException { + throw new IllegalStateException( + CoreError.MUTATION_NOT_ALLOWED_IN_READ_ONLY_TRANSACTION.buildMessage(getId())); + } + + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ + @Deprecated + @Override + public void put(List puts) throws CrudException { + throw new IllegalStateException( + CoreError.MUTATION_NOT_ALLOWED_IN_READ_ONLY_TRANSACTION.buildMessage(getId())); + } + + @Override + public void insert(Insert insert) throws CrudException { + throw new IllegalStateException( + CoreError.MUTATION_NOT_ALLOWED_IN_READ_ONLY_TRANSACTION.buildMessage(getId())); + } + + @Override + public void upsert(Upsert upsert) throws CrudException { + throw new IllegalStateException( + CoreError.MUTATION_NOT_ALLOWED_IN_READ_ONLY_TRANSACTION.buildMessage(getId())); + } + + @Override + public void update(Update update) throws CrudException { + throw new IllegalStateException( + CoreError.MUTATION_NOT_ALLOWED_IN_READ_ONLY_TRANSACTION.buildMessage(getId())); + } + + @Override + public void delete(Delete delete) throws CrudException { + throw new IllegalStateException( + CoreError.MUTATION_NOT_ALLOWED_IN_READ_ONLY_TRANSACTION.buildMessage(getId())); + } + + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ + @Deprecated + @Override + public void delete(List deletes) throws CrudException { + throw new IllegalStateException( + CoreError.MUTATION_NOT_ALLOWED_IN_READ_ONLY_TRANSACTION.buildMessage(getId())); + } + + @Override + public void mutate(List mutations) throws CrudException { + throw new IllegalStateException( + CoreError.MUTATION_NOT_ALLOWED_IN_READ_ONLY_TRANSACTION.buildMessage(getId())); + } +} diff --git a/core/src/main/java/com/scalar/db/common/StateManagedDistributedTransactionManager.java b/core/src/main/java/com/scalar/db/common/StateManagedDistributedTransactionManager.java index 52866cb405..1dad240f24 100644 --- a/core/src/main/java/com/scalar/db/common/StateManagedDistributedTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/StateManagedDistributedTransactionManager.java @@ -20,7 +20,9 @@ import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import java.util.List; import java.util.Optional; +import javax.annotation.concurrent.ThreadSafe; +@ThreadSafe public class StateManagedDistributedTransactionManager extends DecoratedDistributedTransactionManager { diff --git a/core/src/main/java/com/scalar/db/common/StateManagedTwoPhaseCommitTransactionManager.java b/core/src/main/java/com/scalar/db/common/StateManagedTwoPhaseCommitTransactionManager.java index 1d79240d04..8e59e13a50 100644 --- a/core/src/main/java/com/scalar/db/common/StateManagedTwoPhaseCommitTransactionManager.java +++ b/core/src/main/java/com/scalar/db/common/StateManagedTwoPhaseCommitTransactionManager.java @@ -22,7 +22,9 @@ import com.scalar.db.exception.transaction.ValidationException; import java.util.List; import java.util.Optional; +import javax.annotation.concurrent.ThreadSafe; +@ThreadSafe public class StateManagedTwoPhaseCommitTransactionManager extends DecoratedTwoPhaseCommitTransactionManager { diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index 7c5a5fd1d4..9f4990f617 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -923,6 +923,12 @@ public enum CoreError implements ScalarDbError { "Some scanners were not closed. All scanners must be closed before preparing the transaction.", "", ""), + MUTATION_NOT_ALLOWED_IN_READ_ONLY_TRANSACTION( + Category.USER_ERROR, + "0207", + "Mutations are not allowed in read-only transactions. Transaction ID: %s", + "", + ""), // // Errors for the concurrency error category diff --git a/core/src/main/java/com/scalar/db/service/TransactionService.java b/core/src/main/java/com/scalar/db/service/TransactionService.java index 8acc748eaa..492dc6e9b5 100644 --- a/core/src/main/java/com/scalar/db/service/TransactionService.java +++ b/core/src/main/java/com/scalar/db/service/TransactionService.java @@ -81,6 +81,16 @@ public DistributedTransaction begin(String txId) throws TransactionException { return manager.begin(txId); } + @Override + public DistributedTransaction beginReadOnly() throws TransactionException { + return manager.beginReadOnly(); + } + + @Override + public DistributedTransaction beginReadOnly(String txId) throws TransactionException { + return manager.beginReadOnly(txId); + } + @Override public DistributedTransaction start() throws TransactionException { return manager.start(); @@ -91,6 +101,16 @@ public DistributedTransaction start(String txId) throws TransactionException { return manager.start(txId); } + @Override + public DistributedTransaction startReadOnly() throws TransactionException { + return manager.startReadOnly(); + } + + @Override + public DistributedTransaction startReadOnly(String txId) throws TransactionException { + return manager.startReadOnly(txId); + } + /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @Deprecated @Override diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index 9eff205532..6c553c53f7 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -145,6 +145,16 @@ public DistributedTransaction begin(String txId) { return begin(txId, config.getIsolation()); } + @Override + public DistributedTransaction beginReadOnly() { + throw new UnsupportedOperationException("implement later"); + } + + @Override + public DistributedTransaction beginReadOnly(String txId) { + throw new UnsupportedOperationException("implement later"); + } + /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @Deprecated @Override diff --git a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java index d65b512106..b38862f7ba 100644 --- a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java +++ b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java @@ -109,6 +109,16 @@ public DistributedTransaction begin(String txId) throws TransactionException { } } + @Override + public DistributedTransaction beginReadOnly() { + throw new UnsupportedOperationException("implement later"); + } + + @Override + public DistributedTransaction beginReadOnly(String txId) { + throw new UnsupportedOperationException("implement later"); + } + /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @SuppressWarnings("InlineMeSuggester") @Deprecated diff --git a/core/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionManager.java b/core/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionManager.java index cbc136d120..573aec61f7 100644 --- a/core/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionManager.java +++ b/core/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionManager.java @@ -76,6 +76,20 @@ public DistributedTransaction begin(String txId) throws TransactionException { .buildMessage()); } + @Override + public DistributedTransaction beginReadOnly() throws TransactionException { + throw new UnsupportedOperationException( + CoreError.SINGLE_CRUD_OPERATION_TRANSACTION_BEGINNING_TRANSACTION_NOT_ALLOWED + .buildMessage()); + } + + @Override + public DistributedTransaction beginReadOnly(String txId) throws TransactionException { + throw new UnsupportedOperationException( + CoreError.SINGLE_CRUD_OPERATION_TRANSACTION_BEGINNING_TRANSACTION_NOT_ALLOWED + .buildMessage()); + } + /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @Deprecated @Override diff --git a/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java index 2577ca9163..1ab6938241 100644 --- a/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/api/DistributedTransactionIntegrationTestBase.java @@ -197,6 +197,22 @@ public void get_GetGivenForCommittedRecord_ShouldReturnRecord() throws Transacti assertResult(2, 3, result); } + @Test + public void get_GetGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecord() + throws TransactionException { + // Arrange + populateRecords(); + DistributedTransaction transaction = manager.beginReadOnly(); + Get get = prepareGet(2, 3); + + // Act + Optional result = transaction.get(get); + transaction.commit(); + + // Assert + assertResult(2, 3, result); + } + @Test public void get_GetWithProjectionGivenForCommittedRecord_ShouldReturnRecord() throws TransactionException { @@ -295,6 +311,26 @@ public void scanOrGetScanner_ScanGivenForCommittedRecord_ShouldReturnRecords(Sca assertResult(1, 2, results.get(2)); } + @ParameterizedTest + @EnumSource(ScanType.class) + public void scanOrGetScanner_ScanGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecords( + ScanType scanType) throws TransactionException { + // Arrange + populateRecords(); + DistributedTransaction transaction = manager.beginReadOnly(); + Scan scan = prepareScan(1, 0, 2); + + // Act + List results = scanOrGetScanner(transaction, scan, scanType); + transaction.commit(); + + // Assert + assertThat(results.size()).isEqualTo(3); + assertResult(1, 0, results.get(0)); + assertResult(1, 1, results.get(1)); + assertResult(1, 2, results.get(2)); + } + @ParameterizedTest @EnumSource(ScanType.class) public void scanOrGetScanner_ScanWithConjunctionsGivenForCommittedRecord_ShouldReturnRecords( diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java index bf5abaae05..e5216ce7e5 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java @@ -15,7 +15,10 @@ import com.scalar.db.io.Key; import java.util.Optional; import java.util.Properties; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; public abstract class ConsensusCommitIntegrationTestBase extends DistributedTransactionIntegrationTestBase { @@ -929,4 +932,16 @@ public void deleteAndDelete_forSameRecord_shouldWorkCorrectly() throws Transacti Optional optResult = get(prepareGet(0, 0)); assertThat(optResult).isNotPresent(); } + + @Disabled("Implement later") + @Override + @Test + public void get_GetGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecord() {} + + @Disabled("Implement later") + @Override + @ParameterizedTest + @EnumSource(ScanType.class) + public void scanOrGetScanner_ScanGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecords( + ScanType scanType) {} } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionIntegrationTestBase.java index e9df3c8941..5cc864af39 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/singlecrudoperation/SingleCrudOperationTransactionIntegrationTestBase.java @@ -55,6 +55,11 @@ protected void populateRecords() throws TransactionException { @Test public void get_GetGivenForCommittedRecord_ShouldReturnRecord() {} + @Disabled("Single CRUD operation transactions don't support beginning a transaction") + @Override + @Test + public void get_GetGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecord() {} + @Disabled("Single CRUD operation transactions don't support beginning a transaction") @Override @Test @@ -76,6 +81,13 @@ public void get_GetWithUnmatchedConjunctionsGivenForCommittedRecord_ShouldReturn @EnumSource(ScanType.class) public void scanOrGetScanner_ScanGivenForCommittedRecord_ShouldReturnRecords(ScanType scanType) {} + @Disabled("Single CRUD operation transactions don't support beginning a transaction") + @Override + @ParameterizedTest + @EnumSource(ScanType.class) + public void scanOrGetScanner_ScanGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecords( + ScanType scanType) {} + @Disabled("Single CRUD operation transactions don't support beginning a transaction") @Override @ParameterizedTest From f7925f6ce64518cd6af072735aaf2bc10da8b167 Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Mon, 9 Jun 2025 16:38:21 +0900 Subject: [PATCH 2/3] Support begin in read-only mode for Consensus Commit transactions (#2739) --- .../ConsensusCommitManager.java | 86 ++- .../consensuscommit/CrudHandler.java | 40 +- .../transaction/consensuscommit/Snapshot.java | 20 +- .../TwoPhaseConsensusCommitManager.java | 7 +- .../ConsensusCommitManagerTest.java | 126 +++- .../consensuscommit/CrudHandlerTest.java | 93 ++- .../ConsensusCommitIntegrationTestBase.java | 15 - ...nsusCommitSpecificIntegrationTestBase.java | 597 +++++++++++++++++- 8 files changed, 873 insertions(+), 111 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index 6c553c53f7..1657611990 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -22,6 +22,7 @@ import com.scalar.db.api.Upsert; import com.scalar.db.common.AbstractDistributedTransactionManager; import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner; +import com.scalar.db.common.ReadOnlyDistributedTransaction; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CrudConflictException; @@ -137,22 +138,24 @@ private CommitHandler createCommitHandler() { @Override public DistributedTransaction begin() { - return begin(config.getIsolation()); + String txId = UUID.randomUUID().toString(); + return begin(txId); } @Override public DistributedTransaction begin(String txId) { - return begin(txId, config.getIsolation()); + return begin(txId, config.getIsolation(), false); } @Override public DistributedTransaction beginReadOnly() { - throw new UnsupportedOperationException("implement later"); + String txId = UUID.randomUUID().toString(); + return beginReadOnly(txId); } @Override public DistributedTransaction beginReadOnly(String txId) { - throw new UnsupportedOperationException("implement later"); + return begin(txId, config.getIsolation(), true); } /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @@ -166,7 +169,7 @@ public DistributedTransaction start(com.scalar.db.api.Isolation isolation) { @Deprecated @Override public DistributedTransaction start(String txId, com.scalar.db.api.Isolation isolation) { - return begin(txId, Isolation.valueOf(isolation.name())); + return begin(txId, Isolation.valueOf(isolation.name()), false); } /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @@ -189,7 +192,7 @@ public DistributedTransaction start(com.scalar.db.api.SerializableStrategy strat @Override public DistributedTransaction start( String txId, com.scalar.db.api.SerializableStrategy strategy) { - return begin(txId, Isolation.SERIALIZABLE); + return begin(txId, Isolation.SERIALIZABLE, false); } /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @@ -199,17 +202,23 @@ public DistributedTransaction start( String txId, com.scalar.db.api.Isolation isolation, com.scalar.db.api.SerializableStrategy strategy) { - return begin(txId, Isolation.valueOf(isolation.name())); + return begin(txId, Isolation.valueOf(isolation.name()), false); } @VisibleForTesting DistributedTransaction begin(Isolation isolation) { String txId = UUID.randomUUID().toString(); - return begin(txId, isolation); + return begin(txId, isolation, false); + } + + @VisibleForTesting + DistributedTransaction beginReadOnly(Isolation isolation) { + String txId = UUID.randomUUID().toString(); + return begin(txId, isolation, true); } @VisibleForTesting - DistributedTransaction begin(String txId, Isolation isolation) { + DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) { checkArgument(!Strings.isNullOrEmpty(txId)); checkNotNull(isolation); if (isGroupCommitEnabled()) { @@ -224,27 +233,35 @@ DistributedTransaction begin(String txId, Isolation isolation) { Snapshot snapshot = new Snapshot(txId, isolation, tableMetadataManager, parallelExecutor); CrudHandler crud = new CrudHandler( - storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor); - ConsensusCommit consensus = + storage, + snapshot, + tableMetadataManager, + isIncludeMetadataEnabled, + parallelExecutor, + readOnly); + DistributedTransaction transaction = new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter); - getNamespace().ifPresent(consensus::withNamespace); - getTable().ifPresent(consensus::withTable); - return consensus; + if (readOnly) { + transaction = new ReadOnlyDistributedTransaction(transaction); + } + getNamespace().ifPresent(transaction::withNamespace); + getTable().ifPresent(transaction::withTable); + return transaction; } @Override public Optional get(Get get) throws CrudException, UnknownTransactionStatusException { - return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get))); + return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)), true); } @Override public List scan(Scan scan) throws CrudException, UnknownTransactionStatusException { - return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan))); + return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan)), true); } @Override public Scanner getScanner(Scan scan) throws CrudException { - DistributedTransaction transaction = begin(); + DistributedTransaction transaction = beginReadOnly(); TransactionCrudOperable.Scanner scanner; try { @@ -331,7 +348,8 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException t -> { t.put(copyAndSetTargetToIfNot(put)); return null; - }); + }, + false); } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @@ -342,7 +360,8 @@ public void put(List puts) throws CrudException, UnknownTransactionStatusEx t -> { t.put(copyAndSetTargetToIfNot(puts)); return null; - }); + }, + false); } @Override @@ -351,7 +370,8 @@ public void insert(Insert insert) throws CrudException, UnknownTransactionStatus t -> { t.insert(copyAndSetTargetToIfNot(insert)); return null; - }); + }, + false); } @Override @@ -360,7 +380,8 @@ public void upsert(Upsert upsert) throws CrudException, UnknownTransactionStatus t -> { t.upsert(copyAndSetTargetToIfNot(upsert)); return null; - }); + }, + false); } @Override @@ -369,7 +390,8 @@ public void update(Update update) throws CrudException, UnknownTransactionStatus t -> { t.update(copyAndSetTargetToIfNot(update)); return null; - }); + }, + false); } @Override @@ -378,7 +400,8 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus t -> { t.delete(copyAndSetTargetToIfNot(delete)); return null; - }); + }, + false); } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @@ -389,7 +412,8 @@ public void delete(List deletes) throws CrudException, UnknownTransactio t -> { t.delete(copyAndSetTargetToIfNot(deletes)); return null; - }); + }, + false); } @Override @@ -399,13 +423,21 @@ public void mutate(List mutations) t -> { t.mutate(copyAndSetTargetToIfNot(mutations)); return null; - }); + }, + false); } private R executeTransaction( - ThrowableFunction throwableFunction) + ThrowableFunction throwableFunction, + boolean readOnly) throws CrudException, UnknownTransactionStatusException { - DistributedTransaction transaction = begin(); + DistributedTransaction transaction; + if (readOnly) { + transaction = beginReadOnly(); + } else { + transaction = begin(); + } + try { R result = throwableFunction.apply(transaction); transaction.commit(); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index ebe5da6a13..73d420215f 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -47,6 +47,10 @@ public class CrudHandler { private final boolean isIncludeMetadataEnabled; private final MutationConditionsValidator mutationConditionsValidator; private final ParallelExecutor parallelExecutor; + + // Whether the transaction is in read-only mode or not. + private final boolean readOnly; + private final List scanners = new ArrayList<>(); @SuppressFBWarnings("EI_EXPOSE_REP2") @@ -55,13 +59,15 @@ public CrudHandler( Snapshot snapshot, TransactionTableMetadataManager tableMetadataManager, boolean isIncludeMetadataEnabled, - ParallelExecutor parallelExecutor) { + ParallelExecutor parallelExecutor, + boolean readOnly) { this.storage = checkNotNull(storage); this.snapshot = checkNotNull(snapshot); this.tableMetadataManager = tableMetadataManager; this.isIncludeMetadataEnabled = isIncludeMetadataEnabled; this.mutationConditionsValidator = new MutationConditionsValidator(snapshot.getId()); this.parallelExecutor = parallelExecutor; + this.readOnly = readOnly; } @VisibleForTesting @@ -71,13 +77,15 @@ public CrudHandler( TransactionTableMetadataManager tableMetadataManager, boolean isIncludeMetadataEnabled, MutationConditionsValidator mutationConditionsValidator, - ParallelExecutor parallelExecutor) { + ParallelExecutor parallelExecutor, + boolean readOnly) { this.storage = checkNotNull(storage); this.snapshot = checkNotNull(snapshot); this.tableMetadataManager = tableMetadataManager; this.isIncludeMetadataEnabled = isIncludeMetadataEnabled; this.mutationConditionsValidator = mutationConditionsValidator; this.parallelExecutor = parallelExecutor; + this.readOnly = readOnly; } public Optional get(Get originalGet) throws CrudException { @@ -122,7 +130,7 @@ void read(@Nullable Snapshot.Key key, Get get) throws CrudException { // conjunction or the result exists. This is because we don’t know whether the record // actually exists or not due to the conjunction. if (key != null) { - snapshot.putIntoReadSet(key, result); + putIntoReadSetInSnapshot(key, result); } else { // Only for a Get with index, the argument `key` is null @@ -130,11 +138,11 @@ void read(@Nullable Snapshot.Key key, Get get) throws CrudException { // Only when we can get the record with the Get with index, we can put it into the read // set key = new Snapshot.Key(get, result.get()); - snapshot.putIntoReadSet(key, result); + putIntoReadSetInSnapshot(key, result); } } } - snapshot.putIntoGetSet(get, result); // for re-read and validation + snapshot.putIntoGetSet(get, result); return; } throw new UncommittedRecordException( @@ -148,7 +156,7 @@ public List scan(Scan originalScan) throws CrudException { List originalProjections = new ArrayList<>(originalScan.getProjections()); Scan scan = (Scan) prepareStorageSelection(originalScan); LinkedHashMap results = scanInternal(scan); - snapshot.verifyNoOverlap(scan, results); + verifyNoOverlap(scan, results); TableMetadata metadata = getTableMetadata(scan); return results.values().stream() @@ -214,7 +222,7 @@ private void processScanResult(Snapshot.Key key, Scan scan, TransactionResult re // We always update the read set to create before image by using the latest record (result) // because another conflicting transaction might have updated the record after this // transaction read it first. - snapshot.putIntoReadSet(key, Optional.of(result)); + putIntoReadSetInSnapshot(key, Optional.of(result)); } public TransactionCrudOperable.Scanner getScanner(Scan originalScan) throws CrudException { @@ -248,6 +256,20 @@ public void closeScanners() throws CrudException { } } + private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional result) { + // In read-only mode, we don't need to put the result into the read set + if (!readOnly) { + snapshot.putIntoReadSet(key, result); + } + } + + private void verifyNoOverlap(Scan scan, Map results) { + // In read-only mode, we don't need to verify the overlap + if (!readOnly) { + snapshot.verifyNoOverlap(scan, results); + } + } + public void put(Put put) throws CrudException { Snapshot.Key key = new Snapshot.Key(put); @@ -483,7 +505,7 @@ public void close() { snapshot.putIntoScannerSet(scan, results); } - snapshot.verifyNoOverlap(scan, results); + verifyNoOverlap(scan, results); } @Override @@ -554,7 +576,7 @@ public List all() throws CrudException { @Override public void close() { closed = true; - snapshot.verifyNoOverlap(scan, results); + verifyNoOverlap(scan, results); } @Override diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index 24b16e59c7..ff0ac712f3 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -58,15 +58,29 @@ public class Snapshot { private final Isolation isolation; private final TransactionTableMetadataManager tableMetadataManager; private final ParallelExecutor parallelExecutor; + + // The read set stores information about the records that are read in this transaction. This is + // used as a previous version for write operations. private final ConcurrentMap> readSet; + + // The get set stores information about the records retrieved by Get operations in this + // transaction. This is used for validation and snapshot read. private final ConcurrentMap> getSet; + + // The scan set stores information about the records retrieved by Scan operations in this + // transaction. This is used for validation and snapshot read. private final Map> scanSet; - private final Map writeSet; - private final Map deleteSet; - // The scanner set used to store information about scanners that are not fully scanned + // The scanner set stores information about scanners that are not fully scanned. This is used for + // validation. private final List scannerSet; + // The write set stores information about writes in this transaction. + private final Map writeSet; + + // The delete set stores information about deletes in this transaction. + private final Map deleteSet; + public Snapshot( String id, Isolation isolation, diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java index 1097f0b62f..35d8d74aa9 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java @@ -170,7 +170,12 @@ private TwoPhaseCommitTransaction createNewTransaction(String txId, Isolation is Snapshot snapshot = new Snapshot(txId, isolation, tableMetadataManager, parallelExecutor); CrudHandler crud = new CrudHandler( - storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor); + storage, + snapshot, + tableMetadataManager, + isIncludeMetadataEnabled, + parallelExecutor, + false); TwoPhaseConsensusCommit transaction = new TwoPhaseConsensusCommit(crud, commit, recovery, mutationOperationChecker); diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java index 0260c70309..e1d61e8253 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -28,6 +29,7 @@ import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; import com.scalar.db.common.ActiveTransactionManagedDistributedTransactionManager; +import com.scalar.db.common.ReadOnlyDistributedTransaction; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CommitException; @@ -174,6 +176,35 @@ public void begin_CalledTwiceWithSameTxId_ThrowTransactionException() assertThatThrownBy(() -> manager.begin(ANY_TX_ID)).isInstanceOf(TransactionException.class); } + @Test + public void + beginReadOnly_NoArgumentGiven_ReturnConsensusCommitWithSomeTxIdAndSnapshotIsolationInReadOnlyMode() { + // Arrange + ConsensusCommitManager spied = spy(manager); + + // Act + DistributedTransaction transaction = spied.beginReadOnly(); + + // Assert + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true)); + + assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); + } + + @Test + public void beginReadOnly_TxIdGiven_ReturnWithSpecifiedTxIdAndSnapshotIsolationInReadOnlyMode() { + // Arrange + ConsensusCommitManager spied = spy(manager); + + // Act + DistributedTransaction transaction = spied.beginReadOnly(ANY_TX_ID); + + // Assert + verify(spied).begin(eq(ANY_TX_ID), eq(Isolation.SNAPSHOT), eq(true)); + + assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); + } + @Test public void start_NoArgumentGiven_ReturnConsensusCommitWithSomeTxIdAndSnapshotIsolation() throws TransactionException { @@ -258,6 +289,37 @@ public void start_CalledTwiceWithSameTxId_ThrowTransactionException() assertThatThrownBy(() -> manager.start(ANY_TX_ID)).isInstanceOf(TransactionException.class); } + @Test + public void + startReadOnly_NoArgumentGiven_ReturnConsensusCommitWithSomeTxIdAndSnapshotIsolationInReadOnlyMode() + throws TransactionException { + // Arrange + ConsensusCommitManager spied = spy(manager); + + // Act + DistributedTransaction transaction = spied.startReadOnly(); + + // Assert + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true)); + + assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); + } + + @Test + public void startReadOnly_TxIdGiven_ReturnWithSpecifiedTxIdAndSnapshotIsolationInReadOnlyMode() + throws TransactionException { + // Arrange + ConsensusCommitManager spied = spy(manager); + + // Act + DistributedTransaction transaction = spied.startReadOnly(ANY_TX_ID); + + // Assert + verify(spied).begin(eq(ANY_TX_ID), eq(Isolation.SNAPSHOT), eq(true)); + + assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); + } + @Test public void resume_CalledWithBegin_ReturnSameTransactionObject() throws TransactionException { // Arrange @@ -541,7 +603,7 @@ public void get_ShouldGet() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -553,7 +615,7 @@ public void get_ShouldGet() throws TransactionException { Optional actual = spied.get(get); // Assert - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).commit(); assertThat(actual).isEqualTo(Optional.of(result)); @@ -566,7 +628,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -575,7 +637,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).rollback(); } @@ -588,7 +650,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -597,7 +659,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).rollback(); } @@ -610,7 +672,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -619,7 +681,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(UnknownTransactionStatusException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).commit(); } @@ -631,7 +693,7 @@ public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudExcepti DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -640,7 +702,7 @@ public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudExcepti // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).commit(); } @@ -651,7 +713,7 @@ public void scan_ShouldScan() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -664,7 +726,7 @@ public void scan_ShouldScan() throws TransactionException { List actual = spied.scan(scan); // Assert - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).scan(scan); verify(transaction).commit(); assertThat(actual).isEqualTo(results); @@ -676,7 +738,7 @@ public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() t DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -706,7 +768,7 @@ public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() t assertThat(actual.one()).isEmpty(); actual.close(); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).commit(); verify(scanner).close(); } @@ -717,7 +779,7 @@ public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -744,7 +806,7 @@ public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() assertThat(actual.all()).isEmpty(); actual.close(); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).commit(); verify(scanner).close(); } @@ -756,7 +818,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -791,7 +853,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul assertThat(iterator.hasNext()).isFalse(); actual.close(); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).commit(); verify(scanner).close(); } @@ -804,7 +866,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -818,7 +880,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Act Assert assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).rollback(); } @@ -830,7 +892,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -848,7 +910,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::one).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -860,7 +922,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Arrange DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -878,7 +940,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::all).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -890,7 +952,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Arrange DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -908,7 +970,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -921,7 +983,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); doThrow(CommitConflictException.class).when(transaction).commit(); Scan scan = @@ -938,7 +1000,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -951,7 +1013,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); doThrow(UnknownTransactionStatusException.class).when(transaction).commit(); Scan scan = @@ -968,7 +1030,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(UnknownTransactionStatusException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); } @@ -980,7 +1042,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); doThrow(CommitException.class).when(transaction).commit(); Scan scan = @@ -997,7 +1059,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index 689ede3bb9..e524942791 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -98,7 +98,8 @@ public void setUp() throws Exception { tableMetadataManager, false, mutationConditionsValidator, - parallelExecutor); + parallelExecutor, + false); // Arrange when(tableMetadataManager.getTransactionTableMetadata(any())) @@ -212,6 +213,44 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept verify(snapshot).putIntoGetSet(get, Optional.of((TransactionResult) expected.get())); } + @Test + public void + get_GetNotExistsInSnapshotAndRecordInStorageCommitted_InReadOnlyMode_ShouldReturnFromStorageAndUpdateSnapshot() + throws CrudException, ExecutionException { + // Arrange + handler = + new CrudHandler( + storage, + snapshot, + tableMetadataManager, + false, + mutationConditionsValidator, + parallelExecutor, + true); + + Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); + Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED)); + Optional transactionResult = expected.map(e -> (TransactionResult) e); + Snapshot.Key key = new Snapshot.Key(getForStorage); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); + when(storage.get(getForStorage)).thenReturn(expected); + when(snapshot.getResult(key, getForStorage)).thenReturn(transactionResult); + + // Act + Optional result = handler.get(get); + + // Assert + assertThat(result) + .isEqualTo( + Optional.of( + new FilteredResult( + expected.get(), Collections.emptyList(), TABLE_METADATA, false))); + verify(storage).get(getForStorage); + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot).putIntoGetSet(get, Optional.of((TransactionResult) expected.get())); + } + @Test public void get_GetNotExistsInSnapshotAndRecordInStorageNotCommitted_ShouldThrowUncommittedRecordException() @@ -311,7 +350,8 @@ public void get_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSn Result result = prepareResult(TransactionState.COMMITTED); Optional expected = Optional.of(new TransactionResult(result)); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, tableMetadataManager, parallelExecutor); - handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); + handler = + new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); when(storage.get(getForStorage)).thenReturn(Optional.of(result)); // Act @@ -380,6 +420,46 @@ void scanOrGetScanner_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn(ScanT .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); } + @ParameterizedTest + @EnumSource(ScanType.class) + void scanOrGetScanner_ResultGivenFromStorage_InReadOnlyMode_ShouldUpdateSnapshotAndReturn( + ScanType scanType) throws ExecutionException, CrudException { + // Arrange + handler = + new CrudHandler( + storage, + snapshot, + tableMetadataManager, + false, + mutationConditionsValidator, + parallelExecutor, + true); + + Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); + result = prepareResult(TransactionState.COMMITTED); + Snapshot.Key key = new Snapshot.Key(scan, result); + TransactionResult expected = new TransactionResult(result); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } + when(storage.scan(scanForStorage)).thenReturn(scanner); + when(snapshot.getResult(any())).thenReturn(Optional.of(expected)); + + // Act + List results = scanOrGetScanner(scan, scanType); + + // Assert + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot).putIntoScanSet(scan, Maps.newLinkedHashMap(ImmutableMap.of(key, expected))); + verify(snapshot, never()).verifyNoOverlap(any(), any()); + assertThat(results.size()).isEqualTo(1); + assertThat(results.get(0)) + .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); + } + @ParameterizedTest @EnumSource(ScanType.class) void @@ -464,7 +544,8 @@ void scan_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSnapshot result = prepareResult(TransactionState.COMMITTED); TransactionResult expected = new TransactionResult(result); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, tableMetadataManager, parallelExecutor); - handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); + handler = + new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); if (scanType == ScanType.SCAN) { when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); } else { @@ -531,7 +612,8 @@ void scanOrGetScanner_GetCalledAfterScanUnderRealSnapshot_ShouldReturnFromStorag Scan scan = toScanForStorageFrom(prepareScan()); result = prepareResult(TransactionState.COMMITTED); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, tableMetadataManager, parallelExecutor); - handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); + handler = + new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); if (scanType == ScanType.SCAN) { when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); } else { @@ -596,7 +678,8 @@ void scanOrGetScanner_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgum new HashMap<>(), deleteSet, new ArrayList<>()); - handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); + handler = + new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); if (scanType == ScanType.SCAN) { when(scanner.iterator()).thenReturn(Arrays.asList(result, result2).iterator()); } else { diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java index e5216ce7e5..bf5abaae05 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java @@ -15,10 +15,7 @@ import com.scalar.db.io.Key; import java.util.Optional; import java.util.Properties; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; public abstract class ConsensusCommitIntegrationTestBase extends DistributedTransactionIntegrationTestBase { @@ -932,16 +929,4 @@ public void deleteAndDelete_forSameRecord_shouldWorkCorrectly() throws Transacti Optional optResult = get(prepareGet(0, 0)); assertThat(optResult).isNotPresent(); } - - @Disabled("Implement later") - @Override - @Test - public void get_GetGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecord() {} - - @Disabled("Implement later") - @Override - @ParameterizedTest - @EnumSource(ScanType.class) - public void scanOrGetScanner_ScanGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecords( - ScanType scanType) {} } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index e4a31e9673..9a7e1c59a9 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -59,7 +59,6 @@ import java.util.Set; import java.util.stream.IntStream; import javax.annotation.Nullable; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -236,8 +235,7 @@ public void get_GetGivenForCommittedRecord_ShouldReturnRecord() throws Transacti // Assert assertThat(result.isPresent()).isTrue(); - Assertions.assertThat( - ((TransactionResult) ((FilteredResult) result.get()).getOriginalResult()).getState()) + assertThat(((TransactionResult) ((FilteredResult) result.get()).getOriginalResult()).getState()) .isEqualTo(TransactionState.COMMITTED); } @@ -254,7 +252,7 @@ public void scan_ScanGivenForCommittedRecord_ShouldReturnRecord() throws Transac // Assert assertThat(results.size()).isEqualTo(1); - Assertions.assertThat( + assertThat( ((TransactionResult) ((FilteredResult) results.get(0)).getOriginalResult()).getState()) .isEqualTo(TransactionState.COMMITTED); } @@ -375,7 +373,7 @@ private void selection_SelectionGivenForPreparedWhenCoordinatorStateCommitted_Sh transaction.commit(); assertThat(result.getId()).isEqualTo(ongoingTxId); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(2); assertThat(result.getCommittedAt()).isGreaterThan(0); } @@ -440,7 +438,7 @@ private void selection_SelectionGivenForPreparedWhenCoordinatorStateAborted_Shou transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -551,7 +549,7 @@ public void scan_ScanGivenForPreparedWhenCoordinatorStateAborted_ShouldRollback( transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -636,7 +634,7 @@ public void get_GetGivenForPreparedWhenCoordinatorStateNotExistAndExpired_Should transaction.commit(); assertThat(result.getId()).isEqualTo(ongoingTxId); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(2); assertThat(result.getCommittedAt()).isGreaterThan(0); } @@ -722,7 +720,7 @@ public void get_GetGivenForPreparedWhenCoordinatorStateNotExistAndExpired_Should transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -847,7 +845,7 @@ private void selection_SelectionGivenForDeletedWhenCoordinatorStateAborted_Shoul transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -958,7 +956,7 @@ public void scan_ScanGivenForDeletedWhenCoordinatorStateAborted_ShouldRollback( transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -1117,7 +1115,7 @@ public void scan_ScanGivenForDeletedWhenCoordinatorStateNotExistAndExpired_Shoul transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -1192,7 +1190,7 @@ public void putAndCommit_PutGivenForNonExisting_ShouldCreateRecord() throws Tran assertThat(r).isPresent(); TransactionResult result = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(result)).isEqualTo(expected); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); } @@ -1221,7 +1219,7 @@ public void putAndCommit_PutWithImplicitPreReadEnabledGivenForNonExisting_Should assertThat(r).isPresent(); TransactionResult result = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(result)).isEqualTo(expected); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); } @@ -1251,7 +1249,7 @@ public void putAndCommit_PutGivenForExistingAfterRead_ShouldUpdateRecord() assertThat(r).isPresent(); TransactionResult actual = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(actual)).isEqualTo(expected); - Assertions.assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(actual.getVersion()).isEqualTo(2); } @@ -1282,7 +1280,7 @@ public void putAndCommit_PutWithImplicitPreReadEnabledGivenForExisting_ShouldUpd assertThat(r).isPresent(); TransactionResult actual = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(actual)).isEqualTo(expected); - Assertions.assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(actual.getVersion()).isEqualTo(2); } @@ -1327,7 +1325,7 @@ public void putAndCommit_PutWithInsertModeEnabledGivenForNonExisting_ShouldCreat assertThat(r).isPresent(); TransactionResult result = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(result)).isEqualTo(expected); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); } @@ -1361,7 +1359,7 @@ public void putAndCommit_PutWithInsertModeEnabledGivenForNonExistingAfterRead_Sh assertThat(r).isPresent(); TransactionResult actual = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(actual)).isEqualTo(expected); - Assertions.assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(actual.getVersion()).isEqualTo(1); } @@ -2801,7 +2799,7 @@ public void scanAll_ScanAllGivenForCommittedRecord_ShouldReturnRecord() // Assert assertThat(results.size()).isEqualTo(1); - Assertions.assertThat( + assertThat( ((TransactionResult) ((FilteredResult) results.get(0)).getOriginalResult()).getState()) .isEqualTo(TransactionState.COMMITTED); } @@ -4709,6 +4707,567 @@ public void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate() throws TransactionEx assertThat(actual2.get().getInt(BALANCE)).isEqualTo(2); } + @Test + public void get_GetGivenForCommittedRecord_InReadOnlyMode_WithSerializable_ShouldReturnRecord() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + Get get = prepareGet(0, 0, namespace1, TABLE_1); + + // Act + Optional result = transaction.get(get); + transaction.commit(); + + // Assert + assertThat(result.isPresent()).isTrue(); + assertThat(((TransactionResult) ((FilteredResult) result.get()).getOriginalResult()).getState()) + .isEqualTo(TransactionState.COMMITTED); + } + + @Test + public void + get_GetGivenForCommittedRecord_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(); + Get get = prepareGet(0, 0, namespace1, TABLE_1); + + // Act Assert + Optional result = transaction.get(get); + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + public void + get_GetGivenForCommittedRecord_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + Get get = prepareGet(0, 0, namespace1, TABLE_1); + + // Act Assert + Optional result = transaction.get(get); + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + public void scan_ScanGivenForCommittedRecord_InReadOnlyMode_WithSerializable_ShouldReturnRecord() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + Scan scan = prepareScan(0, namespace1, TABLE_1); + + // Act + List results = transaction.scan(scan); + transaction.commit(); + + // Assert + assertThat(results.size()).isEqualTo(4); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(3).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(3).getInt(ACCOUNT_TYPE)).isEqualTo(3); + assertThat(getBalance(results.get(3))).isEqualTo(INITIAL_BALANCE); + } + + @Test + public void + scan_ScanGivenForCommittedRecord_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(); + Scan scan = prepareScan(0, namespace1, TABLE_1); + + // Act Assert + List results = transaction.scan(scan); + + assertThat(results.size()).isEqualTo(4); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(3).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(3).getInt(ACCOUNT_TYPE)).isEqualTo(3); + assertThat(getBalance(results.get(3))).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + transaction.commit(); + } + + @Test + public void + scan_ScanGivenForCommittedRecord_InReadOnlyMode_WhenRecordInsertedByAnotherTransaction_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(); + Scan scan = prepareScan(0, namespace1, TABLE_1); + + // Act Assert + List results = transaction.scan(scan); + + assertThat(results.size()).isEqualTo(4); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(3).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(3).getInt(ACCOUNT_TYPE)).isEqualTo(3); + assertThat(getBalance(results.get(3))).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(); + another.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 5)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + another.commit(); + + transaction.commit(); + } + + @Test + public void + scan_ScanGivenForCommittedRecord_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + Scan scan = prepareScan(0, namespace1, TABLE_1); + + // Act Assert + List results = transaction.scan(scan); + + assertThat(results.size()).isEqualTo(4); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(3).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(3).getInt(ACCOUNT_TYPE)).isEqualTo(3); + assertThat(getBalance(results.get(3))).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + public void + scan_ScanGivenForCommittedRecord_InReadOnlyMode_WhenRecordInsertedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + Scan scan = prepareScan(0, namespace1, TABLE_1); + + // Act Assert + List results = transaction.scan(scan); + + assertThat(results.size()).isEqualTo(4); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(3).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(3).getInt(ACCOUNT_TYPE)).isEqualTo(3); + assertThat(getBalance(results.get(3))).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 5)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + public void getScanner_InReadOnlyMode_WithSerializable_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + public void + getScanner_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + DistributedTransaction another = manager.begin(); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + public void + getScanner_InReadOnlyMode_WhenRecordInsertedByAnotherTransaction_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + DistributedTransaction another = manager.begin(); + another.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + another.commit(); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + public void + getScanner_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + public void + getScanner_InReadOnlyMode_WhenRecordInsertedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + private DistributedTransaction prepareTransfer( int fromId, String fromNamespace, From c8230de70c9b62b34f610038189b3409b975324b Mon Sep 17 00:00:00 2001 From: Toshihiro Suzuki Date: Mon, 9 Jun 2025 16:38:38 +0900 Subject: [PATCH 3/3] Support begin in read-only mode for JDBC transactions (#2738) --- .../jdbc/JdbcTransactionIntegrationTest.java | 14 -- .../com/scalar/db/storage/jdbc/JdbcAdmin.java | 10 +- .../scalar/db/storage/jdbc/JdbcDatabase.java | 4 +- .../db/storage/jdbc/RdbEngineSqlite.java | 2 +- .../db/storage/jdbc/RdbEngineStrategy.java | 3 +- .../jdbc/JdbcTransactionManager.java | 82 +++++--- .../jdbc/JdbcTransactionManagerTest.java | 184 ++++++++++++------ 7 files changed, 194 insertions(+), 105 deletions(-) diff --git a/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java index fa67a2d870..37bebaf726 100644 --- a/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java +++ b/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java @@ -7,8 +7,6 @@ import java.util.Properties; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; public class JdbcTransactionIntegrationTest extends DistributedTransactionIntegrationTestBase { @@ -44,16 +42,4 @@ public void abort_forOngoingTransaction_ShouldAbortCorrectly() {} @Override @Test public void rollback_forOngoingTransaction_ShouldRollbackCorrectly() {} - - @Disabled("Implement later") - @Override - @Test - public void get_GetGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecord() {} - - @Disabled("Implement later") - @Override - @ParameterizedTest - @EnumSource(ScanType.class) - public void scanOrGetScanner_ScanGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecords( - ScanType scanType) {} } diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcAdmin.java b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcAdmin.java index e084b63ae2..a9e34787b7 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcAdmin.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcAdmin.java @@ -438,7 +438,7 @@ public TableMetadata getTableMetadata(String namespace, String table) throws Exe boolean tableExists = false; try (Connection connection = dataSource.getConnection()) { - rdbEngine.setReadOnly(connection, true); + rdbEngine.setConnectionToReadOnly(connection, true); try (PreparedStatement preparedStatement = connection.prepareStatement(getSelectColumnsStatement())) { @@ -510,7 +510,7 @@ public TableMetadata getImportTableMetadata( } try (Connection connection = dataSource.getConnection()) { - rdbEngine.setReadOnly(connection, true); + rdbEngine.setConnectionToReadOnly(connection, true); String catalogName = rdbEngine.getCatalogName(namespace); String schemaName = rdbEngine.getSchemaName(namespace); @@ -608,7 +608,7 @@ public Set getNamespaceTableNames(String namespace) throws ExecutionExce + enclose(METADATA_COL_FULL_TABLE_NAME) + " LIKE ?"; try (Connection connection = dataSource.getConnection()) { - rdbEngine.setReadOnly(connection, true); + rdbEngine.setConnectionToReadOnly(connection, true); try (PreparedStatement preparedStatement = connection.prepareStatement(selectTablesOfNamespaceStatement)) { @@ -644,7 +644,7 @@ public boolean namespaceExists(String namespace) throws ExecutionException { + enclose(NAMESPACE_COL_NAMESPACE_NAME) + " = ?"; try (Connection connection = dataSource.getConnection()) { - rdbEngine.setReadOnly(connection, true); + rdbEngine.setConnectionToReadOnly(connection, true); try (PreparedStatement statement = connection.prepareStatement(selectQuery)) { statement.setString(1, namespace); @@ -992,7 +992,7 @@ private String encloseFullTableName(String schema, String table) { @Override public Set getNamespaceNames() throws ExecutionException { try (Connection connection = dataSource.getConnection()) { - rdbEngine.setReadOnly(connection, true); + rdbEngine.setConnectionToReadOnly(connection, true); String selectQuery = "SELECT * FROM " + encloseFullTableName(metadataSchema, NAMESPACES_TABLE); diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java index 7a17ea1a34..c6c6af21d7 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java @@ -82,7 +82,7 @@ public Optional get(Get get) throws ExecutionException { Connection connection = null; try { connection = dataSource.getConnection(); - rdbEngine.setReadOnly(connection, true); + rdbEngine.setConnectionToReadOnly(connection, true); return jdbcService.get(get, connection); } catch (SQLException e) { throw new ExecutionException( @@ -98,7 +98,7 @@ public Scanner scan(Scan scan) throws ExecutionException { Connection connection = null; try { connection = dataSource.getConnection(); - rdbEngine.setReadOnly(connection, true); + rdbEngine.setConnectionToReadOnly(connection, true); return jdbcService.getScanner(scan, connection); } catch (SQLException e) { close(connection); diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineSqlite.java b/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineSqlite.java index c2e39ec827..0501d9a22e 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineSqlite.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineSqlite.java @@ -340,7 +340,7 @@ public RdbEngineTimeTypeStrategy getTimeTypeStrategy( } @Override - public void setReadOnly(Connection connection, boolean readOnly) { + public void setConnectionToReadOnly(Connection connection, boolean readOnly) { // Do nothing. SQLite does not support read-only mode. } } diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineStrategy.java b/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineStrategy.java index 9195dd7975..583cf016ee 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineStrategy.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineStrategy.java @@ -230,7 +230,8 @@ default void throwIfDuplicatedIndexWarning(SQLWarning warning) throws SQLExcepti // Do nothing } - default void setReadOnly(Connection connection, boolean readOnly) throws SQLException { + default void setConnectionToReadOnly(Connection connection, boolean readOnly) + throws SQLException { connection.setReadOnly(readOnly); } } diff --git a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java index b38862f7ba..0f334227af 100644 --- a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java +++ b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java @@ -18,6 +18,7 @@ import com.scalar.db.api.Upsert; import com.scalar.db.common.AbstractDistributedTransactionManager; import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner; +import com.scalar.db.common.ReadOnlyDistributedTransaction; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.common.checker.OperationChecker; import com.scalar.db.common.error.CoreError; @@ -36,6 +37,7 @@ import com.scalar.db.storage.jdbc.RdbEngineFactory; import com.scalar.db.storage.jdbc.RdbEngineStrategy; import com.scalar.db.util.ThrowableFunction; +import java.sql.Connection; import java.sql.SQLException; import java.util.List; import java.util.Optional; @@ -90,14 +92,39 @@ public JdbcTransactionManager(DatabaseConfig databaseConfig) { @Override public DistributedTransaction begin() throws TransactionException { String txId = UUID.randomUUID().toString(); - return begin(txId); + return begin(txId, false); } @Override public DistributedTransaction begin(String txId) throws TransactionException { + return begin(txId, false); + } + + @Override + public DistributedTransaction beginReadOnly() throws TransactionException { + String txId = UUID.randomUUID().toString(); + return begin(txId, true); + } + + @Override + public DistributedTransaction beginReadOnly(String txId) throws TransactionException { + return begin(txId, true); + } + + private DistributedTransaction begin(String txId, boolean readOnly) throws TransactionException { try { - JdbcTransaction transaction = - new JdbcTransaction(txId, jdbcService, dataSource.getConnection(), rdbEngine); + Connection connection = dataSource.getConnection(); + + DistributedTransaction transaction; + if (readOnly) { + rdbEngine.setConnectionToReadOnly(connection, true); + transaction = + new ReadOnlyDistributedTransaction( + new JdbcTransaction(txId, jdbcService, connection, rdbEngine)); + } else { + transaction = new JdbcTransaction(txId, jdbcService, connection, rdbEngine); + } + getNamespace().ifPresent(transaction::withNamespace); getTable().ifPresent(transaction::withTable); return transaction; @@ -109,16 +136,6 @@ public DistributedTransaction begin(String txId) throws TransactionException { } } - @Override - public DistributedTransaction beginReadOnly() { - throw new UnsupportedOperationException("implement later"); - } - - @Override - public DistributedTransaction beginReadOnly(String txId) { - throw new UnsupportedOperationException("implement later"); - } - /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @SuppressWarnings("InlineMeSuggester") @Deprecated @@ -173,19 +190,19 @@ public DistributedTransaction start( @Override public Optional get(Get get) throws CrudException, UnknownTransactionStatusException { - return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get))); + return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)), true); } @Override public List scan(Scan scan) throws CrudException, UnknownTransactionStatusException { - return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan))); + return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan)), true); } @Override public Scanner getScanner(Scan scan) throws CrudException { DistributedTransaction transaction; try { - transaction = begin(); + transaction = beginReadOnly(); } catch (TransactionNotFoundException e) { throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); } catch (TransactionException e) { @@ -277,7 +294,8 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException t -> { t.put(copyAndSetTargetToIfNot(put)); return null; - }); + }, + false); } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @@ -288,7 +306,8 @@ public void put(List puts) throws CrudException, UnknownTransactionStatusEx t -> { t.put(copyAndSetTargetToIfNot(puts)); return null; - }); + }, + false); } @Override @@ -297,7 +316,8 @@ public void insert(Insert insert) throws CrudException, UnknownTransactionStatus t -> { t.insert(copyAndSetTargetToIfNot(insert)); return null; - }); + }, + false); } @Override @@ -306,7 +326,8 @@ public void upsert(Upsert upsert) throws CrudException, UnknownTransactionStatus t -> { t.upsert(copyAndSetTargetToIfNot(upsert)); return null; - }); + }, + false); } @Override @@ -315,7 +336,8 @@ public void update(Update update) throws CrudException, UnknownTransactionStatus t -> { t.update(copyAndSetTargetToIfNot(update)); return null; - }); + }, + false); } @Override @@ -324,7 +346,8 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus t -> { t.delete(copyAndSetTargetToIfNot(delete)); return null; - }); + }, + false); } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @@ -335,7 +358,8 @@ public void delete(List deletes) throws CrudException, UnknownTransactio t -> { t.delete(copyAndSetTargetToIfNot(deletes)); return null; - }); + }, + false); } @Override @@ -345,15 +369,21 @@ public void mutate(List mutations) t -> { t.mutate(copyAndSetTargetToIfNot(mutations)); return null; - }); + }, + false); } private R executeTransaction( - ThrowableFunction throwableFunction) + ThrowableFunction throwableFunction, + boolean readOnly) throws CrudException, UnknownTransactionStatusException { DistributedTransaction transaction; try { - transaction = begin(); + if (readOnly) { + transaction = beginReadOnly(); + } else { + transaction = begin(); + } } catch (TransactionNotFoundException e) { throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); } catch (TransactionException e) { diff --git a/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionManagerTest.java b/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionManagerTest.java index 80d0375e6d..8a34348e14 100644 --- a/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionManagerTest.java @@ -25,6 +25,7 @@ import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; import com.scalar.db.common.ActiveTransactionManagedDistributedTransactionManager; +import com.scalar.db.common.ReadOnlyDistributedTransaction; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.AbortException; @@ -131,6 +132,77 @@ public void whenGetOperationsExecutedAndJdbcServiceThrowsSQLException_shouldThro .isInstanceOf(CrudException.class); } + @Test + public void begin_WithoutTxId_ShouldCreateNewTransaction() throws Exception { + // Arrange + Connection connection = mock(Connection.class); + when(dataSource.getConnection()).thenReturn(connection); + + // Act + DistributedTransaction actual = manager.begin(); + + // Assert + verify(dataSource).getConnection(); + assertThat(actual).isInstanceOf(JdbcTransaction.class); + } + + @Test + public void begin_WithTxId_ShouldCreateTransactionWithGivenId() throws Exception { + // Arrange + Connection connection = mock(Connection.class); + when(dataSource.getConnection()).thenReturn(connection); + String txId = "my-tx-id"; + + // Act + DistributedTransaction actual = manager.begin(txId); + + // Assert + verify(dataSource).getConnection(); + assertThat(actual).isInstanceOf(JdbcTransaction.class); + assertThat(actual.getId()).isEqualTo(txId); + } + + @Test + public void beginReadOnly_WithoutTxId_ShouldCreateReadOnlyTransaction() throws Exception { + // Arrange + Connection connection = mock(Connection.class); + when(dataSource.getConnection()).thenReturn(connection); + + // Act + DistributedTransaction actual = manager.beginReadOnly(); + + // Assert + verify(dataSource).getConnection(); + verify(connection).setReadOnly(true); + assertThat(actual).isInstanceOf(ReadOnlyDistributedTransaction.class); + } + + @Test + public void beginReadOnly_WithTxId_ShouldCreateReadOnlyTransactionWithGivenId() throws Exception { + // Arrange + Connection connection = mock(Connection.class); + when(dataSource.getConnection()).thenReturn(connection); + String txId = "my-tx-id"; + + // Act + DistributedTransaction result = manager.beginReadOnly(txId); + + // Assert + verify(dataSource).getConnection(); + verify(connection).setReadOnly(true); + assertThat(result).isInstanceOf(ReadOnlyDistributedTransaction.class); + assertThat(result.getId()).isEqualTo(txId); + } + + @Test + public void begin_SQLExceptionThrown_ShouldThrowTransactionException() throws Exception { + // Arrange + when(dataSource.getConnection()).thenThrow(SQLException.class); + + // Act Assert + assertThatThrownBy(() -> manager.begin()).isInstanceOf(TransactionException.class); + } + @Test public void get_withConflictError_shouldThrowCrudConflictException() throws SQLException, ExecutionException { @@ -187,7 +259,7 @@ public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() t DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -217,7 +289,7 @@ public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() t assertThat(actual.one()).isEmpty(); actual.close(); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).commit(); verify(scanner).close(); } @@ -228,7 +300,7 @@ public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -255,7 +327,7 @@ public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() assertThat(actual.all()).isEmpty(); actual.close(); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).commit(); verify(scanner).close(); } @@ -267,7 +339,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -302,7 +374,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul assertThat(iterator.hasNext()).isFalse(); actual.close(); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).commit(); verify(scanner).close(); } @@ -313,14 +385,14 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul throws TransactionException { // Arrange JdbcTransactionManager spied = spy(manager); - doThrow(TransactionNotFoundException.class).when(spied).begin(); + doThrow(TransactionNotFoundException.class).when(spied).beginReadOnly(); Scan scan = mock(Scan.class); // Act Assert assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); } @Test @@ -328,14 +400,14 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC throws TransactionException { // Arrange JdbcTransactionManager spied = spy(manager); - doThrow(TransactionException.class).when(spied).begin(); + doThrow(TransactionException.class).when(spied).beginReadOnly(); Scan scan = mock(Scan.class); // Act Assert assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); } @Test @@ -346,7 +418,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -360,7 +432,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC // Act Assert assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).rollback(); } @@ -372,7 +444,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -390,7 +462,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::one).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -402,7 +474,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC // Arrange DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -420,7 +492,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::all).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -432,7 +504,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC // Arrange DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -450,7 +522,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -463,7 +535,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); doThrow(CommitConflictException.class).when(transaction).commit(); Scan scan = @@ -480,7 +552,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -493,7 +565,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); doThrow(UnknownTransactionStatusException.class).when(transaction).commit(); Scan scan = @@ -510,7 +582,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(UnknownTransactionStatusException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); } @@ -522,7 +594,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); doThrow(CommitException.class).when(transaction).commit(); Scan scan = @@ -539,7 +611,7 @@ public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowC TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -994,7 +1066,7 @@ public void get_ShouldGet() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -1006,7 +1078,7 @@ public void get_ShouldGet() throws TransactionException { Optional actual = spied.get(get); // Assert - verify(spied).begin(any()); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).commit(); assertThat(actual).isEqualTo(Optional.of(result)); @@ -1018,7 +1090,7 @@ public void get_ShouldGet() throws TransactionException { throws TransactionException { // Arrange JdbcTransactionManager spied = spy(manager); - doThrow(TransactionNotFoundException.class).when(spied).begin(any()); + doThrow(TransactionNotFoundException.class).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -1026,7 +1098,7 @@ public void get_ShouldGet() throws TransactionException { // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); - verify(spied).begin(any()); + verify(spied).beginReadOnly(); } @Test @@ -1034,7 +1106,7 @@ public void get_TransactionExceptionThrownByTransactionBegin_ShouldThrowCrudExce throws TransactionException { // Arrange JdbcTransactionManager spied = spy(manager); - doThrow(TransactionException.class).when(spied).begin(any()); + doThrow(TransactionException.class).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -1042,7 +1114,7 @@ public void get_TransactionExceptionThrownByTransactionBegin_ShouldThrowCrudExce // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); - verify(spied).begin(any()); + verify(spied).beginReadOnly(); } @Test @@ -1052,7 +1124,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -1061,7 +1133,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); - verify(spied).begin(any()); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).rollback(); } @@ -1074,7 +1146,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -1083,7 +1155,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); - verify(spied).begin(any()); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).rollback(); } @@ -1096,7 +1168,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -1105,7 +1177,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(UnknownTransactionStatusException.class); - verify(spied).begin(any()); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).commit(); } @@ -1117,7 +1189,7 @@ public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudExcepti DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -1126,7 +1198,7 @@ public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudExcepti // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); - verify(spied).begin(any()); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).commit(); } @@ -1137,7 +1209,7 @@ public void scan_ShouldScan() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -1150,7 +1222,7 @@ public void scan_ShouldScan() throws TransactionException { List actual = spied.scan(scan); // Assert - verify(spied).begin(any()); + verify(spied).beginReadOnly(); verify(transaction).scan(scan); verify(transaction).commit(); assertThat(actual).isEqualTo(results); @@ -1162,7 +1234,7 @@ public void put_ShouldPut() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).begin(); Put put = Put.newBuilder() @@ -1176,7 +1248,7 @@ public void put_ShouldPut() throws TransactionException { spied.put(put); // Assert - verify(spied).begin(any()); + verify(spied).begin(); verify(transaction).put(put); verify(transaction).commit(); } @@ -1187,7 +1259,7 @@ public void put_MultiplePutsGiven_ShouldPut() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).begin(); List puts = Arrays.asList( @@ -1214,7 +1286,7 @@ public void put_MultiplePutsGiven_ShouldPut() throws TransactionException { spied.put(puts); // Assert - verify(spied).begin(any()); + verify(spied).begin(); verify(transaction).put(puts); verify(transaction).commit(); } @@ -1225,7 +1297,7 @@ public void insert_ShouldInsert() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).begin(); Insert insert = Insert.newBuilder() @@ -1239,7 +1311,7 @@ public void insert_ShouldInsert() throws TransactionException { spied.insert(insert); // Assert - verify(spied).begin(any()); + verify(spied).begin(); verify(transaction).insert(insert); verify(transaction).commit(); } @@ -1250,7 +1322,7 @@ public void upsert_ShouldUpsert() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).begin(); Upsert upsert = Upsert.newBuilder() @@ -1264,7 +1336,7 @@ public void upsert_ShouldUpsert() throws TransactionException { spied.upsert(upsert); // Assert - verify(spied).begin(any()); + verify(spied).begin(); verify(transaction).upsert(upsert); verify(transaction).commit(); } @@ -1275,7 +1347,7 @@ public void update_ShouldUpdate() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).begin(); Update update = Update.newBuilder() @@ -1289,7 +1361,7 @@ public void update_ShouldUpdate() throws TransactionException { spied.update(update); // Assert - verify(spied).begin(any()); + verify(spied).begin(); verify(transaction).update(update); verify(transaction).commit(); } @@ -1300,7 +1372,7 @@ public void delete_ShouldDelete() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).begin(); Delete delete = Delete.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -1309,7 +1381,7 @@ public void delete_ShouldDelete() throws TransactionException { spied.delete(delete); // Assert - verify(spied).begin(any()); + verify(spied).begin(); verify(transaction).delete(delete); verify(transaction).commit(); } @@ -1320,7 +1392,7 @@ public void delete_MultipleDeletesGiven_ShouldDelete() throws TransactionExcepti DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).begin(); List deletes = Arrays.asList( @@ -1344,7 +1416,7 @@ public void delete_MultipleDeletesGiven_ShouldDelete() throws TransactionExcepti spied.delete(deletes); // Assert - verify(spied).begin(any()); + verify(spied).begin(); verify(transaction).delete(deletes); verify(transaction).commit(); } @@ -1355,7 +1427,7 @@ public void mutate_ShouldMutate() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); JdbcTransactionManager spied = spy(manager); - doReturn(transaction).when(spied).begin(any()); + doReturn(transaction).when(spied).begin(); List mutations = Arrays.asList( @@ -1393,7 +1465,7 @@ public void mutate_ShouldMutate() throws TransactionException { spied.mutate(mutations); // Assert - verify(spied).begin(any()); + verify(spied).begin(); verify(transaction).mutate(mutations); verify(transaction).commit(); }