From 2189a5544af5704752c9bc8bf81f3317bb93acee Mon Sep 17 00:00:00 2001 From: brfrn169 Date: Thu, 10 Apr 2025 17:18:03 +0900 Subject: [PATCH] Implement scanner API for JDBC transaction --- .../jdbc/JdbcTransactionIntegrationTest.java | 10 - .../com/scalar/db/common/error/CoreError.java | 2 + .../scalar/db/storage/jdbc/JdbcService.java | 10 +- .../scalar/db/storage/jdbc/ScannerImpl.java | 16 +- .../db/transaction/jdbc/JdbcTransaction.java | 43 +- .../jdbc/JdbcTransactionManager.java | 91 ++++- .../db/storage/jdbc/JdbcDatabaseTest.java | 3 +- .../jdbc/JdbcTransactionManagerTest.java | 366 ++++++++++++++++++ .../transaction/jdbc/JdbcTransactionTest.java | 227 +++++++++++ 9 files changed, 748 insertions(+), 20 deletions(-) diff --git a/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java index 0ca3b064b8..37bebaf726 100644 --- a/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java +++ b/core/src/integration-test/java/com/scalar/db/transaction/jdbc/JdbcTransactionIntegrationTest.java @@ -42,14 +42,4 @@ public void abort_forOngoingTransaction_ShouldAbortCorrectly() {} @Override @Test public void rollback_forOngoingTransaction_ShouldRollbackCorrectly() {} - - @Disabled("Implement later") - @Override - @Test - public void getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {} - - @Disabled("Implement later") - @Override - @Test - public void manager_getScanner_ScanGivenForCommittedRecord_ShouldReturnRecords() {} } diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index acaa4566dd..1e465bd550 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -1182,6 +1182,8 @@ public enum CoreError implements ScalarDbError { Category.INTERNAL_ERROR, "0052", "Failed to read JSON file. Details: %s.", "", ""), DATA_LOADER_JSONLINES_FILE_READ_FAILED( Category.INTERNAL_ERROR, "0053", "Failed to read JSON Lines file. Details: %s.", "", ""), + JDBC_TRANSACTION_GETTING_SCANNER_FAILED( + Category.INTERNAL_ERROR, "0054", "Getting the scanner failed. Details: %s", "", ""), // // Errors for the unknown transaction status error category diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java index 82698ba5c9..852812ac94 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java @@ -96,9 +96,14 @@ public Optional get(Get get, Connection connection) } } - @SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE") public Scanner getScanner(Scan scan, Connection connection) throws SQLException, ExecutionException { + return getScanner(scan, connection, true); + } + + @SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE") + public Scanner getScanner(Scan scan, Connection connection, boolean closeConnectionOnScannerClose) + throws SQLException, ExecutionException { operationChecker.check(scan); TableMetadata tableMetadata = tableMetadataManager.getTableMetadata(scan); @@ -111,7 +116,8 @@ public Scanner getScanner(Scan scan, Connection connection) new ResultInterpreter(scan.getProjections(), tableMetadata, rdbEngine), connection, preparedStatement, - resultSet); + resultSet, + closeConnectionOnScannerClose); } public List scan(Scan scan, Connection connection) diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/ScannerImpl.java b/core/src/main/java/com/scalar/db/storage/jdbc/ScannerImpl.java index 3d48e2a2f9..d9dc38268e 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/ScannerImpl.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/ScannerImpl.java @@ -25,17 +25,20 @@ public class ScannerImpl extends AbstractScanner { private final Connection connection; private final PreparedStatement preparedStatement; private final ResultSet resultSet; + private final boolean closeConnectionOnClose; @SuppressFBWarnings("EI_EXPOSE_REP2") public ScannerImpl( ResultInterpreter resultInterpreter, Connection connection, PreparedStatement preparedStatement, - ResultSet resultSet) { + ResultSet resultSet, + boolean closeConnectionOnClose) { this.resultInterpreter = Objects.requireNonNull(resultInterpreter); this.connection = Objects.requireNonNull(connection); this.preparedStatement = Objects.requireNonNull(preparedStatement); this.resultSet = Objects.requireNonNull(resultSet); + this.closeConnectionOnClose = closeConnectionOnClose; } @Override @@ -75,10 +78,13 @@ public void close() { } catch (SQLException e) { logger.warn("Failed to close the preparedStatement", e); } - try { - connection.close(); - } catch (SQLException e) { - logger.warn("Failed to close the connection", e); + + if (closeConnectionOnClose) { + try { + connection.close(); + } catch (SQLException e) { + logger.warn("Failed to close the connection", e); + } } } } diff --git a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransaction.java b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransaction.java index a469c47003..b29bf7ae31 100644 --- a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransaction.java +++ b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransaction.java @@ -20,6 +20,7 @@ import com.scalar.db.api.UpdateIfExists; import com.scalar.db.api.Upsert; import com.scalar.db.common.AbstractDistributedTransaction; +import com.scalar.db.common.AbstractTransactionCrudOperableScanner; import com.scalar.db.common.error.CoreError; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.CommitConflictException; @@ -32,6 +33,7 @@ import com.scalar.db.storage.jdbc.JdbcService; import com.scalar.db.storage.jdbc.RdbEngineStrategy; import com.scalar.db.util.ScalarDbUtils; +import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; import java.util.List; @@ -96,7 +98,46 @@ public List scan(Scan scan) throws CrudException { @Override public Scanner getScanner(Scan scan) throws CrudException { - throw new UnsupportedOperationException("Implement later"); + scan = copyAndSetTargetToIfNot(scan); + + com.scalar.db.api.Scanner scanner; + try { + scanner = jdbcService.getScanner(scan, connection, false); + } catch (SQLException e) { + throw createCrudException( + e, CoreError.JDBC_TRANSACTION_GETTING_SCANNER_FAILED.buildMessage(e.getMessage())); + } catch (ExecutionException e) { + throw new CrudException(e.getMessage(), e, txId); + } + + return new AbstractTransactionCrudOperableScanner() { + @Override + public Optional one() throws CrudException { + try { + return scanner.one(); + } catch (ExecutionException e) { + throw new CrudException(e.getMessage(), e, txId); + } + } + + @Override + public List all() throws CrudException { + try { + return scanner.all(); + } catch (ExecutionException e) { + throw new CrudException(e.getMessage(), e, txId); + } + } + + @Override + public void close() throws CrudException { + try { + scanner.close(); + } catch (IOException e) { + throw new CrudException(e.getMessage(), e, txId); + } + } + }; } /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ diff --git a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java index df7f02197d..d65b512106 100644 --- a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java +++ b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java @@ -12,10 +12,12 @@ import com.scalar.db.api.Result; import com.scalar.db.api.Scan; import com.scalar.db.api.SerializableStrategy; +import com.scalar.db.api.TransactionCrudOperable; import com.scalar.db.api.TransactionState; import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; import com.scalar.db.common.AbstractDistributedTransactionManager; +import com.scalar.db.common.AbstractTransactionManagerCrudOperableScanner; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.common.checker.OperationChecker; import com.scalar.db.common.error.CoreError; @@ -38,6 +40,7 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.dbcp2.BasicDataSource; import org.slf4j.Logger; @@ -170,9 +173,93 @@ public List scan(Scan scan) throws CrudException, UnknownTransactionStat @Override public Scanner getScanner(Scan scan) throws CrudException { - throw new UnsupportedOperationException("Implement later"); + DistributedTransaction transaction; + try { + transaction = begin(); + } catch (TransactionNotFoundException e) { + throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (TransactionException e) { + throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } + + TransactionCrudOperable.Scanner scanner; + try { + scanner = transaction.getScanner(copyAndSetTargetToIfNot(scan)); + } catch (CrudException e) { + rollbackTransaction(transaction); + throw e; + } + + return new AbstractTransactionManagerCrudOperableScanner() { + + private final AtomicBoolean closed = new AtomicBoolean(); + + @Override + public Optional one() throws CrudException { + try { + return scanner.one(); + } catch (CrudException e) { + closed.set(true); + + try { + scanner.close(); + } catch (CrudException ex) { + e.addSuppressed(ex); + } + + rollbackTransaction(transaction); + throw e; + } + } + + @Override + public List all() throws CrudException { + try { + return scanner.all(); + } catch (CrudException e) { + closed.set(true); + + try { + scanner.close(); + } catch (CrudException ex) { + e.addSuppressed(ex); + } + + rollbackTransaction(transaction); + throw e; + } + } + + @Override + public void close() throws CrudException, UnknownTransactionStatusException { + if (closed.get()) { + return; + } + closed.set(true); + + try { + scanner.close(); + } catch (CrudException e) { + rollbackTransaction(transaction); + throw e; + } + + try { + transaction.commit(); + } catch (CommitConflictException e) { + rollbackTransaction(transaction); + throw new CrudConflictException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } catch (UnknownTransactionStatusException e) { + throw e; + } catch (TransactionException e) { + rollbackTransaction(transaction); + throw new CrudException(e.getMessage(), e, e.getTransactionId().orElse(null)); + } + } + }; } + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @Deprecated @Override public void put(Put put) throws CrudException, UnknownTransactionStatusException { @@ -183,6 +270,7 @@ public void put(Put put) throws CrudException, UnknownTransactionStatusException }); } + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @Deprecated @Override public void put(List puts) throws CrudException, UnknownTransactionStatusException { @@ -229,6 +317,7 @@ public void delete(Delete delete) throws CrudException, UnknownTransactionStatus }); } + /** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */ @Deprecated @Override public void delete(List deletes) throws CrudException, UnknownTransactionStatusException { diff --git a/core/src/test/java/com/scalar/db/storage/jdbc/JdbcDatabaseTest.java b/core/src/test/java/com/scalar/db/storage/jdbc/JdbcDatabaseTest.java index 82b40061a6..7710f6a426 100644 --- a/core/src/test/java/com/scalar/db/storage/jdbc/JdbcDatabaseTest.java +++ b/core/src/test/java/com/scalar/db/storage/jdbc/JdbcDatabaseTest.java @@ -98,7 +98,8 @@ public void whenGetOperationExecuted_shouldCallJdbcService() throws Exception { public void whenScanOperationExecutedAndScannerClosed_shouldCallJdbcService() throws Exception { // Arrange when(jdbcService.getScanner(any(), any())) - .thenReturn(new ScannerImpl(resultInterpreter, connection, preparedStatement, resultSet)); + .thenReturn( + new ScannerImpl(resultInterpreter, connection, preparedStatement, resultSet, true)); // Act Scan scan = new Scan(new Key("p1", "val")).forNamespace(NAMESPACE).forTable(TABLE); diff --git a/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionManagerTest.java b/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionManagerTest.java index 813d757334..80d0375e6d 100644 --- a/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionManagerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionManagerTest.java @@ -20,6 +20,8 @@ import com.scalar.db.api.Put; import com.scalar.db.api.Result; import com.scalar.db.api.Scan; +import com.scalar.db.api.TransactionCrudOperable; +import com.scalar.db.api.TransactionManagerCrudOperable; import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; import com.scalar.db.common.ActiveTransactionManagedDistributedTransactionManager; @@ -41,6 +43,7 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Optional; import org.apache.commons.dbcp2.BasicDataSource; @@ -178,6 +181,369 @@ public void scan_withConflictError_shouldThrowCrudConflictException() .isInstanceOf(CrudConflictException.class); } + @Test + public void getScannerAndScannerOne_ShouldReturnScannerAndReturnProperResult() throws Exception { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + JdbcTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.of(result3)) + .thenReturn(Optional.empty()); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThat(actual.one()).hasValue(result1); + assertThat(actual.one()).hasValue(result2); + assertThat(actual.one()).hasValue(result3); + assertThat(actual.one()).isEmpty(); + actual.close(); + + verify(spied).begin(); + verify(transaction).commit(); + verify(scanner).close(); + } + + @Test + public void getScannerAndScannerAll_ShouldReturnScannerAndReturnProperResults() throws Exception { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + JdbcTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + when(scanner.all()) + .thenReturn(Arrays.asList(result1, result2, result3)) + .thenReturn(Collections.emptyList()); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + List results = actual.all(); + assertThat(results).containsExactly(result1, result2, result3); + assertThat(actual.all()).isEmpty(); + actual.close(); + + verify(spied).begin(); + verify(transaction).commit(); + verify(scanner).close(); + } + + @Test + public void getScannerAndScannerIterator_ShouldReturnScannerAndReturnProperResults() + throws Exception { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + JdbcTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.of(result3)) + .thenReturn(Optional.empty()); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + + Iterator iterator = actual.iterator(); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result1); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result2); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result3); + assertThat(iterator.hasNext()).isFalse(); + actual.close(); + + verify(spied).begin(); + verify(transaction).commit(); + verify(scanner).close(); + } + + @Test + public void + getScanner_TransactionNotFoundExceptionThrownByTransactionBegin_ShouldThrowCrudConflictException() + throws TransactionException { + // Arrange + JdbcTransactionManager spied = spy(manager); + doThrow(TransactionNotFoundException.class).when(spied).begin(); + + Scan scan = mock(Scan.class); + + // Act Assert + assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + } + + @Test + public void getScanner_TransactionExceptionThrownByTransactionBegin_ShouldThrowCrudException() + throws TransactionException { + // Arrange + JdbcTransactionManager spied = spy(manager); + doThrow(TransactionException.class).when(spied).begin(); + + Scan scan = mock(Scan.class); + + // Act Assert + assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudException.class); + + verify(spied).begin(); + } + + @Test + public void + getScanner_CrudExceptionThrownByTransactionGetScanner_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + JdbcTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + when(transaction.getScanner(scan)).thenThrow(CrudException.class); + + // Act Assert + assertThatThrownBy(() -> spied.getScanner(scan)).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerOne_CrudExceptionThrownByScannerOne_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + JdbcTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + when(scanner.one()).thenThrow(CrudException.class); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::one).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerAll_CrudExceptionThrownByScannerAll_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + JdbcTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + when(scanner.all()).thenThrow(CrudException.class); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::all).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerClose_CrudExceptionThrownByScannerClose_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + JdbcTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + + Scan scan = + Scan.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + doThrow(CrudException.class).when(scanner).close(); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerClose_CommitConflictExceptionThrownByTransactionCommit_ShouldRollbackTransactionAndThrowCrudConflictException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + JdbcTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + doThrow(CommitConflictException.class).when(transaction).commit(); + + Scan scan = + Scan.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudConflictException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + + @Test + public void + getScannerAndScannerClose_UnknownTransactionStatusExceptionByTransactionCommit_ShouldThrowUnknownTransactionStatusException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + JdbcTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + doThrow(UnknownTransactionStatusException.class).when(transaction).commit(); + + Scan scan = + Scan.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(UnknownTransactionStatusException.class); + + verify(spied).begin(); + verify(scanner).close(); + } + + @Test + public void + getScannerAndScannerClose_CommitExceptionThrownByTransactionCommit_ShouldRollbackTransactionAndThrowCrudException() + throws TransactionException { + // Arrange + DistributedTransaction transaction = mock(DistributedTransaction.class); + + JdbcTransactionManager spied = spy(manager); + doReturn(transaction).when(spied).begin(); + doThrow(CommitException.class).when(transaction).commit(); + + Scan scan = + Scan.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + TransactionCrudOperable.Scanner scanner = mock(TransactionCrudOperable.Scanner.class); + when(transaction.getScanner(scan)).thenReturn(scanner); + + // Act Assert + TransactionManagerCrudOperable.Scanner actual = spied.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); + + verify(spied).begin(); + verify(scanner).close(); + verify(transaction).rollback(); + } + @Test public void whenPutOperationsExecutedAndJdbcServiceThrowsSQLException_shouldThrowCrudException() throws Exception { diff --git a/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionTest.java b/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionTest.java index e09b04f8e4..2423457186 100644 --- a/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionTest.java +++ b/core/src/test/java/com/scalar/db/transaction/jdbc/JdbcTransactionTest.java @@ -1,8 +1,10 @@ package com.scalar.db.transaction.jdbc; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -12,6 +14,10 @@ import com.scalar.db.api.Insert; import com.scalar.db.api.MutationCondition; import com.scalar.db.api.Put; +import com.scalar.db.api.Result; +import com.scalar.db.api.Scan; +import com.scalar.db.api.Scanner; +import com.scalar.db.api.TransactionCrudOperable; import com.scalar.db.api.Update; import com.scalar.db.api.Upsert; import com.scalar.db.exception.storage.ExecutionException; @@ -21,8 +27,12 @@ import com.scalar.db.io.Key; import com.scalar.db.storage.jdbc.JdbcService; import com.scalar.db.storage.jdbc.RdbEngineStrategy; +import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Optional; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -70,6 +80,223 @@ public void setUp() throws Exception { transaction = new JdbcTransaction(ANY_TX_ID, jdbcService, connection, rdbEngineStrategy); } + @Test + public void getScannerAndScannerOne_ShouldReturnScannerAndShouldReturnProperResult() + throws SQLException, ExecutionException, CrudException, IOException { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace(ANY_NAMESPACE) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + Scanner scanner = mock(Scanner.class); + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.of(result3)) + .thenReturn(Optional.empty()); + + when(jdbcService.getScanner(scan, connection, false)).thenReturn(scanner); + + // Act Assert + TransactionCrudOperable.Scanner actual = transaction.getScanner(scan); + assertThat(actual.one()).hasValue(result1); + assertThat(actual.one()).hasValue(result2); + assertThat(actual.one()).hasValue(result3); + assertThat(actual.one()).isEmpty(); + actual.close(); + + verify(jdbcService).getScanner(scan, connection, false); + verify(scanner).close(); + } + + @Test + public void getScannerAndScannerAll_ShouldReturnScannerAndShouldReturnProperResults() + throws SQLException, ExecutionException, CrudException, IOException { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace(ANY_NAMESPACE) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + Scanner scanner = mock(Scanner.class); + when(scanner.all()).thenReturn(Arrays.asList(result1, result2, result3)); + + when(jdbcService.getScanner(scan, connection, false)).thenReturn(scanner); + + // Act Assert + TransactionCrudOperable.Scanner actual = transaction.getScanner(scan); + assertThat(actual.all()).containsExactly(result1, result2, result3); + actual.close(); + + verify(jdbcService).getScanner(scan, connection, false); + verify(scanner).close(); + } + + @Test + public void getScannerAndScannerIterator_ShouldReturnScannerAndShouldReturnProperResults() + throws SQLException, ExecutionException, CrudException, IOException { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace(ANY_NAMESPACE) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + Result result1 = mock(Result.class); + Result result2 = mock(Result.class); + Result result3 = mock(Result.class); + + Scanner scanner = mock(Scanner.class); + when(scanner.one()) + .thenReturn(Optional.of(result1)) + .thenReturn(Optional.of(result2)) + .thenReturn(Optional.of(result3)) + .thenReturn(Optional.empty()); + + when(jdbcService.getScanner(scan, connection, false)).thenReturn(scanner); + + // Act Assert + TransactionCrudOperable.Scanner actual = transaction.getScanner(scan); + + Iterator iterator = actual.iterator(); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result1); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result2); + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isEqualTo(result3); + assertThat(iterator.hasNext()).isFalse(); + actual.close(); + + verify(jdbcService).getScanner(scan, connection, false); + verify(scanner).close(); + } + + @Test + public void getScanner_WhenSQLExceptionThrownByJdbcService_ShouldThrowCrudException() + throws SQLException, ExecutionException { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace(ANY_NAMESPACE) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + when(jdbcService.getScanner(scan, connection, false)).thenThrow(SQLException.class); + + // Act Assert + assertThatThrownBy(() -> transaction.getScanner(scan)).isInstanceOf(CrudException.class); + } + + @Test + public void getScanner_WhenExecutionExceptionThrownByJdbcService_ShouldThrowCrudException() + throws SQLException, ExecutionException { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace(ANY_NAMESPACE) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + ExecutionException executionException = mock(ExecutionException.class); + when(executionException.getMessage()).thenReturn("error"); + when(jdbcService.getScanner(scan, connection, false)).thenThrow(executionException); + + // Act Assert + assertThatThrownBy(() -> transaction.getScanner(scan)).isInstanceOf(CrudException.class); + } + + @Test + public void + getScannerAndScannerOne_WhenExecutionExceptionThrownByScannerOne_ShouldThrowCrudException() + throws SQLException, ExecutionException, CrudException { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace(ANY_NAMESPACE) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + Scanner scanner = mock(Scanner.class); + + ExecutionException executionException = mock(ExecutionException.class); + when(executionException.getMessage()).thenReturn("error"); + when(scanner.one()).thenThrow(executionException); + + when(jdbcService.getScanner(scan, connection, false)).thenReturn(scanner); + + // Act Assert + TransactionCrudOperable.Scanner actual = transaction.getScanner(scan); + assertThatThrownBy(actual::one).isInstanceOf(CrudException.class); + } + + @Test + public void + getScannerAndScannerAll_WhenExecutionExceptionThrownByScannerAll_ShouldThrowCrudException() + throws SQLException, ExecutionException, CrudException { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace(ANY_NAMESPACE) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + Scanner scanner = mock(Scanner.class); + + ExecutionException executionException = mock(ExecutionException.class); + when(executionException.getMessage()).thenReturn("error"); + when(scanner.all()).thenThrow(executionException); + + when(jdbcService.getScanner(scan, connection, false)).thenReturn(scanner); + + // Act Assert + TransactionCrudOperable.Scanner actual = transaction.getScanner(scan); + assertThatThrownBy(actual::all).isInstanceOf(CrudException.class); + } + + @Test + public void + getScannerAndScannerClose_WhenIOExceptionThrownByScannerClose_ShouldThrowCrudException() + throws SQLException, ExecutionException, CrudException, IOException { + // Arrange + Scan scan = + Scan.newBuilder() + .namespace(ANY_NAMESPACE) + .table(ANY_TABLE_NAME) + .partitionKey(Key.ofText("p1", "val")) + .build(); + + Scanner scanner = mock(Scanner.class); + + IOException ioException = mock(IOException.class); + when(ioException.getMessage()).thenReturn("error"); + doThrow(ioException).when(scanner).close(); + + when(jdbcService.getScanner(scan, connection, false)).thenReturn(scanner); + + // Act Assert + TransactionCrudOperable.Scanner actual = transaction.getScanner(scan); + assertThatThrownBy(actual::close).isInstanceOf(CrudException.class); + } + @Test public void put_putDoesNotSucceed_shouldThrowUnsatisfiedConditionException() throws SQLException, ExecutionException {