Skip to content

Commit 5996c91

Browse files
Fix StreamingResultSet metadata methods (#21)
- Manually implemented `StreamingResultSet::getType`, `StreamingResultSet::getConcurrency`, and `StreamingResultSet::getFetchDirection` methods to return appropriate values that match how `StreamingResultSet` works. - Moved a file into the `jdbc-slim` module that must have been missed during a subtle merge conflict in #19 - Added some more test coverage for row-based pagination, specifically for `DataCloudPreparedStatement`
1 parent 1471135 commit 5996c91

File tree

8 files changed

+184
-46
lines changed

8 files changed

+184
-46
lines changed

jdbc-slim/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@
206206
</goals>
207207
<configuration>
208208
<addOutputDirectory>false</addOutputDirectory>
209-
<sourceDirectory>jdbc-driver/src/main/java</sourceDirectory>
209+
<sourceDirectory>jdbc-slim/src/main/java</sourceDirectory>
210210
<outputDirectory>${project.build.directory}/delombok</outputDirectory>
211211
</configuration>
212212
</execution>

jdbc-slim/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ private DataCloudPreparedStatement getQueryPreparedStatement(String sql) {
190190
* @return A {@link DataCloudResultSet} containing the query results.
191191
*/
192192
public DataCloudResultSet getRowBasedResultSet(String queryId, long offset, long limit, RowBased.Mode mode) {
193+
log.info("Get row-based result set. queryId={}, offset={}, limit={}, mode={}", queryId, offset, limit, mode);
193194
val iterator = RowBased.of(executor, queryId, offset, limit, mode);
194195
return StreamingResultSet.of(queryId, executor, iterator);
195196
}

jdbc-slim/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudStatement.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class DataCloudStatement implements Statement {
4444
protected static final String NOT_SUPPORTED_IN_DATACLOUD_QUERY = "Write is not supported in Data Cloud query";
4545
protected static final String BATCH_EXECUTION_IS_NOT_SUPPORTED =
4646
"Batch execution is not supported in Data Cloud query";
47+
protected static final String CHANGE_FETCH_DIRECTION_IS_NOT_SUPPORTED = "Changing fetch direction is not supported";
4748
private static final String QUERY_TIMEOUT = "queryTimeout";
4849
public static final int DEFAULT_QUERY_TIMEOUT = 3 * 60 * 60;
4950

@@ -246,11 +247,14 @@ public boolean getMoreResults() {
246247
}
247248

248249
@Override
249-
public void setFetchDirection(int direction) {}
250+
public void setFetchDirection(int direction) throws SQLException {
251+
throw new DataCloudJDBCException(CHANGE_FETCH_DIRECTION_IS_NOT_SUPPORTED, SqlErrorCodes.FEATURE_NOT_SUPPORTED);
252+
}
250253

251254
@Override
252-
public int getFetchDirection() {
253-
return ResultSet.FETCH_FORWARD;
255+
public int getFetchDirection() throws SQLException {
256+
assertQueryExecuted();
257+
return resultSet.getFetchDirection();
254258
}
255259

256260
@Override
@@ -262,13 +266,15 @@ public int getFetchSize() {
262266
}
263267

264268
@Override
265-
public int getResultSetConcurrency() {
266-
return 0;
269+
public int getResultSetConcurrency() throws SQLException {
270+
assertQueryExecuted();
271+
return resultSet.getConcurrency();
267272
}
268273

269274
@Override
270-
public int getResultSetType() {
271-
return ResultSet.TYPE_FORWARD_ONLY;
275+
public int getResultSetType() throws SQLException {
276+
assertQueryExecuted();
277+
return resultSet.getType();
272278
}
273279

274280
@Override

jdbc-slim/src/main/java/com/salesforce/datacloud/jdbc/core/StreamingResultSet.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.salesforce.datacloud.jdbc.exception.QueryExceptionHandler;
2020
import com.salesforce.datacloud.jdbc.util.ArrowUtils;
2121
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
22+
import java.sql.ResultSet;
2223
import java.sql.ResultSetMetaData;
2324
import java.sql.SQLException;
2425
import java.util.Collections;
@@ -137,4 +138,19 @@ public Stream<QueryResult> stream() {
137138
return Stream.empty();
138139
}
139140
}
141+
142+
@Override
143+
public int getType() {
144+
return ResultSet.TYPE_FORWARD_ONLY;
145+
}
146+
147+
@Override
148+
public int getConcurrency() {
149+
return ResultSet.CONCUR_READ_ONLY;
150+
}
151+
152+
@Override
153+
public int getFetchDirection() {
154+
return ResultSet.FETCH_FORWARD;
155+
}
140156
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2024, Salesforce, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.salesforce.datacloud.jdbc.core;
17+
18+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
19+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
20+
21+
import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
22+
import com.salesforce.datacloud.jdbc.hyper.HyperTestBase;
23+
import java.sql.ResultSet;
24+
import lombok.SneakyThrows;
25+
import lombok.val;
26+
import org.junit.jupiter.api.Test;
27+
28+
public class DataCloudStatementFunctionalTest extends HyperTestBase {
29+
@Test
30+
@SneakyThrows
31+
public void forwardAndReadOnly() {
32+
assertWithStatement(statement -> {
33+
val rs = statement.executeQuery("select 1");
34+
35+
assertThat(statement.getResultSetConcurrency()).isEqualTo(ResultSet.CONCUR_READ_ONLY);
36+
assertThat(statement.getFetchDirection()).isEqualTo(ResultSet.FETCH_FORWARD);
37+
assertThat(statement.getResultSetType()).isEqualTo(ResultSet.TYPE_FORWARD_ONLY);
38+
39+
assertThat(rs.getType()).isEqualTo(ResultSet.TYPE_FORWARD_ONLY);
40+
assertThat(rs.getConcurrency()).isEqualTo(ResultSet.CONCUR_READ_ONLY);
41+
});
42+
}
43+
44+
private static final String EXECUTED_MESSAGE = "a query was not executed before attempting to access results";
45+
46+
@SneakyThrows
47+
@Test
48+
public void requiresExecutedResultSet() {
49+
assertWithStatement(statement -> assertThatThrownBy(statement::getResultSetType)
50+
.isInstanceOf(DataCloudJDBCException.class)
51+
.hasMessage(EXECUTED_MESSAGE));
52+
53+
assertWithStatement(statement -> assertThatThrownBy(statement::getResultSetConcurrency)
54+
.isInstanceOf(DataCloudJDBCException.class)
55+
.hasMessage(EXECUTED_MESSAGE));
56+
57+
assertWithStatement(statement -> assertThatThrownBy(statement::getFetchDirection)
58+
.isInstanceOf(DataCloudJDBCException.class)
59+
.hasMessage(EXECUTED_MESSAGE));
60+
}
61+
}

jdbc-slim/src/test/java/com/salesforce/datacloud/jdbc/core/DataCloudStatementTest.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,6 @@ public void beforeEach() {
6565
statement = new DataCloudStatement(connection);
6666
}
6767

68-
@Test
69-
@SneakyThrows
70-
public void forwardOnly() {
71-
assertThat(statement.getFetchDirection()).isEqualTo(ResultSet.FETCH_FORWARD);
72-
assertThat(statement.getResultSetType()).isEqualTo(ResultSet.TYPE_FORWARD_ONLY);
73-
}
74-
7568
private static Stream<Executable> unsupportedBatchExecutes() {
7669
return Stream.of(
7770
() -> statement.execute("", 1),
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2024, Salesforce, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.salesforce.datacloud.jdbc.core.partial;
17+
18+
import java.util.stream.LongStream;
19+
import java.util.stream.Stream;
20+
import lombok.Value;
21+
import lombok.val;
22+
23+
@Value
24+
class Page {
25+
long offset;
26+
long limit;
27+
28+
/**
29+
* Calculates some number of full pages of some limit with a final page making up the remainder of rows
30+
* @param rows the total number of rows in the query result
31+
* @param limit the total number of rows to be acquired in this page
32+
* @return a stream of pages that can be mapped to
33+
* {@link com.salesforce.datacloud.jdbc.core.DataCloudConnection#getRowBasedResultSet } calls
34+
*/
35+
public static Stream<Page> stream(long rows, long limit) {
36+
long baseSize = Math.min(rows, limit);
37+
long fullPageCount = rows / baseSize;
38+
long remainder = rows % baseSize;
39+
40+
val fullPages = LongStream.range(0, fullPageCount).mapToObj(i -> new Page(i * baseSize, baseSize));
41+
return Stream.concat(
42+
fullPages, remainder > 0 ? Stream.of(new Page(fullPageCount * baseSize, remainder)) : Stream.empty());
43+
}
44+
}

src/test/java/com/salesforce/datacloud/jdbc/core/partial/RowBasedTest.java renamed to jdbc-slim/src/test/java/com/salesforce/datacloud/jdbc/core/partial/RowBasedTest.java

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.mockito.Mockito.mock;
2121

2222
import com.salesforce.datacloud.jdbc.core.DataCloudConnection;
23+
import com.salesforce.datacloud.jdbc.core.DataCloudPreparedStatement;
2324
import com.salesforce.datacloud.jdbc.core.DataCloudQueryStatus;
2425
import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
2526
import com.salesforce.datacloud.jdbc.core.DataCloudStatement;
@@ -31,16 +32,16 @@
3132
import java.util.List;
3233
import java.util.stream.Collectors;
3334
import java.util.stream.IntStream;
34-
import java.util.stream.LongStream;
3535
import java.util.stream.Stream;
3636
import lombok.SneakyThrows;
37-
import lombok.Value;
3837
import lombok.extern.slf4j.Slf4j;
3938
import lombok.val;
4039
import org.junit.jupiter.api.BeforeAll;
4140
import org.junit.jupiter.api.Test;
4241
import org.junit.jupiter.params.ParameterizedTest;
42+
import org.junit.jupiter.params.provider.Arguments;
4343
import org.junit.jupiter.params.provider.EnumSource;
44+
import org.junit.jupiter.params.provider.MethodSource;
4445

4546
@Slf4j
4647
class RowBasedTest extends HyperTestBase {
@@ -115,46 +116,62 @@ void throwsWhenFullRangeOverrunsAvailableRows() {
115116
tinySize, tinySize));
116117
}
117118

119+
Stream<Arguments> querySizeAndPageSize() {
120+
val sizes = IntStream.rangeClosed(0, 13).mapToObj(i -> 1 << i).collect(Collectors.toList());
121+
return sizes.stream().flatMap(left -> sizes.stream().map(right -> Arguments.of(left, right)));
122+
}
123+
118124
@SneakyThrows
119-
@Test
120-
void fetchWithRowsNearEndRange_FULL_RANGE() {
121-
val threads = 3;
122-
final long rows;
123-
try (val conn = getHyperQueryConnection()) {
124-
rows = conn.getQueryStatus(small)
125-
.filter(t -> t.isResultProduced() || t.isExecutionFinished())
126-
.map(DataCloudQueryStatus::getRowCount)
127-
.findFirst()
128-
.orElseThrow(() -> new RuntimeException("boom"));
129-
}
125+
@ParameterizedTest
126+
@MethodSource("querySizeAndPageSize")
127+
void fullRangeRowBasedParameterizedQuery(int querySize, int limit) {
128+
val expected = rangeClosed(1, querySize);
129+
130+
final String queryId;
131+
132+
try (val conn = getHyperQueryConnection();
133+
val statement = conn.prepareStatement("select a from generate_series(1, ?) as s(a)")
134+
.unwrap(DataCloudPreparedStatement.class)) {
135+
statement.setInt(1, querySize);
136+
statement.executeQuery();
130137

131-
val baseSize = rows / threads;
132-
val remainder = rows % threads;
138+
queryId = statement.getQueryId();
139+
}
133140

134-
val pages = StreamUtilities.takeWhile(
135-
LongStream.range(0, threads).mapToObj(i -> {
136-
val limit = baseSize + (i < remainder ? 1 : 0);
137-
val offset = baseSize * i + Math.min(i, remainder);
138-
return new Page(offset, limit);
139-
}),
140-
p -> p.limit > 0);
141+
try (val conn = getHyperQueryConnection()) {
142+
val rows = getRowCount(conn, queryId);
143+
val pages = Page.stream(rows, limit).collect(Collectors.toList());
144+
log.info("pages: {}", pages);
145+
val actual = pages.parallelStream()
146+
.map(page -> conn.getRowBasedResultSet(
147+
queryId, page.getOffset(), page.getLimit(), RowBased.Mode.FULL_RANGE))
148+
.flatMap(RowBasedTest::toStream)
149+
.collect(Collectors.toList());
150+
assertThat(actual).containsExactlyElementsOf(expected);
151+
}
152+
}
141153

154+
@SneakyThrows
155+
@Test
156+
void fetchWithRowsNearEndRange_FULL_RANGE() {
142157
try (val conn = getHyperQueryConnection()) {
143-
val actual = pages.parallel()
144-
.map(p -> {
145-
log.info("Executing FULL_RANGE request for page {}", p);
146-
return conn.getRowBasedResultSet(small, p.offset, p.limit, RowBased.Mode.FULL_RANGE);
147-
})
158+
val rows = getRowCount(conn, small);
159+
val actual = Page.stream(rows, 5)
160+
.parallel()
161+
.map(page -> conn.getRowBasedResultSet(
162+
small, page.getOffset(), page.getLimit(), RowBased.Mode.FULL_RANGE))
148163
.flatMap(RowBasedTest::toStream)
149164
.collect(Collectors.toList());
150165
assertThat(actual).containsExactlyElementsOf(rangeClosed(1, smallSize));
151166
}
152167
}
153168

154-
@Value
155-
private static class Page {
156-
long offset;
157-
long limit;
169+
private long getRowCount(DataCloudConnection conn, String queryId) {
170+
return conn.getQueryStatus(queryId)
171+
.filter(t -> t.isResultProduced() || t.isExecutionFinished())
172+
.map(DataCloudQueryStatus::getRowCount)
173+
.findFirst()
174+
.orElseThrow(() -> new RuntimeException("boom"));
158175
}
159176

160177
@SneakyThrows

0 commit comments

Comments
 (0)