Skip to content

Commit d7c8eae

Browse files
feat: Add Spanner request ID in exceptions
1 parent f4a7f59 commit d7c8eae

File tree

3 files changed

+325
-4
lines changed

3 files changed

+325
-4
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Grpc.Core;
16+
using System;
17+
using System.Threading;
18+
using System.Threading.Tasks;
19+
using Xunit;
20+
21+
namespace Google.Cloud.Spanner.V1.Tests;
22+
23+
public class RequestIdCallInterceptorTests
24+
{
25+
[Theory]
26+
[MemberData(nameof(RequestIdTests.SpannerClientActions), MemberType = typeof(RequestIdTests))]
27+
public void RpcCall_ExceptionContainsRequestId(Action<SpannerClient> action)
28+
{
29+
var invoker = new ThrowingCallInvoker();
30+
var client = new SpannerClientBuilder { CallInvoker = invoker }.Build();
31+
32+
var exception = Assert.Throws<RpcException>(() => action(client));
33+
34+
Assert.True(exception.Data.Contains("x-goog-spanner-request-id"));
35+
Assert.NotNull(exception.Data["x-goog-spanner-request-id"]);
36+
}
37+
38+
private class ThrowingCallInvoker : CallInvoker
39+
{
40+
public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) =>
41+
throw new RpcException(new Status(StatusCode.Internal, "Test exception"));
42+
43+
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) =>
44+
new AsyncUnaryCall<TResponse>(
45+
Task.FromException<TResponse>(new RpcException(new Status(StatusCode.Internal, "Test exception"))),
46+
Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
47+
48+
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) =>
49+
new AsyncClientStreamingCall<TRequest, TResponse>(
50+
new MockClientStreamWriter<TRequest>(),
51+
Task.FromException<TResponse>(new RpcException(new Status(StatusCode.Internal, "Test exception"))),
52+
Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
53+
54+
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) =>
55+
new AsyncServerStreamingCall<TResponse>(
56+
new ThrowingStreamReader<TResponse>(),
57+
Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
58+
59+
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) =>
60+
new AsyncDuplexStreamingCall<TRequest, TResponse>(
61+
new MockClientStreamWriter<TRequest>(),
62+
new ThrowingStreamReader<TResponse>(),
63+
Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
64+
}
65+
66+
private class MockClientStreamWriter<T> : IClientStreamWriter<T>
67+
{
68+
public WriteOptions WriteOptions { get; set; }
69+
public Task CompleteAsync() => Task.CompletedTask;
70+
public Task WriteAsync(T message) => Task.CompletedTask;
71+
}
72+
73+
private class ThrowingStreamReader<T> : IAsyncStreamReader<T>
74+
{
75+
public T Current => default;
76+
77+
public Task<bool> MoveNext(CancellationToken cancellationToken) =>
78+
Task.FromException<bool>(new RpcException(new Status(StatusCode.Internal, "Test exception")));
79+
}
80+
}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Grpc.Core;
16+
using Grpc.Core.Interceptors;
17+
using System;
18+
using System.Threading;
19+
using System.Threading.Tasks;
20+
21+
namespace Google.Cloud.Spanner.V1;
22+
23+
public partial class SpannerClientBuilder
24+
{
25+
/// <summary>
26+
/// A <see cref="Interceptor"/> that wraps all calls, adding the Spanner request ID to any exceptions thrown while
27+
/// using the <see cref="SpannerClient"/>.
28+
/// </summary>
29+
private sealed class RequestIdOnExceptionInterceptor : Interceptor
30+
{
31+
/// <summary>
32+
/// Provides access to the singleton instance of <see cref="RequestIdOnExceptionInterceptor"/>
33+
/// </summary>
34+
internal static RequestIdOnExceptionInterceptor s_instance = new();
35+
36+
private RequestIdOnExceptionInterceptor()
37+
{
38+
}
39+
40+
/// <inheritdoc/>
41+
public override TResponse BlockingUnaryCall<TRequest, TResponse>(
42+
TRequest request,
43+
ClientInterceptorContext<TRequest, TResponse> context,
44+
BlockingUnaryCallContinuation<TRequest, TResponse> continuation) => WrapException(() => continuation(request, context), context.Options);
45+
46+
/// <inheritdoc/>
47+
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
48+
TRequest request,
49+
ClientInterceptorContext<TRequest, TResponse> context,
50+
AsyncUnaryCallContinuation<TRequest, TResponse> continuation) =>
51+
WrapException(() =>
52+
{
53+
var call = continuation(request, context);
54+
return new AsyncUnaryCall<TResponse>(
55+
WrapExceptionAsync(call.ResponseAsync, context.Options),
56+
WrapExceptionAsync(call.ResponseHeadersAsync, context.Options),
57+
call.GetStatus,
58+
call.GetTrailers,
59+
call.Dispose);
60+
},
61+
context.Options);
62+
63+
/// <inheritdoc/>
64+
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(
65+
ClientInterceptorContext<TRequest, TResponse> context,
66+
AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation) =>
67+
WrapException(() =>
68+
{
69+
var call = continuation(context);
70+
return new AsyncClientStreamingCall<TRequest, TResponse>(
71+
new SpannerRequestIdStreamWriter<TRequest>(call.RequestStream, context.Options),
72+
WrapExceptionAsync(call.ResponseAsync, context.Options),
73+
WrapExceptionAsync(call.ResponseHeadersAsync, context.Options),
74+
call.GetStatus,
75+
call.GetTrailers,
76+
call.Dispose);
77+
},
78+
context.Options);
79+
80+
/// <inheritdoc/>
81+
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(
82+
TRequest request,
83+
ClientInterceptorContext<TRequest, TResponse> context,
84+
AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation) =>
85+
WrapException(() =>
86+
{
87+
var call = continuation(request, context);
88+
var wrappedResponseStream = new SpannerRequestIdStreamReader<TResponse>(call.ResponseStream, context.Options);
89+
90+
return new AsyncServerStreamingCall<TResponse>(
91+
wrappedResponseStream,
92+
WrapExceptionAsync(call.ResponseHeadersAsync, context.Options),
93+
call.GetStatus,
94+
call.GetTrailers,
95+
call.Dispose);
96+
},
97+
context.Options);
98+
99+
/// <inheritdoc/>
100+
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(
101+
ClientInterceptorContext<TRequest, TResponse> context,
102+
AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation) =>
103+
WrapException(() =>
104+
{
105+
var call = continuation(context);
106+
var wrappedResponseStream = new SpannerRequestIdStreamReader<TResponse>(call.ResponseStream, context.Options);
107+
var wrappedRequestStream = new SpannerRequestIdStreamWriter<TRequest>(call.RequestStream, context.Options);
108+
109+
return new AsyncDuplexStreamingCall<TRequest, TResponse>(
110+
wrappedRequestStream,
111+
wrappedResponseStream,
112+
WrapExceptionAsync(call.ResponseHeadersAsync, context.Options),
113+
call.GetStatus,
114+
call.GetTrailers,
115+
call.Dispose);
116+
},
117+
context.Options);
118+
119+
/// <summary>
120+
/// Gets the request ID from the call options.
121+
/// </summary>
122+
private static string GetRequestIdFromOptions(CallOptions options)
123+
{
124+
if (options.Headers is Metadata headers)
125+
{
126+
return headers.GetValue(SpannerClientImpl.RequestIdHeaderKey);
127+
}
128+
return null;
129+
}
130+
131+
/// <summary>
132+
/// Handles the response asynchronously, adding the request ID to any exceptions thrown.
133+
/// </summary>
134+
private static async Task<T> WrapExceptionAsync<T>(Task<T> task, CallOptions options)
135+
{
136+
try
137+
{
138+
return await task.ConfigureAwait(false);
139+
}
140+
catch (Exception e)
141+
{
142+
throw EnrichException(e, options);
143+
}
144+
}
145+
146+
/// <summary>
147+
/// Handles the response asynchronously, adding the request ID to any exceptions thrown.
148+
/// </summary>
149+
private static async Task WrapExceptionAsync(Task task, CallOptions options)
150+
{
151+
try
152+
{
153+
await task.ConfigureAwait(false);
154+
}
155+
catch (Exception e)
156+
{
157+
throw EnrichException(e, options);
158+
}
159+
}
160+
161+
/// <summary>
162+
/// Handles the response, adding the request ID to any exceptions thrown.
163+
/// </summary>
164+
private static T WrapException<T>(Func<T> action, CallOptions options)
165+
{
166+
try
167+
{
168+
return action();
169+
}
170+
catch (Exception e)
171+
{
172+
throw EnrichException(e, options);
173+
}
174+
}
175+
176+
/// <summary>
177+
/// Enriches an exception with the Spanner Request ID from the provided <see cref="CallOptions"/>.
178+
/// </summary>
179+
/// <param name="e">The exception to enrich.</param>
180+
/// <param name="options">The <see cref="CallOptions"/> containing the request ID.</param>
181+
/// <returns>The enriched exception (the same instance passed in).</returns>
182+
private static Exception EnrichException(Exception e, CallOptions options)
183+
{
184+
var requestId = GetRequestIdFromOptions(options);
185+
if (requestId != null)
186+
{
187+
e.Data[SpannerClientImpl.RequestIdHeaderKey] = requestId;
188+
}
189+
return e;
190+
}
191+
192+
193+
/// <summary>
194+
/// A stream reader that wraps the original stream reader and adds the request ID to any exceptions thrown.
195+
/// </summary>
196+
private class SpannerRequestIdStreamReader<T> : IAsyncStreamReader<T>
197+
{
198+
private readonly IAsyncStreamReader<T> _originalReader;
199+
private readonly CallOptions _options;
200+
201+
public SpannerRequestIdStreamReader(IAsyncStreamReader<T> originalReader, CallOptions options)
202+
{
203+
_originalReader = originalReader;
204+
_options = options;
205+
}
206+
207+
public T Current => _originalReader.Current;
208+
209+
public Task<bool> MoveNext(CancellationToken cancellationToken) =>
210+
WrapExceptionAsync(_originalReader.MoveNext(cancellationToken), _options);
211+
}
212+
213+
/// <summary>
214+
/// A stream writer that wraps the original stream writer and adds the request ID to any exceptions thrown.
215+
/// </summary>
216+
private class SpannerRequestIdStreamWriter<T> : IClientStreamWriter<T>
217+
{
218+
private readonly IClientStreamWriter<T> _originalWriter;
219+
private readonly CallOptions _options;
220+
221+
public SpannerRequestIdStreamWriter(IClientStreamWriter<T> originalWriter, CallOptions options)
222+
{
223+
_originalWriter = originalWriter;
224+
_options = options;
225+
}
226+
227+
public WriteOptions WriteOptions { get => _originalWriter.WriteOptions; set => _originalWriter.WriteOptions = value; }
228+
229+
public Task CompleteAsync() => WrapExceptionAsync(_originalWriter.CompleteAsync(), _options);
230+
231+
public Task WriteAsync(T message) => WrapExceptionAsync(_originalWriter.WriteAsync(message), _options);
232+
}
233+
}
234+
}

apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/SpannerClientBuilderPartial.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using Google.Api.Gax.Grpc;
1717
using Google.Api.Gax.Grpc.Gcp;
1818
using Grpc.Core;
19+
using Grpc.Core.Interceptors;
1920
using System;
2021
using System.Threading;
2122
using System.Threading.Tasks;
@@ -159,8 +160,9 @@ partial void InterceptBuildAsync(CancellationToken cancellationToken, ref Task<S
159160
task = MaybeCreateEmulatorClientBuilder()?.BuildAsync(cancellationToken);
160161

161162
/// <inheritdoc/>
162-
protected override CallInvoker CreateCallInvoker() =>
163-
AffinityChannelPoolConfiguration is null
163+
protected override CallInvoker CreateCallInvoker()
164+
{
165+
var invoker = AffinityChannelPoolConfiguration is null
164166
? base.CreateCallInvoker()
165167
: new GcpCallInvoker(
166168
ServiceMetadata,
@@ -169,10 +171,13 @@ AffinityChannelPoolConfiguration is null
169171
GetChannelOptions(),
170172
GetApiConfig(),
171173
EffectiveGrpcAdapter);
174+
return invoker.Intercept(RequestIdOnExceptionInterceptor.s_instance);
175+
}
172176

173177
/// <inheritdoc/>
174-
protected override async Task<CallInvoker> CreateCallInvokerAsync(CancellationToken cancellationToken) =>
175-
AffinityChannelPoolConfiguration is null
178+
protected override async Task<CallInvoker> CreateCallInvokerAsync(CancellationToken cancellationToken)
179+
{
180+
var invoker = AffinityChannelPoolConfiguration is null
176181
? await base.CreateCallInvokerAsync(cancellationToken).ConfigureAwait(false)
177182
: new GcpCallInvoker(
178183
ServiceMetadata,
@@ -181,6 +186,8 @@ await GetChannelCredentialsAsync(cancellationToken).ConfigureAwait(false),
181186
GetChannelOptions(),
182187
GetApiConfig(),
183188
EffectiveGrpcAdapter);
189+
return invoker.Intercept(RequestIdOnExceptionInterceptor.s_instance);
190+
}
184191

185192
private ApiConfig GetApiConfig() => new ApiConfig
186193
{

0 commit comments

Comments
 (0)