diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java index 9558947156c..4a4062a0bbf 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java @@ -24,7 +24,6 @@ import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; @@ -40,6 +39,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Stream; @@ -54,7 +54,7 @@ * opened and closed, and which {@link Spanner} objects could be closed. * *

Call the method {@link SpannerPool#closeSpannerPool()} at the end of your application to - * gracefully shutdown all instances in the pool. + * gracefully shut down all instances in the pool. */ public class SpannerPool { // TODO: create separate Client Lib Token for the Connection API. diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 3443be192e6..ef637db594b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -2302,6 +2302,7 @@ public void waitForRequestsToContain(Class type, long throws InterruptedException, TimeoutException { Stopwatch watch = Stopwatch.createStarted(); while (countRequestsOfType(type) == 0) { + //noinspection BusyWait Thread.sleep(1L); if (watch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) { throw new TimeoutException( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/StatementTimeoutTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/StatementTimeoutTest.java index be3fe7be800..debb95d9a10 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/StatementTimeoutTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/StatementTimeoutTest.java @@ -27,16 +27,18 @@ import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.spanner.ErrorCode; +import com.google.cloud.spanner.ForceCloseSpannerFunction; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.SessionPoolOptions; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.connection.AbstractConnectionImplTest.ConnectionConsumer; import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection; +import com.google.cloud.spanner.connection.SpannerPool.CheckAndCloseSpannersMode; import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType; import com.google.common.base.Stopwatch; -import com.google.common.collect.Collections2; import com.google.longrunning.Operation; import com.google.protobuf.AbstractMessage; import com.google.protobuf.Any; @@ -47,6 +49,9 @@ import com.google.spanner.v1.ExecuteSqlRequest; import io.grpc.Status; import java.time.Duration; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -108,18 +113,23 @@ protected ITConnection createConnection() { .setUri(getBaseUrl() + ";trackSessionLeaks=false") .setStatementExecutorType(statementExecutorType) .setConfigurator( - optionsConfigurator -> - optionsConfigurator - .getDatabaseAdminStubSettingsBuilder() - .updateDatabaseDdlOperationSettings() - .setPollingAlgorithm( - OperationTimedPollAlgorithm.create( - RetrySettings.newBuilder() - .setInitialRetryDelayDuration(Duration.ofMillis(1L)) - .setMaxRetryDelayDuration(Duration.ofMillis(1L)) - .setRetryDelayMultiplier(1.0) - .setTotalTimeoutDuration(Duration.ofMinutes(10L)) - .build()))) + optionsConfigurator -> { + optionsConfigurator + .getDatabaseAdminStubSettingsBuilder() + .updateDatabaseDdlOperationSettings() + .setPollingAlgorithm( + OperationTimedPollAlgorithm.create( + RetrySettings.newBuilder() + .setInitialRetryDelayDuration(Duration.ofMillis(1L)) + .setMaxRetryDelayDuration(Duration.ofMillis(1L)) + .setRetryDelayMultiplier(1.0) + .setTotalTimeoutDuration(Duration.ofMinutes(10L)) + .build())); + optionsConfigurator.setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setWaitForMinSessionsDuration(Duration.ofSeconds(5L)) + .build()); + }) .build(); return createITConnection(options); } @@ -138,6 +148,8 @@ public void setup() { @After public void clearExecutionTimes() { mockSpanner.removeAllExecutionTimes(); + SpannerPool.INSTANCE.checkAndCloseSpanners( + CheckAndCloseSpannersMode.ERROR, new ForceCloseSpannerFunction(5L, TimeUnit.MILLISECONDS)); } @Test @@ -617,20 +629,20 @@ static void waitForRequestsToContain(Class request) { private void waitForDdlRequestOnServer() { try { Stopwatch watch = Stopwatch.createStarted(); - while (Collections2.filter( - mockDatabaseAdmin.getRequests(), - input -> input.getClass().equals(UpdateDatabaseDdlRequest.class)) - .isEmpty()) { + while (watch.elapsed(TimeUnit.MILLISECONDS) < EXECUTION_TIME_SLOW_STATEMENT) { + try { + List requests = new ArrayList<>(mockDatabaseAdmin.getRequests()); + if (requests.stream().anyMatch(request -> request instanceof UpdateDatabaseDdlRequest)) { + break; + } + } catch (ConcurrentModificationException ignore) { + // Just ignore and retry. + } //noinspection BusyWait Thread.sleep(1L); - if (watch.elapsed(TimeUnit.MILLISECONDS) > EXECUTION_TIME_SLOW_STATEMENT) { - throw new TimeoutException("Timeout while waiting for DDL request"); - } } } catch (InterruptedException e) { throw SpannerExceptionFactory.propagateInterrupt(e); - } catch (TimeoutException e) { - throw SpannerExceptionFactory.propagateTimeout(e); } } @@ -1010,6 +1022,7 @@ public void testCancelDdlBatch() { } finally { executor.shutdownNow(); } + connection.closeAsync(); } } @@ -1036,6 +1049,7 @@ public void testCancelDdlAutocommit() { } finally { executor.shutdownNow(); } + connection.closeAsync(); } } @@ -1049,6 +1063,8 @@ public void testTimeoutExceptionDdlAutocommit() { SpannerException e = assertThrows(SpannerException.class, () -> connection.execute(Statement.of(SLOW_DDL))); assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode()); + + connection.closeAsync(); } }