Skip to content

Commit 49841ee

Browse files
Add support for query cancellation via DataCloudConnection and DataCloudStatement (#24)
1 parent a360792 commit 49841ee

25 files changed

+391
-174
lines changed

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,11 @@ public void close() {
238238
}
239239
}
240240

241+
@Unstable
242+
public void cancel(String queryId) {
243+
getExecutor().cancel(queryId);
244+
}
245+
241246
@Override
242247
public boolean isClosed() {
243248
return closed.get();

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudPreparedStatement.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.common.collect.ImmutableMap;
2424
import com.google.common.collect.Maps;
2525
import com.google.protobuf.ByteString;
26+
import com.salesforce.datacloud.jdbc.core.listener.AsyncQueryStatusListener;
2627
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
2728
import com.salesforce.datacloud.jdbc.util.ArrowUtils;
2829
import com.salesforce.datacloud.jdbc.util.Constants;
@@ -53,6 +54,7 @@
5354
import java.util.Calendar;
5455
import java.util.Map;
5556
import java.util.TimeZone;
57+
import lombok.SneakyThrows;
5658
import lombok.experimental.UtilityClass;
5759
import lombok.extern.slf4j.Slf4j;
5860
import lombok.val;
@@ -86,33 +88,46 @@ private <T> void setParameter(int parameterIndex, int sqlType, T value) throws S
8688

8789
@Override
8890
public ResultSet executeQuery(String sql) throws SQLException {
89-
this.sql = sql;
90-
return executeQuery();
91+
throw new DataCloudJDBCException(
92+
"Per the JDBC specification this method cannot be called on a PreparedStatement, use DataCloudPreparedStatement::executeQuery() instead.");
9193
}
9294

9395
@Override
9496
public boolean execute(String sql) throws SQLException {
95-
resultSet = executeQuery(sql);
96-
return true;
97+
throw new DataCloudJDBCException(
98+
"Per the JDBC specification this method cannot be called on a PreparedStatement, use DataCloudPreparedStatement::execute() instead.");
9799
}
98100

99101
@Override
100-
public ResultSet executeQuery() throws SQLException {
102+
@SneakyThrows
103+
protected HyperGrpcClientExecutor getQueryExecutor() {
101104
final byte[] encodedRow;
102105
try {
103106
encodedRow = ArrowUtils.toArrowByteArray(parameterManager.getParameters(), calendar);
104107
} catch (IOException e) {
105108
throw new DataCloudJDBCException("Failed to encode parameters on prepared statement", e);
106109
}
107110

108-
val queryParamBuilder = QueryParam.newBuilder()
111+
val preparedQueryParams = QueryParam.newBuilder()
109112
.setParamStyle(QueryParam.ParameterStyle.QUESTION_MARK)
110113
.setArrowParameters(QueryParameterArrow.newBuilder()
111114
.setData(ByteString.copyFrom(encodedRow))
112115
.build())
113116
.build();
114117

115-
val client = getQueryExecutor(queryParamBuilder);
118+
return getQueryExecutor(preparedQueryParams);
119+
}
120+
121+
@Override
122+
public boolean execute() throws SQLException {
123+
val client = getQueryExecutor();
124+
listener = AsyncQueryStatusListener.of(sql, client);
125+
return true;
126+
}
127+
128+
@Override
129+
public ResultSet executeQuery() throws SQLException {
130+
val client = getQueryExecutor();
116131
val timeout = Duration.ofSeconds(getQueryTimeout());
117132

118133
val useSync = optional(this.dataCloudConnection.getProperties(), Constants.FORCE_SYNC)
@@ -246,12 +261,6 @@ public void setObject(int parameterIndex, Object x) throws SQLException {
246261
}
247262
}
248263

249-
@Override
250-
public boolean execute() throws SQLException {
251-
resultSet = executeQuery();
252-
return true;
253-
}
254-
255264
@Override
256265
public void addBatch() throws SQLException {
257266
throw new DataCloudJDBCException(BATCH_EXECUTION_IS_NOT_SUPPORTED, SqlErrorCodes.FEATURE_NOT_SUPPORTED);

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudResultSet.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
*/
1616
package com.salesforce.datacloud.jdbc.core;
1717

18+
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
1819
import java.sql.ResultSet;
1920

2021
public interface DataCloudResultSet extends ResultSet {
2122
String getQueryId();
2223

2324
String getStatus();
2425

25-
boolean isReady();
26+
boolean isReady() throws DataCloudJDBCException;
2627
}

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudStatement.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import salesforce.cdp.hyperdb.v1.QueryParam;
3939

4040
@Slf4j
41-
public class DataCloudStatement implements Statement {
41+
public class DataCloudStatement implements Statement, AutoCloseable {
4242
protected ResultSet resultSet;
4343

4444
protected static final String NOT_SUPPORTED_IN_DATACLOUD_QUERY = "Write is not supported in Data Cloud query";
@@ -96,14 +96,15 @@ public String getQueryId() throws SQLException {
9696
return listener.getQueryId();
9797
}
9898

99-
public boolean isReady() {
99+
public boolean isReady() throws DataCloudJDBCException {
100100
return listener.isReady();
101101
}
102102

103103
@Override
104104
public boolean execute(String sql) throws SQLException {
105105
log.debug("Entering execute");
106-
this.resultSet = executeQuery(sql);
106+
val client = getQueryExecutor();
107+
listener = AsyncQueryStatusListener.of(sql, client);
107108
return true;
108109
}
109110

@@ -211,7 +212,16 @@ public void setQueryTimeout(int seconds) {
211212
}
212213

213214
@Override
214-
public void cancel() {}
215+
public void cancel() throws SQLException {
216+
if (listener == null) {
217+
log.warn("There was no in-progress query registered with this statement to cancel");
218+
return;
219+
}
220+
221+
val queryId = getQueryId();
222+
val executor = dataCloudConnection.getExecutor();
223+
executor.cancel(queryId);
224+
}
215225

216226
@Override
217227
public SQLWarning getWarnings() {

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/HyperGrpcClientExecutor.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import lombok.NonNull;
4242
import lombok.extern.slf4j.Slf4j;
4343
import lombok.val;
44+
import salesforce.cdp.hyperdb.v1.CancelQueryParam;
4445
import salesforce.cdp.hyperdb.v1.ExecuteQueryResponse;
4546
import salesforce.cdp.hyperdb.v1.HyperServiceGrpc;
4647
import salesforce.cdp.hyperdb.v1.OutputFormat;
@@ -135,11 +136,6 @@ public Iterator<QueryInfo> getQueryInfo(String queryId) {
135136
return getStub(queryId).getQueryInfo(param);
136137
}
137138

138-
public Iterator<QueryInfo> getQueryInfoStreaming(String queryId) {
139-
val param = getQueryInfoParamStreaming(queryId);
140-
return getStub(queryId).getQueryInfo(param);
141-
}
142-
143139
@Unstable
144140
public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
145141
val iterator = getQueryInfo(queryId);
@@ -149,6 +145,12 @@ public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
149145
.map(Optional::get);
150146
}
151147

148+
public void cancel(String queryId) {
149+
val request = CancelQueryParam.newBuilder().setQueryId(queryId).build();
150+
val stub = getStub(queryId);
151+
stub.cancelQuery(request);
152+
}
153+
152154
public Iterator<QueryResult> getQueryResult(String queryId, long offset, long limit, boolean omitSchema) {
153155
val rowRange =
154156
ResultRange.newBuilder().setRowOffset(offset).setRowLimit(limit).setByteLimit(1024);
@@ -196,10 +198,6 @@ private QueryResultParam getQueryResultParam(String queryId, long chunkId, boole
196198
}
197199

198200
private QueryInfoParam getQueryInfoParam(String queryId) {
199-
return QueryInfoParam.newBuilder().setQueryId(queryId).build();
200-
}
201-
202-
private QueryInfoParam getQueryInfoParamStreaming(String queryId) {
203201
return QueryInfoParam.newBuilder()
204202
.setQueryId(queryId)
205203
.setStreaming(true)

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/StreamingResultSet.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.salesforce.datacloud.jdbc.core;
1717

1818
import com.salesforce.datacloud.jdbc.core.listener.QueryStatusListener;
19+
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
1920
import com.salesforce.datacloud.jdbc.exception.QueryExceptionHandler;
2021
import com.salesforce.datacloud.jdbc.util.ArrowUtils;
2122
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
@@ -115,7 +116,7 @@ public String getStatus() {
115116
}
116117

117118
@Override
118-
public boolean isReady() {
119+
public boolean isReady() throws DataCloudJDBCException {
119120
return listener.isReady();
120121
}
121122

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/listener/AdaptiveQueryStatusListener.java

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static com.salesforce.datacloud.jdbc.util.ThrowingSupplier.rethrowLongSupplier;
1919
import static com.salesforce.datacloud.jdbc.util.ThrowingSupplier.rethrowSupplier;
2020

21+
import com.salesforce.datacloud.jdbc.core.DataCloudQueryStatus;
2122
import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
2223
import com.salesforce.datacloud.jdbc.core.HyperGrpcClientExecutor;
2324
import com.salesforce.datacloud.jdbc.core.StreamingResultSet;
@@ -30,19 +31,18 @@
3031
import java.time.Instant;
3132
import java.util.Iterator;
3233
import java.util.Optional;
34+
import java.util.concurrent.atomic.AtomicReference;
3335
import java.util.function.Supplier;
3436
import java.util.function.UnaryOperator;
3537
import java.util.stream.LongStream;
3638
import java.util.stream.Stream;
3739
import lombok.AccessLevel;
3840
import lombok.AllArgsConstructor;
3941
import lombok.Getter;
40-
import lombok.SneakyThrows;
4142
import lombok.extern.slf4j.Slf4j;
4243
import lombok.val;
4344
import salesforce.cdp.hyperdb.v1.ExecuteQueryResponse;
4445
import salesforce.cdp.hyperdb.v1.QueryResult;
45-
import salesforce.cdp.hyperdb.v1.QueryStatus;
4646

4747
@Slf4j
4848
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@@ -61,22 +61,14 @@ public class AdaptiveQueryStatusListener implements QueryStatusListener {
6161

6262
private final AdaptiveQueryStatusPoller headPoller;
6363

64-
private final AsyncQueryStatusPoller tailPoller;
65-
6664
public static AdaptiveQueryStatusListener of(String query, HyperGrpcClientExecutor client, Duration timeout)
6765
throws SQLException {
6866
try {
6967
val response = client.executeAdaptiveQuery(query);
7068
val queryId = response.next().getQueryInfo().getQueryStatus().getQueryId();
7169

7270
return new AdaptiveQueryStatusListener(
73-
queryId,
74-
query,
75-
client,
76-
timeout,
77-
response,
78-
new AdaptiveQueryStatusPoller(queryId, client),
79-
new AsyncQueryStatusPoller(queryId, client));
71+
queryId, query, client, timeout, response, new AdaptiveQueryStatusPoller(queryId, client));
8072
} catch (StatusRuntimeException ex) {
8173
throw QueryExceptionHandler.createQueryException(query, ex);
8274
}
@@ -89,12 +81,11 @@ public boolean isReady() {
8981

9082
@Override
9183
public String getStatus() {
92-
val poller = headPoller.pollChunkCount() > 1 ? tailPoller : headPoller;
93-
return Optional.of(poller)
94-
.map(QueryStatusPoller::pollQueryStatus)
95-
.map(QueryStatus::getCompletionStatus)
84+
return client.getQueryStatus(queryId)
85+
.map(DataCloudQueryStatus::getCompletionStatus)
9686
.map(Enum::name)
97-
.orElse(QueryStatus.CompletionStatus.RUNNING_OR_UNSPECIFIED.name());
87+
.findFirst()
88+
.orElse("UNKNOWN");
9889
}
9990

10091
@Override
@@ -126,8 +117,8 @@ private Stream<Stream<QueryResult>> infiniteChunks() {
126117

127118
private long getChunkLimit() throws SQLException {
128119
if (headPoller.pollChunkCount() > 1) {
129-
blockUntilReady(tailPoller, timeout);
130-
return tailPoller.pollChunkCount() - 1;
120+
val status = blockUntilReady(timeout);
121+
return status.getChunkCount() - 1;
131122
}
132123

133124
return 0;
@@ -146,23 +137,19 @@ private Stream<QueryResult> tryGetQueryResult(long chunkId) {
146137
.orElse(Stream.empty());
147138
}
148139

149-
@SneakyThrows
150-
private void blockUntilReady(QueryStatusPoller poller, Duration timeout) {
151-
val end = Instant.now().plus(timeout);
152-
int millis = 1000;
153-
while (!poller.pollIsReady() && Instant.now().isBefore(end)) {
154-
log.info(
155-
"Waiting for additional query results. queryId={}, timeout={}, sleep={}",
156-
queryId,
157-
timeout,
158-
Duration.ofSeconds(millis));
159-
160-
Thread.sleep(millis);
161-
millis *= 2;
140+
private DataCloudQueryStatus blockUntilReady(Duration timeout) throws DataCloudJDBCException {
141+
val deadline = Instant.now().plus(timeout);
142+
val last = new AtomicReference<DataCloudQueryStatus>();
143+
144+
while (Instant.now().isBefore(deadline)) {
145+
val isReady = client.getQueryStatus(queryId)
146+
.peek(last::set)
147+
.anyMatch(t -> t.isResultProduced() || t.isExecutionFinished());
148+
if (isReady) {
149+
return last.get();
150+
}
162151
}
163152

164-
if (!tailPoller.pollIsReady()) {
165-
throw new DataCloudJDBCException(BEFORE_READY + ". queryId=" + queryId + ", timeout=" + timeout);
166-
}
153+
throw new DataCloudJDBCException(BEFORE_READY + ". queryId=" + queryId + ", timeout=" + timeout);
167154
}
168155
}

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/listener/AdaptiveQueryStatusPoller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class AdaptiveQueryStatusPoller implements QueryStatusPoller {
4444
@SneakyThrows
4545
private Iterator<QueryInfo> getQueryInfoStreaming() {
4646
try {
47-
return client.getQueryInfoStreaming(queryId);
47+
return client.getQueryInfo(queryId);
4848
} catch (StatusRuntimeException ex) {
4949
throw QueryExceptionHandler.createException("Failed when getting query status", ex);
5050
}

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/listener/AsyncQueryStatusListener.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.salesforce.datacloud.jdbc.core.listener;
1717

18+
import com.salesforce.datacloud.jdbc.core.DataCloudQueryStatus;
1819
import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
1920
import com.salesforce.datacloud.jdbc.core.HyperGrpcClientExecutor;
2021
import com.salesforce.datacloud.jdbc.core.StreamingResultSet;
@@ -23,7 +24,6 @@
2324
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
2425
import io.grpc.StatusRuntimeException;
2526
import java.sql.SQLException;
26-
import java.util.Optional;
2727
import java.util.function.UnaryOperator;
2828
import java.util.stream.LongStream;
2929
import java.util.stream.Stream;
@@ -34,7 +34,6 @@
3434
import lombok.extern.slf4j.Slf4j;
3535
import lombok.val;
3636
import salesforce.cdp.hyperdb.v1.QueryResult;
37-
import salesforce.cdp.hyperdb.v1.QueryStatus;
3837

3938
@Slf4j
4039
@Builder(access = AccessLevel.PRIVATE)
@@ -66,17 +65,21 @@ public static AsyncQueryStatusListener of(String query, HyperGrpcClientExecutor
6665
}
6766

6867
@Override
69-
public boolean isReady() {
70-
return getPoller().pollIsReady();
68+
public boolean isReady() throws DataCloudJDBCException {
69+
try {
70+
return client.getQueryStatus(queryId).anyMatch(t -> t.isResultProduced() || t.isExecutionFinished());
71+
} catch (StatusRuntimeException ex) {
72+
throw QueryExceptionHandler.createQueryException(query, ex);
73+
}
7174
}
7275

7376
@Override
7477
public String getStatus() {
75-
return Optional.of(getPoller())
76-
.map(AsyncQueryStatusPoller::pollQueryStatus)
77-
.map(QueryStatus::getCompletionStatus)
78+
return client.getQueryStatus(queryId)
79+
.map(DataCloudQueryStatus::getCompletionStatus)
7880
.map(Enum::name)
79-
.orElse(null);
81+
.findFirst()
82+
.orElse("UNKNOWN");
8083
}
8184

8285
@Override

0 commit comments

Comments
 (0)