diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index fdc51ea3f3..40dbd710bd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -48,7 +48,7 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; @VisibleForTesting final boolean useMultiplexedSessionPartitionedOps; @VisibleForTesting final boolean useMultiplexedSessionForRW; - private final int dbId; + @VisibleForTesting final int dbId; private final AtomicInteger nthRequest; private final Map clientIdToOrdinalMap; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index c5952a38a5..c95b8d4bc2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -1098,9 +1098,6 @@ public boolean equals(Object o) { return false; } RequestIdOption other = (RequestIdOption) o; - if (this.reqId == null || other.reqId == null) { - return this.reqId == null && other.reqId == null; - } return Objects.equals(this.reqId, other.reqId); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 605f1639c5..20c86bdf25 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -223,7 +223,7 @@ DatabaseId getDatabaseId() { @Override public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) { return XGoogSpannerRequestId.of( - this.nthId, this.nthRequest.incrementAndGet(), channelId, attempt); + this.nthId, channelId, this.nthRequest.incrementAndGet(), attempt); } /** Create a single session. */ @@ -423,7 +423,7 @@ private List internalBatchCreateSessions( span.addAnnotation(String.format("Requesting %d sessions", sessionCount)); try (IScope s = spanner.getTracer().withSpan(span)) { XGoogSpannerRequestId reqId = - XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelHint, 1); + XGoogSpannerRequestId.of(this.nthId, channelHint, this.nthRequest.incrementAndGet(), 1); List sessions = spanner .getRpc() diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index d3017afd61..60a99a8bfa 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -2042,6 +2042,7 @@ GrpcCallContext newCallContext( } } if (options != null) { + // TODO(@odeke-em): Infer the affinity if it doesn't match up with in the request-id. context = withRequestId(context, options); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index fa90dd0fe1..fa8b5c982f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -2907,6 +2907,37 @@ public void testPartitionedDmlDoesNotTimeout() { return null; })); assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode()); + + DatabaseClientImpl dbImpl = ((DatabaseClientImpl) client); + int channelId = 0; + try (Session session = dbImpl.getSession()) { + channelId = ((PooledSessionFuture) session).getChannel(); + } + int dbId = dbImpl.dbId; + long NON_DETERMINISTIC = XGoogSpannerRequestIdTest.NON_DETERMINISTIC; + XGoogSpannerRequestIdTest.MethodAndRequestId[] wantStreamingValues = { + XGoogSpannerRequestIdTest.ofMethodAndRequestId( + "google.spanner.v1.Spanner/ExecuteStreamingSql", + new XGoogSpannerRequestId(NON_DETERMINISTIC, channelId, 6, 1)), + }; + if (false) { // TODO(@odeke-em): enable in next PRs. + xGoogReqIdInterceptor.checkExpectedStreamingXGoogRequestIds(wantStreamingValues); + } + + XGoogSpannerRequestIdTest.MethodAndRequestId[] wantUnaryValues = { + XGoogSpannerRequestIdTest.ofMethodAndRequestId( + "google.spanner.v1.Spanner/BeginTransaction", + new XGoogSpannerRequestId(NON_DETERMINISTIC, channelId, 7, 1)), + XGoogSpannerRequestIdTest.ofMethodAndRequestId( + "google.spanner.v1.Spanner/CreateSession", + new XGoogSpannerRequestId(NON_DETERMINISTIC, 0, 1, 1)), + XGoogSpannerRequestIdTest.ofMethodAndRequestId( + "google.spanner.v1.Spanner/ExecuteSql", + new XGoogSpannerRequestId(NON_DETERMINISTIC, channelId, 8, 1)), + }; + if (false) { // TODO(@odeke-em): enable in next PRs. + xGoogReqIdInterceptor.checkExpectedUnaryXGoogRequestIdsAsSuffixes(wantUnaryValues); + } } } @@ -2989,6 +3020,38 @@ public void testPartitionedDmlWithHigherTimeout() { .run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT))); assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED); assertThat(updateCount).isEqualTo(UPDATE_COUNT); + + DatabaseClientImpl dbImpl = ((DatabaseClientImpl) client); + int channelId = 0; + try (Session session = dbImpl.getSession()) { + channelId = ((PooledSessionFuture) session).getChannel(); + } + int dbId = dbImpl.dbId; + long NON_DETERMINISTIC = XGoogSpannerRequestIdTest.NON_DETERMINISTIC; + XGoogSpannerRequestIdTest.MethodAndRequestId[] wantStreamingValues = { + XGoogSpannerRequestIdTest.ofMethodAndRequestId( + "google.spanner.v1.Spanner/ExecuteStreamingSql", + new XGoogSpannerRequestId(NON_DETERMINISTIC, channelId, 6, 1)), + }; + + if (false) { // TODO(@odeke-em): enable in next PRs. + xGoogReqIdInterceptor.checkExpectedStreamingXGoogRequestIds(wantStreamingValues); + } + + XGoogSpannerRequestIdTest.MethodAndRequestId[] wantUnaryValues = { + XGoogSpannerRequestIdTest.ofMethodAndRequestId( + "google.spanner.v1.Spanner/BeginTransaction", + new XGoogSpannerRequestId(NON_DETERMINISTIC, channelId, 7, 1)), + XGoogSpannerRequestIdTest.ofMethodAndRequestId( + "google.spanner.v1.Spanner/CreateSession", + new XGoogSpannerRequestId(NON_DETERMINISTIC, 0, 1, 1)), + XGoogSpannerRequestIdTest.ofMethodAndRequestId( + "google.spanner.v1.Spanner/ExecuteSql", + new XGoogSpannerRequestId(NON_DETERMINISTIC, channelId, 8, 1)), + }; + if (false) { // TODO(@odeke-em): enable in next PRs. + xGoogReqIdInterceptor.checkExpectedUnaryXGoogRequestIdsAsSuffixes(wantUnaryValues); + } } } @@ -5314,6 +5377,26 @@ public void testSessionPoolExhaustedError_containsStackTraces() { } // Closing the transactions should return the sessions to the pool. assertEquals(4, pool.getNumberOfSessionsInPool()); + + DatabaseClientImpl dbClient = (DatabaseClientImpl) client; + int channelId = 0; + try (Session session = dbClient.getSession()) { + channelId = ((PooledSessionFuture) session).getChannel(); + } + int dbId = dbClient.dbId; + XGoogSpannerRequestIdTest.MethodAndRequestId[] wantStreamingValues = {}; + + xGoogReqIdInterceptor.checkExpectedStreamingXGoogRequestIds(wantStreamingValues); + long NON_DETERMINISTIC = XGoogSpannerRequestIdTest.NON_DETERMINISTIC; + + XGoogSpannerRequestIdTest.MethodAndRequestId[] wantUnaryValues = { + XGoogSpannerRequestIdTest.ofMethodAndRequestId( + "google.spanner.v1.Spanner/CreateSession", + new XGoogSpannerRequestId(NON_DETERMINISTIC, 0, 1, 1)), + }; + if (false) { // TODO(@odeke-em): enable in next PRs. + xGoogReqIdInterceptor.checkExpectedUnaryXGoogRequestIdsAsSuffixes(wantUnaryValues); + } } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java index fd1ddbbf24..719b94593b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java @@ -28,6 +28,8 @@ import io.grpc.ServerInterceptor; import io.grpc.Status; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -41,6 +43,7 @@ @RunWith(JUnit4.class) public class XGoogSpannerRequestIdTest { + public static long NON_DETERMINISTIC = -1; @Test public void testEquals() { @@ -157,40 +160,83 @@ private void assertMonotonicityOfIds(String prefix, List + String.join("\n\t", violations.toArray(new String[0]))); } - public static class methodAndRequestId { - String method; - String requestId; - - public methodAndRequestId(String method, String requestId) { - this.method = method; - this.requestId = requestId; - } - - public String toString() { - return "{" + this.method + ":" + this.requestId + "}"; - } - } - - public methodAndRequestId[] accumulatedUnaryValues() { - List accumulated = new ArrayList(); + public MethodAndRequestId[] accumulatedUnaryValues() { + List accumulated = new ArrayList(); this.unaryResults.forEach( (String method, CopyOnWriteArrayList values) -> { for (int i = 0; i < values.size(); i++) { - accumulated.add(new methodAndRequestId(method, values.get(i).toString())); + accumulated.add(new MethodAndRequestId(method, values.get(i))); } }); - return accumulated.toArray(new methodAndRequestId[0]); + return accumulated.toArray(new MethodAndRequestId[0]); } - public methodAndRequestId[] accumulatedStreamingValues() { - List accumulated = new ArrayList(); + public MethodAndRequestId[] accumulatedStreamingValues() { + List accumulated = new ArrayList(); this.streamingResults.forEach( (String method, CopyOnWriteArrayList values) -> { for (int i = 0; i < values.size(); i++) { - accumulated.add(new methodAndRequestId(method, values.get(i).toString())); + accumulated.add(new MethodAndRequestId(method, values.get(i))); } }); - return accumulated.toArray(new methodAndRequestId[0]); + return accumulated.toArray(new MethodAndRequestId[0]); + } + + public void checkExpectedUnaryXGoogRequestIds(MethodAndRequestId... wantUnaryValues) { + MethodAndRequestId[] gotUnaryValues = this.accumulatedUnaryValues(); + sortValues(gotUnaryValues); + for (int i = 0; i < gotUnaryValues.length && false; i++) { + System.out.println("\033[33misUnary: #" + i + ":: " + gotUnaryValues[i] + "\033[00m"); + } + assertEquals(wantUnaryValues, gotUnaryValues); + } + + public void checkAtLeastHasExpectedUnaryXGoogRequestIds(MethodAndRequestId... wantUnaryValues) { + MethodAndRequestId[] gotUnaryValues = this.accumulatedUnaryValues(); + sortValues(gotUnaryValues); + for (int i = 0; i < gotUnaryValues.length && false; i++) { + System.out.println("\033[33misUnary: #" + i + ":: " + gotUnaryValues[i] + "\033[00m"); + } + if (wantUnaryValues.length < gotUnaryValues.length) { + MethodAndRequestId[] gotSliced = + Arrays.copyOfRange(gotUnaryValues, 0, wantUnaryValues.length); + assertEquals(wantUnaryValues, gotSliced); + } else { + assertEquals(wantUnaryValues, gotUnaryValues); + } + } + + public void checkExpectedUnaryXGoogRequestIdsAsSuffixes(MethodAndRequestId... wantUnaryValues) { + MethodAndRequestId[] gotUnaryValues = this.accumulatedUnaryValues(); + sortValues(gotUnaryValues); + for (int i = 0; i < gotUnaryValues.length && false; i++) { + System.out.println("\033[33misUnary: #" + i + ":: " + gotUnaryValues[i] + "\033[00m"); + } + if (wantUnaryValues.length < gotUnaryValues.length) { + MethodAndRequestId[] gotSliced = + Arrays.copyOfRange( + gotUnaryValues, + gotUnaryValues.length - wantUnaryValues.length, + gotUnaryValues.length); + assertEquals(wantUnaryValues, gotSliced); + } else { + assertEquals(wantUnaryValues, gotUnaryValues); + } + } + + private void sortValues(MethodAndRequestId[] values) { + massageValues(values); + Arrays.sort(values, new MethodAndRequestIdComparator()); + } + + public void checkExpectedStreamingXGoogRequestIds(MethodAndRequestId... wantStreamingValues) { + MethodAndRequestId[] gotStreamingValues = this.accumulatedStreamingValues(); + for (int i = 0; i < gotStreamingValues.length && false; i++) { + System.out.println( + "\033[32misStreaming: #" + i + ":: " + gotStreamingValues[i] + "\033[00m"); + } + sortValues(gotStreamingValues); + assertEquals(wantStreamingValues, gotStreamingValues); } public void reset() { @@ -199,4 +245,80 @@ public void reset() { this.streamingResults.clear(); } } + + public static class MethodAndRequestId { + String method; + XGoogSpannerRequestId requestId; + + public MethodAndRequestId(String method, XGoogSpannerRequestId requestId) { + this.method = method; + this.requestId = requestId; + } + + public String toString() { + return "{" + this.method + ":" + this.requestId.debugToString() + "}"; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof MethodAndRequestId)) { + return false; + } + MethodAndRequestId other = (MethodAndRequestId) o; + return Objects.equals(this.method, other.method) + && Objects.equals(this.requestId, other.requestId); + } + } + + static class MethodAndRequestIdComparator implements Comparator { + @Override + public int compare(MethodAndRequestId mr1, MethodAndRequestId mr2) { + int cmpMethod = mr1.method.compareTo(mr2.method); + if (cmpMethod != 0) { + return cmpMethod; + } + + if (Objects.equals(mr1.requestId, mr2.requestId)) { + return 0; + } + if (mr1.requestId.isGreaterThan(mr2.requestId)) { + return +1; + } + return -1; + } + } + + static void massageValues(MethodAndRequestId[] mreqs) { + for (int i = 0; i < mreqs.length; i++) { + MethodAndRequestId mreq = mreqs[i]; + // BatchCreateSessions is so hard to control as the round-robin doling out + // hence we might need to be able to scrub the nth_request that won't match + // nth_req in consecutive order of nth_client. + if (mreq.method.compareTo("google.spanner.v1.Spanner/BatchCreateSessions") == 0) { + mreqs[i] = + new MethodAndRequestId( + mreq.method, + mreq.requestId + .withNthRequest(NON_DETERMINISTIC) + .withChannelId(NON_DETERMINISTIC) + .withNthClientId(NON_DETERMINISTIC)); + } else if (mreq.method.compareTo("google.spanner.v1.Spanner/BeginTransaction") == 0 + || mreq.method.compareTo("google.spanner.v1.Spanner/ExecuteStreamingSql") == 0 + || mreq.method.compareTo("google.spanner.v1.Spanner/ExecuteSql") == 0 + || mreq.method.compareTo("google.spanner.v1.Spanner/CreateSession") == 0 + || mreq.method.compareTo("google.spanner.v1.Spanner/Commit") == 0) { + mreqs[i] = + new MethodAndRequestId(mreq.method, mreq.requestId.withNthClientId(NON_DETERMINISTIC)); + } + } + } + + public static MethodAndRequestId ofMethodAndRequestId(String method, String reqId) { + return new MethodAndRequestId(method, XGoogSpannerRequestId.of(reqId)); + } + + public static MethodAndRequestId ofMethodAndRequestId( + String method, XGoogSpannerRequestId reqId) { + return new MethodAndRequestId(method, reqId); + } }