Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,14 @@ public class DataCloudConnection implements Connection, AutoCloseable {
@Getter(AccessLevel.PACKAGE)
@NonNull private final HyperGrpcClientExecutor executor;

/**
 * Creates a Data Cloud connection with minimal adjustments to the channels.
 * The only added interceptors are those for handling connection parameters that influence headers.
 * This will not provide auth / tracing; users of this API are expected to wire their own interceptors
 * for those concerns.
 */
public static DataCloudConnection fromChannel(@NonNull ManagedChannelBuilder<?> builder, Properties properties)
throws SQLException {
val interceptors = getClientInterceptors(null, properties);
val interceptors = getPropertyDerivedClientInterceptors(properties);
val executor = HyperGrpcClientExecutor.of(builder.intercept(interceptors), properties);

return DataCloudConnection.builder()
Expand Down Expand Up @@ -122,19 +127,38 @@ public static DataCloudConnection fromTokenSupplier(
.build();
}

static List<ClientInterceptor> getClientInterceptors(
AuthorizationHeaderInterceptor authInterceptor, Properties properties) {
/**
 * Builds the channel-level interceptors that are derived purely from the connection properties.
 * Factories may yield null when a property is absent; such entries are dropped here.
 *
 * @param properties the connection properties
 * @return a list of client interceptors derived from the given properties
 */
static List<ClientInterceptor> getPropertyDerivedClientInterceptors(Properties properties) {
    val candidates = Stream.<ClientInterceptor>of(
            HyperExternalClientContextHeaderInterceptor.of(properties),
            HyperWorkloadHeaderInterceptor.of(properties),
            DataspaceHeaderInterceptor.of(properties));
    return candidates
            .filter(Objects::nonNull)
            .peek(interceptor -> log.info("Registering interceptor. interceptor={}", interceptor))
            .collect(Collectors.toList());
}

/**
 * Initializes the full set of client interceptors, from property-derived header handling to tracing and auth.
 *
 * @param authInterceptor an optional auth interceptor; may be null, in which case no auth interceptor is registered
 * @param properties the connection properties
 * @return a mutable list of client interceptors: auth (if any) first, then tracing, then property-derived ones
 */
static List<ClientInterceptor> getClientInterceptors(
        AuthorizationHeaderInterceptor authInterceptor, Properties properties) {
    // Copy defensively: Collectors.toList() makes no mutability guarantee, and we prepend below.
    // Fully qualified to avoid relying on an ArrayList import that may not be present at the top of this file.
    val list = new java.util.ArrayList<ClientInterceptor>(getPropertyDerivedClientInterceptors(properties));
    list.add(0, TracingHeadersInterceptor.of());
    if (authInterceptor != null) {
        list.add(0, authInterceptor);
    }
    log.info("Registering interceptors. interceptors={}", list);
    return list;
}

public static DataCloudConnection of(String url, Properties properties) throws SQLException {
val connectionString = DataCloudConnectionString.of(url);
addClientUsernameIfRequired(properties);
Expand Down Expand Up @@ -179,6 +203,7 @@ private DataCloudPreparedStatement getQueryPreparedStatement(String sql) {
/**
* Retrieves a collection of rows for the specified query once it is ready.
* Use {@link #getQueryStatus(String)} to check if the query has produced results or finished execution before calling this method.
* You can get the Query Id from the executeQuery `DataCloudResultSet`.
* <p>
* When using {@link RowBased.Mode#FULL_RANGE}, this method does not handle pagination near the end of available rows.
* The caller is responsible for calculating the correct offset and limit to avoid out-of-range errors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.grpc.Metadata;
import lombok.NonNull;

interface SingleHeaderMutatingClientInterceptor extends HeaderMutatingClientInterceptor {
public interface SingleHeaderMutatingClientInterceptor extends HeaderMutatingClientInterceptor {
@NonNull Metadata.Key<String> getKey();

@NonNull String getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class AsyncStreamingResultSetTest extends HyperTestBase {
@SneakyThrows
public void testThrowsOnNonsenseQueryAsync() {
val ex = Assertions.assertThrows(DataCloudJDBCException.class, () -> {
try (val connection = HyperTestBase.getHyperQueryConnection();
try (val connection = getHyperQueryConnection();
val statement = connection.createStatement().unwrap(DataCloudStatement.class)) {
val rs = statement.executeAsyncQuery("select * from nonsense");
waitUntilReady(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private String getQueryId(int max) {

try (val client = getHyperQueryConnection();
val statement = client.createStatement().unwrap(DataCloudStatement.class)) {
statement.executeAsyncQuery(query);
statement.executeQuery(query);
return statement.getQueryId();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright (c) 2024, Salesforce, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.salesforce.datacloud.jdbc.examples;

import static java.lang.Math.min;

import com.salesforce.datacloud.jdbc.core.DataCloudConnection;
import com.salesforce.datacloud.jdbc.core.DataCloudQueryStatus;
import com.salesforce.datacloud.jdbc.core.DataCloudResultSet;
import com.salesforce.datacloud.jdbc.core.partial.RowBased;
import com.salesforce.datacloud.jdbc.hyper.HyperTestBase;
import io.grpc.ManagedChannelBuilder;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

/**
 * This example uses a locally spawned Hyper instance to demonstrate best practices around connecting to Hyper.
 * This consciously only uses the JDBC API in the core and no helpers (outside of this class) to provide
 * self-contained examples.
 */
@Slf4j
public class SubmitQueryAndConsumeResultsTest extends HyperTestBase {
    /**
     * This example shows how to create a Data Cloud Connection while still having full control over concerns like
     * authorization and tracing.
     */
    @Test
    public void testBareBonesExecuteQuery() throws SQLException {
        // The connection properties
        Properties properties = new Properties();

        // You can bring your own gRPC channels, setup in the way you like (mTLS / Plaintext / ...) and your own
        // interceptors as well as executors.
        ManagedChannelBuilder<?> channel = ManagedChannelBuilder.forAddress("127.0.0.1", instance.getPort())
                .usePlaintext();

        // Use the JDBC Driver interface; all three resources are closed deterministically by try-with-resources,
        // including the ResultSet (previously leaked until Statement close).
        try (DataCloudConnection conn = DataCloudConnection.fromChannel(channel, properties);
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT s FROM generate_series(1,10) s")) {
            while (rs.next()) {
                System.out.println("Retrieved value: " + rs.getLong(1));
            }
        }
    }

    /**
     * Analyzes the query status; since we have a query status object, we know the query was last observed in a
     * non-failing state.
     *
     * <p>The offset must always be greater than or equal to the number of rows already consumed (which holds for
     * typical next-page based pagination).
     *
     * @param queryStatus the last observed status of the query
     * @param offset the number of rows already consumed
     * @param pageLimit the maximum number of rows to fetch in the next page
     * @return the number of rows available for the next page, or a negative value if no data is available yet
     */
    private static long rowBasedStatusObjectRowsCheck(DataCloudQueryStatus queryStatus, long offset, long pageLimit) {
        // Check if we can at least return some data
        if (queryStatus.getRowCount() > offset) {
            return min(queryStatus.getRowCount() - offset, pageLimit);
        }
        // A negative count signals that no data is available
        return -1;
    }

    /**
     * Checks if the query status signals that all results are produced.
     */
    private static boolean allResultsProduced(DataCloudQueryStatus queryStatus) {
        return queryStatus.isResultProduced() || queryStatus.isExecutionFinished();
    }

    /**
     * This example shows how to use the row based pagination mode to get results segmented by approximate row count.
     * For the example we access the results in 2 row ranges and have an implementation where the application doesn't
     * know how many results would be produced in the end.
     */
    @Test
    public void testRowBasedPagination() throws SQLException {
        final int pageRowLimit = 2;
        long offset = 0;
        long page = 0;

        // The connection properties
        Properties properties = new Properties();

        // You can bring your own gRPC channels, setup in the way you like (mTLS / Plaintext / ...) and your own
        // interceptors as well as executors.
        ManagedChannelBuilder<?> channel = ManagedChannelBuilder.forAddress("127.0.0.1", instance.getPort())
                .usePlaintext();

        try (DataCloudConnection conn = DataCloudConnection.fromChannel(channel, properties)) {
            // Submit the query and consume the initial page
            String queryId;
            try (Statement stmt = conn.createStatement()) {
                log.warn("Executing query using a single `ExecuteQuery` RPC Call");
                // Close the initial ResultSet explicitly so the underlying stream is released before pagination.
                try (ResultSet rs = stmt.executeQuery("SELECT s FROM generate_series(1,11) s")) {
                    queryId = ((DataCloudResultSet) rs).getQueryId();
                    // For this result set we as a consumer must currently implement the pagination limit ourselves
                    int i = 0;
                    while (rs.next() && (i++ < pageRowLimit)) {
                        ++offset;
                        System.out.println("Retrieved value: " + rs.getLong(1) + " on page " + page);
                    }
                }
                ++page;
            }

            // Consume further pages until the full result is consumed (could also be done on a new connection if
            // needed)
            // NIT: We should provide an API on the original result set to access the `DataCloudQueryStatus` that way,
            // if the query is already finished we don't need to do another network round-trip.
            Optional<DataCloudQueryStatus> cachedStatus = Optional.empty();
            while (true) {
                // Try to make sure we have a status object
                if (!cachedStatus.isPresent()) {
                    // Identify if there is more data?
                    long lambdaOffset = offset;
                    // In case of query error this could throw an runtime exception
                    // NIT: What is the timeout enforced here?
                    log.warn("Fetching query status using a single `GetQueryInfo` RPC call");
                    // NIT: Semantically I would want takeWhile here which is only available in Java 11
                    cachedStatus = conn.getQueryStatus(queryId)
                            .filter(queryStatus ->
                                    (rowBasedStatusObjectRowsCheck(queryStatus, lambdaOffset, pageRowLimit) > 0)
                                            || allResultsProduced(queryStatus))
                            .findFirst();

                    // Query is still running
                    // NIT: Check how we should handle this in the presence of timeouts
                    if (!cachedStatus.isPresent()) {
                        continue;
                    }
                }

                long availableRows = rowBasedStatusObjectRowsCheck(cachedStatus.get(), offset, pageRowLimit);
                // Check if query completed and thus we can't produce more results
                if (availableRows <= 0) {
                    if (allResultsProduced(cachedStatus.get())) {
                        break;
                    } else {
                        // We need to fetch a new status in the next iteration
                        // Due to the long-polling nature of `conn.getQueryStatus` this doesn't result in a busy
                        // spinning loop even if the query is still executing
                        cachedStatus = Optional.empty();
                        continue;
                    }
                }

                // At this point we know that rows are available
                log.warn("Fetching query results using a single `GetQueryResult` RPC call");
                try (ResultSet rs =
                        conn.getRowBasedResultSet(queryId, offset, pageRowLimit, RowBased.Mode.SINGLE_RPC)) {
                    while (rs.next()) {
                        ++offset;
                        System.out.println("Retrieved value: " + rs.getLong(1) + " on page " + page);
                    }
                    ++page;
                }
            }
        }
        log.warn("Completed");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public HyperServerProcess(HyperServerConfig.HyperServerConfigBuilder config) {
}
}

int getPort() {
/**
 * Returns the local TCP port the spawned Hyper server listens on.
 * Public (previously package-private) so examples outside this package can open their own channels.
 */
public int getPort() {
return port;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,18 @@
@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class HyperTestBase {
private static HyperServerProcess instance;
public HyperServerProcess instance;

@SneakyThrows
public static void assertEachRowIsTheSame(ResultSet rs, AtomicInteger prev) {
/** Asserts that the first column of the current row equals the next value of the expected sequence. */
public final void assertEachRowIsTheSame(ResultSet rs, AtomicInteger prev) {
    val expectedValue = prev.incrementAndGet();
    val actualValue = rs.getBigDecimal(1).intValue();
    assertThat(expectedValue).isEqualTo(actualValue);
}

@SafeVarargs
@SneakyThrows
public static void assertWithConnection(
public final void assertWithConnection(
ThrowingConsumer<DataCloudConnection> assertion, Map.Entry<String, String>... settings) {
try (val connection =
getHyperQueryConnection(settings == null ? ImmutableMap.of() : ImmutableMap.ofEntries(settings))) {
Expand All @@ -70,7 +70,7 @@ public static void assertWithConnection(

@SafeVarargs
@SneakyThrows
public static void assertWithStatement(
public final void assertWithStatement(
ThrowingConsumer<DataCloudStatement> assertion, Map.Entry<String, String>... settings) {
try (val connection = getHyperQueryConnection(
settings == null ? ImmutableMap.of() : ImmutableMap.ofEntries(settings));
Expand All @@ -79,12 +79,19 @@ public static void assertWithStatement(
}
}

public static DataCloudConnection getHyperQueryConnection() {
/** Creates a connection to the local Hyper instance using default (empty) connection settings. */
public DataCloudConnection getHyperQueryConnection() {
return getHyperQueryConnection(ImmutableMap.of());
}

public static DataCloudConnection getHyperQueryConnection(Map<String, String> connectionSettings) {
return instance.getConnection(connectionSettings);
/**
 * Creates a plaintext gRPC connection to the locally spawned Hyper instance.
 * The given settings are copied into the connection {@link Properties} verbatim.
 *
 * @param connectionSettings connection settings forwarded as driver properties
 * @return a {@link DataCloudConnection} built directly from a channel (no auth / tracing interceptors)
 */
@SneakyThrows
public DataCloudConnection getHyperQueryConnection(Map<String, String> connectionSettings) {
val properties = new Properties();
properties.putAll(connectionSettings);
log.info("Creating connection to port {}", instance.getPort());
// Plaintext is fine here: the test server runs on the loopback interface only.
ManagedChannelBuilder<?> channel = ManagedChannelBuilder.forAddress("127.0.0.1", instance.getPort())
.usePlaintext();

return DataCloudConnection.fromChannel(channel, properties);
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private static Map<String, String> getHeadersFor(Properties properties) {
.start();
val channel = InProcessChannelBuilder.forName(name).usePlaintext();

try (val connection = DataCloudConnection.fromChannel(channel, properties);
try (val connection = DataCloudConnection.fromTokenSupplier(null, channel, properties);
val statement = connection.createStatement().unwrap(DataCloudStatement.class)) {
statement.executeAsyncQuery("select 1");
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<flatten-maven-plugin.version>1.6.0</flatten-maven-plugin.version>
<git-build-hook-maven-plugin.version>3.5.0</git-build-hook-maven-plugin.version>
<!-- https://tableau.github.io/hyper-db/docs/releases#download -->
<hyperapi.version>0.0.21200.re11c8cb9</hyperapi.version>
<hyperapi.version>0.0.21408.rf5a406c0</hyperapi.version>
<hyperd.directory>${project.build.directory}/hyper</hyperd.directory>
<junit-bom.version>5.11.3</junit-bom.version>
<maven-depndency-plugin.version>3.8.1</maven-depndency-plugin.version>
Expand Down
Loading