Skip to content

Commit 3778a5b

Browse files
committed
chore(spanner): make AsyncRunner non blocking with multiplexed sessions
1 parent db2e31f commit 3778a5b

File tree

2 files changed

+53
-17
lines changed

2 files changed

+53
-17
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedAsyncRunner.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
package com.google.cloud.spanner;
1818

1919
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiFutures;
2021
import com.google.cloud.Timestamp;
21-
import java.util.concurrent.ExecutionException;
22+
import com.google.common.util.concurrent.MoreExecutors;
2223
import java.util.concurrent.Executor;
2324

2425
/**
@@ -34,33 +35,36 @@ public DelayedAsyncRunner(ApiFuture<AsyncRunner> asyncRunnerFuture) {
3435
this.asyncRunnerFuture = asyncRunnerFuture;
3536
}
3637

37-
AsyncRunner getAsyncRunner() {
38-
try {
39-
return this.asyncRunnerFuture.get();
40-
} catch (ExecutionException executionException) {
41-
// Propagate the underlying exception as a RuntimeException (SpannerException is also a
42-
// RuntimeException).
43-
if (executionException.getCause() instanceof RuntimeException) {
44-
throw (RuntimeException) executionException.getCause();
45-
}
46-
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
47-
} catch (InterruptedException interruptedException) {
48-
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
49-
}
38+
ApiFuture<AsyncRunner> getAsyncRunner() {
39+
return ApiFutures.catchingAsync(
40+
asyncRunnerFuture,
41+
Exception.class,
42+
exception -> {
43+
if (exception instanceof InterruptedException) {
44+
throw SpannerExceptionFactory.propagateInterrupt((InterruptedException) exception);
45+
}
46+
throw SpannerExceptionFactory.asSpannerException(exception.getCause());
47+
},
48+
MoreExecutors.directExecutor());
5049
}
5150

5251
@Override
5352
public <R> ApiFuture<R> runAsync(AsyncWork<R> work, Executor executor) {
54-
return getAsyncRunner().runAsync(work, executor);
53+
return ApiFutures.transformAsync(
54+
getAsyncRunner(),
55+
asyncRunner -> asyncRunner.runAsync(work, executor),
56+
MoreExecutors.directExecutor());
5557
}
5658

5759
@Override
5860
public ApiFuture<Timestamp> getCommitTimestamp() {
59-
return getAsyncRunner().getCommitTimestamp();
61+
return ApiFutures.transformAsync(
62+
getAsyncRunner(), AsyncRunner::getCommitTimestamp, MoreExecutors.directExecutor());
6063
}
6164

6265
@Override
6366
public ApiFuture<CommitResponse> getCommitResponse() {
64-
return getAsyncRunner().getCommitResponse();
67+
return ApiFutures.transformAsync(
68+
getAsyncRunner(), AsyncRunner::getCommitResponse, MoreExecutors.directExecutor());
6569
}
6670
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,38 @@ public void testReadWriteTransactionUsingAsyncRunner() throws Exception {
713713
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
714714
}
715715

716+
@Test
717+
public void testAsyncRunnerIsNonBlockingWithMultiplexedSession() throws Exception {
718+
mockSpanner.freeze();
719+
DatabaseClientImpl client =
720+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
721+
AsyncRunner runner = client.runAsync();
722+
ApiFuture<Void> res =
723+
runner.runAsync(
724+
txn -> {
725+
txn.executeUpdateAsync(UPDATE_STATEMENT);
726+
return ApiFutures.immediateFuture(null);
727+
},
728+
MoreExecutors.directExecutor());
729+
ApiFuture<Timestamp> ts = runner.getCommitTimestamp();
730+
mockSpanner.unfreeze();
731+
assertThat(res.get()).isNull();
732+
assertThat(ts.get()).isNotNull();
733+
734+
List<ExecuteSqlRequest> executeSqlRequests =
735+
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
736+
assertEquals(1L, executeSqlRequests.size());
737+
738+
// Verify the requests are executed using multiplexed sessions
739+
for (ExecuteSqlRequest request : executeSqlRequests) {
740+
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
741+
}
742+
743+
assertNotNull(client.multiplexedSessionDatabaseClient);
744+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
745+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
746+
}
747+
716748
private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
717749
assertNotNull(client.multiplexedSessionDatabaseClient);
718750
SessionReference sessionReference =

0 commit comments

Comments
 (0)