2727import com .google .api .gax .longrunning .OperationTimedPollAlgorithm ;
2828import com .google .api .gax .retrying .RetrySettings ;
2929import com .google .cloud .spanner .ErrorCode ;
30+ import com .google .cloud .spanner .ForceCloseSpannerFunction ;
3031import com .google .cloud .spanner .MockSpannerServiceImpl .SimulatedExecutionTime ;
3132import com .google .cloud .spanner .ResultSet ;
33+ import com .google .cloud .spanner .SessionPoolOptions ;
3234import com .google .cloud .spanner .SpannerException ;
3335import com .google .cloud .spanner .SpannerExceptionFactory ;
3436import com .google .cloud .spanner .Statement ;
3537import com .google .cloud .spanner .connection .AbstractConnectionImplTest .ConnectionConsumer ;
3638import com .google .cloud .spanner .connection .ITAbstractSpannerTest .ITConnection ;
39+ import com .google .cloud .spanner .connection .SpannerPool .CheckAndCloseSpannersMode ;
3740import com .google .cloud .spanner .connection .StatementExecutor .StatementExecutorType ;
3841import com .google .common .base .Stopwatch ;
39- import com .google .common .collect .Collections2 ;
4042import com .google .longrunning .Operation ;
4143import com .google .protobuf .AbstractMessage ;
4244import com .google .protobuf .Any ;
4749import com .google .spanner .v1 .ExecuteSqlRequest ;
4850import io .grpc .Status ;
4951import java .time .Duration ;
52+ import java .util .ArrayList ;
53+ import java .util .ConcurrentModificationException ;
54+ import java .util .List ;
5055import java .util .concurrent .CountDownLatch ;
5156import java .util .concurrent .ExecutionException ;
5257import java .util .concurrent .ExecutorService ;
@@ -108,18 +113,23 @@ protected ITConnection createConnection() {
108113 .setUri (getBaseUrl () + ";trackSessionLeaks=false" )
109114 .setStatementExecutorType (statementExecutorType )
110115 .setConfigurator (
111- optionsConfigurator ->
112- optionsConfigurator
113- .getDatabaseAdminStubSettingsBuilder ()
114- .updateDatabaseDdlOperationSettings ()
115- .setPollingAlgorithm (
116- OperationTimedPollAlgorithm .create (
117- RetrySettings .newBuilder ()
118- .setInitialRetryDelayDuration (Duration .ofMillis (1L ))
119- .setMaxRetryDelayDuration (Duration .ofMillis (1L ))
120- .setRetryDelayMultiplier (1.0 )
121- .setTotalTimeoutDuration (Duration .ofMinutes (10L ))
122- .build ())))
116+ optionsConfigurator -> {
117+ optionsConfigurator
118+ .getDatabaseAdminStubSettingsBuilder ()
119+ .updateDatabaseDdlOperationSettings ()
120+ .setPollingAlgorithm (
121+ OperationTimedPollAlgorithm .create (
122+ RetrySettings .newBuilder ()
123+ .setInitialRetryDelayDuration (Duration .ofMillis (1L ))
124+ .setMaxRetryDelayDuration (Duration .ofMillis (1L ))
125+ .setRetryDelayMultiplier (1.0 )
126+ .setTotalTimeoutDuration (Duration .ofMinutes (10L ))
127+ .build ()));
128+ optionsConfigurator .setSessionPoolOption (
129+ SessionPoolOptions .newBuilder ()
130+ .setWaitForMinSessionsDuration (Duration .ofSeconds (5L ))
131+ .build ());
132+ })
123133 .build ();
124134 return createITConnection (options );
125135 }
@@ -138,6 +148,8 @@ public void setup() {
138148 @ After
139149 public void clearExecutionTimes () {
140150 mockSpanner .removeAllExecutionTimes ();
151+ SpannerPool .INSTANCE .checkAndCloseSpanners (
152+ CheckAndCloseSpannersMode .ERROR , new ForceCloseSpannerFunction (5L , TimeUnit .MILLISECONDS ));
141153 }
142154
143155 @ Test
@@ -617,20 +629,20 @@ static void waitForRequestsToContain(Class<? extends AbstractMessage> request) {
617629 private void waitForDdlRequestOnServer () {
618630 try {
619631 Stopwatch watch = Stopwatch .createStarted ();
620- while (Collections2 .filter (
621- mockDatabaseAdmin .getRequests (),
622- input -> input .getClass ().equals (UpdateDatabaseDdlRequest .class ))
623- .isEmpty ()) {
632+ while (watch .elapsed (TimeUnit .MILLISECONDS ) < EXECUTION_TIME_SLOW_STATEMENT ) {
633+ try {
634+ List <AbstractMessage > requests = new ArrayList <>(mockDatabaseAdmin .getRequests ());
635+ if (requests .stream ().anyMatch (request -> request instanceof UpdateDatabaseDdlRequest )) {
636+ break ;
637+ }
638+ } catch (ConcurrentModificationException ignore ) {
639+ // Just ignore and retry.
640+ }
624641 //noinspection BusyWait
625642 Thread .sleep (1L );
626- if (watch .elapsed (TimeUnit .MILLISECONDS ) > EXECUTION_TIME_SLOW_STATEMENT ) {
627- throw new TimeoutException ("Timeout while waiting for DDL request" );
628- }
629643 }
630644 } catch (InterruptedException e ) {
631645 throw SpannerExceptionFactory .propagateInterrupt (e );
632- } catch (TimeoutException e ) {
633- throw SpannerExceptionFactory .propagateTimeout (e );
634646 }
635647 }
636648
@@ -1010,6 +1022,7 @@ public void testCancelDdlBatch() {
10101022 } finally {
10111023 executor .shutdownNow ();
10121024 }
1025+ connection .closeAsync ();
10131026 }
10141027 }
10151028
@@ -1036,6 +1049,7 @@ public void testCancelDdlAutocommit() {
10361049 } finally {
10371050 executor .shutdownNow ();
10381051 }
1052+ connection .closeAsync ();
10391053 }
10401054 }
10411055
@@ -1049,6 +1063,8 @@ public void testTimeoutExceptionDdlAutocommit() {
10491063 SpannerException e =
10501064 assertThrows (SpannerException .class , () -> connection .execute (Statement .of (SLOW_DDL )));
10511065 assertEquals (ErrorCode .DEADLINE_EXCEEDED , e .getErrorCode ());
1066+
1067+ connection .closeAsync ();
10521068 }
10531069 }
10541070
0 commit comments