Skip to content

Commit 20e6fd6

Browse files
authored
Merge branch 'main' into generate-libraries-main
2 parents 93b0cfc + 1b4b4d3 commit 20e6fd6

19 files changed

+554
-42
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,9 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
698698
if (!isReadOnly()) {
699699
builder.setSeqno(getSeqNo());
700700
}
701+
if (options.hasLastStatement()) {
702+
builder.setLastStatement(options.isLastStatement());
703+
}
701704
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
702705
builder.setRequestOptions(buildRequestOptions(options));
703706
return builder;
@@ -743,6 +746,9 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
743746
if (selector != null) {
744747
builder.setTransaction(selector);
745748
}
749+
if (options.hasLastStatement()) {
750+
builder.setLastStatements(options.isLastStatement());
751+
}
746752
builder.setSeqno(getSeqNo());
747753
builder.setRequestOptions(buildRequestOptions(options));
748754
return builder;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.spanner.Options.QueryOption;
2323
import com.google.cloud.spanner.Options.ReadOption;
2424
import com.google.cloud.spanner.spi.v1.SpannerRpc;
25+
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.base.Preconditions;
2627
import com.google.common.collect.ImmutableList;
2728
import com.google.protobuf.Struct;
@@ -35,6 +36,7 @@
3536
import java.time.Instant;
3637
import java.util.List;
3738
import java.util.Map;
39+
import java.util.concurrent.atomic.AtomicBoolean;
3840
import java.util.concurrent.atomic.AtomicReference;
3941
import java.util.concurrent.locks.ReentrantLock;
4042
import javax.annotation.Nullable;
@@ -59,6 +61,13 @@ public class BatchClientImpl implements BatchClient {
5961
@GuardedBy("multiplexedSessionLock")
6062
private final AtomicReference<SessionImpl> multiplexedSessionReference;
6163

64+
/**
65+
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
66+
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
67+
*/
68+
@VisibleForTesting
69+
static final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);
70+
6271
BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) {
6372
this.sessionClient = checkNotNull(sessionClient);
6473
this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled;
@@ -85,7 +94,7 @@ public String getDatabaseRole() {
8594
@Override
8695
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
8796
SessionImpl session;
88-
if (isMultiplexedSessionEnabled) {
97+
if (canUseMultiplexedSession()) {
8998
session = getMultiplexedSession();
9099
} else {
91100
session = sessionClient.createSession();
@@ -131,6 +140,10 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
131140
batchTransactionId);
132141
}
133142

143+
private boolean canUseMultiplexedSession() {
144+
return isMultiplexedSessionEnabled && !unimplementedForPartitionedOps.get();
145+
}
146+
134147
private SessionImpl getMultiplexedSession() {
135148
this.multiplexedSessionLock.lock();
136149
try {
@@ -216,15 +229,26 @@ public List<Partition> partitionReadUsingIndex(
216229
builder.setPartitionOptions(pbuilder.build());
217230

218231
final PartitionReadRequest request = builder.build();
219-
PartitionResponse response = rpc.partitionRead(request, options);
220-
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
221-
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
222-
Partition partition =
223-
Partition.createReadPartition(
224-
p.getPartitionToken(), partitionOptions, table, index, keys, columns, readOptions);
225-
partitions.add(partition);
232+
try {
233+
PartitionResponse response = rpc.partitionRead(request, options);
234+
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
235+
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
236+
Partition partition =
237+
Partition.createReadPartition(
238+
p.getPartitionToken(),
239+
partitionOptions,
240+
table,
241+
index,
242+
keys,
243+
columns,
244+
readOptions);
245+
partitions.add(partition);
246+
}
247+
return partitions.build();
248+
} catch (SpannerException e) {
249+
maybeMarkUnimplementedForPartitionedOps(e);
250+
throw e;
226251
}
227-
return partitions.build();
228252
}
229253

230254
@Override
@@ -256,15 +280,27 @@ public List<Partition> partitionQuery(
256280
builder.setPartitionOptions(pbuilder.build());
257281

258282
final PartitionQueryRequest request = builder.build();
259-
PartitionResponse response = rpc.partitionQuery(request, options);
260-
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
261-
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
262-
Partition partition =
263-
Partition.createQueryPartition(
264-
p.getPartitionToken(), partitionOptions, statement, queryOptions);
265-
partitions.add(partition);
283+
try {
284+
PartitionResponse response = rpc.partitionQuery(request, options);
285+
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
286+
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
287+
Partition partition =
288+
Partition.createQueryPartition(
289+
p.getPartitionToken(), partitionOptions, statement, queryOptions);
290+
partitions.add(partition);
291+
}
292+
return partitions.build();
293+
} catch (SpannerException e) {
294+
maybeMarkUnimplementedForPartitionedOps(e);
295+
throw e;
296+
}
297+
}
298+
299+
void maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
300+
if (MultiplexedSessionDatabaseClient.verifyErrorMessage(
301+
spannerException, "Partitioned operations are not supported with multiplexed sessions")) {
302+
unimplementedForPartitionedOps.set(true);
266303
}
267-
return partitions.build();
268304
}
269305

270306
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ private boolean canUseMultiplexedSessionsForRW() {
124124
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForRWSupported();
125125
}
126126

127+
private boolean canUseMultiplexedSessionsForPartitionedOps() {
128+
return this.useMultiplexedSessionPartitionedOps
129+
&& this.multiplexedSessionDatabaseClient != null
130+
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForPartitionedOpsSupported();
131+
}
132+
127133
@Override
128134
public Dialect getDialect() {
129135
return pool.getDialect();
@@ -323,8 +329,15 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
323329

324330
@Override
325331
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
326-
if (useMultiplexedSessionPartitionedOps) {
327-
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
332+
333+
if (canUseMultiplexedSessionsForPartitionedOps()) {
334+
try {
335+
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
336+
} catch (SpannerException e) {
337+
if (!multiplexedSessionDatabaseClient.maybeMarkUnimplementedForPartitionedOps(e)) {
338+
throw e;
339+
}
340+
}
328341
}
329342
return executePartitionedUpdateWithPooledSession(stmt, options);
330343
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ void onError(SpannerException spannerException) {
104104
// UNIMPLEMENTED with error message "Transaction type read_write not supported with
105105
// multiplexed sessions" is returned.
106106
this.client.maybeMarkUnimplementedForRW(spannerException);
107+
// Mark multiplexed sessions for Partitioned Ops as unimplemented and fall back to regular
108+
// sessions if
109+
// UNIMPLEMENTED with error message "Partitioned operations are not supported with multiplexed
110+
// sessions".
111+
this.client.maybeMarkUnimplementedForPartitionedOps(spannerException);
107112
}
108113

109114
@Override
@@ -214,6 +219,12 @@ public void close() {
214219
*/
215220
@VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean(false);
216221

222+
/**
223+
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
224+
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
225+
*/
226+
@VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);
227+
217228
MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
218229
this(sessionClient, Clock.systemUTC());
219230
}
@@ -316,7 +327,18 @@ && verifyErrorMessage(
316327
}
317328
}
318329

319-
private boolean verifyErrorMessage(SpannerException spannerException, String message) {
330+
boolean maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
331+
if (spannerException.getErrorCode() == ErrorCode.UNIMPLEMENTED
332+
&& verifyErrorMessage(
333+
spannerException,
334+
"Transaction type partitioned_dml not supported with multiplexed sessions")) {
335+
unimplementedForPartitionedOps.set(true);
336+
return true;
337+
}
338+
return false;
339+
}
340+
341+
static boolean verifyErrorMessage(SpannerException spannerException, String message) {
320342
if (spannerException.getCause() == null) {
321343
return false;
322344
}
@@ -391,6 +413,10 @@ boolean isMultiplexedSessionsForRWSupported() {
391413
return !this.unimplementedForRW.get();
392414
}
393415

416+
boolean isMultiplexedSessionsForPartitionedOpsSupported() {
417+
return !this.unimplementedForPartitionedOps.get();
418+
}
419+
394420
void close() {
395421
synchronized (this) {
396422
if (!this.isClosed) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ public interface ReadQueryUpdateTransactionOption
108108
/** Marker interface to mark options applicable to Update and Write operations */
109109
public interface UpdateTransactionOption extends UpdateOption, TransactionOption {}
110110

111+
/** Marker interface for options that can be used with both executeQuery and executeUpdate. */
112+
public interface QueryUpdateOption extends QueryOption, UpdateOption {}
113+
111114
/**
112115
* Marker interface to mark options applicable to Create, Update and Delete operations in admin
113116
* API.
@@ -236,6 +239,20 @@ public static DataBoostQueryOption dataBoostEnabled(Boolean dataBoostEnabled) {
236239
return new DataBoostQueryOption(dataBoostEnabled);
237240
}
238241

242+
/**
243+
* If set to true, this option marks the end of the transaction. The transaction should be
244+
* committed or aborted after this statement executes, and attempts to execute any other requests
245+
* against this transaction (including reads and queries) will be rejected. Mixing mutations with
246+
* statements that are marked as the last statement is not allowed.
247+
*
248+
* <p>For DML statements, setting this option may cause some error reporting to be deferred until
249+
* commit time (e.g. validation of unique constraints). Given this, successful execution of a DML
250+
* statement should not be assumed until the transaction commits.
251+
*/
252+
public static QueryUpdateOption lastStatement() {
253+
return new LastStatementUpdateOption();
254+
}
255+
239256
/**
240257
* Specifying this will cause the list operation to start fetching the record from this onwards.
241258
*/
@@ -494,6 +511,7 @@ void appendToOptions(Options options) {
494511
private DecodeMode decodeMode;
495512
private RpcOrderBy orderBy;
496513
private RpcLockHint lockHint;
514+
private Boolean lastStatement;
497515

498516
// Construction is via factory methods below.
499517
private Options() {}
@@ -630,6 +648,14 @@ OrderBy orderBy() {
630648
return orderBy == null ? null : orderBy.proto;
631649
}
632650

651+
boolean hasLastStatement() {
652+
return lastStatement != null;
653+
}
654+
655+
Boolean isLastStatement() {
656+
return lastStatement;
657+
}
658+
633659
boolean hasLockHint() {
634660
return lockHint != null;
635661
}
@@ -694,6 +720,9 @@ public String toString() {
694720
if (orderBy != null) {
695721
b.append("orderBy: ").append(orderBy).append(' ');
696722
}
723+
if (lastStatement != null) {
724+
b.append("lastStatement: ").append(lastStatement).append(' ');
725+
}
697726
if (lockHint != null) {
698727
b.append("lockHint: ").append(lockHint).append(' ');
699728
}
@@ -737,6 +766,7 @@ public boolean equals(Object o) {
737766
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
738767
&& Objects.equals(directedReadOptions(), that.directedReadOptions())
739768
&& Objects.equals(orderBy(), that.orderBy())
769+
&& Objects.equals(isLastStatement(), that.isLastStatement())
740770
&& Objects.equals(lockHint(), that.lockHint());
741771
}
742772

@@ -797,6 +827,9 @@ public int hashCode() {
797827
if (orderBy != null) {
798828
result = 31 * result + orderBy.hashCode();
799829
}
830+
if (lastStatement != null) {
831+
result = 31 * result + lastStatement.hashCode();
832+
}
800833
if (lockHint != null) {
801834
result = 31 * result + lockHint.hashCode();
802835
}
@@ -965,4 +998,24 @@ public boolean equals(Object o) {
965998
return Objects.equals(filter, ((FilterOption) o).filter);
966999
}
9671000
}
1001+
1002+
static final class LastStatementUpdateOption extends InternalOption implements QueryUpdateOption {
1003+
1004+
LastStatementUpdateOption() {}
1005+
1006+
@Override
1007+
void appendToOptions(Options options) {
1008+
options.lastStatement = true;
1009+
}
1010+
1011+
@Override
1012+
public int hashCode() {
1013+
return LastStatementUpdateOption.class.hashCode();
1014+
}
1015+
1016+
@Override
1017+
public boolean equals(Object o) {
1018+
return o instanceof LastStatementUpdateOption;
1019+
}
1020+
}
9681021
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerExceptionFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,9 @@ static ErrorDetails extractErrorDetails(Throwable cause) {
265265
if (cause instanceof ApiException) {
266266
return ((ApiException) cause).getErrorDetails();
267267
}
268+
if (cause instanceof SpannerException) {
269+
return ((SpannerException) cause).getErrorDetails();
270+
}
268271
prevCause = cause;
269272
cause = cause.getCause();
270273
}

0 commit comments

Comments
 (0)