Skip to content

Commit 4c5686c

Browse files
authored
Add support for scan fetch size in storage adapters (#2731)
1 parent b61607a commit 4c5686c

23 files changed

+524
-193
lines changed

core/src/main/java/com/scalar/db/common/error/CoreError.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1125,7 +1125,7 @@ public enum CoreError implements ScalarDbError {
11251125
JDBC_ERROR_OCCURRED_IN_SELECTION(
11261126
Category.INTERNAL_ERROR, "0027", "An error occurred in the selection. Details: %s", "", ""),
11271127
JDBC_FETCHING_NEXT_RESULT_FAILED(
1128-
Category.INTERNAL_ERROR, "0028", "Fetching the next result failed", "", ""),
1128+
Category.INTERNAL_ERROR, "0028", "Fetching the next result failed. Details: %s", "", ""),
11291129
JDBC_TRANSACTION_ROLLING_BACK_TRANSACTION_FAILED(
11301130
Category.INTERNAL_ERROR, "0029", "Rolling back the transaction failed. Details: %s", "", ""),
11311131
JDBC_TRANSACTION_COMMITTING_TRANSACTION_FAILED(
@@ -1204,6 +1204,8 @@ public enum CoreError implements ScalarDbError {
12041204
Category.INTERNAL_ERROR, "0053", "Failed to read JSON Lines file. Details: %s.", "", ""),
12051205
JDBC_TRANSACTION_GETTING_SCANNER_FAILED(
12061206
Category.INTERNAL_ERROR, "0054", "Getting the scanner failed. Details: %s", "", ""),
1207+
JDBC_CLOSING_SCANNER_FAILED(
1208+
Category.INTERNAL_ERROR, "0055", "Closing the scanner failed. Details: %s", "", ""),
12071209

12081210
//
12091211
// Errors for the unknown transaction status error category

core/src/main/java/com/scalar/db/config/DatabaseConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class DatabaseConfig {
3838
private boolean crossPartitionScanFilteringEnabled;
3939
private boolean crossPartitionScanOrderingEnabled;
4040
private String systemNamespaceName;
41+
private int scanFetchSize;
4142

4243
public static final String PREFIX = "scalar.db.";
4344
public static final String CONTACT_POINTS = PREFIX + "contact_points";
@@ -56,9 +57,11 @@ public class DatabaseConfig {
5657
public static final String CROSS_PARTITION_SCAN_FILTERING = SCAN_PREFIX + "filtering.enabled";
5758
public static final String CROSS_PARTITION_SCAN_ORDERING = SCAN_PREFIX + "ordering.enabled";
5859
public static final String SYSTEM_NAMESPACE_NAME = PREFIX + "system_namespace_name";
60+
public static final String SCAN_FETCH_SIZE = PREFIX + "scan_fetch_size";
5961

6062
public static final int DEFAULT_METADATA_CACHE_EXPIRATION_TIME_SECS = 60;
6163
public static final String DEFAULT_SYSTEM_NAMESPACE_NAME = "scalardb";
64+
public static final int DEFAULT_SCAN_FETCH_SIZE = 10;
6265

6366
public DatabaseConfig(File propertiesFile) throws IOException {
6467
try (FileInputStream stream = new FileInputStream(propertiesFile)) {
@@ -118,6 +121,8 @@ protected void load() {
118121
}
119122

120123
systemNamespaceName = getSystemNamespaceName(getProperties());
124+
125+
scanFetchSize = getInt(getProperties(), SCAN_FETCH_SIZE, DEFAULT_SCAN_FETCH_SIZE);
121126
}
122127

123128
public List<String> getContactPoints() {
@@ -172,6 +177,10 @@ public String getSystemNamespaceName() {
172177
return systemNamespaceName;
173178
}
174179

180+
public int getScanFetchSize() {
181+
return scanFetchSize;
182+
}
183+
175184
public static String getTransactionManager(Properties properties) {
176185
return getString(properties, TRANSACTION_MANAGER, "consensus-commit");
177186
}

core/src/main/java/com/scalar/db/storage/cassandra/Cassandra.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public Cassandra(DatabaseConfig config) {
5656

5757
handlers =
5858
StatementHandlerManager.builder()
59-
.select(new SelectStatementHandler(session))
59+
.select(new SelectStatementHandler(session, config.getScanFetchSize()))
6060
.insert(new InsertStatementHandler(session))
6161
.update(new UpdateStatementHandler(session))
6262
.delete(new DeleteStatementHandler(session))

core/src/main/java/com/scalar/db/storage/cassandra/SelectStatementHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,18 @@
4141
*/
4242
@ThreadSafe
4343
public class SelectStatementHandler extends StatementHandler {
44+
45+
private final int fetchSize;
46+
4447
/**
4548
* Constructs {@code SelectStatementHandler} with the specified {@code Session}
4649
*
4750
* @param session session to be used with this statement
51+
* @param fetchSize the number of rows to be fetched at once
4852
*/
49-
public SelectStatementHandler(Session session) {
53+
public SelectStatementHandler(Session session, int fetchSize) {
5054
super(session);
55+
this.fetchSize = fetchSize;
5156
}
5257

5358
@Override
@@ -94,6 +99,7 @@ protected BoundStatement bind(PreparedStatement prepared, Operation operation) {
9499
@Override
95100
@Nonnull
96101
protected ResultSet execute(BoundStatement bound, Operation operation) {
102+
bound.setFetchSize(fetchSize);
97103
return session.execute(bound);
98104
}
99105

core/src/main/java/com/scalar/db/storage/cosmos/Cosmos.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public Cosmos(DatabaseConfig databaseConfig) {
6262
new CosmosAdmin(client, config), databaseConfig.getMetadataCacheExpirationTimeSecs());
6363
operationChecker = new CosmosOperationChecker(databaseConfig, metadataManager);
6464

65-
selectStatementHandler = new SelectStatementHandler(client, metadataManager);
65+
selectStatementHandler =
66+
new SelectStatementHandler(client, metadataManager, databaseConfig.getScanFetchSize());
6667
putStatementHandler = new PutStatementHandler(client, metadataManager);
6768
deleteStatementHandler = new DeleteStatementHandler(client, metadataManager);
6869
batchHandler = new BatchHandler(client, metadataManager);

core/src/main/java/com/scalar/db/storage/cosmos/SelectStatementHandler.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,12 @@
4444
@ThreadSafe
4545
public class SelectStatementHandler extends StatementHandler {
4646

47-
public SelectStatementHandler(CosmosClient client, TableMetadataManager metadataManager) {
47+
private final int fetchSize;
48+
49+
public SelectStatementHandler(
50+
CosmosClient client, TableMetadataManager metadataManager, int fetchSize) {
4851
super(client, metadataManager);
52+
this.fetchSize = fetchSize;
4953
}
5054

5155
/**
@@ -85,9 +89,10 @@ private Scanner executeRead(Get get, TableMetadata tableMetadata) throws CosmosE
8589
return executeReadWithIndex(get, tableMetadata);
8690
}
8791

92+
PartitionKey partitionKey = cosmosOperation.getCosmosPartitionKey();
93+
8894
if (get.getProjections().isEmpty()) {
8995
String id = cosmosOperation.getId();
90-
PartitionKey partitionKey = cosmosOperation.getCosmosPartitionKey();
9196
Record record = getContainer(get).readItem(id, partitionKey, Record.class).getItem();
9297
return new SingleRecordScanner(
9398
record, new ResultInterpreter(get.getProjections(), tableMetadata));
@@ -100,8 +105,10 @@ private Scanner executeRead(Get get, TableMetadata tableMetadata) throws CosmosE
100105
.eq(cosmosOperation.getConcatenatedPartitionKey()),
101106
DSL.field("r.id").eq(cosmosOperation.getId()))
102107
.getSQL(ParamType.INLINED);
108+
CosmosQueryRequestOptions options =
109+
new CosmosQueryRequestOptions().setPartitionKey(partitionKey);
103110

104-
return executeQuery(get, tableMetadata, query);
111+
return executeQuery(get, tableMetadata, query, options);
105112
}
106113

107114
private Scanner executeReadWithIndex(Selection selection, TableMetadata tableMetadata)
@@ -327,8 +334,8 @@ private Scanner executeQuery(
327334
CosmosQueryRequestOptions queryOptions) {
328335
Iterator<FeedResponse<Record>> pagesIterator =
329336
getContainer(selection)
330-
.queryItems(query, queryOptions, Record.class)
331-
.iterableByPage()
337+
.queryItems(query, queryOptions.setMaxBufferedItemCount(fetchSize), Record.class)
338+
.iterableByPage(fetchSize)
332339
.iterator();
333340

334341
return new ScannerImpl(

core/src/main/java/com/scalar/db/storage/dynamo/Dynamo.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ public Dynamo(DatabaseConfig databaseConfig) {
7777
operationChecker = new DynamoOperationChecker(databaseConfig, metadataManager);
7878

7979
selectStatementHandler =
80-
new SelectStatementHandler(client, metadataManager, config.getNamespacePrefix());
80+
new SelectStatementHandler(
81+
client,
82+
metadataManager,
83+
config.getNamespacePrefix(),
84+
databaseConfig.getScanFetchSize());
8185
putStatementHandler =
8286
new PutStatementHandler(client, metadataManager, config.getNamespacePrefix());
8387
deleteStatementHandler =

core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,22 @@ public class QueryScanner extends AbstractScanner {
2222
private final ResultInterpreter resultInterpreter;
2323

2424
private Iterator<Map<String, AttributeValue>> itemsIterator;
25+
private final int fetchSize;
2526
@Nullable private Integer remainingLimit;
2627
@Nullable private Map<String, AttributeValue> lastEvaluatedKey;
2728

2829
@SuppressFBWarnings("EI_EXPOSE_REP2")
29-
public QueryScanner(PaginatedRequest request, int limit, ResultInterpreter resultInterpreter) {
30+
public QueryScanner(
31+
PaginatedRequest request, int fetchSize, int limit, ResultInterpreter resultInterpreter) {
3032
this.request = request;
33+
this.fetchSize = fetchSize;
3134

3235
if (limit > 0) {
3336
remainingLimit = limit;
34-
handleResponse(request.execute(limit));
37+
handleResponse(request.execute(Math.min(fetchSize, limit)));
3538
} else {
3639
remainingLimit = null;
37-
handleResponse(request.execute());
40+
handleResponse(request.execute(fetchSize));
3841
}
3942

4043
this.resultInterpreter = resultInterpreter;
@@ -54,15 +57,13 @@ private boolean hasNext() {
5457
if (itemsIterator.hasNext()) {
5558
return true;
5659
}
57-
if (lastEvaluatedKey != null) {
58-
if (remainingLimit != null) {
59-
handleResponse(request.execute(lastEvaluatedKey, remainingLimit));
60-
} else {
61-
handleResponse(request.execute(lastEvaluatedKey));
62-
}
63-
return itemsIterator.hasNext();
60+
if (lastEvaluatedKey == null) {
61+
return false;
6462
}
65-
return false;
63+
64+
int nextFetchSize = remainingLimit != null ? Math.min(fetchSize, remainingLimit) : fetchSize;
65+
handleResponse(request.execute(lastEvaluatedKey, nextFetchSize));
66+
return itemsIterator.hasNext();
6667
}
6768

6869
private void handleResponse(PaginatedRequestResponse response) {
@@ -71,25 +72,21 @@ private void handleResponse(PaginatedRequestResponse response) {
7172
remainingLimit -= items.size();
7273
}
7374
itemsIterator = items.iterator();
74-
if ((remainingLimit == null || remainingLimit > 0) && response.hasLastEvaluatedKey()) {
75-
lastEvaluatedKey = response.lastEvaluatedKey();
76-
} else {
77-
lastEvaluatedKey = null;
78-
}
75+
76+
boolean shouldContinue =
77+
(remainingLimit == null || remainingLimit > 0) && response.hasLastEvaluatedKey();
78+
lastEvaluatedKey = shouldContinue ? response.lastEvaluatedKey() : null;
7979
}
8080

8181
@Override
8282
@Nonnull
8383
public List<Result> all() {
84-
List<Result> ret = new ArrayList<>();
85-
while (true) {
86-
Optional<Result> one = one();
87-
if (!one.isPresent()) {
88-
break;
89-
}
90-
ret.add(one.get());
84+
List<Result> results = new ArrayList<>();
85+
Optional<Result> next;
86+
while ((next = one()).isPresent()) {
87+
results.add(next.get());
9188
}
92-
return ret;
89+
return results;
9390
}
9491

9592
@Override

core/src/main/java/com/scalar/db/storage/dynamo/SelectStatementHandler.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class SelectStatementHandler {
4949
private final DynamoDbClient client;
5050
private final TableMetadataManager metadataManager;
5151
private final String namespacePrefix;
52+
private final int fetchSize;
5253

5354
/**
5455
* Constructs a {@code SelectStatementHandler} with the specified {@link DynamoDbClient} and a new
@@ -62,10 +63,12 @@ public class SelectStatementHandler {
6263
public SelectStatementHandler(
6364
DynamoDbClient client,
6465
TableMetadataManager metadataManager,
65-
Optional<String> namespacePrefix) {
66+
Optional<String> namespacePrefix,
67+
int fetchSize) {
6668
this.client = checkNotNull(client);
6769
this.metadataManager = checkNotNull(metadataManager);
6870
this.namespacePrefix = namespacePrefix.orElse("");
71+
this.fetchSize = fetchSize;
6972
}
7073

7174
@Nonnull
@@ -151,7 +154,10 @@ private Scanner executeScanWithIndex(Selection selection, TableMetadata tableMet
151154
com.scalar.db.storage.dynamo.request.QueryRequest request =
152155
new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build());
153156
return new QueryScanner(
154-
request, limit, new ResultInterpreter(selection.getProjections(), tableMetadata));
157+
request,
158+
fetchSize,
159+
limit,
160+
new ResultInterpreter(selection.getProjections(), tableMetadata));
155161
}
156162

157163
private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
@@ -184,7 +190,10 @@ private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
184190
com.scalar.db.storage.dynamo.request.QueryRequest queryRequest =
185191
new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build());
186192
return new QueryScanner(
187-
queryRequest, scan.getLimit(), new ResultInterpreter(scan.getProjections(), tableMetadata));
193+
queryRequest,
194+
fetchSize,
195+
scan.getLimit(),
196+
new ResultInterpreter(scan.getProjections(), tableMetadata));
188197
}
189198

190199
private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) {
@@ -205,6 +214,7 @@ private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) {
205214
new com.scalar.db.storage.dynamo.request.ScanRequest(client, builder.build());
206215
return new QueryScanner(
207216
requestWrapper,
217+
fetchSize,
208218
scan.getLimit(),
209219
new ResultInterpreter(scan.getProjections(), tableMetadata));
210220
}

core/src/main/java/com/scalar/db/storage/jdbc/JdbcDatabase.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ public JdbcDatabase(DatabaseConfig databaseConfig) {
5959
databaseConfig.getMetadataCacheExpirationTimeSecs());
6060

6161
OperationChecker operationChecker = new OperationChecker(databaseConfig, tableMetadataManager);
62-
jdbcService = new JdbcService(tableMetadataManager, operationChecker, rdbEngine);
62+
jdbcService =
63+
new JdbcService(
64+
tableMetadataManager, operationChecker, rdbEngine, databaseConfig.getScanFetchSize());
6365
}
6466

6567
@VisibleForTesting
@@ -98,9 +100,18 @@ public Scanner scan(Scan scan) throws ExecutionException {
98100
Connection connection = null;
99101
try {
100102
connection = dataSource.getConnection();
103+
connection.setAutoCommit(false);
101104
rdbEngine.setReadOnly(connection, true);
102105
return jdbcService.getScanner(scan, connection);
103106
} catch (SQLException e) {
107+
try {
108+
if (connection != null) {
109+
connection.rollback();
110+
}
111+
} catch (SQLException ex) {
112+
e.addSuppressed(ex);
113+
}
114+
104115
close(connection);
105116
throw new ExecutionException(
106117
CoreError.JDBC_ERROR_OCCURRED_IN_SELECTION.buildMessage(e.getMessage()), e);

0 commit comments

Comments
 (0)