From 309d1ebbd0f578237c9600211baf08eaddff607d Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Sat, 17 May 2025 12:58:35 +0900 Subject: [PATCH] Support begin in read-only mode for Consensus Commit --- .../ConsensusCommitManager.java | 86 ++- .../consensuscommit/CrudHandler.java | 40 +- .../transaction/consensuscommit/Snapshot.java | 20 +- .../TwoPhaseConsensusCommitManager.java | 7 +- .../ConsensusCommitManagerTest.java | 126 +++- .../consensuscommit/CrudHandlerTest.java | 93 ++- .../ConsensusCommitIntegrationTestBase.java | 15 - ...nsusCommitSpecificIntegrationTestBase.java | 597 +++++++++++++++++- 8 files changed, 873 insertions(+), 111 deletions(-) diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java index 6c553c53f7..1657611990 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManager.java @@ -22,6 +22,7 @@ import com.scalar.db.api.Upsert; import com.scalar.db.common.AbstractDistributedTransactionManager; import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner; +import com.scalar.db.common.ReadOnlyDistributedTransaction; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CrudConflictException; @@ -137,22 +138,24 @@ private CommitHandler createCommitHandler() { @Override public DistributedTransaction begin() { - return begin(config.getIsolation()); + String txId = UUID.randomUUID().toString(); + return begin(txId); } @Override public DistributedTransaction begin(String txId) { - return begin(txId, config.getIsolation()); + return begin(txId, config.getIsolation(), false); } @Override public DistributedTransaction beginReadOnly() { - throw new UnsupportedOperationException("implement later"); + String txId = UUID.randomUUID().toString(); + return beginReadOnly(txId); } @Override public DistributedTransaction beginReadOnly(String txId) { - throw new UnsupportedOperationException("implement later"); + return begin(txId, config.getIsolation(), true); } /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @@ -166,7 +169,7 @@ public DistributedTransaction start(com.scalar.db.api.Isolation isolation) { @Deprecated @Override public DistributedTransaction start(String txId, com.scalar.db.api.Isolation isolation) { - return begin(txId, Isolation.valueOf(isolation.name())); + return begin(txId, Isolation.valueOf(isolation.name()), false); } /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @@ -189,7 +192,7 @@ public DistributedTransaction start(com.scalar.db.api.SerializableStrategy strat @Override public DistributedTransaction start( String txId, com.scalar.db.api.SerializableStrategy strategy) { - return begin(txId, Isolation.SERIALIZABLE); + return begin(txId, Isolation.SERIALIZABLE, false); } /** @deprecated As of release 2.4.0. Will be removed in release 4.0.0. */ @@ -199,17 +202,23 @@ public DistributedTransaction start( String txId, com.scalar.db.api.Isolation isolation, com.scalar.db.api.SerializableStrategy strategy) { - return begin(txId, Isolation.valueOf(isolation.name())); + return begin(txId, Isolation.valueOf(isolation.name()), false); } @VisibleForTesting DistributedTransaction begin(Isolation isolation) { String txId = UUID.randomUUID().toString(); - return begin(txId, isolation); + return begin(txId, isolation, false); + } + + @VisibleForTesting + DistributedTransaction beginReadOnly(Isolation isolation) { + String txId = UUID.randomUUID().toString(); + return begin(txId, isolation, true); } @VisibleForTesting - DistributedTransaction begin(String txId, Isolation isolation) { + DistributedTransaction begin(String txId, Isolation isolation, boolean readOnly) { checkArgument(!Strings.isNullOrEmpty(txId)); checkNotNull(isolation); if (isGroupCommitEnabled()) { @@ -224,27 +233,35 @@ DistributedTransaction begin(String txId, Isolation isolation) { Snapshot snapshot = new Snapshot(txId, isolation, tableMetadataManager, parallelExecutor); CrudHandler crud = new CrudHandler( - storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor); - ConsensusCommit consensus = + storage, + snapshot, + tableMetadataManager, + isIncludeMetadataEnabled, + parallelExecutor, + readOnly); + DistributedTransaction transaction = new ConsensusCommit(crud, commit, recovery, mutationOperationChecker, groupCommitter); - getNamespace().ifPresent(consensus::withNamespace); - getTable().ifPresent(consensus::withTable); - return consensus; + if (readOnly) { + transaction = new ReadOnlyDistributedTransaction(transaction); + } + getNamespace().ifPresent(transaction::withNamespace); + getTable().ifPresent(transaction::withTable); + return transaction; } @Override public Optional get(Get get) throws CrudException, UnknownTransactionStatusException { - return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get))); + return executeTransaction(t -> t.get(copyAndSetTargetToIfNot(get)), true); } @Override public List scan(Scan scan) throws CrudException, UnknownTransactionStatusException { - return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan))); + return executeTransaction(t -> t.scan(copyAndSetTargetToIfNot(scan)), true); } @Override public Scanner getScanner(Scan scan) throws CrudException { - DistributedTransaction transaction = begin(); + DistributedTransaction transaction = beginReadOnly(); TransactionCrudOperable.Scanner scanner; try { @@ -331,7 +348,8 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException t -> { t.put(copyAndSetTargetToIfNot(put)); return null; - }); + }, + false); } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @@ -342,7 +360,8 @@ public void put(List puts) throws CrudException, UnknownTransactionStatusEx t -> { t.put(copyAndSetTargetToIfNot(puts)); return null; - }); + }, + false); } @Override @@ -351,7 +370,8 @@ public void insert(Insert insert) throws CrudException, UnknownTransactionStatus t -> { t.insert(copyAndSetTargetToIfNot(insert)); return null; - }); + }, + false); } @Override @@ -360,7 +380,8 @@ public void upsert(Upsert upsert) throws CrudException, UnknownTransactionStatus t -> { t.upsert(copyAndSetTargetToIfNot(upsert)); return null; - }); + }, + false); } @Override @@ -369,7 +390,8 @@ public void update(Update update) throws CrudException, UnknownTransactionStatus t -> { t.update(copyAndSetTargetToIfNot(update)); return null; - }); + }, + false); } @Override @@ -378,7 +400,8 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus t -> { t.delete(copyAndSetTargetToIfNot(delete)); return null; - }); + }, + false); } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @@ -389,7 +412,8 @@ public void delete(List deletes) throws CrudException, UnknownTransactio t -> { t.delete(copyAndSetTargetToIfNot(deletes)); return null; - }); + }, + false); } @Override @@ -399,13 +423,21 @@ public void mutate(List mutations) t -> { t.mutate(copyAndSetTargetToIfNot(mutations)); return null; - }); + }, + false); } private R executeTransaction( - ThrowableFunction throwableFunction) + ThrowableFunction throwableFunction, + boolean readOnly) throws CrudException, UnknownTransactionStatusException { - DistributedTransaction transaction = begin(); + DistributedTransaction transaction; + if (readOnly) { + transaction = beginReadOnly(); + } else { + transaction = begin(); + } + try { R result = throwableFunction.apply(transaction); transaction.commit(); diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index ebe5da6a13..73d420215f 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -47,6 +47,10 @@ public class CrudHandler { private final boolean isIncludeMetadataEnabled; private final MutationConditionsValidator mutationConditionsValidator; private final ParallelExecutor parallelExecutor; + + // Whether the transaction is in read-only mode or not. + private final boolean readOnly; + private final List scanners = new ArrayList<>(); @SuppressFBWarnings("EI_EXPOSE_REP2") @@ -55,13 +59,15 @@ public CrudHandler( Snapshot snapshot, TransactionTableMetadataManager tableMetadataManager, boolean isIncludeMetadataEnabled, - ParallelExecutor parallelExecutor) { + ParallelExecutor parallelExecutor, + boolean readOnly) { this.storage = checkNotNull(storage); this.snapshot = checkNotNull(snapshot); this.tableMetadataManager = tableMetadataManager; this.isIncludeMetadataEnabled = isIncludeMetadataEnabled; this.mutationConditionsValidator = new MutationConditionsValidator(snapshot.getId()); this.parallelExecutor = parallelExecutor; + this.readOnly = readOnly; } @VisibleForTesting @@ -71,13 +77,15 @@ public CrudHandler( TransactionTableMetadataManager tableMetadataManager, boolean isIncludeMetadataEnabled, MutationConditionsValidator mutationConditionsValidator, - ParallelExecutor parallelExecutor) { + ParallelExecutor parallelExecutor, + boolean readOnly) { this.storage = checkNotNull(storage); this.snapshot = checkNotNull(snapshot); this.tableMetadataManager = tableMetadataManager; this.isIncludeMetadataEnabled = isIncludeMetadataEnabled; this.mutationConditionsValidator = mutationConditionsValidator; this.parallelExecutor = parallelExecutor; + this.readOnly = readOnly; } public Optional get(Get originalGet) throws CrudException { @@ -122,7 +130,7 @@ void read(@Nullable Snapshot.Key key, Get get) throws CrudException { // conjunction or the result exists. This is because we don’t know whether the record // actually exists or not due to the conjunction. if (key != null) { - snapshot.putIntoReadSet(key, result); + putIntoReadSetInSnapshot(key, result); } else { // Only for a Get with index, the argument `key` is null @@ -130,11 +138,11 @@ void read(@Nullable Snapshot.Key key, Get get) throws CrudException { // Only when we can get the record with the Get with index, we can put it into the read // set key = new Snapshot.Key(get, result.get()); - snapshot.putIntoReadSet(key, result); + putIntoReadSetInSnapshot(key, result); } } } - snapshot.putIntoGetSet(get, result); // for re-read and validation + snapshot.putIntoGetSet(get, result); return; } throw new UncommittedRecordException( @@ -148,7 +156,7 @@ public List scan(Scan originalScan) throws CrudException { List originalProjections = new ArrayList<>(originalScan.getProjections()); Scan scan = (Scan) prepareStorageSelection(originalScan); LinkedHashMap results = scanInternal(scan); - snapshot.verifyNoOverlap(scan, results); + verifyNoOverlap(scan, results); TableMetadata metadata = getTableMetadata(scan); return results.values().stream() @@ -214,7 +222,7 @@ private void processScanResult(Snapshot.Key key, Scan scan, TransactionResult re // We always update the read set to create before image by using the latest record (result) // because another conflicting transaction might have updated the record after this // transaction read it first. - snapshot.putIntoReadSet(key, Optional.of(result)); + putIntoReadSetInSnapshot(key, Optional.of(result)); } public TransactionCrudOperable.Scanner getScanner(Scan originalScan) throws CrudException { @@ -248,6 +256,20 @@ public void closeScanners() throws CrudException { } } + private void putIntoReadSetInSnapshot(Snapshot.Key key, Optional result) { + // In read-only mode, we don't need to put the result into the read set + if (!readOnly) { + snapshot.putIntoReadSet(key, result); + } + } + + private void verifyNoOverlap(Scan scan, Map results) { + // In read-only mode, we don't need to verify the overlap + if (!readOnly) { + snapshot.verifyNoOverlap(scan, results); + } + } + public void put(Put put) throws CrudException { Snapshot.Key key = new Snapshot.Key(put); @@ -483,7 +505,7 @@ public void close() { snapshot.putIntoScannerSet(scan, results); } - snapshot.verifyNoOverlap(scan, results); + verifyNoOverlap(scan, results); } @Override @@ -554,7 +576,7 @@ public List all() throws CrudException { @Override public void close() { closed = true; - snapshot.verifyNoOverlap(scan, results); + verifyNoOverlap(scan, results); } @Override diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index 24b16e59c7..ff0ac712f3 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -58,15 +58,29 @@ public class Snapshot { private final Isolation isolation; private final TransactionTableMetadataManager tableMetadataManager; private final ParallelExecutor parallelExecutor; + + // The read set stores information about the records that are read in this transaction. This is + // used as a previous version for write operations. private final ConcurrentMap> readSet; + + // The get set stores information about the records retrieved by Get operations in this + // transaction. This is used for validation and snapshot read. private final ConcurrentMap> getSet; + + // The scan set stores information about the records retrieved by Scan operations in this + // transaction. This is used for validation and snapshot read. private final Map> scanSet; - private final Map writeSet; - private final Map deleteSet; - // The scanner set used to store information about scanners that are not fully scanned + // The scanner set stores information about scanners that are not fully scanned. This is used for + // validation. private final List scannerSet; + // The write set stores information about writes in this transaction. + private final Map writeSet; + + // The delete set stores information about deletes in this transaction. + private final Map deleteSet; + public Snapshot( String id, Isolation isolation, diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java index 1097f0b62f..35d8d74aa9 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommitManager.java @@ -170,7 +170,12 @@ private TwoPhaseCommitTransaction createNewTransaction(String txId, Isolation is Snapshot snapshot = new Snapshot(txId, isolation, tableMetadataManager, parallelExecutor); CrudHandler crud = new CrudHandler( - storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor); + storage, + snapshot, + tableMetadataManager, + isIncludeMetadataEnabled, + parallelExecutor, + false); TwoPhaseConsensusCommit transaction = new TwoPhaseConsensusCommit(crud, commit, recovery, mutationOperationChecker); diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java index 0260c70309..e1d61e8253 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitManagerTest.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -28,6 +29,7 @@ import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; import com.scalar.db.common.ActiveTransactionManagedDistributedTransactionManager; +import com.scalar.db.common.ReadOnlyDistributedTransaction; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.transaction.CommitConflictException; import com.scalar.db.exception.transaction.CommitException; @@ -174,6 +176,35 @@ public void begin_CalledTwiceWithSameTxId_ThrowTransactionException() assertThatThrownBy(() -> manager.begin(ANY_TX_ID)).isInstanceOf(TransactionException.class); } + @Test + public void + beginReadOnly_NoArgumentGiven_ReturnConsensusCommitWithSomeTxIdAndSnapshotIsolationInReadOnlyMode() { + // Arrange + ConsensusCommitManager spied = spy(manager); + + // Act + DistributedTransaction transaction = spied.beginReadOnly(); + + // Assert + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true)); + + assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); + } + + @Test + public void beginReadOnly_TxIdGiven_ReturnWithSpecifiedTxIdAndSnapshotIsolationInReadOnlyMode() { + // Arrange + ConsensusCommitManager spied = spy(manager); + + // Act + DistributedTransaction transaction = spied.beginReadOnly(ANY_TX_ID); + + // Assert + verify(spied).begin(eq(ANY_TX_ID), eq(Isolation.SNAPSHOT), eq(true)); + + assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); + } + @Test public void start_NoArgumentGiven_ReturnConsensusCommitWithSomeTxIdAndSnapshotIsolation() throws TransactionException { @@ -258,6 +289,37 @@ public void start_CalledTwiceWithSameTxId_ThrowTransactionException() assertThatThrownBy(() -> manager.start(ANY_TX_ID)).isInstanceOf(TransactionException.class); } + @Test + public void + startReadOnly_NoArgumentGiven_ReturnConsensusCommitWithSomeTxIdAndSnapshotIsolationInReadOnlyMode() + throws TransactionException { + // Arrange + ConsensusCommitManager spied = spy(manager); + + // Act + DistributedTransaction transaction = spied.startReadOnly(); + + // Assert + verify(spied).begin(anyString(), eq(Isolation.SNAPSHOT), eq(true)); + + assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); + } + + @Test + public void startReadOnly_TxIdGiven_ReturnWithSpecifiedTxIdAndSnapshotIsolationInReadOnlyMode() + throws TransactionException { + // Arrange + ConsensusCommitManager spied = spy(manager); + + // Act + DistributedTransaction transaction = spied.startReadOnly(ANY_TX_ID); + + // Assert + verify(spied).begin(eq(ANY_TX_ID), eq(Isolation.SNAPSHOT), eq(true)); + + assertThat(transaction).isInstanceOf(ReadOnlyDistributedTransaction.class); + } + @Test public void resume_CalledWithBegin_ReturnSameTransactionObject() throws TransactionException { // Arrange @@ -541,7 +603,7 @@ public void get_ShouldGet() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -553,7 +615,7 @@ public void get_ShouldGet() throws TransactionException { Optional actual = spied.get(get); // Assert - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).commit(); assertThat(actual).isEqualTo(Optional.of(result)); @@ -566,7 +628,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -575,7 +637,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).rollback(); } @@ -588,7 +650,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -597,7 +659,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).rollback(); } @@ -610,7 +672,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -619,7 +681,7 @@ public void get_CrudExceptionThrownByTransactionGet_ShouldThrowCrudException() // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(UnknownTransactionStatusException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).commit(); } @@ -631,7 +693,7 @@ public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudExcepti DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Get get = Get.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -640,7 +702,7 @@ public void get_CommitExceptionThrownByTransactionCommit_ShouldThrowUCrudExcepti // Act Assert assertThatThrownBy(() -> spied.get(get)).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).get(get); verify(transaction).commit(); } @@ -651,7 +713,7 @@ public void scan_ShouldScan() throws TransactionException { DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder().namespace("ns").table("tbl").partitionKey(Key.ofInt("pk", 0)).build(); @@ -664,7 +726,7 @@ public void scan_ShouldScan() throws TransactionException { List actual = spied.scan(scan); // Assert - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).scan(scan); verify(transaction).commit(); assertThat(actual).isEqualTo(results); @@ -676,7 +738,7 @@ public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() t DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -706,7 +768,7 @@ public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() t assertThat(actual.one()).isEmpty(); actual.close(); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).commit(); verify(scanner).close(); } @@ -717,7 +779,7 @@ public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -744,7 +806,7 @@ public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() assertThat(actual.all()).isEmpty(); actual.close(); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).commit(); verify(scanner).close(); } @@ -756,7 +818,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -791,7 +853,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul assertThat(iterator.hasNext()).isFalse(); actual.close(); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).commit(); verify(scanner).close(); } @@ -804,7 +866,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -818,7 +880,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Act Assert assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(transaction).rollback(); } @@ -830,7 +892,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -848,7 +910,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::one).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -860,7 +922,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Arrange DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -878,7 +940,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::all).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -890,7 +952,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul // Arrange DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); Scan scan = Scan.newBuilder() @@ -908,7 +970,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -921,7 +983,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); doThrow(CommitConflictException.class).when(transaction).commit(); Scan scan = @@ -938,7 +1000,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } @@ -951,7 +1013,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); doThrow(UnknownTransactionStatusException.class).when(transaction).commit(); Scan scan = @@ -968,7 +1030,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(UnknownTransactionStatusException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); } @@ -980,7 +1042,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul DistributedTransaction transaction = mock(DistributedTransaction.class); ConsensusCommitManager spied = spy(manager); - doReturn(transaction).when(spied).begin(); + doReturn(transaction).when(spied).beginReadOnly(); doThrow(CommitException.class).when(transaction).commit(); Scan scan = @@ -997,7 +1059,7 @@ public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResul TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); - verify(spied).begin(); + verify(spied).beginReadOnly(); verify(scanner).close(); verify(transaction).rollback(); } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index 689ede3bb9..e524942791 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -98,7 +98,8 @@ public void setUp() throws Exception { tableMetadataManager, false, mutationConditionsValidator, - parallelExecutor); + parallelExecutor, + false); // Arrange when(tableMetadataManager.getTransactionTableMetadata(any())) @@ -212,6 +213,44 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept verify(snapshot).putIntoGetSet(get, Optional.of((TransactionResult) expected.get())); } + @Test + public void + get_GetNotExistsInSnapshotAndRecordInStorageCommitted_InReadOnlyMode_ShouldReturnFromStorageAndUpdateSnapshot() + throws CrudException, ExecutionException { + // Arrange + handler = + new CrudHandler( + storage, + snapshot, + tableMetadataManager, + false, + mutationConditionsValidator, + parallelExecutor, + true); + + Get get = prepareGet(); + Get getForStorage = toGetForStorageFrom(get); + Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED)); + Optional transactionResult = expected.map(e -> (TransactionResult) e); + Snapshot.Key key = new Snapshot.Key(getForStorage); + when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); + when(storage.get(getForStorage)).thenReturn(expected); + when(snapshot.getResult(key, getForStorage)).thenReturn(transactionResult); + + // Act + Optional result = handler.get(get); + + // Assert + assertThat(result) + .isEqualTo( + Optional.of( + new FilteredResult( + expected.get(), Collections.emptyList(), TABLE_METADATA, false))); + verify(storage).get(getForStorage); + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot).putIntoGetSet(get, Optional.of((TransactionResult) expected.get())); + } + @Test public void get_GetNotExistsInSnapshotAndRecordInStorageNotCommitted_ShouldThrowUncommittedRecordException() @@ -311,7 +350,8 @@ public void get_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSn Result result = prepareResult(TransactionState.COMMITTED); Optional expected = Optional.of(new TransactionResult(result)); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, tableMetadataManager, parallelExecutor); - handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); + handler = + new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); when(storage.get(getForStorage)).thenReturn(Optional.of(result)); // Act @@ -380,6 +420,46 @@ void scanOrGetScanner_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn(ScanT .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); } + @ParameterizedTest + @EnumSource(ScanType.class) + void scanOrGetScanner_ResultGivenFromStorage_InReadOnlyMode_ShouldUpdateSnapshotAndReturn( + ScanType scanType) throws ExecutionException, CrudException { + // Arrange + handler = + new CrudHandler( + storage, + snapshot, + tableMetadataManager, + false, + mutationConditionsValidator, + parallelExecutor, + true); + + Scan scan = prepareScan(); + Scan scanForStorage = toScanForStorageFrom(scan); + result = prepareResult(TransactionState.COMMITTED); + Snapshot.Key key = new Snapshot.Key(scan, result); + TransactionResult expected = new TransactionResult(result); + if (scanType == ScanType.SCAN) { + when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); + } else { + when(scanner.one()).thenReturn(Optional.of(result)).thenReturn(Optional.empty()); + } + when(storage.scan(scanForStorage)).thenReturn(scanner); + when(snapshot.getResult(any())).thenReturn(Optional.of(expected)); + + // Act + List results = scanOrGetScanner(scan, scanType); + + // Assert + verify(snapshot, never()).putIntoReadSet(any(), any()); + verify(snapshot).putIntoScanSet(scan, Maps.newLinkedHashMap(ImmutableMap.of(key, expected))); + verify(snapshot, never()).verifyNoOverlap(any(), any()); + assertThat(results.size()).isEqualTo(1); + assertThat(results.get(0)) + .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); + } + @ParameterizedTest @EnumSource(ScanType.class) void @@ -464,7 +544,8 @@ void scan_CalledTwiceUnderRealSnapshot_SecondTimeShouldReturnTheSameFromSnapshot result = prepareResult(TransactionState.COMMITTED); TransactionResult expected = new TransactionResult(result); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, tableMetadataManager, parallelExecutor); - handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); + handler = + new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); if (scanType == ScanType.SCAN) { when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); } else { @@ -531,7 +612,8 @@ void scanOrGetScanner_GetCalledAfterScanUnderRealSnapshot_ShouldReturnFromStorag Scan scan = toScanForStorageFrom(prepareScan()); result = prepareResult(TransactionState.COMMITTED); snapshot = new Snapshot(ANY_TX_ID, Isolation.SNAPSHOT, tableMetadataManager, parallelExecutor); - handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); + handler = + new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); if (scanType == ScanType.SCAN) { when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); } else { @@ -596,7 +678,8 @@ void scanOrGetScanner_CalledAfterDeleteUnderRealSnapshot_ShouldThrowIllegalArgum new HashMap<>(), deleteSet, new ArrayList<>()); - handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); + handler = + new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor, false); if (scanType == ScanType.SCAN) { when(scanner.iterator()).thenReturn(Arrays.asList(result, result2).iterator()); } else { diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java index e5216ce7e5..bf5abaae05 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java @@ -15,10 +15,7 @@ import com.scalar.db.io.Key; import java.util.Optional; import java.util.Properties; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; public abstract class ConsensusCommitIntegrationTestBase extends DistributedTransactionIntegrationTestBase { @@ -932,16 +929,4 @@ public void deleteAndDelete_forSameRecord_shouldWorkCorrectly() throws Transacti Optional optResult = get(prepareGet(0, 0)); assertThat(optResult).isNotPresent(); } - - @Disabled("Implement later") - @Override - @Test - public void get_GetGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecord() {} - - @Disabled("Implement later") - @Override - @ParameterizedTest - @EnumSource(ScanType.class) - public void scanOrGetScanner_ScanGivenForCommittedRecord_InReadOnlyMode_ShouldReturnRecords( - ScanType scanType) {} } diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java index e4a31e9673..9a7e1c59a9 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitSpecificIntegrationTestBase.java @@ -59,7 +59,6 @@ import java.util.Set; import java.util.stream.IntStream; import javax.annotation.Nullable; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -236,8 +235,7 @@ public void get_GetGivenForCommittedRecord_ShouldReturnRecord() throws Transacti // Assert assertThat(result.isPresent()).isTrue(); - Assertions.assertThat( - ((TransactionResult) ((FilteredResult) result.get()).getOriginalResult()).getState()) + assertThat(((TransactionResult) ((FilteredResult) result.get()).getOriginalResult()).getState()) .isEqualTo(TransactionState.COMMITTED); } @@ -254,7 +252,7 @@ public void scan_ScanGivenForCommittedRecord_ShouldReturnRecord() throws Transac // Assert assertThat(results.size()).isEqualTo(1); - Assertions.assertThat( + assertThat( ((TransactionResult) ((FilteredResult) results.get(0)).getOriginalResult()).getState()) .isEqualTo(TransactionState.COMMITTED); } @@ -375,7 +373,7 @@ private void selection_SelectionGivenForPreparedWhenCoordinatorStateCommitted_Sh transaction.commit(); assertThat(result.getId()).isEqualTo(ongoingTxId); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(2); assertThat(result.getCommittedAt()).isGreaterThan(0); } @@ -440,7 +438,7 @@ private void selection_SelectionGivenForPreparedWhenCoordinatorStateAborted_Shou transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -551,7 +549,7 @@ public void scan_ScanGivenForPreparedWhenCoordinatorStateAborted_ShouldRollback( transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -636,7 +634,7 @@ public void get_GetGivenForPreparedWhenCoordinatorStateNotExistAndExpired_Should transaction.commit(); assertThat(result.getId()).isEqualTo(ongoingTxId); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(2); assertThat(result.getCommittedAt()).isGreaterThan(0); } @@ -722,7 +720,7 @@ public void get_GetGivenForPreparedWhenCoordinatorStateNotExistAndExpired_Should transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -847,7 +845,7 @@ private void selection_SelectionGivenForDeletedWhenCoordinatorStateAborted_Shoul transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -958,7 +956,7 @@ public void scan_ScanGivenForDeletedWhenCoordinatorStateAborted_ShouldRollback( transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -1117,7 +1115,7 @@ public void scan_ScanGivenForDeletedWhenCoordinatorStateNotExistAndExpired_Shoul transaction.commit(); assertThat(result.getId()).isEqualTo(ANY_ID_1); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); assertThat(result.getCommittedAt()).isEqualTo(1); } @@ -1192,7 +1190,7 @@ public void putAndCommit_PutGivenForNonExisting_ShouldCreateRecord() throws Tran assertThat(r).isPresent(); TransactionResult result = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(result)).isEqualTo(expected); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); } @@ -1221,7 +1219,7 @@ public void putAndCommit_PutWithImplicitPreReadEnabledGivenForNonExisting_Should assertThat(r).isPresent(); TransactionResult result = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(result)).isEqualTo(expected); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); } @@ -1251,7 +1249,7 @@ public void putAndCommit_PutGivenForExistingAfterRead_ShouldUpdateRecord() assertThat(r).isPresent(); TransactionResult actual = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(actual)).isEqualTo(expected); - Assertions.assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(actual.getVersion()).isEqualTo(2); } @@ -1282,7 +1280,7 @@ public void putAndCommit_PutWithImplicitPreReadEnabledGivenForExisting_ShouldUpd assertThat(r).isPresent(); TransactionResult actual = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(actual)).isEqualTo(expected); - Assertions.assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(actual.getVersion()).isEqualTo(2); } @@ -1327,7 +1325,7 @@ public void putAndCommit_PutWithInsertModeEnabledGivenForNonExisting_ShouldCreat assertThat(r).isPresent(); TransactionResult result = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(result)).isEqualTo(expected); - Assertions.assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(result.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(result.getVersion()).isEqualTo(1); } @@ -1361,7 +1359,7 @@ public void putAndCommit_PutWithInsertModeEnabledGivenForNonExistingAfterRead_Sh assertThat(r).isPresent(); TransactionResult actual = (TransactionResult) ((FilteredResult) r.get()).getOriginalResult(); assertThat(getBalance(actual)).isEqualTo(expected); - Assertions.assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); + assertThat(actual.getState()).isEqualTo(TransactionState.COMMITTED); assertThat(actual.getVersion()).isEqualTo(1); } @@ -2801,7 +2799,7 @@ public void scanAll_ScanAllGivenForCommittedRecord_ShouldReturnRecord() // Assert assertThat(results.size()).isEqualTo(1); - Assertions.assertThat( + assertThat( ((TransactionResult) ((FilteredResult) results.get(0)).getOriginalResult()).getState()) .isEqualTo(TransactionState.COMMITTED); } @@ -4709,6 +4707,567 @@ public void scanAndUpdate_ScanWithIndexGiven_ShouldUpdate() throws TransactionEx assertThat(actual2.get().getInt(BALANCE)).isEqualTo(2); } + @Test + public void get_GetGivenForCommittedRecord_InReadOnlyMode_WithSerializable_ShouldReturnRecord() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + Get get = prepareGet(0, 0, namespace1, TABLE_1); + + // Act + Optional result = transaction.get(get); + transaction.commit(); + + // Assert + assertThat(result.isPresent()).isTrue(); + assertThat(((TransactionResult) ((FilteredResult) result.get()).getOriginalResult()).getState()) + .isEqualTo(TransactionState.COMMITTED); + } + + @Test + public void + get_GetGivenForCommittedRecord_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(); + Get get = prepareGet(0, 0, namespace1, TABLE_1); + + // Act Assert + Optional result = transaction.get(get); + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + public void + get_GetGivenForCommittedRecord_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + Get get = prepareGet(0, 0, namespace1, TABLE_1); + + // Act Assert + Optional result = transaction.get(get); + assertThat(result.isPresent()).isTrue(); + assertThat(getBalance(result.get())).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + public void scan_ScanGivenForCommittedRecord_InReadOnlyMode_WithSerializable_ShouldReturnRecord() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + Scan scan = prepareScan(0, namespace1, TABLE_1); + + // Act + List results = transaction.scan(scan); + transaction.commit(); + + // Assert + assertThat(results.size()).isEqualTo(4); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(3).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(3).getInt(ACCOUNT_TYPE)).isEqualTo(3); + assertThat(getBalance(results.get(3))).isEqualTo(INITIAL_BALANCE); + } + + @Test + public void + scan_ScanGivenForCommittedRecord_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(); + Scan scan = prepareScan(0, namespace1, TABLE_1); + + // Act Assert + List results = transaction.scan(scan); + + assertThat(results.size()).isEqualTo(4); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(3).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(3).getInt(ACCOUNT_TYPE)).isEqualTo(3); + assertThat(getBalance(results.get(3))).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + transaction.commit(); + } + + @Test + public void + scan_ScanGivenForCommittedRecord_InReadOnlyMode_WhenRecordInsertedByAnotherTransaction_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(); + Scan scan = prepareScan(0, namespace1, TABLE_1); + + // Act Assert + List results = transaction.scan(scan); + + assertThat(results.size()).isEqualTo(4); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(3).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(3).getInt(ACCOUNT_TYPE)).isEqualTo(3); + assertThat(getBalance(results.get(3))).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(); + another.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 5)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + another.commit(); + + transaction.commit(); + } + + @Test + public void + scan_ScanGivenForCommittedRecord_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + Scan scan = prepareScan(0, namespace1, TABLE_1); + + // Act Assert + List results = transaction.scan(scan); + + assertThat(results.size()).isEqualTo(4); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(3).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(3).getInt(ACCOUNT_TYPE)).isEqualTo(3); + assertThat(getBalance(results.get(3))).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + public void + scan_ScanGivenForCommittedRecord_InReadOnlyMode_WhenRecordInsertedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + populateRecords(namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + Scan scan = prepareScan(0, namespace1, TABLE_1); + + // Act Assert + List results = transaction.scan(scan); + + assertThat(results.size()).isEqualTo(4); + assertThat(results.get(0).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(0).getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(getBalance(results.get(0))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(1).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(1).getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(getBalance(results.get(1))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(2).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(2).getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(getBalance(results.get(2))).isEqualTo(INITIAL_BALANCE); + + assertThat(results.get(3).getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(results.get(3).getInt(ACCOUNT_TYPE)).isEqualTo(3); + assertThat(getBalance(results.get(3))).isEqualTo(INITIAL_BALANCE); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 5)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + public void getScanner_InReadOnlyMode_WithSerializable_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + public void + getScanner_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + DistributedTransaction another = manager.begin(); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + public void + getScanner_InReadOnlyMode_WhenRecordInsertedByAnotherTransaction_ShouldNotThrowAnyException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + DistributedTransaction another = manager.begin(); + another.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + another.commit(); + + assertThatCode(transaction::commit).doesNotThrowAnyException(); + } + + @Test + public void + getScanner_InReadOnlyMode_WhenRecordUpdatedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.update( + Update.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, 1) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + public void + getScanner_InReadOnlyMode_WhenRecordInsertedByAnotherTransaction_WithSerializable_ShouldThrowCommitConflictException() + throws TransactionException { + // Arrange + manager.mutate( + Arrays.asList( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 1)) + .intValue(BALANCE, INITIAL_BALANCE) + .build(), + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 2)) + .intValue(BALANCE, INITIAL_BALANCE) + .build())); + + Scan scan = prepareScan(0, namespace1, TABLE_1); + DistributedTransaction transaction = manager.beginReadOnly(Isolation.SERIALIZABLE); + + // Act Assert + TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan); + Optional result1 = scanner.one(); + assertThat(result1).isNotEmpty(); + assertThat(result1.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result1.get().getInt(ACCOUNT_TYPE)).isEqualTo(1); + assertThat(result1.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + Optional result2 = scanner.one(); + assertThat(result2).isNotEmpty(); + assertThat(result2.get().getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result2.get().getInt(ACCOUNT_TYPE)).isEqualTo(2); + assertThat(result2.get().getInt(BALANCE)).isEqualTo(INITIAL_BALANCE); + + scanner.close(); + + DistributedTransaction another = manager.begin(Isolation.SERIALIZABLE); + another.insert( + Insert.newBuilder() + .namespace(namespace1) + .table(TABLE_1) + .partitionKey(Key.ofInt(ACCOUNT_ID, 0)) + .clusteringKey(Key.ofInt(ACCOUNT_TYPE, 0)) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + another.commit(); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + private DistributedTransaction prepareTransfer( int fromId, String fromNamespace,