Skip to content

Commit 90d29b1

Browse files
Merge branch 'main' into main
2 parents e9410cc + 1b4b4d3 commit 90d29b1

32 files changed

+662
-126
lines changed

.github/workflows/hermetic_library_generation.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ jobs:
3737
with:
3838
fetch-depth: 0
3939
token: ${{ secrets.CLOUD_JAVA_BOT_TOKEN }}
40-
- uses: googleapis/sdk-platform-java/.github/scripts@v2.52.0
40+
- uses: googleapis/sdk-platform-java/.github/scripts@v2.53.0
4141
if: env.SHOULD_RUN == 'true'
4242
with:
4343
base_ref: ${{ github.base_ref }}

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file:
1919
<dependency>
2020
<groupId>com.google.cloud</groupId>
2121
<artifactId>libraries-bom</artifactId>
22-
<version>26.53.0</version>
22+
<version>26.54.0</version>
2323
<type>pom</type>
2424
<scope>import</scope>
2525
</dependency>
@@ -49,7 +49,7 @@ If you are using Maven without the BOM, add this to your dependencies:
4949
If you are using Gradle 5.x or later, add this to your dependencies:
5050

5151
```Groovy
52-
implementation platform('com.google.cloud:libraries-bom:26.53.0')
52+
implementation platform('com.google.cloud:libraries-bom:26.54.0')
5353
5454
implementation 'com.google.cloud:google-cloud-spanner'
5555
```

generation_config.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
gapic_generator_version: 2.52.0
2-
googleapis_commitish: 3cf61b2df20eace09e6336c23f9e08859c0d87ae
3-
libraries_bom_version: 26.53.0
1+
gapic_generator_version: 2.53.0
2+
googleapis_commitish: 9605bff3d36fbdb1227b26bce68258c5f00815e4
3+
libraries_bom_version: 26.54.0
44
libraries:
55
- api_shortname: spanner
66
name_pretty: Cloud Spanner

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,9 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
698698
if (!isReadOnly()) {
699699
builder.setSeqno(getSeqNo());
700700
}
701+
if (options.hasLastStatement()) {
702+
builder.setLastStatement(options.isLastStatement());
703+
}
701704
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
702705
builder.setRequestOptions(buildRequestOptions(options));
703706
return builder;
@@ -743,6 +746,9 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
743746
if (selector != null) {
744747
builder.setTransaction(selector);
745748
}
749+
if (options.hasLastStatement()) {
750+
builder.setLastStatements(options.isLastStatement());
751+
}
746752
builder.setSeqno(getSeqNo());
747753
builder.setRequestOptions(buildRequestOptions(options));
748754
return builder;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void setSpan(ISpan span) {
5555

5656
@Override
5757
public void close() {
58-
closeAsync();
58+
SpannerApiFutures.get(closeAsync());
5959
}
6060

6161
@Override
@@ -183,6 +183,10 @@ public ApiFuture<Void> rollbackAsync() {
183183

184184
@Override
185185
public TransactionContextFuture resetForRetryAsync() {
186+
if (txn == null || !txn.isAborted() && txnState != TransactionState.ABORTED) {
187+
throw new IllegalStateException(
188+
"resetForRetry can only be called if the previous attempt aborted");
189+
}
186190
return new TransactionContextFutureImpl(this, internalBeginAsync(false));
187191
}
188192

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.spanner.Options.QueryOption;
2323
import com.google.cloud.spanner.Options.ReadOption;
2424
import com.google.cloud.spanner.spi.v1.SpannerRpc;
25+
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.base.Preconditions;
2627
import com.google.common.collect.ImmutableList;
2728
import com.google.protobuf.Struct;
@@ -35,6 +36,7 @@
3536
import java.time.Instant;
3637
import java.util.List;
3738
import java.util.Map;
39+
import java.util.concurrent.atomic.AtomicBoolean;
3840
import java.util.concurrent.atomic.AtomicReference;
3941
import java.util.concurrent.locks.ReentrantLock;
4042
import javax.annotation.Nullable;
@@ -59,6 +61,13 @@ public class BatchClientImpl implements BatchClient {
5961
@GuardedBy("multiplexedSessionLock")
6062
private final AtomicReference<SessionImpl> multiplexedSessionReference;
6163

64+
/**
65+
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
66+
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
67+
*/
68+
@VisibleForTesting
69+
static final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);
70+
6271
BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) {
6372
this.sessionClient = checkNotNull(sessionClient);
6473
this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled;
@@ -85,7 +94,7 @@ public String getDatabaseRole() {
8594
@Override
8695
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
8796
SessionImpl session;
88-
if (isMultiplexedSessionEnabled) {
97+
if (canUseMultiplexedSession()) {
8998
session = getMultiplexedSession();
9099
} else {
91100
session = sessionClient.createSession();
@@ -131,6 +140,10 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
131140
batchTransactionId);
132141
}
133142

143+
private boolean canUseMultiplexedSession() {
144+
return isMultiplexedSessionEnabled && !unimplementedForPartitionedOps.get();
145+
}
146+
134147
private SessionImpl getMultiplexedSession() {
135148
this.multiplexedSessionLock.lock();
136149
try {
@@ -216,15 +229,26 @@ public List<Partition> partitionReadUsingIndex(
216229
builder.setPartitionOptions(pbuilder.build());
217230

218231
final PartitionReadRequest request = builder.build();
219-
PartitionResponse response = rpc.partitionRead(request, options);
220-
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
221-
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
222-
Partition partition =
223-
Partition.createReadPartition(
224-
p.getPartitionToken(), partitionOptions, table, index, keys, columns, readOptions);
225-
partitions.add(partition);
232+
try {
233+
PartitionResponse response = rpc.partitionRead(request, options);
234+
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
235+
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
236+
Partition partition =
237+
Partition.createReadPartition(
238+
p.getPartitionToken(),
239+
partitionOptions,
240+
table,
241+
index,
242+
keys,
243+
columns,
244+
readOptions);
245+
partitions.add(partition);
246+
}
247+
return partitions.build();
248+
} catch (SpannerException e) {
249+
maybeMarkUnimplementedForPartitionedOps(e);
250+
throw e;
226251
}
227-
return partitions.build();
228252
}
229253

230254
@Override
@@ -256,15 +280,27 @@ public List<Partition> partitionQuery(
256280
builder.setPartitionOptions(pbuilder.build());
257281

258282
final PartitionQueryRequest request = builder.build();
259-
PartitionResponse response = rpc.partitionQuery(request, options);
260-
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
261-
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
262-
Partition partition =
263-
Partition.createQueryPartition(
264-
p.getPartitionToken(), partitionOptions, statement, queryOptions);
265-
partitions.add(partition);
283+
try {
284+
PartitionResponse response = rpc.partitionQuery(request, options);
285+
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
286+
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
287+
Partition partition =
288+
Partition.createQueryPartition(
289+
p.getPartitionToken(), partitionOptions, statement, queryOptions);
290+
partitions.add(partition);
291+
}
292+
return partitions.build();
293+
} catch (SpannerException e) {
294+
maybeMarkUnimplementedForPartitionedOps(e);
295+
throw e;
296+
}
297+
}
298+
299+
void maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
300+
if (MultiplexedSessionDatabaseClient.verifyErrorMessage(
301+
spannerException, "Partitioned operations are not supported with multiplexed sessions")) {
302+
unimplementedForPartitionedOps.set(true);
266303
}
267-
return partitions.build();
268304
}
269305

270306
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ private boolean canUseMultiplexedSessionsForRW() {
124124
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForRWSupported();
125125
}
126126

127+
private boolean canUseMultiplexedSessionsForPartitionedOps() {
128+
return this.useMultiplexedSessionPartitionedOps
129+
&& this.multiplexedSessionDatabaseClient != null
130+
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForPartitionedOpsSupported();
131+
}
132+
127133
@Override
128134
public Dialect getDialect() {
129135
return pool.getDialect();
@@ -323,8 +329,15 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
323329

324330
@Override
325331
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
326-
if (useMultiplexedSessionPartitionedOps) {
327-
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
332+
333+
if (canUseMultiplexedSessionsForPartitionedOps()) {
334+
try {
335+
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
336+
} catch (SpannerException e) {
337+
if (!multiplexedSessionDatabaseClient.maybeMarkUnimplementedForPartitionedOps(e)) {
338+
throw e;
339+
}
340+
}
328341
}
329342
return executePartitionedUpdateWithPooledSession(stmt, options);
330343
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ void onError(SpannerException spannerException) {
104104
// UNIMPLEMENTED with error message "Transaction type read_write not supported with
105105
// multiplexed sessions" is returned.
106106
this.client.maybeMarkUnimplementedForRW(spannerException);
107+
// Mark multiplexed sessions for Partitioned Ops as unimplemented and fall back to regular
108+
// sessions if
109+
// UNIMPLEMENTED with error message "Partitioned operations are not supported with multiplexed
110+
// sessions".
111+
this.client.maybeMarkUnimplementedForPartitionedOps(spannerException);
107112
}
108113

109114
@Override
@@ -214,6 +219,12 @@ public void close() {
214219
*/
215220
@VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean(false);
216221

222+
/**
223+
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
224+
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
225+
*/
226+
@VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);
227+
217228
MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
218229
this(sessionClient, Clock.systemUTC());
219230
}
@@ -316,7 +327,18 @@ && verifyErrorMessage(
316327
}
317328
}
318329

319-
private boolean verifyErrorMessage(SpannerException spannerException, String message) {
330+
boolean maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
331+
if (spannerException.getErrorCode() == ErrorCode.UNIMPLEMENTED
332+
&& verifyErrorMessage(
333+
spannerException,
334+
"Transaction type partitioned_dml not supported with multiplexed sessions")) {
335+
unimplementedForPartitionedOps.set(true);
336+
return true;
337+
}
338+
return false;
339+
}
340+
341+
static boolean verifyErrorMessage(SpannerException spannerException, String message) {
320342
if (spannerException.getCause() == null) {
321343
return false;
322344
}
@@ -391,6 +413,10 @@ boolean isMultiplexedSessionsForRWSupported() {
391413
return !this.unimplementedForRW.get();
392414
}
393415

416+
boolean isMultiplexedSessionsForPartitionedOpsSupported() {
417+
return !this.unimplementedForPartitionedOps.get();
418+
}
419+
394420
void close() {
395421
synchronized (this) {
396422
if (!this.isClosed) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ public interface ReadQueryUpdateTransactionOption
108108
/** Marker interface to mark options applicable to Update and Write operations */
109109
public interface UpdateTransactionOption extends UpdateOption, TransactionOption {}
110110

111+
/** Marker interface for options that can be used with both executeQuery and executeUpdate. */
112+
public interface QueryUpdateOption extends QueryOption, UpdateOption {}
113+
111114
/**
112115
* Marker interface to mark options applicable to Create, Update and Delete operations in admin
113116
* API.
@@ -236,6 +239,20 @@ public static DataBoostQueryOption dataBoostEnabled(Boolean dataBoostEnabled) {
236239
return new DataBoostQueryOption(dataBoostEnabled);
237240
}
238241

242+
/**
243+
* If set to true, this option marks the end of the transaction. The transaction should be
244+
* committed or aborted after this statement executes, and attempts to execute any other requests
245+
* against this transaction (including reads and queries) will be rejected. Mixing mutations with
246+
* statements that are marked as the last statement is not allowed.
247+
*
248+
* <p>For DML statements, setting this option may cause some error reporting to be deferred until
249+
* commit time (e.g. validation of unique constraints). Given this, successful execution of a DML
250+
* statement should not be assumed until the transaction commits.
251+
*/
252+
public static QueryUpdateOption lastStatement() {
253+
return new LastStatementUpdateOption();
254+
}
255+
239256
/**
240257
* Specifying this will cause the list operation to start fetching the record from this onwards.
241258
*/
@@ -494,6 +511,7 @@ void appendToOptions(Options options) {
494511
private DecodeMode decodeMode;
495512
private RpcOrderBy orderBy;
496513
private RpcLockHint lockHint;
514+
private Boolean lastStatement;
497515

498516
// Construction is via factory methods below.
499517
private Options() {}
@@ -630,6 +648,14 @@ OrderBy orderBy() {
630648
return orderBy == null ? null : orderBy.proto;
631649
}
632650

651+
boolean hasLastStatement() {
652+
return lastStatement != null;
653+
}
654+
655+
Boolean isLastStatement() {
656+
return lastStatement;
657+
}
658+
633659
boolean hasLockHint() {
634660
return lockHint != null;
635661
}
@@ -694,6 +720,9 @@ public String toString() {
694720
if (orderBy != null) {
695721
b.append("orderBy: ").append(orderBy).append(' ');
696722
}
723+
if (lastStatement != null) {
724+
b.append("lastStatement: ").append(lastStatement).append(' ');
725+
}
697726
if (lockHint != null) {
698727
b.append("lockHint: ").append(lockHint).append(' ');
699728
}
@@ -737,6 +766,7 @@ public boolean equals(Object o) {
737766
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
738767
&& Objects.equals(directedReadOptions(), that.directedReadOptions())
739768
&& Objects.equals(orderBy(), that.orderBy())
769+
&& Objects.equals(isLastStatement(), that.isLastStatement())
740770
&& Objects.equals(lockHint(), that.lockHint());
741771
}
742772

@@ -797,6 +827,9 @@ public int hashCode() {
797827
if (orderBy != null) {
798828
result = 31 * result + orderBy.hashCode();
799829
}
830+
if (lastStatement != null) {
831+
result = 31 * result + lastStatement.hashCode();
832+
}
800833
if (lockHint != null) {
801834
result = 31 * result + lockHint.hashCode();
802835
}
@@ -965,4 +998,24 @@ public boolean equals(Object o) {
965998
return Objects.equals(filter, ((FilterOption) o).filter);
966999
}
9671000
}
1001+
1002+
static final class LastStatementUpdateOption extends InternalOption implements QueryUpdateOption {
1003+
1004+
LastStatementUpdateOption() {}
1005+
1006+
@Override
1007+
void appendToOptions(Options options) {
1008+
options.lastStatement = true;
1009+
}
1010+
1011+
@Override
1012+
public int hashCode() {
1013+
return LastStatementUpdateOption.class.hashCode();
1014+
}
1015+
1016+
@Override
1017+
public boolean equals(Object o) {
1018+
return o instanceof LastStatementUpdateOption;
1019+
}
1020+
}
9681021
}

0 commit comments

Comments
 (0)