Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Struct;
Expand All @@ -35,6 +36,7 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
Expand All @@ -59,6 +61,13 @@ public class BatchClientImpl implements BatchClient {
@GuardedBy("multiplexedSessionLock")
private final AtomicReference<SessionImpl> multiplexedSessionReference;

/**
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
*/
@VisibleForTesting
static final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);

BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) {
this.sessionClient = checkNotNull(sessionClient);
this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled;
Expand All @@ -85,7 +94,7 @@ public String getDatabaseRole() {
@Override
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
SessionImpl session;
if (isMultiplexedSessionEnabled) {
if (canUseMultiplexedSession()) {
session = getMultiplexedSession();
} else {
session = sessionClient.createSession();
Expand Down Expand Up @@ -131,6 +140,10 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
batchTransactionId);
}

private boolean canUseMultiplexedSession() {
return isMultiplexedSessionEnabled && !unimplementedForPartitionedOps.get();
}

private SessionImpl getMultiplexedSession() {
this.multiplexedSessionLock.lock();
try {
Expand Down Expand Up @@ -216,15 +229,26 @@ public List<Partition> partitionReadUsingIndex(
builder.setPartitionOptions(pbuilder.build());

final PartitionReadRequest request = builder.build();
PartitionResponse response = rpc.partitionRead(request, options);
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Partition.createReadPartition(
p.getPartitionToken(), partitionOptions, table, index, keys, columns, readOptions);
partitions.add(partition);
try {
PartitionResponse response = rpc.partitionRead(request, options);
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Partition.createReadPartition(
p.getPartitionToken(),
partitionOptions,
table,
index,
keys,
columns,
readOptions);
partitions.add(partition);
}
return partitions.build();
} catch (SpannerException e) {
maybeMarkUnimplementedForPartitionedOps(e);
throw e;
}
return partitions.build();
}

@Override
Expand Down Expand Up @@ -256,15 +280,29 @@ public List<Partition> partitionQuery(
builder.setPartitionOptions(pbuilder.build());

final PartitionQueryRequest request = builder.build();
PartitionResponse response = rpc.partitionQuery(request, options);
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Partition.createQueryPartition(
p.getPartitionToken(), partitionOptions, statement, queryOptions);
partitions.add(partition);
try {
PartitionResponse response = rpc.partitionQuery(request, options);
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Partition.createQueryPartition(
p.getPartitionToken(), partitionOptions, statement, queryOptions);
partitions.add(partition);
}
return partitions.build();
} catch (SpannerException e) {
maybeMarkUnimplementedForPartitionedOps(e);
throw e;
}
}

void maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
if (spannerException.getErrorCode() == ErrorCode.INVALID_ARGUMENT
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that the error code will be INVALID_ARGUMENT (and not UNIMPLEMENTED)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for Partitioned Read or Query, we are returning INVALID_ARGUMENT.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pratickchokhani
As discussed in the meeting, can we temporarily remove the error code check?

Context:
We plan to update the backend to return the UNIMPLEMENTED error for partitioned operations. However, this change will not be part of the current release and will be introduced in a follow-up update.

If we proceed with this error code check now, it will break backward compatibility when the backend starts returning UNIMPLEMENTED. To avoid this issue, we are removing the check for now and will reintroduce it once the backend update is complete.

cc: @rahul2393 to do the same for golang.

&& MultiplexedSessionDatabaseClient.verifyErrorMessage(
spannerException,
"Partitioned operations are not supported with multiplexed sessions")) {
unimplementedForPartitionedOps.set(true);
}
return partitions.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ private boolean canUseMultiplexedSessionsForRW() {
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForRWSupported();
}

private boolean canUseMultiplexedSessionsForPartitionedOps() {
return this.useMultiplexedSessionPartitionedOps
&& this.multiplexedSessionDatabaseClient != null
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForPartitionedOpsSupported();
}

@Override
public Dialect getDialect() {
return pool.getDialect();
Expand Down Expand Up @@ -323,8 +329,15 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti

@Override
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
if (useMultiplexedSessionPartitionedOps) {
return getMultiplexedSession().executePartitionedUpdate(stmt, options);

if (canUseMultiplexedSessionsForPartitionedOps()) {
try {
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
} catch (SpannerException e) {
if (!multiplexedSessionDatabaseClient.maybeMarkUnimplementedForPartitionedOps(e)) {
throw e;
}
}
}
return executePartitionedUpdateWithPooledSession(stmt, options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ void onError(SpannerException spannerException) {
// UNIMPLEMENTED with error message "Transaction type read_write not supported with
// multiplexed sessions" is returned.
this.client.maybeMarkUnimplementedForRW(spannerException);
// Mark multiplexed sessions for Partitioned Ops as unimplemented and fall back to regular
// sessions if
// UNIMPLEMENTED with error message "Partitioned operations are not supported with multiplexed
// sessions".
this.client.maybeMarkUnimplementedForPartitionedOps(spannerException);
}

@Override
Expand Down Expand Up @@ -214,6 +219,12 @@ public void close() {
*/
@VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean(false);

/**
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
*/
@VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);

MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
this(sessionClient, Clock.systemUTC());
}
Expand Down Expand Up @@ -316,7 +327,18 @@ && verifyErrorMessage(
}
}

private boolean verifyErrorMessage(SpannerException spannerException, String message) {
boolean maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
if (spannerException.getErrorCode() == ErrorCode.UNIMPLEMENTED
&& verifyErrorMessage(
spannerException,
"Transaction type partitioned_dml not supported with multiplexed sessions")) {
unimplementedForPartitionedOps.set(true);
return true;
}
return false;
}

static boolean verifyErrorMessage(SpannerException spannerException, String message) {
if (spannerException.getCause() == null) {
return false;
}
Expand Down Expand Up @@ -391,6 +413,10 @@ boolean isMultiplexedSessionsForRWSupported() {
return !this.unimplementedForRW.get();
}

boolean isMultiplexedSessionsForPartitionedOpsSupported() {
return !this.unimplementedForPartitionedOps.get();
}

void close() {
synchronized (this) {
if (!this.isClosed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,12 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BatchWriteRequest;
import com.google.spanner.v1.BatchWriteResponse;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.*;
import com.google.spanner.v1.RequestOptions.Priority;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.Transaction;
import io.grpc.Status;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -1540,6 +1532,89 @@ public void testInitialBeginTransactionWithRW_receivesUnimplemented_fallsBackToR
assertFalse(session2.getMultiplexed());
}

// Tests the behavior of the server-side kill switch for read-write multiplexed sessions.
@Test
public void
testInitialBeginTransactionWithPDML_receivesUnimplemented_fallsBackToRegularSession() {
mockSpanner.setBeginTransactionExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNIMPLEMENTED
.withDescription(
"Transaction type partitioned_dml not supported with multiplexed sessions")
.asRuntimeException(),
Status.UNIMPLEMENTED
.withDescription(
"Transaction type partitioned_dml not supported with multiplexed sessions")
.asRuntimeException())));
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));

assertNotNull(client.multiplexedSessionDatabaseClient);

// Partitioned Ops transaction should fallback to regular sessions
assertEquals(UPDATE_COUNT, client.executePartitionedUpdate(UPDATE_STATEMENT));

// Verify that we received one ExecuteSqlRequest, and it uses a regular session due to fallback.
List<ExecuteSqlRequest> executeSqlRequests =
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
assertEquals(1, executeSqlRequests.size());
// Verify the requests are not executed using multiplexed sessions
Session session2 = mockSpanner.getSession(executeSqlRequests.get(0).getSession());
assertNotNull(session2);
assertFalse(session2.getMultiplexed());
assertTrue(client.multiplexedSessionDatabaseClient.unimplementedForPartitionedOps.get());
}

// Tests the behavior of the server-side kill switch for read-write multiplexed sessions.
@Test
public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession() {
mockSpanner.setPartitionQueryExecutionTime(
SimulatedExecutionTime.ofException(
Status.INVALID_ARGUMENT
.withDescription(
"Partitioned operations are not supported with multiplexed sessions")
.asRuntimeException()));
BatchClientImpl client = (BatchClientImpl) spanner.getBatchClient(DatabaseId.of("p", "i", "d"));

try (BatchReadOnlyTransaction transaction =
client.batchReadOnlyTransaction(TimestampBound.strong())) {
// Partitioned Query should fail
SpannerException spannerException =
assertThrows(
SpannerException.class,
() -> {
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
});
assertEquals(ErrorCode.INVALID_ARGUMENT, spannerException.getErrorCode());

// Verify that we received one PartitionQueryRequest.
List<PartitionQueryRequest> partitionQueryRequests =
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
assertEquals(1, partitionQueryRequests.size());
// Verify the requests were executed using multiplexed sessions
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
assertNotNull(session2);
assertTrue(session2.getMultiplexed());
assertTrue(client.unimplementedForPartitionedOps.get());
}
try (BatchReadOnlyTransaction transaction =
client.batchReadOnlyTransaction(TimestampBound.strong())) {
// Partitioned Query should fail
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);

// // Verify that we received two PartitionQueryRequest. and it uses a regular session due to
// fallback.
List<PartitionQueryRequest> partitionQueryRequests =
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
assertEquals(2, partitionQueryRequests.size());
// Verify the requests are not executed using multiplexed sessions
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
assertNotNull(session2);
assertFalse(session2.getMultiplexed());
}
}

@Test
public void
testReadWriteUnimplementedErrorDuringInitialBeginTransactionRPC_firstReceivesError_secondFallsBackToRegularSessions() {
Expand Down
Loading