Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jdbc-slim/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@
</goals>
<configuration>
<addOutputDirectory>false</addOutputDirectory>
<sourceDirectory>jdbc-driver/src/main/java</sourceDirectory>
<sourceDirectory>jdbc-slim/src/main/java</sourceDirectory>
<outputDirectory>${project.build.directory}/delombok</outputDirectory>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ private DataCloudPreparedStatement getQueryPreparedStatement(String sql) {
* @return A {@link DataCloudResultSet} containing the query results.
*/
public DataCloudResultSet getRowBasedResultSet(String queryId, long offset, long limit, RowBased.Mode mode) {
// Parameterized SLF4J logging: message formatting is deferred until the level is enabled.
log.info("Get row-based result set. queryId={}, offset={}, limit={}, mode={}", queryId, offset, limit, mode);
// Build a row iterator over the requested [offset, offset + limit) window of the query's results.
val iterator = RowBased.of(executor, queryId, offset, limit, mode);
// Wrap the iterator in a forward-only streaming result set bound to the same executor.
return StreamingResultSet.of(queryId, executor, iterator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class DataCloudStatement implements Statement {
protected static final String NOT_SUPPORTED_IN_DATACLOUD_QUERY = "Write is not supported in Data Cloud query";
protected static final String BATCH_EXECUTION_IS_NOT_SUPPORTED =
"Batch execution is not supported in Data Cloud query";
protected static final String CHANGE_FETCH_DIRECTION_IS_NOT_SUPPORTED = "Changing fetch direction is not supported";
private static final String QUERY_TIMEOUT = "queryTimeout";
public static final int DEFAULT_QUERY_TIMEOUT = 3 * 60 * 60;

Expand Down Expand Up @@ -246,11 +247,14 @@ public boolean getMoreResults() {
}

@Override
public void setFetchDirection(int direction) {}
public void setFetchDirection(int direction) throws SQLException {
throw new DataCloudJDBCException(CHANGE_FETCH_DIRECTION_IS_NOT_SUPPORTED, SqlErrorCodes.FEATURE_NOT_SUPPORTED);
}

@Override
public int getFetchDirection() {
return ResultSet.FETCH_FORWARD;
public int getFetchDirection() throws SQLException {
assertQueryExecuted();
return resultSet.getFetchDirection();
}

@Override
Expand All @@ -262,13 +266,15 @@ public int getFetchSize() {
}

@Override
public int getResultSetConcurrency() {
return 0;
public int getResultSetConcurrency() throws SQLException {
assertQueryExecuted();
return resultSet.getConcurrency();
}

@Override
public int getResultSetType() {
return ResultSet.TYPE_FORWARD_ONLY;
public int getResultSetType() throws SQLException {
assertQueryExecuted();
return resultSet.getType();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.salesforce.datacloud.jdbc.exception.QueryExceptionHandler;
import com.salesforce.datacloud.jdbc.util.ArrowUtils;
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collections;
Expand Down Expand Up @@ -137,4 +138,19 @@ public Stream<QueryResult> stream() {
return Stream.empty();
}
}

/** Streaming results are traversed once, front to back, so the cursor type is forward-only. */
@Override
public int getType() {
return ResultSet.TYPE_FORWARD_ONLY;
}

/** Data Cloud results cannot be modified through the result set, so concurrency is read-only. */
@Override
public int getConcurrency() {
return ResultSet.CONCUR_READ_ONLY;
}

/** Rows of a streaming result set can only be fetched in the forward direction. */
@Override
public int getFetchDirection() {
return ResultSet.FETCH_FORWARD;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2024, Salesforce, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.salesforce.datacloud.jdbc.core;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import com.salesforce.datacloud.jdbc.hyper.HyperTestBase;
import java.sql.ResultSet;
import lombok.SneakyThrows;
import lombok.val;
import org.junit.jupiter.api.Test;

public class DataCloudStatementFunctionalTest extends HyperTestBase {
    private static final String EXECUTED_MESSAGE = "a query was not executed before attempting to access results";

    /**
     * After a query has executed, both the statement and its result set report
     * forward-only traversal and read-only concurrency.
     */
    @SneakyThrows
    @Test
    public void forwardAndReadOnly() {
        assertWithStatement(stmt -> {
            val resultSet = stmt.executeQuery("select 1");

            // The statement reflects its result set's characteristics once a query has run.
            assertThat(stmt.getResultSetConcurrency()).isEqualTo(ResultSet.CONCUR_READ_ONLY);
            assertThat(stmt.getFetchDirection()).isEqualTo(ResultSet.FETCH_FORWARD);
            assertThat(stmt.getResultSetType()).isEqualTo(ResultSet.TYPE_FORWARD_ONLY);

            // The result set itself agrees with what the statement reported.
            assertThat(resultSet.getType()).isEqualTo(ResultSet.TYPE_FORWARD_ONLY);
            assertThat(resultSet.getConcurrency()).isEqualTo(ResultSet.CONCUR_READ_ONLY);
        });
    }

    /**
     * Result-set characteristics cannot be queried before a query has executed;
     * each accessor fails with the standard "not executed" message.
     */
    @SneakyThrows
    @Test
    public void requiresExecutedResultSet() {
        assertWithStatement(stmt -> assertThatThrownBy(stmt::getResultSetType)
                .isInstanceOf(DataCloudJDBCException.class)
                .hasMessage(EXECUTED_MESSAGE));

        assertWithStatement(stmt -> assertThatThrownBy(stmt::getResultSetConcurrency)
                .isInstanceOf(DataCloudJDBCException.class)
                .hasMessage(EXECUTED_MESSAGE));

        assertWithStatement(stmt -> assertThatThrownBy(stmt::getFetchDirection)
                .isInstanceOf(DataCloudJDBCException.class)
                .hasMessage(EXECUTED_MESSAGE));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ public void beforeEach() {
statement = new DataCloudStatement(connection);
}

@Test
@SneakyThrows
public void forwardOnly() {
assertThat(statement.getFetchDirection()).isEqualTo(ResultSet.FETCH_FORWARD);
assertThat(statement.getResultSetType()).isEqualTo(ResultSet.TYPE_FORWARD_ONLY);
}

private static Stream<Executable> unsupportedBatchExecutes() {
return Stream.of(
() -> statement.execute("", 1),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2024, Salesforce, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.salesforce.datacloud.jdbc.core.partial;

import java.util.stream.LongStream;
import java.util.stream.Stream;
import lombok.Value;
import lombok.val;

@Value
class Page {
    long offset;
    long limit;

    /**
     * Calculates some number of full pages of some limit with a final page making up the remainder of rows.
     *
     * @param rows the total number of rows in the query result; a non-positive value yields no pages
     * @param limit the maximum number of rows per page; must be positive
     * @return a stream of pages that can be mapped to
     * {@link com.salesforce.datacloud.jdbc.core.DataCloudConnection#getRowBasedResultSet } calls
     * @throws IllegalArgumentException if limit is not positive
     */
    public static Stream<Page> stream(long rows, long limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive, got: " + limit);
        }

        // A query can legitimately produce zero rows. Without this guard, baseSize below
        // would be 0 and the division would throw ArithmeticException (divide by zero).
        if (rows <= 0) {
            return Stream.empty();
        }

        // Page size is capped by the total row count so a single short result yields one small page.
        long baseSize = Math.min(rows, limit);
        long fullPageCount = rows / baseSize;
        long remainder = rows % baseSize;

        val fullPages = LongStream.range(0, fullPageCount).mapToObj(i -> new Page(i * baseSize, baseSize));
        // Append a final partial page only when the rows do not divide evenly.
        return Stream.concat(
                fullPages, remainder > 0 ? Stream.of(new Page(fullPageCount * baseSize, remainder)) : Stream.empty());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.mock;

import com.salesforce.datacloud.jdbc.core.DataCloudConnection;
import com.salesforce.datacloud.jdbc.core.DataCloudPreparedStatement;
import com.salesforce.datacloud.jdbc.core.DataCloudQueryStatus;
import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
import com.salesforce.datacloud.jdbc.core.DataCloudStatement;
Expand All @@ -31,16 +32,16 @@
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

@Slf4j
class RowBasedTest extends HyperTestBase {
Expand Down Expand Up @@ -115,46 +116,62 @@ void throwsWhenFullRangeOverrunsAvailableRows() {
tinySize, tinySize));
}

// Cartesian product of powers of two (1 << 0 .. 1 << 13, i.e. 1..8192) used as
// (query size, page limit) argument pairs for the parameterized paging test.
Stream<Arguments> querySizeAndPageSize() {
val sizes = IntStream.rangeClosed(0, 13).mapToObj(i -> 1 << i).collect(Collectors.toList());
return sizes.stream().flatMap(left -> sizes.stream().map(right -> Arguments.of(left, right)));
}

@SneakyThrows
@Test
void fetchWithRowsNearEndRange_FULL_RANGE() {
val threads = 3;
final long rows;
try (val conn = getHyperQueryConnection()) {
rows = conn.getQueryStatus(small)
.filter(t -> t.isResultProduced() || t.isExecutionFinished())
.map(DataCloudQueryStatus::getRowCount)
.findFirst()
.orElseThrow(() -> new RuntimeException("boom"));
}
@ParameterizedTest
@MethodSource("querySizeAndPageSize")
void fullRangeRowBasedParameterizedQuery(int querySize, int limit) {
val expected = rangeClosed(1, querySize);

final String queryId;

try (val conn = getHyperQueryConnection();
val statement = conn.prepareStatement("select a from generate_series(1, ?) as s(a)")
.unwrap(DataCloudPreparedStatement.class)) {
statement.setInt(1, querySize);
statement.executeQuery();

val baseSize = rows / threads;
val remainder = rows % threads;
queryId = statement.getQueryId();
}

val pages = StreamUtilities.takeWhile(
LongStream.range(0, threads).mapToObj(i -> {
val limit = baseSize + (i < remainder ? 1 : 0);
val offset = baseSize * i + Math.min(i, remainder);
return new Page(offset, limit);
}),
p -> p.limit > 0);
try (val conn = getHyperQueryConnection()) {
val rows = getRowCount(conn, queryId);
val pages = Page.stream(rows, limit).collect(Collectors.toList());
log.info("pages: {}", pages);
val actual = pages.parallelStream()
.map(page -> conn.getRowBasedResultSet(
queryId, page.getOffset(), page.getLimit(), RowBased.Mode.FULL_RANGE))
.flatMap(RowBasedTest::toStream)
.collect(Collectors.toList());
assertThat(actual).containsExactlyElementsOf(expected);
}
}

@SneakyThrows
@Test
void fetchWithRowsNearEndRange_FULL_RANGE() {
try (val conn = getHyperQueryConnection()) {
val actual = pages.parallel()
.map(p -> {
log.info("Executing FULL_RANGE request for page {}", p);
return conn.getRowBasedResultSet(small, p.offset, p.limit, RowBased.Mode.FULL_RANGE);
})
val rows = getRowCount(conn, small);
val actual = Page.stream(rows, 5)
.parallel()
.map(page -> conn.getRowBasedResultSet(
small, page.getOffset(), page.getLimit(), RowBased.Mode.FULL_RANGE))
.flatMap(RowBasedTest::toStream)
.collect(Collectors.toList());
assertThat(actual).containsExactlyElementsOf(rangeClosed(1, smallSize));
}
}

@Value
private static class Page {
long offset;
long limit;
// Polls the query status stream for the first status carrying a usable row count
// (result produced or execution finished); throws if the stream ends without one.
private long getRowCount(DataCloudConnection conn, String queryId) {
return conn.getQueryStatus(queryId)
.filter(t -> t.isResultProduced() || t.isExecutionFinished())
.map(DataCloudQueryStatus::getRowCount)
.findFirst()
.orElseThrow(() -> new RuntimeException("boom"));
}

@SneakyThrows
Expand Down
Loading