Skip to content

Commit cf21a25

Browse files
feat: Add Spanner request ID in exceptions
1 parent 22ce108 commit cf21a25

File tree

4 files changed

+443
-4
lines changed

4 files changed

+443
-4
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Grpc.Core;
16+
using System;
17+
using System.Threading;
18+
using System.Threading.Tasks;
19+
20+
namespace Google.Cloud.Spanner.V1.Tests.Common;
21+
22+
/// <summary>
23+
/// CallInvoker that returns a valid call object which fails with an RpcException when awaited or iterated.
24+
/// Simulates asynchronous failures like server-side errors or network issues.
25+
/// Used by SpannerClient tests to verify error handling behavior.
26+
/// </summary>
27+
public class AsyncFailureCallInvoker : FailureCallInvokerBase
28+
{
29+
/// <summary>
30+
/// Creates a new instance of <see cref="AsyncFailureCallInvoker"/>.
31+
/// </summary>
32+
/// <param name="numberOfFailuresToSimulate">The number of times to simulate a failure before succeeding.</param>
33+
/// <param name="statusCodeToThrow">The gRPC status code to use in the thrown exception.</param>
34+
/// <param name="exceptionMessage">The message to use in the thrown exception.</param>
35+
public AsyncFailureCallInvoker(int numberOfFailuresToSimulate = int.MaxValue, StatusCode statusCodeToThrow = StatusCode.Internal, string exceptionMessage = "Test exception")
36+
: base(numberOfFailuresToSimulate, statusCodeToThrow, exceptionMessage)
37+
{
38+
}
39+
40+
/// <inheritdoc/>
41+
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
42+
{
43+
RecordMetadata(options.Headers);
44+
if (ShouldFail())
45+
{
46+
var exceptionToThrow = CreateRpcException();
47+
var failedResponse = Task.FromException<TResponse>(exceptionToThrow);
48+
var failedResponseHeadersAsync = Task.FromException<Metadata>(exceptionToThrow);
49+
return new AsyncUnaryCall<TResponse>(failedResponse, failedResponseHeadersAsync, () => Status.DefaultSuccess, () => new Metadata(), () => { });
50+
}
51+
52+
var response = (TResponse)Activator.CreateInstance(typeof(TResponse));
53+
return new AsyncUnaryCall<TResponse>(Task.FromResult(response), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
54+
}
55+
56+
/// <inheritdoc/>
57+
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
58+
{
59+
RecordMetadata(options.Headers);
60+
if (ShouldFail())
61+
{
62+
var exceptionToThrow = CreateRpcException();
63+
var failedResponseStream = new ThrowingStreamReader<TResponse>(exceptionToThrow);
64+
var failedResponseHeadersAsync = Task.FromException<Metadata>(exceptionToThrow);
65+
return new AsyncServerStreamingCall<TResponse>(failedResponseStream, failedResponseHeadersAsync, () => Status.DefaultSuccess, () => new Metadata(), () => { });
66+
}
67+
68+
return new AsyncServerStreamingCall<TResponse>(new EmptyAsyncStreamReader<TResponse>(), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
69+
}
70+
71+
/// <inheritdoc/>
72+
public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) =>
73+
throw new NotImplementedException(); // Not an async method.
74+
75+
76+
/// <summary>
77+
/// An async stream reader that throws an exception when moved to the next element.
78+
/// </summary>
79+
/// <typeparam name="T">The type of element in the stream.</typeparam>
80+
private class ThrowingStreamReader<T> : IAsyncStreamReader<T>
81+
{
82+
private readonly Exception _exception;
83+
84+
/// <summary>
85+
/// Creates a new instance of <see cref="ThrowingStreamReader{T}"/>.
86+
/// </summary>
87+
/// <param name="exception">The exception to throw.</param>
88+
public ThrowingStreamReader(Exception exception) => _exception = exception;
89+
90+
/// <inheritdoc/>
91+
public T Current => default;
92+
93+
/// <inheritdoc/>
94+
public Task<bool> MoveNext(CancellationToken cancellationToken) => Task.FromException<bool>(_exception);
95+
}
96+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Google.Cloud.Spanner.V1.Tests.Common;
16+
using Grpc.Core;
17+
using System;
18+
using System.Threading.Tasks;
19+
using Xunit;
20+
21+
namespace Google.Cloud.Spanner.V1.Tests;
22+
23+
public class RequestIdCallInterceptorTests
24+
{
25+
private const string SampleDatabaseName = "projects/proj/instances/inst/databases/db";
26+
private const string SampleSessionName = "projects/proj/instances/inst/databases/db/sessions/sess";
27+
28+
[Theory]
29+
[MemberData(nameof(SyncActions))]
30+
public void SyncCall_SyncFailure_ExceptionContainsRequestId(Action<SpannerClient> action)
31+
{
32+
var invoker = new SyncFailureCallInvoker();
33+
var client = new SpannerClientBuilder { CallInvoker = invoker }.Build();
34+
35+
var exception = Assert.Throws<RpcException>(() => action(client));
36+
37+
Assert.True(exception.Data.Contains("x-goog-spanner-request-id"));
38+
Assert.NotNull(exception.Data["x-goog-spanner-request-id"]);
39+
}
40+
41+
[Theory]
42+
[MemberData(nameof(AsyncActions))]
43+
public async Task AsyncCall_SyncFailure_ExceptionContainsRequestId(Func<SpannerClient, Task> action)
44+
{
45+
var invoker = new SyncFailureCallInvoker();
46+
var client = new SpannerClientBuilder { CallInvoker = invoker }.Build();
47+
48+
var exception = await Assert.ThrowsAsync<RpcException>(() => action(client));
49+
50+
Assert.True(exception.Data.Contains("x-goog-spanner-request-id"));
51+
Assert.NotNull(exception.Data["x-goog-spanner-request-id"]);
52+
}
53+
54+
55+
[Theory]
56+
[MemberData(nameof(AsyncActions))]
57+
public async Task AsyncCall_AsyncFailure_ExceptionContainsRequestId(Func<SpannerClient, Task> action)
58+
{
59+
var invoker = new AsyncFailureCallInvoker();
60+
var client = new SpannerClientBuilder { CallInvoker = invoker }.Build();
61+
62+
var exception = await Assert.ThrowsAsync<RpcException>(() => action(client));
63+
64+
Assert.True(exception.Data.Contains("x-goog-spanner-request-id"));
65+
Assert.NotNull(exception.Data["x-goog-spanner-request-id"]);
66+
}
67+
68+
public static TheoryData<Action<SpannerClient>> SyncActions { get; } = new TheoryData<Action<SpannerClient>>
69+
{
70+
client => client.ExecuteSql(new ExecuteSqlRequest { Session = SampleSessionName, Sql = "SELECT 1" }),
71+
client => client.GetSession(new GetSessionRequest { Name = SampleSessionName }),
72+
client => client.DeleteSession(new DeleteSessionRequest { Name = SampleSessionName }),
73+
client => client.ExecuteBatchDml(new ExecuteBatchDmlRequest { Session = SampleSessionName }),
74+
client => client.Read(new ReadRequest { Session = SampleSessionName }),
75+
client => client.BeginTransaction(new BeginTransactionRequest { Session = SampleSessionName }),
76+
client => client.Commit(new CommitRequest { Session = SampleSessionName }),
77+
client => client.Rollback(new RollbackRequest { Session = SampleSessionName }),
78+
client => client.PartitionQuery(new PartitionQueryRequest { Session = SampleSessionName }),
79+
client => client.PartitionRead(new PartitionReadRequest { Session = SampleSessionName })
80+
};
81+
82+
public static TheoryData<Func<SpannerClient, Task>> AsyncActions { get; } = new TheoryData<Func<SpannerClient, Task>>
83+
{
84+
async client =>
85+
{
86+
var stream = client.ExecuteStreamingSql(new ExecuteSqlRequest { Session = SampleSessionName, Sql = "SELECT 1" });
87+
await stream.GrpcCall.ResponseStream.MoveNext(default);
88+
},
89+
async client =>
90+
{
91+
var stream = client.ExecuteStreamingSql(new ExecuteSqlRequest { Session = SampleSessionName, Sql = "SELECT 1" });
92+
await stream.GrpcCall.ResponseHeadersAsync;
93+
},
94+
async client =>
95+
{
96+
var stream = client.ListSessionsAsync(new ListSessionsRequest { Database = SampleDatabaseName });
97+
await stream.ReadPageAsync(10);
98+
},
99+
async client =>
100+
{
101+
var stream = client.StreamingRead(new ReadRequest { Session = SampleSessionName });
102+
await stream.GrpcCall.ResponseStream.MoveNext(default);
103+
},
104+
async client =>
105+
{
106+
var stream = client.StreamingRead(new ReadRequest { Session = SampleSessionName });
107+
await stream.GrpcCall.ResponseHeadersAsync;
108+
},
109+
async client => await client.ExecuteSqlAsync(new ExecuteSqlRequest { Session = SampleSessionName, Sql = "SELECT 1" })
110+
};
111+
}

0 commit comments

Comments
 (0)