Skip to content

Commit 3783c85

Browse files
committed
feat: add row-based access of results to connection
1 parent a1e5d3c commit 3783c85

File tree

13 files changed

+700
-7
lines changed

13 files changed

+700
-7
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ properties.put("clientSecret", "${clientSecret}");
7575

7676
The documentation for jwt authentication can be found [here][jwt flow].
7777

78-
Instuctions to generate a private key can be found [here](#generating-a-private-key-for-jwt-authentication)
78+
Instructions to generate a private key can be found [here](#generating-a-private-key-for-jwt-authentication)
7979

8080
```java
8181
Properties properties = new Properties();

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>com.salesforce.datacloud</groupId>
55
<artifactId>jdbc</artifactId>
6-
<version>0.23.0-SNAPSHOT</version>
6+
<version>0.24.0-SNAPSHOT</version>
77
<packaging>jar</packaging>
88
<name>Salesforce Data Cloud JDBC Driver</name>
99
<description>Salesforce Data Cloud JDBC Driver</description>

src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudConnection.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import com.salesforce.datacloud.jdbc.auth.AuthenticationSettings;
2323
import com.salesforce.datacloud.jdbc.auth.DataCloudTokenProcessor;
2424
import com.salesforce.datacloud.jdbc.auth.TokenProcessor;
25+
import com.salesforce.datacloud.jdbc.core.partial.RowBased;
2526
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
2627
import com.salesforce.datacloud.jdbc.http.ClientBuilder;
2728
import com.salesforce.datacloud.jdbc.interceptor.AuthorizationHeaderInterceptor;
2829
import com.salesforce.datacloud.jdbc.interceptor.DataspaceHeaderInterceptor;
2930
import com.salesforce.datacloud.jdbc.interceptor.HyperExternalClientContextHeaderInterceptor;
3031
import com.salesforce.datacloud.jdbc.interceptor.HyperWorkloadHeaderInterceptor;
3132
import com.salesforce.datacloud.jdbc.interceptor.TracingHeadersInterceptor;
33+
import com.salesforce.datacloud.jdbc.util.Unstable;
3234
import io.grpc.ClientInterceptor;
3335
import io.grpc.ManagedChannelBuilder;
3436
import java.sql.Array;
@@ -173,6 +175,24 @@ private DataCloudPreparedStatement getQueryPreparedStatement(String sql) {
173175
return new DataCloudPreparedStatement(this, sql, new DefaultParameterManager());
174176
}
175177

178+
/**
179+
* Use getQueryStatus to determine if your query is "ready" then use this to get a collection of rows.
180+
* When using {@link RowBased.Mode#FULL_RANGE} this method is not responsible for calculating the offset near the end of available rows,
181+
* you must calculate the correct "pages" of offset and limit.
182+
*/
183+
public DataCloudResultSet getRowBasedResultSet(String queryId, long offset, long limit, RowBased.Mode mode) {
184+
val iterator = RowBased.of(executor, queryId, offset, limit, mode);
185+
return StreamingResultSet.of(queryId, executor, iterator);
186+
}
187+
188+
/**
189+
* Use this to determine when a given query is complete by filtering the responses and a subsequent findFirst()
190+
*/
191+
@Unstable
192+
public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
193+
return executor.getQueryStatus(queryId);
194+
}
195+
176196
@Override
177197
public CallableStatement prepareCall(String sql) {
178198
return null;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) 2024, Salesforce, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.salesforce.datacloud.jdbc.core;
17+
18+
import java.util.Optional;
19+
import lombok.Value;
20+
import lombok.val;
21+
import salesforce.cdp.hyperdb.v1.QueryInfo;
22+
import salesforce.cdp.hyperdb.v1.QueryStatus;
23+
24+
@Value
25+
public class DataCloudQueryStatus {
26+
public enum CompletionStatus {
27+
RUNNING,
28+
RESULTS_PRODUCED,
29+
FINISHED
30+
}
31+
32+
String queryId;
33+
34+
long chunkCount;
35+
36+
long rowCount;
37+
38+
double progress;
39+
40+
CompletionStatus completionStatus;
41+
42+
public boolean isResultsProduced() {
43+
return completionStatus == CompletionStatus.RESULTS_PRODUCED;
44+
}
45+
46+
public boolean isExecutionFinished() {
47+
return completionStatus == CompletionStatus.FINISHED;
48+
}
49+
50+
static Optional<DataCloudQueryStatus> of(QueryInfo queryInfo) {
51+
return Optional.ofNullable(queryInfo).map(QueryInfo::getQueryStatus).map(DataCloudQueryStatus::of);
52+
}
53+
54+
private static DataCloudQueryStatus of(QueryStatus s) {
55+
val completionStatus = of(s.getCompletionStatus());
56+
return new DataCloudQueryStatus(
57+
s.getQueryId(), s.getChunkCount(), s.getRowCount(), s.getProgress(), completionStatus);
58+
}
59+
60+
private static CompletionStatus of(QueryStatus.CompletionStatus completionStatus) {
61+
switch (completionStatus) {
62+
case RUNNING_OR_UNSPECIFIED:
63+
return CompletionStatus.RUNNING;
64+
case RESULTS_PRODUCED:
65+
return CompletionStatus.RESULTS_PRODUCED;
66+
case FINISHED:
67+
return CompletionStatus.FINISHED;
68+
default:
69+
throw new IllegalArgumentException("Unknown completion status. status=" + completionStatus);
70+
}
71+
}
72+
}

src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudStatement.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
2626
import com.salesforce.datacloud.jdbc.util.Constants;
2727
import com.salesforce.datacloud.jdbc.util.SqlErrorCodes;
28+
import com.salesforce.datacloud.jdbc.util.Unstable;
2829
import java.sql.Connection;
2930
import java.sql.ResultSet;
3031
import java.sql.SQLException;
@@ -73,16 +74,27 @@ protected HyperGrpcClientExecutor getQueryExecutor(QueryParam additionalQueryPar
7374
return clientBuilder.queryTimeout(getQueryTimeout()).build();
7475
}
7576

76-
private void assertQueryReady() throws SQLException {
77+
private void assertQueryExecuted() throws SQLException {
7778
if (listener == null) {
7879
throw new DataCloudJDBCException("a query was not executed before attempting to access results");
7980
}
81+
}
82+
83+
private void assertQueryReady() throws SQLException {
84+
assertQueryExecuted();
8085

8186
if (!listener.isReady()) {
8287
throw new DataCloudJDBCException("query results were not ready");
8388
}
8489
}
8590

91+
@Unstable
92+
public String getQueryId() throws SQLException {
93+
assertQueryExecuted();
94+
95+
return listener.getQueryId();
96+
}
97+
8698
// Reports whether the executed query's results are ready to be read.
// NOTE(review): unlike getQueryId(), this does not call assertQueryExecuted() first; if no
// query has been executed yet (listener == null) this throws NullPointerException rather
// than a DataCloudJDBCException -- confirm that is intended.
public boolean isReady() {
    return listener.isReady();
}

src/main/java/com/salesforce/datacloud/jdbc/core/HyperGrpcClientExecutor.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.salesforce.datacloud.jdbc.config.DriverVersion;
2121
import com.salesforce.datacloud.jdbc.interceptor.QueryIdHeaderInterceptor;
2222
import com.salesforce.datacloud.jdbc.util.PropertiesExtensions;
23+
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
24+
import com.salesforce.datacloud.jdbc.util.Unstable;
2325
import io.grpc.ClientInterceptor;
2426
import io.grpc.ManagedChannel;
2527
import io.grpc.ManagedChannelBuilder;
@@ -29,8 +31,10 @@
2931
import java.util.Iterator;
3032
import java.util.List;
3133
import java.util.Map;
34+
import java.util.Optional;
3235
import java.util.Properties;
3336
import java.util.concurrent.TimeUnit;
37+
import java.util.stream.Stream;
3438
import lombok.AccessLevel;
3539
import lombok.Builder;
3640
import lombok.Getter;
@@ -45,6 +49,7 @@
4549
import salesforce.cdp.hyperdb.v1.QueryParam;
4650
import salesforce.cdp.hyperdb.v1.QueryResult;
4751
import salesforce.cdp.hyperdb.v1.QueryResultParam;
52+
import salesforce.cdp.hyperdb.v1.ResultRange;
4853

4954
@Slf4j
5055
@Builder(toBuilder = true)
@@ -135,6 +140,29 @@ public Iterator<QueryInfo> getQueryInfoStreaming(String queryId) {
135140
return getStub(queryId).getQueryInfo(param);
136141
}
137142

143+
@Unstable
144+
public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
145+
val iterator = getQueryInfo(queryId);
146+
return StreamUtilities.toStream(iterator)
147+
.map(DataCloudQueryStatus::of)
148+
.filter(Optional::isPresent)
149+
.map(Optional::get);
150+
}
151+
152+
public Iterator<QueryResult> getQueryResult(String queryId, long offset, long limit, boolean omitSchema) {
153+
val rowRange =
154+
ResultRange.newBuilder().setRowOffset(offset).setRowLimit(limit).setByteLimit(1024);
155+
156+
final QueryResultParam param = QueryResultParam.newBuilder()
157+
.setQueryId(queryId)
158+
.setResultRange(rowRange)
159+
.setOmitSchema(omitSchema)
160+
.setOutputFormat(OutputFormat.ARROW_IPC)
161+
.build();
162+
163+
return getStub(queryId).getQueryResult(param);
164+
}
165+
138166
public Iterator<QueryResult> getQueryResult(String queryId, long chunkId, boolean omitSchema) {
139167
val param = getQueryResultParam(queryId, chunkId, omitSchema);
140168
return getStub(queryId).getQueryResult(param);
@@ -161,12 +189,9 @@ private QueryResultParam getQueryResultParam(String queryId, long chunkId, boole
161189
val builder = QueryResultParam.newBuilder()
162190
.setQueryId(queryId)
163191
.setChunkId(chunkId)
192+
.setOmitSchema(omitSchema)
164193
.setOutputFormat(OutputFormat.ARROW_IPC);
165194

166-
if (omitSchema) {
167-
builder.setOmitSchema(true);
168-
}
169-
170195
return builder.build();
171196
}
172197

src/main/java/com/salesforce/datacloud/jdbc/core/StreamingResultSet.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818
import com.salesforce.datacloud.jdbc.core.listener.QueryStatusListener;
1919
import com.salesforce.datacloud.jdbc.exception.QueryExceptionHandler;
2020
import com.salesforce.datacloud.jdbc.util.ArrowUtils;
21+
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
2122
import java.sql.ResultSetMetaData;
2223
import java.sql.SQLException;
2324
import java.util.Collections;
25+
import java.util.Iterator;
2426
import java.util.TimeZone;
27+
import java.util.stream.Stream;
2528
import lombok.SneakyThrows;
29+
import lombok.Value;
2630
import lombok.extern.slf4j.Slf4j;
2731
import lombok.val;
2832
import org.apache.arrow.memory.RootAllocator;
@@ -32,6 +36,7 @@
3236
import org.apache.calcite.avatica.AvaticaStatement;
3337
import org.apache.calcite.avatica.Meta;
3438
import org.apache.calcite.avatica.QueryState;
39+
import salesforce.cdp.hyperdb.v1.QueryResult;
3540

3641
@Slf4j
3742
public class StreamingResultSet extends AvaticaResultSet implements DataCloudResultSet {
@@ -51,6 +56,7 @@ private StreamingResultSet(
5156
this.listener = listener;
5257
}
5358

59+
@Deprecated
5460
@SneakyThrows
5561
public static StreamingResultSet of(String sql, QueryStatusListener listener) {
5662
try {
@@ -73,6 +79,30 @@ public static StreamingResultSet of(String sql, QueryStatusListener listener) {
7379
}
7480
}
7581

82+
@SneakyThrows
83+
public static StreamingResultSet of(
84+
String queryId, HyperGrpcClientExecutor client, Iterator<QueryResult> iterator) {
85+
try {
86+
val channel = ExecuteQueryResponseChannel.of(StreamUtilities.toStream(iterator));
87+
val reader = new ArrowStreamReader(channel, new RootAllocator(ROOT_ALLOCATOR_MB_FROM_V2));
88+
val schemaRoot = reader.getVectorSchemaRoot();
89+
val columns = ArrowUtils.toColumnMetaData(schemaRoot.getSchema().getFields());
90+
val timezone = TimeZone.getDefault();
91+
val state = new QueryState();
92+
val signature = new Meta.Signature(
93+
columns, null, Collections.emptyList(), Collections.emptyMap(), null, Meta.StatementType.SELECT);
94+
val metadata = new AvaticaResultSetMetaData(null, null, signature);
95+
val listener = new AlreadyReadyNoopListener(queryId);
96+
val result = new StreamingResultSet(listener, null, state, signature, metadata, timezone, null);
97+
val cursor = new ArrowStreamReaderCursor(reader);
98+
result.execute2(cursor, columns);
99+
100+
return result;
101+
} catch (Exception ex) {
102+
throw QueryExceptionHandler.createException(QUERY_FAILURE + queryId, ex);
103+
}
104+
}
105+
76106
@Override
77107
public String getQueryId() {
78108
return listener.getQueryId();
@@ -89,4 +119,22 @@ public boolean isReady() {
89119
}
90120

91121
private static final String QUERY_FAILURE = "Failed to execute query: ";
122+
123+
@Value
124+
private static class AlreadyReadyNoopListener implements QueryStatusListener {
125+
String queryId;
126+
String status = "Status should be determined via DataCloudConnection::getStatus";
127+
String query = null;
128+
boolean ready = true;
129+
130+
@Override
131+
public DataCloudResultSet generateResultSet() {
132+
return null;
133+
}
134+
135+
@Override
136+
public Stream<QueryResult> stream() {
137+
return Stream.empty();
138+
}
139+
}
92140
}

src/main/java/com/salesforce/datacloud/jdbc/core/listener/QueryStatusListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.stream.Stream;
2121
import salesforce.cdp.hyperdb.v1.QueryResult;
2222

23+
@Deprecated
2324
public interface QueryStatusListener {
2425
String BEFORE_READY = "Results were requested before ready";
2526

0 commit comments

Comments
 (0)