Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,51 @@ public void clearRequests() {
@Test
public void testAsyncRunner_doesNotReturnCommitTimestampBeforeCommit() {
AsyncRunner runner = client().runAsync();
IllegalStateException e =
assertThrows(IllegalStateException.class, () -> runner.getCommitTimestamp());
assertTrue(e.getMessage().contains("runAsync() has not yet been called"));
if (isMultiplexedSessionsEnabledForRW()) {
Throwable e = assertThrows(Throwable.class, () -> runner.getCommitTimestamp().get());
// If the error occurs within the future, it gets wrapped in an ExecutionException.
// This happens when DelayedAsyncRunner is invoked while the multiplexed session is not yet
// created.
// If the error occurs before the future is created, it may throw an IllegalStateException
// instead.
assertTrue(e instanceof ExecutionException || e instanceof IllegalStateException);
if (e instanceof ExecutionException) {
Throwable cause = e.getCause();
assertTrue(cause instanceof IllegalStateException);
assertTrue(cause.getMessage().contains("runAsync() has not yet been called"));
} else {
assertTrue(e.getMessage().contains("runAsync() has not yet been called"));
}
} else {
IllegalStateException e =
assertThrows(IllegalStateException.class, () -> runner.getCommitTimestamp());
assertTrue(e.getMessage().contains("runAsync() has not yet been called"));
}
}

@Test
public void testAsyncRunner_doesNotReturnCommitResponseBeforeCommit() {
AsyncRunner runner = client().runAsync();
IllegalStateException e =
assertThrows(IllegalStateException.class, () -> runner.getCommitResponse());
assertTrue(e.getMessage().contains("runAsync() has not yet been called"));
if (isMultiplexedSessionsEnabledForRW()) {
Throwable e = assertThrows(Throwable.class, () -> runner.getCommitResponse().get());
// If the error occurs within the future, it gets wrapped in an ExecutionException.
// This happens when DelayedAsyncRunner is invoked while the multiplexed session is not yet
// created.
// If the error occurs before the future is created, it may throw an IllegalStateException
// instead.
assertTrue(e instanceof ExecutionException || e instanceof IllegalStateException);
if (e instanceof ExecutionException) {
Throwable cause = e.getCause();
assertTrue(cause instanceof IllegalStateException);
assertTrue(cause.getMessage().contains("runAsync() has not yet been called"));
} else {
assertTrue(e.getMessage().contains("runAsync() has not yet been called"));
}
} else {
IllegalStateException e =
assertThrows(IllegalStateException.class, () -> runner.getCommitResponse());
assertTrue(e.getMessage().contains("runAsync() has not yet been called"));
}
}

@Test
Expand Down Expand Up @@ -558,7 +592,9 @@ public void closeTransactionBeforeEndOfAsyncQuery() throws Exception {
// Wait until at least one row has been fetched. At that moment there should be one session
// checked out.
dataReceived.await();
assertThat(clientImpl.pool.getNumberOfSessionsInUse()).isEqualTo(1);
if (!isMultiplexedSessionsEnabledForRW()) {
assertThat(clientImpl.pool.getNumberOfSessionsInUse()).isEqualTo(1);
}
assertThat(res.isDone()).isFalse();
dataChecked.countDown();
// Get the data from the transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
Expand Down Expand Up @@ -250,6 +251,11 @@ public void asyncTransactionManagerUpdate() throws Exception {

@Test
public void asyncTransactionManagerIsNonBlocking() throws Exception {
// TODO: Remove this condition once DelayedAsyncTransactionManager is made non-blocking with
// multiplexed sessions.
assumeFalse(
"DelayedAsyncTransactionManager is currently blocking with multiplexed sessions.",
spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW());
mockSpanner.freeze();
try (AsyncTransactionManager manager = clientWithEmptySessionPool().transactionManagerAsync()) {
TransactionContextFuture transactionContextFuture = manager.beginAsync();
Expand Down Expand Up @@ -633,6 +639,11 @@ public void asyncTransactionManagerBatchUpdate() throws Exception {

@Test
public void asyncTransactionManagerIsNonBlockingWithBatchUpdate() throws Exception {
// TODO: Remove this condition once DelayedAsyncTransactionManager is made non-blocking with
// multiplexed sessions.
assumeFalse(
"DelayedAsyncTransactionManager is currently blocking with multiplexed sessions.",
spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW());
mockSpanner.freeze();
try (AsyncTransactionManager manager = clientWithEmptySessionPool().transactionManagerAsync()) {
TransactionContextFuture transactionContextFuture = manager.beginAsync();
Expand Down
Loading