Skip to content

Commit c11d500

Browse files
committed
fix tests
1 parent db6ca28 commit c11d500

File tree

12 files changed

+223
-75
lines changed

12 files changed

+223
-75
lines changed

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/DataCloudConnection.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -184,13 +184,16 @@ public DataCloudResultSet getRowBasedResultSet(String queryId, long offset, long
184184
return StreamingResultSet.of(queryId, executor, iterator);
185185
}
186186

187-
@Unstable
188187
public DataCloudResultSet getChunkBasedResultSet(String queryId, long chunkId, long limit) {
189188
log.info("Get chunk-based result set. queryId={}, chunkId={}, limit={}", queryId, chunkId, limit);
190189
val iterator = ChunkBased.of(executor, queryId, chunkId, limit);
191190
return StreamingResultSet.of(queryId, executor, iterator);
192191
}
193192

193+
public DataCloudResultSet getChunkBasedResultSet(String queryId, long chunkId) {
194+
return getChunkBasedResultSet(queryId, chunkId, 1);
195+
}
196+
194197
/**
195198
* Checks if all the query's results are ready, the row count and chunk count will be stable.
196199
* @param queryId The identifier of the query to check
@@ -200,7 +203,6 @@ public DataCloudResultSet getChunkBasedResultSet(String queryId, long chunkId, l
200203
* @param allowLessThan Whether or not to return early when the available rows is less than {@code offset + limit}
201204
* @return The final {@link DataCloudQueryStatus} the server replied with.
202205
*/
203-
@Unstable
204206
public DataCloudQueryStatus waitForRowsAvailable(
205207
String queryId, long offset, long limit, Duration timeout, boolean allowLessThan)
206208
throws DataCloudJDBCException {
@@ -213,20 +215,13 @@ public DataCloudQueryStatus waitForRowsAvailable(
213215
* @param timeout The duration to wait for the engine have results produced.
214216
* @return The final {@link DataCloudQueryStatus} the server replied with.
215217
*/
216-
@Unstable
217218
public DataCloudQueryStatus waitForResultsProduced(String queryId, Duration timeout) throws DataCloudJDBCException {
218219
return executor.waitForResultsProduced(queryId, timeout);
219220
}
220221

221-
@Unstable
222-
public DataCloudResultSet getChunkBasedResultSet(String queryId, long chunkId) {
223-
return getChunkBasedResultSet(queryId, chunkId, 1);
224-
}
225-
226222
/**
227223
* Use this to determine when a given query is complete by filtering the responses and a subsequent findFirst()
228224
*/
229-
@Unstable
230225
public Stream<DataCloudQueryStatus> getQueryStatus(String queryId) {
231226
return executor.getQueryStatus(queryId);
232227
}

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/HyperGrpcClientExecutor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,5 +261,15 @@ public void close() throws Exception {
261261
}
262262

263263
channel.shutdown();
264+
265+
try {
266+
channel.awaitTermination(5, TimeUnit.SECONDS);
267+
} catch (InterruptedException e) {
268+
log.error("Failed to shutdown channel within 5 seconds", e);
269+
} finally {
270+
if (!channel.isTerminated()) {
271+
channel.shutdownNow();
272+
}
273+
}
264274
}
265275
}

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/core/partial/DataCloudQueryPolling.java

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@
1919
import com.salesforce.datacloud.jdbc.util.Unstable;
2020
import com.salesforce.datacloud.query.v3.DataCloudQueryStatus;
2121
import io.grpc.StatusRuntimeException;
22-
import java.time.Duration;
23-
import java.time.Instant;
24-
import java.util.concurrent.atomic.AtomicInteger;
25-
import java.util.concurrent.atomic.AtomicReference;
26-
import java.util.function.Predicate;
2722
import lombok.SneakyThrows;
2823
import lombok.experimental.UtilityClass;
2924
import lombok.extern.slf4j.Slf4j;
@@ -34,6 +29,12 @@
3429
import salesforce.cdp.hyperdb.v1.HyperServiceGrpc;
3530
import salesforce.cdp.hyperdb.v1.QueryInfoParam;
3631

32+
import java.time.Duration;
33+
import java.time.Instant;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.concurrent.atomic.AtomicReference;
36+
import java.util.function.Predicate;
37+
3738
@Unstable
3839
@Slf4j
3940
@UtilityClass
@@ -43,16 +44,31 @@ public static DataCloudQueryStatus waitForRowsAvailable(
4344
String queryId,
4445
long offset,
4546
long limit,
46-
Duration timeout,
47+
Duration timeoutDuration,
4748
boolean allowLessThan)
4849
throws DataCloudJDBCException {
49-
return waitForQueryStatus(stub, queryId, timeout, status -> {
50+
51+
Predicate<DataCloudQueryStatus> predicate = status -> {
5052
if (allowLessThan) {
5153
return status.getRowCount() > offset;
5254
} else {
5355
return status.getRowCount() >= offset + limit;
5456
}
55-
});
57+
};
58+
59+
val result = waitForQueryStatus(stub, queryId, timeoutDuration, predicate);
60+
61+
if (predicate.test(result)) {
62+
return result;
63+
} else {
64+
if (allowLessThan) {
65+
throw new DataCloudJDBCException(
66+
"Timed out waiting for new rows to be available. queryId=" + queryId + ", status=" + result);
67+
} else {
68+
throw new DataCloudJDBCException(
69+
"Timed out waiting for enough rows to be available. queryId=" + queryId + ", status=" + result);
70+
}
71+
}
5672
}
5773

5874
public static DataCloudQueryStatus waitForResultsProduced(
@@ -64,17 +80,21 @@ public static DataCloudQueryStatus waitForResultsProduced(
6480
public static DataCloudQueryStatus waitForQueryStatus(
6581
HyperServiceGrpc.HyperServiceBlockingStub stub,
6682
String queryId,
67-
Duration timeout,
83+
Duration timeoutDuration,
6884
Predicate<DataCloudQueryStatus> predicate)
6985
throws DataCloudJDBCException {
7086
val last = new AtomicReference<DataCloudQueryStatus>();
71-
val deadline = Instant.now().plus(timeout);
72-
val attempts = new AtomicInteger(1);
87+
val deadline = Instant.now().plus(timeoutDuration);
88+
val attempts = new AtomicInteger(0);
7389

74-
val retry = new RetryPolicy<DataCloudQueryStatus>()
75-
.withMaxDuration(timeout)
76-
.handle(StatusRuntimeException.class)
90+
val retryPolicy = new RetryPolicy<DataCloudQueryStatus>()
91+
.withMaxDuration(timeoutDuration)
7792
.handleIf(e -> {
93+
if (!(e instanceof StatusRuntimeException)) {
94+
log.error("Got an unexpected exception when getting query status for queryId={}", queryId, e);
95+
return false;
96+
}
97+
7898
if (last.get() == null) {
7999
log.error(
80100
"Failed to get query status response, will not try again. queryId={}, attempts={}",
@@ -103,7 +123,7 @@ public static DataCloudQueryStatus waitForQueryStatus(
103123
});
104124

105125
try {
106-
return Failsafe.with(retry)
126+
return Failsafe.with(retryPolicy)
107127
.get(() -> waitForQueryStatusWithoutRetry(stub, queryId, deadline, last, attempts, predicate));
108128
} catch (FailsafeException ex) {
109129
throw new DataCloudJDBCException(
@@ -126,14 +146,21 @@ static DataCloudQueryStatus waitForQueryStatusWithoutRetry(
126146
times.getAndIncrement();
127147
val param = QueryInfoParam.newBuilder().setQueryId(queryId).build();
128148
while (Instant.now().isBefore(deadline)) {
129-
val info = stub.getQueryInfo(param);
149+
val timeRemaining = Duration.between(Instant.now(), deadline);
150+
val info = stub.withDeadlineAfter(timeRemaining).getQueryInfo(param);
130151
while (info.hasNext()) {
131-
val next = DataCloudQueryStatus.of(info.next());
132-
next.ifPresent(last::set);
133-
if (predicate.test(last.get())) {
152+
val matched = DataCloudQueryStatus.of(info.next())
153+
.map(next -> {
154+
last.set(next);
155+
return predicate.test(next);
156+
})
157+
.orElse(false);
158+
159+
if (matched) {
134160
return last.get();
135161
}
136162
}
163+
log.info("end of info stream, maybe starting a new one: {}", last.get());
137164
}
138165
return last.get();
139166
}

jdbc-core/src/main/java/com/salesforce/datacloud/jdbc/interceptor/HeaderMutatingClientInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
@FunctionalInterface
2929
public interface HeaderMutatingClientInterceptor extends ClientInterceptor {
30-
void mutate(final Metadata headers);
30+
void mutate(final Metadata headers) throws DataCloudJDBCException;
3131

3232
@Override
3333
default <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(

jdbc-core/src/test/java/com/salesforce/datacloud/jdbc/core/DataCloudStatementFunctionalTest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ public class DataCloudStatementFunctionalTest extends HyperTestBase {
3636
@SneakyThrows
3737
public void canCancelStatementQuery() {
3838
try (val server = configWithSleep.start();
39-
val statement = server.getConnection().createStatement().unwrap(DataCloudStatement.class)) {
39+
val statement = server.getConnection().createStatement().unwrap(DataCloudStatement.class);
40+
val client = server.getRawClient()) {
4041
statement.execute("select pg_sleep(5000000);");
41-
val client = server.getRawClient();
42+
4243
val queryId = statement.getQueryId();
4344
val a = client.getQueryStatus(queryId).findFirst().get();
4445
assertThat(a.getCompletionStatus()).isEqualTo(DataCloudQueryStatus.CompletionStatus.RUNNING);
@@ -56,10 +57,12 @@ public void canCancelPreparedStatementQuery() {
5657
try (val server = configWithSleep.start();
5758
val statement = server.getConnection()
5859
.prepareStatement("select pg_sleep(?)")
59-
.unwrap(DataCloudPreparedStatement.class)) {
60+
.unwrap(DataCloudPreparedStatement.class);
61+
val client = server.getRawClient()) {
62+
6063
statement.setInt(1, 5000000);
6164
statement.execute();
62-
val client = server.getRawClient();
65+
6366
val queryId = statement.getQueryId();
6467
val a = client.getQueryStatus(queryId).findFirst().get();
6568
assertThat(a.getCompletionStatus()).isEqualTo(DataCloudQueryStatus.CompletionStatus.RUNNING);
@@ -75,14 +78,13 @@ public void canCancelPreparedStatementQuery() {
7578
@SneakyThrows
7679
public void canCancelAnotherQueryById() {
7780
try (val server = configWithSleep.start();
78-
val statement = server.getConnection().createStatement().unwrap(DataCloudStatement.class);
79-
val cancel = server.getConnection().unwrap(DataCloudConnection.class)) {
81+
val statement = server.getConnection().createStatement().unwrap(DataCloudStatement.class);
82+
val cancel = server.getConnection().unwrap(DataCloudConnection.class);
83+
val client = server.getRawClient()) {
8084

8185
statement.execute("select pg_sleep(5000000);");
8286
val queryId = statement.getQueryId();
8387

84-
val client = server.getRawClient();
85-
8688
val a = client.getQueryStatus(queryId).findFirst().get();
8789
assertThat(a.getCompletionStatus()).isEqualTo(DataCloudQueryStatus.CompletionStatus.RUNNING);
8890

@@ -111,6 +113,8 @@ public void forwardAndReadOnly() {
111113

112114
assertThat(rs.getType()).isEqualTo(ResultSet.TYPE_FORWARD_ONLY);
113115
assertThat(rs.getConcurrency()).isEqualTo(ResultSet.CONCUR_READ_ONLY);
116+
117+
assertThat(rs.getRow()).isEqualTo(0);
114118
});
115119
}
116120

jdbc-core/src/test/java/com/salesforce/datacloud/jdbc/core/StreamingResultSetTest.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.salesforce.datacloud.jdbc.util.ThrowingBiFunction;
2424
import io.grpc.StatusRuntimeException;
2525
import java.sql.SQLException;
26+
import java.time.Duration;
2627
import java.util.concurrent.atomic.AtomicInteger;
2728
import java.util.stream.Stream;
2829
import lombok.SneakyThrows;
@@ -59,15 +60,17 @@ public void exercisePreparedStatement() {
5960
val expected = new AtomicInteger(0);
6061

6162
assertWithConnection(conn -> {
62-
try (val statement = conn.prepareStatement(sql)) {
63+
try (val statement = conn.prepareStatement(sql).unwrap(DataCloudPreparedStatement.class)) {
6364
statement.setInt(1, large);
6465

6566
val rs = statement.executeQuery();
67+
conn.waitForResultsProduced(statement.getQueryId(), Duration.ofSeconds(30));
6668
assertThat(rs).isInstanceOf(StreamingResultSet.class);
6769
assertThat(((StreamingResultSet) rs).isReady()).isTrue();
6870

6971
while (rs.next()) {
7072
assertEachRowIsTheSame(rs, expected);
73+
assertThat(rs.getRow()).isEqualTo(expected.get());
7174
}
7275
}
7376
});
@@ -85,6 +88,7 @@ public void exerciseQueryMode(
8588

8689
assertWithStatement(statement -> {
8790
val rs = queryMode.apply(statement, sql);
91+
8892
assertThat(rs).isInstanceOf(StreamingResultSet.class);
8993
assertThat(rs.isReady()).isTrue();
9094

@@ -133,23 +137,13 @@ private static Arguments deferred(
133137
impl.apply(s, x);
134138

135139
if (wait) {
136-
waitUntilReady(s);
140+
val conn = s.getConnection().unwrap(DataCloudConnection.class);
141+
conn.waitForResultsProduced(s.getQueryId(), Duration.ofSeconds(30));
137142
}
138143

139144
return (DataCloudResultSet) s.getResultSet();
140145
};
141146
return arguments(named(String.format("%s; getResultSet -> DataCloudResultSet", name), deferred), size);
142147
}
143148

144-
@SneakyThrows
145-
static boolean waitUntilReady(DataCloudStatement statement) {
146-
while (!statement.isReady()) {
147-
try {
148-
Thread.sleep(1000);
149-
} catch (InterruptedException e) {
150-
return false;
151-
}
152-
}
153-
return true;
154-
}
155149
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright (c) 2024, Salesforce, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.salesforce.datacloud.jdbc.core.partial;
17+
18+
import com.salesforce.datacloud.jdbc.core.DataCloudStatement;
19+
import com.salesforce.datacloud.jdbc.core.HyperGrpcTestBase;
20+
import lombok.SneakyThrows;
21+
import lombok.val;
22+
import org.assertj.core.api.AssertionsForClassTypes;
23+
import org.junit.jupiter.api.Disabled;
24+
import org.junit.jupiter.api.Test;
25+
import salesforce.cdp.hyperdb.v1.HyperServiceGrpc;
26+
27+
import java.time.Duration;
28+
29+
import static org.grpcmock.GrpcMock.calledMethod;
30+
import static org.grpcmock.GrpcMock.times;
31+
import static org.grpcmock.GrpcMock.verifyThat;
32+
33+
public class DataCloudQueryPollingMockTests extends HyperGrpcTestBase {
34+
@Test
35+
@SneakyThrows
36+
@Disabled
37+
void getQueryInfoDoesNotRetryIfFailureToConnect() {
38+
try (val connection = getInterceptedClientConnection();
39+
val statement = connection.createStatement().unwrap(DataCloudStatement.class)) {
40+
statement.execute("select * from nonsense");
41+
42+
verifyThat(calledMethod(HyperServiceGrpc.getGetQueryInfoMethod()), times(0));
43+
44+
AssertionsForClassTypes.assertThatThrownBy(
45+
() -> connection.waitForResultsProduced(statement.getQueryId(), Duration.ofSeconds(30)));
46+
47+
verifyThat(calledMethod(HyperServiceGrpc.getGetQueryInfoMethod()), times(1));
48+
}
49+
}
50+
}

0 commit comments

Comments
 (0)