diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.CommonTesting/CloudSpannerFixtureBase.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.CommonTesting/CloudSpannerFixtureBase.cs index fd5b85859684..1367f7ff9cfb 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.CommonTesting/CloudSpannerFixtureBase.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.CommonTesting/CloudSpannerFixtureBase.cs @@ -14,8 +14,10 @@ using Google.Cloud.ClientTesting; using Google.Cloud.Spanner.Common.V1; +using Google.Cloud.Spanner.V1; using Google.Cloud.Spanner.V1.Internal.Logging; using System; +using System.Threading.Tasks; namespace Google.Cloud.Spanner.Data.CommonTesting; @@ -39,4 +41,6 @@ public abstract class CloudSpannerFixtureBase : CloudProjectFixtureBa protected CloudSpannerFixtureBase(Func databaseFactory) => Database = databaseFactory(ProjectId); public SpannerConnection GetConnection(Logger logger = null, bool logCommitStats = false) => Database.GetConnection(logger, logCommitStats); + + public async Task GetManagedSession() => await Database.GetManagedSession().ConfigureAwait(false); } diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.CommonTesting/SpannerTestDatabaseBase.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.CommonTesting/SpannerTestDatabaseBase.cs index dbe3ce8299d4..9fe6d28a6eb5 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.CommonTesting/SpannerTestDatabaseBase.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.CommonTesting/SpannerTestDatabaseBase.cs @@ -16,9 +16,11 @@ using Google.Api.Gax.ResourceNames; using Google.Cloud.Spanner.Admin.Instance.V1; using Google.Cloud.Spanner.Common.V1; +using Google.Cloud.Spanner.V1; using Google.Cloud.Spanner.V1.Internal.Logging; using Grpc.Core; using System; +using System.Threading.Tasks; namespace Google.Cloud.Spanner.Data.CommonTesting; @@ -27,6 +29,8 @@ namespace Google.Cloud.Spanner.Data.CommonTesting; /// public abstract class SpannerTestDatabaseBase { + private ManagedSession _managedSession; + /// /// The Spanner Host name to connect to. It is read from the environment variable "TEST_SPANNER_HOST". /// @@ -178,4 +182,26 @@ public SpannerConnection GetConnection(Logger logger, bool logCommitStats = fals SessionPoolManager = SessionPoolManager.Create(new V1.SessionPoolOptions(), logger), LogCommitStats = logCommitStats }); + + public async Task GetManagedSession() + { + if (_managedSession != null && GetEnvironmentVariableOrDefault("SPANNER_EMULATOR_HOST", null) == null) + { + // Only return the same multiplex session if we are NOT testing on the emulator + // The emulator does not handle concurrent transactions on a single multiplex session well + return _managedSession; + } + + var options = new ManagedSessionOptions(); + + _managedSession = await CreateManagedSession(options).ConfigureAwait(false); + + return _managedSession; + } + + private async Task CreateManagedSession(ManagedSessionOptions options) + { + var poolManager = SessionPoolManager.Create(options); + return await poolManager.AcquireManagedSessionAsync(SpannerClientCreationOptions, DatabaseName, null); + } } diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/V1/ManagedSessionTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/V1/ManagedSessionTests.cs new file mode 100644 index 000000000000..319358eb058d --- /dev/null +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/V1/ManagedSessionTests.cs @@ -0,0 +1,242 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"): +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Api.Gax.Grpc; +using Google.Cloud.ClientTesting; +using Google.Cloud.Spanner.Data.CommonTesting; +using Google.Cloud.Spanner.V1; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace Google.Cloud.Spanner.Data.IntegrationTests; +[Collection(nameof(AllTypesTableFixture))] +[CommonTestDiagnostics] +public class ManagedSessionTests +{ + private readonly AllTypesTableFixture _fixture; + + public ManagedSessionTests(AllTypesTableFixture fixture) => + _fixture = fixture; + + [Fact] + [Trait(Constants.SupportedOnEmulator, Constants.No)] + public async Task SessionCreationSucceeds() + { + ManagedSession muxSession = await _fixture.GetManagedSession(); + + Assert.NotNull(muxSession.Session); + Assert.NotNull(muxSession.SessionName); + + // Use the underlying client to get the mux session from the server. + SpannerClient client = muxSession.Client; + var getSessionRequest = new GetSessionRequest + { + SessionName = muxSession.SessionName, + }; + var matchingSession = client.GetSession(getSessionRequest); + + Assert.Equal(muxSession.SessionName, matchingSession.SessionName); + Assert.True(matchingSession.Multiplexed); + } + + [Fact] + [Trait(Constants.SupportedOnEmulator, Constants.No)] + public async Task RunReadWriteTransactionWithMultipleQueries() + { + ManagedSession multiplexSession = await _fixture.GetManagedSession(); + ManagedTransaction transaction = new ManagedTransaction(multiplexSession, null, new TransactionOptions { ReadWrite = new TransactionOptions.Types.ReadWrite() }, false, null); + String uniqueRowId = IdGenerator.FromGuid(); + // Query 1: Read some data before modification. + var result = await ExecuteSelectQuery(transaction, uniqueRowId); + Assert.NotNull(result); + Assert.NotNull(transaction.PrecommitToken); + Assert.NotNull(transaction.TransactionId); + + int preCommitTokenSeqNumber = transaction.PrecommitToken.SeqNum; + + // Query 2: Insert a new record. + result = await ExecuteInsertInt64Value(transaction, uniqueRowId, 10); + Assert.NotNull(result); + Assert.NotNull(transaction.PrecommitToken); + Assert.NotNull(transaction.TransactionId); + Assert.True(transaction.PrecommitToken.SeqNum >= preCommitTokenSeqNumber); + + // Commit the transaction + var commitResponse = await transaction.CommitAsync(new CommitRequest(), null); + Assert.NotNull(commitResponse); + Assert.NotNull(transaction.TransactionId); + } + + [Fact] + [Trait(Constants.SupportedOnEmulator, Constants.No)] + public async Task TestMultipleTransactionWritesOnSameSession() + { + ManagedSession multiplexSession = await _fixture.GetManagedSession(); + const int concurrentThreads = 5; + String uniqueRowId = IdGenerator.FromGuid(); + + try + { + var transactions = new ManagedTransaction[concurrentThreads]; + for (var i = 0; i < concurrentThreads; i++) + { + transactions[i] = new ManagedTransaction(multiplexSession, null, new TransactionOptions { ReadWrite = new TransactionOptions.Types.ReadWrite() }, false, null); + } + + for (var i = 0; i < concurrentThreads; i++) + { + await IncrementByOneAsync(transactions[i], uniqueRowId); + } + + ManagedTransaction fetchResultsTransaction = new ManagedTransaction(multiplexSession, null, new TransactionOptions { ReadWrite = new TransactionOptions.Types.ReadWrite() }, false, null); + var fetched = await ExecuteSelectQuery(fetchResultsTransaction, uniqueRowId); + + var row = fetched.Rows.First(); + var actual = long.Parse(row.Values[1].StringValue); + Assert.Equal(5, actual); + } + catch (Exception ex) + { + Console.WriteLine(ex.ToString()); + Console.WriteLine(ex.InnerException?.ToString()); + throw; + } + } + + private async Task IncrementByOneAsync(ManagedTransaction transaction, string uniqueRowId, bool orphanTransaction = false) + { + var retrySettings = RetrySettings.FromExponentialBackoff( + maxAttempts: int.MaxValue, + initialBackoff: TimeSpan.FromMilliseconds(250), + maxBackoff: TimeSpan.FromSeconds(5), + backoffMultiplier: 1.5, + retryFilter: ignored => false, + RetrySettings.RandomJitter); + TimeSpan nextDelay = TimeSpan.Zero; + DateTime deadline = DateTime.UtcNow.AddSeconds(30); + + while (true) + { + try + { + // We use manually created transactions here so the tests run on .NET Core. + long current; + + var fetched = await ExecuteSelectQuery(transaction, uniqueRowId); + if (fetched?.Rows.Any() == true) + { + var row = fetched.Rows.First(); + current = long.Parse(row.Values[1].StringValue); + } + else + { + current = 0L; + } + + + if (current == 0) + { + await ExecuteInsertInt64Value(transaction, uniqueRowId, current + 1); + } + else + { + await ExecuteUpdateInt64Value(transaction, uniqueRowId, current + 1); + } + + await transaction.CommitAsync(new CommitRequest(), null); + return; + } + // Keep trying for up to 30 seconds + catch (SpannerException ex) when (ex.IsRetryable && DateTime.UtcNow < deadline) + { + nextDelay = retrySettings.NextBackoff(nextDelay); + await Task.Delay(retrySettings.BackoffJitter.GetDelay(nextDelay)); + } + } + } + + private async Task ExecuteSelectQuery(ManagedTransaction transaction, String uniqueRowId) + { + var selectParams = new Dictionary + { + { "id", new SpannerParameter { Value = Value.ForString(uniqueRowId) } } + }; + var selectSql = $"SELECT K, Int64Value FROM {_fixture.TableName} WHERE K = @id"; + var request = new ExecuteSqlRequest + { + Sql = selectSql, + Params = CreateStructFromParameters(selectParams), + }; + + return await transaction.ExecuteSqlAsync(request, null); + } + + private async Task ExecuteInsertInt64Value(ManagedTransaction transaction, String uniqueRowId, long insertValue) + { + var insertSql = $"INSERT {_fixture.TableName} (K, Int64Value) VALUES (@k, @int64Value)"; + var insertParams = new Dictionary + { + { "k", new SpannerParameter { Value = Value.ForString(uniqueRowId) } }, + { "int64Value", new SpannerParameter("int64Value", SpannerDbType.Int64, insertValue) } + }; + + var request = new ExecuteSqlRequest + { + Sql = insertSql, + Params = CreateStructFromParameters(insertParams), + }; + return await transaction.ExecuteSqlAsync(request, null); + } + + private async Task ExecuteUpdateInt64Value(ManagedTransaction transaction, String uniqueRowId, long updateValue) + { + var updateSql = $"UPDATE {_fixture.TableName} SET Int64Value = @newIntValue WHERE K = @id"; + var updateParams = new Dictionary + { + { "newIntValue", new SpannerParameter("newIntValue", SpannerDbType.Int64, updateValue) }, + { "id", new SpannerParameter { Value = Value.ForString(uniqueRowId) } } + }; + + var request = new ExecuteSqlRequest + { + Sql = updateSql, + Params = CreateStructFromParameters(updateParams), + }; + return await transaction.ExecuteSqlAsync(request, null); + } + + /// + /// Converts a dictionary of Spanner parameters to a Google.Protobuf.WellKnownTypes.Struct. + /// + private Struct CreateStructFromParameters(Dictionary parameters) + { + var pbStruct = new Struct(); + var options = SpannerConversionOptions.Default; + if (parameters != null) + { + foreach (var param in parameters) + { + var parameter = param.Value; + var protobufValue = parameter.GetConfiguredSpannerDbType(options).ToProtobufValue(parameter.GetValidatedValue()); + pbStruct.Fields.Add(param.Key, protobufValue); + } + } + return pbStruct; + } +} diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/DirectedReadTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/DirectedReadTests.cs index fbfbc8d0fe47..dafc614f5855 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/DirectedReadTests.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/DirectedReadTests.cs @@ -61,6 +61,7 @@ public class DirectedReadTests private static readonly SessionName s_sessionName = SessionName.FromProjectInstanceDatabaseSession("project", "instance", "database", "session"); private static readonly ByteString s_transactionId = ByteString.CopyFromUtf8("transaction"); + private static readonly DatabaseName s_databaseName = DatabaseName.FromProjectInstanceDatabase("project", "instance", "database"); private static readonly TransactionOptions s_partitionedDml = new TransactionOptions { PartitionedDml = new TransactionOptions.Types.PartitionedDml() }; private static readonly TransactionOptions s_readWrite = new TransactionOptions { ReadWrite = new TransactionOptions.Types.ReadWrite() }; private static readonly TransactionOptions s_readOnly = new TransactionOptions { ReadOnly = new TransactionOptions.Types.ReadOnly() }; @@ -113,12 +114,9 @@ public async Task PooledSession_SetsOptionsFromClient_ExecuteSqlAsync(bool singl DirectedReadOptions = IncludeDirectedReadOptions }, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - .WithTransaction(s_transactionId, s_readOnly, singleUseTransaction); + var managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, s_readOnly, singleUseTransaction); - await session.ExecuteSqlAsync(new ExecuteSqlRequest(), callSettings: null); + await managedTransaction.ExecuteSqlAsync(new ExecuteSqlRequest(), callSettings: null); Assert.Equal(IncludeDirectedReadOptions, grpcClient.LastExecuteSqlRequest.DirectedReadOptions); } @@ -128,12 +126,9 @@ public async Task PooledSession_SetsOptionsFromRequest_ExecuteSqlAsync(bool sing { var grpcClient = new FakeGrpcSpannerClient(); var spannerClient = new SpannerClientImpl(grpcClient, settings: null, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - .WithTransaction(s_transactionId, s_readOnly, singleUseTransaction); + var managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, s_readOnly, singleUseTransaction); - await session.ExecuteSqlAsync(new ExecuteSqlRequest + await managedTransaction.ExecuteSqlAsync(new ExecuteSqlRequest { DirectedReadOptions = IncludeDirectedReadOptions }, callSettings: null); @@ -152,12 +147,9 @@ public async Task PooledSession_RequestOptionsTakePrecedenceOverClientOptions_Ex DirectedReadOptions = ExcludeDirectedReadOptions }, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - .WithTransaction(s_transactionId, s_readOnly, singleUseTransaction); + var managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, s_readOnly, singleUseTransaction); - await session.ExecuteSqlAsync(new ExecuteSqlRequest + await managedTransaction.ExecuteSqlAsync(new ExecuteSqlRequest { DirectedReadOptions = IncludeDirectedReadOptions }, callSettings: null); @@ -180,13 +172,9 @@ public async Task PooledSession_NonReadOnlyTransaction_IgnoresOptionsFromClient_ DirectedReadOptions = IncludeDirectedReadOptions }, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - // Only read-only transaction can be single use. - .WithTransaction(s_transactionId, options, singleUseTransaction: false); + var managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, options, false); - await session.ExecuteSqlAsync(new ExecuteSqlRequest(), callSettings: null); + await managedTransaction.ExecuteSqlAsync(new ExecuteSqlRequest(), callSettings: null); Assert.Null(grpcClient.LastExecuteSqlRequest.DirectedReadOptions); } @@ -201,12 +189,9 @@ public async Task PooledSession_SetsOptionsFromClient_ExecuteSqlStreamReader(boo DirectedReadOptions = IncludeDirectedReadOptions }, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - .WithTransaction(s_transactionId, s_readOnly, singleUseTransaction); + var managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, s_readOnly, singleUseTransaction); - await session.ExecuteSqlStreamReader(new ExecuteSqlRequest(), callSettings: null).HasDataAsync(default); + await managedTransaction.ExecuteSqlStreamReaderAsync(new ExecuteSqlRequest(), callSettings: null).HasDataAsync(default); Assert.Equal(IncludeDirectedReadOptions, grpcClient.LastExecuteSqlRequest.DirectedReadOptions); } @@ -216,12 +201,10 @@ public async Task PooledSession_SetsOptionsFromRequest_ExecuteSqlStreamReader(bo { var grpcClient = new FakeGrpcSpannerClient(); var spannerClient = new SpannerClientImpl(grpcClient, settings: null, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - .WithTransaction(s_transactionId, s_readOnly, singleUseTransaction); - await session.ExecuteSqlStreamReader(new ExecuteSqlRequest + var managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, s_readOnly, singleUseTransaction); + + await managedTransaction.ExecuteSqlStreamReaderAsync(new ExecuteSqlRequest { DirectedReadOptions = IncludeDirectedReadOptions }, callSettings: null).HasDataAsync(default); @@ -240,12 +223,9 @@ public async Task PooledSession_RequestOptionsTakePrecedenceOverClientOptions_Ex DirectedReadOptions = ExcludeDirectedReadOptions }, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - .WithTransaction(s_transactionId, s_readOnly, singleUseTransaction); + var managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, s_readOnly, singleUseTransaction); - await session.ExecuteSqlStreamReader(new ExecuteSqlRequest + await managedTransaction.ExecuteSqlStreamReaderAsync(new ExecuteSqlRequest { DirectedReadOptions = IncludeDirectedReadOptions }, callSettings: null).HasDataAsync(default); @@ -268,13 +248,9 @@ public async Task PooledSession_NonReadOnlyTransaction_IgnoresOptionsFromClient_ DirectedReadOptions = IncludeDirectedReadOptions }, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - // Only read-only transaction can be single use. - .WithTransaction(s_transactionId, options, singleUseTransaction: false); + var managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, options, false); - await session.ExecuteSqlStreamReader(new ExecuteSqlRequest(), callSettings: null).HasDataAsync(default); + await managedTransaction.ExecuteSqlStreamReaderAsync(new ExecuteSqlRequest(), callSettings: null).HasDataAsync(default); Assert.Null(grpcClient.LastExecuteSqlRequest.DirectedReadOptions); } @@ -289,12 +265,9 @@ public async Task PooledSession_SetsOptionsFromClient_ReadStreamReader(bool sing DirectedReadOptions = IncludeDirectedReadOptions }, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - .WithTransaction(s_transactionId, s_readOnly, singleUseTransaction); + var managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, s_readOnly, singleUseTransaction); - await session.ReadStreamReader(new ReadRequest(), callSettings: null).HasDataAsync(default); + await managedTransaction.ReadStreamReaderAsync(new ReadRequest(), callSettings: null).HasDataAsync(default); Assert.Equal(IncludeDirectedReadOptions, grpcClient.LastReadRequest.DirectedReadOptions); } @@ -304,12 +277,10 @@ public async Task PooledSession_SetsOptionsFromRequest_ReadStreamReader(bool sin { var grpcClient = new FakeGrpcSpannerClient(); var spannerClient = new SpannerClientImpl(grpcClient, settings: null, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - .WithTransaction(s_transactionId, s_readOnly, singleUseTransaction); - await session.ReadStreamReader(new ReadRequest + var managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, s_readOnly, singleUseTransaction); + + await managedTransaction.ReadStreamReaderAsync(new ReadRequest { DirectedReadOptions = IncludeDirectedReadOptions }, callSettings: null).HasDataAsync(default); @@ -328,12 +299,9 @@ public async Task PooledSession_RequestOptionsTakePrecedenceOverClientOptions_Re DirectedReadOptions = ExcludeDirectedReadOptions }, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - .WithTransaction(s_transactionId, s_readOnly, singleUseTransaction); + var managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, s_readOnly, singleUseTransaction); - await session.ReadStreamReader(new ReadRequest + await managedTransaction.ReadStreamReaderAsync(new ReadRequest { DirectedReadOptions = IncludeDirectedReadOptions }, callSettings: null).HasDataAsync(default); @@ -356,16 +324,25 @@ public async Task PooledSession_NonReadOnlyTransaction_IgnoresOptionsFromClient_ DirectedReadOptions = IncludeDirectedReadOptions }, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession - .FromSessionName(sessionPool, s_sessionName) - // Only read-only transaction can be single use. - .WithTransaction(s_transactionId, options, singleUseTransaction: false); + ManagedTransaction managedTransaction = await CreateManagedTransaction(spannerClient, s_transactionId, options, false); - await session.ReadStreamReader(new ReadRequest(), callSettings: null).HasDataAsync(default); + await managedTransaction.ReadStreamReaderAsync(new ReadRequest(), callSettings: null).HasDataAsync(default); Assert.Null(grpcClient.LastReadRequest.DirectedReadOptions); } + private Task CreateManagedTransaction(SpannerClient client, ByteString transactionId, TransactionOptions options, bool singleUse) + { + var managedSession = new ManagedSession(client, s_databaseName, null, null); + managedSession.Session = new Session + { + CreateTime = Timestamp.FromDateTime(DateTime.UtcNow), + SessionName = SessionName.FromProjectInstanceDatabaseSession("projectId", "instanceId", "databaseId", "testSessionId"), + Multiplexed = true + }; + + return managedSession.CreateManagedTransaction(transactionId, options, singleUseTransaction: singleUse); + } + public class FakeGrpcSpannerClient : V1.Spanner.SpannerClient { public ExecuteSqlRequest LastExecuteSqlRequest { get; private set; } diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/ManagedSessionTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/ManagedSessionTests.cs new file mode 100644 index 000000000000..39394de0e480 --- /dev/null +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/ManagedSessionTests.cs @@ -0,0 +1,85 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"): +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Api.Gax.Testing; +using Google.Cloud.Spanner.Common.V1; +using Google.Cloud.Spanner.V1.Internal.Logging; +using System; +using System.Threading.Tasks; +using Xunit; +using static Google.Cloud.Spanner.V1.ManagedSession; + +namespace Google.Cloud.Spanner.V1.Tests; +public class ManagedSessionTests +{ + private const string TestDatabase = "projects/testproject/instances/testinstance/databases/testdb"; + + [Fact] + public async Task TestBuilderCreation() + { + ManagedSession multiplexSession = await FetchTestMultiplexSessionAsync(); + + Assert.NotNull(multiplexSession); + Assert.NotNull(multiplexSession.Session); + Assert.NotNull(multiplexSession.Client); + Assert.NotNull(multiplexSession.DatabaseName); + Assert.NotNull(multiplexSession.DatabaseRole); + } + + [Fact] + public async Task TestSessionHasExpired() + { + SpannerClient fakeClient = CreateFakeClient(); + ManagedSession multiplexSession = await FetchTestMultiplexSessionAsync(fakeClient); + + DateTime sessionCreateTime = multiplexSession.Session.CreateTime.ToDateTime(); + FakeClock clock = (FakeClock) fakeClient.Settings.Clock; + + clock.AdvanceTo(sessionCreateTime + TimeSpan.FromDays(3)); + Assert.True(multiplexSession.SessionHasExpired(2.0)); + + clock.AdvanceTo(sessionCreateTime + TimeSpan.FromDays(7)); + Assert.True(multiplexSession.SessionHasExpired()); + } + + private SpannerClient CreateFakeClient() + { + SpannerClient fakeClient = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger); + fakeClient.SetupMultiplexSessionCreationAsync(); + + return fakeClient; + } + + internal async Task FetchTestMultiplexSessionAsync(SpannerClient client = null) + { + if (!DatabaseName.TryParse(TestDatabase, out var databaseName)) + { + throw new Exception($"Unable to parse string to DatabaseName {TestDatabase}"); + } + + if (client == null) + { + client = CreateFakeClient(); + } + + SessionBuilder builder = new SessionBuilder(databaseName, client) + { + DatabaseRole = "testRole", + }; + + ManagedSession multiplexSession = await builder.BuildAsync(); + + return multiplexSession; + } +} diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/RouteToLeaderTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/RouteToLeaderTests.cs index 1e76fd06ed63..805451cf5f87 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/RouteToLeaderTests.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/RouteToLeaderTests.cs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using Google.Api.Gax; -using Google.Api.Gax.Testing; using Google.Cloud.ClientTesting; using Google.Cloud.Spanner.Common.V1; using Google.Cloud.Spanner.V1; @@ -122,66 +120,79 @@ public async Task SpannerClient_DoesNotRouteToLeaderWhenNotEnabled(Func header.Key == LeaderRoutingHeader && header.Value == true.ToString()); } - public static TheoryData> PooledSessionRoutesToLeader => new TheoryData> + public static TheoryData> ManagedTransactionRoutesToLeader => new TheoryData> { - { pooledSession => pooledSession.WithTransaction(s_transactionId, s_partitionedDml, false).ExecuteSqlAsync(new ExecuteSqlRequest(), callSettings: null) }, - { pooledSession => pooledSession.WithTransaction(s_transactionId, s_readWrite, false).ExecuteSqlAsync(new ExecuteSqlRequest(), callSettings: null) }, - { pooledSession => pooledSession.WithTransaction(s_transactionId, s_partitionedDml, false).ReadStreamReader(new ReadRequest(), callSettings: null).NextAsync(default) }, - { pooledSession => pooledSession.WithTransaction(s_transactionId, s_readWrite, false).ReadStreamReader(new ReadRequest(), callSettings: null).NextAsync(default) }, - { pooledSession => pooledSession.WithTransaction(s_transactionId, s_partitionedDml, false).ExecuteSqlStreamReader(new ExecuteSqlRequest(), callSettings: null).NextAsync(default) }, - { pooledSession => pooledSession.WithTransaction(s_transactionId, s_readWrite, false).ExecuteSqlStreamReader(new ExecuteSqlRequest(), callSettings: null).NextAsync(default) }, + { async managedSession => await (await managedSession.CreateManagedTransaction(s_transactionId, s_partitionedDml, false)).ExecuteSqlAsync(new ExecuteSqlRequest(), callSettings: null) }, + { async managedSession => await (await managedSession.CreateManagedTransaction(s_transactionId, s_readWrite, false)).ExecuteSqlAsync(new ExecuteSqlRequest(), callSettings: null) }, + { async managedSession => await (await managedSession.CreateManagedTransaction(s_transactionId, s_partitionedDml, false)).ReadStreamReaderAsync(new ReadRequest(), callSettings: null).NextAsync(default) }, + { async managedSession => await (await managedSession.CreateManagedTransaction(s_transactionId, s_readWrite, false)).ReadStreamReaderAsync(new ReadRequest(), callSettings: null).NextAsync(default) }, + { async managedSession => await (await managedSession.CreateManagedTransaction(s_transactionId, s_partitionedDml, false)).ExecuteSqlStreamReaderAsync(new ExecuteSqlRequest(), callSettings: null).NextAsync(default) }, + { async managedSession => await (await managedSession.CreateManagedTransaction(s_transactionId, s_readWrite, false)).ExecuteSqlStreamReaderAsync(new ExecuteSqlRequest(), callSettings: null).NextAsync(default) }, + { async managedSession => await (await managedSession.CreateManagedTransaction(s_transactionId, s_readWrite, false)).ExecuteSqlStreamReaderAsync(new ExecuteSqlRequest(), callSettings: null).NextAsync(default) }, }; - public static TheoryData> PooledSessionDoesNotRouteToLeader => new TheoryData> + public static TheoryData> ManagedTransactionDoesNotRouteToLeader => new TheoryData> { - { pooledSession => pooledSession.WithTransaction(s_transactionId, s_readOnly, false).ExecuteSqlAsync(new ExecuteSqlRequest(), callSettings: null) }, - { pooledSession => pooledSession.WithTransaction(s_transactionId, s_readOnly, false).ReadStreamReader(new ReadRequest(), callSettings: null).NextAsync(default) }, - { pooledSession => pooledSession.WithTransaction(s_transactionId, s_readOnly, false).ExecuteSqlStreamReader(new ExecuteSqlRequest(), callSettings: null).NextAsync(default) }, + { async managedSession => await (await managedSession.CreateManagedTransaction(s_transactionId, s_readOnly, false)).ExecuteSqlAsync(new ExecuteSqlRequest(), callSettings: null) }, + { async managedSession => await (await managedSession.CreateManagedTransaction(s_transactionId, s_readOnly, false)).ReadStreamReaderAsync(new ReadRequest(), callSettings: null).NextAsync(default) }, + { async managedSession => await (await managedSession.CreateManagedTransaction(s_transactionId, s_readOnly, false)).ExecuteSqlStreamReaderAsync(new ExecuteSqlRequest(), callSettings: null).NextAsync(default) }, }; [Theory] - [MemberData(nameof(PooledSessionRoutesToLeader))] - public async Task PooledSession_RoutesToLeaderWhenEnabled(Func operation) + [MemberData(nameof(ManagedTransactionRoutesToLeader))] + public async Task ManagedTransaction_RoutesToLeaderWhenEnabled(Func operation) { var grpcClient = new FakeGrpcSpannerClient(); var spannerClient = new SpannerClientImpl(grpcClient, settings: null, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession.FromSessionName(sessionPool, s_sessionName); + var managedTransaction = CreateManagedSession(spannerClient); - await operation(session); + await operation(managedTransaction); Assert.Contains(grpcClient.LastCallOptions.Headers, header => header.Key == LeaderRoutingHeader && header.Value == true.ToString()); } + + [Theory] - [MemberData(nameof(PooledSessionDoesNotRouteToLeader))] - public async Task PooledSession_DoesNotRouteToLeaderWhenEnabled(Func operation) + [MemberData(nameof(ManagedTransactionDoesNotRouteToLeader))] + public async Task ManagedTransaction_DoesNotRouteToLeaderWhenEnabled(Func operation) { var grpcClient = new FakeGrpcSpannerClient(); var spannerClient = new SpannerClientImpl(grpcClient, settings: null, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession.FromSessionName(sessionPool, s_sessionName); + var managedTransaction = CreateManagedSession(spannerClient); - await operation(session); + await operation(managedTransaction); Assert.DoesNotContain(grpcClient.LastCallOptions.Headers, header => header.Key == LeaderRoutingHeader && header.Value == true.ToString()); } [Theory] - [MemberData(nameof(PooledSessionRoutesToLeader))] - [MemberData(nameof(PooledSessionDoesNotRouteToLeader))] - public async Task PooledSession_DoesNotRouteToLeaderWhenNotEnabled(Func operation) + [MemberData(nameof(ManagedTransactionRoutesToLeader))] + [MemberData(nameof(ManagedTransactionDoesNotRouteToLeader))] + public async Task ManagedTransaction_DoesNotRouteToLeaderWhenNotEnabled(Func operation) { var grpcClient = new FakeGrpcSpannerClient(); var spannerClient = new SpannerClientImpl(grpcClient, new SpannerSettings { LeaderRoutingEnabled = false }, logger: null); - var sessionPool = new FakeSessionPool(spannerClient); - var session = PooledSession.FromSessionName(sessionPool, s_sessionName); + var managedTransaction = CreateManagedSession(spannerClient); - await operation(session); + await operation(managedTransaction); Assert.DoesNotContain(grpcClient.LastCallOptions.Headers, header => header.Key == LeaderRoutingHeader && header.Value == true.ToString()); } + private ManagedSession CreateManagedSession(SpannerClient client) + { + var managedSession = new ManagedSession(client, s_databaseName, null, null); + managedSession.Session = new Session + { + CreateTime = Timestamp.FromDateTime(DateTime.UtcNow), + SessionName = SessionName.FromProjectInstanceDatabaseSession("projectId", "instanceId", "databaseId", "testSessionId"), + Multiplexed = true + }; + + return managedSession; + } + private class FakeGrpcSpannerClient : V1.Spanner.SpannerClient { public CallOptions LastCallOptions { get; private set; } @@ -252,22 +263,6 @@ private AsyncServerStreamingCall FakeAsyncServerStreamingCall( } } - private class FakeSessionPool : SessionPool.ISessionPool - { - public FakeSessionPool(SpannerClient spannerClient) => Client = spannerClient; - public SpannerClient Client { get; } - - public IClock Clock => new FakeClock(); - - public SessionPoolOptions Options => new SessionPoolOptions(); - - public bool TracksSessions => throw new NotImplementedException(); - - public void Detach(PooledSession session) => throw new NotImplementedException(); - public Task RefreshedOrNewAsync(PooledSession session, TransactionOptions transactionOptions, bool singleUseTransaction, CancellationToken cancellationToken) => throw new NotImplementedException(); - public void Release(PooledSession session, ByteString transactionToRollback, bool deleteSession) => throw new NotImplementedException(); - } - private class FakeAsyncStreamReader : IAsyncStreamReader { private bool _hasNext = true; diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/SpannerClientHelpers.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/SpannerClientHelpers.cs index 613b17d6c1e7..69c1f0d8671f 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/SpannerClientHelpers.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/SpannerClientHelpers.cs @@ -90,6 +90,24 @@ internal static SpannerClient SetupBatchCreateSessionsAsync(this SpannerClient s return spannerClientMock; } + internal static SpannerClient SetupMultiplexSessionCreationAsync(this SpannerClient spannerClientMock) + { + spannerClientMock.Configure().CreateSessionAsync(Arg.Is(x => x != null), Arg.Any()) + .Returns(args => + { + var request = (CreateSessionRequest) args[0]; + Session response = new Session(); + response.CreateTime = spannerClientMock.GetNowTimestamp(); + response.CreatorRole = request.Session.CreatorRole; + response.Multiplexed = request.Session.Multiplexed; + response.Name = Guid.NewGuid().ToString(); + response.SessionName = new SessionName(ProjectId, Instance, Database, response.Name); + + return Task.FromResult(response); + }); + return spannerClientMock; + } + internal static SpannerClient SetupBeginTransactionAsync(this SpannerClient spannerClientMock) { spannerClientMock.Configure() diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/SqlResultStreamTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/SqlResultStreamTests.cs index 7ab21f800d5f..d842651437c3 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/SqlResultStreamTests.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/V1/SqlResultStreamTests.cs @@ -16,7 +16,7 @@ using Google.Api.Gax.Grpc; using Google.Api.Gax.Grpc.Testing; using Google.Api.Gax.Testing; -using Google.Cloud.ClientTesting; +using Google.Cloud.Spanner.Common.V1; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using Grpc.Core; @@ -314,13 +314,17 @@ private ResultStream CreateResultStream( int maxBufferSize = 10, CallSettings callSettings = null, RetrySettings retrySettings = null) - => new ResultStream( + { + ManagedSession managedSession = new ManagedSession(client, DatabaseName.FromProjectInstanceDatabase("projectId", "instanceId", "databaseId"), "testDatabaseRole", null); + ManagedTransaction transaction = new ManagedTransaction(managedSession, null, null, false, null); + return new ResultStream( client, ReadOrQueryRequest.FromRequest(type == typeof(ExecuteSqlRequest) ? new ExecuteSqlRequest() : new ReadRequest() as IReadOrQueryRequest), - PooledSession.FromSessionName(new PooledSessionTests.FakeSessionPool(), SessionName.FromProjectInstanceDatabaseSession("projectId", "instanceId", "databaseId", "sessionId")), + transaction, callSettings ?? s_simpleCallSettings, maxBufferSize, retrySettings ?? s_retrySettings); + } private static List CreateResultSets(params string[] resumeTokens) => resumeTokens diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionPoolManager.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionPoolManager.cs index 93cb03c91b9b..a53f3c5d1b83 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionPoolManager.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionPoolManager.cs @@ -13,6 +13,7 @@ // limitations under the License. using Google.Api.Gax; +using Google.Cloud.Spanner.Common.V1; using Google.Cloud.Spanner.V1; using Google.Cloud.Spanner.V1.Internal.Logging; using System; @@ -22,6 +23,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using static Google.Cloud.Spanner.V1.ManagedSession; using static Google.Cloud.Spanner.V1.SessionPool; namespace Google.Cloud.Spanner.Data @@ -56,11 +58,21 @@ static SessionPoolManager() private readonly ConcurrentDictionary _poolReverseLookup = new ConcurrentDictionary(); + // Dictionary to store the sessions managed by this manager + // Each session is keyed by a complex key, as each (multiplex) session is unique to a combination of client options and unique segment key + private readonly ConcurrentDictionary<(SpannerClientCreationOptions, SessionPoolSegmentKey), Task> _targetedSessions = + new ConcurrentDictionary<(SpannerClientCreationOptions options, SessionPoolSegmentKey segmentKey), Task>(); + /// /// The session pool options used for every created by this session pool manager. /// public SessionPoolOptions SessionPoolOptions { get; } + /// + /// The options for every managed session created by this session pool manager. + /// + public ManagedSessionOptions ManagedSessionOptions { get; } + /// /// The logger used by this SessionPoolManager and the session pools it creates. /// @@ -100,6 +112,18 @@ internal SessionPoolManager( _clientFactory = GaxPreconditions.CheckNotNull(clientFactory, nameof(clientFactory)); } + internal SessionPoolManager( + ManagedSessionOptions options, + SpannerSettings spannerSettings, + Logger logger, + Func> clientFactory) + { + ManagedSessionOptions = GaxPreconditions.CheckNotNull(options, nameof(options)); + SpannerSettings = AppendAssemblyVersionHeader(GaxPreconditions.CheckNotNull(spannerSettings, nameof(spannerSettings))); + Logger = GaxPreconditions.CheckNotNull(logger, nameof(logger)); + _clientFactory = GaxPreconditions.CheckNotNull(clientFactory, nameof(clientFactory)); + } + /// /// Creates a with the specified options. /// @@ -109,6 +133,15 @@ internal SessionPoolManager( public static SessionPoolManager Create(SessionPoolOptions options, Logger logger = null) => new SessionPoolManager(options, CreateDefaultSpannerSettings(), logger ?? Logger.DefaultLogger, CreateClientAsync); + /// + /// Creates a with the specified options for managed sessions. + /// + /// Managed session options applied to all managed sessions in this session pool + /// The logger to use. May be null, in which case the default logger is used. + /// + public static SessionPoolManager Create(ManagedSessionOptions options, Logger logger = null) => + new SessionPoolManager(options, CreateDefaultSpannerSettings(), logger ?? Logger.DefaultLogger, CreateClientAsync); + /// /// Creates a with the specified SpannerSettings and options. /// @@ -126,6 +159,27 @@ internal Task AcquireSessionPoolAsync(SpannerClientCreationOptions return targetedPool.SessionPoolTask; } + internal Task AcquireManagedSessionAsync(SpannerClientCreationOptions options, DatabaseName dbName, string dbRole) + { + SessionPoolSegmentKey segmentKey = SessionPoolSegmentKey.Create(dbName).WithDatabaseRole(dbRole); + GaxPreconditions.CheckNotNull(options, nameof(options)); + + return _targetedSessions.GetOrAdd((options, segmentKey), _ => CreateMultiplexSessionAsync()); + + async Task CreateMultiplexSessionAsync() + { + var client = await _clientFactory.Invoke(options, SpannerSettings).ConfigureAwait(false); + var managedSessionBuilder = new SessionBuilder(dbName, client) + { + Options = ManagedSessionOptions, + DatabaseRole = dbRole, + }; + + var managedSession = await managedSessionBuilder.BuildAsync().ConfigureAwait(false); + return managedSession; + } + } + /// /// Decrements the connection count associated with a client session pool. /// diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedSession.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedSession.cs new file mode 100644 index 000000000000..69adc8cf8e52 --- /dev/null +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedSession.cs @@ -0,0 +1,317 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"): +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Api.Gax; +using Google.Api.Gax.Grpc; +using Google.Cloud.Spanner.Common.V1; +using Google.Cloud.Spanner.V1.Internal; +using Google.Cloud.Spanner.V1.Internal.Logging; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using System; +using System.Threading; +using System.Threading.Tasks; +using static Google.Cloud.Spanner.V1.TransactionOptions; + +namespace Google.Cloud.Spanner.V1; + +/// +/// This class helps manage Spanner multiplex session creation and lifecycle. +/// A should only be created through a . +/// It provides factory methods to create which will be created using the current underlying session in this ManagedSession. +/// A check to see if the underlying session needs to be refreshed or created will be done everytime a method to create a ManagedTransaction is called. +/// The class will do a soft, non-blocking refresh if the underlying session is greater than 7 days but less than 28 days old from its creation. +/// A hard, blocking refresh is done if the session is greater than 28 days old from its creation. +/// +public class ManagedSession +{ + private const double HardRefreshIntervalInDays = 28.0; + private const double SoftRefreshIntervalInDays = 7.0; + + private readonly Logger _logger; + private readonly CreateSessionRequest _createSessionRequestTemplate; + private Task _sessionCreationTask; + private readonly object _sessionCreationTaskLock = new object(); + + private Session _session; + + private readonly IClock _clock; + + /// + /// The client used for all operations in this managed session. + /// + internal SpannerClient Client { get; } + + /// + /// The name of the session. This is never null. + /// + internal SessionName SessionName => Session.SessionName; + + /// + /// The Spanner session resource associated to this ManagedSession. + /// Won't be null. + /// + internal Session Session + { + get { return _session; } + set { _session = value; } + } + + /// + /// The options governing this managed session. + /// + public ManagedSessionOptions Options { get; } + + /// + /// The database for this managed session + /// + public DatabaseName DatabaseName { get; } + + /// + /// The database role of the managed session + /// + public string DatabaseRole { get; } + + /// + /// + /// + /// + /// + /// + /// + internal ManagedSession(SpannerClient client, DatabaseName dbName, string dbRole, ManagedSessionOptions options) + { + GaxPreconditions.CheckNotNull(dbName, nameof(dbName)); + Client = GaxPreconditions.CheckNotNull(client, nameof(client)); + Options = options ?? new ManagedSessionOptions(); + _logger = client.Settings.Logger; // Just to avoid fetching it all the time + + DatabaseName = dbName; + DatabaseRole = dbRole; + + _clock = client.Settings.Clock ?? SystemClock.Instance; + + _createSessionRequestTemplate = new CreateSessionRequest + { + DatabaseAsDatabaseName = DatabaseName, + Session = new Session + { + CreatorRole = DatabaseRole ?? "", + Multiplexed = true + } + }; + } + + /// + /// Returns a ManagedTransaction for the same ManagedSession as this with the given . + /// + public async Task CreateManagedTransaction(TransactionOptions transactionOptions, bool singleUseTransaction, CancellationToken cancellationToken = default) + { + await CreateOrRefreshSessionsAsync(cancellationToken).ConfigureAwait(false); + return new ManagedTransaction(this, transactionId: null, transactionOptions, singleUseTransaction, readTimestamp: null); + } + + /// + /// Returns a ManagedTransaction for the same ManagedSession as this one with the given transaction related values. + /// + public async Task CreateManagedTransaction(ByteString transactionId, TransactionOptions transactionOptions, bool singleUseTransaction, Timestamp readTimestamp = null, CancellationToken cancellationToken = default) + { + GaxPreconditions.CheckNotNull(transactionId, nameof(transactionId)); + await CreateOrRefreshSessionsAsync(cancellationToken).ConfigureAwait(false); + return new ManagedTransaction(this, transactionId, transactionOptions, singleUseTransaction, readTimestamp); + } + + /// + /// Returns a ManagedTransaction for the same ManagedSession as this one with the given transaction mode. + /// + public async Task CreateManagedTransaction(ByteString transactionId, ModeOneofCase transactionMode, Timestamp readTimestamp = null) + { + TransactionOptions BuildTransactionOptions() => transactionMode switch + { + ModeOneofCase.None => new TransactionOptions(), + ModeOneofCase.PartitionedDml => new TransactionOptions { PartitionedDml = new() }, + ModeOneofCase.ReadWrite => new TransactionOptions { ReadWrite = new() }, + ModeOneofCase.ReadOnly => new TransactionOptions() { ReadOnly = new() }, + _ => throw new ArgumentException(nameof(transactionMode), $"Unknown {typeof(ModeOneofCase).FullName}: {transactionMode}") + }; + + return await CreateManagedTransaction(transactionId, BuildTransactionOptions(), false, readTimestamp).ConfigureAwait(false); + } + + // internal for testing + internal bool SessionHasExpired(double intervalInDays = SoftRefreshIntervalInDays) + { + DateTime currentTime = _clock.GetCurrentDateTimeUtc(); + DateTime? sessionCreateTime = _session?.CreateTime.ToDateTime(); // Inherent conversion into UTC DateTime + if (_session == null || _session.Expired || currentTime - sessionCreateTime >= TimeSpan.FromDays(intervalInDays)) + { + return true; + } + + return false; + } + + private async Task CreateOrRefreshSessionsAsync(CancellationToken cancellationToken, bool needsRefresh = false) + { + try + { + var callSettings = Client.Settings.CreateSessionSettings + .WithExpiration(Expiration.FromTimeout(Options.Timeout)) + .WithCancellationToken(cancellationToken); + + Session multiplexSession; + + try + { + // Non-blocking, fast check if the session is still "fresh" enough + if (_session != null && !SessionHasExpired(HardRefreshIntervalInDays)) + { + // Check for soft refresh need *without* blocking or creating a new task yet. + if (SessionHasExpired(SoftRefreshIntervalInDays)) + { + _ = TriggerRefresh(SoftRefreshIntervalInDays); + } + + // Either way, the current session is safe to use. Return immediately. + return; + } + + // Hard refresh or initial session creation, + Task currentCreationTask = TriggerRefresh(HardRefreshIntervalInDays); + + // 2b. Block the current caller on the task (Hard Refresh requirement) + // If we initiated the task, we are waiting for our own task. + // If another thread initiated it, we are waiting for their task. + await currentCreationTask.ConfigureAwait(false); + + Task TriggerRefresh(double refreshInterval) + { + // We only need to check if a task is running to avoid running two at once. + if (_sessionCreationTask != null) + { + // A refresh is already running (either soft or hard). Do nothing and use the existing session creation task. + return _sessionCreationTask; + } + + // Acquire the lock to safely initialize the task + lock (_sessionCreationTaskLock) + { + if (_sessionCreationTask is null) + { + _sessionCreationTask = Task.Run(async () => + { + try + { + if (SessionHasExpired(refreshInterval)) + { + // Create/Refresh the session only if it is null or it is older than the refreshInterval + multiplexSession = await Client.CreateSessionAsync(_createSessionRequestTemplate, callSettings) + .WithCancellationToken(cancellationToken).ConfigureAwait(false); + + Interlocked.Exchange(ref _session, multiplexSession); + } + } + catch (Exception ex) + { + // Log the exception. Soft refresh failures should generally not crash the main thread. + _logger.Warn($"Exception while trying to refresh session for interval {refreshInterval}. {ex}"); + if (refreshInterval == HardRefreshIntervalInDays) + { + // Very likely we cannot continue without getting a fresh session in this case. + throw; + } + } + finally + { + // Clear the task after completion (success or failure) + // to allow the next refresh to run. + _ = Interlocked.Exchange(ref _sessionCreationTask, null); + } + }); + } + } + + return _sessionCreationTask; + } + } + catch (OperationCanceledException) + { + _logger.Warn(() => $"Creation request cancelled before we could procure a Multiplex Session for DatabaseName: {DatabaseName}, DatabaseRole: {DatabaseRole}"); + throw; + } + } + catch (Exception e) + { + _logger.Warn(() => $"Failed to create multiplex session for DatabaseName: {DatabaseName}, DatabaseRole: {DatabaseRole}", e); + throw; + } + finally + { + // Nothing to do here since for legacy SessionPool we had to have some logging for when the pool went from healthy to unhealthy. + // This could mean n number of things went wrong in the pool + // But with the MUX session, we essentially only have 1 session we need to manage per client. + // So there is no case of the mux session going back and forth in terms of its healthiness. + } + + } + + /// + /// Builder to build a + /// + public sealed partial class SessionBuilder + { + /// + /// Constructor with validations on essential parameters needed to build the ManagedSession + /// + public SessionBuilder(DatabaseName databaseName, SpannerClient client) + { + DatabaseName = GaxPreconditions.CheckNotNull(databaseName, nameof(databaseName)); + Client = GaxPreconditions.CheckNotNull(client, nameof(client)); + } + + /// + /// The options governing this managed session. + /// + public ManagedSessionOptions Options { get; set; } + + /// + /// The for this managed session. + /// This is a required field and will be used when creating the underlying Spanner session. + /// + public DatabaseName DatabaseName { get; set; } + + /// + /// The database role of the managed session. This will be used when creating the underlying Spanner session. + /// + public string DatabaseRole { get; set; } + + /// + /// The client used for all operations in this managed session. + /// + public SpannerClient Client { get; set; } + + /// + /// Async method to build a managed session. This will fetch a valid session from backend Spanner and wrap it inside the managed session object. + /// + /// Cancellation token to cancel this call + /// + public async Task BuildAsync(CancellationToken cancellationToken = default) + { + ManagedSession managedSession = new ManagedSession(Client, DatabaseName, DatabaseRole, Options); + await managedSession.CreateOrRefreshSessionsAsync(cancellationToken).ConfigureAwait(false); + + return managedSession; + } + } +} diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedSessionOptions.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedSessionOptions.cs new file mode 100644 index 000000000000..131aa4bad228 --- /dev/null +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedSessionOptions.cs @@ -0,0 +1,59 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"): +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; + +namespace Google.Cloud.Spanner.V1; + +/// +/// Options which can be set on the managed session +/// +public class ManagedSessionOptions +{ + private const double DefaultTimeoutSeconds = 60.0; + private TimeSpan _timeout; + + /// + /// Constructs a new with default values. + /// + public ManagedSessionOptions() + { + _timeout = TimeSpan.FromSeconds(DefaultTimeoutSeconds); + } + + /// + /// The total time allowed for a network call to the Cloud Spanner server, including retries. This setting + /// is applied to calls to create session as well as beginning transactions. + /// + /// + /// + /// This value must be positive. The default value is one minute. + /// + /// + public TimeSpan Timeout + { + get => _timeout; + set => _timeout = CheckPositiveTimeSpan(value); + } + + // TODO: Move to GAX if we find we need it in other libraries. (We have CheckNonNegative already.) + private static TimeSpan CheckPositiveTimeSpan(TimeSpan value) + { + if (value.Ticks <= 0) + { + throw new ArgumentOutOfRangeException("value", "Value must be a positive TimeSpan"); + } + return value; + } +} diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedTransaction.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedTransaction.cs new file mode 100644 index 000000000000..56d8686f3a05 --- /dev/null +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedTransaction.cs @@ -0,0 +1,736 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"): +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Api.Gax; +using Google.Api.Gax.Grpc; +using Google.Cloud.Spanner.V1.Internal; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using static Google.Cloud.Spanner.V1.TransactionOptions; + +namespace Google.Cloud.Spanner.V1 +{ + /// + /// Class which manages an underlying Spanner Transaction and operations around it + /// + public partial class ManagedTransaction + { + // All these fields are derived from the ManagedSession + private readonly Session _multiplexSession; + private readonly SpannerClient _client; + private readonly TimeSpan _timeout; + + private Transaction _transaction; + private readonly object _transactionCreationTaskLock = new object(); + private Task _transactionCreationTask; + private readonly object _precommitTokenUpdateLock = new object(); + + /// + /// The name of the session. This is never null. + /// + public SessionName SessionName => _multiplexSession.SessionName; + + /// + /// The Spanner session resource associated to managed transaction. + /// Won't be null. + /// + internal Session Session => _multiplexSession; + + /// + /// The options for the transaction that is or will be associated with this session. Won't be null. + /// + /// + /// Will be if + /// is null. + /// + internal TransactionOptions TransactionOptions { get; } + + /// + /// The transaction mode for the transaction that is or may be associated with this session. + /// Available for testing. + /// + internal TransactionOptions.ModeOneofCase TransactionMode => TransactionOptions.ModeCase; + + private SpannerClient Client => _client; + + /// + /// Whether the transaction is single use or not. + /// If this is true then will be + /// and will be null. + /// + internal bool SingleUseTransaction { get; } + + /// + /// The ID of the transaction. May be null. + /// + /// + /// Transactions are acquired when they are needed so this will be null in the + /// following cases: + /// + /// + /// is , + /// including when the session is idle in the pool. + /// + /// + /// is true. No transactions will exist client side. + /// + /// + /// The managed transaction has been created with some transaction options but no command execution has + /// been attempted since. + /// + /// + /// The first command execution is underway and the transaction is being created either + /// by inlining or explicitly. + /// + /// + /// + public ByteString TransactionId => Interlocked.CompareExchange(ref _transaction, null, null)?.Id; + + /// + /// The read timestamp of the transaction. May be null. + /// + /// + /// Will be set iif a transaction has been started and represents read-only options + /// with set to true + /// and a value was provided for the parameter. + /// + public Timestamp ReadTimestamp => Interlocked.CompareExchange(ref _transaction, null, null)?.ReadTimestamp; + + // internal for testing + internal MultiplexedSessionPrecommitToken PrecommitToken { get; set; } + + internal ManagedTransaction(ManagedSession managedSession, ByteString transactionId, TransactionOptions transactionOptions, bool singleUseTransaction, Timestamp readTimestamp) + { + _multiplexSession = managedSession.Session; + _client = managedSession.Client; + _timeout = managedSession.Options.Timeout; + + TransactionOptions = transactionOptions?.Clone() ?? new TransactionOptions(); + + GaxPreconditions.CheckArgument( + TransactionOptions.ModeCase == ModeOneofCase.ReadOnly || !singleUseTransaction, + nameof(singleUseTransaction), + "Single use transactions are only supported for read-only transactions."); + GaxPreconditions.CheckArgument( + transactionId is null || TransactionOptions.ModeCase != ModeOneofCase.None, + nameof(transactionOptions), + $"No transaction options were specified for the given transasaction ID {transactionId is null}, {transactionId?.ToBase64()}, {TransactionOptions.ModeCase}."); + GaxPreconditions.CheckArgument( + readTimestamp is null || transactionId is not null, + nameof(readTimestamp), + "A read timestamp can only be specified if a transaction ID is also being specified."); + + SingleUseTransaction = singleUseTransaction; + if (transactionId is not null) + { + // We don't need to lock here, this is the constructor. + _transaction = new Transaction + { + Id = transactionId, + ReadTimestamp = readTimestamp, + }; + } + } + + /// + /// Decides whether we need a transaction ID or not and whether that can be obtained by transaction inlining. + /// Sets the correct transaction selector before executing the given command. + /// + /// All RPCs executed within the managed transaction should call this method for guaranteeing they are + /// using the correct transaction selector. + /// The type of the command response. + /// Called by this method to set the correct transaction selector. + /// The command (RPC) that will be executed after the transaction selector has maybe been set. + /// Callers may fail if command is executed but no transaction selector has been set. + /// Will extract transaction information from the command's response if + /// transaction inlining was succesful. May be null, which indicates that command does not support transaction inlining. + /// If true, transaction creation may be skipped. This is used by commit and rollback + /// so that a transaction is not created just for inmediate commit or rollback. Note that if there are pending mutations, commit + /// should set this parameter to false. + /// + /// The cancellation token for the operation. + /// A task whose result will be the result from having executed . + internal async Task ExecuteMaybeWithTransactionSelectorAsync( + Action transactionSelectorSetter, + Func> commandAsync, + Func inlinedTransactionExtractor, + bool skipTransactionCreation, + CancellationToken cancellationToken, + Mutation mutationKey = null) + { + // If this session is configured to use no transaction we just execute the command. + if (TransactionOptions.ModeCase == ModeOneofCase.None) + { + return await commandAsync().ConfigureAwait(false); + } + + // If this is to be a single use transaction, we set the selector to single use and execute the command. + if (SingleUseTransaction) + { + transactionSelectorSetter(new TransactionSelector { SingleUse = TransactionOptions }); + return await commandAsync().ConfigureAwait(false); + } + + // If we already have a transaction ID, we set the selector to that and execute the command. + // TransactionId is accessed and modified via Interlock.CompareExchange so these two are atomic operations. + // But also, if TransactionId is about to be modified right after this check, that's not a problem, because next + // we'll be awaiting on the task that does the modifying. + if (TransactionId is ByteString transactionId) + { + transactionSelectorSetter(new TransactionSelector { Id = transactionId }); + return await commandAsync().ConfigureAwait(false); + } + + // We now know that we don't have a transaction ID but we need one to execute + // the command we have been given. + // We now need to check if we are already creating a transaction or not. + // If we are, we just get ready to wait for it. + // If we are not, but we need to, we start and save the task that does so, + // and get ready to wait for it. + + // This is the function that will wait for the transaction task to be done. + // We need to initialize a function within the lock, so we can execute + // async code, outside the lock. + // We initialize it with just command, in case no transaction is being created + // and the caller does not require one. This is the case for commits and rollbacks + // executed when no transaction has been created before. + // Commits and rollbacks will know how to handle transaction absence. + Func> commandMaybeWithTransactionAsync = commandAsync; + + lock (_transactionCreationTaskLock) + { + // We are not creating a transaction. We might need to do so. + if (_transactionCreationTask is null) + { + // We need to create a transaction. + if (!skipTransactionCreation) + { + // The calling command does not support inlining + // or the transaction mode Partitioned DML, which cannot be inline. + // Either way we need to create a transaction explicitly. + if (inlinedTransactionExtractor is null || TransactionOptions.ModeCase == ModeOneofCase.PartitionedDml) + { + _transactionCreationTask = Task.Run(() => SetExplicitTransactionAsync(cancellationToken), cancellationToken); + commandMaybeWithTransactionAsync = () => CommandWithTransactionAsync(cancellationToken); + } + // The calling command supports inlining. + // We attempt inlining but if that fails, we create a transaction explicitly. + else + { + // Create a task for executing the command which inlines transaction creation. + // If this task succeeds we'll have both the transaction ID and the response from the command. + // If this task fails we'll have to attempt to begin a transaction explicitly, which will give us a transaction ID, + // and then we'll have to execute the command with that transaction. + Task commandWithInliningTask = Task.Run(CommandWithInliningAsync, cancellationToken); + + // Now, create two tasks, one that is done when we have a transaction ID (via inlining or explicit), + // and one that is done when the command is done (with inlining or explicit). + + // The transaction creation task is the combination of attempting inlining, + // and if that fails, explicitly creating a transaction. + _transactionCreationTask = Task.Run(async () => + { + try + { + await SetInlinedTransactionAsync(commandWithInliningTask).ConfigureAwait(false); + } + catch (Exception ex) + { + Client.Settings.Logger.Warn("Transaction creation via inlining failed. " + + "Attempting to begin an explicit transaction.", + ex); + await SetExplicitTransactionAsync(cancellationToken).ConfigureAwait(false); + } + }, cancellationToken); + + // The command execution task is the combination of attempting inlining, + // and if that fails, executing the command with the explicitly created transaction. + commandMaybeWithTransactionAsync = async () => + { + try + { + var response = await commandWithInliningTask.ConfigureAwait(false); + // If we are here, inlining was successful so we have a transaction. + // We only wait for the _transactionCreationTaks to be done to guarantee + // that the transaction ID obtained via inlining has been stored and can + // be access via TransactionId. In turn this guarantees command executors + // that there's an ID available inmediately after a successful command execution. + await _transactionCreationTask.ConfigureAwait(false); + return response; + } + catch (Exception ex) + { + Client.Settings.Logger.Warn("Command execution with transaction inlining failed. " + + "Waiting for an explicit transaction to be created to attempt command execution.", + ex); + // If we got here, transaction inlining (plus command execution) failed. + // That means that _transactionCreationTask is attempting to created an explicit + // transaction. + // So now we wait for that transaction to be created and the execute the command normally. + return await CommandWithTransactionAsync(cancellationToken).ConfigureAwait(false); + } + }; + } + } + + // No transaction is being created, but we don't need to do so. + // commandWithTransactionAsync is already initialized to just commandAsync. + } + // We are creating a transaction, let's get ready to wait for that to be done and use it. + else + { + commandMaybeWithTransactionAsync = () => CommandWithTransactionAsync(cancellationToken); + } + } + + return await commandMaybeWithTransactionAsync().ConfigureAwait(false); + + async Task CommandWithTransactionAsync(CancellationToken cancellationToken) + { + // This is only called when we know the _transactionCreationTask has been initialized + await _transactionCreationTask.WithCancellationToken(cancellationToken).ConfigureAwait(false); + // Now we know there's a transaction id. + transactionSelectorSetter(new TransactionSelector { Id = TransactionId }); + return await commandAsync().ConfigureAwait(false); + } + + Task CommandWithInliningAsync() + { + transactionSelectorSetter(new TransactionSelector { Begin = TransactionOptions }); + return commandAsync(); + } + + async Task SetExplicitTransactionAsync(CancellationToken cancellationToken) + { + Transaction transaction = await BeginTransactionAsync(cancellationToken, mutationKey).ConfigureAwait(false); + SetTransaction(transaction); + } + + async Task SetInlinedTransactionAsync(Task commandWithInliningTask) + { + TResponse response = await commandWithInliningTask.ConfigureAwait(false); + Transaction transaction = inlinedTransactionExtractor(response) + ?? throw new InvalidOperationException("The inlined transaction extractor returned a null transaction. " + + "This is possibly because of a bug in library code or because an operation that supported transaction inlining has stopped doing so."); + SetTransaction(transaction); + } + + void SetTransaction(Transaction transaction) + { + if (Interlocked.CompareExchange(ref _transaction, transaction, null) is not null) + { + throw new InvalidOperationException("A transaction has already been set on this instance. This is a bug in library code."); + } + } + + async Task BeginTransactionAsync(CancellationToken cancellationToken, Mutation mutationKey = null) + { + var request = new BeginTransactionRequest + { + Options = TransactionOptions, + SessionAsSessionName = SessionName, + MutationKey = mutationKey + }; + var callSettings = Client.Settings.BeginTransactionSettings + .WithExpiration(Expiration.FromTimeout(_timeout)) + .WithCancellationToken(cancellationToken); + + Transaction response = await RecordSuccessAndExpiredSessions(Client.BeginTransactionAsync(request, callSettings)).ConfigureAwait(false); + UpdatePrecommitToken(response.PrecommitToken); + + return response; + } + } + + /// + /// Executes a Commit RPC asynchronously. + /// + /// The commit request. Must not be null. The request will be modified with session and transaction details + /// from this object. + /// If not null, applies overrides to this RPC call. + /// A task representing the asynchronous operation. When the task completes, the result is the response from the RPC. + public Task CommitAsync(CommitRequest request, CallSettings callSettings) + { + GaxPreconditions.CheckNotNull(request, nameof(request)); + + request.SessionAsSessionName = SessionName; + request.PrecommitToken = FetchPrecommitToken(); + + return ExecuteMaybeWithTransactionSelectorAsync( + transactionSelectorSetter: SetCommandTransaction, + commandAsync: CommitAsync, + inlinedTransactionExtractor: null, // Commit does not support inline transactions. + skipTransactionCreation: request.Mutations.Count == 0, // If there are only mutations we won't have a transaction but we need one. + callSettings?.CancellationToken ?? default, + // Multiplex sessions needs a mutation key in transaction create for a purely mutation based transaction + mutationKey: request.Mutations.Count > 0 ? MaybeFetchMutationKey(request.Mutations) : null); + + void SetCommandTransaction(TransactionSelector transactionSelector) + { + switch (transactionSelector.SelectorCase) + { + case TransactionSelector.SelectorOneofCase.Id: + request.TransactionId = transactionSelector.Id; + break; + case TransactionSelector.SelectorOneofCase.Begin: + throw new InvalidOperationException("Commit does not support inline transactions. This is a bug in library code."); + case TransactionSelector.SelectorOneofCase.SingleUse: + throw new InvalidOperationException("A single use transaction cannot be committed."); + default: + throw new InvalidOperationException("Cannot commit with no associated transaction"); + } + } + + async Task CommitAsync() + { + // If a transaction had been started, by now SetTransaction should have been called with a transaction ID. + // If not, there's an attempt to commit a non-existent transaction. + if (request.TransactionId is null || request.TransactionId.IsEmpty) + { + throw new InvalidOperationException("Cannot commit without an associated transaction. " + + "A transaction has not been acquired because no command execution has been attempted."); + } + + CommitResponse finalResponse; + + do + { + // This loop will keep executing as long as we are signaled by Spanner to retry the Commit + // It only exits if we no longer receive a PrecommitToken based retry enum from backend + finalResponse = await ExecuteCommitOnceAsync(request, callSettings).ConfigureAwait(false); + } while (finalResponse.MultiplexedSessionRetryCase == CommitResponse.MultiplexedSessionRetryOneofCase.PrecommitToken); + + return finalResponse; + + async Task ExecuteCommitOnceAsync(CommitRequest request, CallSettings callSettings) + { + // The original logic of the Commit call and session updates + request.PrecommitToken = FetchPrecommitToken(); + CommitResponse response = await RecordSuccessAndExpiredSessions(Client.CommitAsync(request, callSettings)).ConfigureAwait(false); + UpdatePrecommitToken(response.PrecommitToken); + return response; + } + } + + Mutation MaybeFetchMutationKey(Protobuf.Collections.RepeatedField mutations) + { + if (mutations.Count < 1) + { + return null; + } + + // The filtering conditions are: + // 1. It is not a Delete mutation (MutationCase != MutationType.Delete) + // OR + // 2. It is a Delete mutation AND Keyset.Keys.Count >= 1 + + Mutation filteredMutationKey = mutations.FirstOrDefault(mutation => + mutation.Delete is null || (mutation.Delete.KeySet?.Keys.Count ?? 0) >= 1); + + + return filteredMutationKey; + } + } + + /// + /// Executes a Rollback RPC asynchronously. + /// + /// The rollback request. Must not be null. The request will be modified with session and transaction details + /// from this object. + /// If not null, applies overrides to this RPC call. + /// A task representing the asynchronous operation. + public async Task RollbackAsync(RollbackRequest request, CallSettings callSettings) + { + GaxPreconditions.CheckNotNull(request, nameof(request)); + + request.SessionAsSessionName = SessionName; + + await ExecuteMaybeWithTransactionSelectorAsync( + transactionSelectorSetter: SetCommandTransaction, + commandAsync: RollbackAsync, + inlinedTransactionExtractor: null, // Rollback does not support inline transactions. + skipTransactionCreation: true, // If there's no transaction by the time roll back is called, we fail, we don't need to create one. + callSettings?.CancellationToken ?? default).ConfigureAwait(false); + + void SetCommandTransaction(TransactionSelector transactionSelector) + { + switch (transactionSelector.SelectorCase) + { + case TransactionSelector.SelectorOneofCase.Id: + request.TransactionId = transactionSelector.Id; + break; + case TransactionSelector.SelectorOneofCase.Begin: + throw new InvalidOperationException("Rollback does not support inline transactions. This is a bug in library code."); + case TransactionSelector.SelectorOneofCase.SingleUse: + throw new InvalidOperationException("A single use transaction cannot be rolled back."); + default: + throw new InvalidOperationException("Cannot roll back with no associated transaction"); + } + } + + async Task RollbackAsync() + { + // If a transaction had been started, by now SetTransaction should have been called with a transaction ID. + // If not, there's an attempt to roll back a transaction that was never started. + // Possibly starting the transaction is what failed, but if we fail here as well, we are passing the burden to calling code + // to know whether a transaction was actually acquired before calling rollback, and we don't want to do that. + // Attemting to roll back an empty transaction is no-op. + if (request.TransactionId is null || request.TransactionId.IsEmpty) + { + return false; + } + + await RecordSuccessAndExpiredSessions(Client.RollbackAsync(request, callSettings)).ConfigureAwait(false); + // Just so we can use the same ExecuteMaybeWithTransactionAsync method that expects a result. + return true; + } + } + + /// + /// Executes a PartitionRead RPC asynchronously. + /// + /// The partitioning request. Must not be null. The request will be modified with session details + /// from this object. + /// If not null, applies overrides to this RPC call. + /// A task representing the asynchronous operation. When the task completes, the result is the response from the RPC. + public Task PartitionReadAsync(PartitionReadRequest request, CallSettings callSettings) => + PartitionReadOrQueryAsync(PartitionReadOrQueryRequest.FromRequest(request), callSettings); + + /// + /// Executes a PartitionQuery RPC asynchronously. + /// + /// The partitioning request. Must not be null. The request will be modified with session details + /// from this object. + /// If not null, applies overrides to this RPC call. + /// A task representing the asynchronous operation. When the task completes, the result is the response from the RPC. + public Task PartitionQueryAsync(PartitionQueryRequest request, CallSettings callSettings) => + PartitionReadOrQueryAsync(PartitionReadOrQueryRequest.FromRequest(request), callSettings); + + /// + /// Executes a PartitionRead RPC asynchronously. + /// + /// The partitioning request. Must not be null. The request will be modified with session details + /// from this object. + /// If not null, applies overrides to this RPC call. + /// A task representing the asynchronous operation. When the task completes, the result is the response from the RPC. + internal async Task PartitionReadOrQueryAsync(PartitionReadOrQueryRequest request, CallSettings callSettings) + { + GaxPreconditions.CheckNotNull(request, nameof(request)); + + request.SessionAsSessionName = SessionName; + + return await ExecuteMaybeWithTransactionSelectorAsync( + transactionSelectorSetter: SetCommandTransaction, + commandAsync: PartitionReadOrQueryAsync, + inlinedTransactionExtractor: GetInlinedTransaction, + skipTransactionCreation: false, + callSettings?.CancellationToken ?? default).ConfigureAwait(false); + + void SetCommandTransaction(TransactionSelector transactionSelector) + { + switch (transactionSelector.SelectorCase) + { + case TransactionSelector.SelectorOneofCase.Id: + case TransactionSelector.SelectorOneofCase.Begin: + request.Transaction = transactionSelector; + break; + case TransactionSelector.SelectorOneofCase.SingleUse: + throw new InvalidOperationException("A single use transaction cannot be used for creating partitioned reads or queries."); + default: + throw new InvalidOperationException("Cannot call PartitionReadOrQueryAsync with no associated transaction."); + } + } + + Task PartitionReadOrQueryAsync() + { + // By now SetTransaction should have been called with a valid transaction selector. + // If not, there's a bug in code because we said not to skip transaction creation. + if (request.Transaction is null) + { + throw new InvalidOperationException("Cannot call PartitionReadOrQueryAsync with no associated transaction."); + } + + return RecordSuccessAndExpiredSessions(request.PartitionAsync(Client, callSettings)); + } + + Transaction GetInlinedTransaction(PartitionResponse response) => response?.Transaction; + } + + /// + /// Creates a for the given request. + /// + /// + /// The read request. Must not be null. + /// Will be modified to include session information from this managed transaction. + /// May be modified to include transaction and directed read options information + /// from this managed transaction and its underlying . + /// + /// If not null, applies overrides to this RPC call. + /// A for the streaming SQL request. + public ReliableStreamReader ReadStreamReaderAsync(ReadRequest request, CallSettings callSettings) => + ExecuteReadOrQueryStreamReader(ReadOrQueryRequest.FromRequest(request), callSettings); + + /// + /// Creates a for the given request. + /// + /// + /// The query request. Must not be null. + /// Will be modified to include session information from this managed transaction. + /// May be modified to include transaction and directed read options information + /// from this managed transaction and its underlying . + /// + /// If not null, applies overrides to this RPC call. + /// A for the streaming SQL request. + public ReliableStreamReader ExecuteSqlStreamReaderAsync(ExecuteSqlRequest request, CallSettings callSettings) => + ExecuteReadOrQueryStreamReader(ReadOrQueryRequest.FromRequest(request), callSettings); + + /// + /// Creates a for the given request + /// + /// The read request. Must not be null. The request will be modified with session and transaction details + /// from this object. If this object's is null, the request's transaction is not modified. + /// If not null, applies overrides to this RPC call. + /// A for the streaming read request. + internal ReliableStreamReader ExecuteReadOrQueryStreamReader(ReadOrQueryRequest request, CallSettings callSettings) + { + GaxPreconditions.CheckNotNull(request, nameof(request)); + + request.SessionAsSessionName = SessionName; + SpannerClientImpl.ApplyResourcePrefixHeaderFromSession(ref callSettings, request.Session); + Client.MaybeApplyRouteToLeaderHeader(ref callSettings, TransactionMode); + MaybeApplyDirectedReadOptions(request.UnderlyingRequest); + + ResultStream stream = new ResultStream(Client, request, this, callSettings); + return new ReliableStreamReader(stream, Client.Settings.Logger); + } + + /// + /// Executes an ExecuteSql RPC asynchronously. + /// + /// The query request. Must not be null. + /// Will be modified to include session information from this managed transaction. + /// May be modified to include transaction and directed read options information + /// from this managed transaction and its underlying . + /// + /// If not null, applies overrides to this RPC call. + /// A task representing the asynchronous operation. When the task completes, the result is the response from the RPC. + public Task ExecuteSqlAsync(ExecuteSqlRequest request, CallSettings callSettings) + { + GaxPreconditions.CheckNotNull(request, nameof(request)); + + request.SessionAsSessionName = SessionName; + + return ExecuteMaybeWithTransactionSelectorAsync( + transactionSelectorSetter: SetCommandTransaction, + commandAsync: ExecuteSqlAsync, + inlinedTransactionExtractor: GetInlinedTransaction, + skipTransactionCreation: false, + callSettings?.CancellationToken ?? default); + + void SetCommandTransaction(TransactionSelector transactionSelector) => request.Transaction = transactionSelector; + + // This needs to be made async now with multiplex sessions as we need to check/update the Precommit token + async Task ExecuteSqlAsync() + { + Client.MaybeApplyRouteToLeaderHeader(ref callSettings, TransactionMode); + MaybeApplyDirectedReadOptions(request); + ResultSet response = await RecordSuccessAndExpiredSessions(Client.ExecuteSqlAsync(request, callSettings)).ConfigureAwait(false); + UpdatePrecommitToken(response.PrecommitToken); + return response; + } + + Transaction GetInlinedTransaction(ResultSet response) => response?.Metadata?.Transaction; + } + + /// + /// Executes an ExecuteBatchDml RPC asynchronously. + /// + /// The query request. Must not be null. The request will be modified with session and transaction details + /// from this object. If this object's is null, the request's transaction is not modified. + /// If not null, applies overrides to this RPC call. + /// A task representing the asynchronous operation. When the task completes, the result is the response from the RPC. + public async Task ExecuteBatchDmlAsync(ExecuteBatchDmlRequest request, CallSettings callSettings) + { + GaxPreconditions.CheckNotNull(request, nameof(request)); + + request.SessionAsSessionName = SessionName; + + return await ExecuteMaybeWithTransactionSelectorAsync( + transactionSelectorSetter: SetCommandTransaction, + commandAsync: ExecuteBatchDmlAsync, + inlinedTransactionExtractor: GetInlinedTransaction, + skipTransactionCreation: false, + callSettings?.CancellationToken ?? default).ConfigureAwait(false); + + void SetCommandTransaction(TransactionSelector transactionSelector) => request.Transaction = transactionSelector; + + // This needs to be made async now with multiplex sessions as we need to check/update the Precommit token + async Task ExecuteBatchDmlAsync() + { + ExecuteBatchDmlResponse response = await RecordSuccessAndExpiredSessions(Client.ExecuteBatchDmlAsync(request, callSettings)).ConfigureAwait(false); + UpdatePrecommitToken(response.PrecommitToken); + return response; + } + + Transaction GetInlinedTransaction(ExecuteBatchDmlResponse response) => response?.ResultSets?.FirstOrDefault()?.Metadata?.Transaction; + } + + private void MaybeApplyDirectedReadOptions(IReadOrQueryRequest request) + { + if (TransactionMode == ModeOneofCase.ReadOnly // Directed reads apply only to single use or read only transactions. Single use are read only. + && request.DirectedReadOptions is null) // Request specific options have priority over client options. + { + request.DirectedReadOptions = Client.Settings.DirectedReadOptions; + } + + // We don't validate that DirectedReadOptions is null when this is a non-read-only transaction. + // We just pass the request along as we received it. The service should fail if there are options set. + // This was agreed as part of the client library desing. + } + + private async Task RecordSuccessAndExpiredSessions(Task task) + { + var result = await task.WithSessionExpiryChecking(Session).ConfigureAwait(false); + return result; + } + + private async Task RecordSuccessAndExpiredSessions(Task task) + { + await task.WithSessionExpiryChecking(Session).ConfigureAwait(false); + } + + internal void UpdatePrecommitToken(MultiplexedSessionPrecommitToken token) + { + lock (_precommitTokenUpdateLock) + { + if (PrecommitToken == null || PrecommitToken.SeqNum < token?.SeqNum) + { + PrecommitToken = token; + } + } + } + + internal MultiplexedSessionPrecommitToken FetchPrecommitToken() + { + lock (_precommitTokenUpdateLock) + { + return PrecommitToken; + } + } + } +} diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ResultStream.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ResultStream.cs index d5e3cb7a74f7..5f4c5753b1ae 100644 --- a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ResultStream.cs +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ResultStream.cs @@ -51,6 +51,7 @@ internal sealed class ResultStream : IAsyncStreamReader, IDisp private readonly SpannerClient _client; private readonly ReadOrQueryRequest _request; private readonly PooledSession _pooledSession; + private readonly ManagedTransaction _transaction; private readonly CallSettings _callSettings; private readonly RetrySettings _retrySettings; private readonly int _maxBufferSize; @@ -71,6 +72,34 @@ internal ResultStream(SpannerClient client, ReadOrQueryRequest request, PooledSe { } + /// + /// Constructor for normal usage, taking in a managed transaction, with default buffer size, backoff settings and jitter. + /// + internal ResultStream(SpannerClient client, ReadOrQueryRequest request, ManagedTransaction transaction, CallSettings callSettings) + : this(client, request, transaction, callSettings, DefaultMaxBufferSize, s_defaultRetrySettings) + { + } + + /// + /// Constructor with complete control that does not perform any validation. + /// + internal ResultStream( + SpannerClient client, + ReadOrQueryRequest request, + ManagedTransaction transaction, + CallSettings callSettings, + int maxBufferSize, + RetrySettings retrySettings) + { + _buffer = new LinkedList(); + _client = GaxPreconditions.CheckNotNull(client, nameof(client)); + _request = GaxPreconditions.CheckNotNull(request, nameof(request)); + _transaction = GaxPreconditions.CheckNotNull(transaction, nameof(transaction)); + _callSettings = callSettings; + _maxBufferSize = GaxPreconditions.CheckArgumentRange(maxBufferSize, nameof(maxBufferSize), 1, 10_000); + _retrySettings = GaxPreconditions.CheckNotNull(retrySettings, nameof(retrySettings)); + } + /// /// Constructor with complete control that does not perform any validation. /// @@ -105,6 +134,7 @@ public async Task MoveNext(CancellationToken cancellationToken) { var value = await ComputeNextAsync(cancellationToken).ConfigureAwait(false); Current = value; + _transaction?.UpdatePrecommitToken(value?.PrecommitToken); return value != null; } @@ -134,20 +164,32 @@ private async Task ComputeNextAsync(CancellationToken cancella bool hasNext = false; if (_grpcCall is null) { - // Whenever we have to execute the gRPC streaming call we ask the pooled session - // for the transaction. Only the first time the gRPC streaming call is executed - // the transaction will be inlined, if there's not a transaction already. Subsequent - // times will just get the same transacton ID. So in principle, we only need to ask - // for the transaction the first and second times the gRPC streaming call is executed, - // but doing it every time simplifies implementation and adds little overhead, because - // once there's a transaction ID, ExecuteMaybeWithTransactionSelectorAsync returns - // inmediately. - await _pooledSession.ExecuteMaybeWithTransactionSelectorAsync( - transactionSelectorSetter: SetCommandTransaction, - commandAsync: ExecuteStreamingAsync, - inlinedTransactionExtractor: GetInlinedTransaction, - skipTransactionCreation: false, - cancellationToken).ConfigureAwait(false); + if (_transaction != null) + { + await _transaction.ExecuteMaybeWithTransactionSelectorAsync( + transactionSelectorSetter: SetCommandTransaction, + commandAsync: ExecuteStreamingAsync, + inlinedTransactionExtractor: GetInlinedTransaction, + skipTransactionCreation: false, + cancellationToken).ConfigureAwait(false); + } + else + { + // Whenever we have to execute the gRPC streaming call we ask the pooled session + // for the transaction. Only the first time the gRPC streaming call is executed + // the transaction will be inlined, if there's not a transaction already. Subsequent + // times will just get the same transacton ID. So in principle, we only need to ask + // for the transaction the first and second times the gRPC streaming call is executed, + // but doing it every time simplifies implementation and adds little overhead, because + // once there's a transaction ID, ExecuteMaybeWithTransactionSelectorAsync returns + // inmediately. + await _pooledSession.ExecuteMaybeWithTransactionSelectorAsync( + transactionSelectorSetter: SetCommandTransaction, + commandAsync: ExecuteStreamingAsync, + inlinedTransactionExtractor: GetInlinedTransaction, + skipTransactionCreation: false, + cancellationToken).ConfigureAwait(false); + } void SetCommandTransaction(TransactionSelector transactionSelector) => _request.Transaction = transactionSelector; @@ -173,7 +215,7 @@ async Task ExecuteStreamingAsync() hasNext = await MoveNextAsync().ConfigureAwait(false); } - Task MoveNextAsync() => _grpcCall.ResponseStream.MoveNext(cancellationToken).WithSessionExpiryChecking(_pooledSession.Session); + Task MoveNextAsync() => _grpcCall.ResponseStream.MoveNext(cancellationToken).WithSessionExpiryChecking(_transaction != null ? _transaction.Session : _pooledSession.Session); retryState.Reset();