Skip to content

Commit 22ce108

Browse files
feat: Add Spanner request ID header
1 parent d82a010 commit 22ce108

File tree

9 files changed

+659
-4
lines changed

9 files changed

+659
-4
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Grpc.Core;
16+
using System.Threading.Tasks;
17+
18+
namespace Google.Cloud.Spanner.V1.Tests.Common;
19+
20+
/// <summary>
21+
/// IAsyncStreamReader{T} implementation that immediately signals the end of the stream.
22+
/// Used as a placeholder in test CallInvokers where no data is expected.
23+
/// </summary>
24+
/// <typeparam name="T">The message type.</typeparam>
25+
public class EmptyAsyncStreamReader<T> : IAsyncStreamReader<T>
26+
{
27+
/// <inheritdoc/>
28+
public T Current => default;
29+
30+
/// <inheritdoc/>
31+
public Task<bool> MoveNext(System.Threading.CancellationToken cancellationToken)
32+
{
33+
return Task.FromResult(false);
34+
}
35+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Grpc.Core;
16+
using System;
17+
using System.Collections.Generic;
18+
19+
namespace Google.Cloud.Spanner.V1.Tests.Common;
20+
21+
/// <summary>
22+
/// Base class for test CallInvokers that simulate failures, providing metadata capture capabilities.
23+
/// Shared by AsyncFailureCallInvoker and SyncFailureCallInvoker for verifying request headers in tests.
24+
/// </summary>
25+
public abstract class FailureCallInvokerBase : CallInvoker
26+
{
27+
private int _invocationCount = 0;
28+
private readonly int _numberOfFailuresToSimulate;
29+
private readonly StatusCode _statusCodeToThrow;
30+
private readonly string _exceptionMessage;
31+
32+
/// <summary>
33+
/// Creates a new instance of <see cref="FailureCallInvokerBase"/>.
34+
/// </summary>
35+
/// <param name="numberOfFailuresToSimulate">The number of times to simulate a failure before succeeding.</param>
36+
/// <param name="statusCodeToThrow">The gRPC status code to use in the thrown exception.</param>
37+
/// <param name="exceptionMessage">The message to use in the thrown exception.</param>
38+
protected FailureCallInvokerBase(int numberOfFailuresToSimulate, StatusCode statusCodeToThrow, string exceptionMessage)
39+
{
40+
_numberOfFailuresToSimulate = numberOfFailuresToSimulate;
41+
_statusCodeToThrow = statusCodeToThrow;
42+
_exceptionMessage = exceptionMessage;
43+
}
44+
45+
/// <summary>
46+
/// The list of metadata headers captured from each method invocation.
47+
/// </summary>
48+
public List<Metadata> CapturedMetadata { get; } = new List<Metadata>();
49+
50+
/// <summary>
51+
/// Records the metadata from a call.
52+
/// </summary>
53+
/// <param name="headers">The metadata headers to record.</param>
54+
protected void RecordMetadata(Metadata headers)
55+
{
56+
CapturedMetadata.Add(headers);
57+
}
58+
59+
/// <summary>
60+
/// Determines whether the current invocation should fail based on the configured failure count.
61+
/// </summary>
62+
/// <returns>True if the call should fail; otherwise, false.</returns>
63+
protected bool ShouldFail()
64+
{
65+
_invocationCount++;
66+
return _invocationCount <= _numberOfFailuresToSimulate;
67+
}
68+
69+
/// <summary>
70+
/// Creates the RpcException to be thrown or returned.
71+
/// </summary>
72+
/// <returns>The configured RpcException.</returns>
73+
protected RpcException CreateRpcException()
74+
{
75+
return new RpcException(new Status(_statusCodeToThrow, _exceptionMessage));
76+
}
77+
78+
/// <summary>
79+
/// Checks whether the current invocation should fail, and throws an exception if so.
80+
/// </summary>
81+
protected void MaybeThrowException()
82+
{
83+
if (ShouldFail())
84+
{
85+
throw CreateRpcException();
86+
}
87+
}
88+
89+
90+
/// <inheritdoc/>
91+
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) =>
92+
throw new NotImplementedException(); // SpannerClient does not support client streaming calls
93+
94+
/// <inheritdoc/>
95+
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) =>
96+
throw new NotImplementedException(); // SpannerClient does not support duplex streaming calls
97+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Grpc.Core;
16+
using System;
17+
using System.Threading.Tasks;
18+
19+
namespace Google.Cloud.Spanner.V1.Tests.Common;
20+
21+
/// <summary>
22+
/// CallInvoker that throws an RpcException synchronously upon method invocation.
23+
/// Simulates immediate failures like client-side validation or connection errors.
24+
/// Used by SpannerClient tests to verify synchronous error handling.
25+
/// </summary>
26+
internal class SyncFailureCallInvoker : FailureCallInvokerBase
27+
{
28+
/// <summary>
29+
/// Creates a new instance of <see cref="SyncFailureCallInvoker"/>.
30+
/// </summary>
31+
/// <param name="numberOfFailuresToSimulate">The number of times to simulate a failure before succeeding.</param>
32+
/// <param name="statusCodeToThrow">The gRPC status code to use in the thrown exception.</param>
33+
/// <param name="exceptionMessage">The message to use in the thrown exception.</param>
34+
public SyncFailureCallInvoker(int numberOfFailuresToSimulate = int.MaxValue, StatusCode statusCodeToThrow = StatusCode.Internal, string exceptionMessage = "Test exception")
35+
: base(numberOfFailuresToSimulate, statusCodeToThrow, exceptionMessage)
36+
{
37+
}
38+
39+
/// <inheritdoc/>
40+
public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
41+
{
42+
RecordMetadata(options.Headers);
43+
MaybeThrowException();
44+
return (TResponse)Activator.CreateInstance(typeof(TResponse));
45+
}
46+
47+
/// <inheritdoc/>
48+
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
49+
{
50+
RecordMetadata(options.Headers);
51+
MaybeThrowException();
52+
return new AsyncUnaryCall<TResponse>(
53+
Task.FromResult((TResponse)Activator.CreateInstance(typeof(TResponse))),
54+
Task.FromResult(new Metadata()),
55+
() => Status.DefaultSuccess,
56+
() => new Metadata(),
57+
() => { });
58+
}
59+
60+
/// <inheritdoc/>
61+
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
62+
{
63+
RecordMetadata(options.Headers);
64+
MaybeThrowException();
65+
return new AsyncServerStreamingCall<TResponse>(
66+
new EmptyAsyncStreamReader<TResponse>(),
67+
Task.FromResult(new Metadata()),
68+
() => Status.DefaultSuccess,
69+
() => new Metadata(),
70+
() => { });
71+
}
72+
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Grpc.Core;
16+
using Google.Api.Gax.Grpc;
17+
using System;
18+
using System.Linq;
19+
using Xunit;
20+
using Google.Cloud.Spanner.V1.Tests.Common;
21+
22+
namespace Google.Cloud.Spanner.V1.Tests;
23+
24+
public class RequestIdTests
25+
{
26+
private const string SampleDatabaseName = "projects/proj/instances/inst/databases/db";
27+
private const string SampleSessionName = "projects/proj/instances/inst/databases/db/sessions/sess";
28+
29+
[Theory]
30+
[MemberData(nameof(SpannerClientActions))]
31+
public void RequestId_Format(Action<SpannerClient> action)
32+
{
33+
var invoker = new SyncFailureCallInvoker(0);
34+
var client = new SpannerClientBuilder { CallInvoker = invoker }.Build();
35+
36+
action(client);
37+
38+
// The expected format is 6 parts that break down as such:
39+
// {VersionId}.{ProcessId}.{ClientId}.{ChannelId}.{RequestId}.{AttemptNum}
40+
Metadata headerMetadata = invoker.CapturedMetadata[0];
41+
var idString = headerMetadata.Get("x-goog-spanner-request-id").Value;
42+
43+
var parts = idString.Split('.');
44+
45+
Assert.Equal(6, parts.Length);
46+
var versionId = parts[0];
47+
var processId = parts[1];
48+
var clientId = parts[2];
49+
var channelId = parts[3];
50+
var requestId = parts[4];
51+
var attemptNum = parts[5];
52+
53+
Assert.Equal("1", versionId);
54+
Assert.True(ulong.TryParse(processId, out _));
55+
Assert.True(int.TryParse(clientId, out _));
56+
Assert.Equal("1", channelId);
57+
Assert.True(int.TryParse(requestId, out _));
58+
Assert.Equal("1", attemptNum);
59+
}
60+
61+
[Theory]
62+
[MemberData(nameof(SpannerClientActions))]
63+
public void SetsHeaderOnRpcCalls(Action<SpannerClient> action)
64+
{
65+
var invoker = new SyncFailureCallInvoker(0);
66+
var client = new SpannerClientBuilder { CallInvoker = invoker }.Build();
67+
action(client);
68+
Metadata.Entry entry = Assert.Single(invoker.CapturedMetadata[0], e => e.Key == "x-goog-spanner-request-id");
69+
Assert.NotNull(entry.Value);
70+
}
71+
72+
[Fact]
73+
public void IncrementsRequestIdOnRetry()
74+
{
75+
var invoker = new SyncFailureCallInvoker(numberOfFailuresToSimulate: 1, statusCodeToThrow: StatusCode.ResourceExhausted);
76+
var settings = new SpannerSettings
77+
{
78+
// Configure the CreateSession call to retry on Unavailable errors.
79+
CallSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
80+
maxAttempts: 3,
81+
initialBackoff: TimeSpan.FromMilliseconds(1),
82+
maxBackoff: TimeSpan.FromMilliseconds(1),
83+
backoffMultiplier: 1.0,
84+
retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
85+
};
86+
var client = new SpannerClientBuilder { CallInvoker = invoker, Settings = settings }.Build();
87+
88+
client.CreateSession(new CreateSessionRequest { Database = SampleDatabaseName });
89+
client.CreateSession(new CreateSessionRequest { Database = SampleDatabaseName });
90+
client.CreateSession(new CreateSessionRequest { Database = SampleDatabaseName });
91+
92+
// Assert that the invoker was called four times for the three client calls.
93+
// The first call should have failed the first time and succeeded on retry.
94+
Assert.Equal(4, invoker.CapturedMetadata.Count);
95+
96+
var requestIds = invoker.CapturedMetadata
97+
.Select(m => m.Single(e => e.Key == "x-goog-spanner-request-id").Value)
98+
.Select(id => new SpannerRequestIdParts(id))
99+
.ToList();
100+
101+
Assert.Equal((1, 1), (requestIds[0].RequestId, requestIds[0].AttemptNum));
102+
Assert.Equal((1, 2), (requestIds[1].RequestId, requestIds[1].AttemptNum));
103+
Assert.Equal((2, 1), (requestIds[2].RequestId, requestIds[2].AttemptNum));
104+
Assert.Equal((3, 1), (requestIds[3].RequestId, requestIds[3].AttemptNum));
105+
}
106+
107+
[Fact]
108+
public void RequestIdSource_ProcessId_OverwritingBehavior() =>
109+
// The process ID should only ever be set once per process.
110+
Assert.Throws<InvalidOperationException>(() =>
111+
{
112+
// Note in the case of test re-runs within the same process the
113+
// "first_override" may cause the exception to be thrown as it was
114+
// set prior.
115+
SpannerClientImpl.ProcessId = 1UL;
116+
SpannerClientImpl.ProcessId = 2UL;
117+
});
118+
119+
public static TheoryData<Action<SpannerClient>> SpannerClientActions { get; } = new TheoryData<Action<SpannerClient>>
120+
{
121+
client => client.ExecuteSql(new ExecuteSqlRequest { Session = SampleSessionName, Sql = "SELECT 1" }),
122+
client => client.ExecuteStreamingSql(new ExecuteSqlRequest { Session = SampleSessionName, Sql = "SELECT 1" }),
123+
client => client.GetSession(new GetSessionRequest { Name = SampleSessionName }),
124+
client => client.ListSessions(new ListSessionsRequest { Database = SampleDatabaseName }).AsRawResponses().First(),
125+
client => client.DeleteSession(new DeleteSessionRequest { Name = SampleSessionName }),
126+
client => client.ExecuteSql(new ExecuteSqlRequest { Session = SampleSessionName }),
127+
client => client.ExecuteBatchDml(new ExecuteBatchDmlRequest { Session = SampleSessionName }),
128+
client => client.Read(new ReadRequest { Session = SampleSessionName }),
129+
client => client.StreamingRead(new ReadRequest { Session = SampleSessionName }),
130+
client => client.BeginTransaction(new BeginTransactionRequest { Session = SampleSessionName }),
131+
client => client.Commit(new CommitRequest { Session = SampleSessionName }),
132+
client => client.Rollback(new RollbackRequest { Session = SampleSessionName }),
133+
client => client.PartitionQuery(new PartitionQueryRequest { Session = SampleSessionName }),
134+
client => client.PartitionRead(new PartitionReadRequest { Session = SampleSessionName }),
135+
};
136+
137+
private struct SpannerRequestIdParts
138+
{
139+
public int VersionId { get; }
140+
public ulong ProcessId { get; }
141+
public int ClientId { get; }
142+
public int ChannelId { get; }
143+
public int RequestId { get; }
144+
public int AttemptNum { get; }
145+
146+
public SpannerRequestIdParts(string requestId)
147+
{
148+
var parts = requestId.Split('.');
149+
Assert.Equal(6, parts.Length);
150+
151+
VersionId = int.Parse(parts[0]);
152+
ProcessId = ulong.Parse(parts[1]);
153+
ClientId = int.Parse(parts[2]);
154+
ChannelId = int.Parse(parts[3]);
155+
RequestId = int.Parse(parts[4]);
156+
AttemptNum = int.Parse(parts[5]);
157+
}
158+
}
159+
}

apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/SpannerClientBuilderPartial.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,16 @@ namespace Google.Cloud.Spanner.V1
2525
{
2626
public partial class SpannerClientBuilder
2727
{
28+
/// <summary>
29+
/// The process ID, assigned to each outgoing RPC to identify the request source. This can be set once,
30+
/// but only before the process has made its first Spanner request.
31+
/// </summary>
32+
/// <exception cref="InvalidOperationException">The process ID has already been set.</exception>
33+
public static ulong ProcessId
34+
{
35+
set => SpannerClientImpl.ProcessId = value;
36+
}
37+
2838
/// <summary>
2939
/// The Grpc.Gcp method configurations for pool options.
3040
/// </summary>

0 commit comments

Comments
 (0)