Skip to content

Commit 7d45994

Browse files
Merge branch 'main' into renovate/com.google.cloud-sdk-platform-java-config-3.x
2 parents ac59a8c + 23c6449 commit 7d45994

File tree

6 files changed

+65
-45
lines changed

6 files changed

+65
-45
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.cloud.spanner.SpannerException;
2525
import com.google.cloud.spanner.SpannerExceptionFactory;
2626
import com.google.common.annotations.VisibleForTesting;
27-
import com.google.common.base.Function;
2827
import com.google.common.base.MoreObjects;
2928
import com.google.common.base.Preconditions;
3029
import com.google.common.base.Ticker;
@@ -40,6 +39,7 @@
4039
import java.util.concurrent.Executors;
4140
import java.util.concurrent.ScheduledExecutorService;
4241
import java.util.concurrent.TimeUnit;
42+
import java.util.function.Function;
4343
import java.util.logging.Level;
4444
import java.util.logging.Logger;
4545
import java.util.stream.Stream;
@@ -54,7 +54,7 @@
5454
* opened and closed, and which {@link Spanner} objects could be closed.
5555
*
5656
* <p>Call the method {@link SpannerPool#closeSpannerPool()} at the end of your application to
57-
* gracefully shutdown all instances in the pool.
57+
* gracefully shut down all instances in the pool.
5858
*/
5959
public class SpannerPool {
6060
// TODO: create separate Client Lib Token for the Connection API.

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void tryNextNotAllowed() {
104104
new AsyncResultSetImpl(
105105
mockedProvider, mock(ResultSet.class), AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
106106
rs.setCallback(mock(Executor.class), mock(ReadyCallback.class));
107-
IllegalStateException e = assertThrows(IllegalStateException.class, () -> rs.tryNext());
107+
IllegalStateException e = assertThrows(IllegalStateException.class, rs::tryNext);
108108
assertThat(e.getMessage()).contains("tryNext may only be called from a DataReady callback.");
109109
}
110110
}
@@ -152,7 +152,7 @@ public void toListAsync() throws InterruptedException, ExecutionException {
152152
}
153153

154154
@Test
155-
public void toListAsyncPropagatesError() throws InterruptedException {
155+
public void toListAsyncPropagatesError() {
156156
ExecutorService executor = Executors.newFixedThreadPool(1);
157157
ResultSet delegate = mock(ResultSet.class);
158158
when(delegate.next())
@@ -326,10 +326,7 @@ public void testCallbackIsNotCalledWhilePaused() throws InterruptedException, Ex
326326
@Override
327327
public Boolean answer(InvocationOnMock invocation) throws Throwable {
328328
row++;
329-
if (row > simulatedRows) {
330-
return false;
331-
}
332-
return true;
329+
return row <= simulatedRows;
333330
}
334331
});
335332
when(delegate.getCurrentRowAsStruct()).thenReturn(mock(Struct.class));
@@ -345,17 +342,17 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
345342
assertFalse(paused.get());
346343
callbackCounter.incrementAndGet();
347344
try {
348-
while (true) {
349-
switch (resultSet.tryNext()) {
350-
case OK:
351-
paused.set(true);
352-
queue.put(new Object());
353-
return CallbackResponse.PAUSE;
354-
case DONE:
355-
return CallbackResponse.DONE;
356-
case NOT_READY:
357-
return CallbackResponse.CONTINUE;
358-
}
345+
switch (resultSet.tryNext()) {
346+
case OK:
347+
paused.set(true);
348+
queue.put(new Object());
349+
return CallbackResponse.PAUSE;
350+
case DONE:
351+
return CallbackResponse.DONE;
352+
case NOT_READY:
353+
return CallbackResponse.CONTINUE;
354+
default:
355+
throw new IllegalStateException();
359356
}
360357
} catch (InterruptedException e) {
361358
throw SpannerExceptionFactory.propagateInterrupt(e);
@@ -384,9 +381,8 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
384381
}
385382

386383
@Test
387-
public void testCallbackIsNotCalledWhilePausedAndCanceled()
388-
throws InterruptedException, ExecutionException {
389-
Executor executor = Executors.newSingleThreadExecutor();
384+
public void testCallbackIsNotCalledWhilePausedAndCanceled() {
385+
ExecutorService executor = Executors.newSingleThreadExecutor();
390386
StreamingResultSet delegate = mock(StreamingResultSet.class);
391387

392388
final AtomicInteger callbackCounter = new AtomicInteger();
@@ -414,6 +410,8 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled()
414410
SpannerException exception = assertThrows(SpannerException.class, () -> get(callbackResult));
415411
assertEquals(ErrorCode.CANCELLED, exception.getErrorCode());
416412
assertEquals(1, callbackCounter.get());
413+
} finally {
414+
executor.shutdown();
417415
}
418416
}
419417

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2302,6 +2302,7 @@ public void waitForRequestsToContain(Class<? extends AbstractMessage> type, long
23022302
throws InterruptedException, TimeoutException {
23032303
Stopwatch watch = Stopwatch.createStarted();
23042304
while (countRequestsOfType(type) == 0) {
2305+
//noinspection BusyWait
23052306
Thread.sleep(1L);
23062307
if (watch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) {
23072308
throw new TimeoutException(

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SavepointMockServerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public void clearRequests() {
9090
mockSpanner.clearRequests();
9191
}
9292

93+
@SuppressWarnings("ClassEscapesDefinedScope")
9394
@Override
9495
public ITConnection createConnection() {
9596
return createConnection(
@@ -698,9 +699,10 @@ public void testKeepAlive() throws InterruptedException, TimeoutException {
698699
connection.savepoint("s1");
699700
connection.execute(INSERT_STATEMENT);
700701
connection.rollbackToSavepoint("s1");
701-
mockSpanner.waitForRequestsToContain(RollbackRequest.class, 1000L);
702702
String keepAliveTagAfterRollback = "test_keep_alive_tag_after_rollback";
703703
System.setProperty("spanner.connection.keep_alive_query_tag", keepAliveTagAfterRollback);
704+
mockSpanner.waitForRequestsToContain(RollbackRequest.class, 1000L);
705+
mockSpanner.clearRequests();
704706

705707
// Verify that we don't get any new keep-alive requests from this point.
706708
Thread.sleep(2L);

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/StatementTimeoutTest.java

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,18 @@
2727
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
2828
import com.google.api.gax.retrying.RetrySettings;
2929
import com.google.cloud.spanner.ErrorCode;
30+
import com.google.cloud.spanner.ForceCloseSpannerFunction;
3031
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
3132
import com.google.cloud.spanner.ResultSet;
33+
import com.google.cloud.spanner.SessionPoolOptions;
3234
import com.google.cloud.spanner.SpannerException;
3335
import com.google.cloud.spanner.SpannerExceptionFactory;
3436
import com.google.cloud.spanner.Statement;
3537
import com.google.cloud.spanner.connection.AbstractConnectionImplTest.ConnectionConsumer;
3638
import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection;
39+
import com.google.cloud.spanner.connection.SpannerPool.CheckAndCloseSpannersMode;
3740
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
3841
import com.google.common.base.Stopwatch;
39-
import com.google.common.collect.Collections2;
4042
import com.google.longrunning.Operation;
4143
import com.google.protobuf.AbstractMessage;
4244
import com.google.protobuf.Any;
@@ -47,6 +49,9 @@
4749
import com.google.spanner.v1.ExecuteSqlRequest;
4850
import io.grpc.Status;
4951
import java.time.Duration;
52+
import java.util.ArrayList;
53+
import java.util.ConcurrentModificationException;
54+
import java.util.List;
5055
import java.util.concurrent.CountDownLatch;
5156
import java.util.concurrent.ExecutionException;
5257
import java.util.concurrent.ExecutorService;
@@ -108,18 +113,23 @@ protected ITConnection createConnection() {
108113
.setUri(getBaseUrl() + ";trackSessionLeaks=false")
109114
.setStatementExecutorType(statementExecutorType)
110115
.setConfigurator(
111-
optionsConfigurator ->
112-
optionsConfigurator
113-
.getDatabaseAdminStubSettingsBuilder()
114-
.updateDatabaseDdlOperationSettings()
115-
.setPollingAlgorithm(
116-
OperationTimedPollAlgorithm.create(
117-
RetrySettings.newBuilder()
118-
.setInitialRetryDelayDuration(Duration.ofMillis(1L))
119-
.setMaxRetryDelayDuration(Duration.ofMillis(1L))
120-
.setRetryDelayMultiplier(1.0)
121-
.setTotalTimeoutDuration(Duration.ofMinutes(10L))
122-
.build())))
116+
optionsConfigurator -> {
117+
optionsConfigurator
118+
.getDatabaseAdminStubSettingsBuilder()
119+
.updateDatabaseDdlOperationSettings()
120+
.setPollingAlgorithm(
121+
OperationTimedPollAlgorithm.create(
122+
RetrySettings.newBuilder()
123+
.setInitialRetryDelayDuration(Duration.ofMillis(1L))
124+
.setMaxRetryDelayDuration(Duration.ofMillis(1L))
125+
.setRetryDelayMultiplier(1.0)
126+
.setTotalTimeoutDuration(Duration.ofMinutes(10L))
127+
.build()));
128+
optionsConfigurator.setSessionPoolOption(
129+
SessionPoolOptions.newBuilder()
130+
.setWaitForMinSessionsDuration(Duration.ofSeconds(5L))
131+
.build());
132+
})
123133
.build();
124134
return createITConnection(options);
125135
}
@@ -138,6 +148,8 @@ public void setup() {
138148
@After
139149
public void clearExecutionTimes() {
140150
mockSpanner.removeAllExecutionTimes();
151+
SpannerPool.INSTANCE.checkAndCloseSpanners(
152+
CheckAndCloseSpannersMode.ERROR, new ForceCloseSpannerFunction(5L, TimeUnit.MILLISECONDS));
141153
}
142154

143155
@Test
@@ -617,20 +629,20 @@ static void waitForRequestsToContain(Class<? extends AbstractMessage> request) {
617629
private void waitForDdlRequestOnServer() {
618630
try {
619631
Stopwatch watch = Stopwatch.createStarted();
620-
while (Collections2.filter(
621-
mockDatabaseAdmin.getRequests(),
622-
input -> input.getClass().equals(UpdateDatabaseDdlRequest.class))
623-
.isEmpty()) {
632+
while (watch.elapsed(TimeUnit.MILLISECONDS) < EXECUTION_TIME_SLOW_STATEMENT) {
633+
try {
634+
List<AbstractMessage> requests = new ArrayList<>(mockDatabaseAdmin.getRequests());
635+
if (requests.stream().anyMatch(request -> request instanceof UpdateDatabaseDdlRequest)) {
636+
break;
637+
}
638+
} catch (ConcurrentModificationException ignore) {
639+
// Just ignore and retry.
640+
}
624641
//noinspection BusyWait
625642
Thread.sleep(1L);
626-
if (watch.elapsed(TimeUnit.MILLISECONDS) > EXECUTION_TIME_SLOW_STATEMENT) {
627-
throw new TimeoutException("Timeout while waiting for DDL request");
628-
}
629643
}
630644
} catch (InterruptedException e) {
631645
throw SpannerExceptionFactory.propagateInterrupt(e);
632-
} catch (TimeoutException e) {
633-
throw SpannerExceptionFactory.propagateTimeout(e);
634646
}
635647
}
636648

@@ -1010,6 +1022,7 @@ public void testCancelDdlBatch() {
10101022
} finally {
10111023
executor.shutdownNow();
10121024
}
1025+
connection.closeAsync();
10131026
}
10141027
}
10151028

@@ -1036,6 +1049,7 @@ public void testCancelDdlAutocommit() {
10361049
} finally {
10371050
executor.shutdownNow();
10381051
}
1052+
connection.closeAsync();
10391053
}
10401054
}
10411055

@@ -1049,6 +1063,8 @@ public void testTimeoutExceptionDdlAutocommit() {
10491063
SpannerException e =
10501064
assertThrows(SpannerException.class, () -> connection.execute(Statement.of(SLOW_DDL)));
10511065
assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode());
1066+
1067+
connection.closeAsync();
10521068
}
10531069
}
10541070

google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDmlReturningTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,8 @@ private List<Struct> executeQuery(long expectedCount, String stmt) {
272272
List<Struct> rows = new ArrayList<>();
273273
final TransactionCallable<Void> callable =
274274
transaction -> {
275+
// Make sure we start with an empty list if the transaction is aborted and retried.
276+
rows.clear();
275277
ResultSet resultSet = transaction.executeQuery(Statement.of(stmt));
276278
// resultSet.next() returns false, when no more row exists.
277279
// So, number of times resultSet.next() returns true, is the number of rows
@@ -335,6 +337,7 @@ private List<Struct> executeQueryAsync(long expectedCount, String stmt) {
335337
List<Struct> rows = new ArrayList<>();
336338
final TransactionCallable<Void> callable =
337339
transaction -> {
340+
rows.clear();
338341
AsyncResultSet rs = transaction.executeQueryAsync(Statement.of(stmt));
339342
rs.setCallback(
340343
Executors.newSingleThreadExecutor(),

0 commit comments

Comments
 (0)