Skip to content

Commit 626fec3

Browse files
feat(Spanner.V1): Add Spanner request ID header
Adds the class to generate a unique ID for each RPC. This also modifies the to inject this ID into the header for all Spanner RPCs. Includes comprehensive tests for ID generation and header injection.
1 parent d82a010 commit 626fec3

File tree

3 files changed

+433
-4
lines changed

3 files changed

+433
-4
lines changed
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Grpc.Core;
16+
using Google.Api.Gax.Grpc;
17+
using System;
18+
using System.Linq;
19+
using System.Threading.Tasks;
20+
using Xunit;
21+
using System.Collections.Generic;
22+
23+
namespace Google.Cloud.Spanner.V1.Tests;
24+
25+
public class SpannerRequestIdTests
26+
{
27+
private const string SampleDatabaseName = "projects/proj/instances/inst/databases/db";
28+
private const string SampleSessionName = "projects/proj/instances/inst/databases/db/sessions/sess";
29+
30+
// Tests for SpannerRequestId class itself
31+
[Fact]
32+
public void SpannerRequestId_Format()
33+
{
34+
var expectedClientId = 123;
35+
var expectedRequestSequence = 456;
36+
var requestId = new SpannerRequestId(expectedClientId, expectedRequestSequence);
37+
38+
// The expected format is 6 parts that break down as such:
39+
// {VersionId}.{s_processId}.{ClientId}.{ChannelId}.{RequestSequence}.{AttemptNum}
40+
var idString = requestId.IncrementAttempt();
41+
var parts = idString.Split('.');
42+
43+
Assert.Equal(6, parts.Length);
44+
var versionId = parts[0];
45+
var processIdString = parts[1];
46+
var clientId = parts[2];
47+
var channelId = parts[3];
48+
var requestSequence = parts[4];
49+
var attemptNum = parts[5];
50+
51+
Assert.Equal("1", versionId);
52+
Assert.Equal(expectedClientId.ToString(), clientId);
53+
Assert.Equal("0", channelId);
54+
Assert.Equal(expectedRequestSequence.ToString(), requestSequence);
55+
Assert.Equal("1", attemptNum);
56+
Assert.True(ulong.TryParse(processIdString, out _));
57+
}
58+
59+
[Fact]
60+
public void SpannerRequestId_IncrementAttemptNum()
61+
{
62+
var requestId = new SpannerRequestId(1, 1);
63+
for (int expectedAttemptNum = 1; expectedAttemptNum < 5; expectedAttemptNum++)
64+
{
65+
var parts = requestId.IncrementAttempt().Split('.');
66+
Assert.Equal(6, parts.Length);
67+
var actualAttemptNum = parts[5];
68+
Assert.Equal($"{expectedAttemptNum}", actualAttemptNum);
69+
}
70+
}
71+
72+
[Fact]
73+
public void SpannerRequestId_ProcessIdIsStatic()
74+
{
75+
var requestId1 = new SpannerRequestId(1, 1);
76+
var requestId2 = new SpannerRequestId(2, 2);
77+
78+
// The process ID is part of the ToString() output, so we can compare the generated strings
79+
// and extract the process ID.
80+
string requestId1String = requestId1.IncrementAttempt();
81+
string requestId2String = requestId2.IncrementAttempt();
82+
83+
string processId1 = requestId1String.Split('.')[1];
84+
string processId2 = requestId2String.Split('.')[1];
85+
86+
Assert.Equal(processId1, processId2);
87+
}
88+
89+
// Tests for header injection
90+
public static TheoryData<Action<SpannerClient>> SpannerClientActions { get; } = new TheoryData<Action<SpannerClient>>
91+
{
92+
client => client.CreateSession(new CreateSessionRequest { Database = SampleDatabaseName }),
93+
client => client.BatchCreateSessions(new BatchCreateSessionsRequest { Database = SampleDatabaseName }),
94+
client => client.GetSession(new GetSessionRequest { Name = SampleSessionName }),
95+
client => client.ListSessions(new ListSessionsRequest { Database = SampleDatabaseName }).AsRawResponses().First(),
96+
client => client.DeleteSession(new DeleteSessionRequest { Name = SampleSessionName }),
97+
client => client.ExecuteSql(new ExecuteSqlRequest { Session = SampleSessionName }),
98+
client => client.ExecuteBatchDml(new ExecuteBatchDmlRequest { Session = SampleSessionName }),
99+
client => client.Read(new ReadRequest { Session = SampleSessionName }),
100+
client => client.BeginTransaction(new BeginTransactionRequest { Session = SampleSessionName }),
101+
client => client.Commit(new CommitRequest { Session = SampleSessionName }),
102+
client => client.Rollback(new RollbackRequest { Session = SampleSessionName }),
103+
client => client.PartitionQuery(new PartitionQueryRequest { Session = SampleSessionName }),
104+
client => client.PartitionRead(new PartitionReadRequest { Session = SampleSessionName })
105+
};
106+
107+
[Theory]
108+
[MemberData(nameof(SpannerClientActions))]
109+
public void SetsHeaderOnRpcCalls(Action<SpannerClient> action)
110+
{
111+
var invoker = new RetryFakeCallInvoker();
112+
var client = new SpannerClientBuilder { CallInvoker = invoker }.Build();
113+
action(client);
114+
Metadata.Entry entry = Assert.Single(invoker.CapturedMetadata[0], e => e.Key == SpannerRequestId.HeaderKey);
115+
Assert.NotNull(entry.Value);
116+
}
117+
118+
[Fact]
119+
public void IncrementsRequestIdOnRetry()
120+
{
121+
var invoker = new RetryFakeCallInvoker(failCount: 1);
122+
var settings = new SpannerSettings
123+
{
124+
// Configure the CreateSession call to retry on Unavailable errors.
125+
CallSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
126+
maxAttempts: 3,
127+
initialBackoff: TimeSpan.FromMilliseconds(1),
128+
maxBackoff: TimeSpan.FromMilliseconds(1),
129+
backoffMultiplier: 1.0,
130+
retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
131+
};
132+
var client = new SpannerClientBuilder { CallInvoker = invoker, Settings = settings }.Build();
133+
134+
client.CreateSession(new CreateSessionRequest { Database = SampleDatabaseName });
135+
client.CreateSession(new CreateSessionRequest { Database = SampleDatabaseName });
136+
client.CreateSession(new CreateSessionRequest { Database = SampleDatabaseName });
137+
138+
// Assert that the invoker was called four times for the three client calls.
139+
// The first call should have failed the first time and succeeded on retry.
140+
Assert.Equal(4, invoker.CapturedMetadata.Count);
141+
142+
var requestIds = invoker.CapturedMetadata
143+
.Select(m => m.Single(e => e.Key == SpannerRequestId.HeaderKey).Value)
144+
.Select(id => new SpannerRequestIdParts(id))
145+
.ToList();
146+
147+
Assert.Equal((1, 1), (requestIds[0].RequestSequence, requestIds[0].AttemptNum));
148+
Assert.Equal((1, 2), (requestIds[1].RequestSequence, requestIds[1].AttemptNum));
149+
Assert.Equal((2, 1), (requestIds[2].RequestSequence, requestIds[2].AttemptNum));
150+
Assert.Equal((3, 1), (requestIds[3].RequestSequence, requestIds[3].AttemptNum));
151+
}
152+
153+
/// <summary>
154+
/// A fake call invoker that fails the first time it's called with an "Unavailable" status,
155+
/// and succeeds on subsequent calls. It captures the metadata for every call.
156+
/// </summary>
157+
private class RetryFakeCallInvoker : CallInvoker
158+
{
159+
private int _callCount = 0;
160+
private readonly int _failCount;
161+
public List<Metadata> CapturedMetadata { get; } = new List<Metadata>();
162+
163+
public RetryFakeCallInvoker(int failCount = 0) => _failCount = failCount;
164+
165+
public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
166+
{
167+
CapturedMetadata.Add(options.Headers);
168+
_callCount++;
169+
170+
if (_callCount <= _failCount)
171+
{
172+
// Fail the first time with a retryable error.
173+
throw new RpcException(new Status(StatusCode.Unavailable, "Transient error"));
174+
}
175+
176+
// Succeed on the second attempt.
177+
return (TResponse)Activator.CreateInstance(typeof(TResponse));
178+
}
179+
180+
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) =>
181+
throw new NotImplementedException();
182+
183+
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) =>
184+
throw new NotImplementedException();
185+
186+
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) =>
187+
throw new NotImplementedException();
188+
189+
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
190+
{
191+
CapturedMetadata.Add(options.Headers);
192+
_callCount++;
193+
194+
if (_callCount <= _failCount)
195+
{
196+
// Fail the first time with a retryable error.
197+
throw new RpcException(new Status(StatusCode.Unavailable, "Transient error"));
198+
}
199+
200+
// Succeed on the second attempt.
201+
var response = (TResponse)Activator.CreateInstance(typeof(TResponse));
202+
return new AsyncUnaryCall<TResponse>(Task.FromResult(response), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
203+
}
204+
}
205+
206+
private struct SpannerRequestIdParts
207+
{
208+
public int VersionId { get; }
209+
public ulong ProcessId { get; }
210+
public int ClientId { get; }
211+
public int ChannelId { get; }
212+
public int RequestSequence { get; }
213+
public int AttemptNum { get; }
214+
215+
public SpannerRequestIdParts(string requestId)
216+
{
217+
var parts = requestId.Split('.');
218+
Assert.Equal(6, parts.Length);
219+
220+
VersionId = int.Parse(parts[0]);
221+
ProcessId = ulong.Parse(parts[1]);
222+
ClientId = int.Parse(parts[2]);
223+
ChannelId = int.Parse(parts[3]);
224+
RequestSequence = int.Parse(parts[4]);
225+
AttemptNum = int.Parse(parts[5]);
226+
}
227+
}
228+
}

apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/SpannerClientPartial.cs

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
using Google.Api.Gax.Grpc;
1616
using Google.Cloud.Spanner.Common.V1;
17+
using System.Threading;
1718

1819
namespace Google.Cloud.Spanner.V1
1920
{
@@ -48,6 +49,25 @@ internal void MaybeApplyRouteToLeaderHeader(ref CallSettings settings)
4849

4950
public partial class SpannerClientImpl
5051
{
52+
/// <summary>
53+
/// A counter incremented on the instation of each new Spanner client
54+
/// </summary>
55+
private static int s_clientIdCounter;
56+
57+
/// <summary>
58+
/// Creates the request ID associated with each RPC.
59+
/// </summary>
60+
private SpannerRequestId.Generator _requestIdGenerator;
61+
62+
/// <summary>
63+
/// The process ID, assigned to each outgoing request in the x-goog-spanner-request-id header.
64+
/// </summary>
65+
public string ProcessId
66+
{
67+
get => SpannerRequestId.s_processId;
68+
set => SpannerRequestId.s_processId = value;
69+
}
70+
5171
/// <summary>
5272
/// The name of the header used for efficiently routing requests.
5373
/// </summary>
@@ -64,78 +84,100 @@ public partial class SpannerClientImpl
6484
partial void OnConstruction(Spanner.SpannerClient grpcClient, SpannerSettings effectiveSettings, ClientHelper clientHelper)
6585
{
6686
Settings = effectiveSettings;
87+
_requestIdGenerator = new SpannerRequestId.Generator(Interlocked.Increment(ref s_clientIdCounter));
6788
}
6889

6990
partial void Modify_CreateSessionRequest(ref CreateSessionRequest request, ref CallSettings settings)
7091
{
7192
ApplyResourcePrefixHeaderFromDatabase(ref settings, request.Database);
7293
MaybeApplyRouteToLeaderHeader(ref settings);
94+
ApplyRequestIdToHeader(ref settings);
7395
}
7496

7597
partial void Modify_BatchCreateSessionsRequest(ref BatchCreateSessionsRequest request, ref CallSettings settings)
7698
{
7799
ApplyResourcePrefixHeaderFromDatabase(ref settings, request.Database);
78100
MaybeApplyRouteToLeaderHeader(ref settings);
101+
ApplyRequestIdToHeader(ref settings);
79102
}
80103

81104
partial void Modify_GetSessionRequest(ref GetSessionRequest request, ref CallSettings settings)
82105
{
83106
ApplyResourcePrefixHeaderFromSession(ref settings, request.Name);
84107
MaybeApplyRouteToLeaderHeader(ref settings);
108+
ApplyRequestIdToHeader(ref settings);
85109
}
86110

87-
partial void Modify_ListSessionsRequest(ref ListSessionsRequest request, ref CallSettings settings) =>
111+
partial void Modify_ListSessionsRequest(ref ListSessionsRequest request, ref CallSettings settings)
112+
{
88113
// This operation is never routed to leader so we don't call MaybeApplyRouteToLeaderHeader.
89114
ApplyResourcePrefixHeaderFromDatabase(ref settings, request.Database);
115+
ApplyRequestIdToHeader(ref settings);
116+
}
90117

91-
partial void Modify_DeleteSessionRequest(ref DeleteSessionRequest request, ref CallSettings settings) =>
118+
partial void Modify_DeleteSessionRequest(ref DeleteSessionRequest request, ref CallSettings settings)
119+
{
92120
// This operation is never routed to leader so we don't call MaybeApplyRouteToLeaderHeader.
93121
ApplyResourcePrefixHeaderFromSession(ref settings, request.Name);
122+
ApplyRequestIdToHeader(ref settings);
123+
}
94124

95-
partial void Modify_ExecuteSqlRequest(ref ExecuteSqlRequest request, ref CallSettings settings) =>
125+
partial void Modify_ExecuteSqlRequest(ref ExecuteSqlRequest request, ref CallSettings settings)
126+
{
96127
// This operations is routed to leader only if the transaction it uses is of a certain type.
97128
// We don't have that information here so the leader routing header needs to be applied elsewhere.
98129
ApplyResourcePrefixHeaderFromSession(ref settings, request.Session);
130+
ApplyRequestIdToHeader(ref settings);
131+
}
99132

100133
partial void Modify_ExecuteBatchDmlRequest(ref ExecuteBatchDmlRequest request, ref CallSettings settings)
101134
{
102135
ApplyResourcePrefixHeaderFromSession(ref settings, request.Session);
103136
MaybeApplyRouteToLeaderHeader(ref settings);
137+
ApplyRequestIdToHeader(ref settings);
104138
}
105139

106-
partial void Modify_ReadRequest(ref ReadRequest request, ref CallSettings settings) =>
140+
partial void Modify_ReadRequest(ref ReadRequest request, ref CallSettings settings)
141+
{
107142
// This operations is routed to leader only if the transaction it uses is of a certain type.
108143
// We don't have that information here so the leader routing header needs to be applied elsewhere.
109144
ApplyResourcePrefixHeaderFromSession(ref settings, request.Session);
145+
ApplyRequestIdToHeader(ref settings);
146+
}
110147

111148
partial void Modify_BeginTransactionRequest(ref BeginTransactionRequest request, ref CallSettings settings)
112149
{
113150
ApplyResourcePrefixHeaderFromSession(ref settings, request.Session);
114151
MaybeApplyRouteToLeaderHeader(ref settings, request.Options?.ModeCase ?? TransactionOptions.ModeOneofCase.None);
152+
ApplyRequestIdToHeader(ref settings);
115153
}
116154

117155
partial void Modify_CommitRequest(ref CommitRequest request, ref CallSettings settings)
118156
{
119157
ApplyResourcePrefixHeaderFromSession(ref settings, request.Session);
120158
MaybeApplyRouteToLeaderHeader(ref settings);
159+
ApplyRequestIdToHeader(ref settings);
121160
}
122161

123162
partial void Modify_RollbackRequest(ref RollbackRequest request, ref CallSettings settings)
124163
{
125164
ApplyResourcePrefixHeaderFromSession(ref settings, request.Session);
126165
MaybeApplyRouteToLeaderHeader(ref settings);
166+
ApplyRequestIdToHeader(ref settings);
127167
}
128168

129169
partial void Modify_PartitionQueryRequest(ref PartitionQueryRequest request, ref CallSettings settings)
130170
{
131171
ApplyResourcePrefixHeaderFromSession(ref settings, request.Session);
132172
MaybeApplyRouteToLeaderHeader(ref settings);
173+
ApplyRequestIdToHeader(ref settings);
133174
}
134175

135176
partial void Modify_PartitionReadRequest(ref PartitionReadRequest request, ref CallSettings settings)
136177
{
137178
ApplyResourcePrefixHeaderFromSession(ref settings, request.Session);
138179
MaybeApplyRouteToLeaderHeader(ref settings);
180+
ApplyRequestIdToHeader(ref settings);
139181
}
140182

141183
internal static void ApplyResourcePrefixHeaderFromDatabase(ref CallSettings settings, string resource)
@@ -167,5 +209,15 @@ internal static void ApplyResourcePrefixHeaderFromSession(ref CallSettings setti
167209
settings = settings.WithHeader(ResourcePrefixHeader, database.ToString());
168210
}
169211
}
212+
213+
internal void ApplyRequestIdToHeader(ref CallSettings settings)
214+
{
215+
var requestId = _requestIdGenerator.Generate();
216+
217+
// The header mutation increments the attempt each time the header is populated which happens right before a
218+
// call. This allows us to increment the attempt num and assign a unique request id in the case of retries
219+
var newSettings = CallSettings.FromHeaderMutation(metadata => metadata.Add(SpannerRequestId.HeaderKey, requestId.IncrementAttempt()));
220+
settings = settings.MergedWith(newSettings);
221+
}
170222
}
171223
}

0 commit comments

Comments
 (0)