Skip to content

Commit d2ec067

Browse files
feat: Add result set constraints and chunk-based polling (#47)
Prevent unnecessary result set iteration: - Add `DataCloudStatement::setResultSetConstraints()` method to limit rows and bytes - Support byte limit configuration via `resultset.bytelimit` property Chunk-based result polling: - Implement waitForChunksAvailable method for pagination Minor changes: - Changes default byte size when using row-based access to the server's largest allowed value - Implement `getRow()` which is optional on type forward-only - Add examples demonstrating result set constraints with various styles of pagination - Prefer `float4` over `real` since it's more explicit
1 parent 7316400 commit d2ec067

File tree

14 files changed

+548
-178
lines changed

14 files changed

+548
-178
lines changed

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/ArrowStreamReaderCursor.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.List;
2626
import java.util.concurrent.atomic.AtomicInteger;
2727
import java.util.stream.Collectors;
28-
import lombok.AllArgsConstructor;
28+
import lombok.RequiredArgsConstructor;
2929
import lombok.SneakyThrows;
3030
import lombok.extern.slf4j.Slf4j;
3131
import lombok.val;
@@ -36,15 +36,18 @@
3636
import org.apache.calcite.avatica.util.AbstractCursor;
3737
import org.apache.calcite.avatica.util.ArrayImpl;
3838

39-
@AllArgsConstructor
39+
@RequiredArgsConstructor
4040
@Slf4j
4141
class ArrowStreamReaderCursor extends AbstractCursor {
4242

4343
private static final int INIT_ROW_NUMBER = -1;
4444

4545
private final ArrowStreamReader reader;
4646

47-
private final AtomicInteger currentRow = new AtomicInteger(INIT_ROW_NUMBER);
47+
@lombok.Getter
48+
private int rowsSeen = 0;
49+
50+
private final AtomicInteger currentIndex = new AtomicInteger(INIT_ROW_NUMBER);
4851

4952
private void wasNullConsumer(boolean wasNull) {
5053
this.wasNull[0] = wasNull;
@@ -65,13 +68,13 @@ public List<Accessor> createAccessors(
6568
}
6669

6770
private Accessor createAccessor(FieldVector vector) throws SQLException {
68-
return QueryJDBCAccessorFactory.createAccessor(vector, currentRow::get, this::wasNullConsumer);
71+
return QueryJDBCAccessorFactory.createAccessor(vector, currentIndex::get, this::wasNullConsumer);
6972
}
7073

7174
private boolean loadNextBatch() throws SQLException {
7275
try {
7376
if (reader.loadNextBatch()) {
74-
currentRow.set(0);
77+
currentIndex.set(0);
7578
return true;
7679
}
7780
} catch (IOException e) {
@@ -83,11 +86,15 @@ private boolean loadNextBatch() throws SQLException {
8386
@SneakyThrows
8487
@Override
8588
public boolean next() {
86-
val current = currentRow.incrementAndGet();
89+
val current = currentIndex.incrementAndGet();
8790
val total = getSchemaRoot().getRowCount();
8891

8992
try {
90-
return current < total || loadNextBatch();
93+
val next = current < total || loadNextBatch();
94+
if (next) {
95+
rowsSeen++;
96+
}
97+
return next;
9198
} catch (Exception e) {
9299
throw new DataCloudJDBCException("Failed to load next batch", e);
93100
}

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudConnection.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,25 +185,44 @@ public DataCloudResultSet getChunkBasedResultSet(String queryId, long chunkId) t
185185
}
186186

187187
/**
188-
* Checks if all the query's results are ready, the row count and chunk count will be stable.
188+
* Checks if a given row range is available for a query.
189+
* This method will wait until the row range specified by the other params is available within the given timeout.
190+
*
189191
* @param queryId The identifier of the query to check
190192
* @param offset The starting row offset.
191193
* @param limit The quantity of rows relative to the offset to wait for
192194
* @param timeout The duration to wait for the engine have results produced.
193195
* @param allowLessThan Whether to return early when the available rows is less than {@code offset + limit}
194-
* @return The final {@link DataCloudQueryStatus} the server replied with.
196+
* @return The last {@link DataCloudQueryStatus} the server replied with.
195197
*/
196198
public DataCloudQueryStatus waitForRowsAvailable(
197199
String queryId, long offset, long limit, Duration timeout, boolean allowLessThan)
198200
throws DataCloudJDBCException {
199201
return executor.waitForRowsAvailable(queryId, offset, limit, timeout, allowLessThan);
200202
}
201203

204+
/**
205+
* Checks if a given chunk range is available for a query.
206+
* This method will wait until the chunk range specified by the other params is available within the given timeout.
207+
*
208+
* @param queryId The identifier of the query to check
209+
* @param offset The starting chunk offset.
210+
* @param limit The quantity of chunks relative to the offset to wait for
211+
* @param timeout The duration to wait for the engine to produce results.
212+
* @param allowLessThan Whether to return early when the available chunks is less than {@code offset + limit}
213+
* @return The last {@link DataCloudQueryStatus} the server replied with.
214+
*/
215+
public DataCloudQueryStatus waitForChunksAvailable(
216+
String queryId, long offset, long limit, Duration timeout, boolean allowLessThan)
217+
throws DataCloudJDBCException {
218+
return executor.waitForChunksAvailable(queryId, offset, limit, timeout, allowLessThan);
219+
}
220+
202221
/**
203222
* Checks if all the query's results are ready, the row count and chunk count will be stable.
204223
* @param queryId The identifier of the query to check
205224
* @param timeout The duration to wait for the engine have results produced.
206-
* @return The final {@link DataCloudQueryStatus} the server replied with.
225+
* @return The last {@link DataCloudQueryStatus} the server replied with.
207226
*/
208227
public DataCloudQueryStatus waitForResultsProduced(String queryId, Duration timeout) throws DataCloudJDBCException {
209228
return executor.waitForResultsProduced(queryId, timeout);

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudStatement.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.salesforce.datacloud.jdbc.core;
1717

18+
import static com.salesforce.datacloud.jdbc.core.HyperGrpcClientExecutor.HYPER_MAX_ROW_LIMIT_BYTE_SIZE;
19+
import static com.salesforce.datacloud.jdbc.core.HyperGrpcClientExecutor.HYPER_MIN_ROW_LIMIT_BYTE_SIZE;
1820
import static com.salesforce.datacloud.jdbc.util.PropertiesExtensions.getIntegerOrDefault;
1921
import static com.salesforce.datacloud.jdbc.util.PropertiesExtensions.optional;
2022

@@ -32,6 +34,8 @@
3234
import java.sql.SQLWarning;
3335
import java.sql.Statement;
3436
import java.time.Duration;
37+
import lombok.AccessLevel;
38+
import lombok.Getter;
3539
import lombok.NonNull;
3640
import lombok.extern.slf4j.Slf4j;
3741
import lombok.val;
@@ -53,7 +57,9 @@ public class DataCloudStatement implements Statement, AutoCloseable {
5357

5458
public DataCloudStatement(@NonNull DataCloudConnection connection) {
5559
this.dataCloudConnection = connection;
56-
this.queryTimeout = getIntegerOrDefault(connection.getProperties(), QUERY_TIMEOUT, DEFAULT_QUERY_TIMEOUT);
60+
val properties = connection.getProperties();
61+
this.queryTimeout = getIntegerOrDefault(properties, QUERY_TIMEOUT, DEFAULT_QUERY_TIMEOUT);
62+
this.targetMaxBytes = getIntegerOrDefault(properties, Constants.BYTE_LIMIT, HYPER_MAX_ROW_LIMIT_BYTE_SIZE);
5763
}
5864

5965
protected QueryStatusListener listener;
@@ -66,6 +72,7 @@ protected boolean useSync() {
6672

6773
private HyperGrpcClientExecutor getQueryExecutor() {
6874
return dataCloudConnection.getExecutor().toBuilder()
75+
.byteLimit(targetMaxBytes)
6976
.queryTimeout(getQueryTimeout())
7077
.build();
7178
}
@@ -117,6 +124,7 @@ public DataCloudResultSet executeSyncQuery(String sql) throws SQLException {
117124
return executeSyncQuery(sql, client);
118125
}
119126

127+
@Deprecated
120128
protected DataCloudResultSet executeSyncQuery(String sql, HyperGrpcClientExecutor client) throws SQLException {
121129
listener = SyncQueryStatusListener.of(sql, client);
122130
resultSet = listener.generateResultSet();
@@ -133,7 +141,12 @@ public DataCloudResultSet executeAdaptiveQuery(String sql) throws SQLException {
133141

134142
protected DataCloudResultSet executeAdaptiveQuery(String sql, HyperGrpcClientExecutor client, Duration timeout)
135143
throws SQLException {
136-
listener = AdaptiveQueryStatusListener.of(sql, client, resolveQueryTimeout(timeout));
144+
val queryTimeout = resolveQueryTimeout(timeout);
145+
146+
listener = targetMaxRows > 0
147+
? AdaptiveQueryStatusListener.of(sql, client, queryTimeout, targetMaxRows)
148+
: AdaptiveQueryStatusListener.of(sql, client, queryTimeout);
149+
137150
resultSet = listener.generateResultSet();
138151
log.info("executeAdaptiveQuery completed. queryId={}", listener.getQueryId());
139152
return (DataCloudResultSet) resultSet;
@@ -177,6 +190,54 @@ public int getMaxFieldSize() {
177190
@Override
178191
public void setMaxFieldSize(int max) {}
179192

193+
@Getter(AccessLevel.PACKAGE)
194+
private int targetMaxRows;
195+
196+
@Getter(AccessLevel.PACKAGE)
197+
private int targetMaxBytes;
198+
199+
public void clearResultSetConstraints() throws DataCloudJDBCException {
200+
setResultSetConstraints(0, HYPER_MAX_ROW_LIMIT_BYTE_SIZE);
201+
}
202+
203+
private void checkResultSetConstraints(int maxRows, int maxBytes) throws DataCloudJDBCException {
204+
if (maxRows < 0) {
205+
throw new DataCloudJDBCException(
206+
"setResultSetConstraints maxRows must be set to 0 to be disabled but was " + maxRows);
207+
}
208+
209+
if (maxBytes < HYPER_MIN_ROW_LIMIT_BYTE_SIZE || maxBytes > HYPER_MAX_ROW_LIMIT_BYTE_SIZE) {
210+
throw new DataCloudJDBCException(String.format(
211+
"The specified maxBytes (%d) must satisfy the following constraints: %d <= x <= %d",
212+
maxBytes, HYPER_MIN_ROW_LIMIT_BYTE_SIZE, HYPER_MAX_ROW_LIMIT_BYTE_SIZE));
213+
}
214+
}
215+
216+
/**
217+
* Sets the constraints that would limit the number of rows and overall bytes in any ResultSet object generated by this Statement.
218+
* This is used to tell the database the maximum number of rows and bytes to return, you may get less than expected because of this.
219+
*
220+
* @param maxRows The target maximum number of rows a ResultSet can have, zero means there is no limit.
221+
* @param maxBytes The target maximum byte size a ResultSet can be,
222+
* must fall in the range {@link HyperGrpcClientExecutor#HYPER_MIN_ROW_LIMIT_BYTE_SIZE}
223+
* and {@link HyperGrpcClientExecutor#HYPER_MAX_ROW_LIMIT_BYTE_SIZE}
224+
* @throws DataCloudJDBCException If the target maximum byte size is outside the aforementioned range
225+
*/
226+
public void setResultSetConstraints(int maxRows, int maxBytes) throws DataCloudJDBCException {
227+
checkResultSetConstraints(maxRows, maxBytes);
228+
229+
targetMaxRows = maxRows;
230+
targetMaxBytes = maxBytes;
231+
}
232+
233+
/**
234+
* @see DataCloudStatement#setResultSetConstraints
235+
* @param maxRows The target maximum number of rows a ResultSet can have, zero means there is no limit.
236+
*/
237+
public void setResultSetConstraints(int maxRows) throws DataCloudJDBCException {
238+
setResultSetConstraints(maxRows, HYPER_MAX_ROW_LIMIT_BYTE_SIZE);
239+
}
240+
180241
@Override
181242
public int getMaxRows() {
182243
return 0;

0 commit comments

Comments
 (0)