Skip to content

Commit 425bfc8

Browse files
authored
Merge branch 'main' into generate-libraries-main
2 parents 5b14e67 + d9813a0 commit 425bfc8

21 files changed

+499
-51
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -960,6 +960,7 @@ ResultSet readInternalWithOptions(
960960
SpannerImpl.READ,
961961
span,
962962
tracer,
963+
tracer.createTableAttributes(table, readOptions),
963964
session.getErrorHandler(),
964965
rpc.getReadRetrySettings(),
965966
rpc.getReadRetryableCodes()) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@
2626
import com.google.common.base.Function;
2727
import com.google.common.util.concurrent.ListenableFuture;
2828
import com.google.spanner.v1.BatchWriteResponse;
29+
import io.opentelemetry.api.common.Attributes;
2930
import javax.annotation.Nullable;
3031

3132
class DatabaseClientImpl implements DatabaseClient {
3233
private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
3334
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
3435
private static final String PARTITION_DML_TRANSACTION = "CloudSpanner.PartitionDMLTransaction";
3536
private final TraceWrapper tracer;
37+
private Attributes commonAttributes;
3638
@VisibleForTesting final String clientId;
3739
@VisibleForTesting final SessionPool pool;
3840
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
@@ -50,7 +52,8 @@ class DatabaseClientImpl implements DatabaseClient {
5052
/* multiplexedSessionDatabaseClient = */ null,
5153
/* useMultiplexedSessionPartitionedOps= */ false,
5254
tracer,
53-
/* useMultiplexedSessionForRW = */ false);
55+
/* useMultiplexedSessionForRW = */ false,
56+
Attributes.empty());
5457
}
5558

5659
@VisibleForTesting
@@ -62,7 +65,8 @@ class DatabaseClientImpl implements DatabaseClient {
6265
/* multiplexedSessionDatabaseClient = */ null,
6366
/* useMultiplexedSessionPartitionedOps= */ false,
6467
tracer,
65-
/* useMultiplexedSessionForRW = */ false);
68+
/* useMultiplexedSessionForRW = */ false,
69+
Attributes.empty());
6670
}
6771

6872
DatabaseClientImpl(
@@ -72,14 +76,16 @@ class DatabaseClientImpl implements DatabaseClient {
7276
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
7377
boolean useMultiplexedSessionPartitionedOps,
7478
TraceWrapper tracer,
75-
boolean useMultiplexedSessionForRW) {
79+
boolean useMultiplexedSessionForRW,
80+
Attributes commonAttributes) {
7681
this.clientId = clientId;
7782
this.pool = pool;
7883
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
7984
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
8085
this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps;
8186
this.tracer = tracer;
8287
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
88+
this.commonAttributes = commonAttributes;
8389
}
8490

8591
@VisibleForTesting
@@ -138,7 +144,7 @@ public Timestamp write(final Iterable<Mutation> mutations) throws SpannerExcepti
138144
public CommitResponse writeWithOptions(
139145
final Iterable<Mutation> mutations, final TransactionOption... options)
140146
throws SpannerException {
141-
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
147+
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
142148
try (IScope s = tracer.withSpan(span)) {
143149
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
144150
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
@@ -161,7 +167,7 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
161167
public CommitResponse writeAtLeastOnceWithOptions(
162168
final Iterable<Mutation> mutations, final TransactionOption... options)
163169
throws SpannerException {
164-
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
170+
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
165171
try (IScope s = tracer.withSpan(span)) {
166172
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
167173
return getMultiplexedSessionDatabaseClient()
@@ -181,7 +187,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
181187
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
182188
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
183189
throws SpannerException {
184-
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
190+
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
185191
try (IScope s = tracer.withSpan(span)) {
186192
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
187193
} catch (RuntimeException e) {
@@ -194,7 +200,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
194200

195201
@Override
196202
public ReadContext singleUse() {
197-
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
203+
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
198204
try (IScope s = tracer.withSpan(span)) {
199205
return getMultiplexedSession().singleUse();
200206
} catch (RuntimeException e) {
@@ -206,7 +212,7 @@ public ReadContext singleUse() {
206212

207213
@Override
208214
public ReadContext singleUse(TimestampBound bound) {
209-
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
215+
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
210216
try (IScope s = tracer.withSpan(span)) {
211217
return getMultiplexedSession().singleUse(bound);
212218
} catch (RuntimeException e) {
@@ -218,7 +224,7 @@ public ReadContext singleUse(TimestampBound bound) {
218224

219225
@Override
220226
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
221-
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
227+
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
222228
try (IScope s = tracer.withSpan(span)) {
223229
return getMultiplexedSession().singleUseReadOnlyTransaction();
224230
} catch (RuntimeException e) {
@@ -230,7 +236,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
230236

231237
@Override
232238
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
233-
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
239+
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
234240
try (IScope s = tracer.withSpan(span)) {
235241
return getMultiplexedSession().singleUseReadOnlyTransaction(bound);
236242
} catch (RuntimeException e) {
@@ -242,7 +248,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
242248

243249
@Override
244250
public ReadOnlyTransaction readOnlyTransaction() {
245-
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
251+
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
246252
try (IScope s = tracer.withSpan(span)) {
247253
return getMultiplexedSession().readOnlyTransaction();
248254
} catch (RuntimeException e) {
@@ -254,7 +260,7 @@ public ReadOnlyTransaction readOnlyTransaction() {
254260

255261
@Override
256262
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
257-
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION);
263+
ISpan span = tracer.spanBuilder(READ_ONLY_TRANSACTION, commonAttributes);
258264
try (IScope s = tracer.withSpan(span)) {
259265
return getMultiplexedSession().readOnlyTransaction(bound);
260266
} catch (RuntimeException e) {
@@ -266,7 +272,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
266272

267273
@Override
268274
public TransactionRunner readWriteTransaction(TransactionOption... options) {
269-
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
275+
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
270276
try (IScope s = tracer.withSpan(span)) {
271277
return getMultiplexedSessionForRW().readWriteTransaction(options);
272278
} catch (RuntimeException e) {
@@ -278,7 +284,7 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {
278284

279285
@Override
280286
public TransactionManager transactionManager(TransactionOption... options) {
281-
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
287+
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
282288
try (IScope s = tracer.withSpan(span)) {
283289
return getMultiplexedSessionForRW().transactionManager(options);
284290
} catch (RuntimeException e) {
@@ -290,7 +296,7 @@ public TransactionManager transactionManager(TransactionOption... options) {
290296

291297
@Override
292298
public AsyncRunner runAsync(TransactionOption... options) {
293-
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
299+
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
294300
try (IScope s = tracer.withSpan(span)) {
295301
return getMultiplexedSessionForRW().runAsync(options);
296302
} catch (RuntimeException e) {
@@ -302,7 +308,7 @@ public AsyncRunner runAsync(TransactionOption... options) {
302308

303309
@Override
304310
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
305-
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
311+
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
306312
try (IScope s = tracer.withSpan(span)) {
307313
return getMultiplexedSessionForRW().transactionManagerAsync(options);
308314
} catch (RuntimeException e) {
@@ -322,7 +328,7 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption...
322328

323329
private long executePartitionedUpdateWithPooledSession(
324330
final Statement stmt, final UpdateOption... options) {
325-
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
331+
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
326332
try (IScope s = tracer.withSpan(span)) {
327333
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
328334
} catch (RuntimeException e) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,15 @@ public void onSessionReady(SessionImpl session) {
253253
// initiate a begin transaction request to verify if read-write transactions are
254254
// supported using multiplexed sessions.
255255
if (sessionClient
256-
.getSpanner()
257-
.getOptions()
258-
.getSessionPoolOptions()
259-
.getUseMultiplexedSessionForRW()) {
256+
.getSpanner()
257+
.getOptions()
258+
.getSessionPoolOptions()
259+
.getUseMultiplexedSessionForRW()
260+
&& !sessionClient
261+
.getSpanner()
262+
.getOptions()
263+
.getSessionPoolOptions()
264+
.getSkipVerifyBeginTransactionForMuxRW()) {
260265
verifyBeginTransactionWithRWOnMultiplexedSessionAsync(session.getName());
261266
}
262267
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.common.base.Preconditions;
2626
import com.google.common.collect.ImmutableMap;
2727
import com.google.common.collect.Maps;
28+
import io.opentelemetry.api.common.Attributes;
2829
import java.util.ArrayList;
2930
import java.util.Collections;
3031
import java.util.List;
@@ -125,7 +126,8 @@ private BatchCreateSessionsRunnable(
125126
public void run() {
126127
List<SessionImpl> sessions;
127128
int remainingSessionsToCreate = sessionCount;
128-
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS);
129+
ISpan span =
130+
spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS, commonAttributes);
129131
try (IScope s = spanner.getTracer().withSpan(span)) {
130132
spanner
131133
.getTracer()
@@ -170,6 +172,7 @@ interface SessionConsumer {
170172
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
171173
private final ScheduledExecutorService executor;
172174
private final DatabaseId db;
175+
private final Attributes commonAttributes;
173176

174177
@GuardedBy("this")
175178
private volatile long sessionChannelCounter;
@@ -182,6 +185,7 @@ interface SessionConsumer {
182185
this.db = db;
183186
this.executorFactory = executorFactory;
184187
this.executor = executorFactory.get();
188+
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
185189
}
186190

187191
@Override
@@ -205,7 +209,7 @@ SessionImpl createSession() {
205209
synchronized (this) {
206210
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
207211
}
208-
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION);
212+
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
209213
try (IScope s = spanner.getTracer().withSpan(span)) {
210214
com.google.spanner.v1.Session session =
211215
spanner
@@ -250,7 +254,10 @@ void createMultiplexedSession(SessionConsumer consumer) {
250254
* GRPC channel. In case of an error during the gRPC calls, an exception will be thrown.
251255
*/
252256
SessionImpl createMultiplexedSession() {
253-
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION);
257+
ISpan span =
258+
spanner
259+
.getTracer()
260+
.spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION, this.commonAttributes);
254261
try (IScope s = spanner.getTracer().withSpan(span)) {
255262
com.google.spanner.v1.Session session =
256263
spanner

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public void onSuccess(AsyncTransactionManagerImpl result) {
107107
new ApiFutureCallback<Void>() {
108108
@Override
109109
public void onFailure(Throwable t) {
110+
session.close();
110111
res.setException(t);
111112
}
112113

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class SessionPoolOptions {
8383

8484
// TODO: Change to use java.time.Duration.
8585
private final Duration multiplexedSessionMaintenanceDuration;
86+
private final boolean skipVerifyingBeginTransactionForMuxRW;
8687

8788
private SessionPoolOptions(Builder builder) {
8889
// minSessions > maxSessions is only possible if the user has only set a value for maxSessions.
@@ -132,6 +133,7 @@ private SessionPoolOptions(Builder builder) {
132133
? useMultiplexedSessionFromEnvVariablePartitionedOps
133134
: builder.useMultiplexedSessionPartitionedOps;
134135
this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration;
136+
this.skipVerifyingBeginTransactionForMuxRW = builder.skipVerifyingBeginTransactionForMuxRW;
135137
}
136138

137139
@Override
@@ -169,8 +171,10 @@ public boolean equals(Object o) {
169171
&& Objects.equals(this.useMultiplexedSession, other.useMultiplexedSession)
170172
&& Objects.equals(this.useMultiplexedSessionForRW, other.useMultiplexedSessionForRW)
171173
&& Objects.equals(
172-
this.multiplexedSessionMaintenanceDuration,
173-
other.multiplexedSessionMaintenanceDuration);
174+
this.multiplexedSessionMaintenanceDuration, other.multiplexedSessionMaintenanceDuration)
175+
&& Objects.equals(
176+
this.skipVerifyingBeginTransactionForMuxRW,
177+
other.skipVerifyingBeginTransactionForMuxRW);
174178
}
175179

176180
@Override
@@ -199,7 +203,8 @@ public int hashCode() {
199203
this.poolMaintainerClock,
200204
this.useMultiplexedSession,
201205
this.useMultiplexedSessionForRW,
202-
this.multiplexedSessionMaintenanceDuration);
206+
this.multiplexedSessionMaintenanceDuration,
207+
this.skipVerifyingBeginTransactionForMuxRW);
203208
}
204209

205210
public Builder toBuilder() {
@@ -392,6 +397,12 @@ Duration getMultiplexedSessionMaintenanceDuration() {
392397
return multiplexedSessionMaintenanceDuration;
393398
}
394399

400+
@VisibleForTesting
401+
@InternalApi
402+
boolean getSkipVerifyBeginTransactionForMuxRW() {
403+
return skipVerifyingBeginTransactionForMuxRW;
404+
}
405+
395406
public static Builder newBuilder() {
396407
return new Builder();
397408
}
@@ -607,6 +618,7 @@ public static class Builder {
607618

608619
private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7);
609620
private Clock poolMaintainerClock = Clock.INSTANCE;
621+
private boolean skipVerifyingBeginTransactionForMuxRW = false;
610622

611623
private static Position getReleaseToPositionFromSystemProperty() {
612624
// NOTE: This System property is a beta feature. Support for it can be removed in the future.
@@ -650,6 +662,7 @@ private Builder(SessionPoolOptions options) {
650662
this.useMultiplexedSessionPartitionedOps = options.useMultiplexedSessionForPartitionedOps;
651663
this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration;
652664
this.poolMaintainerClock = options.poolMaintainerClock;
665+
this.skipVerifyingBeginTransactionForMuxRW = options.skipVerifyingBeginTransactionForMuxRW;
653666
}
654667

655668
/**
@@ -872,6 +885,18 @@ Builder setMultiplexedSessionMaintenanceDuration(
872885
return this;
873886
}
874887

888+
// The additional BeginTransaction RPC for multiplexed session read-write is causing
889+
// unexpected behavior in mock Spanner tests that rely on mocking the BeginTransaction RPC.
890+
// Invoking this method with `true` skips sending the BeginTransaction RPC when the multiplexed
891+
// session is created for the first time during client initialization.
892+
// This is only used for tests.
893+
@VisibleForTesting
894+
Builder setSkipVerifyingBeginTransactionForMuxRW(
895+
boolean skipVerifyingBeginTransactionForMuxRW) {
896+
this.skipVerifyingBeginTransactionForMuxRW = skipVerifyingBeginTransactionForMuxRW;
897+
return this;
898+
}
899+
875900
/**
876901
* Sets whether the client should automatically execute a background query to detect the dialect
877902
* that is used by the database or not. Set this option to true if you do not know what the

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,8 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
315315
getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(),
316316
multiplexedSessionDatabaseClient,
317317
getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps(),
318-
useMultiplexedSessionForRW);
318+
useMultiplexedSessionForRW,
319+
this.tracer.createCommonAttributes(db));
319320
dbClients.put(db, dbClient);
320321
return dbClient;
321322
}
@@ -329,15 +330,17 @@ DatabaseClientImpl createDatabaseClient(
329330
boolean useMultiplexedSessionBlindWrite,
330331
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient,
331332
boolean useMultiplexedSessionPartitionedOps,
332-
boolean useMultiplexedSessionForRW) {
333+
boolean useMultiplexedSessionForRW,
334+
Attributes commonAttributes) {
333335
return new DatabaseClientImpl(
334336
clientId,
335337
pool,
336338
useMultiplexedSessionBlindWrite,
337339
multiplexedSessionClient,
338340
useMultiplexedSessionPartitionedOps,
339341
tracer,
340-
useMultiplexedSessionForRW);
342+
useMultiplexedSessionForRW,
343+
commonAttributes);
341344
}
342345

343346
@Override

0 commit comments

Comments
 (0)