Skip to content

Commit f5a6fc9

Browse files
feat: Add Spanner request ID header
1 parent d82a010 commit f5a6fc9

File tree

3 files changed

+476
-4
lines changed

3 files changed

+476
-4
lines changed
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Grpc.Core;
16+
using Google.Api.Gax.Grpc;
17+
using System;
18+
using System.Linq;
19+
using System.Threading.Tasks;
20+
using Xunit;
21+
using System.Collections.Generic;
22+
23+
namespace Google.Cloud.Spanner.V1.Tests;
24+
25+
public class SpannerRequestIdTests
26+
{
27+
private const string SampleDatabaseName = "projects/proj/instances/inst/databases/db";
28+
private const string SampleSessionName = "projects/proj/instances/inst/databases/db/sessions/sess";
29+
30+
// Tests for SpannerRequestId class itself
31+
[Fact]
32+
public void SpannerRequestId_Format()
33+
{
34+
var expectedClientId = 123;
35+
var expectedRequestSequence = 456;
36+
var requestId = new SpannerRequestId(expectedClientId, expectedRequestSequence);
37+
38+
// The expected format is 6 parts that break down as such:
39+
// {VersionId}.{s_processId}.{ClientId}.{ChannelId}.{RequestSequence}.{AttemptNum}
40+
var idString = requestId.IncrementAttempt();
41+
var parts = idString.Split('.');
42+
43+
Assert.Equal(6, parts.Length);
44+
var versionId = parts[0];
45+
var processIdString = parts[1];
46+
var clientId = parts[2];
47+
var channelId = parts[3];
48+
var requestSequence = parts[4];
49+
var attemptNum = parts[5];
50+
51+
Assert.Equal("1", versionId);
52+
Assert.Equal(expectedClientId.ToString(), clientId);
53+
Assert.Equal("1", channelId);
54+
Assert.Equal(expectedRequestSequence.ToString(), requestSequence);
55+
Assert.Equal("1", attemptNum);
56+
Assert.True(ulong.TryParse(processIdString, out _));
57+
}
58+
59+
[Fact]
60+
public void SpannerRequestId_IncrementAttemptNum()
61+
{
62+
var requestId = new SpannerRequestId(1, 1);
63+
for (int expectedAttemptNum = 1; expectedAttemptNum < 5; expectedAttemptNum++)
64+
{
65+
var parts = requestId.IncrementAttempt().Split('.');
66+
Assert.Equal(6, parts.Length);
67+
var actualAttemptNum = parts[5];
68+
Assert.Equal($"{expectedAttemptNum}", actualAttemptNum);
69+
}
70+
}
71+
72+
[Fact]
73+
public void SpannerRequestId_ProcessId_IsStaticByDefault()
74+
{
75+
var requestId1 = new SpannerRequestId(1, 1);
76+
var requestId2 = new SpannerRequestId(2, 2);
77+
78+
// The process ID is part of the ToString() output, so we can compare the generated strings
79+
// and extract the process ID.
80+
string requestId1String = requestId1.IncrementAttempt();
81+
string requestId2String = requestId2.IncrementAttempt();
82+
83+
string processId1 = requestId1String.Split('.')[1];
84+
string processId2 = requestId2String.Split('.')[1];
85+
86+
Assert.Equal(processId1, processId2);
87+
}
88+
89+
[Fact]
90+
public void SpannerRequestId_ProcessId_OverwritingBehavior()
91+
{
92+
// Capture the initial ProcessId. This could be the default generated ID,
93+
// or a value set by another test if XUnit's test execution order leads to state leakage.
94+
var initialProcessId = SpannerRequestId.ProcessId;
95+
96+
// Try to set the ProcessId to "first_override".
97+
// This will only succeed if ProcessId has not been accessed/initialized yet.
98+
SpannerRequestId.ProcessId = "first_override";
99+
100+
// After attempting to set, retrieve the current ProcessId.
101+
var currentProcessId = SpannerRequestId.ProcessId;
102+
103+
// If ProcessId was initially the default and we successfully set it to "first_override",
104+
// then currentProcessId should be "first_override".
105+
// Otherwise, it means ProcessId was already created (either by default or another test),
106+
// and our attempt to set it was ignored. In this case, currentProcessId should equal initialProcessId.
107+
string expectedProcessIdAfterFirstAttempt = initialProcessId == currentProcessId ? initialProcessId : "first_override";
108+
Assert.Equal(expectedProcessIdAfterFirstAttempt, currentProcessId);
109+
110+
// Now, attempt to set it a second time. This should always be ignored if already initialized.
111+
SpannerRequestId.ProcessId = "second_override";
112+
113+
// Verify that the ProcessId has not changed from its state after the first attempt.
114+
Assert.Equal(expectedProcessIdAfterFirstAttempt, SpannerRequestId.ProcessId);
115+
}
116+
117+
// Tests for header injection
118+
public static TheoryData<Action<SpannerClient>> SpannerClientActions { get; } = new TheoryData<Action<SpannerClient>>
119+
{
120+
client => client.CreateSession(new CreateSessionRequest { Database = SampleDatabaseName }),
121+
client => client.BatchCreateSessions(new BatchCreateSessionsRequest { Database = SampleDatabaseName }),
122+
client => client.GetSession(new GetSessionRequest { Name = SampleSessionName }),
123+
client => client.ListSessions(new ListSessionsRequest { Database = SampleDatabaseName }).AsRawResponses().First(),
124+
client => client.DeleteSession(new DeleteSessionRequest { Name = SampleSessionName }),
125+
client => client.ExecuteSql(new ExecuteSqlRequest { Session = SampleSessionName }),
126+
client => client.ExecuteBatchDml(new ExecuteBatchDmlRequest { Session = SampleSessionName }),
127+
client => client.Read(new ReadRequest { Session = SampleSessionName }),
128+
client => client.BeginTransaction(new BeginTransactionRequest { Session = SampleSessionName }),
129+
client => client.Commit(new CommitRequest { Session = SampleSessionName }),
130+
client => client.Rollback(new RollbackRequest { Session = SampleSessionName }),
131+
client => client.PartitionQuery(new PartitionQueryRequest { Session = SampleSessionName }),
132+
client => client.PartitionRead(new PartitionReadRequest { Session = SampleSessionName })
133+
};
134+
135+
[Theory]
136+
[MemberData(nameof(SpannerClientActions))]
137+
public void SetsHeaderOnRpcCalls(Action<SpannerClient> action)
138+
{
139+
var invoker = new RetryFakeCallInvoker();
140+
var client = new SpannerClientBuilder { CallInvoker = invoker }.Build();
141+
action(client);
142+
Metadata.Entry entry = Assert.Single(invoker.CapturedMetadata[0], e => e.Key == SpannerRequestId.HeaderKey);
143+
Assert.NotNull(entry.Value);
144+
}
145+
146+
[Fact]
147+
public void IncrementsRequestIdOnRetry()
148+
{
149+
var invoker = new RetryFakeCallInvoker(failCount: 1);
150+
var settings = new SpannerSettings
151+
{
152+
// Configure the CreateSession call to retry on Unavailable errors.
153+
CallSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
154+
maxAttempts: 3,
155+
initialBackoff: TimeSpan.FromMilliseconds(1),
156+
maxBackoff: TimeSpan.FromMilliseconds(1),
157+
backoffMultiplier: 1.0,
158+
retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
159+
};
160+
var client = new SpannerClientBuilder { CallInvoker = invoker, Settings = settings }.Build();
161+
162+
client.CreateSession(new CreateSessionRequest { Database = SampleDatabaseName });
163+
client.CreateSession(new CreateSessionRequest { Database = SampleDatabaseName });
164+
client.CreateSession(new CreateSessionRequest { Database = SampleDatabaseName });
165+
166+
// Assert that the invoker was called four times for the three client calls.
167+
// The first call should have failed the first time and succeeded on retry.
168+
Assert.Equal(4, invoker.CapturedMetadata.Count);
169+
170+
var requestIds = invoker.CapturedMetadata
171+
.Select(m => m.Single(e => e.Key == SpannerRequestId.HeaderKey).Value)
172+
.Select(id => new SpannerRequestIdParts(id))
173+
.ToList();
174+
175+
Assert.Equal((1, 1), (requestIds[0].RequestSequence, requestIds[0].AttemptNum));
176+
Assert.Equal((1, 2), (requestIds[1].RequestSequence, requestIds[1].AttemptNum));
177+
Assert.Equal((2, 1), (requestIds[2].RequestSequence, requestIds[2].AttemptNum));
178+
Assert.Equal((3, 1), (requestIds[3].RequestSequence, requestIds[3].AttemptNum));
179+
}
180+
181+
/// <summary>
182+
/// A fake call invoker that fails the first time it's called with an "Unavailable" status,
183+
/// and succeeds on subsequent calls. It captures the metadata for every call.
184+
/// </summary>
185+
private class RetryFakeCallInvoker : CallInvoker
186+
{
187+
private int _callCount = 0;
188+
private readonly int _failCount;
189+
public List<Metadata> CapturedMetadata { get; } = new List<Metadata>();
190+
191+
public RetryFakeCallInvoker(int failCount = 0) => _failCount = failCount;
192+
193+
public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
194+
{
195+
CapturedMetadata.Add(options.Headers);
196+
_callCount++;
197+
198+
if (_callCount <= _failCount)
199+
{
200+
// Fail the first time with a retryable error.
201+
throw new RpcException(new Status(StatusCode.Unavailable, "Transient error"));
202+
}
203+
204+
// Succeed on the second attempt.
205+
return (TResponse)Activator.CreateInstance(typeof(TResponse));
206+
}
207+
208+
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) =>
209+
throw new NotImplementedException();
210+
211+
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) =>
212+
throw new NotImplementedException();
213+
214+
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) =>
215+
throw new NotImplementedException();
216+
217+
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
218+
{
219+
CapturedMetadata.Add(options.Headers);
220+
_callCount++;
221+
222+
if (_callCount <= _failCount)
223+
{
224+
// Fail the first time with a retryable error.
225+
throw new RpcException(new Status(StatusCode.Unavailable, "Transient error"));
226+
}
227+
228+
// Succeed on the second attempt.
229+
var response = (TResponse)Activator.CreateInstance(typeof(TResponse));
230+
return new AsyncUnaryCall<TResponse>(Task.FromResult(response), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
231+
}
232+
}
233+
234+
private struct SpannerRequestIdParts
235+
{
236+
public int VersionId { get; }
237+
public ulong ProcessId { get; }
238+
public int ClientId { get; }
239+
public int ChannelId { get; }
240+
public int RequestSequence { get; }
241+
public int AttemptNum { get; }
242+
243+
public SpannerRequestIdParts(string requestId)
244+
{
245+
var parts = requestId.Split('.');
246+
Assert.Equal(6, parts.Length);
247+
248+
VersionId = int.Parse(parts[0]);
249+
ProcessId = ulong.Parse(parts[1]);
250+
ClientId = int.Parse(parts[2]);
251+
ChannelId = int.Parse(parts[3]);
252+
RequestSequence = int.Parse(parts[4]);
253+
AttemptNum = int.Parse(parts[5]);
254+
}
255+
}
256+
}

0 commit comments

Comments
 (0)