Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ public enum CoreError implements ScalarDbError {
JDBC_ERROR_OCCURRED_IN_SELECTION(
Category.INTERNAL_ERROR, "0027", "An error occurred in the selection. Details: %s", "", ""),
JDBC_FETCHING_NEXT_RESULT_FAILED(
Category.INTERNAL_ERROR, "0028", "Fetching the next result failed", "", ""),
Category.INTERNAL_ERROR, "0028", "Fetching the next result failed. Details: %s", "", ""),
JDBC_TRANSACTION_ROLLING_BACK_TRANSACTION_FAILED(
Category.INTERNAL_ERROR, "0029", "Rolling back the transaction failed. Details: %s", "", ""),
JDBC_TRANSACTION_COMMITTING_TRANSACTION_FAILED(
Expand Down Expand Up @@ -1204,6 +1204,8 @@ public enum CoreError implements ScalarDbError {
Category.INTERNAL_ERROR, "0053", "Failed to read JSON Lines file. Details: %s.", "", ""),
JDBC_TRANSACTION_GETTING_SCANNER_FAILED(
Category.INTERNAL_ERROR, "0054", "Getting the scanner failed. Details: %s", "", ""),
JDBC_CLOSING_SCANNER_FAILED(
Category.INTERNAL_ERROR, "0055", "Closing the scanner failed. Details: %s", "", ""),

//
// Errors for the unknown transaction status error category
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/com/scalar/db/config/DatabaseConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class DatabaseConfig {
private boolean crossPartitionScanFilteringEnabled;
private boolean crossPartitionScanOrderingEnabled;
private String systemNamespaceName;
private int scanFetchSize;

public static final String PREFIX = "scalar.db.";
public static final String CONTACT_POINTS = PREFIX + "contact_points";
Expand All @@ -56,9 +57,11 @@ public class DatabaseConfig {
public static final String CROSS_PARTITION_SCAN_FILTERING = SCAN_PREFIX + "filtering.enabled";
public static final String CROSS_PARTITION_SCAN_ORDERING = SCAN_PREFIX + "ordering.enabled";
public static final String SYSTEM_NAMESPACE_NAME = PREFIX + "system_namespace_name";
public static final String SCAN_FETCH_SIZE = PREFIX + "scan_fetch_size";

public static final int DEFAULT_METADATA_CACHE_EXPIRATION_TIME_SECS = 60;
public static final String DEFAULT_SYSTEM_NAMESPACE_NAME = "scalardb";
public static final int DEFAULT_SCAN_FETCH_SIZE = 10;

public DatabaseConfig(File propertiesFile) throws IOException {
try (FileInputStream stream = new FileInputStream(propertiesFile)) {
Expand Down Expand Up @@ -118,6 +121,8 @@ protected void load() {
}

systemNamespaceName = getSystemNamespaceName(getProperties());

scanFetchSize = getInt(getProperties(), SCAN_FETCH_SIZE, DEFAULT_SCAN_FETCH_SIZE);
}

public List<String> getContactPoints() {
Expand Down Expand Up @@ -172,6 +177,10 @@ public String getSystemNamespaceName() {
return systemNamespaceName;
}

public int getScanFetchSize() {
return scanFetchSize;
}

public static String getTransactionManager(Properties properties) {
return getString(properties, TRANSACTION_MANAGER, "consensus-commit");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Cassandra(DatabaseConfig config) {

handlers =
StatementHandlerManager.builder()
.select(new SelectStatementHandler(session))
.select(new SelectStatementHandler(session, config.getScanFetchSize()))
.insert(new InsertStatementHandler(session))
.update(new UpdateStatementHandler(session))
.delete(new DeleteStatementHandler(session))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,18 @@
*/
@ThreadSafe
public class SelectStatementHandler extends StatementHandler {

private final int fetchSize;

/**
* Constructs {@code SelectStatementHandler} with the specified {@code Session}
*
* @param session session to be used with this statement
* @param fetchSize the number of rows to be fetched at once
*/
public SelectStatementHandler(Session session) {
public SelectStatementHandler(Session session, int fetchSize) {
super(session);
this.fetchSize = fetchSize;
}

@Override
Expand Down Expand Up @@ -94,6 +99,7 @@ protected BoundStatement bind(PreparedStatement prepared, Operation operation) {
@Override
@Nonnull
protected ResultSet execute(BoundStatement bound, Operation operation) {
bound.setFetchSize(fetchSize);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Cassandra, call Statement.setFetchSize().

return session.execute(bound);
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public Cosmos(DatabaseConfig databaseConfig) {
new CosmosAdmin(client, config), databaseConfig.getMetadataCacheExpirationTimeSecs());
operationChecker = new CosmosOperationChecker(databaseConfig, metadataManager);

selectStatementHandler = new SelectStatementHandler(client, metadataManager);
selectStatementHandler =
new SelectStatementHandler(client, metadataManager, databaseConfig.getScanFetchSize());
putStatementHandler = new PutStatementHandler(client, metadataManager);
deleteStatementHandler = new DeleteStatementHandler(client, metadataManager);
batchHandler = new BatchHandler(client, metadataManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@
@ThreadSafe
public class SelectStatementHandler extends StatementHandler {

public SelectStatementHandler(CosmosClient client, TableMetadataManager metadataManager) {
private final int fetchSize;

public SelectStatementHandler(
CosmosClient client, TableMetadataManager metadataManager, int fetchSize) {
super(client, metadataManager);
this.fetchSize = fetchSize;
}

/**
Expand Down Expand Up @@ -85,9 +89,10 @@ private Scanner executeRead(Get get, TableMetadata tableMetadata) throws CosmosE
return executeReadWithIndex(get, tableMetadata);
}

PartitionKey partitionKey = cosmosOperation.getCosmosPartitionKey();

if (get.getProjections().isEmpty()) {
String id = cosmosOperation.getId();
PartitionKey partitionKey = cosmosOperation.getCosmosPartitionKey();
Record record = getContainer(get).readItem(id, partitionKey, Record.class).getItem();
return new SingleRecordScanner(
record, new ResultInterpreter(get.getProjections(), tableMetadata));
Expand All @@ -100,8 +105,10 @@ private Scanner executeRead(Get get, TableMetadata tableMetadata) throws CosmosE
.eq(cosmosOperation.getConcatenatedPartitionKey()),
DSL.field("r.id").eq(cosmosOperation.getId()))
.getSQL(ParamType.INLINED);
CosmosQueryRequestOptions options =
new CosmosQueryRequestOptions().setPartitionKey(partitionKey);

return executeQuery(get, tableMetadata, query);
return executeQuery(get, tableMetadata, query, options);
}

private Scanner executeReadWithIndex(Selection selection, TableMetadata tableMetadata)
Expand Down Expand Up @@ -327,8 +334,8 @@ private Scanner executeQuery(
CosmosQueryRequestOptions queryOptions) {
Iterator<FeedResponse<Record>> pagesIterator =
getContainer(selection)
.queryItems(query, queryOptions, Record.class)
.iterableByPage()
.queryItems(query, queryOptions.setMaxBufferedItemCount(fetchSize), Record.class)
.iterableByPage(fetchSize)
Comment on lines +337 to +338
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main change for Cosmos.

.iterator();

return new ScannerImpl(
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ public Dynamo(DatabaseConfig databaseConfig) {
operationChecker = new DynamoOperationChecker(databaseConfig, metadataManager);

selectStatementHandler =
new SelectStatementHandler(client, metadataManager, config.getNamespacePrefix());
new SelectStatementHandler(
client,
metadataManager,
config.getNamespacePrefix(),
databaseConfig.getScanFetchSize());
putStatementHandler =
new PutStatementHandler(client, metadataManager, config.getNamespacePrefix());
deleteStatementHandler =
Expand Down
45 changes: 21 additions & 24 deletions core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@ public class QueryScanner extends AbstractScanner {
private final ResultInterpreter resultInterpreter;

private Iterator<Map<String, AttributeValue>> itemsIterator;
private final int fetchSize;
@Nullable private Integer remainingLimit;
@Nullable private Map<String, AttributeValue> lastEvaluatedKey;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public QueryScanner(PaginatedRequest request, int limit, ResultInterpreter resultInterpreter) {
public QueryScanner(
PaginatedRequest request, int fetchSize, int limit, ResultInterpreter resultInterpreter) {
this.request = request;
this.fetchSize = fetchSize;

if (limit > 0) {
remainingLimit = limit;
handleResponse(request.execute(limit));
handleResponse(request.execute(Math.min(fetchSize, limit)));
} else {
remainingLimit = null;
handleResponse(request.execute());
handleResponse(request.execute(fetchSize));
}

this.resultInterpreter = resultInterpreter;
Expand All @@ -54,15 +57,13 @@ private boolean hasNext() {
if (itemsIterator.hasNext()) {
return true;
}
if (lastEvaluatedKey != null) {
if (remainingLimit != null) {
handleResponse(request.execute(lastEvaluatedKey, remainingLimit));
} else {
handleResponse(request.execute(lastEvaluatedKey));
}
return itemsIterator.hasNext();
if (lastEvaluatedKey == null) {
return false;
}
return false;

int nextFetchSize = remainingLimit != null ? Math.min(fetchSize, remainingLimit) : fetchSize;
handleResponse(request.execute(lastEvaluatedKey, nextFetchSize));
return itemsIterator.hasNext();
}

private void handleResponse(PaginatedRequestResponse response) {
Expand All @@ -71,25 +72,21 @@ private void handleResponse(PaginatedRequestResponse response) {
remainingLimit -= items.size();
}
itemsIterator = items.iterator();
if ((remainingLimit == null || remainingLimit > 0) && response.hasLastEvaluatedKey()) {
lastEvaluatedKey = response.lastEvaluatedKey();
} else {
lastEvaluatedKey = null;
}

boolean shouldContinue =
(remainingLimit == null || remainingLimit > 0) && response.hasLastEvaluatedKey();
lastEvaluatedKey = shouldContinue ? response.lastEvaluatedKey() : null;
}

@Override
@Nonnull
public List<Result> all() {
List<Result> ret = new ArrayList<>();
while (true) {
Optional<Result> one = one();
if (!one.isPresent()) {
break;
}
ret.add(one.get());
List<Result> results = new ArrayList<>();
Optional<Result> next;
while ((next = one()).isPresent()) {
results.add(next.get());
}
return ret;
return results;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class SelectStatementHandler {
private final DynamoDbClient client;
private final TableMetadataManager metadataManager;
private final String namespacePrefix;
private final int fetchSize;

/**
* Constructs a {@code SelectStatementHandler} with the specified {@link DynamoDbClient} and a new
Expand All @@ -62,10 +63,12 @@ public class SelectStatementHandler {
public SelectStatementHandler(
DynamoDbClient client,
TableMetadataManager metadataManager,
Optional<String> namespacePrefix) {
Optional<String> namespacePrefix,
int fetchSize) {
this.client = checkNotNull(client);
this.metadataManager = checkNotNull(metadataManager);
this.namespacePrefix = namespacePrefix.orElse("");
this.fetchSize = fetchSize;
}

@Nonnull
Expand Down Expand Up @@ -151,7 +154,10 @@ private Scanner executeScanWithIndex(Selection selection, TableMetadata tableMet
com.scalar.db.storage.dynamo.request.QueryRequest request =
new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build());
return new QueryScanner(
request, limit, new ResultInterpreter(selection.getProjections(), tableMetadata));
request,
fetchSize,
limit,
new ResultInterpreter(selection.getProjections(), tableMetadata));
}

private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
Expand Down Expand Up @@ -184,7 +190,10 @@ private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
com.scalar.db.storage.dynamo.request.QueryRequest queryRequest =
new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build());
return new QueryScanner(
queryRequest, scan.getLimit(), new ResultInterpreter(scan.getProjections(), tableMetadata));
queryRequest,
fetchSize,
scan.getLimit(),
new ResultInterpreter(scan.getProjections(), tableMetadata));
}

private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) {
Expand All @@ -205,6 +214,7 @@ private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) {
new com.scalar.db.storage.dynamo.request.ScanRequest(client, builder.build());
return new QueryScanner(
requestWrapper,
fetchSize,
scan.getLimit(),
new ResultInterpreter(scan.getProjections(), tableMetadata));
}
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public JdbcDatabase(DatabaseConfig databaseConfig) {
databaseConfig.getMetadataCacheExpirationTimeSecs());

OperationChecker operationChecker = new OperationChecker(databaseConfig, tableMetadataManager);
jdbcService = new JdbcService(tableMetadataManager, operationChecker, rdbEngine);
jdbcService =
new JdbcService(
tableMetadataManager, operationChecker, rdbEngine, databaseConfig.getScanFetchSize());
}

@VisibleForTesting
Expand Down Expand Up @@ -98,9 +100,18 @@ public Scanner scan(Scan scan) throws ExecutionException {
Connection connection = null;
try {
connection = dataSource.getConnection();
connection.setAutoCommit(false);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For JDBC, call Connection.setAutoCommit(false).

rdbEngine.setReadOnly(connection, true);
return jdbcService.getScanner(scan, connection);
} catch (SQLException e) {
try {
if (connection != null) {
connection.rollback();
}
} catch (SQLException ex) {
e.addSuppressed(ex);
}

close(connection);
throw new ExecutionException(
CoreError.JDBC_ERROR_OCCURRED_IN_SELECTION.buildMessage(e.getMessage()), e);
Expand Down
21 changes: 16 additions & 5 deletions core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,34 @@ public class JdbcService {
private final OperationChecker operationChecker;
private final RdbEngineStrategy rdbEngine;
private final QueryBuilder queryBuilder;
private final int scanFetchSize;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public JdbcService(
TableMetadataManager tableMetadataManager,
OperationChecker operationChecker,
RdbEngineStrategy rdbEngine) {
this(tableMetadataManager, operationChecker, rdbEngine, new QueryBuilder(rdbEngine));
RdbEngineStrategy rdbEngine,
int scanFetchSize) {
this(
tableMetadataManager,
operationChecker,
rdbEngine,
new QueryBuilder(rdbEngine),
scanFetchSize);
}

@VisibleForTesting
JdbcService(
TableMetadataManager tableMetadataManager,
OperationChecker operationChecker,
RdbEngineStrategy rdbEngine,
QueryBuilder queryBuilder) {
QueryBuilder queryBuilder,
int scanFetchSize) {
this.tableMetadataManager = Objects.requireNonNull(tableMetadataManager);
this.operationChecker = Objects.requireNonNull(operationChecker);
this.rdbEngine = Objects.requireNonNull(rdbEngine);
this.queryBuilder = Objects.requireNonNull(queryBuilder);
this.scanFetchSize = scanFetchSize;
}

public Optional<Result> get(Get get, Connection connection)
Expand Down Expand Up @@ -102,7 +111,8 @@ public Scanner getScanner(Scan scan, Connection connection)
}

@SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE")
public Scanner getScanner(Scan scan, Connection connection, boolean closeConnectionOnScannerClose)
public Scanner getScanner(
Scan scan, Connection connection, boolean commitAndCloseConnectionOnScannerClose)
throws SQLException, ExecutionException {
operationChecker.check(scan);

Expand All @@ -111,13 +121,14 @@ public Scanner getScanner(Scan scan, Connection connection, boolean closeConnect
SelectQuery selectQuery = buildSelectQuery(scan, tableMetadata);
PreparedStatement preparedStatement = connection.prepareStatement(selectQuery.sql());
selectQuery.bind(preparedStatement);
preparedStatement.setFetchSize(scanFetchSize);
ResultSet resultSet = preparedStatement.executeQuery();
return new ScannerImpl(
new ResultInterpreter(scan.getProjections(), tableMetadata, rdbEngine),
connection,
preparedStatement,
resultSet,
closeConnectionOnScannerClose);
commitAndCloseConnectionOnScannerClose);
}

public List<Result> scan(Scan scan, Connection connection)
Expand Down
Loading