Skip to content

Commit e5ffbde

Browse files
feat(Spanner.V1): Add Spanner request ID in exceptions
Adds a gRPC interceptor to attach the Spanner request ID to exceptions and integrates it into the Spanner client builder.
1 parent ed48f5c commit e5ffbde

File tree

4 files changed

+302
-5
lines changed

4 files changed

+302
-5
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Grpc.Core;
16+
using Grpc.Core.Interceptors;
17+
using System;
18+
using System.Threading;
19+
using System.Threading.Tasks;
20+
using Xunit;
21+
22+
namespace Google.Cloud.Spanner.V1.Tests;
23+
24+
public class SpannerRequestIdCallInterceptorTest
25+
{
26+
private static readonly Method<string, string> s_unaryMethod = new Method<string, string>(MethodType.Unary, "service", "method", Marshallers.StringMarshaller, Marshallers.StringMarshaller);
27+
private static readonly Method<string, string> s_clientStreamingMethod = new Method<string, string>(MethodType.ClientStreaming, "service", "method", Marshallers.StringMarshaller, Marshallers.StringMarshaller);
28+
private static readonly Method<string, string> s_serverStreamingMethod = new Method<string, string>(MethodType.ServerStreaming, "service", "method", Marshallers.StringMarshaller, Marshallers.StringMarshaller);
29+
private static readonly Method<string, string> s_duplexStreamingMethod = new Method<string, string>(MethodType.DuplexStreaming, "service", "method", Marshallers.StringMarshaller, Marshallers.StringMarshaller);
30+
31+
public static TheoryData<Func<CallInvoker, CallOptions, Task>> CallInvokerActions { get; } = new TheoryData<Func<CallInvoker, CallOptions, Task>>
32+
{
33+
(invoker, options) => Task.Run(() => invoker.BlockingUnaryCall(s_unaryMethod, null, options, "")),
34+
(invoker, options) => invoker.AsyncUnaryCall(s_unaryMethod, null, options, "").ResponseAsync,
35+
(invoker, options) => invoker.AsyncClientStreamingCall(s_clientStreamingMethod, null, options).ResponseAsync,
36+
(invoker, options) => invoker.AsyncServerStreamingCall(s_serverStreamingMethod, null, options, "").ResponseStream.MoveNext(),
37+
(invoker, options) => invoker.AsyncDuplexStreamingCall(s_duplexStreamingMethod, null, options).ResponseStream.MoveNext()
38+
};
39+
40+
[Theory]
41+
[MemberData(nameof(CallInvokerActions))]
42+
public async Task RpcCall_ExceptionContainsRequestId(Func<CallInvoker, CallOptions, Task> action)
43+
{
44+
var invoker = new ThrowingCallInvoker();
45+
var interceptedInvoker = invoker.Intercept(new SpannerRequestIdCallInterceptor());
46+
var headers = new Metadata { { SpannerRequestId.HeaderKey, "test-request-id" } };
47+
var options = new CallOptions(headers);
48+
49+
var exception = await Assert.ThrowsAsync<RpcException>(() => action(interceptedInvoker, options));
50+
51+
Assert.True(exception.Data.Contains(SpannerRequestId.HeaderKey));
52+
Assert.Equal("test-request-id", exception.Data[SpannerRequestId.HeaderKey]);
53+
}
54+
55+
private class ThrowingCallInvoker : CallInvoker
56+
{
57+
public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) =>
58+
throw new RpcException(new Status(StatusCode.Internal, "Test exception"));
59+
60+
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) =>
61+
new AsyncUnaryCall<TResponse>(
62+
Task.FromException<TResponse>(new RpcException(new Status(StatusCode.Internal, "Test exception"))),
63+
Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
64+
65+
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) =>
66+
new AsyncClientStreamingCall<TRequest, TResponse>(
67+
new MockClientStreamWriter<TRequest>(),
68+
Task.FromException<TResponse>(new RpcException(new Status(StatusCode.Internal, "Test exception"))),
69+
Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
70+
71+
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) =>
72+
new AsyncServerStreamingCall<TResponse>(
73+
new ThrowingStreamReader<TResponse>(),
74+
Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
75+
76+
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) =>
77+
new AsyncDuplexStreamingCall<TRequest, TResponse>(
78+
new MockClientStreamWriter<TRequest>(),
79+
new ThrowingStreamReader<TResponse>(),
80+
Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
81+
}
82+
83+
private class MockClientStreamWriter<T> : IClientStreamWriter<T>
84+
{
85+
public WriteOptions WriteOptions { get; set; }
86+
public Task CompleteAsync() => Task.CompletedTask;
87+
public Task WriteAsync(T message) => Task.CompletedTask;
88+
}
89+
90+
private class ThrowingStreamReader<T> : IAsyncStreamReader<T>
91+
{
92+
public T Current => default;
93+
94+
public Task<bool> MoveNext(CancellationToken cancellationToken) =>
95+
Task.FromException<bool>(new RpcException(new Status(StatusCode.Internal, "Test exception")));
96+
}
97+
}

apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1.Tests/SpannerRequestIdTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void SpannerRequestId_Format()
5050

5151
Assert.Equal("1", versionId);
5252
Assert.Equal(expectedClientId.ToString(), clientId);
53-
Assert.Equal("0", channelId);
53+
Assert.Equal("1", channelId);
5454
Assert.Equal(expectedRequestSequence.ToString(), requestSequence);
5555
Assert.Equal("1", attemptNum);
5656
Assert.True(ulong.TryParse(processIdString, out _));

apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/SpannerClientBuilderPartial.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using Google.Api.Gax.Grpc;
1717
using Google.Api.Gax.Grpc.Gcp;
1818
using Grpc.Core;
19+
using Grpc.Core.Interceptors;
1920
using System;
2021
using System.Threading;
2122
using System.Threading.Tasks;
@@ -150,8 +151,9 @@ partial void InterceptBuildAsync(CancellationToken cancellationToken, ref Task<S
150151
task = MaybeCreateEmulatorClientBuilder()?.BuildAsync(cancellationToken);
151152

152153
/// <inheritdoc/>
153-
protected override CallInvoker CreateCallInvoker() =>
154-
AffinityChannelPoolConfiguration is null
154+
protected override CallInvoker CreateCallInvoker()
155+
{
156+
var invoker = AffinityChannelPoolConfiguration is null
155157
? base.CreateCallInvoker()
156158
: new GcpCallInvoker(
157159
ServiceMetadata,
@@ -160,10 +162,13 @@ AffinityChannelPoolConfiguration is null
160162
GetChannelOptions(),
161163
GetApiConfig(),
162164
EffectiveGrpcAdapter);
165+
return invoker.Intercept(new SpannerRequestIdCallInterceptor());
166+
}
163167

164168
/// <inheritdoc/>
165-
protected override async Task<CallInvoker> CreateCallInvokerAsync(CancellationToken cancellationToken) =>
166-
AffinityChannelPoolConfiguration is null
169+
protected override async Task<CallInvoker> CreateCallInvokerAsync(CancellationToken cancellationToken)
170+
{
171+
var invoker = AffinityChannelPoolConfiguration is null
167172
? await base.CreateCallInvokerAsync(cancellationToken).ConfigureAwait(false)
168173
: new GcpCallInvoker(
169174
ServiceMetadata,
@@ -172,6 +177,8 @@ await GetChannelCredentialsAsync(cancellationToken).ConfigureAwait(false),
172177
GetChannelOptions(),
173178
GetApiConfig(),
174179
EffectiveGrpcAdapter);
180+
return invoker.Intercept(new SpannerRequestIdCallInterceptor());
181+
}
175182

176183
private ApiConfig GetApiConfig() => new ApiConfig
177184
{
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Grpc.Core;
16+
using Grpc.Core.Interceptors;
17+
using System;
18+
using System.Linq;
19+
using System.Threading;
20+
using System.Threading.Tasks;
21+
22+
namespace Google.Cloud.Spanner.V1;
23+
24+
/// <summary>
25+
/// A <see cref="Interceptor"/> that wraps all calls, adding the Spanner request ID to any exceptions thrown.
26+
/// The SpannerClient acquires one of these interceptors on build and pipes all outgoing RPCs through it.
27+
/// In the event of an error being returned, the interceptor will catch the resulting exception and automatically
28+
/// populate the request ID into the base Exception.Data collection using the passed CallOption value for the x-goog-spanner-request-id.
29+
/// </summary>
30+
internal sealed class SpannerRequestIdCallInterceptor : Interceptor
31+
{
32+
/// <summary>
33+
/// Intercepts a blocking unary call, adding the Spanner request ID to any exceptions thrown.
34+
/// </summary>
35+
public override TResponse BlockingUnaryCall<TRequest, TResponse>(
36+
TRequest request,
37+
ClientInterceptorContext<TRequest, TResponse> context,
38+
BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
39+
{
40+
var requestId = GetRequestIdFromOptions(context.Options);
41+
return HandleResponse(() => continuation(request, context), requestId);
42+
}
43+
44+
/// <summary>
45+
/// Intercepts an asynchronous unary call, adding the Spanner request ID to any exceptions thrown.
46+
/// </summary>
47+
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
48+
TRequest request,
49+
ClientInterceptorContext<TRequest, TResponse> context,
50+
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
51+
{
52+
var call = continuation(request, context);
53+
var requestId = GetRequestIdFromOptions(context.Options);
54+
55+
return new AsyncUnaryCall<TResponse>(
56+
HandleResponseAsync(call.ResponseAsync, requestId),
57+
call.ResponseHeadersAsync,
58+
call.GetStatus,
59+
call.GetTrailers,
60+
call.Dispose);
61+
}
62+
63+
/// <summary>
64+
/// Intercepts an asynchronous client streaming call, adding the Spanner request ID to any exceptions thrown.
65+
/// </summary>
66+
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
67+
ClientInterceptorContext<TRequest, TResponse> context,
68+
AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
69+
{
70+
var call = continuation(context);
71+
var requestId = GetRequestIdFromOptions(context.Options);
72+
73+
return new AsyncClientStreamingCall<TRequest, TResponse>(
74+
call.RequestStream,
75+
HandleResponseAsync(call.ResponseAsync, requestId),
76+
call.ResponseHeadersAsync,
77+
call.GetStatus,
78+
call.GetTrailers,
79+
call.Dispose);
80+
}
81+
82+
/// <summary>
83+
/// Intercepts an asynchronous server streaming call, adding the Spanner request ID to any exceptions thrown.
84+
/// </summary>
85+
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
86+
TRequest request,
87+
ClientInterceptorContext<TRequest, TResponse> context,
88+
AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
89+
{
90+
var call = continuation(request, context);
91+
var requestId = GetRequestIdFromOptions(context.Options);
92+
var responseStream = new SpannerRequestIdStreamReader<TResponse>(call.ResponseStream, requestId);
93+
94+
return new AsyncServerStreamingCall<TResponse>(
95+
responseStream,
96+
call.ResponseHeadersAsync,
97+
call.GetStatus,
98+
call.GetTrailers,
99+
call.Dispose);
100+
}
101+
102+
/// <summary>
103+
/// Intercepts an asynchronous duplex streaming call, adding the Spanner request ID to any exceptions thrown.
104+
/// </summary>
105+
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
106+
ClientInterceptorContext<TRequest, TResponse> context,
107+
AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation)
108+
{
109+
var call = continuation(context);
110+
var requestId = GetRequestIdFromOptions(context.Options);
111+
var responseStream = new SpannerRequestIdStreamReader<TResponse>(call.ResponseStream, requestId);
112+
113+
return new AsyncDuplexStreamingCall<TRequest, TResponse>(
114+
call.RequestStream,
115+
responseStream,
116+
call.ResponseHeadersAsync,
117+
call.GetStatus,
118+
call.GetTrailers,
119+
call.Dispose);
120+
}
121+
122+
/// <summary>
123+
/// Gets the request ID from the call options.
124+
/// </summary>
125+
private static string GetRequestIdFromOptions(CallOptions options)
126+
{
127+
if (options.Headers is Metadata headers)
128+
{
129+
// Headers can contain multiple entries with the same key.
130+
// We are only interested in the last one.
131+
return headers.LastOrDefault(x => x.Key == SpannerRequestId.HeaderKey)?.Value;
132+
}
133+
return null;
134+
}
135+
136+
/// <summary>
137+
/// Handles the response, adding the request ID to any exceptions thrown.
138+
/// </summary>
139+
private static T HandleResponse<T>(Func<T> action, string requestId)
140+
{
141+
try
142+
{
143+
return action();
144+
}
145+
catch (Exception e)
146+
{
147+
if (requestId != null)
148+
{
149+
e.Data[SpannerRequestId.HeaderKey] = requestId;
150+
}
151+
throw;
152+
}
153+
}
154+
155+
/// <summary>
156+
/// Handles the response asynchronously, adding the request ID to any exceptions thrown.
157+
/// </summary>
158+
private static async Task<T> HandleResponseAsync<T>(Task<T> task, string requestId)
159+
{
160+
try
161+
{
162+
return await task.ConfigureAwait(false);
163+
}
164+
catch (Exception e)
165+
{
166+
if (requestId != null)
167+
{
168+
e.Data[SpannerRequestId.HeaderKey] = requestId;
169+
}
170+
throw;
171+
}
172+
}
173+
174+
/// <summary>
175+
/// A stream reader that wraps the original stream reader and adds the request ID to any exceptions thrown.
176+
/// </summary>
177+
private class SpannerRequestIdStreamReader<T> : IAsyncStreamReader<T>
178+
{
179+
private readonly IAsyncStreamReader<T> _originalReader;
180+
private readonly string _requestId;
181+
182+
public SpannerRequestIdStreamReader(IAsyncStreamReader<T> originalReader, string requestId)
183+
{
184+
_originalReader = originalReader;
185+
_requestId = requestId;
186+
}
187+
188+
public T Current => _originalReader.Current;
189+
190+
public Task<bool> MoveNext(CancellationToken cancellationToken) =>
191+
HandleResponseAsync(_originalReader.MoveNext(cancellationToken), _requestId);
192+
}
193+
}

0 commit comments

Comments
 (0)