From 72fa8b9f9ead9ca692a53541d4114fdcd70617a3 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 10 Apr 2025 22:36:10 +0300 Subject: [PATCH 01/22] chore(x-goog-spanner-request-id): plumb for BatchCreateSessions This change plumbs x-goog-spanner-request-id into BatchCreateSessions and asserts that the header is present for that method. Updates #3537 --- .../cloud/spanner/DatabaseClientImpl.java | 73 ++++++++- .../com/google/cloud/spanner/Options.java | 48 +++++- .../google/cloud/spanner/SessionClient.java | 43 +++-- .../com/google/cloud/spanner/SessionImpl.java | 62 +++++++- .../com/google/cloud/spanner/SessionPool.java | 4 + .../cloud/spanner/XGoogSpannerRequestId.java | 88 +++++++++++ .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 25 ++- .../cloud/spanner/spi/v1/SpannerRpc.java | 3 +- .../cloud/spanner/DatabaseClientImplTest.java | 45 +++++- .../com/google/cloud/spanner/OptionsTest.java | 38 +++++ .../cloud/spanner/SessionClientTests.java | 11 +- .../google/cloud/spanner/SessionImplTest.java | 1 + .../spanner/XGoogSpannerRequestIdTest.java | 147 +++++++++++++++++- 13 files changed, 555 insertions(+), 33 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 624aba7547c..1ed48499009 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -24,14 +24,18 @@ import com.google.cloud.spanner.SpannerImpl.ClosedException; import com.google.cloud.spanner.Statement.StatementFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.util.concurrent.ListenableFuture; import com.google.spanner.v1.BatchWriteResponse; import io.opentelemetry.api.common.Attributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import javax.annotation.Nullable; class DatabaseClientImpl implements DatabaseClient { @@ -45,6 +49,8 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; @VisibleForTesting final boolean useMultiplexedSessionPartitionedOps; @VisibleForTesting final boolean useMultiplexedSessionForRW; + private final int dbId; + private final AtomicInteger nthRequest; final boolean useMultiplexedSessionBlindWrite; @@ -91,6 +97,18 @@ class DatabaseClientImpl implements DatabaseClient { this.tracer = tracer; this.useMultiplexedSessionForRW = useMultiplexedSessionForRW; this.commonAttributes = commonAttributes; + + this.dbId = this.dbIdFromClientId(this.clientId); + this.nthRequest = new AtomicInteger(0); + } + + private int dbIdFromClientId(String clientId) { + int i = clientId.indexOf("-"); + String strWithValue = clientId.substring(i + 1); + if (Objects.equals(strWithValue, "")) { + strWithValue = "0"; + } + return Integer.parseInt(strWithValue); } @VisibleForTesting @@ -188,7 +206,11 @@ public CommitResponse writeWithOptions( if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) { return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options); } - return runWithSessionRetry(session -> session.writeWithOptions(mutations, options)); + + return runWithSessionRetry( + (session, reqId) -> { + return session.writeWithOptions(mutations, withReqId(reqId, options)); + }); } catch (RuntimeException e) { span.setStatus(e); throw e; @@ -213,7 +235,8 @@ public CommitResponse writeAtLeastOnceWithOptions( .writeAtLeastOnceWithOptions(mutations, options); } return runWithSessionRetry( - session -> session.writeAtLeastOnceWithOptions(mutations, options)); + (session, reqId) -> + session.writeAtLeastOnceWithOptions(mutations, withReqId(reqId, options))); } catch (RuntimeException e) { span.setStatus(e); throw e; @@ -222,6 +245,10 @@ public CommitResponse writeAtLeastOnceWithOptions( } } + private int nextNthRequest() { + return this.nthRequest.incrementAndGet(); + } + @Override public ServerStream batchWriteAtLeastOnce( final Iterable mutationGroups, final TransactionOption... options) @@ -231,7 +258,9 @@ public ServerStream batchWriteAtLeastOnce( if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) { return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options); } - return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options)); + return runWithSessionRetry( + (session, reqId) -> + session.batchWriteAtLeastOnce(mutationGroups, withReqId(reqId, options))); } catch (RuntimeException e) { span.setStatus(e); throw e; @@ -383,11 +412,34 @@ private Future getDialectAsync() { return pool.getDialectAsync(); } + private UpdateOption[] withReqId( + final XGoogSpannerRequestId reqId, final UpdateOption... options) { + if (reqId == null) { + return options; + } + ArrayList allOptions = new ArrayList(Arrays.asList(options)); + allOptions.add(new Options.RequestIdOption(reqId)); + return allOptions.toArray(new UpdateOption[0]); + } + + private TransactionOption[] withReqId( + final XGoogSpannerRequestId reqId, final TransactionOption... options) { + if (reqId == null) { + return options; + } + ArrayList allOptions = new ArrayList(Arrays.asList(options)); + allOptions.add(new Options.RequestIdOption(reqId)); + return allOptions.toArray(new TransactionOption[0]); + } + private long executePartitionedUpdateWithPooledSession( final Statement stmt, final UpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes); try (IScope s = tracer.withSpan(span)) { - return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); + return runWithSessionRetry( + (session, reqId) -> { + return session.executePartitionedUpdate(stmt, withReqId(reqId, options)); + }); } catch (RuntimeException e) { span.setStatus(e); span.end(); @@ -395,15 +447,22 @@ private long executePartitionedUpdateWithPooledSession( } } - private T runWithSessionRetry(Function callable) { + private T runWithSessionRetry(BiFunction callable) { PooledSessionFuture session = getSession(); + XGoogSpannerRequestId reqId = + XGoogSpannerRequestId.of( + this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0); while (true) { try { - return callable.apply(session); + reqId.incrementAttempt(); + return callable.apply(session, reqId); } catch (SessionNotFoundException e) { session = (PooledSessionFuture) pool.getPooledSessionReplacementHandler().replaceSession(e, session); + reqId = + XGoogSpannerRequestId.of( + this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0); } } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index c36f1902648..0b9556084ef 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -177,6 +177,10 @@ public static UpdateTransactionOption excludeTxnFromChangeStreams() { return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION; } + public static RequestIdOption requestId(XGoogSpannerRequestId reqId) { + return new RequestIdOption(reqId); + } + /** * Specifying this will cause the read to yield at most this many rows. This should be greater * than 0. @@ -535,6 +539,7 @@ void appendToOptions(Options options) { private RpcLockHint lockHint; private Boolean lastStatement; private IsolationLevel isolationLevel; + private XGoogSpannerRequestId reqId; // Construction is via factory methods below. private Options() {} @@ -599,6 +604,14 @@ String filter() { return filter; } + boolean hasReqId() { + return reqId != null; + } + + XGoogSpannerRequestId reqId() { + return reqId; + } + boolean hasPriority() { return priority != null; } @@ -756,6 +769,9 @@ public String toString() { if (isolationLevel != null) { b.append("isolationLevel: ").append(isolationLevel).append(' '); } + if (reqId != null) { + b.append("requestId: ").append(reqId.toString()); + } return b.toString(); } @@ -798,7 +814,8 @@ public boolean equals(Object o) { && Objects.equals(orderBy(), that.orderBy()) && Objects.equals(isLastStatement(), that.isLastStatement()) && Objects.equals(lockHint(), that.lockHint()) - && Objects.equals(isolationLevel(), that.isolationLevel()); + && Objects.equals(isolationLevel(), that.isolationLevel()) + && Objects.equals(reqId(), that.reqId()); } @Override @@ -867,6 +884,9 @@ public int hashCode() { if (isolationLevel != null) { result = 31 * result + isolationLevel.hashCode(); } + if (reqId != null) { + result = 31 * result + reqId.hashCode(); + } return result; } @@ -1052,4 +1072,30 @@ public boolean equals(Object o) { return o instanceof LastStatementUpdateOption; } } + + static final class RequestIdOption extends InternalOption + implements ReadOption, TransactionOption, UpdateOption { + private final XGoogSpannerRequestId reqId; + + RequestIdOption(XGoogSpannerRequestId reqId) { + this.reqId = reqId; + } + + @Override + void appendToOptions(Options options) { + options.reqId = this.reqId; + } + + @Override + public int hashCode() { + return RequestIdOption.class.hashCode(); + } + + @Override + public boolean equals(Object o) { + // TODO: Examine why the precedent for LastStatementUpdateOption + // does not check against the actual value. + return o instanceof RequestIdOption; + } + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 2edfb66d896..405c5f86812 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -31,10 +31,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.concurrent.GuardedBy; /** Client for creating single sessions and batches of sessions. */ -class SessionClient implements AutoCloseable { +class SessionClient implements AutoCloseable, XGoogSpannerRequestId.RequestIdCreator { static class SessionId { private static final PathTemplate NAME_TEMPLATE = PathTemplate.create( @@ -174,6 +175,12 @@ interface SessionConsumer { private final DatabaseId db; private final Attributes commonAttributes; + // SessionClient is created long before a DatabaseClientImpl is created, + // as batch sessions are firstly created then later attached to each Client. + private static AtomicInteger NTH_ID = new AtomicInteger(0); + private final int nthId; + private final AtomicInteger nthRequest; + @GuardedBy("this") private volatile long sessionChannelCounter; @@ -186,6 +193,8 @@ interface SessionConsumer { this.executorFactory = executorFactory; this.executor = executorFactory.get(); this.commonAttributes = spanner.getTracer().createCommonAttributes(db); + this.nthId = SessionClient.NTH_ID.incrementAndGet(); + this.nthRequest = new AtomicInteger(0); } @Override @@ -201,16 +210,24 @@ DatabaseId getDatabaseId() { return db; } + @Override + public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) { + return XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelId, 1); + } + /** Create a single session. */ SessionImpl createSession() { // The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE, // which is also a valid channel hint. final Map options; + final long channelId; synchronized (this) { options = optionMap(SessionOption.channelHint(sessionChannelCounter++)); + channelId = sessionChannelCounter; } ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes); try (IScope s = spanner.getTracer().withSpan(span)) { + XGoogSpannerRequestId reqId = this.nextRequestId(channelId, 1); com.google.spanner.v1.Session session = spanner .getRpc() @@ -218,11 +235,13 @@ SessionImpl createSession() { db.getName(), spanner.getOptions().getDatabaseRole(), spanner.getOptions().getSessionLabels(), - options); + reqId.withOptions(options)); SessionReference sessionReference = new SessionReference( session.getName(), session.getCreateTime(), session.getMultiplexed(), options); - return new SessionImpl(spanner, sessionReference); + SessionImpl sessionImpl = new SessionImpl(spanner, sessionReference); + sessionImpl.setRequestIdCreator(this); + return sessionImpl; } catch (RuntimeException e) { span.setStatus(e); throw e; @@ -273,6 +292,7 @@ SessionImpl createMultiplexedSession() { spanner, new SessionReference( session.getName(), session.getCreateTime(), session.getMultiplexed(), null)); + sessionImpl.setRequestIdCreator(this); span.addAnnotation( String.format("Request for %d multiplexed session returned %d session", 1, 1)); return sessionImpl; @@ -387,6 +407,8 @@ private List internalBatchCreateSessions( .spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent); span.addAnnotation(String.format("Requesting %d sessions", sessionCount)); try (IScope s = spanner.getTracer().withSpan(span)) { + XGoogSpannerRequestId reqId = + XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelHint, 1); List sessions = spanner .getRpc() @@ -395,21 +417,20 @@ private List internalBatchCreateSessions( sessionCount, spanner.getOptions().getDatabaseRole(), spanner.getOptions().getSessionLabels(), - options); + reqId.withOptions(options)); span.addAnnotation( String.format( "Request for %d sessions returned %d sessions", sessionCount, sessions.size())); span.end(); List res = new ArrayList<>(sessionCount); for (com.google.spanner.v1.Session session : sessions) { - res.add( + SessionImpl sessionImpl = new SessionImpl( spanner, new SessionReference( - session.getName(), - session.getCreateTime(), - session.getMultiplexed(), - options))); + session.getName(), session.getCreateTime(), session.getMultiplexed(), options)); + sessionImpl.setRequestIdCreator(this); + res.add(sessionImpl); } return res; } catch (RuntimeException e) { @@ -425,6 +446,8 @@ SessionImpl sessionWithId(String name) { synchronized (this) { options = optionMap(SessionOption.channelHint(sessionChannelCounter++)); } - return new SessionImpl(spanner, new SessionReference(name, options)); + SessionImpl sessionImpl = new SessionImpl(spanner, new SessionReference(name, options)); + sessionImpl.setRequestIdCreator(this); + return sessionImpl; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 6b6113d41da..0f95f362604 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -126,18 +126,31 @@ interface SessionTransaction { private final Clock clock; private final Map options; private final ErrorHandler errorHandler; + private XGoogSpannerRequestId.RequestIdCreator requestIdCreator; SessionImpl(SpannerImpl spanner, SessionReference sessionReference) { this(spanner, sessionReference, NO_CHANNEL_HINT); } SessionImpl(SpannerImpl spanner, SessionReference sessionReference, int channelHint) { + this(spanner, sessionReference, channelHint, new XGoogSpannerRequestId.NoopRequestIdCreator()); + } + + SessionImpl( + SpannerImpl spanner, + SessionReference sessionReference, + int channelHint, + XGoogSpannerRequestId.RequestIdCreator requestIdCreator) { this.spanner = spanner; this.tracer = spanner.getTracer(); this.sessionReference = sessionReference; this.clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock(); this.options = createOptions(sessionReference, channelHint); this.errorHandler = createErrorHandler(spanner.getOptions()); + this.requestIdCreator = requestIdCreator; + if (this.requestIdCreator == null) { + this.requestIdCreator = new XGoogSpannerRequestId.NoopRequestIdCreator(); + } } static Map createOptions( @@ -287,9 +300,16 @@ public CommitResponse writeAtLeastOnceWithOptions( } CommitRequest request = requestBuilder.build(); ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT); + final XGoogSpannerRequestId reqId = reqIdOrFresh(options); + try (IScope s = tracer.withSpan(span)) { return SpannerRetryHelper.runTxWithRetriesOnAborted( - () -> new CommitResponse(spanner.getRpc().commit(request, getOptions()))); + () -> { + // TODO: Detect an abort and then refresh the reqId. + reqId.incrementAttempt(); + return new CommitResponse( + spanner.getRpc().commit(request, reqId.withOptions(getOptions()))); + }); } catch (RuntimeException e) { span.setStatus(e); throw e; @@ -298,6 +318,14 @@ public CommitResponse writeAtLeastOnceWithOptions( } } + private XGoogSpannerRequestId reqIdOrFresh(Options options) { + XGoogSpannerRequestId reqId = options.reqId(); + if (reqId == null) { + reqId = this.getRequestIdCreator().nextRequestId(1 /* TODO: channelId */, 0); + } + return reqId; + } + private RequestOptions getRequestOptions(TransactionOption... transactionOptions) { Options requestOptions = Options.fromTransactionOptions(transactionOptions); if (requestOptions.hasPriority() || requestOptions.hasTag()) { @@ -325,16 +353,19 @@ public ServerStream batchWriteAtLeastOnce( .setSession(getName()) .addAllMutationGroups(mutationGroupsProto); RequestOptions batchWriteRequestOptions = getRequestOptions(transactionOptions); + Options allOptions = Options.fromTransactionOptions(transactionOptions); + final XGoogSpannerRequestId reqId = reqIdOrFresh(allOptions); if (batchWriteRequestOptions != null) { requestBuilder.setRequestOptions(batchWriteRequestOptions); } - if (Options.fromTransactionOptions(transactionOptions).withExcludeTxnFromChangeStreams() - == Boolean.TRUE) { + if (allOptions.withExcludeTxnFromChangeStreams() == Boolean.TRUE) { requestBuilder.setExcludeTxnFromChangeStreams(true); } ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE); try (IScope s = tracer.withSpan(span)) { - return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), getOptions()); + return spanner + .getRpc() + .batchWriteAtLeastOnce(requestBuilder.build(), reqId.withOptions(getOptions())); } catch (Throwable e) { span.setStatus(e); throw SpannerExceptionFactory.newSpannerException(e); @@ -435,14 +466,18 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... @Override public ApiFuture asyncClose() { - return spanner.getRpc().asyncDeleteSession(getName(), getOptions()); + XGoogSpannerRequestId reqId = + this.getRequestIdCreator().nextRequestId(1 /* TODO: channelId */, 0); + return spanner.getRpc().asyncDeleteSession(getName(), reqId.withOptions(getOptions())); } @Override public void close() { ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION); try (IScope s = tracer.withSpan(span)) { - spanner.getRpc().deleteSession(getName(), getOptions()); + XGoogSpannerRequestId reqId = + this.getRequestIdCreator().nextRequestId(1 /* TODO: channelId */, 0); + spanner.getRpc().deleteSession(getName(), reqId.withOptions(getOptions())); } catch (RuntimeException e) { span.setStatus(e); throw e; @@ -472,8 +507,13 @@ ApiFuture beginTransactionAsync( } final BeginTransactionRequest request = requestBuilder.build(); final ApiFuture requestFuture; + XGoogSpannerRequestId reqId = + this.getRequestIdCreator().nextRequestId(1 /* TODO: channelId */, 1); try (IScope ignore = tracer.withSpan(span)) { - requestFuture = spanner.getRpc().beginTransactionAsync(request, channelHint, routeToLeader); + requestFuture = + spanner + .getRpc() + .beginTransactionAsync(request, reqId.withOptions(channelHint), routeToLeader); } requestFuture.addListener( () -> { @@ -551,4 +591,12 @@ void onTransactionDone() {} TraceWrapper getTracer() { return tracer; } + + public void setRequestIdCreator(XGoogSpannerRequestId.RequestIdCreator creator) { + this.requestIdCreator = creator; + } + + public XGoogSpannerRequestId.RequestIdCreator getRequestIdCreator() { + return this.requestIdCreator; + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index fa4e1d03d0c..42a67a66296 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1585,6 +1585,10 @@ PooledSession get(final boolean eligibleForLongRunning) { throw SpannerExceptionFactory.propagateInterrupt(e); } } + + public int getChannel() { + return get().getChannel(); + } } interface CachedSession extends Session { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java index 4f6c0114750..325aace2d2c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java @@ -17,10 +17,19 @@ package com.google.cloud.spanner; import com.google.api.core.InternalApi; +import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; +import io.grpc.Metadata; import java.math.BigInteger; import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.regex.MatchResult; +import java.util.regex.Matcher; +import java.util.regex.Pattern; @InternalApi public class XGoogSpannerRequestId { @@ -28,6 +37,9 @@ public class XGoogSpannerRequestId { @VisibleForTesting static final String RAND_PROCESS_ID = XGoogSpannerRequestId.generateRandProcessId(); + public static final Metadata.Key REQUEST_HEADER_KEY = + Metadata.Key.of("x-goog-spanner-request-id", Metadata.ASCII_STRING_MARSHALLER); + @VisibleForTesting static final long VERSION = 1; // The version of the specification being implemented. @@ -48,6 +60,26 @@ public static XGoogSpannerRequestId of( return new XGoogSpannerRequestId(nthClientId, nthChannelId, nthRequest, attempt); } + @VisibleForTesting + static final Pattern REGEX = + Pattern.compile("^(\\d)\\.([0-9a-z]{16})\\.(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)$"); + + public static XGoogSpannerRequestId of(String s) { + Matcher m = XGoogSpannerRequestId.REGEX.matcher(s); + if (!m.matches()) { + throw new IllegalStateException( + s + " does not match " + XGoogSpannerRequestId.REGEX.pattern()); + } + + MatchResult mr = m.toMatchResult(); + + return new XGoogSpannerRequestId( + Long.parseLong(mr.group(3)), + Long.parseLong(mr.group(4)), + Long.parseLong(mr.group(5)), + Long.parseLong(mr.group(6))); + } + private static String generateRandProcessId() { // Expecting to use 64-bits of randomness to avoid clashes. BigInteger bigInt = new BigInteger(64, new SecureRandom()); @@ -66,6 +98,13 @@ public String toString() { this.attempt); } + private boolean isGreaterThan(XGoogSpannerRequestId other) { + return this.nthClientId > other.nthClientId + && this.nthChannelId > other.nthChannelId + && this.nthRequest > other.nthRequest + && this.attempt > other.attempt; + } + @Override public boolean equals(Object other) { // instanceof for a null object returns false. @@ -81,8 +120,57 @@ public boolean equals(Object other) { && Objects.equals(this.attempt, otherReqId.attempt); } + public void incrementAttempt() { + this.attempt++; + } + + @SuppressWarnings("unchecked") + public Map withOptions(Map options) { + Map copyOptions = new HashMap<>(); + if (options != null) { + copyOptions.putAll(options); + } + copyOptions.put(SpannerRpc.Option.REQUEST_ID, this); + return copyOptions; + } + @Override public int hashCode() { return Objects.hash(this.nthClientId, this.nthChannelId, this.nthRequest, this.attempt); } + + public interface RequestIdCreator { + XGoogSpannerRequestId nextRequestId(long channelId, int attempt); + } + + public static class NoopRequestIdCreator implements RequestIdCreator { + NoopRequestIdCreator() {} + + @Override + public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) { + return XGoogSpannerRequestId.of(1, 1, 1, 0); + } + } + + public static void assertMonotonicityOfIds(String prefix, List reqIds) { + int size = reqIds.size(); + + List violations = new ArrayList<>(); + for (int i = 1; i < size; i++) { + XGoogSpannerRequestId prev = reqIds.get(i - 1); + XGoogSpannerRequestId curr = reqIds.get(i); + if (prev.isGreaterThan(curr)) { + violations.add(String.format("#%d(%s) > #%d(%s)", i - 1, prev, i, curr)); + } + } + + if (violations.size() == 0) { + return; + } + + throw new IllegalStateException( + prefix + + " monotonicity violation:" + + String.join("\n\t", violations.toArray(new String[0]))); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index c43bbe1f11b..5de3ed54dfd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -71,6 +71,7 @@ import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator; import com.google.cloud.spanner.SpannerOptions.CallCredentialsProvider; +import com.google.cloud.spanner.XGoogSpannerRequestId; import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub; import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings; import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminCallableFactory; @@ -88,6 +89,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; import com.google.common.util.concurrent.RateLimiter; @@ -193,6 +195,7 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -407,6 +410,8 @@ public GapicSpannerRpc(final SpannerOptions options) { final String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST"); try { + // TODO: make our retry settings to inject and increment + // XGoogSpannerRequestId whenever a retry occurs. SpannerStubSettings spannerStubSettings = options.getSpannerStubSettings().toBuilder() .setTransportChannelProvider(channelProvider) @@ -2042,8 +2047,13 @@ GrpcCallContext newCallContext( GcpManagedChannel.AFFINITY_KEY, String.valueOf(boundedChannelHint))); } else { // Set channel affinity in GAX. - context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); + Long affinity = Option.CHANNEL_HINT.getLong(options); + if (affinity != null) { + context = context.withChannelAffinity(affinity.intValue()); + } } + String methodName = method.getFullMethodName(); + context = withRequestId(context, options, methodName); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); if (routeToLeader && leaderAwareRoutingEnabled) { @@ -2064,6 +2074,19 @@ GrpcCallContext newCallContext( return (GrpcCallContext) context.merge(apiCallContextFromContext); } + GrpcCallContext withRequestId(GrpcCallContext context, Map options, String methodName) { + XGoogSpannerRequestId reqId = (XGoogSpannerRequestId) options.get(Option.REQUEST_ID); + if (reqId == null) { + return context; + } + + Map> withReqId = + ImmutableMap.of( + XGoogSpannerRequestId.REQUEST_HEADER_KEY.name(), + Collections.singletonList(reqId.toString())); + return context.withExtraHeaders(withReqId); + } + void registerResponseObserver(SpannerResponseObserver responseObserver) { responseObservers.add(responseObserver); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index 73671b91ff7..4b5682bb2b0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -78,7 +78,8 @@ public interface SpannerRpc extends ServiceRpc { /** Options passed in {@link SpannerRpc} methods to control how an RPC is issued. */ enum Option { - CHANNEL_HINT("Channel Hint"); + CHANNEL_HINT("Channel Hint"), + REQUEST_ID("Request Id"); private final String value; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 6ceb3e979bf..ead2c0ba611 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -105,6 +105,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Server; +import io.grpc.ServerInterceptors; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessServerBuilder; @@ -119,6 +120,7 @@ import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.Set; @@ -152,6 +154,7 @@ public class DatabaseClientImplTest { private static final String DATABASE_NAME = String.format( "projects/%s/instances/%s/databases/%s", TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE); + private static XGoogSpannerRequestIdTest.ServerHeaderEnforcer xGoogReqIdInterceptor; private static MockSpannerServiceImpl mockSpanner; private static Server server; private static LocalChannelProvider channelProvider; @@ -220,13 +223,31 @@ public static void startStaticServer() throws IOException { StatementResult.query(SELECT1_FROM_TABLE, MockSpannerTestUtil.SELECT1_RESULTSET)); mockSpanner.setBatchWriteResult(BATCH_WRITE_RESPONSES); + Set checkMethods = + new HashSet( + Arrays.asList( + "google.spanner.v1.Spanner/BatchCreateSessions" + // As functionality is added, uncomment each method. + // "google.spanner.v1.Spanner/BatchWrite", + // "google.spanner.v1.Spanner/BeginTransaction", + // "google.spanner.v1.Spanner/CreateSession", + // "google.spanner.v1.Spanner/DeleteSession", + // "google.spanner.v1.Spanner/ExecuteBatchDml", + // "google.spanner.v1.Spanner/ExecuteSql", + // "google.spanner.v1.Spanner/ExecuteStreamingSql", + // "google.spanner.v1.Spanner/StreamingRead", + // "google.spanner.v1.Spanner/PartitionQuery", + // "google.spanner.v1.Spanner/PartitionRead", + // "google.spanner.v1.Spanner/Commit", + )); + xGoogReqIdInterceptor = new XGoogSpannerRequestIdTest.ServerHeaderEnforcer(checkMethods); executor = Executors.newSingleThreadExecutor(); String uniqueName = InProcessServerBuilder.generateName(); server = InProcessServerBuilder.forName(uniqueName) // We need to use a real executor for timeouts to occur. .scheduledExecutorService(new ScheduledThreadPoolExecutor(1)) - .addService(mockSpanner) + .addService(ServerInterceptors.intercept(mockSpanner, xGoogReqIdInterceptor)) .build() .start(); channelProvider = LocalChannelProvider.create(uniqueName); @@ -264,6 +285,7 @@ public void tearDown() { spanner.close(); spannerWithEmptySessionPool.close(); mockSpanner.reset(); + xGoogReqIdInterceptor.reset(); mockSpanner.removeAllExecutionTimes(); } @@ -1391,6 +1413,7 @@ public void testWriteAtLeastOnceAborted() { List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); assertEquals(2, commitRequests.size()); + xGoogReqIdInterceptor.assertIntegrity(); } @Test @@ -5197,6 +5220,26 @@ public void testRetryOnResourceExhausted() { } } + @Test + public void testSelectHasXGoogRequestIdHeader() { + Statement statement = + Statement.newBuilder("select id from test where b=@p1") + .bind("p1") + .toBytesArray( + Arrays.asList(ByteArray.copyFrom("test1"), null, ByteArray.copyFrom("test2"))) + .build(); + mockSpanner.putStatementResult(StatementResult.query(statement, SELECT1_RESULTSET)); + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (ResultSet resultSet = client.singleUse().executeQuery(statement)) { + assertTrue(resultSet.next()); + assertEquals(1L, resultSet.getLong(0)); + assertFalse(resultSet.next()); + } finally { + xGoogReqIdInterceptor.assertIntegrity(); + } + } + @Test public void testSessionPoolExhaustedError_containsStackTraces() { assumeFalse( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java index e2bcc92fedc..f8b5304a706 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java @@ -83,6 +83,7 @@ public void zeroPrefetchChunksNotAllowed() { @Test public void allOptionsPresent() { + XGoogSpannerRequestId reqId1 = XGoogSpannerRequestId.of(2, 3, 4, 5); Options options = Options.fromReadOptions( Options.limit(10), @@ -90,6 +91,7 @@ public void allOptionsPresent() { Options.dataBoostEnabled(true), Options.directedRead(DIRECTED_READ_OPTIONS), Options.orderBy(RpcOrderBy.NO_ORDER), + Options.requestId(reqId1), Options.lockHint(Options.RpcLockHint.SHARED)); assertThat(options.hasLimit()).isTrue(); assertThat(options.limit()).isEqualTo(10); @@ -101,6 +103,7 @@ public void allOptionsPresent() { assertTrue(options.hasOrderBy()); assertTrue(options.hasLockHint()); assertEquals(DIRECTED_READ_OPTIONS, options.directedReadOptions()); + assertEquals(options.reqId(), reqId1); } @Test @@ -873,4 +876,39 @@ public void testOptions_WithMultipleDifferentIsolationLevels() { Options options = Options.fromTransactionOptions(transactionOptions); assertEquals(options.isolationLevel(), IsolationLevel.SERIALIZABLE); } + + @Test + public void testRequestId() { + XGoogSpannerRequestId reqId1 = XGoogSpannerRequestId.of(1, 2, 3, 4); + XGoogSpannerRequestId reqId2 = XGoogSpannerRequestId.of(2, 3, 4, 5); + Options option1 = Options.fromUpdateOptions(Options.requestId(reqId1)); + Options option1Prime = Options.fromUpdateOptions(Options.requestId(reqId1)); + Options option2 = Options.fromUpdateOptions(Options.requestId(reqId2)); + Options option3 = Options.fromUpdateOptions(); + + assertEquals(option1, option1Prime); + assertNotEquals(option1, option2); + assertEquals(option1.hashCode(), option1Prime.hashCode()); + assertNotEquals(option1, option2); + assertNotEquals(option1, option3); + assertNotEquals(option1.hashCode(), option3.hashCode()); + + assertTrue(option1.hasReqId()); + assertThat(option1.toString()).contains("requestId: " + reqId1.toString()); + + assertFalse(option3.hasReqId()); + assertThat(option3.toString()).doesNotContain("requestId"); + } + + @Test + public void testOptions_WithMultipleDifferentRequestIds() { + XGoogSpannerRequestId reqId1 = XGoogSpannerRequestId.of(1, 1, 1, 1); + XGoogSpannerRequestId reqId2 = XGoogSpannerRequestId.of(1, 1, 1, 2); + TransactionOption[] transactionOptions = { + Options.requestId(reqId1), Options.requestId(reqId2), + }; + Options options = Options.fromTransactionOptions(transactionOptions); + assertNotEquals(options.reqId(), reqId1); + assertEquals(options.reqId(), reqId2); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java index bcba430c521..f04d9678d11 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java @@ -153,8 +153,17 @@ public void createAndCloseSession() { assertThat(session.getName()).isEqualTo(sessionName); session.close(); + + final ArgumentCaptor> deleteOptionsCaptor = + ArgumentCaptor.forClass(Map.class); + final ArgumentCaptor sessionNameCaptor = ArgumentCaptor.forClass(String.class); + Mockito.verify(rpc).deleteSession(sessionNameCaptor.capture(), deleteOptionsCaptor.capture()); + assertEquals(sessionName, sessionNameCaptor.getValue()); // The same channelHint is passed for deleteSession (contained in "options"). - Mockito.verify(rpc).deleteSession(sessionName, options.getValue()); + assertEquals( + deleteOptionsCaptor.getValue().get(SpannerRpc.Option.CHANNEL_HINT), + options.getValue().get(SpannerRpc.Option.CHANNEL_HINT)); + assertTrue(deleteOptionsCaptor.getValue().containsKey(SpannerRpc.Option.REQUEST_ID)); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index eed75bb67e1..e9096048d05 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -144,6 +144,7 @@ public void setUp() { when(rpc.getCommitRetrySettings()) .thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings()); session = spanner.getSessionClient(db).createSession(); + ((SessionImpl) session).setRequestIdCreator(new XGoogSpannerRequestId.NoopRequestIdCreator()); Span oTspan = mock(Span.class); ISpan span = new OpenTelemetrySpan(oTspan); when(oTspan.makeCurrent()).thenReturn(mock(Scope.class)); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java index 12c9213c7dc..847a4adf7ba 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java @@ -18,18 +18,29 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class XGoogSpannerRequestIdTest { - private static final Pattern REGEX_RAND_PROCESS_ID = - Pattern.compile("1.([0-9a-z]{16})(\\.\\d+){3}\\.(\\d+)$"); @Test public void testEquals() { @@ -48,7 +59,135 @@ public void testEquals() { @Test public void testEnsureHexadecimalFormatForRandProcessID() { String str = XGoogSpannerRequestId.of(1, 2, 3, 4).toString(); - Matcher m = XGoogSpannerRequestIdTest.REGEX_RAND_PROCESS_ID.matcher(str); + Matcher m = XGoogSpannerRequestId.REGEX.matcher(str); assertTrue(m.matches()); } + + public static class ServerHeaderEnforcer implements ServerInterceptor { + private Map> unaryResults; + private Map> streamingResults; + private List gotValues; + private Set checkMethods; + + ServerHeaderEnforcer(Set checkMethods) { + this.gotValues = new CopyOnWriteArrayList(); + this.unaryResults = + new ConcurrentHashMap>(); + this.streamingResults = + new ConcurrentHashMap>(); + this.checkMethods = checkMethods; + } + + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + final Metadata requestHeaders, + ServerCallHandler next) { + boolean isUnary = call.getMethodDescriptor().getType() == MethodType.UNARY; + String methodName = call.getMethodDescriptor().getFullMethodName(); + String gotReqIdStr = requestHeaders.get(XGoogSpannerRequestId.REQUEST_HEADER_KEY); + if (!this.checkMethods.contains(methodName)) { + return next.startCall(call, requestHeaders); + } + + Map> saver = this.streamingResults; + if (isUnary) { + saver = this.unaryResults; + } + + if (Objects.equals(gotReqIdStr, null) || Objects.equals(gotReqIdStr, "")) { + Status status = + Status.fromCode(Status.Code.INVALID_ARGUMENT) + .augmentDescription( + methodName + " lacks " + XGoogSpannerRequestId.REQUEST_HEADER_KEY); + call.close(status, requestHeaders); + return next.startCall(call, requestHeaders); + } + + assertNotNull(gotReqIdStr); + // Firstly assert and validate that at least we've got a requestId. + Matcher m = XGoogSpannerRequestId.REGEX.matcher(gotReqIdStr); + assertTrue(m.matches()); + + XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(gotReqIdStr); + if (!saver.containsKey(methodName)) { + saver.put(methodName, new CopyOnWriteArrayList()); + } + + saver.get(methodName).add(reqId); + + // Finally proceed with the call. + return next.startCall(call, requestHeaders); + } + + public String[] accumulatedValues() { + return this.gotValues.toArray(new String[0]); + } + + public void assertIntegrity() { + this.unaryResults.forEach( + (String method, CopyOnWriteArrayList values) -> { + XGoogSpannerRequestId.assertMonotonicityOfIds(method, values); + }); + this.streamingResults.forEach( + (String method, CopyOnWriteArrayList values) -> { + XGoogSpannerRequestId.assertMonotonicityOfIds(method, values); + }); + } + + public static class methodAndRequestId { + String method; + String requestId; + + public methodAndRequestId(String method, String requestId) { + this.method = method; + this.requestId = requestId; + } + + public String toString() { + return "{" + this.method + ":" + this.requestId + "}"; + } + } + + public methodAndRequestId[] accumulatedUnaryValues() { + List accumulated = new ArrayList(); + this.unaryResults.forEach( + (String method, CopyOnWriteArrayList values) -> { + for (int i = 0; i < values.size(); i++) { + accumulated.add(new methodAndRequestId(method, values.get(i).toString())); + } + }); + return accumulated.toArray(new methodAndRequestId[0]); + } + + public methodAndRequestId[] accumulatedStreamingValues() { + List accumulated = new ArrayList(); + this.streamingResults.forEach( + (String method, CopyOnWriteArrayList values) -> { + for (int i = 0; i < values.size(); i++) { + accumulated.add(new methodAndRequestId(method, values.get(i).toString())); + } + }); + return accumulated.toArray(new methodAndRequestId[0]); + } + + public void printAccumulatedValues() { + methodAndRequestId[] unary = this.accumulatedUnaryValues(); + System.out.println("accumulatedUnaryvalues"); + for (int i = 0; i < unary.length; i++) { + System.out.println("\t" + unary[i].toString()); + } + methodAndRequestId[] streaming = this.accumulatedStreamingValues(); + System.out.println("accumulatedStreaminvalues"); + for (int i = 0; i < streaming.length; i++) { + System.out.println("\t" + streaming[i].toString()); + } + } + + public void reset() { + this.gotValues.clear(); + this.unaryResults.clear(); + this.streamingResults.clear(); + } + } } From 882964c8b37402b75ea6bf34e26e3ddd12f8182e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Mon, 5 May 2025 18:21:22 +0200 Subject: [PATCH 02/22] test: fix failing test cases --- .../google/cloud/spanner/SessionClient.java | 28 +++++++++++++------ .../cloud/spanner/XGoogSpannerRequestId.java | 11 ++++---- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 11 +++++--- .../google/cloud/spanner/SessionImplTest.java | 18 ++++++------ .../google/cloud/spanner/SessionPoolTest.java | 13 +++++---- 5 files changed, 47 insertions(+), 34 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 405c5f86812..1dba4b92ffe 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -22,6 +22,7 @@ import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -109,6 +110,13 @@ Object value() { return ImmutableMap.copyOf(tmp); } + static Map createRequestOptions( + long channelId, XGoogSpannerRequestId requestId) { + return ImmutableMap.of( + Option.CHANNEL_HINT, channelId, + Option.REQUEST_ID, requestId); + } + private final class BatchCreateSessionsRunnable implements Runnable { private final long channelHint; private final int sessionCount; @@ -219,15 +227,14 @@ public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) { SessionImpl createSession() { // The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE, // which is also a valid channel hint. - final Map options; final long channelId; synchronized (this) { - options = optionMap(SessionOption.channelHint(sessionChannelCounter++)); channelId = sessionChannelCounter; + sessionChannelCounter++; } + XGoogSpannerRequestId reqId = nextRequestId(channelId, 1); ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes); try (IScope s = spanner.getTracer().withSpan(span)) { - XGoogSpannerRequestId reqId = this.nextRequestId(channelId, 1); com.google.spanner.v1.Session session = spanner .getRpc() @@ -235,10 +242,13 @@ SessionImpl createSession() { db.getName(), spanner.getOptions().getDatabaseRole(), spanner.getOptions().getSessionLabels(), - reqId.withOptions(options)); + createRequestOptions(channelId, reqId)); SessionReference sessionReference = new SessionReference( - session.getName(), session.getCreateTime(), session.getMultiplexed(), options); + session.getName(), + session.getCreateTime(), + session.getMultiplexed(), + optionMap(SessionOption.channelHint(channelId))); SessionImpl sessionImpl = new SessionImpl(spanner, sessionReference); sessionImpl.setRequestIdCreator(this); return sessionImpl; @@ -399,7 +409,6 @@ void asyncBatchCreateSessions( */ private List internalBatchCreateSessions( final int sessionCount, final long channelHint) throws SpannerException { - final Map options = optionMap(SessionOption.channelHint(channelHint)); ISpan parent = spanner.getTracer().getCurrentSpan(); ISpan span = spanner @@ -417,7 +426,7 @@ private List internalBatchCreateSessions( sessionCount, spanner.getOptions().getDatabaseRole(), spanner.getOptions().getSessionLabels(), - reqId.withOptions(options)); + createRequestOptions(channelHint, reqId)); span.addAnnotation( String.format( "Request for %d sessions returned %d sessions", sessionCount, sessions.size())); @@ -428,7 +437,10 @@ private List internalBatchCreateSessions( new SessionImpl( spanner, new SessionReference( - session.getName(), session.getCreateTime(), session.getMultiplexed(), options)); + session.getName(), + session.getCreateTime(), + session.getMultiplexed(), + optionMap(SessionOption.channelHint(channelHint)))); sessionImpl.setRequestIdCreator(this); res.add(sessionImpl); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java index 325aace2d2c..49a0c7eb3d2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java @@ -124,8 +124,7 @@ public void incrementAttempt() { this.attempt++; } - @SuppressWarnings("unchecked") - public Map withOptions(Map options) { + Map withOptions(Map options) { Map copyOptions = new HashMap<>(); if (options != null) { copyOptions.putAll(options); @@ -139,11 +138,11 @@ public int hashCode() { return Objects.hash(this.nthClientId, this.nthChannelId, this.nthRequest, this.attempt); } - public interface RequestIdCreator { + interface RequestIdCreator { XGoogSpannerRequestId nextRequestId(long channelId, int attempt); } - public static class NoopRequestIdCreator implements RequestIdCreator { + static class NoopRequestIdCreator implements RequestIdCreator { NoopRequestIdCreator() {} @Override @@ -152,7 +151,7 @@ public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) { } } - public static void assertMonotonicityOfIds(String prefix, List reqIds) { + static void assertMonotonicityOfIds(String prefix, List reqIds) { int size = reqIds.size(); List violations = new ArrayList<>(); @@ -164,7 +163,7 @@ public static void assertMonotonicityOfIds(String prefix, List labels, @Nullable Map options) throws SpannerException { - // By default sessions are not multiplexed + // By default, sessions are not multiplexed return createSession(databaseName, databaseRole, labels, options, false); } @@ -2052,8 +2052,10 @@ GrpcCallContext newCallContext( context = context.withChannelAffinity(affinity.intValue()); } } - String methodName = method.getFullMethodName(); - context = withRequestId(context, options, methodName); + if (method != null) { + String methodName = method.getFullMethodName(); + context = withRequestId(context, options, methodName); + } } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); if (routeToLeader && leaderAwareRoutingEnabled) { @@ -2074,7 +2076,8 @@ GrpcCallContext newCallContext( return (GrpcCallContext) context.merge(apiCallContextFromContext); } - GrpcCallContext withRequestId(GrpcCallContext context, Map options, String methodName) { + GrpcCallContext withRequestId( + GrpcCallContext context, Map options, String methodName) { XGoogSpannerRequestId reqId = (XGoogSpannerRequestId) options.get(Option.REQUEST_ID); if (reqId == null) { return context; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index e9096048d05..20d0effd76d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -223,7 +224,7 @@ public void writeAtLeastOnce() throws ParseException { ArgumentCaptor commit = ArgumentCaptor.forClass(CommitRequest.class); CommitResponse response = CommitResponse.newBuilder().setCommitTimestamp(Timestamps.parse(timestampString)).build(); - Mockito.when(rpc.commit(commit.capture(), Mockito.eq(options))).thenReturn(response); + Mockito.when(rpc.commit(commit.capture(), anyMap())).thenReturn(response); Timestamp timestamp = session.writeAtLeastOnce( @@ -255,7 +256,7 @@ public void writeAtLeastOnceWithOptions() throws ParseException { ArgumentCaptor commit = ArgumentCaptor.forClass(CommitRequest.class); CommitResponse response = CommitResponse.newBuilder().setCommitTimestamp(Timestamps.parse(timestampString)).build(); - Mockito.when(rpc.commit(commit.capture(), Mockito.eq(options))).thenReturn(response); + Mockito.when(rpc.commit(commit.capture(), anyMap())).thenReturn(response); session.writeAtLeastOnceWithOptions( Collections.singletonList(Mutation.newInsertBuilder("T").set("C").to("x").build()), Options.tag(tag)); @@ -340,7 +341,7 @@ public void newMultiUseReadOnlyTransactionContextClosesOldSingleUseContext() { public void writeClosesOldSingleUseContext() throws ParseException { ReadContext ctx = session.singleUse(TimestampBound.strong()); - Mockito.when(rpc.commit(Mockito.any(), Mockito.eq(options))) + Mockito.when(rpc.commit(Mockito.any(), anyMap())) .thenReturn( CommitResponse.newBuilder() .setCommitTimestamp(Timestamps.parse("2015-10-01T10:54:20.021Z")) @@ -441,7 +442,7 @@ public void request(int numMessages) {} private void mockRead(final PartialResultSet myResultSet) { final ArgumentCaptor consumer = ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class); - Mockito.when(rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options), eq(false))) + Mockito.when(rpc.read(Mockito.any(), consumer.capture(), anyMap(), eq(false))) .then( invocation -> { consumer.getValue().onPartialResultSet(myResultSet); @@ -457,8 +458,7 @@ public void multiUseReadOnlyTransactionReturnsEmptyTransactionMetadata() { PartialResultSet.newBuilder() .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) .build(); - Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) - .thenReturn(txnMetadata); + Mockito.when(rpc.beginTransaction(Mockito.any(), anyMap(), eq(false))).thenReturn(txnMetadata); mockRead(resultSet); ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong()); @@ -476,8 +476,7 @@ public void multiUseReadOnlyTransactionReturnsMissingTimestamp() { PartialResultSet.newBuilder() .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) .build(); - Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) - .thenReturn(txnMetadata); + Mockito.when(rpc.beginTransaction(Mockito.any(), anyMap(), eq(false))).thenReturn(txnMetadata); mockRead(resultSet); ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong()); @@ -496,8 +495,7 @@ public void multiUseReadOnlyTransactionReturnsMissingTransactionId() throws Pars PartialResultSet.newBuilder() .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) .build(); - Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) - .thenReturn(txnMetadata); + Mockito.when(rpc.beginTransaction(Mockito.any(), anyMap(), eq(false))).thenReturn(txnMetadata); mockRead(resultSet); ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index b027ebbc07f..ef7fe66a3be 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -1644,13 +1644,13 @@ public void testSessionNotFoundWrite() { SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName); List mutations = Collections.singletonList(Mutation.newInsertBuilder("FOO").build()); final SessionImpl closedSession = mockSession(); - when(closedSession.writeWithOptions(mutations)).thenThrow(sessionNotFound); + when(closedSession.writeWithOptions(eq(mutations), any())).thenThrow(sessionNotFound); final SessionImpl openSession = mockSession(); com.google.cloud.spanner.CommitResponse response = mock(com.google.cloud.spanner.CommitResponse.class); when(response.getCommitTimestamp()).thenReturn(Timestamp.now()); - when(openSession.writeWithOptions(mutations)).thenReturn(response); + when(openSession.writeWithOptions(eq(mutations), any())).thenReturn(response); doAnswer( invocation -> { executor.submit( @@ -1687,13 +1687,14 @@ public void testSessionNotFoundWriteAtLeastOnce() { SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName); List mutations = Collections.singletonList(Mutation.newInsertBuilder("FOO").build()); final SessionImpl closedSession = mockSession(); - when(closedSession.writeAtLeastOnceWithOptions(mutations)).thenThrow(sessionNotFound); + when(closedSession.writeAtLeastOnceWithOptions(eq(mutations), any())) + .thenThrow(sessionNotFound); final SessionImpl openSession = mockSession(); com.google.cloud.spanner.CommitResponse response = mock(com.google.cloud.spanner.CommitResponse.class); when(response.getCommitTimestamp()).thenReturn(Timestamp.now()); - when(openSession.writeAtLeastOnceWithOptions(mutations)).thenReturn(response); + when(openSession.writeAtLeastOnceWithOptions(eq(mutations), any())).thenReturn(response); doAnswer( invocation -> { executor.submit( @@ -1729,10 +1730,10 @@ public void testSessionNotFoundPartitionedUpdate() { SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName); Statement statement = Statement.of("UPDATE FOO SET BAR=1 WHERE 1=1"); final SessionImpl closedSession = mockSession(); - when(closedSession.executePartitionedUpdate(statement)).thenThrow(sessionNotFound); + when(closedSession.executePartitionedUpdate(eq(statement), any())).thenThrow(sessionNotFound); final SessionImpl openSession = mockSession(); - when(openSession.executePartitionedUpdate(statement)).thenReturn(1L); + when(openSession.executePartitionedUpdate(eq(statement), any())).thenReturn(1L); doAnswer( invocation -> { executor.submit( From 339f500c4c23ea0661cd121f4c09bd9c89144f53 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 12 May 2025 21:56:16 -0400 Subject: [PATCH 03/22] Add setRequestIdCreator inside session tests --- .../java/com/google/cloud/spanner/DatabaseClientImpl.java | 8 ++++++++ .../java/com/google/cloud/spanner/SessionImplTest.java | 4 ++++ .../java/com/google/cloud/spanner/SessionPoolTest.java | 4 ++++ 3 files changed, 16 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 1ed48499009..b889b6c9d78 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -212,8 +212,16 @@ public CommitResponse writeWithOptions( return session.writeWithOptions(mutations, withReqId(reqId, options)); }); } catch (RuntimeException e) { + System.out.println("runtime.crashing out\033[31m"); + e.printStackTrace(); + System.out.println("\033[00m"); span.setStatus(e); throw e; + } catch (Exception e) { + System.out.println("crashing out\033[31m"); + e.printStackTrace(); + System.out.println("\033[00m"); + throw e; } finally { span.end(); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 20d0effd76d..df957f75ebf 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -221,6 +221,10 @@ public void nestedTxnSucceedsWhenAllowed() { @Test public void writeAtLeastOnce() throws ParseException { String timestampString = "2015-10-01T10:54:20.021Z"; + com.google.protobuf.Timestamp t = Timestamps.parse(timestampString); + Transaction txnMetadata = Transaction.newBuilder().setReadTimestamp(t).build(); + Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false))) + .thenReturn(txnMetadata); ArgumentCaptor commit = ArgumentCaptor.forClass(CommitRequest.class); CommitResponse response = CommitResponse.newBuilder().setCommitTimestamp(Timestamps.parse(timestampString)).build(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index ef7fe66a3be..df2c89fd769 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -1644,12 +1644,14 @@ public void testSessionNotFoundWrite() { SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName); List mutations = Collections.singletonList(Mutation.newInsertBuilder("FOO").build()); final SessionImpl closedSession = mockSession(); + closedSession.setRequestIdCreator(new XGoogSpannerRequestId.NoopRequestIdCreator()); when(closedSession.writeWithOptions(eq(mutations), any())).thenThrow(sessionNotFound); final SessionImpl openSession = mockSession(); com.google.cloud.spanner.CommitResponse response = mock(com.google.cloud.spanner.CommitResponse.class); when(response.getCommitTimestamp()).thenReturn(Timestamp.now()); + openSession.setRequestIdCreator(new XGoogSpannerRequestId.NoopRequestIdCreator()); when(openSession.writeWithOptions(eq(mutations), any())).thenReturn(response); doAnswer( invocation -> { @@ -1687,6 +1689,7 @@ public void testSessionNotFoundWriteAtLeastOnce() { SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName); List mutations = Collections.singletonList(Mutation.newInsertBuilder("FOO").build()); final SessionImpl closedSession = mockSession(); + closedSession.setRequestIdCreator(new XGoogSpannerRequestId.NoopRequestIdCreator()); when(closedSession.writeAtLeastOnceWithOptions(eq(mutations), any())) .thenThrow(sessionNotFound); @@ -1694,6 +1697,7 @@ public void testSessionNotFoundWriteAtLeastOnce() { com.google.cloud.spanner.CommitResponse response = mock(com.google.cloud.spanner.CommitResponse.class); when(response.getCommitTimestamp()).thenReturn(Timestamp.now()); + openSession.setRequestIdCreator(new XGoogSpannerRequestId.NoopRequestIdCreator()); when(openSession.writeAtLeastOnceWithOptions(eq(mutations), any())).thenReturn(response); doAnswer( invocation -> { From 06574d23831487e3772af369daafceb5f56882e8 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 13 May 2025 16:29:16 -0700 Subject: [PATCH 04/22] Plumb in test for DatabaseClientImpl.runWithSessionRetry behavior --- .../cloud/spanner/DatabaseClientImpl.java | 11 +++- .../cloud/spanner/XGoogSpannerRequestId.java | 10 ++++ .../cloud/spanner/DatabaseClientImplTest.java | 60 +++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index b889b6c9d78..c6c5e9ace35 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -102,7 +102,8 @@ class DatabaseClientImpl implements DatabaseClient { this.nthRequest = new AtomicInteger(0); } - private int dbIdFromClientId(String clientId) { + @VisibleForTesting + int dbIdFromClientId(String clientId) { int i = clientId.indexOf("-"); String strWithValue = clientId.substring(i + 1); if (Objects.equals(strWithValue, "")) { @@ -257,6 +258,11 @@ private int nextNthRequest() { return this.nthRequest.incrementAndGet(); } + @VisibleForTesting + int getNthRequest() { + return this.nthRequest.get(); + } + @Override public ServerStream batchWriteAtLeastOnce( final Iterable mutationGroups, final TransactionOption... options) @@ -455,7 +461,8 @@ private long executePartitionedUpdateWithPooledSession( } } - private T runWithSessionRetry(BiFunction callable) { + @VisibleForTesting + T runWithSessionRetry(BiFunction callable) { PooledSessionFuture session = getSession(); XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java index 49a0c7eb3d2..db2e6bc3c24 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java @@ -60,6 +60,16 @@ public static XGoogSpannerRequestId of( return new XGoogSpannerRequestId(nthClientId, nthChannelId, nthRequest, attempt); } + @VisibleForTesting + long getAttempt() { + return this.attempt; + } + + @VisibleForTesting + long getNthRequest() { + return this.nthRequest; + } + @VisibleForTesting static final Pattern REGEX = Pattern.compile("^(\\d)\\.([0-9a-z]{16})\\.(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)$"); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index ead2c0ba611..de05251c821 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -130,6 +130,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; @@ -5633,4 +5634,63 @@ private boolean isMultiplexedSessionsEnabledForRW() { } return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW(); } + + @Test + public void testdbIdFromClientId() { + SessionPool pool = mock(SessionPool.class); + PooledSessionFuture session = mock(PooledSessionFuture.class); + when(pool.getSession()).thenReturn(session); + TransactionOption option = mock(TransactionOption.class); + DatabaseClientImpl client = new DatabaseClientImpl(pool, mock(TraceWrapper.class)); + + assertEquals(client.dbIdFromClientId(""), 0); + assertEquals(client.dbIdFromClientId("client-10"), 10); + assertEquals(client.dbIdFromClientId("client--10"), -10); + assertThrows(NumberFormatException.class, () -> client.dbIdFromClientId("client10")); + } + + @Test + public void testrunWithSessionRetry_withRequestId() { + // Tests that DatabaseClientImpl.runWithSessionRetry correctly returns a XGoogSpannerRequestId + // and correctly increases its nthRequest ordinal number and that attempts stay at 1. + SessionPool pool = mock(SessionPool.class); + PooledSessionFuture sessionFut = mock(PooledSessionFuture.class); + when(pool.getSession()).thenReturn(sessionFut); + // TODO:(@olavloite) to kindly help with resolving this mocking that's failing. + // when(pool.getPooledSessionReplacementHandler()).thenReturn(pool.new + // PooledSessionReplacementHandler()); + TransactionOption option = mock(TransactionOption.class); + DatabaseClientImpl client = new DatabaseClientImpl(pool, mock(TraceWrapper.class)); + + // 1. Run with no fail has a single attempt. + client.runWithSessionRetry( + (session, reqId) -> { + assertEquals(reqId.getAttempt(), 1); + return 1; + }); + + // 2. Run with SessionNotFoundException. + final AtomicInteger i = new AtomicInteger(0); + SessionNotFoundException excSessionNotFound = + SpannerExceptionFactoryTest.newSessionNotFoundException( + "projects/p/instances/i/databases/d/sessions/s"); + + final AtomicLong priorNthRequest = new AtomicLong(client.getNthRequest()); + client.runWithSessionRetry( + (session, reqId) -> { + // Monotonically increasing priorNthRequest. + assertEquals(reqId.getNthRequest() - priorNthRequest.get(), 1); + priorNthRequest.set(reqId.getNthRequest()); + + // Attempts stay at 1 since with a SessionNotFound exception, + // a fresh requestId is generated. + assertEquals(reqId.getAttempt(), 1); + + if (i.addAndGet(1) < 4) { + throw excSessionNotFound; + } + + return 1; + }); + } } From 20f305d7c6e008ad3be815dc3f0755af3f1f1c20 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 13 May 2025 18:44:45 -0700 Subject: [PATCH 05/22] Remove unnecessary debugs --- .../google/cloud/spanner/DatabaseClientImpl.java | 8 -------- .../cloud/spanner/XGoogSpannerRequestIdTest.java | 13 ------------- 2 files changed, 21 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index c6c5e9ace35..7b9fd9d75b7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -213,16 +213,8 @@ public CommitResponse writeWithOptions( return session.writeWithOptions(mutations, withReqId(reqId, options)); }); } catch (RuntimeException e) { - System.out.println("runtime.crashing out\033[31m"); - e.printStackTrace(); - System.out.println("\033[00m"); span.setStatus(e); throw e; - } catch (Exception e) { - System.out.println("crashing out\033[31m"); - e.printStackTrace(); - System.out.println("\033[00m"); - throw e; } finally { span.end(); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java index 847a4adf7ba..4cb33be6705 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java @@ -171,19 +171,6 @@ public methodAndRequestId[] accumulatedStreamingValues() { return accumulated.toArray(new methodAndRequestId[0]); } - public void printAccumulatedValues() { - methodAndRequestId[] unary = this.accumulatedUnaryValues(); - System.out.println("accumulatedUnaryvalues"); - for (int i = 0; i < unary.length; i++) { - System.out.println("\t" + unary[i].toString()); - } - methodAndRequestId[] streaming = this.accumulatedStreamingValues(); - System.out.println("accumulatedStreaminvalues"); - for (int i = 0; i < streaming.length; i++) { - System.out.println("\t" + streaming[i].toString()); - } - } - public void reset() { this.gotValues.clear(); this.unaryResults.clear(); From c1aa6061e21aa40b8f6c5e240e2f980a3d6d97a0 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 May 2025 13:37:26 -0700 Subject: [PATCH 06/22] Address review feedback --- .../cloud/spanner/DatabaseClientImpl.java | 27 ++++++++++++------- .../com/google/cloud/spanner/SessionImpl.java | 3 ++- .../cloud/spanner/DatabaseClientImplTest.java | 9 ++++--- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 7b9fd9d75b7..c9526dd9131 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -29,7 +29,8 @@ import io.opentelemetry.api.common.Attributes; import java.util.ArrayList; import java.util.Arrays; -import java.util.Objects; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -51,6 +52,7 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting final boolean useMultiplexedSessionForRW; private final int dbId; private final AtomicInteger nthRequest; + private final Map clientIdToOrdinalMap; final boolean useMultiplexedSessionBlindWrite; @@ -98,18 +100,19 @@ class DatabaseClientImpl implements DatabaseClient { this.useMultiplexedSessionForRW = useMultiplexedSessionForRW; this.commonAttributes = commonAttributes; + this.clientIdToOrdinalMap = new HashMap(); this.dbId = this.dbIdFromClientId(this.clientId); this.nthRequest = new AtomicInteger(0); } @VisibleForTesting - int dbIdFromClientId(String clientId) { - int i = clientId.indexOf("-"); - String strWithValue = clientId.substring(i + 1); - if (Objects.equals(strWithValue, "")) { - strWithValue = "0"; + synchronized int dbIdFromClientId(String clientId) { + Integer id = this.clientIdToOrdinalMap.get(clientId); + if (id == null) { + id = this.clientIdToOrdinalMap.size() + 1; + this.clientIdToOrdinalMap.put(clientId, id); } - return Integer.parseInt(strWithValue); + return id; } @VisibleForTesting @@ -423,9 +426,13 @@ private UpdateOption[] withReqId( if (reqId == null) { return options; } - ArrayList allOptions = new ArrayList(Arrays.asList(options)); - allOptions.add(new Options.RequestIdOption(reqId)); - return allOptions.toArray(new UpdateOption[0]); + if (options == null || options.length == 0) { + return new UpdateOption[] {new Options.RequestIdOption(reqId)}; + } + UpdateOption[] allOptions = new UpdateOption[options.length + 1]; + System.arraycopy(options, 0, allOptions, 0, options.length); + allOptions[options.length] = new Options.RequestIdOption(reqId); + return allOptions; } private TransactionOption[] withReqId( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 0f95f362604..45f247072a4 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -305,7 +305,8 @@ public CommitResponse writeAtLeastOnceWithOptions( try (IScope s = tracer.withSpan(span)) { return SpannerRetryHelper.runTxWithRetriesOnAborted( () -> { - // TODO: Detect an abort and then refresh the reqId. + // TODO(@odeke-em): Only increment on UNAVAILABLE and INCREMENT, + // instead increment the nthRequest on ABORTED and others. reqId.incrementAttempt(); return new CommitResponse( spanner.getRpc().commit(request, reqId.withOptions(getOptions()))); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index de05251c821..d41273cc81d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -5643,10 +5643,11 @@ public void testdbIdFromClientId() { TransactionOption option = mock(TransactionOption.class); DatabaseClientImpl client = new DatabaseClientImpl(pool, mock(TraceWrapper.class)); - assertEquals(client.dbIdFromClientId(""), 0); - assertEquals(client.dbIdFromClientId("client-10"), 10); - assertEquals(client.dbIdFromClientId("client--10"), -10); - assertThrows(NumberFormatException.class, () -> client.dbIdFromClientId("client10")); + for (int i = 0; i < 10; i++) { + String dbId = String.format("%d", i); + int id = client.dbIdFromClientId(dbId); + assertEquals(id, i + 2); // There was already 1 dbId after new DatabaseClientImpl. + } } @Test From 34f5579c07c1cdff9d9f71cff850948ee63b7fc6 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 May 2025 14:06:10 -0700 Subject: [PATCH 07/22] More code review updates --- .../cloud/spanner/XGoogSpannerRequestId.java | 27 ++----------------- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 5 ++-- .../spanner/XGoogSpannerRequestIdTest.java | 26 ++++++++++++++++-- 3 files changed, 28 insertions(+), 30 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java index db2e6bc3c24..9c69838367f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java @@ -22,9 +22,7 @@ import io.grpc.Metadata; import java.math.BigInteger; import java.security.SecureRandom; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.regex.MatchResult; @@ -108,7 +106,8 @@ public String toString() { this.attempt); } - private boolean isGreaterThan(XGoogSpannerRequestId other) { + @VisibleForTesting + boolean isGreaterThan(XGoogSpannerRequestId other) { return this.nthClientId > other.nthClientId && this.nthChannelId > other.nthChannelId && this.nthRequest > other.nthRequest @@ -160,26 +159,4 @@ public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) { return XGoogSpannerRequestId.of(1, 1, 1, 0); } } - - static void assertMonotonicityOfIds(String prefix, List reqIds) { - int size = reqIds.size(); - - List violations = new ArrayList<>(); - for (int i = 1; i < size; i++) { - XGoogSpannerRequestId prev = reqIds.get(i - 1); - XGoogSpannerRequestId curr = reqIds.get(i); - if (prev.isGreaterThan(curr)) { - violations.add(String.format("#%d(%s) > #%d(%s)", i - 1, prev, i, curr)); - } - } - - if (violations.isEmpty()) { - return; - } - - throw new IllegalStateException( - prefix - + " monotonicity violation:" - + String.join("\n\t", violations.toArray(new String[0]))); - } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index a4b4b10b24f..90601007b98 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -2054,7 +2054,7 @@ GrpcCallContext newCallContext( } if (method != null) { String methodName = method.getFullMethodName(); - context = withRequestId(context, options, methodName); + context = withRequestId(context, options); } } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); @@ -2076,8 +2076,7 @@ GrpcCallContext newCallContext( return (GrpcCallContext) context.merge(apiCallContextFromContext); } - GrpcCallContext withRequestId( - GrpcCallContext context, Map options, String methodName) { + GrpcCallContext withRequestId(GrpcCallContext context, Map options) { XGoogSpannerRequestId reqId = (XGoogSpannerRequestId) options.get(Option.REQUEST_ID); if (reqId == null) { return context; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java index 4cb33be6705..fd1ddbbf249 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java @@ -127,14 +127,36 @@ public String[] accumulatedValues() { public void assertIntegrity() { this.unaryResults.forEach( (String method, CopyOnWriteArrayList values) -> { - XGoogSpannerRequestId.assertMonotonicityOfIds(method, values); + assertMonotonicityOfIds(method, values); }); this.streamingResults.forEach( (String method, CopyOnWriteArrayList values) -> { - XGoogSpannerRequestId.assertMonotonicityOfIds(method, values); + assertMonotonicityOfIds(method, values); }); } + private void assertMonotonicityOfIds(String prefix, List reqIds) { + int size = reqIds.size(); + + List violations = new ArrayList<>(); + for (int i = 1; i < size; i++) { + XGoogSpannerRequestId prev = reqIds.get(i - 1); + XGoogSpannerRequestId curr = reqIds.get(i); + if (prev.isGreaterThan(curr)) { + violations.add(String.format("#%d(%s) > #%d(%s)", i - 1, prev, i, curr)); + } + } + + if (violations.isEmpty()) { + return; + } + + throw new IllegalStateException( + prefix + + " monotonicity violation:" + + String.join("\n\t", violations.toArray(new String[0]))); + } + public static class methodAndRequestId { String method; String requestId; From 2b6633f7219bce2b5f1a8065da92e71c8e8d1189 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 May 2025 14:26:01 -0700 Subject: [PATCH 08/22] Ensure that requestIdCreator is non-null --- .../src/main/java/com/google/cloud/spanner/SessionImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 45f247072a4..dab179bdc02 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -149,7 +149,7 @@ interface SessionTransaction { this.errorHandler = createErrorHandler(spanner.getOptions()); this.requestIdCreator = requestIdCreator; if (this.requestIdCreator == null) { - this.requestIdCreator = new XGoogSpannerRequestId.NoopRequestIdCreator(); + throw new IllegalStateException("requestIdCreator must be non-null"); } } From cfe8076638cfc1cd8d7832e6488e408c1ec3b789 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 May 2025 14:45:36 -0700 Subject: [PATCH 09/22] More code review updates --- .../com/google/cloud/spanner/SessionClient.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 1dba4b92ffe..8b0bfb8faba 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -185,9 +185,9 @@ interface SessionConsumer { // SessionClient is created long before a DatabaseClientImpl is created, // as batch sessions are firstly created then later attached to each Client. - private static AtomicInteger NTH_ID = new AtomicInteger(0); - private final int nthId; - private final AtomicInteger nthRequest; + private static final AtomicInteger NTH_ID = new AtomicInteger(0); + private final int nthId = NTH_ID.incrementAndGet(); + private final AtomicInteger nthRequest = new AtomicInteger(0); @GuardedBy("this") private volatile long sessionChannelCounter; @@ -201,8 +201,6 @@ interface SessionConsumer { this.executorFactory = executorFactory; this.executor = executorFactory.get(); this.commonAttributes = spanner.getTracer().createCommonAttributes(db); - this.nthId = SessionClient.NTH_ID.incrementAndGet(); - this.nthRequest = new AtomicInteger(0); } @Override @@ -220,7 +218,8 @@ DatabaseId getDatabaseId() { @Override public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) { - return XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelId, 1); + return XGoogSpannerRequestId.of( + this.nthId, this.nthRequest.incrementAndGet(), channelId, attempt); } /** Create a single session. */ @@ -287,6 +286,9 @@ SessionImpl createMultiplexedSession() { spanner .getTracer() .spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION, this.commonAttributes); + // MultiplexedSession doesn't use a channelId hence this hard-coded value. + int channelId = 0; + XGoogSpannerRequestId reqId = nextRequestId(channelId, 1); try (IScope s = spanner.getTracer().withSpan(span)) { com.google.spanner.v1.Session session = spanner @@ -295,7 +297,7 @@ SessionImpl createMultiplexedSession() { db.getName(), spanner.getOptions().getDatabaseRole(), spanner.getOptions().getSessionLabels(), - null, + createRequestOptions(channelId, reqId), true); SessionImpl sessionImpl = new SessionImpl( From c045751ab15d07f089ee5c26c116febbcdcdec92 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 May 2025 15:12:51 -0700 Subject: [PATCH 10/22] More updates --- .../src/main/java/com/google/cloud/spanner/Options.java | 2 -- .../src/main/java/com/google/cloud/spanner/SessionImpl.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 0b9556084ef..9771e2173f1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -1093,8 +1093,6 @@ public int hashCode() { @Override public boolean equals(Object o) { - // TODO: Examine why the precedent for LastStatementUpdateOption - // does not check against the actual value. return o instanceof RequestIdOption; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index dab179bdc02..a9ee85c700a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -126,7 +126,7 @@ interface SessionTransaction { private final Clock clock; private final Map options; private final ErrorHandler errorHandler; - private XGoogSpannerRequestId.RequestIdCreator requestIdCreator; + private final XGoogSpannerRequestId.RequestIdCreator requestIdCreator; SessionImpl(SpannerImpl spanner, SessionReference sessionReference) { this(spanner, sessionReference, NO_CHANNEL_HINT); From f426d63a248e2aaf1f704083a72f1dbfdaf5288a Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 May 2025 15:15:26 -0700 Subject: [PATCH 11/22] DatabaseClientImpl.runWithSessionRetry update on ID --- .../java/com/google/cloud/spanner/DatabaseClientImpl.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index c9526dd9131..711b4ede0cc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -465,10 +465,9 @@ T runWithSessionRetry(BiFunction callable PooledSessionFuture session = getSession(); XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of( - this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0); + this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 1); while (true) { try { - reqId.incrementAttempt(); return callable.apply(session, reqId); } catch (SessionNotFoundException e) { session = @@ -476,7 +475,7 @@ T runWithSessionRetry(BiFunction callable pool.getPooledSessionReplacementHandler().replaceSession(e, session); reqId = XGoogSpannerRequestId.of( - this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0); + this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 1); } } } From ee21a0c0082bff576394a3e15664964318cfb235 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 May 2025 15:35:24 -0700 Subject: [PATCH 12/22] Move reqId attempt increase out of runWithRetries --- .../src/main/java/com/google/cloud/spanner/SessionImpl.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index a9ee85c700a..e7821bdda57 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -305,9 +305,6 @@ public CommitResponse writeAtLeastOnceWithOptions( try (IScope s = tracer.withSpan(span)) { return SpannerRetryHelper.runTxWithRetriesOnAborted( () -> { - // TODO(@odeke-em): Only increment on UNAVAILABLE and INCREMENT, - // instead increment the nthRequest on ABORTED and others. - reqId.incrementAttempt(); return new CommitResponse( spanner.getRpc().commit(request, reqId.withOptions(getOptions()))); }); @@ -322,7 +319,7 @@ public CommitResponse writeAtLeastOnceWithOptions( private XGoogSpannerRequestId reqIdOrFresh(Options options) { XGoogSpannerRequestId reqId = options.reqId(); if (reqId == null) { - reqId = this.getRequestIdCreator().nextRequestId(1 /* TODO: channelId */, 0); + reqId = this.getRequestIdCreator().nextRequestId(1 /* TODO: channelId */, 1); } return reqId; } From 914b073a0e510b595f0f81c74e91b833b0bd33d0 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 May 2025 17:58:48 -0700 Subject: [PATCH 13/22] Apply allOptions copy everywhere --- .../com/google/cloud/spanner/DatabaseClientImpl.java | 10 +++++++--- .../java/com/google/cloud/spanner/SessionImpl.java | 2 +- .../google/cloud/spanner/spi/v1/GapicSpannerRpc.java | 5 +---- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 711b4ede0cc..cb3304d7449 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -440,9 +440,13 @@ private TransactionOption[] withReqId( if (reqId == null) { return options; } - ArrayList allOptions = new ArrayList(Arrays.asList(options)); - allOptions.add(new Options.RequestIdOption(reqId)); - return allOptions.toArray(new TransactionOption[0]); + if (options == null || options.length == 0) { + return new TransactionOption[] {new Options.RequestIdOption(reqId)}; + } + TransactionOption[] allOptions = new TransactionOption[options.length + 1]; + System.arraycopy(options, 0, allOptions, 0, options.length); + allOptions[options.length] = new Options.RequestIdOption(reqId); + return allOptions; } private long executePartitionedUpdateWithPooledSession( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index e7821bdda57..c78cfaeea3f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -126,7 +126,7 @@ interface SessionTransaction { private final Clock clock; private final Map options; private final ErrorHandler errorHandler; - private final XGoogSpannerRequestId.RequestIdCreator requestIdCreator; + private XGoogSpannerRequestId.RequestIdCreator requestIdCreator; SessionImpl(SpannerImpl spanner, SessionReference sessionReference) { this(spanner, sessionReference, NO_CHANNEL_HINT); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 90601007b98..449c8c2e2da 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -2052,10 +2052,7 @@ GrpcCallContext newCallContext( context = context.withChannelAffinity(affinity.intValue()); } } - if (method != null) { - String methodName = method.getFullMethodName(); - context = withRequestId(context, options); - } + context = withRequestId(context, options); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); if (routeToLeader && leaderAwareRoutingEnabled) { From 7c7218fd82646a3705237fe5a2dca17eacf76aad Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sun, 18 May 2025 21:35:56 -0700 Subject: [PATCH 14/22] Update tests for createMultiplexedSession --- .../java/com/google/cloud/spanner/SessionClient.java | 7 ++++++- .../com/google/cloud/spanner/SessionClientTests.java | 9 +++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 8b0bfb8faba..efc420f38fd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -117,6 +117,11 @@ Object value() { Option.REQUEST_ID, requestId); } + static Map createRequestOptions( + XGoogSpannerRequestId requestId) { + return ImmutableMap.of(Option.REQUEST_ID, requestId); + } + private final class BatchCreateSessionsRunnable implements Runnable { private final long channelHint; private final int sessionCount; @@ -297,7 +302,7 @@ SessionImpl createMultiplexedSession() { db.getName(), spanner.getOptions().getDatabaseRole(), spanner.getOptions().getSessionLabels(), - createRequestOptions(channelId, reqId), + createRequestOptions(reqId), true); SessionImpl sessionImpl = new SessionImpl( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java index f04d9678d11..974313ce847 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java @@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; @@ -207,7 +208,9 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount client.createMultiplexedSession(consumer); } // for multiplexed session there is no channel hint pass in the RPC options - assertNull(options.getValue()); + assertNotNull(options.getValue()); + assertEquals(options.getValue().get(Option.CHANNEL_HINT), null); + assertNotNull(options.getValue().get(Option.REQUEST_ID)); assertEquals(1, returnedSessionCount.get()); } @@ -239,7 +242,9 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount client.createMultiplexedSession(consumer); } // for multiplexed session there is no channel hint pass in the RPC options - assertNull(options.getValue()); + assertNotNull(options.getValue()); + assertEquals(options.getValue().get(Option.CHANNEL_HINT), null); + assertNotNull(options.getValue().get(Option.REQUEST_ID)); } @SuppressWarnings("unchecked") From 96bb8f41bc5b046e3eaa962634a0adf60e6e1997 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 10 Apr 2025 22:36:10 +0300 Subject: [PATCH 15/22] chore(x-goog-spanner-request-id): plumb for BatchCreateSessions This change plumbs x-goog-spanner-request-id into BatchCreateSessions and asserts that the header is present for that method. Updates #3537 --- .../google/cloud/spanner/SessionClient.java | 3 +++ .../com/google/cloud/spanner/SessionImpl.java | 2 ++ .../cloud/spanner/XGoogSpannerRequestId.java | 22 +++++++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index efc420f38fd..34c92d99c7a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -206,6 +206,8 @@ interface SessionConsumer { this.executorFactory = executorFactory; this.executor = executorFactory.get(); this.commonAttributes = spanner.getTracer().createCommonAttributes(db); + this.nthId = SessionClient.NTH_ID.incrementAndGet(); + this.nthRequest = new AtomicInteger(0); } @Override @@ -239,6 +241,7 @@ SessionImpl createSession() { XGoogSpannerRequestId reqId = nextRequestId(channelId, 1); ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes); try (IScope s = spanner.getTracer().withSpan(span)) { + XGoogSpannerRequestId reqId = this.nextRequestId(channelId, 1); com.google.spanner.v1.Session session = spanner .getRpc() diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index c78cfaeea3f..e5aa0840640 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -305,6 +305,8 @@ public CommitResponse writeAtLeastOnceWithOptions( try (IScope s = tracer.withSpan(span)) { return SpannerRetryHelper.runTxWithRetriesOnAborted( () -> { + // TODO: Detect an abort and then refresh the reqId. + reqId.incrementAttempt(); return new CommitResponse( spanner.getRpc().commit(request, reqId.withOptions(getOptions()))); }); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java index 9c69838367f..358fd36397f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java @@ -159,4 +159,26 @@ public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) { return XGoogSpannerRequestId.of(1, 1, 1, 0); } } + + public static void assertMonotonicityOfIds(String prefix, List reqIds) { + int size = reqIds.size(); + + List violations = new ArrayList<>(); + for (int i = 1; i < size; i++) { + XGoogSpannerRequestId prev = reqIds.get(i - 1); + XGoogSpannerRequestId curr = reqIds.get(i); + if (prev.isGreaterThan(curr)) { + violations.add(String.format("#%d(%s) > #%d(%s)", i - 1, prev, i, curr)); + } + } + + if (violations.size() == 0) { + return; + } + + throw new IllegalStateException( + prefix + + " monotonicity violation:" + + String.join("\n\t", violations.toArray(new String[0]))); + } } From f33a0d979f865d60a2e9d34d8cb972496c034b5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Mon, 5 May 2025 18:21:22 +0200 Subject: [PATCH 16/22] test: fix failing test cases --- .../src/main/java/com/google/cloud/spanner/SessionClient.java | 1 - .../java/com/google/cloud/spanner/XGoogSpannerRequestId.java | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 34c92d99c7a..f02058d15d5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -241,7 +241,6 @@ SessionImpl createSession() { XGoogSpannerRequestId reqId = nextRequestId(channelId, 1); ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes); try (IScope s = spanner.getTracer().withSpan(span)) { - XGoogSpannerRequestId reqId = this.nextRequestId(channelId, 1); com.google.spanner.v1.Session session = spanner .getRpc() diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java index 358fd36397f..d8a135e8067 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java @@ -160,7 +160,7 @@ public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) { } } - public static void assertMonotonicityOfIds(String prefix, List reqIds) { + static void assertMonotonicityOfIds(String prefix, List reqIds) { int size = reqIds.size(); List violations = new ArrayList<>(); @@ -172,7 +172,7 @@ public static void assertMonotonicityOfIds(String prefix, List Date: Mon, 12 May 2025 21:56:16 -0400 Subject: [PATCH 17/22] Add setRequestIdCreator inside session tests --- .../java/com/google/cloud/spanner/DatabaseClientImpl.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index cb3304d7449..d853a7b8fa5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -216,8 +216,16 @@ public CommitResponse writeWithOptions( return session.writeWithOptions(mutations, withReqId(reqId, options)); }); } catch (RuntimeException e) { + System.out.println("runtime.crashing out\033[31m"); + e.printStackTrace(); + System.out.println("\033[00m"); span.setStatus(e); throw e; + } catch (Exception e) { + System.out.println("crashing out\033[31m"); + e.printStackTrace(); + System.out.println("\033[00m"); + throw e; } finally { span.end(); } From 3c10ee6d63a02ded5c1d8d0a116bdb3593a67ee2 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 13 May 2025 18:44:45 -0700 Subject: [PATCH 18/22] Remove unnecessary debugs --- .../java/com/google/cloud/spanner/DatabaseClientImpl.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index d853a7b8fa5..cb3304d7449 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -216,16 +216,8 @@ public CommitResponse writeWithOptions( return session.writeWithOptions(mutations, withReqId(reqId, options)); }); } catch (RuntimeException e) { - System.out.println("runtime.crashing out\033[31m"); - e.printStackTrace(); - System.out.println("\033[00m"); span.setStatus(e); throw e; - } catch (Exception e) { - System.out.println("crashing out\033[31m"); - e.printStackTrace(); - System.out.println("\033[00m"); - throw e; } finally { span.end(); } From f76cbc01fa30b5e5b79058f97cfae4dc5ff22312 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 May 2025 13:37:26 -0700 Subject: [PATCH 19/22] Address review feedback --- .../src/main/java/com/google/cloud/spanner/SessionImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index e5aa0840640..a6031325ad0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -305,7 +305,8 @@ public CommitResponse writeAtLeastOnceWithOptions( try (IScope s = tracer.withSpan(span)) { return SpannerRetryHelper.runTxWithRetriesOnAborted( () -> { - // TODO: Detect an abort and then refresh the reqId. + // TODO(@odeke-em): Only increment on UNAVAILABLE and INCREMENT, + // instead increment the nthRequest on ABORTED and others. reqId.incrementAttempt(); return new CommitResponse( spanner.getRpc().commit(request, reqId.withOptions(getOptions()))); From 8c4f2450c7d9bbb0e153d770d029492c9ec78d74 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 May 2025 14:06:10 -0700 Subject: [PATCH 20/22] More code review updates --- .../cloud/spanner/XGoogSpannerRequestId.java | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java index d8a135e8067..9c69838367f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java @@ -159,26 +159,4 @@ public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) { return XGoogSpannerRequestId.of(1, 1, 1, 0); } } - - static void assertMonotonicityOfIds(String prefix, List reqIds) { - int size = reqIds.size(); - - List violations = new ArrayList<>(); - for (int i = 1; i < size; i++) { - XGoogSpannerRequestId prev = reqIds.get(i - 1); - XGoogSpannerRequestId curr = reqIds.get(i); - if (prev.isGreaterThan(curr)) { - violations.add(String.format("#%d(%s) > #%d(%s)", i - 1, prev, i, curr)); - } - } - - if (violations.isEmpty()) { - return; - } - - throw new IllegalStateException( - prefix - + " monotonicity violation:" - + String.join("\n\t", violations.toArray(new String[0]))); - } } From 127ea9a2dbdb1b733449aee7110c8ff71feb7776 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 16 May 2025 14:45:36 -0700 Subject: [PATCH 21/22] More code review updates More updates DatabaseClientImpl.runWithSessionRetry update on ID Move reqId attempt increase out of runWithRetries Apply allOptions copy everywhere Complete testrunWithSessionRetry_withRequestId --- .../cloud/spanner/DatabaseClientImpl.java | 2 -- .../google/cloud/spanner/SessionClient.java | 5 +--- .../com/google/cloud/spanner/SessionImpl.java | 3 -- .../cloud/spanner/DatabaseClientImplTest.java | 29 +++++++++++++------ .../cloud/spanner/SessionClientTests.java | 1 - 5 files changed, 21 insertions(+), 19 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index cb3304d7449..fdc51ea3f37 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -27,8 +27,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.spanner.v1.BatchWriteResponse; import io.opentelemetry.api.common.Attributes; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index f02058d15d5..605f1639c57 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -117,8 +117,7 @@ Object value() { Option.REQUEST_ID, requestId); } - static Map createRequestOptions( - XGoogSpannerRequestId requestId) { + static Map createRequestOptions(XGoogSpannerRequestId requestId) { return ImmutableMap.of(Option.REQUEST_ID, requestId); } @@ -206,8 +205,6 @@ interface SessionConsumer { this.executorFactory = executorFactory; this.executor = executorFactory.get(); this.commonAttributes = spanner.getTracer().createCommonAttributes(db); - this.nthId = SessionClient.NTH_ID.incrementAndGet(); - this.nthRequest = new AtomicInteger(0); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index a6031325ad0..c78cfaeea3f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -305,9 +305,6 @@ public CommitResponse writeAtLeastOnceWithOptions( try (IScope s = tracer.withSpan(span)) { return SpannerRetryHelper.runTxWithRetriesOnAborted( () -> { - // TODO(@odeke-em): Only increment on UNAVAILABLE and INCREMENT, - // instead increment the nthRequest on ABORTED and others. - reqId.incrementAttempt(); return new CommitResponse( spanner.getRpc().commit(request, reqId.withOptions(getOptions()))); }); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index d41273cc81d..1db7335ef94 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -34,6 +34,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeFalse; import static org.junit.Assume.assumeTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -5653,25 +5654,33 @@ public void testdbIdFromClientId() { @Test public void testrunWithSessionRetry_withRequestId() { // Tests that DatabaseClientImpl.runWithSessionRetry correctly returns a XGoogSpannerRequestId - // and correctly increases its nthRequest ordinal number and that attempts stay at 1. + // and correctly increases its nthRequest ordinal number and that attempts stay at 1, given + // a fresh session returned on SessionNotFoundException. SessionPool pool = mock(SessionPool.class); PooledSessionFuture sessionFut = mock(PooledSessionFuture.class); when(pool.getSession()).thenReturn(sessionFut); - // TODO:(@olavloite) to kindly help with resolving this mocking that's failing. - // when(pool.getPooledSessionReplacementHandler()).thenReturn(pool.new - // PooledSessionReplacementHandler()); - TransactionOption option = mock(TransactionOption.class); + SessionPool.PooledSession pooledSession = mock(SessionPool.PooledSession.class); + when(sessionFut.get()).thenReturn(pooledSession); + SessionPool.PooledSessionReplacementHandler sessionReplacementHandler = + mock(SessionPool.PooledSessionReplacementHandler.class); + when(pool.getPooledSessionReplacementHandler()).thenReturn(sessionReplacementHandler); + when(sessionReplacementHandler.replaceSession(any(), any())).thenReturn(sessionFut); DatabaseClientImpl client = new DatabaseClientImpl(pool, mock(TraceWrapper.class)); - // 1. Run with no fail has a single attempt. + // 1. Run with no fail runs a single attempt. + final AtomicInteger nCalls = new AtomicInteger(0); client.runWithSessionRetry( (session, reqId) -> { assertEquals(reqId.getAttempt(), 1); + nCalls.incrementAndGet(); return 1; }); + assertEquals(nCalls.get(), 1); + + // Reset the call counter. + nCalls.set(0); - // 2. Run with SessionNotFoundException. - final AtomicInteger i = new AtomicInteger(0); + // 2. Run with SessionNotFoundException and ensure that a fresh requestId is returned each time. SessionNotFoundException excSessionNotFound = SpannerExceptionFactoryTest.newSessionNotFoundException( "projects/p/instances/i/databases/d/sessions/s"); @@ -5687,11 +5696,13 @@ public void testrunWithSessionRetry_withRequestId() { // a fresh requestId is generated. assertEquals(reqId.getAttempt(), 1); - if (i.addAndGet(1) < 4) { + if (nCalls.addAndGet(1) < 4) { throw excSessionNotFound; } return 1; }); + + assertEquals(nCalls.get(), 4); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java index 974313ce847..5d3ed2bca53 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionClientTests.java @@ -18,7 +18,6 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; From 591ab72ffaac2982004a08e39a5aa3b122dade42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Mon, 26 May 2025 00:22:47 -0700 Subject: [PATCH 22/22] chore: fix failing test cases --- .../google/cloud/spanner/spi/v1/GapicSpannerRpc.java | 12 ++++++------ .../cloud/spanner/TransactionManagerImplTest.java | 4 ++-- .../cloud/spanner/TransactionRunnerImplTest.java | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 449c8c2e2da..0b3c6c649be 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -2034,11 +2034,12 @@ GrpcCallContext newCallContext( MethodDescriptor method, boolean routeToLeader) { GrpcCallContext context = this.baseGrpcCallContext; - if (options != null) { + Long affinity = options == null ? null : Option.CHANNEL_HINT.getLong(options); + if (affinity != null) { if (this.isGrpcGcpExtensionEnabled) { // Set channel affinity in gRPC-GCP. // Compute bounded channel hint to prevent gRPC-GCP affinity map from getting unbounded. - int boundedChannelHint = Option.CHANNEL_HINT.getLong(options).intValue() % this.numChannels; + int boundedChannelHint = affinity.intValue() % this.numChannels; context = context.withCallOptions( context @@ -2047,11 +2048,10 @@ GrpcCallContext newCallContext( GcpManagedChannel.AFFINITY_KEY, String.valueOf(boundedChannelHint))); } else { // Set channel affinity in GAX. - Long affinity = Option.CHANNEL_HINT.getLong(options); - if (affinity != null) { - context = context.withChannelAffinity(affinity.intValue()); - } + context = context.withChannelAffinity(affinity.intValue()); } + } + if (options != null) { context = withRequestId(context, options); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index 547f6b70a22..19c03859e66 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -241,7 +241,7 @@ public void usesPreparedTransaction() { Mockito.anyString(), Mockito.anyString(), Mockito.anyMap(), - Mockito.eq(null), + Mockito.anyMap(), Mockito.eq(true))) .thenAnswer( invocation -> @@ -324,7 +324,7 @@ public void inlineBegin() { Mockito.anyString(), Mockito.anyString(), Mockito.anyMap(), - Mockito.eq(null), + Mockito.anyMap(), Mockito.eq(true))) .thenAnswer( invocation -> diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 3068b38f3ef..c2f1009b43e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -193,7 +193,7 @@ public void usesPreparedTransaction() { Mockito.anyString(), Mockito.anyString(), Mockito.anyMap(), - Mockito.eq(null), + Mockito.anyMap(), Mockito.eq(true))) .thenAnswer( invocation ->