Skip to content

Commit 2ba1b7b

Browse files
authored
Merge branch 'main' into mux-rw-it
2 parents 2c658bb + c12968a commit 2ba1b7b

File tree

14 files changed

+343
-37
lines changed

14 files changed

+343
-37
lines changed

.github/workflows/unmanaged_dependency_check.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@ jobs:
1717
# repository
1818
.kokoro/build.sh
1919
- name: Unmanaged dependency check
20-
uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.42.0
20+
uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.43.0
2121
with:
2222
bom-path: google-cloud-spanner-bom/pom.xml

.kokoro/presubmit/graalvm-native-17.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# Configure the docker image for kokoro-trampoline.
44
env_vars: {
55
key: "TRAMPOLINE_IMAGE"
6-
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.42.0"
6+
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.43.0"
77
}
88

99
env_vars: {

.kokoro/presubmit/graalvm-native.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# Configure the docker image for kokoro-trampoline.
44
env_vars: {
55
key: "TRAMPOLINE_IMAGE"
6-
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.42.0"
6+
value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.43.0"
77
}
88

99
env_vars: {

google-cloud-spanner-bom/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<parent>
99
<groupId>com.google.cloud</groupId>
1010
<artifactId>sdk-platform-java-config</artifactId>
11-
<version>3.42.0</version>
11+
<version>3.43.0</version>
1212
</parent>
1313

1414
<name>Google Cloud Spanner BOM</name>

google-cloud-spanner-executor/pom.xml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,16 @@
4646
<groupId>com.google.cloud.opentelemetry</groupId>
4747
<artifactId>exporter-trace</artifactId>
4848
<version>0.33.0</version>
49+
<exclusions>
50+
<exclusion>
51+
<groupId>io.opentelemetry.semconv</groupId>
52+
<artifactId>opentelemetry-semconv</artifactId>
53+
</exclusion>
54+
</exclusions>
55+
</dependency>
56+
<dependency>
57+
<groupId>io.opentelemetry.semconv</groupId>
58+
<artifactId>opentelemetry-semconv</artifactId>
4959
</dependency>
5060
<dependency>
5161
<groupId>com.google.cloud</groupId>
@@ -266,7 +276,7 @@
266276
<groupId>org.apache.maven.plugins</groupId>
267277
<artifactId>maven-dependency-plugin</artifactId>
268278
<configuration>
269-
<ignoredDependencies> com.google.api:gax,org.apache.maven.surefire:surefire-junit4 </ignoredDependencies>
279+
<ignoredDependencies> com.google.api:gax,org.apache.maven.surefire:surefire-junit4,io.opentelemetry.semconv:opentelemetry-semconv </ignoredDependencies>
270280
</configuration>
271281
</plugin>
272282
</plugins>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.spanner.Options.QueryOption;
2323
import com.google.cloud.spanner.Options.ReadOption;
2424
import com.google.cloud.spanner.spi.v1.SpannerRpc;
25+
import com.google.common.annotations.VisibleForTesting;
2526
import com.google.common.base.Preconditions;
2627
import com.google.common.collect.ImmutableList;
2728
import com.google.protobuf.Struct;
@@ -35,6 +36,7 @@
3536
import java.time.Instant;
3637
import java.util.List;
3738
import java.util.Map;
39+
import java.util.concurrent.atomic.AtomicBoolean;
3840
import java.util.concurrent.atomic.AtomicReference;
3941
import java.util.concurrent.locks.ReentrantLock;
4042
import javax.annotation.Nullable;
@@ -59,6 +61,13 @@ public class BatchClientImpl implements BatchClient {
5961
@GuardedBy("multiplexedSessionLock")
6062
private final AtomicReference<SessionImpl> multiplexedSessionReference;
6163

64+
/**
65+
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
66+
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
67+
*/
68+
@VisibleForTesting
69+
static final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);
70+
6271
BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) {
6372
this.sessionClient = checkNotNull(sessionClient);
6473
this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled;
@@ -85,7 +94,7 @@ public String getDatabaseRole() {
8594
@Override
8695
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
8796
SessionImpl session;
88-
if (isMultiplexedSessionEnabled) {
97+
if (canUseMultiplexedSession()) {
8998
session = getMultiplexedSession();
9099
} else {
91100
session = sessionClient.createSession();
@@ -131,6 +140,10 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
131140
batchTransactionId);
132141
}
133142

143+
private boolean canUseMultiplexedSession() {
144+
return isMultiplexedSessionEnabled && !unimplementedForPartitionedOps.get();
145+
}
146+
134147
private SessionImpl getMultiplexedSession() {
135148
this.multiplexedSessionLock.lock();
136149
try {
@@ -216,15 +229,26 @@ public List<Partition> partitionReadUsingIndex(
216229
builder.setPartitionOptions(pbuilder.build());
217230

218231
final PartitionReadRequest request = builder.build();
219-
PartitionResponse response = rpc.partitionRead(request, options);
220-
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
221-
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
222-
Partition partition =
223-
Partition.createReadPartition(
224-
p.getPartitionToken(), partitionOptions, table, index, keys, columns, readOptions);
225-
partitions.add(partition);
232+
try {
233+
PartitionResponse response = rpc.partitionRead(request, options);
234+
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
235+
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
236+
Partition partition =
237+
Partition.createReadPartition(
238+
p.getPartitionToken(),
239+
partitionOptions,
240+
table,
241+
index,
242+
keys,
243+
columns,
244+
readOptions);
245+
partitions.add(partition);
246+
}
247+
return partitions.build();
248+
} catch (SpannerException e) {
249+
maybeMarkUnimplementedForPartitionedOps(e);
250+
throw e;
226251
}
227-
return partitions.build();
228252
}
229253

230254
@Override
@@ -256,15 +280,27 @@ public List<Partition> partitionQuery(
256280
builder.setPartitionOptions(pbuilder.build());
257281

258282
final PartitionQueryRequest request = builder.build();
259-
PartitionResponse response = rpc.partitionQuery(request, options);
260-
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
261-
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
262-
Partition partition =
263-
Partition.createQueryPartition(
264-
p.getPartitionToken(), partitionOptions, statement, queryOptions);
265-
partitions.add(partition);
283+
try {
284+
PartitionResponse response = rpc.partitionQuery(request, options);
285+
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
286+
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
287+
Partition partition =
288+
Partition.createQueryPartition(
289+
p.getPartitionToken(), partitionOptions, statement, queryOptions);
290+
partitions.add(partition);
291+
}
292+
return partitions.build();
293+
} catch (SpannerException e) {
294+
maybeMarkUnimplementedForPartitionedOps(e);
295+
throw e;
296+
}
297+
}
298+
299+
void maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
300+
if (MultiplexedSessionDatabaseClient.verifyErrorMessage(
301+
spannerException, "Partitioned operations are not supported with multiplexed sessions")) {
302+
unimplementedForPartitionedOps.set(true);
266303
}
267-
return partitions.build();
268304
}
269305

270306
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ private boolean canUseMultiplexedSessionsForRW() {
124124
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForRWSupported();
125125
}
126126

127+
private boolean canUseMultiplexedSessionsForPartitionedOps() {
128+
return this.useMultiplexedSessionPartitionedOps
129+
&& this.multiplexedSessionDatabaseClient != null
130+
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForPartitionedOpsSupported();
131+
}
132+
127133
@Override
128134
public Dialect getDialect() {
129135
return pool.getDialect();
@@ -323,8 +329,15 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
323329

324330
@Override
325331
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
326-
if (useMultiplexedSessionPartitionedOps) {
327-
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
332+
333+
if (canUseMultiplexedSessionsForPartitionedOps()) {
334+
try {
335+
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
336+
} catch (SpannerException e) {
337+
if (!multiplexedSessionDatabaseClient.maybeMarkUnimplementedForPartitionedOps(e)) {
338+
throw e;
339+
}
340+
}
328341
}
329342
return executePartitionedUpdateWithPooledSession(stmt, options);
330343
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ void onError(SpannerException spannerException) {
104104
// UNIMPLEMENTED with error message "Transaction type read_write not supported with
105105
// multiplexed sessions" is returned.
106106
this.client.maybeMarkUnimplementedForRW(spannerException);
107+
// Mark multiplexed sessions for Partitioned Ops as unimplemented and fall back to regular
108+
// sessions if
109+
// UNIMPLEMENTED with error message "Partitioned operations are not supported with multiplexed
110+
// sessions".
111+
this.client.maybeMarkUnimplementedForPartitionedOps(spannerException);
107112
}
108113

109114
@Override
@@ -214,6 +219,12 @@ public void close() {
214219
*/
215220
@VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean(false);
216221

222+
/**
223+
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
224+
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
225+
*/
226+
@VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);
227+
217228
MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
218229
this(sessionClient, Clock.systemUTC());
219230
}
@@ -316,7 +327,18 @@ && verifyErrorMessage(
316327
}
317328
}
318329

319-
private boolean verifyErrorMessage(SpannerException spannerException, String message) {
330+
boolean maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
331+
if (spannerException.getErrorCode() == ErrorCode.UNIMPLEMENTED
332+
&& verifyErrorMessage(
333+
spannerException,
334+
"Transaction type partitioned_dml not supported with multiplexed sessions")) {
335+
unimplementedForPartitionedOps.set(true);
336+
return true;
337+
}
338+
return false;
339+
}
340+
341+
static boolean verifyErrorMessage(SpannerException spannerException, String message) {
320342
if (spannerException.getCause() == null) {
321343
return false;
322344
}
@@ -391,6 +413,10 @@ boolean isMultiplexedSessionsForRWSupported() {
391413
return !this.unimplementedForRW.get();
392414
}
393415

416+
boolean isMultiplexedSessionsForPartitionedOpsSupported() {
417+
return !this.unimplementedForPartitionedOps.get();
418+
}
419+
394420
void close() {
395421
synchronized (this) {
396422
if (!this.isClosed) {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import com.google.api.core.InternalApi;
20+
import com.google.common.annotations.VisibleForTesting;
21+
import java.math.BigInteger;
22+
import java.security.SecureRandom;
23+
import java.util.Objects;
24+
25+
@InternalApi
26+
public class XGoogSpannerRequestId {
27+
// 1. Generate the random process Id singleton.
28+
@VisibleForTesting
29+
static final String RAND_PROCESS_ID = XGoogSpannerRequestId.generateRandProcessId();
30+
31+
@VisibleForTesting
32+
static final long VERSION = 1; // The version of the specification being implemented.
33+
34+
private final long nthClientId;
35+
private final long nthChannelId;
36+
private final long nthRequest;
37+
private long attempt;
38+
39+
XGoogSpannerRequestId(long nthClientId, long nthChannelId, long nthRequest, long attempt) {
40+
this.nthClientId = nthClientId;
41+
this.nthChannelId = nthChannelId;
42+
this.nthRequest = nthRequest;
43+
this.attempt = attempt;
44+
}
45+
46+
public static XGoogSpannerRequestId of(
47+
long nthClientId, long nthChannelId, long nthRequest, long attempt) {
48+
return new XGoogSpannerRequestId(nthClientId, nthChannelId, nthRequest, attempt);
49+
}
50+
51+
private static String generateRandProcessId() {
52+
// Expecting to use 64-bits of randomness to avoid clashes.
53+
BigInteger bigInt = new BigInteger(64, new SecureRandom());
54+
return String.format("%016x", bigInt);
55+
}
56+
57+
@Override
58+
public String toString() {
59+
return String.format(
60+
"%d.%s.%d.%d.%d.%d",
61+
XGoogSpannerRequestId.VERSION,
62+
XGoogSpannerRequestId.RAND_PROCESS_ID,
63+
this.nthClientId,
64+
this.nthChannelId,
65+
this.nthRequest,
66+
this.attempt);
67+
}
68+
69+
@Override
70+
public boolean equals(Object other) {
71+
// instanceof for a null object returns false.
72+
if (!(other instanceof XGoogSpannerRequestId)) {
73+
return false;
74+
}
75+
76+
XGoogSpannerRequestId otherReqId = (XGoogSpannerRequestId) (other);
77+
78+
return Objects.equals(this.nthClientId, otherReqId.nthClientId)
79+
&& Objects.equals(this.nthChannelId, otherReqId.nthChannelId)
80+
&& Objects.equals(this.nthRequest, otherReqId.nthRequest)
81+
&& Objects.equals(this.attempt, otherReqId.attempt);
82+
}
83+
84+
@Override
85+
public int hashCode() {
86+
return Objects.hash(this.nthClientId, this.nthChannelId, this.nthRequest, this.attempt);
87+
}
88+
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
Args = --initialize-at-build-time=com.google.cloud.spanner.IntegrationTestEnv,\
22
org.junit.experimental.categories.CategoryValidator,\
3-
org.junit.validator.AnnotationValidator \
3+
org.junit.validator.AnnotationValidator,\
4+
java.lang.annotation.Annotation \
45
--features=com.google.cloud.spanner.nativeimage.SpannerFeature

0 commit comments

Comments
 (0)