Skip to content

Commit c1a9536

Browse files
committed
tests(Spanner): Explicit integration tests for MUX
1 parent cfa2392 commit c1a9536

File tree

1 file changed

+242
-0
lines changed

1 file changed

+242
-0
lines changed
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"):
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Google.Cloud.Spanner.V1;
16+
using Google.Cloud.Spanner.Data;
17+
using Google.Cloud.Spanner.Data.IntegrationTests;
18+
using System;
19+
using System.Collections.Generic;
20+
using System.Linq;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using Xunit;
24+
using Google.Cloud.ClientTesting;
25+
26+
namespace Google.Cloud.Spanner.V1.IntegrationTests;
27+
28+
[Collection(nameof(DmlTableFixture))]
29+
[CommonTestDiagnostics]
30+
public class ManagedSessionIntegrationTests
31+
{
32+
private readonly DmlTableFixture _fixture;
33+
private readonly string _tableName;
34+
35+
public ManagedSessionIntegrationTests(DmlTableFixture fixture)
36+
{
37+
_fixture = fixture;
38+
_tableName = fixture.TableName;
39+
}
40+
41+
private async Task<ManagedSession> CreateManagedSessionAsync()
42+
{
43+
var client = await _fixture.SpannerClientCreationOptions.CreateSpannerClientAsync(new SpannerSettings());
44+
var databaseName = _fixture.DatabaseName;
45+
var options = ManagedSessionOptions.Create(databaseName, client);
46+
return new ManagedSession(options);
47+
}
48+
49+
[Fact]
50+
public async Task BasicReadWriteCycle()
51+
{
52+
var managedSession = await CreateManagedSessionAsync();
53+
string key = Guid.NewGuid().ToString();
54+
55+
// 1. Begin RW Transaction
56+
await using var transaction = await managedSession.BeginTransactionAsync(new TransactionOptions { ReadWrite = new() }, false, false, CancellationToken.None);
57+
58+
// 2. Insert row
59+
var insertRequest = new ExecuteSqlRequest
60+
{
61+
Sql = $"INSERT INTO {_tableName} (Key, OriginalValue, Value) VALUES (@key, 1, 1)",
62+
Params = new Protobuf.WellKnownTypes.Struct
63+
{
64+
Fields = { { "key", Protobuf.WellKnownTypes.Value.ForString(key) } }
65+
}
66+
};
67+
await transaction.ExecuteSqlAsync(insertRequest, null);
68+
69+
// 3. Commit
70+
await transaction.CommitAsync(new CommitRequest(), null);
71+
72+
// 4. Verify data via standard SpannerConnection
73+
using var connection = _fixture.GetConnection();
74+
using var cmd = connection.CreateSelectCommand($"SELECT Value FROM {_tableName} WHERE Key=@key");
75+
cmd.Parameters.Add("key", SpannerDbType.String, key);
76+
var value = await cmd.ExecuteScalarAsync();
77+
Assert.Equal(1L, value);
78+
}
79+
80+
[Fact]
81+
[Trait(Constants.SupportedOnEmulator, Constants.No)]
82+
public async Task ConcurrentTransactions_Multiplexing()
83+
{
84+
var managedSession = await CreateManagedSessionAsync();
85+
int concurrency = 5;
86+
var tasks = Enumerable.Range(0, concurrency).Select(i => RunTransactionAsync(i)).ToList();
87+
88+
await Task.WhenAll(tasks);
89+
90+
async Task RunTransactionAsync(int index)
91+
{
92+
string key = $"mux-{Guid.NewGuid()}-{index}";
93+
94+
// Start a new transaction on the SAME managed session
95+
await using var transaction = await managedSession.BeginTransactionAsync(new TransactionOptions { ReadWrite = new() }, false, false, CancellationToken.None);
96+
97+
// Execute some SQL
98+
var insertRequest = new ExecuteSqlRequest
99+
{
100+
Sql = $"INSERT INTO {_tableName} (Key, OriginalValue, Value) VALUES (@key, 1, @val)",
101+
Params = new Protobuf.WellKnownTypes.Struct
102+
{
103+
Fields =
104+
{
105+
{ "key", Protobuf.WellKnownTypes.Value.ForString(key) },
106+
{ "val", Protobuf.WellKnownTypes.Value.ForString(index.ToString()) }
107+
}
108+
}
109+
};
110+
await transaction.ExecuteSqlAsync(insertRequest, null);
111+
112+
// Commit
113+
await transaction.CommitAsync(new CommitRequest(), null);
114+
115+
// Verify
116+
using var connection = _fixture.GetConnection();
117+
using var cmd = connection.CreateSelectCommand($"SELECT Value FROM {_tableName} WHERE Key=@key");
118+
cmd.Parameters.Add("key", SpannerDbType.String, key);
119+
var value = await cmd.ExecuteScalarAsync();
120+
Assert.Equal((long)index, value);
121+
}
122+
}
123+
124+
[Fact]
125+
public async Task BatchDml()
126+
{
127+
var managedSession = await CreateManagedSessionAsync();
128+
string key1 = Guid.NewGuid().ToString();
129+
string key2 = Guid.NewGuid().ToString();
130+
131+
await using var transaction = await managedSession.BeginTransactionAsync(new TransactionOptions { ReadWrite = new() }, false, false, CancellationToken.None);
132+
133+
var batchRequest = new ExecuteBatchDmlRequest
134+
{
135+
Statements =
136+
{
137+
new ExecuteBatchDmlRequest.Types.Statement
138+
{
139+
Sql = $"INSERT INTO {_tableName} (Key, OriginalValue, Value) VALUES (@key, 1, 10)",
140+
Params = new Protobuf.WellKnownTypes.Struct { Fields = { { "key", Protobuf.WellKnownTypes.Value.ForString(key1) } } }
141+
},
142+
new ExecuteBatchDmlRequest.Types.Statement
143+
{
144+
Sql = $"INSERT INTO {_tableName} (Key, OriginalValue, Value) VALUES (@key, 1, 20)",
145+
Params = new Protobuf.WellKnownTypes.Struct { Fields = { { "key", Protobuf.WellKnownTypes.Value.ForString(key2) } } }
146+
}
147+
}
148+
};
149+
150+
var response = await transaction.ExecuteBatchDmlAsync(batchRequest, null);
151+
Assert.Equal(2, response.ResultSets.Count);
152+
153+
await transaction.CommitAsync(new CommitRequest(), null);
154+
155+
// Verify
156+
using var connection = _fixture.GetConnection();
157+
using var cmd = connection.CreateSelectCommand($"SELECT SUM(Value) FROM {_tableName} WHERE Key IN (@k1, @k2)");
158+
cmd.Parameters.Add("k1", SpannerDbType.String, key1);
159+
cmd.Parameters.Add("k2", SpannerDbType.String, key2);
160+
var sum = await cmd.ExecuteScalarAsync();
161+
Assert.Equal(30L, sum);
162+
}
163+
164+
[Fact]
165+
public async Task StreamingRead()
166+
{
167+
var managedSession = await CreateManagedSessionAsync();
168+
string key = Guid.NewGuid().ToString();
169+
170+
// 1. Setup data
171+
await using (var transaction = await managedSession.BeginTransactionAsync(new TransactionOptions { ReadWrite = new() }, false, false, CancellationToken.None))
172+
{
173+
await transaction.ExecuteSqlAsync(new ExecuteSqlRequest { Sql = $"INSERT INTO {_tableName} (Key, OriginalValue, Value) VALUES (@key, 1, 100)" , Params = new Protobuf.WellKnownTypes.Struct { Fields = { { "key", Protobuf.WellKnownTypes.Value.ForString(key) } } }}, null);
174+
await transaction.CommitAsync(new CommitRequest(), null);
175+
}
176+
177+
// 2. Stream read back in a ReadOnly transaction
178+
await using (var transaction = await managedSession.BeginTransactionAsync(new TransactionOptions { ReadOnly = new() }, false, false, CancellationToken.None))
179+
{
180+
var reader = transaction.ExecuteSqlStreamReader(new ExecuteSqlRequest
181+
{
182+
Sql = $"SELECT Value FROM {_tableName} WHERE Key = @key",
183+
Params = new Protobuf.WellKnownTypes.Struct { Fields = { { "key", Protobuf.WellKnownTypes.Value.ForString(key) } } }
184+
}, null);
185+
186+
var values = new List<long>();
187+
Protobuf.WellKnownTypes.Value value;
188+
while ((value = await reader.NextAsync(CancellationToken.None)) != null)
189+
{
190+
values.Add(long.Parse(value.StringValue));
191+
}
192+
193+
Assert.Single(values);
194+
Assert.Equal(100L, values[0]);
195+
}
196+
}
197+
198+
[Fact]
199+
public async Task BatchWrite()
200+
{
201+
var managedSession = await CreateManagedSessionAsync();
202+
string key1 = Guid.NewGuid().ToString();
203+
string key2 = Guid.NewGuid().ToString();
204+
205+
var request = new BatchWriteRequest
206+
{
207+
MutationGroups =
208+
{
209+
new BatchWriteRequest.Types.MutationGroup
210+
{
211+
Mutations = { new Mutation { Insert = new Mutation.Types.Write { Table = _tableName, Columns = { "Key", "OriginalValue", "Value" }, Values = { new Protobuf.WellKnownTypes.ListValue { Values = { Protobuf.WellKnownTypes.Value.ForString(key1), Protobuf.WellKnownTypes.Value.ForString("1"), Protobuf.WellKnownTypes.Value.ForString("100") } } } } } }
212+
},
213+
new BatchWriteRequest.Types.MutationGroup
214+
{
215+
Mutations = { new Mutation { Insert = new Mutation.Types.Write { Table = _tableName, Columns = { "Key", "OriginalValue", "Value" }, Values = { new Protobuf.WellKnownTypes.ListValue { Values = { Protobuf.WellKnownTypes.Value.ForString(key2), Protobuf.WellKnownTypes.Value.ForString("1"), Protobuf.WellKnownTypes.Value.ForString("200") } } } } } }
216+
}
217+
}
218+
};
219+
220+
var responseStream = await managedSession.BatchWriteAsync(request, null);
221+
var responses = new List<BatchWriteResponse>();
222+
while (await responseStream.MoveNextAsync(CancellationToken.None))
223+
{
224+
responses.Add(responseStream.Current);
225+
}
226+
227+
// Note: BatchWrite status check depends on backend, but we expect successes for these inserts.
228+
Assert.NotEmpty(responses);
229+
foreach (var resp in responses)
230+
{
231+
Assert.Equal(0, resp.Status.Code); // OK
232+
}
233+
234+
// Verify data
235+
using var connection = _fixture.GetConnection();
236+
using var cmd = connection.CreateSelectCommand($"SELECT SUM(Value) FROM {_tableName} WHERE Key IN (@k1, @k2)");
237+
cmd.Parameters.Add("k1", SpannerDbType.String, key1);
238+
cmd.Parameters.Add("k2", SpannerDbType.String, key2);
239+
var sum = await cmd.ExecuteScalarAsync();
240+
Assert.Equal(300L, sum);
241+
}
242+
}

0 commit comments

Comments
 (0)