Skip to content

Commit be8e2c7

Browse files
committed
feat: Multiplex Sessions in V1
1 parent 76a53cb commit be8e2c7

File tree

4 files changed

+432
-0
lines changed

4 files changed

+432
-0
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"):
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Google.Api.Gax.Testing;
16+
using Google.Cloud.Spanner.Common.V1;
17+
using Google.Cloud.Spanner.V1.Internal.Logging;
18+
using System;
19+
using System.Threading.Tasks;
20+
using Xunit;
21+
using static Google.Cloud.Spanner.V1.MultiplexSession;
22+
23+
namespace Google.Cloud.Spanner.V1.Tests;
24+
public class MultiplexSessionTests
25+
{
26+
private const string testDatabase = "projects/testproject/instances/testinstance/databases/testdb";
27+
28+
[Fact]
29+
public async Task TestBuilderCreation()
30+
{
31+
MultiplexSession multiplexSession = await FetchTestMultiplexSession();
32+
33+
Assert.NotNull(multiplexSession);
34+
Assert.NotNull(multiplexSession.Session);
35+
Assert.NotNull(multiplexSession.Client);
36+
Assert.NotNull(multiplexSession.DatabaseName);
37+
Assert.NotNull(multiplexSession.DatabaseRole);
38+
}
39+
40+
[Fact]
41+
public async Task TestSessionHasExpired()
42+
{
43+
SpannerClient fakeClient = CreateFakeClient();
44+
MultiplexSession multiplexSession = await FetchTestMultiplexSession(fakeClient);
45+
46+
DateTime sessionCreateTime = multiplexSession.Session.CreateTime.ToDateTime();
47+
FakeClock clock = (FakeClock) fakeClient.Settings.Clock;
48+
49+
clock.AdvanceTo(sessionCreateTime + TimeSpan.FromDays(3));
50+
Assert.True(multiplexSession.SessionHasExpired(2.0));
51+
52+
clock.AdvanceTo(sessionCreateTime + TimeSpan.FromDays(7));
53+
Assert.True(multiplexSession.SessionHasExpired());
54+
}
55+
56+
private SpannerClient CreateFakeClient()
57+
{
58+
SpannerClient fakeClient = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger);
59+
fakeClient.SetupMultiplexSessionCreationAsync();
60+
61+
return fakeClient;
62+
}
63+
64+
private async Task<MultiplexSession> FetchTestMultiplexSession(SpannerClient client = null)
65+
{
66+
if (!DatabaseName.TryParse(testDatabase, out var databaseName))
67+
{
68+
throw new Exception($"Unable to parse string to DatabaseName {testDatabase}");
69+
}
70+
71+
if (client == null)
72+
{
73+
client = CreateFakeClient();
74+
}
75+
76+
MultiplexSessionBuilder builder = new MultiplexSessionBuilder(databaseName, client)
77+
{
78+
DatabaseRole = "testRole",
79+
};
80+
81+
MultiplexSession multiplexSession = await builder.BuildAsync();
82+
83+
return multiplexSession;
84+
}
85+
}

apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/SpannerClientHelpers.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,24 @@ internal static SpannerClient CreateMockClient(Logger logger)
6565
return mock;
6666
}
6767

68+
internal static SpannerClient SetupMultiplexSessionCreationAsync(this SpannerClient spannerClientMock)
69+
{
70+
spannerClientMock.Configure().CreateSessionAsync(Arg.Is<CreateSessionRequest>(x => x != null), Arg.Any<CallSettings>())
71+
.Returns(args =>
72+
{
73+
var request = (CreateSessionRequest) args[0];
74+
Session response = new Session();
75+
response.CreateTime = spannerClientMock.GetNowTimestamp();
76+
response.CreatorRole = request.Session.CreatorRole;
77+
response.Multiplexed = request.Session.Multiplexed;
78+
response.Name = Guid.NewGuid().ToString();
79+
response.SessionName = new SessionName(ProjectId, Instance, Database, response.Name);
80+
81+
return Task.FromResult(response);
82+
});
83+
return spannerClientMock;
84+
}
85+
6886
internal static SpannerClient SetupBatchCreateSessionsAsync(this SpannerClient spannerClientMock)
6987
{
7088
spannerClientMock.Configure()
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"):
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Google.Api.Gax;
16+
using Google.Api.Gax.Grpc;
17+
using Google.Cloud.Spanner.Common.V1;
18+
using Google.Cloud.Spanner.V1.Internal;
19+
using Google.Cloud.Spanner.V1.Internal.Logging;
20+
using System;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
24+
namespace Google.Cloud.Spanner.V1;
25+
26+
/// <summary>
27+
/// TODO: Add summary for mux sessions
28+
/// </summary>
29+
public class MultiplexSession
30+
{
31+
private readonly SemaphoreSlim _sessionCreateSemaphore;
32+
private readonly Logger _logger;
33+
private readonly CreateSessionRequest _createSessionRequestTemplate;
34+
35+
internal Session _session;
36+
internal int _markedForRefresh;
37+
38+
private const double ForceRefreshIntervalInDays = 28.0;
39+
private const double SoftRefreshIntervalInDays = 7.0;
40+
41+
private readonly IClock _clock;
42+
43+
/// <summary>
44+
/// The client used for all operations in this multiplex session.
45+
/// </summary>
46+
internal SpannerClient Client { get; }
47+
48+
internal Task<Session> CreateSessionTask { get; }
49+
50+
/// <summary>
51+
/// The name of the session. This is never null.
52+
/// </summary>
53+
public SessionName SessionName => Session.SessionName;
54+
55+
/// <summary>
56+
/// The Spanner session resource associated to this pooled session.
57+
/// Won't be null.
58+
/// </summary>
59+
internal Session Session
60+
{
61+
get { return _session; }
62+
private set { _session = value; }
63+
}
64+
65+
private bool MarkedForRefresh => Interlocked.CompareExchange(ref _markedForRefresh, 0, 0) == 1;
66+
67+
/// <summary>
68+
/// The options governing this multiplex session.
69+
/// </summary>
70+
public MultiplexSessionOptions Options { get; }
71+
72+
/// <summary>
73+
/// The database for this multiplex session
74+
/// </summary>
75+
public DatabaseName DatabaseName { get; }
76+
77+
/// <summary>
78+
/// The database role of the multiplex session
79+
/// </summary>
80+
public string DatabaseRole { get; }
81+
82+
/// <summary>
83+
///
84+
/// </summary>
85+
/// <param name="client"></param>
86+
/// <param name="dbName"></param>
87+
/// <param name="dbRole"></param>
88+
/// <param name="options"></param>
89+
public MultiplexSession(SpannerClient client, DatabaseName dbName, string dbRole, MultiplexSessionOptions options)
90+
{
91+
Client = GaxPreconditions.CheckNotNull(client, nameof(client));
92+
Options = options ?? new MultiplexSessionOptions();
93+
_logger = client.Settings.Logger; // Just to avoid fetching it all the time
94+
_sessionCreateSemaphore = new SemaphoreSlim(1);
95+
96+
DatabaseName = dbName;
97+
DatabaseRole = dbRole;
98+
99+
_clock = client.Settings.Clock ?? SystemClock.Instance;
100+
101+
_createSessionRequestTemplate = new CreateSessionRequest
102+
{
103+
DatabaseAsDatabaseName = DatabaseName,
104+
Session = new Session
105+
{
106+
CreatorRole = DatabaseRole ?? "",
107+
Multiplexed = true
108+
}
109+
};
110+
}
111+
112+
private async Task<Boolean> UpdateMuxSession(bool needsRefresh, double intervalInDays)
113+
{
114+
Session oldSession = _session;
115+
await CreateOrRefreshSessionsAsync(default).ConfigureAwait(false);
116+
117+
return _session != oldSession;
118+
}
119+
120+
internal void MaybeRefreshWithTimePeriodCheck()
121+
{
122+
if (SessionHasExpired(ForceRefreshIntervalInDays))
123+
{
124+
// If the session has expired on a client RPC request call, or has exceeded the 28 day Mux session refresh guidance
125+
// No request can proceed without us having a new Session to work with
126+
// Block on refreshing and getting a new session
127+
bool sessionIsRefreshed = UpdateMuxSession(true, ForceRefreshIntervalInDays).Result;
128+
129+
if (!sessionIsRefreshed)
130+
{
131+
throw new Exception("Unable to refresh multiplex session, and the old session has expired or is 28 days past refresh");
132+
}
133+
134+
_logger.Info($"Refreshed session since it was expired or past 28 days refresh period. New session {SessionName}");
135+
}
136+
137+
if (SessionHasExpired(SoftRefreshIntervalInDays))
138+
{
139+
// The Mux sessions have a lifespan of 28 days. We check if we need a session refresh in every request needing the session
140+
// If the timespan of a request needing a session and the session creation time is greater than 7 days, we proactively refresh the mux session
141+
// The request can safely use the older session since it is still valid while we do this refresh to fetch the new session.
142+
// Hence fire and forget the session refresh.
143+
_ = Task.Run(() => UpdateMuxSession(true, SoftRefreshIntervalInDays));
144+
}
145+
}
146+
147+
// internal for testing
148+
internal bool SessionHasExpired(double intervalInDays = SoftRefreshIntervalInDays)
149+
{
150+
DateTime currentTime = _clock.GetCurrentDateTimeUtc();
151+
DateTime? sessionCreateTime = _session?.CreateTime.ToDateTime(); // Inherent conversion into UTC DateTime
152+
153+
if (_session == null || _session.Expired || currentTime - sessionCreateTime >= TimeSpan.FromDays(intervalInDays))
154+
{
155+
return true;
156+
}
157+
158+
return false;
159+
}
160+
161+
private async Task CreateOrRefreshSessionsAsync(CancellationToken cancellationToken, bool needsRefresh = false)
162+
{
163+
try
164+
{
165+
var callSettings = Client.Settings.CreateSessionSettings
166+
.WithExpiration(Expiration.FromTimeout(Options.Timeout))
167+
.WithCancellationToken(cancellationToken);
168+
169+
Session multiplexSession;
170+
171+
bool acquiredSemaphore = false;
172+
try
173+
{
174+
if (needsRefresh && MarkedForRefresh && !SessionHasExpired(ForceRefreshIntervalInDays))
175+
{
176+
// If the refresh was triggered for the soft refresh timeline (7 days)
177+
// Some other thread has already marked this session to be refreshed
178+
// Any subsequent request threads can continue using the 'stale' session so let's not block
179+
// On the other hand if the refresh is for the forced refresh timeline (28 days)
180+
// Any subsequent request threads need to be blocked on the Session refresh
181+
return;
182+
}
183+
184+
await _sessionCreateSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
185+
acquiredSemaphore = true;
186+
187+
if (_session == null || (needsRefresh && SessionHasExpired()))
188+
{
189+
Interlocked.Exchange(ref _markedForRefresh, 1);
190+
multiplexSession = await Client.CreateSessionAsync(_createSessionRequestTemplate, callSettings).ConfigureAwait(false);
191+
192+
Interlocked.Exchange(ref _session, multiplexSession);
193+
Interlocked.Exchange(ref _markedForRefresh, 0);
194+
}
195+
}
196+
catch (OperationCanceledException)
197+
{
198+
_logger.Warn(() => $"Creation request cancelled before we could procure a Multiplex Session for DatabaseName: {DatabaseName}, DatabaseRole: {DatabaseRole}");
199+
throw;
200+
}
201+
finally
202+
{
203+
if (acquiredSemaphore)
204+
{
205+
_sessionCreateSemaphore.Release();
206+
}
207+
}
208+
}
209+
catch (Exception e)
210+
{
211+
_logger.Warn(() => $"Failed to create multiplex session for DatabaseName: {DatabaseName}, DatabaseRole: {DatabaseRole}", e);
212+
throw;
213+
}
214+
finally
215+
{
216+
// Nothing to do here since for legacy SessionPool we had to have some logging for when the pool went from healthy to unhealthy.
217+
// This could mean n number of things went wrong in the pool
218+
// But with the MUX session, we essentially only have 1 session we need to manage per client.
219+
// So there is no case of the mux session going back and forth in terms of its healthiness.
220+
}
221+
222+
}
223+
224+
/// <summary>
225+
///
226+
/// </summary>
227+
public sealed partial class MultiplexSessionBuilder
228+
{
229+
/// <summary>
230+
///
231+
/// </summary>
232+
public MultiplexSessionBuilder(DatabaseName databaseName, SpannerClient client)
233+
{
234+
DatabaseName = GaxPreconditions.CheckNotNull(databaseName, nameof(databaseName));
235+
Client = GaxPreconditions.CheckNotNull(client, nameof(client));
236+
}
237+
238+
/// <summary>
239+
/// The options governing this multiplex session.
240+
/// </summary>
241+
public MultiplexSessionOptions Options { get; set; }
242+
243+
/// <summary>
244+
/// The database for this multiplex session
245+
/// </summary>
246+
public DatabaseName DatabaseName { get; set; }
247+
248+
/// <summary>
249+
/// The database role of the multiplex session
250+
/// </summary>
251+
public string DatabaseRole { get; set; }
252+
253+
/// <summary>
254+
/// The client used for all operations in this multiplex session.
255+
/// </summary>
256+
public SpannerClient Client { get; set; }
257+
258+
/// <summary>
259+
///
260+
/// </summary>
261+
/// <param name="cancellationToken"></param>
262+
/// <returns></returns>
263+
public async Task<MultiplexSession> BuildAsync(CancellationToken cancellationToken = default)
264+
{
265+
MultiplexSession multiplexSession = new MultiplexSession(Client, DatabaseName, DatabaseRole, Options);
266+
267+
await multiplexSession.CreateOrRefreshSessionsAsync(cancellationToken).ConfigureAwait(false);
268+
269+
return multiplexSession;
270+
}
271+
}
272+
}

0 commit comments

Comments
 (0)