Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ public void close() {
}
}

@Unstable
public void cancel(String queryId) {
getExecutor().cancel(queryId);
}

@Override
public boolean isClosed() {
return closed.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.salesforce.datacloud.jdbc.core.listener.AsyncQueryStatusListener;
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import com.salesforce.datacloud.jdbc.util.ArrowUtils;
import com.salesforce.datacloud.jdbc.util.Constants;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.Calendar;
import java.util.Map;
import java.util.TimeZone;
import lombok.SneakyThrows;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
Expand Down Expand Up @@ -86,33 +88,46 @@ private <T> void setParameter(int parameterIndex, int sqlType, T value) throws S

@Override
public ResultSet executeQuery(String sql) throws SQLException {
this.sql = sql;
return executeQuery();
throw new DataCloudJDBCException(
"Per the JDBC specification this method cannot be called on a PreparedStatement, use DataCloudPreparedStatement::executeQuery() instead.");
}

@Override
public boolean execute(String sql) throws SQLException {
resultSet = executeQuery(sql);
return true;
throw new DataCloudJDBCException(
"Per the JDBC specification this method cannot be called on a PreparedStatement, use DataCloudPreparedStatement::execute() instead.");
}

@Override
public ResultSet executeQuery() throws SQLException {
@SneakyThrows
protected HyperGrpcClientExecutor getQueryExecutor() {
final byte[] encodedRow;
try {
encodedRow = ArrowUtils.toArrowByteArray(parameterManager.getParameters(), calendar);
} catch (IOException e) {
throw new DataCloudJDBCException("Failed to encode parameters on prepared statement", e);
}

val queryParamBuilder = QueryParam.newBuilder()
val preparedQueryParams = QueryParam.newBuilder()
.setParamStyle(QueryParam.ParameterStyle.QUESTION_MARK)
.setArrowParameters(QueryParameterArrow.newBuilder()
.setData(ByteString.copyFrom(encodedRow))
.build())
.build();

val client = getQueryExecutor(queryParamBuilder);
return getQueryExecutor(preparedQueryParams);
}

@Override
public boolean execute() throws SQLException {
val client = getQueryExecutor();
listener = AsyncQueryStatusListener.of(sql, client);
return true;
}

@Override
public ResultSet executeQuery() throws SQLException {
val client = getQueryExecutor();
val timeout = Duration.ofSeconds(getQueryTimeout());

val useSync = optional(this.dataCloudConnection.getProperties(), Constants.FORCE_SYNC)
Expand Down Expand Up @@ -246,12 +261,6 @@ public void setObject(int parameterIndex, Object x) throws SQLException {
}
}

@Override
public boolean execute() throws SQLException {
resultSet = executeQuery();
return true;
}

@Override
public void addBatch() throws SQLException {
throw new DataCloudJDBCException(BATCH_EXECUTION_IS_NOT_SUPPORTED, SqlErrorCodes.FEATURE_NOT_SUPPORTED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
*/
package com.salesforce.datacloud.jdbc.core;

import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import java.sql.ResultSet;

public interface DataCloudResultSet extends ResultSet {
String getQueryId();

String getStatus();

boolean isReady();
boolean isReady() throws DataCloudJDBCException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import salesforce.cdp.hyperdb.v1.QueryParam;

@Slf4j
public class DataCloudStatement implements Statement {
public class DataCloudStatement implements Statement, AutoCloseable {
protected ResultSet resultSet;

protected static final String NOT_SUPPORTED_IN_DATACLOUD_QUERY = "Write is not supported in Data Cloud query";
Expand Down Expand Up @@ -96,14 +96,15 @@ public String getQueryId() throws SQLException {
return listener.getQueryId();
}

public boolean isReady() {
public boolean isReady() throws DataCloudJDBCException {
return listener.isReady();
}

@Override
public boolean execute(String sql) throws SQLException {
log.debug("Entering execute");
this.resultSet = executeQuery(sql);
val client = getQueryExecutor();
listener = AsyncQueryStatusListener.of(sql, client);
return true;
}

Expand Down Expand Up @@ -211,7 +212,16 @@ public void setQueryTimeout(int seconds) {
}

@Override
public void cancel() {}
public void cancel() throws SQLException {
if (listener == null) {
log.warn("There was no in-progress query registered with this statement to cancel");
return;
}

val queryId = getQueryId();
val executor = dataCloudConnection.getExecutor();
executor.cancel(queryId);
}

@Override
public SQLWarning getWarnings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import salesforce.cdp.hyperdb.v1.CancelQueryParam;
import salesforce.cdp.hyperdb.v1.ExecuteQueryResponse;
import salesforce.cdp.hyperdb.v1.HyperServiceGrpc;
import salesforce.cdp.hyperdb.v1.OutputFormat;
Expand Down Expand Up @@ -135,11 +136,6 @@ public Iterator<QueryInfo> getQueryInfo(String queryId) {
return getStub(queryId).getQueryInfo(param);
}

public Iterator<QueryInfo> getQueryInfoStreaming(String queryId) {
val param = getQueryInfoParamStreaming(queryId);
return getStub(queryId).getQueryInfo(param);
}

@Unstable
public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
val iterator = getQueryInfo(queryId);
Expand All @@ -149,6 +145,12 @@ public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
.map(Optional::get);
}

public void cancel(String queryId) {
val request = CancelQueryParam.newBuilder().setQueryId(queryId).build();
val stub = getStub(queryId);
stub.cancelQuery(request);
}

public Iterator<QueryResult> getQueryResult(String queryId, long offset, long limit, boolean omitSchema) {
val rowRange =
ResultRange.newBuilder().setRowOffset(offset).setRowLimit(limit).setByteLimit(1024);
Expand Down Expand Up @@ -196,10 +198,6 @@ private QueryResultParam getQueryResultParam(String queryId, long chunkId, boole
}

private QueryInfoParam getQueryInfoParam(String queryId) {
return QueryInfoParam.newBuilder().setQueryId(queryId).build();
}

private QueryInfoParam getQueryInfoParamStreaming(String queryId) {
return QueryInfoParam.newBuilder()
.setQueryId(queryId)
.setStreaming(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.salesforce.datacloud.jdbc.core;

import com.salesforce.datacloud.jdbc.core.listener.QueryStatusListener;
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import com.salesforce.datacloud.jdbc.exception.QueryExceptionHandler;
import com.salesforce.datacloud.jdbc.util.ArrowUtils;
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
Expand Down Expand Up @@ -115,7 +116,7 @@ public String getStatus() {
}

@Override
public boolean isReady() {
public boolean isReady() throws DataCloudJDBCException {
return listener.isReady();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static com.salesforce.datacloud.jdbc.util.ThrowingSupplier.rethrowLongSupplier;
import static com.salesforce.datacloud.jdbc.util.ThrowingSupplier.rethrowSupplier;

import com.salesforce.datacloud.jdbc.core.DataCloudQueryStatus;
import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
import com.salesforce.datacloud.jdbc.core.HyperGrpcClientExecutor;
import com.salesforce.datacloud.jdbc.core.StreamingResultSet;
Expand All @@ -30,19 +31,18 @@
import java.time.Instant;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import salesforce.cdp.hyperdb.v1.ExecuteQueryResponse;
import salesforce.cdp.hyperdb.v1.QueryResult;
import salesforce.cdp.hyperdb.v1.QueryStatus;

@Slf4j
@AllArgsConstructor(access = AccessLevel.PRIVATE)
Expand All @@ -61,22 +61,14 @@ public class AdaptiveQueryStatusListener implements QueryStatusListener {

private final AdaptiveQueryStatusPoller headPoller;

private final AsyncQueryStatusPoller tailPoller;

public static AdaptiveQueryStatusListener of(String query, HyperGrpcClientExecutor client, Duration timeout)
throws SQLException {
try {
val response = client.executeAdaptiveQuery(query);
val queryId = response.next().getQueryInfo().getQueryStatus().getQueryId();

return new AdaptiveQueryStatusListener(
queryId,
query,
client,
timeout,
response,
new AdaptiveQueryStatusPoller(queryId, client),
new AsyncQueryStatusPoller(queryId, client));
queryId, query, client, timeout, response, new AdaptiveQueryStatusPoller(queryId, client));
} catch (StatusRuntimeException ex) {
throw QueryExceptionHandler.createQueryException(query, ex);
}
Expand All @@ -89,12 +81,11 @@ public boolean isReady() {

@Override
public String getStatus() {
val poller = headPoller.pollChunkCount() > 1 ? tailPoller : headPoller;
return Optional.of(poller)
.map(QueryStatusPoller::pollQueryStatus)
.map(QueryStatus::getCompletionStatus)
return client.getQueryStatus(queryId)
.map(DataCloudQueryStatus::getCompletionStatus)
.map(Enum::name)
.orElse(QueryStatus.CompletionStatus.RUNNING_OR_UNSPECIFIED.name());
.findFirst()
.orElse("UNKNOWN");
}

@Override
Expand Down Expand Up @@ -126,8 +117,8 @@ private Stream<Stream<QueryResult>> infiniteChunks() {

private long getChunkLimit() throws SQLException {
if (headPoller.pollChunkCount() > 1) {
blockUntilReady(tailPoller, timeout);
return tailPoller.pollChunkCount() - 1;
val status = blockUntilReady(timeout);
return status.getChunkCount() - 1;
}

return 0;
Expand All @@ -146,23 +137,19 @@ private Stream<QueryResult> tryGetQueryResult(long chunkId) {
.orElse(Stream.empty());
}

@SneakyThrows
private void blockUntilReady(QueryStatusPoller poller, Duration timeout) {
val end = Instant.now().plus(timeout);
int millis = 1000;
while (!poller.pollIsReady() && Instant.now().isBefore(end)) {
log.info(
"Waiting for additional query results. queryId={}, timeout={}, sleep={}",
queryId,
timeout,
Duration.ofSeconds(millis));

Thread.sleep(millis);
millis *= 2;
private DataCloudQueryStatus blockUntilReady(Duration timeout) throws DataCloudJDBCException {
val deadline = Instant.now().plus(timeout);
val last = new AtomicReference<DataCloudQueryStatus>();

while (Instant.now().isBefore(deadline)) {
val isReady = client.getQueryStatus(queryId)
.peek(last::set)
.anyMatch(t -> t.isResultProduced() || t.isExecutionFinished());
if (isReady) {
return last.get();
}
}

if (!tailPoller.pollIsReady()) {
throw new DataCloudJDBCException(BEFORE_READY + ". queryId=" + queryId + ", timeout=" + timeout);
}
throw new DataCloudJDBCException(BEFORE_READY + ". queryId=" + queryId + ", timeout=" + timeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class AdaptiveQueryStatusPoller implements QueryStatusPoller {
@SneakyThrows
private Iterator<QueryInfo> getQueryInfoStreaming() {
try {
return client.getQueryInfoStreaming(queryId);
return client.getQueryInfo(queryId);
} catch (StatusRuntimeException ex) {
throw QueryExceptionHandler.createException("Failed when getting query status", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.salesforce.datacloud.jdbc.core.listener;

import com.salesforce.datacloud.jdbc.core.DataCloudQueryStatus;
import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
import com.salesforce.datacloud.jdbc.core.HyperGrpcClientExecutor;
import com.salesforce.datacloud.jdbc.core.StreamingResultSet;
Expand All @@ -23,7 +24,6 @@
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
import io.grpc.StatusRuntimeException;
import java.sql.SQLException;
import java.util.Optional;
import java.util.function.UnaryOperator;
import java.util.stream.LongStream;
import java.util.stream.Stream;
Expand All @@ -34,7 +34,6 @@
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import salesforce.cdp.hyperdb.v1.QueryResult;
import salesforce.cdp.hyperdb.v1.QueryStatus;

@Slf4j
@Builder(access = AccessLevel.PRIVATE)
Expand Down Expand Up @@ -66,17 +65,21 @@ public static AsyncQueryStatusListener of(String query, HyperGrpcClientExecutor
}

@Override
public boolean isReady() {
return getPoller().pollIsReady();
public boolean isReady() throws DataCloudJDBCException {
try {
return client.getQueryStatus(queryId).anyMatch(t -> t.isResultProduced() || t.isExecutionFinished());
} catch (StatusRuntimeException ex) {
throw QueryExceptionHandler.createQueryException(query, ex);
}
}

@Override
public String getStatus() {
return Optional.of(getPoller())
.map(AsyncQueryStatusPoller::pollQueryStatus)
.map(QueryStatus::getCompletionStatus)
return client.getQueryStatus(queryId)
.map(DataCloudQueryStatus::getCompletionStatus)
.map(Enum::name)
.orElse(null);
.findFirst()
.orElse("UNKNOWN");
}

@Override
Expand Down
Loading
Loading