Skip to content

Commit 1471135

Browse files
feat: add row-based access of results to connection (#18)
This pull request includes new methods for acquiring query status and "row-based" result sets. --------- Co-authored-by: Kaviarasu <ksakthivadivel@salesforce.com>
1 parent 67ae9c5 commit 1471135

File tree

13 files changed

+729
-7
lines changed

13 files changed

+729
-7
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ properties.put("clientSecret", "${clientSecret}");
7575

7676
The documentation for jwt authentication can be found [here][jwt flow].
7777

78-
Instuctions to generate a private key can be found [here](#generating-a-private-key-for-jwt-authentication)
78+
Instructions to generate a private key can be found [here](#generating-a-private-key-for-jwt-authentication)
7979

8080
```java
8181
Properties properties = new Properties();

jdbc-slim/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudConnection.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import com.salesforce.datacloud.jdbc.auth.AuthenticationSettings;
2323
import com.salesforce.datacloud.jdbc.auth.DataCloudTokenProcessor;
2424
import com.salesforce.datacloud.jdbc.auth.TokenProcessor;
25+
import com.salesforce.datacloud.jdbc.core.partial.RowBased;
2526
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
2627
import com.salesforce.datacloud.jdbc.http.ClientBuilder;
2728
import com.salesforce.datacloud.jdbc.interceptor.AuthorizationHeaderInterceptor;
2829
import com.salesforce.datacloud.jdbc.interceptor.DataspaceHeaderInterceptor;
2930
import com.salesforce.datacloud.jdbc.interceptor.HyperExternalClientContextHeaderInterceptor;
3031
import com.salesforce.datacloud.jdbc.interceptor.HyperWorkloadHeaderInterceptor;
3132
import com.salesforce.datacloud.jdbc.interceptor.TracingHeadersInterceptor;
33+
import com.salesforce.datacloud.jdbc.util.Unstable;
3234
import io.grpc.ClientInterceptor;
3335
import io.grpc.ManagedChannelBuilder;
3436
import java.sql.Array;
@@ -173,6 +175,33 @@ private DataCloudPreparedStatement getQueryPreparedStatement(String sql) {
173175
return new DataCloudPreparedStatement(this, sql, new DefaultParameterManager());
174176
}
175177

178+
/**
179+
* Retrieves a collection of rows for the specified query once it is ready.
180+
* Use {@link #getQueryStatus(String)} to check if the query has produced results or finished execution before calling this method.
181+
* <p>
182+
* When using {@link RowBased.Mode#FULL_RANGE}, this method does not handle pagination near the end of available rows.
183+
* The caller is responsible for calculating the correct offset and limit to avoid out-of-range errors.
184+
*
185+
* @param queryId The identifier of the query to fetch results for.
186+
* @param offset The starting row offset.
187+
* @param limit The maximum number of rows to retrieve.
188+
* @param mode The fetching mode—either {@link RowBased.Mode#SINGLE_RPC} for a single request or
189+
* {@link RowBased.Mode#FULL_RANGE} to iterate through all available rows.
190+
* @return A {@link DataCloudResultSet} containing the query results.
191+
*/
192+
public DataCloudResultSet getRowBasedResultSet(String queryId, long offset, long limit, RowBased.Mode mode) {
193+
val iterator = RowBased.of(executor, queryId, offset, limit, mode);
194+
return StreamingResultSet.of(queryId, executor, iterator);
195+
}
196+
197+
/**
198+
* Use this to determine when a given query is complete by filtering the responses and a subsequent findFirst()
199+
*/
200+
@Unstable
201+
public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
202+
return executor.getQueryStatus(queryId);
203+
}
204+
176205
@Override
177206
public CallableStatement prepareCall(String sql) {
178207
return null;
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright (c) 2024, Salesforce, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.salesforce.datacloud.jdbc.core;
17+
18+
import java.util.Optional;
19+
import lombok.Value;
20+
import lombok.val;
21+
import salesforce.cdp.hyperdb.v1.QueryInfo;
22+
import salesforce.cdp.hyperdb.v1.QueryStatus;
23+
24+
/**
25+
* Represents the status of a query.
26+
* The {@link CompletionStatus} enum defines the possible states of the query, which are:
27+
* <ul>
28+
* <li><b>RUNNING</b>: The query is still running or its status is unspecified.</li>
29+
* <li><b>RESULTS_PRODUCED</b>: The query has completed, and the results are ready for retrieval.</li>
30+
* <li><b>FINISHED</b>: The query has finished execution and its results have been persisted, guaranteed to be available until the expiration time.</li>
31+
* </ul>
32+
*/
33+
@Value
34+
public class DataCloudQueryStatus {
35+
public enum CompletionStatus {
36+
RUNNING,
37+
RESULTS_PRODUCED,
38+
FINISHED
39+
}
40+
41+
String queryId;
42+
43+
long chunkCount;
44+
45+
long rowCount;
46+
47+
double progress;
48+
49+
CompletionStatus completionStatus;
50+
51+
/**
52+
* Checks if the query's results have been produced.
53+
*
54+
* @return {@code true} if the query's results are available for retrieval, otherwise {@code false}.
55+
*/
56+
public boolean isResultProduced() {
57+
return completionStatus == CompletionStatus.RESULTS_PRODUCED;
58+
}
59+
60+
/**
61+
* Checks if the query execution is finished.
62+
*
63+
* @return {@code true} if the query has completed execution and results have been persisted, otherwise {@code false}.
64+
*/
65+
public boolean isExecutionFinished() {
66+
return completionStatus == CompletionStatus.FINISHED;
67+
}
68+
69+
static Optional<DataCloudQueryStatus> of(QueryInfo queryInfo) {
70+
return Optional.ofNullable(queryInfo).map(QueryInfo::getQueryStatus).map(DataCloudQueryStatus::of);
71+
}
72+
73+
private static DataCloudQueryStatus of(QueryStatus s) {
74+
val completionStatus = of(s.getCompletionStatus());
75+
return new DataCloudQueryStatus(
76+
s.getQueryId(), s.getChunkCount(), s.getRowCount(), s.getProgress(), completionStatus);
77+
}
78+
79+
private static CompletionStatus of(QueryStatus.CompletionStatus completionStatus) {
80+
switch (completionStatus) {
81+
case RUNNING_OR_UNSPECIFIED:
82+
return CompletionStatus.RUNNING;
83+
case RESULTS_PRODUCED:
84+
return CompletionStatus.RESULTS_PRODUCED;
85+
case FINISHED:
86+
return CompletionStatus.FINISHED;
87+
default:
88+
throw new IllegalArgumentException("Unknown completion status. status=" + completionStatus);
89+
}
90+
}
91+
}

jdbc-slim/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudStatement.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
2626
import com.salesforce.datacloud.jdbc.util.Constants;
2727
import com.salesforce.datacloud.jdbc.util.SqlErrorCodes;
28+
import com.salesforce.datacloud.jdbc.util.Unstable;
2829
import java.sql.Connection;
2930
import java.sql.ResultSet;
3031
import java.sql.SQLException;
@@ -73,16 +74,27 @@ protected HyperGrpcClientExecutor getQueryExecutor(QueryParam additionalQueryPar
7374
return clientBuilder.queryTimeout(getQueryTimeout()).build();
7475
}
7576

76-
private void assertQueryReady() throws SQLException {
77+
private void assertQueryExecuted() throws SQLException {
7778
if (listener == null) {
7879
throw new DataCloudJDBCException("a query was not executed before attempting to access results");
7980
}
81+
}
82+
83+
private void assertQueryReady() throws SQLException {
84+
assertQueryExecuted();
8085

8186
if (!listener.isReady()) {
8287
throw new DataCloudJDBCException("query results were not ready");
8388
}
8489
}
8590

91+
@Unstable
92+
public String getQueryId() throws SQLException {
93+
assertQueryExecuted();
94+
95+
return listener.getQueryId();
96+
}
97+
8698
public boolean isReady() {
8799
return listener.isReady();
88100
}

jdbc-slim/src/main/java/com/salesforce/datacloud/jdbc/core/HyperGrpcClientExecutor.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.salesforce.datacloud.jdbc.config.DriverVersion;
2121
import com.salesforce.datacloud.jdbc.interceptor.QueryIdHeaderInterceptor;
2222
import com.salesforce.datacloud.jdbc.util.PropertiesExtensions;
23+
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
24+
import com.salesforce.datacloud.jdbc.util.Unstable;
2325
import io.grpc.ClientInterceptor;
2426
import io.grpc.ManagedChannel;
2527
import io.grpc.ManagedChannelBuilder;
@@ -29,8 +31,10 @@
2931
import java.util.Iterator;
3032
import java.util.List;
3133
import java.util.Map;
34+
import java.util.Optional;
3235
import java.util.Properties;
3336
import java.util.concurrent.TimeUnit;
37+
import java.util.stream.Stream;
3438
import lombok.AccessLevel;
3539
import lombok.Builder;
3640
import lombok.Getter;
@@ -45,6 +49,7 @@
4549
import salesforce.cdp.hyperdb.v1.QueryParam;
4650
import salesforce.cdp.hyperdb.v1.QueryResult;
4751
import salesforce.cdp.hyperdb.v1.QueryResultParam;
52+
import salesforce.cdp.hyperdb.v1.ResultRange;
4853

4954
@Slf4j
5055
@Builder(toBuilder = true)
@@ -135,6 +140,29 @@ public Iterator<QueryInfo> getQueryInfoStreaming(String queryId) {
135140
return getStub(queryId).getQueryInfo(param);
136141
}
137142

143+
@Unstable
144+
public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
145+
val iterator = getQueryInfo(queryId);
146+
return StreamUtilities.toStream(iterator)
147+
.map(DataCloudQueryStatus::of)
148+
.filter(Optional::isPresent)
149+
.map(Optional::get);
150+
}
151+
152+
public Iterator<QueryResult> getQueryResult(String queryId, long offset, long limit, boolean omitSchema) {
153+
val rowRange =
154+
ResultRange.newBuilder().setRowOffset(offset).setRowLimit(limit).setByteLimit(1024);
155+
156+
final QueryResultParam param = QueryResultParam.newBuilder()
157+
.setQueryId(queryId)
158+
.setResultRange(rowRange)
159+
.setOmitSchema(omitSchema)
160+
.setOutputFormat(OutputFormat.ARROW_IPC)
161+
.build();
162+
163+
return getStub(queryId).getQueryResult(param);
164+
}
165+
138166
public Iterator<QueryResult> getQueryResult(String queryId, long chunkId, boolean omitSchema) {
139167
val param = getQueryResultParam(queryId, chunkId, omitSchema);
140168
return getStub(queryId).getQueryResult(param);
@@ -161,12 +189,9 @@ private QueryResultParam getQueryResultParam(String queryId, long chunkId, boole
161189
val builder = QueryResultParam.newBuilder()
162190
.setQueryId(queryId)
163191
.setChunkId(chunkId)
192+
.setOmitSchema(omitSchema)
164193
.setOutputFormat(OutputFormat.ARROW_IPC);
165194

166-
if (omitSchema) {
167-
builder.setOmitSchema(true);
168-
}
169-
170195
return builder.build();
171196
}
172197

jdbc-slim/src/main/java/com/salesforce/datacloud/jdbc/core/StreamingResultSet.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818
import com.salesforce.datacloud.jdbc.core.listener.QueryStatusListener;
1919
import com.salesforce.datacloud.jdbc.exception.QueryExceptionHandler;
2020
import com.salesforce.datacloud.jdbc.util.ArrowUtils;
21+
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
2122
import java.sql.ResultSetMetaData;
2223
import java.sql.SQLException;
2324
import java.util.Collections;
25+
import java.util.Iterator;
2426
import java.util.TimeZone;
27+
import java.util.stream.Stream;
2528
import lombok.SneakyThrows;
29+
import lombok.Value;
2630
import lombok.extern.slf4j.Slf4j;
2731
import lombok.val;
2832
import org.apache.arrow.memory.RootAllocator;
@@ -32,6 +36,7 @@
3236
import org.apache.calcite.avatica.AvaticaStatement;
3337
import org.apache.calcite.avatica.Meta;
3438
import org.apache.calcite.avatica.QueryState;
39+
import salesforce.cdp.hyperdb.v1.QueryResult;
3540

3641
@Slf4j
3742
public class StreamingResultSet extends AvaticaResultSet implements DataCloudResultSet {
@@ -51,6 +56,7 @@ private StreamingResultSet(
5156
this.listener = listener;
5257
}
5358

59+
@Deprecated
5460
@SneakyThrows
5561
public static StreamingResultSet of(String sql, QueryStatusListener listener) {
5662
try {
@@ -73,6 +79,30 @@ public static StreamingResultSet of(String sql, QueryStatusListener listener) {
7379
}
7480
}
7581

82+
@SneakyThrows
83+
public static StreamingResultSet of(
84+
String queryId, HyperGrpcClientExecutor client, Iterator<QueryResult> iterator) {
85+
try {
86+
val channel = ExecuteQueryResponseChannel.of(StreamUtilities.toStream(iterator));
87+
val reader = new ArrowStreamReader(channel, new RootAllocator(ROOT_ALLOCATOR_MB_FROM_V2));
88+
val schemaRoot = reader.getVectorSchemaRoot();
89+
val columns = ArrowUtils.toColumnMetaData(schemaRoot.getSchema().getFields());
90+
val timezone = TimeZone.getDefault();
91+
val state = new QueryState();
92+
val signature = new Meta.Signature(
93+
columns, null, Collections.emptyList(), Collections.emptyMap(), null, Meta.StatementType.SELECT);
94+
val metadata = new AvaticaResultSetMetaData(null, null, signature);
95+
val listener = new AlreadyReadyNoopListener(queryId);
96+
val result = new StreamingResultSet(listener, null, state, signature, metadata, timezone, null);
97+
val cursor = new ArrowStreamReaderCursor(reader);
98+
result.execute2(cursor, columns);
99+
100+
return result;
101+
} catch (Exception ex) {
102+
throw QueryExceptionHandler.createException(QUERY_FAILURE + queryId, ex);
103+
}
104+
}
105+
76106
@Override
77107
public String getQueryId() {
78108
return listener.getQueryId();
@@ -87,4 +117,24 @@ public String getStatus() {
87117
public boolean isReady() {
88118
return listener.isReady();
89119
}
120+
121+
private static final String QUERY_FAILURE = "Failed to execute query: ";
122+
123+
@Value
124+
private static class AlreadyReadyNoopListener implements QueryStatusListener {
125+
String queryId;
126+
String status = "Status should be determined via DataCloudConnection::getStatus";
127+
String query = null;
128+
boolean ready = true;
129+
130+
@Override
131+
public DataCloudResultSet generateResultSet() {
132+
return null;
133+
}
134+
135+
@Override
136+
public Stream<QueryResult> stream() {
137+
return Stream.empty();
138+
}
139+
}
90140
}

jdbc-slim/src/main/java/com/salesforce/datacloud/jdbc/core/listener/QueryStatusListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.stream.Stream;
2121
import salesforce.cdp.hyperdb.v1.QueryResult;
2222

23+
@Deprecated
2324
public interface QueryStatusListener {
2425
String BEFORE_READY = "Results were requested before ready";
2526

0 commit comments

Comments
 (0)