Skip to content

Commit dca693c

Browse files
feat(spanner): Implement fallback for Partitioned Operations with multiplexed session. If PartitionQueryRequest or PartitionReadRequest with multiplexed session return unimplemented error, then an implicit fallback to regular session will occur.
1 parent 4435485 commit dca693c

File tree

5 files changed

+185
-66
lines changed

5 files changed

+185
-66
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 56 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,27 @@ ByteString getTransactionId() {
404404
}
405405
}
406406

407+
void initFallbackTransaction() {
408+
synchronized (txnLock) {
409+
span.addAnnotation("Creating Transaction");
410+
TransactionOptions.Builder options = TransactionOptions.newBuilder();
411+
if (timestamp != null) {
412+
options
413+
.getReadOnlyBuilder()
414+
.setReadTimestamp(timestamp.toProto())
415+
.setReturnReadTimestamp(true);
416+
} else {
417+
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
418+
}
419+
final BeginTransactionRequest request =
420+
BeginTransactionRequest.newBuilder()
421+
.setSession(session.getName())
422+
.setOptions(options)
423+
.build();
424+
initTransactionInternal(request);
425+
}
426+
}
427+
407428
void initTransaction() {
408429
SessionImpl.throwIfTransactionsPending();
409430

@@ -419,40 +440,43 @@ void initTransaction() {
419440
return;
420441
}
421442
span.addAnnotation("Creating Transaction");
443+
TransactionOptions.Builder options = TransactionOptions.newBuilder();
444+
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
445+
final BeginTransactionRequest request =
446+
BeginTransactionRequest.newBuilder()
447+
.setSession(session.getName())
448+
.setOptions(options)
449+
.build();
450+
initTransactionInternal(request);
451+
}
452+
}
453+
454+
private void initTransactionInternal(BeginTransactionRequest request) {
455+
try {
456+
Transaction transaction =
457+
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
458+
if (!transaction.hasReadTimestamp()) {
459+
throw SpannerExceptionFactory.newSpannerException(
460+
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
461+
}
462+
if (transaction.getId().isEmpty()) {
463+
throw SpannerExceptionFactory.newSpannerException(
464+
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
465+
}
422466
try {
423-
TransactionOptions.Builder options = TransactionOptions.newBuilder();
424-
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
425-
final BeginTransactionRequest request =
426-
BeginTransactionRequest.newBuilder()
427-
.setSession(session.getName())
428-
.setOptions(options)
429-
.build();
430-
Transaction transaction =
431-
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
432-
if (!transaction.hasReadTimestamp()) {
433-
throw SpannerExceptionFactory.newSpannerException(
434-
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
435-
}
436-
if (transaction.getId().isEmpty()) {
437-
throw SpannerExceptionFactory.newSpannerException(
438-
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
439-
}
440-
try {
441-
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
442-
} catch (IllegalArgumentException e) {
443-
throw SpannerExceptionFactory.newSpannerException(
444-
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
445-
}
446-
transactionId = transaction.getId();
447-
span.addAnnotation(
448-
"Transaction Creation Done",
449-
ImmutableMap.of(
450-
"Id", transaction.getId().toStringUtf8(), "Timestamp", timestamp.toString()));
451-
452-
} catch (SpannerException e) {
453-
span.addAnnotation("Transaction Creation Failed", e);
454-
throw e;
467+
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
468+
} catch (IllegalArgumentException e) {
469+
throw SpannerExceptionFactory.newSpannerException(
470+
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
455471
}
472+
transactionId = transaction.getId();
473+
span.addAnnotation(
474+
"Transaction Creation Done",
475+
ImmutableMap.of(
476+
"Id", transaction.getId().toStringUtf8(), "Timestamp", timestamp.toString()));
477+
} catch (SpannerException e) {
478+
span.addAnnotation("Transaction Creation Failed", e);
479+
throw e;
456480
}
457481
}
458482
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
114114
sessionClient.getSpanner().getOptions().getDirectedReadOptions())
115115
.setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
116116
.setTracer(sessionClient.getSpanner().getTracer()),
117-
checkNotNull(bound));
117+
checkNotNull(bound),
118+
sessionClient);
118119
}
119120

120121
@Override
@@ -137,7 +138,8 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
137138
sessionClient.getSpanner().getOptions().getDirectedReadOptions())
138139
.setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
139140
.setTracer(sessionClient.getSpanner().getTracer()),
140-
batchTransactionId);
141+
batchTransactionId,
142+
sessionClient);
141143
}
142144

143145
private boolean canUseMultiplexedSession() {
@@ -160,20 +162,27 @@ private SessionImpl getMultiplexedSession() {
160162

161163
private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction
162164
implements BatchReadOnlyTransaction {
163-
private final String sessionName;
165+
private String sessionName;
164166
private final Map<SpannerRpc.Option, ?> options;
167+
private final SessionClient sessionClient;
165168

166169
BatchReadOnlyTransactionImpl(
167-
MultiUseReadOnlyTransaction.Builder builder, TimestampBound bound) {
170+
MultiUseReadOnlyTransaction.Builder builder,
171+
TimestampBound bound,
172+
SessionClient sessionClient) {
168173
super(builder.setTimestampBound(bound));
174+
this.sessionClient = sessionClient;
169175
this.sessionName = session.getName();
170176
this.options = session.getOptions();
171177
initTransaction();
172178
}
173179

174180
BatchReadOnlyTransactionImpl(
175-
MultiUseReadOnlyTransaction.Builder builder, BatchTransactionId batchTransactionId) {
181+
MultiUseReadOnlyTransaction.Builder builder,
182+
BatchTransactionId batchTransactionId,
183+
SessionClient sessionClient) {
176184
super(builder.setTransactionId(batchTransactionId.getTransactionId()));
185+
this.sessionClient = sessionClient;
177186
this.sessionName = session.getName();
178187
this.options = session.getOptions();
179188
}
@@ -204,6 +213,18 @@ public List<Partition> partitionReadUsingIndex(
204213
Iterable<String> columns,
205214
ReadOption... option)
206215
throws SpannerException {
216+
return partitionReadUsingIndex(partitionOptions, table, index, keys, columns, false, option);
217+
}
218+
219+
public List<Partition> partitionReadUsingIndex(
220+
PartitionOptions partitionOptions,
221+
String table,
222+
String index,
223+
KeySet keys,
224+
Iterable<String> columns,
225+
boolean isFallback,
226+
ReadOption... option)
227+
throws SpannerException {
207228
Options readOptions = Options.fromReadOptions(option);
208229
Preconditions.checkArgument(
209230
!readOptions.hasLimit(),
@@ -246,7 +267,10 @@ public List<Partition> partitionReadUsingIndex(
246267
}
247268
return partitions.build();
248269
} catch (SpannerException e) {
249-
maybeMarkUnimplementedForPartitionedOps(e);
270+
if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) {
271+
return partitionReadUsingIndex(
272+
partitionOptions, table, index, keys, columns, true, option);
273+
}
250274
throw e;
251275
}
252276
}
@@ -255,6 +279,15 @@ public List<Partition> partitionReadUsingIndex(
255279
public List<Partition> partitionQuery(
256280
PartitionOptions partitionOptions, Statement statement, QueryOption... option)
257281
throws SpannerException {
282+
return partitionQuery(partitionOptions, statement, false, option);
283+
}
284+
285+
private List<Partition> partitionQuery(
286+
PartitionOptions partitionOptions,
287+
Statement statement,
288+
boolean isFallback,
289+
QueryOption... option)
290+
throws SpannerException {
258291
Options queryOptions = Options.fromQueryOptions(option);
259292
final PartitionQueryRequest.Builder builder =
260293
PartitionQueryRequest.newBuilder().setSession(sessionName).setSql(statement.getSql());
@@ -291,16 +324,23 @@ public List<Partition> partitionQuery(
291324
}
292325
return partitions.build();
293326
} catch (SpannerException e) {
294-
maybeMarkUnimplementedForPartitionedOps(e);
327+
if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) {
328+
return partitionQuery(partitionOptions, statement, true, option);
329+
}
295330
throw e;
296331
}
297332
}
298333

299-
void maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
334+
boolean maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
300335
if (MultiplexedSessionDatabaseClient.verifyErrorMessage(
301336
spannerException, "Partitioned operations are not supported with multiplexed sessions")) {
302337
unimplementedForPartitionedOps.set(true);
338+
session.setFallbackSessionReference(sessionClient.createSession().getSessionReference());
339+
sessionName = session.getName();
340+
initFallbackTransaction();
341+
return true;
303342
}
343+
return false;
304344
}
305345

306346
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.spanner;
1818

1919
import com.google.cloud.Timestamp;
20+
import com.google.common.annotations.VisibleForTesting;
2021
import com.google.common.base.Preconditions;
2122
import com.google.protobuf.ByteString;
2223
import java.io.Serializable;
@@ -34,6 +35,7 @@ public class BatchTransactionId implements Serializable {
3435
private final Timestamp timestamp;
3536
private static final long serialVersionUID = 8067099123096783939L;
3637

38+
@VisibleForTesting
3739
BatchTransactionId(String sessionId, ByteString transactionId, Timestamp timestamp) {
3840
this.transactionId = Preconditions.checkNotNull(transactionId);
3941
this.sessionId = Preconditions.checkNotNull(sessionId);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ interface SessionTransaction {
120120
static final int NO_CHANNEL_HINT = -1;
121121

122122
private final SpannerImpl spanner;
123-
private final SessionReference sessionReference;
123+
private SessionReference sessionReference;
124124
private SessionTransaction activeTransaction;
125125
private ISpan currentSpan;
126126
private final Clock clock;
@@ -160,6 +160,14 @@ public String getName() {
160160
return sessionReference.getName();
161161
}
162162

163+
/**
164+
* Updates the session reference with the fallback session. This should only be used for updating
165+
* session reference with regular session in case of unimplemented error in multiplexed session.
166+
*/
167+
void setFallbackSessionReference(SessionReference sessionReference) {
168+
this.sessionReference = sessionReference;
169+
}
170+
163171
Map<SpannerRpc.Option, ?> getOptions() {
164172
return options;
165173
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 70 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,7 +1532,11 @@ public void testInitialBeginTransactionWithRW_receivesUnimplemented_fallsBackToR
15321532
assertTrue(client.multiplexedSessionDatabaseClient.unimplementedForPartitionedOps.get());
15331533
}
15341534

1535-
// Tests the behavior of the server-side kill switch for read-write multiplexed sessions.
1535+
/**
1536+
* Tests the behavior of the server-side kill switch for partitioned query multiplexed sessions. 2
1537+
* PartitionQueryRequest should be received. First with Multiplexed session and second with
1538+
* regular session.
1539+
*/
15361540
@Test
15371541
public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession() {
15381542
try {
@@ -1547,40 +1551,81 @@ public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession
15471551

15481552
try (BatchReadOnlyTransaction transaction =
15491553
client.batchReadOnlyTransaction(TimestampBound.strong())) {
1550-
// Partitioned Query should fail
1551-
SpannerException spannerException =
1552-
assertThrows(
1553-
SpannerException.class,
1554-
() -> {
1555-
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
1556-
});
1557-
assertEquals(ErrorCode.INVALID_ARGUMENT, spannerException.getErrorCode());
1554+
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
15581555

15591556
// Verify that we received one PartitionQueryRequest.
15601557
List<PartitionQueryRequest> partitionQueryRequests =
15611558
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
1562-
assertEquals(1, partitionQueryRequests.size());
1559+
assertEquals(2, partitionQueryRequests.size());
15631560
// Verify the requests were executed using multiplexed sessions
1564-
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
1565-
assertNotNull(session2);
1566-
assertTrue(session2.getMultiplexed());
1561+
Session session = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
1562+
assertNotNull(session);
1563+
assertTrue(session.getMultiplexed());
15671564
assertTrue(BatchClientImpl.unimplementedForPartitionedOps.get());
1565+
1566+
session = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
1567+
assertNotNull(session);
1568+
assertFalse(session.getMultiplexed());
15681569
}
1570+
} finally {
1571+
BatchClientImpl.unimplementedForPartitionedOps.set(false);
1572+
}
1573+
}
1574+
1575+
/**
1576+
* Tests the behavior of the server-side kill switch for partitioned query multiplexed sessions. 2
1577+
* PartitionQueryRequest should be received. First with Multiplexed session and second with
1578+
* regular session.
1579+
*/
1580+
@Test
1581+
public void
1582+
testPartitionedQueryWithTransactionId_receivesUnimplemented_fallsBackToRegularSession() {
1583+
try {
1584+
mockSpanner.setPartitionQueryExecutionTime(
1585+
SimulatedExecutionTime.ofException(
1586+
Status.INVALID_ARGUMENT
1587+
.withDescription(
1588+
"Partitioned operations are not supported with multiplexed sessions")
1589+
.asRuntimeException()));
1590+
BatchClientImpl client =
1591+
(BatchClientImpl) spanner.getBatchClient(DatabaseId.of("p", "i", "d"));
1592+
15691593
try (BatchReadOnlyTransaction transaction =
15701594
client.batchReadOnlyTransaction(TimestampBound.strong())) {
1571-
// Partitioned Query should fail
1572-
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
15731595

1574-
// // Verify that we received two PartitionQueryRequest. and it uses a regular session due
1575-
// to
1576-
// fallback.
1577-
List<PartitionQueryRequest> partitionQueryRequests =
1578-
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
1579-
assertEquals(2, partitionQueryRequests.size());
1580-
// Verify the requests are not executed using multiplexed sessions
1581-
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
1582-
assertNotNull(session2);
1583-
assertFalse(session2.getMultiplexed());
1596+
try (BatchReadOnlyTransaction transaction1 =
1597+
client.batchReadOnlyTransaction(transaction.getBatchTransactionId())) {
1598+
transaction1.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
1599+
1600+
// Verify that we received one PartitionQueryRequest.
1601+
List<PartitionQueryRequest> partitionQueryRequests =
1602+
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
1603+
assertEquals(2, partitionQueryRequests.size());
1604+
// Verify the requests were executed using multiplexed sessions
1605+
Session session = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
1606+
assertNotNull(session);
1607+
assertTrue(session.getMultiplexed());
1608+
assertTrue(BatchClientImpl.unimplementedForPartitionedOps.get());
1609+
1610+
session = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
1611+
assertNotNull(session);
1612+
assertFalse(session.getMultiplexed());
1613+
1614+
List<BeginTransactionRequest> beginTransactionRequests =
1615+
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
1616+
assertEquals(2, beginTransactionRequests.size());
1617+
1618+
session = mockSpanner.getSession(beginTransactionRequests.get(0).getSession());
1619+
assertNotNull(session);
1620+
assertTrue(session.getMultiplexed());
1621+
1622+
session = mockSpanner.getSession(beginTransactionRequests.get(1).getSession());
1623+
assertNotNull(session);
1624+
assertFalse(session.getMultiplexed());
1625+
assertEquals(
1626+
transaction.getBatchTransactionId().getTimestamp(),
1627+
transaction1.getBatchTransactionId().getTimestamp());
1628+
}
15841629
}
15851630
} finally {
15861631
BatchClientImpl.unimplementedForPartitionedOps.set(false);

0 commit comments

Comments
 (0)