Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
Expand All @@ -40,6 +39,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
Expand All @@ -54,7 +54,7 @@
* opened and closed, and which {@link Spanner} objects could be closed.
*
* <p>Call the method {@link SpannerPool#closeSpannerPool()} at the end of your application to
* gracefully shutdown all instances in the pool.
* gracefully shut down all instances in the pool.
*/
public class SpannerPool {
// TODO: create separate Client Lib Token for the Connection API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2302,6 +2302,7 @@ public void waitForRequestsToContain(Class<? extends AbstractMessage> type, long
throws InterruptedException, TimeoutException {
Stopwatch watch = Stopwatch.createStarted();
while (countRequestsOfType(type) == 0) {
//noinspection BusyWait
Thread.sleep(1L);
if (watch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) {
throw new TimeoutException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.ForceCloseSpannerFunction;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.AbstractConnectionImplTest.ConnectionConsumer;
import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection;
import com.google.cloud.spanner.connection.SpannerPool.CheckAndCloseSpannersMode;
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Collections2;
import com.google.longrunning.Operation;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.Any;
Expand All @@ -47,6 +49,9 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import io.grpc.Status;
import java.time.Duration;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -108,18 +113,23 @@ protected ITConnection createConnection() {
.setUri(getBaseUrl() + ";trackSessionLeaks=false")
.setStatementExecutorType(statementExecutorType)
.setConfigurator(
optionsConfigurator ->
optionsConfigurator
.getDatabaseAdminStubSettingsBuilder()
.updateDatabaseDdlOperationSettings()
.setPollingAlgorithm(
OperationTimedPollAlgorithm.create(
RetrySettings.newBuilder()
.setInitialRetryDelayDuration(Duration.ofMillis(1L))
.setMaxRetryDelayDuration(Duration.ofMillis(1L))
.setRetryDelayMultiplier(1.0)
.setTotalTimeoutDuration(Duration.ofMinutes(10L))
.build())))
optionsConfigurator -> {
optionsConfigurator
.getDatabaseAdminStubSettingsBuilder()
.updateDatabaseDdlOperationSettings()
.setPollingAlgorithm(
OperationTimedPollAlgorithm.create(
RetrySettings.newBuilder()
.setInitialRetryDelayDuration(Duration.ofMillis(1L))
.setMaxRetryDelayDuration(Duration.ofMillis(1L))
.setRetryDelayMultiplier(1.0)
.setTotalTimeoutDuration(Duration.ofMinutes(10L))
.build()));
optionsConfigurator.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setWaitForMinSessionsDuration(Duration.ofSeconds(5L))
.build());
})
.build();
return createITConnection(options);
}
Expand All @@ -138,6 +148,8 @@ public void setup() {
@After
public void clearExecutionTimes() {
mockSpanner.removeAllExecutionTimes();
SpannerPool.INSTANCE.checkAndCloseSpanners(
CheckAndCloseSpannersMode.ERROR, new ForceCloseSpannerFunction(5L, TimeUnit.MILLISECONDS));
}

@Test
Expand Down Expand Up @@ -617,20 +629,20 @@ static void waitForRequestsToContain(Class<? extends AbstractMessage> request) {
private void waitForDdlRequestOnServer() {
try {
Stopwatch watch = Stopwatch.createStarted();
while (Collections2.filter(
mockDatabaseAdmin.getRequests(),
input -> input.getClass().equals(UpdateDatabaseDdlRequest.class))
.isEmpty()) {
while (watch.elapsed(TimeUnit.MILLISECONDS) < EXECUTION_TIME_SLOW_STATEMENT) {
try {
List<AbstractMessage> requests = new ArrayList<>(mockDatabaseAdmin.getRequests());
if (requests.stream().anyMatch(request -> request instanceof UpdateDatabaseDdlRequest)) {
break;
}
} catch (ConcurrentModificationException ignore) {
// Just ignore and retry.
}
//noinspection BusyWait
Thread.sleep(1L);
if (watch.elapsed(TimeUnit.MILLISECONDS) > EXECUTION_TIME_SLOW_STATEMENT) {
throw new TimeoutException("Timeout while waiting for DDL request");
}
}
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
} catch (TimeoutException e) {
throw SpannerExceptionFactory.propagateTimeout(e);
}
}

Expand Down Expand Up @@ -1010,6 +1022,7 @@ public void testCancelDdlBatch() {
} finally {
executor.shutdownNow();
}
connection.closeAsync();
}
}

Expand All @@ -1036,6 +1049,7 @@ public void testCancelDdlAutocommit() {
} finally {
executor.shutdownNow();
}
connection.closeAsync();
}
}

Expand All @@ -1049,6 +1063,8 @@ public void testTimeoutExceptionDdlAutocommit() {
SpannerException e =
assertThrows(SpannerException.class, () -> connection.execute(Statement.of(SLOW_DDL)));
assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode());

connection.closeAsync();
}
}

Expand Down
Loading