diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index b6e450107e..f6c02711b5 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -1125,7 +1125,7 @@ public enum CoreError implements ScalarDbError { JDBC_ERROR_OCCURRED_IN_SELECTION( Category.INTERNAL_ERROR, "0027", "An error occurred in the selection. Details: %s", "", ""), JDBC_FETCHING_NEXT_RESULT_FAILED( - Category.INTERNAL_ERROR, "0028", "Fetching the next result failed", "", ""), + Category.INTERNAL_ERROR, "0028", "Fetching the next result failed. Details: %s", "", ""), JDBC_TRANSACTION_ROLLING_BACK_TRANSACTION_FAILED( Category.INTERNAL_ERROR, "0029", "Rolling back the transaction failed. Details: %s", "", ""), JDBC_TRANSACTION_COMMITTING_TRANSACTION_FAILED( @@ -1204,6 +1204,8 @@ public enum CoreError implements ScalarDbError { Category.INTERNAL_ERROR, "0053", "Failed to read JSON Lines file. Details: %s.", "", ""), JDBC_TRANSACTION_GETTING_SCANNER_FAILED( Category.INTERNAL_ERROR, "0054", "Getting the scanner failed. Details: %s", "", ""), + JDBC_CLOSING_SCANNER_FAILED( + Category.INTERNAL_ERROR, "0055", "Closing the scanner failed. Details: %s", "", ""), // // Errors for the unknown transaction status error category diff --git a/core/src/main/java/com/scalar/db/config/DatabaseConfig.java b/core/src/main/java/com/scalar/db/config/DatabaseConfig.java index 88e960699f..593e2fc9bc 100644 --- a/core/src/main/java/com/scalar/db/config/DatabaseConfig.java +++ b/core/src/main/java/com/scalar/db/config/DatabaseConfig.java @@ -38,6 +38,7 @@ public class DatabaseConfig { private boolean crossPartitionScanFilteringEnabled; private boolean crossPartitionScanOrderingEnabled; private String systemNamespaceName; + private int scanFetchSize; public static final String PREFIX = "scalar.db."; public static final String CONTACT_POINTS = PREFIX + "contact_points"; @@ -56,9 +57,11 @@ public class DatabaseConfig { public static final String CROSS_PARTITION_SCAN_FILTERING = SCAN_PREFIX + "filtering.enabled"; public static final String CROSS_PARTITION_SCAN_ORDERING = SCAN_PREFIX + "ordering.enabled"; public static final String SYSTEM_NAMESPACE_NAME = PREFIX + "system_namespace_name"; + public static final String SCAN_FETCH_SIZE = PREFIX + "scan_fetch_size"; public static final int DEFAULT_METADATA_CACHE_EXPIRATION_TIME_SECS = 60; public static final String DEFAULT_SYSTEM_NAMESPACE_NAME = "scalardb"; + public static final int DEFAULT_SCAN_FETCH_SIZE = 10; public DatabaseConfig(File propertiesFile) throws IOException { try (FileInputStream stream = new FileInputStream(propertiesFile)) { @@ -118,6 +121,8 @@ protected void load() { } systemNamespaceName = getSystemNamespaceName(getProperties()); + + scanFetchSize = getInt(getProperties(), SCAN_FETCH_SIZE, DEFAULT_SCAN_FETCH_SIZE); } public List getContactPoints() { @@ -172,6 +177,10 @@ public String getSystemNamespaceName() { return systemNamespaceName; } + public int getScanFetchSize() { + return scanFetchSize; + } + public static String getTransactionManager(Properties properties) { return getString(properties, TRANSACTION_MANAGER, "consensus-commit"); } diff --git a/core/src/main/java/com/scalar/db/storage/cassandra/Cassandra.java b/core/src/main/java/com/scalar/db/storage/cassandra/Cassandra.java index a779bac2de..4b29a804a6 100644 --- a/core/src/main/java/com/scalar/db/storage/cassandra/Cassandra.java +++ b/core/src/main/java/com/scalar/db/storage/cassandra/Cassandra.java @@ -56,7 +56,7 @@ public Cassandra(DatabaseConfig config) { handlers = StatementHandlerManager.builder() - .select(new SelectStatementHandler(session)) + .select(new SelectStatementHandler(session, config.getScanFetchSize())) .insert(new InsertStatementHandler(session)) .update(new UpdateStatementHandler(session)) .delete(new DeleteStatementHandler(session)) diff --git a/core/src/main/java/com/scalar/db/storage/cassandra/SelectStatementHandler.java b/core/src/main/java/com/scalar/db/storage/cassandra/SelectStatementHandler.java index d32168c52a..ed2315e0a5 100644 --- a/core/src/main/java/com/scalar/db/storage/cassandra/SelectStatementHandler.java +++ b/core/src/main/java/com/scalar/db/storage/cassandra/SelectStatementHandler.java @@ -41,13 +41,18 @@ */ @ThreadSafe public class SelectStatementHandler extends StatementHandler { + + private final int fetchSize; + /** * Constructs {@code SelectStatementHandler} with the specified {@code Session} * * @param session session to be used with this statement + * @param fetchSize the number of rows to be fetched at once */ - public SelectStatementHandler(Session session) { + public SelectStatementHandler(Session session, int fetchSize) { super(session); + this.fetchSize = fetchSize; } @Override @@ -94,6 +99,7 @@ protected BoundStatement bind(PreparedStatement prepared, Operation operation) { @Override @Nonnull protected ResultSet execute(BoundStatement bound, Operation operation) { + bound.setFetchSize(fetchSize); return session.execute(bound); } diff --git a/core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java b/core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java index 4b07a50194..01f65c9aae 100644 --- a/core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java +++ b/core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java @@ -62,7 +62,8 @@ public Cosmos(DatabaseConfig databaseConfig) { new CosmosAdmin(client, config), databaseConfig.getMetadataCacheExpirationTimeSecs()); operationChecker = new CosmosOperationChecker(databaseConfig, metadataManager); - selectStatementHandler = new SelectStatementHandler(client, metadataManager); + selectStatementHandler = + new SelectStatementHandler(client, metadataManager, databaseConfig.getScanFetchSize()); putStatementHandler = new PutStatementHandler(client, metadataManager); deleteStatementHandler = new DeleteStatementHandler(client, metadataManager); batchHandler = new BatchHandler(client, metadataManager); diff --git a/core/src/main/java/com/scalar/db/storage/cosmos/SelectStatementHandler.java b/core/src/main/java/com/scalar/db/storage/cosmos/SelectStatementHandler.java index 22d9db2d59..187dfa924b 100644 --- a/core/src/main/java/com/scalar/db/storage/cosmos/SelectStatementHandler.java +++ b/core/src/main/java/com/scalar/db/storage/cosmos/SelectStatementHandler.java @@ -44,8 +44,12 @@ @ThreadSafe public class SelectStatementHandler extends StatementHandler { - public SelectStatementHandler(CosmosClient client, TableMetadataManager metadataManager) { + private final int fetchSize; + + public SelectStatementHandler( + CosmosClient client, TableMetadataManager metadataManager, int fetchSize) { super(client, metadataManager); + this.fetchSize = fetchSize; } /** @@ -85,9 +89,10 @@ private Scanner executeRead(Get get, TableMetadata tableMetadata) throws CosmosE return executeReadWithIndex(get, tableMetadata); } + PartitionKey partitionKey = cosmosOperation.getCosmosPartitionKey(); + if (get.getProjections().isEmpty()) { String id = cosmosOperation.getId(); - PartitionKey partitionKey = cosmosOperation.getCosmosPartitionKey(); Record record = getContainer(get).readItem(id, partitionKey, Record.class).getItem(); return new SingleRecordScanner( record, new ResultInterpreter(get.getProjections(), tableMetadata)); @@ -100,8 +105,10 @@ private Scanner executeRead(Get get, TableMetadata tableMetadata) throws CosmosE .eq(cosmosOperation.getConcatenatedPartitionKey()), DSL.field("r.id").eq(cosmosOperation.getId())) .getSQL(ParamType.INLINED); + CosmosQueryRequestOptions options = + new CosmosQueryRequestOptions().setPartitionKey(partitionKey); - return executeQuery(get, tableMetadata, query); + return executeQuery(get, tableMetadata, query, options); } private Scanner executeReadWithIndex(Selection selection, TableMetadata tableMetadata) @@ -327,8 +334,8 @@ private Scanner executeQuery( CosmosQueryRequestOptions queryOptions) { Iterator> pagesIterator = getContainer(selection) - .queryItems(query, queryOptions, Record.class) - .iterableByPage() + .queryItems(query, queryOptions.setMaxBufferedItemCount(fetchSize), Record.class) + .iterableByPage(fetchSize) .iterator(); return new ScannerImpl( diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java b/core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java index c3545323c6..511bd1eaff 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java @@ -77,7 +77,11 @@ public Dynamo(DatabaseConfig databaseConfig) { operationChecker = new DynamoOperationChecker(databaseConfig, metadataManager); selectStatementHandler = - new SelectStatementHandler(client, metadataManager, config.getNamespacePrefix()); + new SelectStatementHandler( + client, + metadataManager, + config.getNamespacePrefix(), + databaseConfig.getScanFetchSize()); putStatementHandler = new PutStatementHandler(client, metadataManager, config.getNamespacePrefix()); deleteStatementHandler = diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java b/core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java index f34f9a702d..fc13cd0137 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java @@ -22,19 +22,22 @@ public class QueryScanner extends AbstractScanner { private final ResultInterpreter resultInterpreter; private Iterator> itemsIterator; + private final int fetchSize; @Nullable private Integer remainingLimit; @Nullable private Map lastEvaluatedKey; @SuppressFBWarnings("EI_EXPOSE_REP2") - public QueryScanner(PaginatedRequest request, int limit, ResultInterpreter resultInterpreter) { + public QueryScanner( + PaginatedRequest request, int fetchSize, int limit, ResultInterpreter resultInterpreter) { this.request = request; + this.fetchSize = fetchSize; if (limit > 0) { remainingLimit = limit; - handleResponse(request.execute(limit)); + handleResponse(request.execute(Math.min(fetchSize, limit))); } else { remainingLimit = null; - handleResponse(request.execute()); + handleResponse(request.execute(fetchSize)); } this.resultInterpreter = resultInterpreter; @@ -54,15 +57,13 @@ private boolean hasNext() { if (itemsIterator.hasNext()) { return true; } - if (lastEvaluatedKey != null) { - if (remainingLimit != null) { - handleResponse(request.execute(lastEvaluatedKey, remainingLimit)); - } else { - handleResponse(request.execute(lastEvaluatedKey)); - } - return itemsIterator.hasNext(); + if (lastEvaluatedKey == null) { + return false; } - return false; + + int nextFetchSize = remainingLimit != null ? Math.min(fetchSize, remainingLimit) : fetchSize; + handleResponse(request.execute(lastEvaluatedKey, nextFetchSize)); + return itemsIterator.hasNext(); } private void handleResponse(PaginatedRequestResponse response) { @@ -71,25 +72,21 @@ private void handleResponse(PaginatedRequestResponse response) { remainingLimit -= items.size(); } itemsIterator = items.iterator(); - if ((remainingLimit == null || remainingLimit > 0) && response.hasLastEvaluatedKey()) { - lastEvaluatedKey = response.lastEvaluatedKey(); - } else { - lastEvaluatedKey = null; - } + + boolean shouldContinue = + (remainingLimit == null || remainingLimit > 0) && response.hasLastEvaluatedKey(); + lastEvaluatedKey = shouldContinue ? response.lastEvaluatedKey() : null; } @Override @Nonnull public List all() { - List ret = new ArrayList<>(); - while (true) { - Optional one = one(); - if (!one.isPresent()) { - break; - } - ret.add(one.get()); + List results = new ArrayList<>(); + Optional next; + while ((next = one()).isPresent()) { + results.add(next.get()); } - return ret; + return results; } @Override diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/SelectStatementHandler.java b/core/src/main/java/com/scalar/db/storage/dynamo/SelectStatementHandler.java index 7457d4b009..095955a1eb 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/SelectStatementHandler.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/SelectStatementHandler.java @@ -49,6 +49,7 @@ public class SelectStatementHandler { private final DynamoDbClient client; private final TableMetadataManager metadataManager; private final String namespacePrefix; + private final int fetchSize; /** * Constructs a {@code SelectStatementHandler} with the specified {@link DynamoDbClient} and a new @@ -62,10 +63,12 @@ public class SelectStatementHandler { public SelectStatementHandler( DynamoDbClient client, TableMetadataManager metadataManager, - Optional namespacePrefix) { + Optional namespacePrefix, + int fetchSize) { this.client = checkNotNull(client); this.metadataManager = checkNotNull(metadataManager); this.namespacePrefix = namespacePrefix.orElse(""); + this.fetchSize = fetchSize; } @Nonnull @@ -151,7 +154,10 @@ private Scanner executeScanWithIndex(Selection selection, TableMetadata tableMet com.scalar.db.storage.dynamo.request.QueryRequest request = new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build()); return new QueryScanner( - request, limit, new ResultInterpreter(selection.getProjections(), tableMetadata)); + request, + fetchSize, + limit, + new ResultInterpreter(selection.getProjections(), tableMetadata)); } private Scanner executeScan(Scan scan, TableMetadata tableMetadata) { @@ -184,7 +190,10 @@ private Scanner executeScan(Scan scan, TableMetadata tableMetadata) { com.scalar.db.storage.dynamo.request.QueryRequest queryRequest = new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build()); return new QueryScanner( - queryRequest, scan.getLimit(), new ResultInterpreter(scan.getProjections(), tableMetadata)); + queryRequest, + fetchSize, + scan.getLimit(), + new ResultInterpreter(scan.getProjections(), tableMetadata)); } private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) { @@ -205,6 +214,7 @@ private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) { new com.scalar.db.storage.dynamo.request.ScanRequest(client, builder.build()); return new QueryScanner( requestWrapper, + fetchSize, scan.getLimit(), new ResultInterpreter(scan.getProjections(), tableMetadata)); } diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java index 7a17ea1a34..ef85ce7e9a 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java @@ -59,7 +59,9 @@ public JdbcDatabase(DatabaseConfig databaseConfig) { databaseConfig.getMetadataCacheExpirationTimeSecs()); OperationChecker operationChecker = new OperationChecker(databaseConfig, tableMetadataManager); - jdbcService = new JdbcService(tableMetadataManager, operationChecker, rdbEngine); + jdbcService = + new JdbcService( + tableMetadataManager, operationChecker, rdbEngine, databaseConfig.getScanFetchSize()); } @VisibleForTesting @@ -98,9 +100,18 @@ public Scanner scan(Scan scan) throws ExecutionException { Connection connection = null; try { connection = dataSource.getConnection(); + connection.setAutoCommit(false); rdbEngine.setReadOnly(connection, true); return jdbcService.getScanner(scan, connection); } catch (SQLException e) { + try { + if (connection != null) { + connection.rollback(); + } + } catch (SQLException ex) { + e.addSuppressed(ex); + } + close(connection); throw new ExecutionException( CoreError.JDBC_ERROR_OCCURRED_IN_SELECTION.buildMessage(e.getMessage()), e); diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java index 852812ac94..5e6fd1f2c1 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java @@ -44,13 +44,20 @@ public class JdbcService { private final OperationChecker operationChecker; private final RdbEngineStrategy rdbEngine; private final QueryBuilder queryBuilder; + private final int scanFetchSize; @SuppressFBWarnings("EI_EXPOSE_REP2") public JdbcService( TableMetadataManager tableMetadataManager, OperationChecker operationChecker, - RdbEngineStrategy rdbEngine) { - this(tableMetadataManager, operationChecker, rdbEngine, new QueryBuilder(rdbEngine)); + RdbEngineStrategy rdbEngine, + int scanFetchSize) { + this( + tableMetadataManager, + operationChecker, + rdbEngine, + new QueryBuilder(rdbEngine), + scanFetchSize); } @VisibleForTesting @@ -58,11 +65,13 @@ public JdbcService( TableMetadataManager tableMetadataManager, OperationChecker operationChecker, RdbEngineStrategy rdbEngine, - QueryBuilder queryBuilder) { + QueryBuilder queryBuilder, + int scanFetchSize) { this.tableMetadataManager = Objects.requireNonNull(tableMetadataManager); this.operationChecker = Objects.requireNonNull(operationChecker); this.rdbEngine = Objects.requireNonNull(rdbEngine); this.queryBuilder = Objects.requireNonNull(queryBuilder); + this.scanFetchSize = scanFetchSize; } public Optional get(Get get, Connection connection) @@ -102,7 +111,8 @@ public Scanner getScanner(Scan scan, Connection connection) } @SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE") - public Scanner getScanner(Scan scan, Connection connection, boolean closeConnectionOnScannerClose) + public Scanner getScanner( + Scan scan, Connection connection, boolean commitAndCloseConnectionOnScannerClose) throws SQLException, ExecutionException { operationChecker.check(scan); @@ -111,13 +121,14 @@ public Scanner getScanner(Scan scan, Connection connection, boolean closeConnect SelectQuery selectQuery = buildSelectQuery(scan, tableMetadata); PreparedStatement preparedStatement = connection.prepareStatement(selectQuery.sql()); selectQuery.bind(preparedStatement); + preparedStatement.setFetchSize(scanFetchSize); ResultSet resultSet = preparedStatement.executeQuery(); return new ScannerImpl( new ResultInterpreter(scan.getProjections(), tableMetadata, rdbEngine), connection, preparedStatement, resultSet, - closeConnectionOnScannerClose); + commitAndCloseConnectionOnScannerClose); } public List scan(Scan scan, Connection connection) diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineMysql.java b/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineMysql.java index d0ac655f99..6e1d662650 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineMysql.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/RdbEngineMysql.java @@ -20,6 +20,8 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneOffset; +import java.util.Collections; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -444,4 +446,9 @@ public TimestampTZColumn parseTimestampTZColumn(ResultSet resultSet, String colu getTimeTypeStrategy() { return timeTypeEngine; } + + @Override + public Map getConnectionProperties() { + return Collections.singletonMap("useCursorFetch", "true"); + } } diff --git a/core/src/main/java/com/scalar/db/storage/jdbc/ScannerImpl.java b/core/src/main/java/com/scalar/db/storage/jdbc/ScannerImpl.java index d9dc38268e..44b11c8b0a 100644 --- a/core/src/main/java/com/scalar/db/storage/jdbc/ScannerImpl.java +++ b/core/src/main/java/com/scalar/db/storage/jdbc/ScannerImpl.java @@ -5,6 +5,7 @@ import com.scalar.db.common.error.CoreError; import com.scalar.db.exception.storage.ExecutionException; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -25,7 +26,7 @@ public class ScannerImpl extends AbstractScanner { private final Connection connection; private final PreparedStatement preparedStatement; private final ResultSet resultSet; - private final boolean closeConnectionOnClose; + private final boolean commitAndCloseConnectionOnClose; @SuppressFBWarnings("EI_EXPOSE_REP2") public ScannerImpl( @@ -33,12 +34,12 @@ public ScannerImpl( Connection connection, PreparedStatement preparedStatement, ResultSet resultSet, - boolean closeConnectionOnClose) { + boolean commitAndCloseConnectionOnClose) { this.resultInterpreter = Objects.requireNonNull(resultInterpreter); this.connection = Objects.requireNonNull(connection); this.preparedStatement = Objects.requireNonNull(preparedStatement); this.resultSet = Objects.requireNonNull(resultSet); - this.closeConnectionOnClose = closeConnectionOnClose; + this.commitAndCloseConnectionOnClose = commitAndCloseConnectionOnClose; } @Override @@ -49,7 +50,8 @@ public Optional one() throws ExecutionException { } return Optional.empty(); } catch (SQLException e) { - throw new ExecutionException(CoreError.JDBC_FETCHING_NEXT_RESULT_FAILED.buildMessage(), e); + throw new ExecutionException( + CoreError.JDBC_FETCHING_NEXT_RESULT_FAILED.buildMessage(e.getMessage()), e); } } @@ -62,12 +64,13 @@ public List all() throws ExecutionException { } return ret; } catch (SQLException e) { - throw new ExecutionException(CoreError.JDBC_FETCHING_NEXT_RESULT_FAILED.buildMessage(), e); + throw new ExecutionException( + CoreError.JDBC_FETCHING_NEXT_RESULT_FAILED.buildMessage(e.getMessage()), e); } } @Override - public void close() { + public void close() throws IOException { try { resultSet.close(); } catch (SQLException e) { @@ -79,11 +82,24 @@ public void close() { logger.warn("Failed to close the preparedStatement", e); } - if (closeConnectionOnClose) { + if (commitAndCloseConnectionOnClose) { try { - connection.close(); + connection.commit(); } catch (SQLException e) { - logger.warn("Failed to close the connection", e); + try { + connection.rollback(); + } catch (SQLException ex) { + e.addSuppressed(ex); + } + + throw new IOException( + CoreError.JDBC_CLOSING_SCANNER_FAILED.buildMessage(e.getMessage()), e); + } finally { + try { + connection.close(); + } catch (SQLException e) { + logger.warn("Failed to close the connection", e); + } } } } diff --git a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java index d65b512106..f0c78430e0 100644 --- a/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java +++ b/core/src/main/java/com/scalar/db/transaction/jdbc/JdbcTransactionManager.java @@ -70,7 +70,9 @@ public JdbcTransactionManager(DatabaseConfig databaseConfig) { databaseConfig.getMetadataCacheExpirationTimeSecs()); OperationChecker operationChecker = new OperationChecker(databaseConfig, tableMetadataManager); - jdbcService = new JdbcService(tableMetadataManager, operationChecker, rdbEngine); + jdbcService = + new JdbcService( + tableMetadataManager, operationChecker, rdbEngine, databaseConfig.getScanFetchSize()); } @VisibleForTesting diff --git a/core/src/test/java/com/scalar/db/config/DatabaseConfigTest.java b/core/src/test/java/com/scalar/db/config/DatabaseConfigTest.java index 5a370f88a4..0f4b157dfa 100644 --- a/core/src/test/java/com/scalar/db/config/DatabaseConfigTest.java +++ b/core/src/test/java/com/scalar/db/config/DatabaseConfigTest.java @@ -41,6 +41,7 @@ public void constructor_PropertiesWithoutPortGiven_ShouldLoadProperly() { assertThat(config.isCrossPartitionScanEnabled()).isFalse(); assertThat(config.isCrossPartitionScanFilteringEnabled()).isFalse(); assertThat(config.isCrossPartitionScanOrderingEnabled()).isFalse(); + assertThat(config.getScanFetchSize()).isEqualTo(10); } @Test @@ -69,6 +70,7 @@ public void constructor_PropertiesWithoutUsernameGiven_ShouldLoadProperly() { assertThat(config.isCrossPartitionScanEnabled()).isFalse(); assertThat(config.isCrossPartitionScanFilteringEnabled()).isFalse(); assertThat(config.isCrossPartitionScanOrderingEnabled()).isFalse(); + assertThat(config.getScanFetchSize()).isEqualTo(10); } @Test @@ -97,6 +99,7 @@ public void constructor_PropertiesWithoutPasswordGiven_ShouldLoadProperly() { assertThat(config.isCrossPartitionScanEnabled()).isFalse(); assertThat(config.isCrossPartitionScanFilteringEnabled()).isFalse(); assertThat(config.isCrossPartitionScanOrderingEnabled()).isFalse(); + assertThat(config.getScanFetchSize()).isEqualTo(10); } @Test @@ -127,6 +130,7 @@ public void constructor_PropertiesWithPortGiven_ShouldLoadProperly() { assertThat(config.isCrossPartitionScanEnabled()).isFalse(); assertThat(config.isCrossPartitionScanFilteringEnabled()).isFalse(); assertThat(config.isCrossPartitionScanOrderingEnabled()).isFalse(); + assertThat(config.getScanFetchSize()).isEqualTo(10); } @Test @@ -402,4 +406,17 @@ public void constructor_PropertiesWithCrossPartitionScanSettingsGiven_ShouldLoad assertThatThrownBy(() -> new DatabaseConfig(props)) .isInstanceOf(IllegalArgumentException.class); } + + @Test + public void constructor_PropertiesWithScanFetchSizeGiven_ShouldLoadProperly() { + // Arrange + Properties props = new Properties(); + props.setProperty(DatabaseConfig.SCAN_FETCH_SIZE, "1000"); + + // Act + DatabaseConfig config = new DatabaseConfig(props); + + // Assert + assertThat(config.getScanFetchSize()).isEqualTo(1000); + } } diff --git a/core/src/test/java/com/scalar/db/storage/cassandra/SelectStatementHandlerTest.java b/core/src/test/java/com/scalar/db/storage/cassandra/SelectStatementHandlerTest.java index 93077ef7e0..be58d56385 100644 --- a/core/src/test/java/com/scalar/db/storage/cassandra/SelectStatementHandlerTest.java +++ b/core/src/test/java/com/scalar/db/storage/cassandra/SelectStatementHandlerTest.java @@ -32,6 +32,7 @@ import org.mockito.MockitoAnnotations; public class SelectStatementHandlerTest { + private static final int FETCH_SIZE = 10; private static final String ANY_NAMESPACE_NAME = "namespace"; private static final String ANY_TABLE_NAME = "table_name"; private static final String ANY_NAME_1 = "name1"; @@ -55,7 +56,7 @@ public class SelectStatementHandlerTest { public void setUp() throws Exception { MockitoAnnotations.openMocks(this).close(); - handler = new SelectStatementHandler(session); + handler = new SelectStatementHandler(session, FETCH_SIZE); } private Get prepareGet() { @@ -497,7 +498,7 @@ public void setConsistency_ScanOperationWithEventualConsistencyGiven_ShouldPrepa public void handle_DriverExceptionThrown_ShouldThrowProperExecutionException() { // Arrange get = prepareGetWithClusteringKey(); - SelectStatementHandler spy = Mockito.spy(new SelectStatementHandler(session)); + SelectStatementHandler spy = Mockito.spy(new SelectStatementHandler(session, FETCH_SIZE)); doReturn(prepared).when(spy).prepare(get); doReturn(bound).when(spy).bind(prepared, get); @@ -525,7 +526,7 @@ public void checkArgument_WrongOperationGiven_ShouldThrowIllegalArgumentExceptio @Test public void constructor_NullGiven_ShouldThrowNullPointerException() { // Act Assert - assertThatThrownBy(() -> new SelectStatementHandler(null)) + assertThatThrownBy(() -> new SelectStatementHandler(null, FETCH_SIZE)) .isInstanceOf(NullPointerException.class); } @@ -595,4 +596,17 @@ public void prepare_ScanAllOperationWithProjectedColumns_ShouldPrepareProperQuer // Assert verify(session).prepare(expected); } + + @Test + public void execute_ShouldSetFetchSizeAndExecute() { + // Arrange + Get get = prepareGet(); + + // Act + handler.execute(bound, get); + + // Assert + verify(bound).setFetchSize(FETCH_SIZE); + verify(session).execute(bound); + } } diff --git a/core/src/test/java/com/scalar/db/storage/cosmos/SelectStatementHandlerTest.java b/core/src/test/java/com/scalar/db/storage/cosmos/SelectStatementHandlerTest.java index ecfb3488ae..e54d17e636 100644 --- a/core/src/test/java/com/scalar/db/storage/cosmos/SelectStatementHandlerTest.java +++ b/core/src/test/java/com/scalar/db/storage/cosmos/SelectStatementHandlerTest.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; @@ -17,6 +18,7 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.util.CosmosPagedIterable; import com.scalar.db.api.Get; @@ -31,13 +33,17 @@ import com.scalar.db.io.Key; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedHashSet; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; public class SelectStatementHandlerTest { + private static final int FETCH_SIZE = 10; private static final String ANY_NAMESPACE_NAME = "namespace"; private static final String ANY_TABLE_NAME = "table"; private static final String ANY_NAME_1 = "name1"; @@ -60,12 +66,16 @@ public class SelectStatementHandlerTest { @Mock private TableMetadata metadata; @Mock private CosmosItemResponse response; @Mock private CosmosPagedIterable responseIterable; + @Mock private Iterable> pagesIterable; + @Mock private Iterator> pagesIterator; + + @Captor ArgumentCaptor cosmosQueryRequestOptionsCaptor; @BeforeEach public void setUp() throws Exception { MockitoAnnotations.openMocks(this).close(); - handler = new SelectStatementHandler(client, metadataManager); + handler = new SelectStatementHandler(client, metadataManager, FETCH_SIZE); when(client.getDatabase(anyString())).thenReturn(database); when(database.getContainer(anyString())).thenReturn(container); @@ -91,6 +101,7 @@ private Get prepareGet() { private Scan prepareScan() { Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); + cosmosPartitionKey = new PartitionKey(ANY_TEXT_1); return new Scan(partitionKey).forNamespace(ANY_NAMESPACE_NAME).forTable(ANY_TABLE_NAME); } @@ -119,8 +130,8 @@ public void handle_GetOperationWithIndexGiven_ShouldCallQueryItems() { // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Key indexKey = new Key(ANY_NAME_3, ANY_TEXT_3); Get get = new Get(indexKey).forNamespace(ANY_NAMESPACE_NAME).forTable(ANY_TABLE_NAME); String query = @@ -130,7 +141,12 @@ public void handle_GetOperationWithIndexGiven_ShouldCallQueryItems() { assertThatCode(() -> handler.handle(get)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -172,8 +188,8 @@ public void handle_ScanOperationGiven_ShouldCallQueryItems() { // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Scan scan = prepareScan(); String query = @@ -187,7 +203,14 @@ public void handle_ScanOperationGiven_ShouldCallQueryItems() { assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -195,8 +218,8 @@ public void handle_ScanOperationWithIndexGiven_ShouldCallQueryItems() { // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Key indexKey = new Key(ANY_NAME_3, ANY_TEXT_3); Scan scan = new Scan(indexKey).forNamespace(ANY_NAMESPACE_NAME).forTable(ANY_TABLE_NAME); @@ -207,7 +230,12 @@ public void handle_ScanOperationWithIndexGiven_ShouldCallQueryItems() { assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -231,8 +259,8 @@ public void handle_ScanOperationWithSingleClusteringKey_ShouldCallQueryItemsWith // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Scan scan = prepareScan() @@ -258,7 +286,14 @@ public void handle_ScanOperationWithSingleClusteringKey_ShouldCallQueryItemsWith assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -270,8 +305,8 @@ public void handle_ScanOperationWithMultipleClusteringKeys_ShouldCallQueryItemsW when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Scan scan = prepareScan() @@ -307,7 +342,14 @@ public void handle_ScanOperationWithMultipleClusteringKeys_ShouldCallQueryItemsW assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -315,8 +357,8 @@ public void handle_ScanOperationWithNeitherInclusive_ShouldCallQueryItemsWithPro // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Scan scan = prepareScan() @@ -342,7 +384,14 @@ public void handle_ScanOperationWithNeitherInclusive_ShouldCallQueryItemsWithPro assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -350,8 +399,8 @@ public void handle_ScanOperationWithOrderingAndLimit_ShouldCallQueryItemsWithPro // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Scan scan = prepareScan() @@ -375,7 +424,14 @@ public void handle_ScanOperationWithOrderingAndLimit_ShouldCallQueryItemsWithPro assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -384,8 +440,8 @@ public void handle_ScanOperationWithOrderingAndLimit_ShouldCallQueryItemsWithPro // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Scan scan = prepareScan() @@ -409,7 +465,14 @@ public void handle_ScanOperationWithOrderingAndLimit_ShouldCallQueryItemsWithPro assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -422,8 +485,8 @@ public void handle_ScanOperationWithOrderingAndLimit_ShouldCallQueryItemsWithPro when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Scan scan = prepareScan() @@ -450,7 +513,14 @@ public void handle_ScanOperationWithOrderingAndLimit_ShouldCallQueryItemsWithPro assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -463,8 +533,8 @@ public void handle_ScanOperationWithOrderingAndLimit_ShouldCallQueryItemsWithPro when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Scan scan = prepareScan() @@ -491,7 +561,14 @@ public void handle_ScanOperationWithOrderingAndLimit_ShouldCallQueryItemsWithPro assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -499,8 +576,8 @@ public void handle_ScanAllOperationWithLimit_ShouldCallQueryItemsWithProperQuery // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); ScanAll scanAll = prepareScanAll().withLimit(ANY_LIMIT); // Act Assert @@ -509,7 +586,11 @@ public void handle_ScanAllOperationWithLimit_ShouldCallQueryItemsWithProperQuery // Assert String expectedQuery = "select * from Record r offset 0 limit " + ANY_LIMIT; verify(container) - .queryItems(eq(expectedQuery), any(CosmosQueryRequestOptions.class), eq(Record.class)); + .queryItems(eq(expectedQuery), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -517,8 +598,8 @@ public void handle_ScanAllOperationWithoutLimit_ShouldCallQueryItemsWithProperQu // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); ScanAll scanAll = prepareScanAll(); // Act Assert @@ -527,7 +608,11 @@ public void handle_ScanAllOperationWithoutLimit_ShouldCallQueryItemsWithProperQu // Assert String expectedQuery = "select * from Record r"; verify(container) - .queryItems(eq(expectedQuery), any(CosmosQueryRequestOptions.class), eq(Record.class)); + .queryItems(eq(expectedQuery), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -535,8 +620,8 @@ public void handle_GetOperationWithProjectedColumns_ShouldCallQueryItemsWithProj // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Get get = prepareGet().withProjections(Arrays.asList(ANY_NAME_3, ANY_NAME_4)); // Act Assert @@ -556,7 +641,13 @@ public void handle_GetOperationWithProjectedColumns_ShouldCallQueryItemsWithProj + ANY_TEXT_2 + "')"; verify(container) - .queryItems(eq(expectedQuery), any(CosmosQueryRequestOptions.class), eq(Record.class)); + .queryItems(eq(expectedQuery), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -565,8 +656,8 @@ public void handle_GetOperationWithProjectedColumns_ShouldCallQueryItemsWithProj // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Get get = prepareGet().withProjections(Arrays.asList(ANY_NAME_1, ANY_NAME_2)); // Act Assert @@ -587,7 +678,13 @@ public void handle_GetOperationWithProjectedColumns_ShouldCallQueryItemsWithProj + ANY_TEXT_2 + "')"; verify(container) - .queryItems(eq(expectedQuery), any(CosmosQueryRequestOptions.class), eq(Record.class)); + .queryItems(eq(expectedQuery), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -595,8 +692,8 @@ public void handle_GetOperationWithIndexGivenAndProjections_ShouldCallQueryItems // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Key indexKey = new Key(ANY_NAME_3, ANY_TEXT_3); Get get = new Get(indexKey) @@ -618,7 +715,12 @@ public void handle_GetOperationWithIndexGivenAndProjections_ShouldCallQueryItems assertThatCode(() -> handler.handle(get)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -627,8 +729,8 @@ public void handle_GetOperationWithIndexGivenAndProjections_ShouldCallQueryItems // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); ScanAll scanAll = prepareScanAll().withProjections(Arrays.asList(ANY_NAME_3, ANY_NAME_4)); // Act Assert @@ -641,7 +743,11 @@ public void handle_GetOperationWithIndexGivenAndProjections_ShouldCallQueryItems + "{\"name3\":r.values[\"name3\"],\"name4\":r.values[\"name4\"]} as values " + "from Record r"; verify(container) - .queryItems(eq(expectedQuery), any(CosmosQueryRequestOptions.class), eq(Record.class)); + .queryItems(eq(expectedQuery), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -650,8 +756,8 @@ public void handle_GetOperationWithIndexGivenAndProjections_ShouldCallQueryItems // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); ScanAll scanAll = prepareScanAll().withProjections(Arrays.asList(ANY_NAME_1, ANY_NAME_2)); // Act Assert @@ -665,7 +771,11 @@ public void handle_GetOperationWithIndexGivenAndProjections_ShouldCallQueryItems + "{\"name2\":r.clusteringKey[\"name2\"]} as clusteringKey " + "from Record r"; verify(container) - .queryItems(eq(expectedQuery), any(CosmosQueryRequestOptions.class), eq(Record.class)); + .queryItems(eq(expectedQuery), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -674,8 +784,8 @@ public void handle_GetOperationWithIndexGivenAndProjections_ShouldCallQueryItems // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); ScanAll scanAll = prepareScanAll().withProjections(Arrays.asList(ANY_NAME_1, ANY_NAME_4)); // Act Assert @@ -689,7 +799,11 @@ public void handle_GetOperationWithIndexGivenAndProjections_ShouldCallQueryItems + "{\"name4\":r.values[\"name4\"]} as values " + "from Record r"; verify(container) - .queryItems(eq(expectedQuery), any(CosmosQueryRequestOptions.class), eq(Record.class)); + .queryItems(eq(expectedQuery), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -698,8 +812,8 @@ public void handle_GetOperationWithIndexGivenAndProjections_ShouldCallQueryItems // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); ScanAll scanAll = prepareScanAll().withProjections(Arrays.asList(ANY_NAME_2, ANY_NAME_4)); // Act Assert @@ -713,7 +827,11 @@ public void handle_GetOperationWithIndexGivenAndProjections_ShouldCallQueryItems + "{\"name4\":r.values[\"name4\"]} as values " + "from Record r"; verify(container) - .queryItems(eq(expectedQuery), any(CosmosQueryRequestOptions.class), eq(Record.class)); + .queryItems(eq(expectedQuery), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -721,8 +839,8 @@ public void handle_ScanOperationWithIndexAndProjected_ShouldCallQueryItems() { // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Key indexKey = new Key(ANY_NAME_3, ANY_TEXT_3); Scan scan = @@ -745,7 +863,12 @@ public void handle_ScanOperationWithIndexAndProjected_ShouldCallQueryItems() { assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -754,8 +877,8 @@ public void handle_ScanOperationWithIndexAndProjected_ShouldCallQueryItems() { // Arrange when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Scan scan = prepareScan() @@ -783,7 +906,14 @@ public void handle_ScanOperationWithIndexAndProjected_ShouldCallQueryItems() { assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -799,14 +929,15 @@ public void handle_ScanOperationWithIndexAndProjected_ShouldCallQueryItems() { when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Key partitionKey = Key.of(ANY_NAME_1, ANY_TEXT_1, ANY_NAME_2, ANY_TEXT_2); Scan scan = new Scan(partitionKey) .withProjections(Arrays.asList(ANY_NAME_1, ANY_NAME_2, ANY_NAME_3, ANY_NAME_4)) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); + cosmosPartitionKey = new PartitionKey(ANY_TEXT_1 + ":" + ANY_TEXT_2); String query = "select r.id, " @@ -821,7 +952,14 @@ public void handle_ScanOperationWithIndexAndProjected_ShouldCallQueryItems() { assertThatCode(() -> handler.handle(scan)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -837,8 +975,8 @@ public void handle_ScanOperationWithIndexAndProjected_ShouldCallQueryItems() { when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); ScanAll scanAll = new ScanAll() .withProjections(Arrays.asList(ANY_NAME_1, ANY_NAME_2, ANY_NAME_3, ANY_NAME_4)) @@ -856,7 +994,12 @@ public void handle_ScanOperationWithIndexAndProjected_ShouldCallQueryItems() { assertThatCode(() -> handler.handle(scanAll)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } @Test @@ -872,8 +1015,8 @@ public void handle_ScanOperationWithIndexAndProjected_ShouldCallQueryItems() { when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class))) .thenReturn(responseIterable); - Record expected = new Record(); - when(responseIterable.iterator()).thenReturn(Collections.singletonList(expected).iterator()); + when(responseIterable.iterableByPage(anyInt())).thenReturn(pagesIterable); + when(pagesIterable.iterator()).thenReturn(pagesIterator); Key partitionKey = Key.of(ANY_NAME_1, ANY_TEXT_1, ANY_NAME_2, ANY_TEXT_2); Key clusteringKey = Key.of(ANY_NAME_3, ANY_TEXT_3, ANY_NAME_4, ANY_TEXT_4); Get get = @@ -881,6 +1024,7 @@ public void handle_ScanOperationWithIndexAndProjected_ShouldCallQueryItems() { .withProjections(Arrays.asList(ANY_NAME_1, ANY_NAME_2, ANY_NAME_3, ANY_NAME_4)) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); + cosmosPartitionKey = new PartitionKey(ANY_TEXT_1 + ":" + ANY_TEXT_2); String query = "select r.id, " @@ -894,6 +1038,13 @@ public void handle_ScanOperationWithIndexAndProjected_ShouldCallQueryItems() { assertThatCode(() -> handler.handle(get)).doesNotThrowAnyException(); // Assert - verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class)); + verify(container) + .queryItems(eq(query), cosmosQueryRequestOptionsCaptor.capture(), eq(Record.class)); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getPartitionKey()) + .isEqualTo(cosmosPartitionKey); + assertThat(cosmosQueryRequestOptionsCaptor.getValue().getMaxBufferedItemCount()) + .isEqualTo(FETCH_SIZE); + verify(responseIterable).iterableByPage(FETCH_SIZE); + verify(pagesIterable).iterator(); } } diff --git a/core/src/test/java/com/scalar/db/storage/cosmos/StatementHandlerTest.java b/core/src/test/java/com/scalar/db/storage/cosmos/StatementHandlerTest.java index e6d6b7e91d..e8a8069a85 100644 --- a/core/src/test/java/com/scalar/db/storage/cosmos/StatementHandlerTest.java +++ b/core/src/test/java/com/scalar/db/storage/cosmos/StatementHandlerTest.java @@ -9,6 +9,9 @@ import org.mockito.MockitoAnnotations; public class StatementHandlerTest { + + private static final int FETCH_SIZE = 10; + @Mock private TableMetadataManager metadataManager; @BeforeEach @@ -19,7 +22,7 @@ public void setUp() throws Exception { @Test public void constructor_NullGiven_ShouldThrowNullPointerException() { // Act Assert - assertThatThrownBy(() -> new SelectStatementHandler(null, metadataManager)) + assertThatThrownBy(() -> new SelectStatementHandler(null, metadataManager, FETCH_SIZE)) .isInstanceOf(NullPointerException.class); } } diff --git a/core/src/test/java/com/scalar/db/storage/dynamo/QueryScannerTest.java b/core/src/test/java/com/scalar/db/storage/dynamo/QueryScannerTest.java index 1e40781dba..bae4b2cca8 100644 --- a/core/src/test/java/com/scalar/db/storage/dynamo/QueryScannerTest.java +++ b/core/src/test/java/com/scalar/db/storage/dynamo/QueryScannerTest.java @@ -24,6 +24,8 @@ public class QueryScannerTest { + private static final int FETCH_SIZE = 2; + @Mock PaginatedRequest request; @Mock private ResultInterpreter resultInterpreter; @Mock private PaginatedRequestResponse response; @@ -38,68 +40,69 @@ public void setUp() throws Exception { public void one_ShouldReturnResult() { // Arrange Map item = Collections.emptyMap(); - List> items = Arrays.asList(item, item, item); - when(request.execute()).thenReturn(response); - when(response.items()).thenReturn(items); + Map lastEvaluatedKey = Collections.emptyMap(); + when(request.execute(FETCH_SIZE)).thenReturn(response); + when(response.items()).thenReturn(Arrays.asList(item, item)); + when(response.hasLastEvaluatedKey()).thenReturn(false); + when(response.lastEvaluatedKey()).thenReturn(lastEvaluatedKey); when(resultInterpreter.interpret(item)).thenReturn(result); - QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter); + QueryScanner queryScanner = new QueryScanner(request, FETCH_SIZE, 0, resultInterpreter); // Act Optional actual1 = queryScanner.one(); Optional actual2 = queryScanner.one(); Optional actual3 = queryScanner.one(); - Optional actual4 = queryScanner.one(); // Assert assertThat(actual1).isPresent(); assertThat(actual1.get()).isEqualTo(result); assertThat(actual2).isPresent(); assertThat(actual2.get()).isEqualTo(result); - assertThat(actual3).isPresent(); - assertThat(actual3.get()).isEqualTo(result); - assertThat(actual4).isNotPresent(); + assertThat(actual3).isNotPresent(); - verify(resultInterpreter, times(3)).interpret(item); - verify(request).execute(); + verify(resultInterpreter, times(2)).interpret(item); + verify(request).execute(FETCH_SIZE); } @Test public void all_ShouldReturnResults() { // Arrange Map item = Collections.emptyMap(); - List> items = Arrays.asList(item, item, item); - when(request.execute()).thenReturn(response); - when(response.items()).thenReturn(items); + Map lastEvaluatedKey = Collections.emptyMap(); + when(request.execute(FETCH_SIZE)).thenReturn(response); + when(response.items()).thenReturn(Arrays.asList(item, item)); + when(response.hasLastEvaluatedKey()).thenReturn(false); + when(response.lastEvaluatedKey()).thenReturn(lastEvaluatedKey); when(resultInterpreter.interpret(item)).thenReturn(result); - - QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter); + QueryScanner queryScanner = new QueryScanner(request, FETCH_SIZE, 0, resultInterpreter); // Act List results1 = queryScanner.all(); List results2 = queryScanner.all(); // Assert - assertThat(results1.size()).isEqualTo(3); + assertThat(results1.size()).isEqualTo(2); assertThat(results1.get(0)).isEqualTo(result); assertThat(results1.get(1)).isEqualTo(result); - assertThat(results1.get(2)).isEqualTo(result); assertThat(results2).isEmpty(); - verify(resultInterpreter, times(3)).interpret(item); - verify(request).execute(); + verify(resultInterpreter, times(2)).interpret(item); + verify(request).execute(FETCH_SIZE); } @Test public void iterator_ShouldReturnResults() { // Arrange Map item = Collections.emptyMap(); - List> items = Arrays.asList(item, item, item); - when(response.items()).thenReturn(items); + Map lastEvaluatedKey = Collections.emptyMap(); + when(request.execute(FETCH_SIZE)).thenReturn(response); + when(response.items()).thenReturn(Arrays.asList(item, item)); + when(response.hasLastEvaluatedKey()).thenReturn(false); + when(response.lastEvaluatedKey()).thenReturn(lastEvaluatedKey); when(resultInterpreter.interpret(item)).thenReturn(result); - when(request.execute()).thenReturn(response); - QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter); + QueryScanner queryScanner = new QueryScanner(request, FETCH_SIZE, 0, resultInterpreter); // Act Iterator iterator = queryScanner.iterator(); @@ -109,37 +112,34 @@ public void iterator_ShouldReturnResults() { assertThat(iterator.next()).isEqualTo(result); assertThat(iterator.hasNext()).isTrue(); assertThat(iterator.next()).isEqualTo(result); - assertThat(iterator.hasNext()).isTrue(); - assertThat(iterator.next()).isEqualTo(result); assertThat(iterator.hasNext()).isFalse(); assertThatThrownBy(iterator::next).isInstanceOf(NoSuchElementException.class); - verify(resultInterpreter, times(3)).interpret(item); - verify(request).execute(); + verify(resultInterpreter, times(2)).interpret(item); + verify(request).execute(FETCH_SIZE); } @Test public void one_ResponseWithLastEvaluatedKey_ShouldReturnResults() { // Arrange Map item = Collections.emptyMap(); - List> items = Arrays.asList(item, item); Map lastEvaluatedKey = Collections.emptyMap(); - - when(response.items()).thenReturn(items).thenReturn(items); + when(request.execute(FETCH_SIZE)).thenReturn(response); + when(request.execute(lastEvaluatedKey, FETCH_SIZE)).thenReturn(response); + when(response.items()) + .thenReturn(Arrays.asList(item, item)) + .thenReturn(Collections.singletonList(item)); when(response.hasLastEvaluatedKey()).thenReturn(true).thenReturn(false); when(response.lastEvaluatedKey()).thenReturn(lastEvaluatedKey); when(resultInterpreter.interpret(item)).thenReturn(result); - when(request.execute()).thenReturn(response); - when(request.execute(lastEvaluatedKey)).thenReturn(response); - QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter); + QueryScanner queryScanner = new QueryScanner(request, FETCH_SIZE, 0, resultInterpreter); // Act Optional actual1 = queryScanner.one(); Optional actual2 = queryScanner.one(); Optional actual3 = queryScanner.one(); Optional actual4 = queryScanner.one(); - Optional actual5 = queryScanner.one(); // Assert assertThat(actual1).isPresent(); @@ -148,13 +148,11 @@ public void one_ResponseWithLastEvaluatedKey_ShouldReturnResults() { assertThat(actual2.get()).isEqualTo(result); assertThat(actual3).isPresent(); assertThat(actual3.get()).isEqualTo(result); - assertThat(actual4).isPresent(); - assertThat(actual4.get()).isEqualTo(result); - assertThat(actual5).isNotPresent(); + assertThat(actual4).isNotPresent(); - verify(resultInterpreter, times(4)).interpret(item); - verify(request).execute(lastEvaluatedKey); - verify(request).execute(); + verify(resultInterpreter, times(3)).interpret(item); + verify(request).execute(FETCH_SIZE); + verify(request).execute(lastEvaluatedKey, FETCH_SIZE); } @Test @@ -168,13 +166,13 @@ public void one_RequestWithLimitAndResponseWithLastEvaluatedKey_ShouldReturnResu Map lastEvaluatedKey = Collections.emptyMap(); when(response.items()).thenReturn(items1).thenReturn(items2); - when(response.hasLastEvaluatedKey()).thenReturn(true); + when(response.hasLastEvaluatedKey()).thenReturn(true).thenReturn(false); when(response.lastEvaluatedKey()).thenReturn(lastEvaluatedKey); - when(request.execute(limit)).thenReturn(response); + when(request.execute(FETCH_SIZE)).thenReturn(response); when(request.execute(lastEvaluatedKey, limit - items1.size())).thenReturn(response); when(resultInterpreter.interpret(item)).thenReturn(result); - QueryScanner queryScanner = new QueryScanner(request, limit, resultInterpreter); + QueryScanner queryScanner = new QueryScanner(request, FETCH_SIZE, limit, resultInterpreter); // Act Optional actual1 = queryScanner.one(); @@ -192,7 +190,7 @@ public void one_RequestWithLimitAndResponseWithLastEvaluatedKey_ShouldReturnResu assertThat(actual4).isNotPresent(); verify(resultInterpreter, times(limit)).interpret(item); - verify(request).execute(limit); + verify(request).execute(FETCH_SIZE); verify(request).execute(lastEvaluatedKey, limit - items1.size()); } } diff --git a/core/src/test/java/com/scalar/db/storage/dynamo/SelectStatementHandlerTestBase.java b/core/src/test/java/com/scalar/db/storage/dynamo/SelectStatementHandlerTestBase.java index e9aa363617..01a35e3931 100644 --- a/core/src/test/java/com/scalar/db/storage/dynamo/SelectStatementHandlerTestBase.java +++ b/core/src/test/java/com/scalar/db/storage/dynamo/SelectStatementHandlerTestBase.java @@ -49,6 +49,7 @@ import software.amazon.awssdk.services.dynamodb.model.ScanResponse; public abstract class SelectStatementHandlerTestBase { + private static final int FETCH_SIZE = 10; private static final String ANY_NAMESPACE_NAME = "namespace"; private static final String ANY_TABLE_NAME = "table"; private static final String ANY_NAME_1 = "name1"; @@ -74,7 +75,7 @@ public abstract class SelectStatementHandlerTestBase { public void setUp() throws Exception { MockitoAnnotations.openMocks(this).close(); - handler = new SelectStatementHandler(client, metadataManager, getNamespacePrefix()); + handler = new SelectStatementHandler(client, metadataManager, getNamespacePrefix(), FETCH_SIZE); when(metadataManager.getTableMetadata(any(Operation.class))).thenReturn(metadata); when(metadata.getPartitionKeyNames()) @@ -1877,7 +1878,7 @@ public void handle_ScanOperationWithOrderingAndLimit_ShouldCallQueryWithProperRe assertThat(actualRequest.keyConditionExpression()).isEqualTo(expectedCondition); assertThat(actualRequest.expressionAttributeValues()).isEqualTo(expectedBindMap); assertThat(actualRequest.scanIndexForward()).isNull(); - assertThat(actualRequest.limit()).isEqualTo(ANY_LIMIT); + assertThat(actualRequest.limit()).isEqualTo(FETCH_SIZE); assertThat(actualRequest.tableName()).isEqualTo(getFullTableName()); } @@ -1927,7 +1928,7 @@ public void handle_ScanOperationWithMultipleOrderings_ShouldCallQueryWithProperR assertThat(actualRequest.keyConditionExpression()).isEqualTo(expectedCondition); assertThat(actualRequest.expressionAttributeValues()).isEqualTo(expectedBindMap); assertThat(actualRequest.scanIndexForward()).isNull(); - assertThat(actualRequest.limit()).isEqualTo(ANY_LIMIT); + assertThat(actualRequest.limit()).isEqualTo(FETCH_SIZE); assertThat(actualRequest.tableName()).isEqualTo(getFullTableName()); } @@ -1946,7 +1947,7 @@ public void prepare_ScanAllOperationWithLimit_ShouldPrepareProperQuery() { ArgumentCaptor captor = ArgumentCaptor.forClass(ScanRequest.class); verify(client).scan(captor.capture()); ScanRequest actualRequest = captor.getValue(); - assertThat(actualRequest.limit()).isEqualTo(ANY_LIMIT); + assertThat(actualRequest.limit()).isEqualTo(FETCH_SIZE); assertThat(actualRequest.tableName()).isEqualTo(getFullTableName()); } @@ -1965,7 +1966,7 @@ public void prepare_ScanAllOperationWithoutLimit_ShouldPrepareProperQuery() { ArgumentCaptor captor = ArgumentCaptor.forClass(ScanRequest.class); verify(client).scan(captor.capture()); ScanRequest actualRequest = captor.getValue(); - assertThat(actualRequest.limit()).isEqualTo(null); + assertThat(actualRequest.limit()).isEqualTo(FETCH_SIZE); assertThat(actualRequest.tableName()).isEqualTo(getFullTableName()); } diff --git a/core/src/test/java/com/scalar/db/storage/jdbc/JdbcDatabaseTest.java b/core/src/test/java/com/scalar/db/storage/jdbc/JdbcDatabaseTest.java index 7710f6a426..3bcf119d2b 100644 --- a/core/src/test/java/com/scalar/db/storage/jdbc/JdbcDatabaseTest.java +++ b/core/src/test/java/com/scalar/db/storage/jdbc/JdbcDatabaseTest.java @@ -2,6 +2,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -17,6 +18,7 @@ import com.scalar.db.exception.storage.NoMutationException; import com.scalar.db.exception.storage.RetriableExecutionException; import com.scalar.db.io.Key; +import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -89,7 +91,8 @@ public void whenGetOperationExecuted_shouldCallJdbcService() throws Exception { Get get = new Get(new Key("p1", "val")).forNamespace(NAMESPACE).forTable(TABLE); jdbcDatabase.get(get); }) - .isInstanceOf(ExecutionException.class); + .isInstanceOf(ExecutionException.class) + .hasCause(sqlException); verify(connection).setReadOnly(true); verify(connection).close(); } @@ -107,8 +110,10 @@ public void whenScanOperationExecutedAndScannerClosed_shouldCallJdbcService() th scanner.close(); // Assert + verify(connection).setAutoCommit(false); verify(connection).setReadOnly(true); verify(jdbcService).getScanner(any(), any()); + verify(connection).commit(); verify(connection).close(); } @@ -125,8 +130,35 @@ public void whenScanOperationExecutedAndScannerClosed_shouldCallJdbcService() th Scan scan = new Scan(new Key("p1", "val")).forNamespace(NAMESPACE).forTable(TABLE); jdbcDatabase.scan(scan); }) - .isInstanceOf(ExecutionException.class); + .isInstanceOf(ExecutionException.class) + .hasCause(sqlException); + verify(connection).setAutoCommit(false); verify(connection).setReadOnly(true); + verify(connection).rollback(); + verify(connection).close(); + } + + @Test + public void + whenScanOperationExecutedAndScannerClosed_SQLExceptionThrownByConnectionCommit_shouldThrowIOException() + throws Exception { + // Arrange + when(jdbcService.getScanner(any(), any())) + .thenReturn( + new ScannerImpl(resultInterpreter, connection, preparedStatement, resultSet, true)); + doThrow(sqlException).when(connection).commit(); + + // Act + Scan scan = new Scan(new Key("p1", "val")).forNamespace(NAMESPACE).forTable(TABLE); + Scanner scanner = jdbcDatabase.scan(scan); + assertThatThrownBy(scanner::close).isInstanceOf(IOException.class).hasCause(sqlException); + + // Assert + verify(connection).setAutoCommit(false); + verify(connection).setReadOnly(true); + verify(jdbcService).getScanner(any(), any()); + verify(connection).commit(); + verify(connection).rollback(); verify(connection).close(); } @@ -187,7 +219,8 @@ public void whenPutOperationExecuted_shouldCallJdbcService() throws Exception { .forTable(TABLE); jdbcDatabase.put(put); }) - .isInstanceOf(ExecutionException.class); + .isInstanceOf(ExecutionException.class) + .hasCause(sqlException); verify(connection).close(); } @@ -240,7 +273,8 @@ public void whenDeleteOperationExecuted_shouldCallJdbcService() throws Exception new Delete(new Key("p1", "val1")).forNamespace(NAMESPACE).forTable(TABLE); jdbcDatabase.delete(delete); }) - .isInstanceOf(ExecutionException.class); + .isInstanceOf(ExecutionException.class) + .hasCause(sqlException); verify(connection).close(); } @@ -259,7 +293,9 @@ public void whenMutateOperationExecuted_shouldCallJdbcService() throws Exception jdbcDatabase.mutate(Arrays.asList(put, delete)); // Assert + verify(connection).setAutoCommit(false); verify(jdbcService).mutate(any(), any()); + verify(connection).commit(); verify(connection).close(); } @@ -287,6 +323,9 @@ public void whenMutateOperationExecuted_shouldCallJdbcService() throws Exception jdbcDatabase.mutate(Arrays.asList(put, delete)); }) .isInstanceOf(NoMutationException.class); + verify(connection).setAutoCommit(false); + verify(jdbcService).mutate(any(), any()); + verify(connection).rollback(); verify(connection).close(); } @@ -309,7 +348,11 @@ public void whenMutateOperationExecuted_shouldCallJdbcService() throws Exception new Delete(new Key("p1", "val1")).forNamespace(NAMESPACE).forTable(TABLE); jdbcDatabase.mutate(Arrays.asList(put, delete)); }) - .isInstanceOf(ExecutionException.class); + .isInstanceOf(ExecutionException.class) + .hasCause(sqlException); + verify(connection).setAutoCommit(false); + verify(jdbcService).mutate(any(), any()); + verify(connection).rollback(); verify(connection).close(); } @@ -332,7 +375,11 @@ public void mutate_withConflictError_shouldThrowRetriableExecutionException() new Delete(new Key("p1", "val1")).forNamespace(NAMESPACE).forTable(TABLE); jdbcDatabase.mutate(Arrays.asList(put, delete)); }) - .isInstanceOf(RetriableExecutionException.class); + .isInstanceOf(RetriableExecutionException.class) + .hasCause(sqlException); + verify(connection).setAutoCommit(false); + verify(jdbcService).mutate(any(), any()); + verify(connection).rollback(); verify(connection).close(); } } diff --git a/core/src/test/java/com/scalar/db/storage/jdbc/JdbcServiceTest.java b/core/src/test/java/com/scalar/db/storage/jdbc/JdbcServiceTest.java index 6cd6869d58..03313b4409 100644 --- a/core/src/test/java/com/scalar/db/storage/jdbc/JdbcServiceTest.java +++ b/core/src/test/java/com/scalar/db/storage/jdbc/JdbcServiceTest.java @@ -22,6 +22,7 @@ import com.scalar.db.api.PutIfNotExists; import com.scalar.db.api.Scan; import com.scalar.db.api.ScanAll; +import com.scalar.db.api.Scanner; import com.scalar.db.api.TableMetadata; import com.scalar.db.common.TableMetadataManager; import com.scalar.db.common.checker.OperationChecker; @@ -47,6 +48,7 @@ public class JdbcServiceTest { + private static final int SCAN_FETCH_SIZE = 10; private static final String NAMESPACE = "ns"; private static final String TABLE = "tbl"; @@ -78,7 +80,9 @@ public class JdbcServiceTest { @BeforeEach public void setUp() throws Exception { MockitoAnnotations.openMocks(this).close(); - jdbcService = new JdbcService(tableMetadataManager, operationChecker, rdbEngine, queryBuilder); + jdbcService = + new JdbcService( + tableMetadataManager, operationChecker, rdbEngine, queryBuilder, SCAN_FETCH_SIZE); // Arrange when(tableMetadataManager.getTableMetadata(any(Operation.class))) @@ -129,11 +133,15 @@ public void whenGetScannerExecuted_withScan_shouldCallQueryBuilder() throws Exce // Act Scan scan = new Scan(new Key("p1", "val")).forNamespace(NAMESPACE).forTable(TABLE); - jdbcService.getScanner(scan, connection); + Scanner scanner = jdbcService.getScanner(scan, connection); // Assert verify(operationChecker).check(any(Scan.class)); verify(queryBuilder).select(any()); + verify(preparedStatement).setFetchSize(SCAN_FETCH_SIZE); + + assertThat(scanner).isNotNull(); + assertThat(scanner).isInstanceOf(ScannerImpl.class); } @Test @@ -153,11 +161,15 @@ public void whenGetScannerExecuted_withScanAll_shouldCallQueryBuilder() throws E // Act Scan scan = new ScanAll().forNamespace(NAMESPACE).forTable(TABLE); - jdbcService.getScanner(scan, connection); + Scanner scanner = jdbcService.getScanner(scan, connection); // Assert verify(operationChecker).check(any(ScanAll.class)); verify(queryBuilder).select(any()); + verify(preparedStatement).setFetchSize(SCAN_FETCH_SIZE); + + assertThat(scanner).isNotNull(); + assertThat(scanner).isInstanceOf(ScannerImpl.class); } @Test @@ -184,7 +196,7 @@ public void whenGetScannerExecuted_withCrossPartitionScan_shouldCallQueryBuilder .all() .where(ConditionBuilder.column("column").isEqualToInt(10)) .build(); - jdbcService.getScanner(scan, connection); + Scanner scanner = jdbcService.getScanner(scan, connection); // Assert verify(operationChecker).check(any(ScanAll.class)); @@ -193,6 +205,10 @@ public void whenGetScannerExecuted_withCrossPartitionScan_shouldCallQueryBuilder verify(queryBuilder.select(any())).where(anySet()); verify(queryBuilder.select(any())).orderBy(anyList()); verify(queryBuilder.select(any())).limit(anyInt()); + verify(preparedStatement).setFetchSize(SCAN_FETCH_SIZE); + + assertThat(scanner).isNotNull(); + assertThat(scanner).isInstanceOf(ScannerImpl.class); } @Test diff --git a/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java index b15a292c86..bd9ad6da28 100644 --- a/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java @@ -1977,7 +1977,8 @@ public void scan_ScanLargeDataWithOrdering_ShouldRetrieveExpectedValues() } @Test - public void scan_ScanLargeDataWithLimit_ShouldRetrieveExpectedValues() throws ExecutionException { + public void scan_ScanLargeDataWithLimit_ShouldRetrieveExpectedValues() + throws ExecutionException, IOException { // Arrange int recordCount = 345; int limit = 234; @@ -2003,7 +2004,7 @@ public void scan_ScanLargeDataWithLimit_ShouldRetrieveExpectedValues() throws Ex .build(); // Act - List results = storage.scan(scan).all(); + List results = scanAll(scan); // Assert assertThat(results.size()).isEqualTo(limit); @@ -2328,7 +2329,7 @@ public void scan_ScanAllLargeData_ShouldRetrieveExpectedValues() @Test public void scan_ScanAllLargeDataWithLimit_ShouldRetrieveExpectedValues() - throws ExecutionException { + throws ExecutionException, IOException { // Arrange int recordCount = 345; int limit = 234; @@ -2349,7 +2350,7 @@ public void scan_ScanAllLargeDataWithLimit_ShouldRetrieveExpectedValues() Scan scan = Scan.newBuilder().namespace(namespace).table(TABLE).all().limit(limit).build(); // Act - List results = storage.scan(scan).all(); + List results = scanAll(scan); // Assert assertThat(results.size()).isEqualTo(limit);