Skip to content

Commit 75c29fa

Browse files
committed
tests(Spanner): Explicit integration tests for MUX
1 parent cfa2392 commit 75c29fa

File tree

1 file changed

+243
-0
lines changed

1 file changed

+243
-0
lines changed
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"):
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Google.Cloud.Spanner.V1;
16+
using Google.Cloud.Spanner.Data;
17+
using Google.Cloud.Spanner.Data.IntegrationTests;
18+
using System;
19+
using System.Collections.Generic;
20+
using System.Linq;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using Xunit;
24+
using Google.Cloud.ClientTesting;
25+
using Google.Cloud.Spanner.Data.CommonTesting;
26+
27+
namespace Google.Cloud.Spanner.V1.IntegrationTests;
28+
29+
[Collection(nameof(DmlTableFixture))]
30+
[CommonTestDiagnostics]
31+
public class ManagedSessionIntegrationTests
32+
{
33+
private readonly DmlTableFixture _fixture;
34+
private readonly string _tableName;
35+
36+
public ManagedSessionIntegrationTests(DmlTableFixture fixture)
37+
{
38+
_fixture = fixture;
39+
_tableName = fixture.TableName;
40+
}
41+
42+
private async Task<ManagedSession> CreateManagedSessionAsync()
43+
{
44+
var client = await _fixture.SpannerClientCreationOptions.CreateSpannerClientAsync(new SpannerSettings());
45+
var databaseName = _fixture.DatabaseName;
46+
var options = ManagedSessionOptions.Create(databaseName, client);
47+
return new ManagedSession(options);
48+
}
49+
50+
[Fact]
51+
public async Task BasicReadWriteCycle()
52+
{
53+
var managedSession = await CreateManagedSessionAsync();
54+
string key = Guid.NewGuid().ToString();
55+
56+
// 1. Begin RW Transaction
57+
await using var transaction = await managedSession.BeginTransactionAsync(new TransactionOptions { ReadWrite = new() }, false, false, CancellationToken.None);
58+
59+
// 2. Insert row
60+
var insertRequest = new ExecuteSqlRequest
61+
{
62+
Sql = $"INSERT INTO {_tableName} (Key, OriginalValue, Value) VALUES (@key, 1, 1)",
63+
Params = new Protobuf.WellKnownTypes.Struct
64+
{
65+
Fields = { { "key", Protobuf.WellKnownTypes.Value.ForString(key) } }
66+
}
67+
};
68+
await transaction.ExecuteSqlAsync(insertRequest, null);
69+
70+
// 3. Commit
71+
await transaction.CommitAsync(new CommitRequest(), null);
72+
73+
// 4. Verify data via standard SpannerConnection
74+
using var connection = _fixture.GetConnection();
75+
using var cmd = connection.CreateSelectCommand($"SELECT Value FROM {_tableName} WHERE Key=@key");
76+
cmd.Parameters.Add("key", SpannerDbType.String, key);
77+
var value = await cmd.ExecuteScalarAsync();
78+
Assert.Equal(1L, value);
79+
}
80+
81+
[Fact]
82+
[Trait(Constants.SupportedOnEmulator, Constants.No)]
83+
public async Task ConcurrentTransactions_Multiplexing()
84+
{
85+
var managedSession = await CreateManagedSessionAsync();
86+
int concurrency = 5;
87+
var tasks = Enumerable.Range(0, concurrency).Select(i => RunTransactionAsync(i)).ToList();
88+
89+
await Task.WhenAll(tasks);
90+
91+
async Task RunTransactionAsync(int index)
92+
{
93+
string key = $"mux-{Guid.NewGuid()}-{index}";
94+
95+
// Start a new transaction on the SAME managed session
96+
await using var transaction = await managedSession.BeginTransactionAsync(new TransactionOptions { ReadWrite = new() }, false, false, CancellationToken.None);
97+
98+
// Execute some SQL
99+
var insertRequest = new ExecuteSqlRequest
100+
{
101+
Sql = $"INSERT INTO {_tableName} (Key, OriginalValue, Value) VALUES (@key, 1, @val)",
102+
Params = new Protobuf.WellKnownTypes.Struct
103+
{
104+
Fields =
105+
{
106+
{ "key", Protobuf.WellKnownTypes.Value.ForString(key) },
107+
{ "val", Protobuf.WellKnownTypes.Value.ForString(index.ToString()) }
108+
}
109+
}
110+
};
111+
await transaction.ExecuteSqlAsync(insertRequest, null);
112+
113+
// Commit
114+
await transaction.CommitAsync(new CommitRequest(), null);
115+
116+
// Verify
117+
using var connection = _fixture.GetConnection();
118+
using var cmd = connection.CreateSelectCommand($"SELECT Value FROM {_tableName} WHERE Key=@key");
119+
cmd.Parameters.Add("key", SpannerDbType.String, key);
120+
var value = await cmd.ExecuteScalarAsync();
121+
Assert.Equal((long)index, value);
122+
}
123+
}
124+
125+
[Fact]
126+
public async Task BatchDml()
127+
{
128+
var managedSession = await CreateManagedSessionAsync();
129+
string key1 = Guid.NewGuid().ToString();
130+
string key2 = Guid.NewGuid().ToString();
131+
132+
await using var transaction = await managedSession.BeginTransactionAsync(new TransactionOptions { ReadWrite = new() }, false, false, CancellationToken.None);
133+
134+
var batchRequest = new ExecuteBatchDmlRequest
135+
{
136+
Statements =
137+
{
138+
new ExecuteBatchDmlRequest.Types.Statement
139+
{
140+
Sql = $"INSERT INTO {_tableName} (Key, OriginalValue, Value) VALUES (@key, 1, 10)",
141+
Params = new Protobuf.WellKnownTypes.Struct { Fields = { { "key", Protobuf.WellKnownTypes.Value.ForString(key1) } } }
142+
},
143+
new ExecuteBatchDmlRequest.Types.Statement
144+
{
145+
Sql = $"INSERT INTO {_tableName} (Key, OriginalValue, Value) VALUES (@key, 1, 20)",
146+
Params = new Protobuf.WellKnownTypes.Struct { Fields = { { "key", Protobuf.WellKnownTypes.Value.ForString(key2) } } }
147+
}
148+
}
149+
};
150+
151+
var response = await transaction.ExecuteBatchDmlAsync(batchRequest, null);
152+
Assert.Equal(2, response.ResultSets.Count);
153+
154+
await transaction.CommitAsync(new CommitRequest(), null);
155+
156+
// Verify
157+
using var connection = _fixture.GetConnection();
158+
using var cmd = connection.CreateSelectCommand($"SELECT SUM(Value) FROM {_tableName} WHERE Key IN (@k1, @k2)");
159+
cmd.Parameters.Add("k1", SpannerDbType.String, key1);
160+
cmd.Parameters.Add("k2", SpannerDbType.String, key2);
161+
var sum = await cmd.ExecuteScalarAsync();
162+
Assert.Equal(30L, sum);
163+
}
164+
165+
[Fact]
166+
public async Task StreamingRead()
167+
{
168+
var managedSession = await CreateManagedSessionAsync();
169+
string key = Guid.NewGuid().ToString();
170+
171+
// 1. Setup data
172+
await using (var transaction = await managedSession.BeginTransactionAsync(new TransactionOptions { ReadWrite = new() }, false, false, CancellationToken.None))
173+
{
174+
await transaction.ExecuteSqlAsync(new ExecuteSqlRequest { Sql = $"INSERT INTO {_tableName} (Key, OriginalValue, Value) VALUES (@key, 1, 100)" , Params = new Protobuf.WellKnownTypes.Struct { Fields = { { "key", Protobuf.WellKnownTypes.Value.ForString(key) } } }}, null);
175+
await transaction.CommitAsync(new CommitRequest(), null);
176+
}
177+
178+
// 2. Stream read back in a ReadOnly transaction
179+
await using (var transaction = await managedSession.BeginTransactionAsync(new TransactionOptions { ReadOnly = new() }, false, false, CancellationToken.None))
180+
{
181+
var reader = transaction.ExecuteSqlStreamReader(new ExecuteSqlRequest
182+
{
183+
Sql = $"SELECT Value FROM {_tableName} WHERE Key = @key",
184+
Params = new Protobuf.WellKnownTypes.Struct { Fields = { { "key", Protobuf.WellKnownTypes.Value.ForString(key) } } }
185+
}, null);
186+
187+
var values = new List<long>();
188+
Protobuf.WellKnownTypes.Value value;
189+
while ((value = await reader.NextAsync(CancellationToken.None)) != null)
190+
{
191+
values.Add(long.Parse(value.StringValue));
192+
}
193+
194+
Assert.Single(values);
195+
Assert.Equal(100L, values[0]);
196+
}
197+
}
198+
199+
[Fact]
200+
public async Task BatchWrite()
201+
{
202+
var managedSession = await CreateManagedSessionAsync();
203+
string key1 = Guid.NewGuid().ToString();
204+
string key2 = Guid.NewGuid().ToString();
205+
206+
var request = new BatchWriteRequest
207+
{
208+
MutationGroups =
209+
{
210+
new BatchWriteRequest.Types.MutationGroup
211+
{
212+
Mutations = { new Mutation { Insert = new Mutation.Types.Write { Table = _tableName, Columns = { "Key", "OriginalValue", "Value" }, Values = { new Protobuf.WellKnownTypes.ListValue { Values = { Protobuf.WellKnownTypes.Value.ForString(key1), Protobuf.WellKnownTypes.Value.ForString("1"), Protobuf.WellKnownTypes.Value.ForString("100") } } } } } }
213+
},
214+
new BatchWriteRequest.Types.MutationGroup
215+
{
216+
Mutations = { new Mutation { Insert = new Mutation.Types.Write { Table = _tableName, Columns = { "Key", "OriginalValue", "Value" }, Values = { new Protobuf.WellKnownTypes.ListValue { Values = { Protobuf.WellKnownTypes.Value.ForString(key2), Protobuf.WellKnownTypes.Value.ForString("1"), Protobuf.WellKnownTypes.Value.ForString("200") } } } } } }
217+
}
218+
}
219+
};
220+
221+
var responseStream = await managedSession.BatchWriteAsync(request, null);
222+
var responses = new List<BatchWriteResponse>();
223+
while (await responseStream.MoveNextAsync(CancellationToken.None))
224+
{
225+
responses.Add(responseStream.Current);
226+
}
227+
228+
// Note: BatchWrite status check depends on backend, but we expect successes for these inserts.
229+
Assert.NotEmpty(responses);
230+
foreach (var resp in responses)
231+
{
232+
Assert.Equal(0, resp.Status.Code); // OK
233+
}
234+
235+
// Verify data
236+
using var connection = _fixture.GetConnection();
237+
using var cmd = connection.CreateSelectCommand($"SELECT SUM(Value) FROM {_tableName} WHERE Key IN (@k1, @k2)");
238+
cmd.Parameters.Add("k1", SpannerDbType.String, key1);
239+
cmd.Parameters.Add("k2", SpannerDbType.String, key2);
240+
var sum = await cmd.ExecuteScalarAsync();
241+
Assert.Equal(300L, sum);
242+
}
243+
}

0 commit comments

Comments
 (0)