Skip to content

Commit 384ffcc

Browse files
committed
feat: Replace PooledSession with Transaction wrapper to support multiplex sessions
1 parent be8e2c7 commit 384ffcc

File tree

6 files changed

+1077
-4
lines changed

6 files changed

+1077
-4
lines changed

apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.CommonTesting/CloudSpannerFixtureBase.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414

1515
using Google.Cloud.ClientTesting;
1616
using Google.Cloud.Spanner.Common.V1;
17+
using Google.Cloud.Spanner.V1;
1718
using Google.Cloud.Spanner.V1.Internal.Logging;
1819
using System;
20+
using System.Threading.Tasks;
1921

2022
namespace Google.Cloud.Spanner.Data.CommonTesting;
2123

@@ -39,4 +41,6 @@ public abstract class CloudSpannerFixtureBase<TDatabase> : CloudProjectFixtureBa
3941
protected CloudSpannerFixtureBase(Func<string, TDatabase> databaseFactory) => Database = databaseFactory(ProjectId);
4042

4143
public SpannerConnection GetConnection(Logger logger = null, bool logCommitStats = false) => Database.GetConnection(logger, logCommitStats);
44+
45+
public async Task<MultiplexSession> GetMultiplexSession() => await Database.GetMultiplexSession().ConfigureAwait(false);
4246
}

apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.CommonTesting/SpannerTestDatabaseBase.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
using Google.Api.Gax.ResourceNames;
1717
using Google.Cloud.Spanner.Admin.Instance.V1;
1818
using Google.Cloud.Spanner.Common.V1;
19+
using Google.Cloud.Spanner.V1;
1920
using Google.Cloud.Spanner.V1.Internal.Logging;
2021
using Grpc.Core;
2122
using System;
23+
using System.Threading.Tasks;
2224

2325
namespace Google.Cloud.Spanner.Data.CommonTesting;
2426

@@ -27,6 +29,8 @@ namespace Google.Cloud.Spanner.Data.CommonTesting;
2729
/// </summary>
2830
public abstract class SpannerTestDatabaseBase
2931
{
32+
private MultiplexSession _multiplexSession;
33+
3034
/// <summary>
3135
/// The Spanner Host name to connect to. It is read from the environment variable "TEST_SPANNER_HOST".
3236
/// </summary>
@@ -178,4 +182,26 @@ public SpannerConnection GetConnection(Logger logger, bool logCommitStats = fals
178182
SessionPoolManager = SessionPoolManager.Create(new V1.SessionPoolOptions(), logger),
179183
LogCommitStats = logCommitStats
180184
});
185+
186+
public async Task<MultiplexSession> GetMultiplexSession()
187+
{
188+
if (_multiplexSession != null)
189+
{
190+
return _multiplexSession;
191+
}
192+
193+
var options = new MultiplexSessionOptions();
194+
195+
_multiplexSession = await CreateMultiplexSession(options).ConfigureAwait(false);
196+
197+
return _multiplexSession;
198+
}
199+
200+
private async Task<MultiplexSession> CreateMultiplexSession(MultiplexSessionOptions options)
201+
{
202+
var poolManager = SessionPoolManager.Create(options);
203+
var muxSession = await poolManager.AcquireMultiplexSessionAsync(SpannerClientCreationOptions, DatabaseName, null);
204+
205+
return muxSession;
206+
}
181207
}
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"):
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Google.Api.Gax.Grpc;
16+
using Google.Cloud.ClientTesting;
17+
using Google.Cloud.Spanner.Data.CommonTesting;
18+
using Google.Cloud.Spanner.V1;
19+
using Google.Protobuf.WellKnownTypes;
20+
using Grpc.Core;
21+
using System;
22+
using System.Collections.Generic;
23+
using System.Linq;
24+
using System.Threading.Tasks;
25+
using Xunit;
26+
27+
namespace Google.Cloud.Spanner.Data.IntegrationTests;
28+
[Collection(nameof(AllTypesTableFixture))]
29+
[CommonTestDiagnostics]
30+
public class MultiplexSessionTests
31+
{
32+
private readonly AllTypesTableFixture _fixture;
33+
34+
public MultiplexSessionTests(AllTypesTableFixture fixture) =>
35+
_fixture = fixture;
36+
37+
[Fact]
38+
[Trait(Constants.SupportedOnEmulator, Constants.No)]
39+
public async Task SessionCreationSucceeds()
40+
{
41+
MultiplexSession muxSession = await _fixture.GetMultiplexSession();
42+
43+
Assert.NotNull(muxSession.Session);
44+
Assert.NotNull(muxSession.SessionName);
45+
46+
// Use the underlying client to get the mux session from the server.
47+
SpannerClient client = muxSession.Client;
48+
var getSessionRequest = new GetSessionRequest
49+
{
50+
SessionName = muxSession.SessionName,
51+
};
52+
var matchingSession = client.GetSession(getSessionRequest);
53+
54+
Assert.Equal(muxSession.SessionName, matchingSession.SessionName);
55+
Assert.True(matchingSession.Multiplexed);
56+
}
57+
58+
[Fact]
59+
[Trait(Constants.SupportedOnEmulator, Constants.No)]
60+
public async Task RunReadWriteTransactionWithMultipleQueries()
61+
{
62+
MultiplexSession multiplexSession = await _fixture.GetMultiplexSession();
63+
Transaction transaction = new Transaction(multiplexSession, null, new TransactionOptions { ReadWrite = new TransactionOptions.Types.ReadWrite() }, false, null);
64+
String uniqueRowId = IdGenerator.FromGuid();
65+
// Query 1: Read some data before modification.
66+
var result = await ExecuteSelectQuery(transaction, uniqueRowId);
67+
Assert.NotNull(result);
68+
Assert.NotNull(transaction.PrecommitToken);
69+
Assert.NotNull(transaction.TransactionId);
70+
71+
int preCommitTokenSeqNumber = transaction.PrecommitToken.SeqNum;
72+
73+
// Query 2: Insert a new record.
74+
result = await ExecuteInsertInt64Value(transaction, uniqueRowId, 10);
75+
Assert.NotNull(result);
76+
Assert.NotNull(transaction.PrecommitToken);
77+
Assert.NotNull(transaction.Id);
78+
Assert.True(transaction.PrecommitToken.SeqNum >= preCommitTokenSeqNumber);
79+
80+
preCommitTokenSeqNumber = transaction.PrecommitToken.SeqNum;
81+
82+
// Commit the transaction
83+
var commitResponse = await transaction.CommitAsync(new CommitRequest(), null);
84+
Assert.NotNull(commitResponse);
85+
Assert.NotNull(transaction.Id);
86+
}
87+
88+
[Fact]
89+
[Trait(Constants.SupportedOnEmulator, Constants.No)]
90+
public async Task TestMultipleTransactionWritesOnSameSession()
91+
{
92+
MultiplexSession multiplexSession = await _fixture.GetMultiplexSession();
93+
const int concurrentThreads = 5;
94+
String uniqueRowId = IdGenerator.FromGuid();
95+
96+
try
97+
{
98+
var transactions = new Transaction[concurrentThreads];
99+
for (var i = 0; i < concurrentThreads; i++)
100+
{
101+
transactions[i] = new Transaction(multiplexSession, null, new TransactionOptions { ReadWrite = new TransactionOptions.Types.ReadWrite() }, false, null);
102+
}
103+
104+
for (var i = 0; i < concurrentThreads; i++)
105+
{
106+
await IncrementByOneAsync(transactions[i], uniqueRowId);
107+
}
108+
109+
Transaction fetchResultsTransaction = new Transaction(multiplexSession, null, new TransactionOptions { ReadWrite = new TransactionOptions.Types.ReadWrite() }, false, null);
110+
var fetched = await ExecuteSelectQuery(fetchResultsTransaction, uniqueRowId);
111+
112+
var row = fetched.Rows.First();
113+
var actual = long.Parse(row.Values[1].StringValue);
114+
Assert.Equal(5, actual);
115+
}
116+
catch (Exception ex)
117+
{
118+
Console.WriteLine(ex.ToString());
119+
Console.WriteLine(ex.InnerException?.ToString());
120+
throw;
121+
}
122+
}
123+
124+
private async Task IncrementByOneAsync(Transaction transaction, string uniqueRowId, bool orphanTransaction = false)
125+
{
126+
var retrySettings = RetrySettings.FromExponentialBackoff(
127+
maxAttempts: int.MaxValue,
128+
initialBackoff: TimeSpan.FromMilliseconds(250),
129+
maxBackoff: TimeSpan.FromSeconds(5),
130+
backoffMultiplier: 1.5,
131+
retryFilter: ignored => false,
132+
RetrySettings.RandomJitter);
133+
TimeSpan nextDelay = TimeSpan.Zero;
134+
SpannerException spannerException;
135+
DateTime deadline = DateTime.UtcNow.AddSeconds(30);
136+
137+
while (true)
138+
{
139+
spannerException = null;
140+
try
141+
{
142+
// We use manually created transactions here so the tests run on .NET Core.
143+
long current;
144+
145+
var fetched = await ExecuteSelectQuery(transaction, uniqueRowId);
146+
if (fetched?.Rows.Any() == true)
147+
{
148+
var row = fetched.Rows.First();
149+
current = long.Parse(row.Values[1].StringValue);
150+
}
151+
else
152+
{
153+
current = 0L;
154+
}
155+
156+
157+
ResultSet result;
158+
if (current == 0)
159+
{
160+
result = await ExecuteInsertInt64Value(transaction, uniqueRowId, current + 1);
161+
}
162+
else
163+
{
164+
result = await ExecuteUpdateInt64Value(transaction, uniqueRowId, current + 1);
165+
}
166+
167+
await transaction.CommitAsync(new CommitRequest(), null);
168+
return;
169+
}
170+
// Keep trying for up to 30 seconds
171+
catch (SpannerException ex) when (ex.IsRetryable && DateTime.UtcNow < deadline)
172+
{
173+
nextDelay = retrySettings.NextBackoff(nextDelay);
174+
await Task.Delay(retrySettings.BackoffJitter.GetDelay(nextDelay));
175+
spannerException = ex;
176+
}
177+
}
178+
}
179+
180+
private async Task<ResultSet> ExecuteSelectQuery(Transaction transaction, String uniqueRowId)
181+
{
182+
var selectParams = new Dictionary<string, SpannerParameter>
183+
{
184+
{ "id", new SpannerParameter { Value = Value.ForString(uniqueRowId) } }
185+
};
186+
var selectSql = $"SELECT K, Int64Value FROM {_fixture.TableName} WHERE K = @id";
187+
var request = new ExecuteSqlRequest
188+
{
189+
Sql = selectSql,
190+
Params = CreateStructFromParameters(selectParams),
191+
};
192+
193+
return await transaction.ExecuteSqlAsync(request, null);
194+
}
195+
196+
private async Task<ResultSet> ExecuteInsertInt64Value(Transaction transaction, String uniqueRowId, long insertValue)
197+
{
198+
var insertSql = $"INSERT {_fixture.TableName} (K, Int64Value) VALUES (@k, @int64Value)";
199+
var insertParams = new Dictionary<string, SpannerParameter>
200+
{
201+
{ "k", new SpannerParameter { Value = Value.ForString(uniqueRowId) } },
202+
{ "int64Value", new SpannerParameter("int64Value", SpannerDbType.Int64, insertValue) }
203+
};
204+
205+
var request = new ExecuteSqlRequest
206+
{
207+
Sql = insertSql,
208+
Params = CreateStructFromParameters(insertParams),
209+
};
210+
return await transaction.ExecuteSqlAsync(request, null);
211+
}
212+
213+
private async Task<ResultSet> ExecuteUpdateInt64Value(Transaction transaction, String uniqueRowId, long updateValue)
214+
{
215+
var updateSql = $"UPDATE {_fixture.TableName} SET Int64Value = @newIntValue WHERE K = @id";
216+
var updateParams = new Dictionary<string, SpannerParameter>
217+
{
218+
{ "newIntValue", new SpannerParameter("newIntValue", SpannerDbType.Int64, updateValue) },
219+
{ "id", new SpannerParameter { Value = Value.ForString(uniqueRowId) } }
220+
};
221+
222+
var request = new ExecuteSqlRequest
223+
{
224+
Sql = updateSql,
225+
Params = CreateStructFromParameters(updateParams),
226+
};
227+
return await transaction.ExecuteSqlAsync(request, null);
228+
}
229+
230+
/// <summary>
231+
/// Converts a dictionary of Spanner parameters to a Google.Protobuf.WellKnownTypes.Struct.
232+
/// </summary>
233+
private Struct CreateStructFromParameters(Dictionary<string, SpannerParameter> parameters)
234+
{
235+
var pbStruct = new Struct();
236+
var options = SpannerConversionOptions.Default;
237+
if (parameters != null)
238+
{
239+
foreach (var param in parameters)
240+
{
241+
var parameter = param.Value;
242+
var protobufValue = parameter.GetConfiguredSpannerDbType(options).ToProtobufValue(parameter.GetValidatedValue());
243+
pbStruct.Fields.Add(param.Key, protobufValue);
244+
}
245+
}
246+
return pbStruct;
247+
}
248+
}

0 commit comments

Comments
 (0)