Skip to content

Commit e9dc04f

Browse files
feat(Spanner.Data): Add support for BatchWrite.
1 parent 4e3c369 commit e9dc04f

File tree

8 files changed

+643
-0
lines changed

8 files changed

+643
-0
lines changed
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Google.Cloud.ClientTesting;
16+
using Google.Cloud.Spanner.Data.CommonTesting;
17+
using Google.Cloud.Spanner.V1;
18+
using System;
19+
using System.Collections.Generic;
20+
using System.Threading.Tasks;
21+
using Xunit;
22+
23+
namespace Google.Cloud.Spanner.Data.IntegrationTests;
24+
25+
/// <summary>
26+
/// Integration tests for <see cref="SpannerBatchWriteCommand"/>.
27+
/// These tests validate the public ADO.NET-like surface for the Spanner BatchWrite RPC.
28+
/// </summary>
29+
[Collection(nameof(SpannerBatchWriteCommandTableFixture))]
30+
[CommonTestDiagnostics]
31+
public class SpannerBatchWriteCommandTests
32+
{
33+
private readonly SpannerBatchWriteCommandTableFixture _fixture;
34+
35+
public SpannerBatchWriteCommandTests(SpannerBatchWriteCommandTableFixture fixture) =>
36+
_fixture = fixture;
37+
38+
[Fact]
39+
public async Task BatchWrite()
40+
{
41+
using var connection = _fixture.GetConnection();
42+
await connection.OpenAsync();
43+
44+
var command = connection.CreateBatchWriteCommand();
45+
46+
// BatchWrite executes mutation groups independently.
47+
// Group 0: Two non-conflicting inserts. If both are valid, the group succeeds.
48+
command.Add(CreateInsertCommand(connection, "v1"), CreateInsertCommand(connection, "v2"));
49+
// Group 1: A single insert.
50+
command.Add(CreateInsertCommand(connection, "v3"));
51+
52+
var (successes, failures) = await GetResults(command.ExecuteNonQueryAsync());
53+
54+
// We expect both groups to succeed.
55+
Assert.Empty(failures);
56+
Assert.Equivalent(new[] { 0, 1 }, successes);
57+
}
58+
59+
[Fact]
60+
public async Task BatchWrite_VariousTypes()
61+
{
62+
var key1 = IdGenerator.FromGuid();
63+
var key2 = IdGenerator.FromGuid();
64+
var key3 = IdGenerator.FromGuid();
65+
66+
using var connection = _fixture.GetConnection();
67+
await connection.OpenAsync();
68+
69+
// Preparation: Insert rows that will be target for Update and Delete later in the batch.
70+
await CreateInsertCommand(connection, key: key2).ExecuteNonQueryAsync();
71+
await CreateInsertCommand(connection, key: key3).ExecuteNonQueryAsync();
72+
73+
var command = connection.CreateBatchWriteCommand();
74+
75+
// Testing that BatchWrite correctly handles different types of write operations.
76+
// Group 0: A standard Insert of a new row.
77+
command.Add(CreateInsertCommand(connection, key: key1));
78+
79+
// Group 1: An Update of an existing row (key2).
80+
command.Add(connection.CreateUpdateCommand(_fixture.TableName, new SpannerParameterCollection
81+
{
82+
{ _fixture.KeyName, SpannerDbType.String, key2 },
83+
{ _fixture.ValueName, SpannerDbType.String, "updated" }
84+
}));
85+
86+
// Group 2: A Delete of an existing row (key3).
87+
command.Add(connection.CreateDeleteCommand(_fixture.TableName, new SpannerParameterCollection
88+
{
89+
{ _fixture.KeyName, SpannerDbType.String, key3 }
90+
}));
91+
92+
var (successes, failures) = await GetResults(command.ExecuteNonQueryAsync());
93+
94+
// We expect 3 successes as we have 3 independent mutation groups (Insert, Update, Delete).
95+
Assert.Empty(failures);
96+
Assert.Equivalent(new[] { 0, 1, 2 }, successes);
97+
}
98+
99+
[Fact]
100+
public async Task BatchWrite_Conflict()
101+
{
102+
using var connection = _fixture.GetConnection();
103+
await connection.OpenAsync();
104+
105+
var command = connection.CreateBatchWriteCommand();
106+
var cmd = CreateInsertCommand(connection, key: IdGenerator.FromGuid());
107+
108+
// Testing the independent nature of mutation groups.
109+
// Group 0: Two identical inserts. This group will fail atomically because
110+
// you cannot have multiple mutations for the same key in a single group.
111+
command.Add(cmd, cmd);
112+
113+
// Group 1: A single valid insert. This should succeed even though Group 0 fails.
114+
command.Add(CreateInsertCommand(connection));
115+
116+
var (successes, failures) = await GetResults(command.ExecuteNonQueryAsync());
117+
118+
// We expect Group 0 to fail and Group 1 to succeed.
119+
Assert.Equivalent(new[] { 0 }, failures);
120+
Assert.Equivalent(new[] { 1 }, successes);
121+
}
122+
123+
[Fact]
124+
public async Task BatchWrite_AddEnumerable()
125+
{
126+
using var connection = _fixture.GetConnection();
127+
await connection.OpenAsync();
128+
129+
var command = connection.CreateBatchWriteCommand();
130+
131+
var commands = new List<SpannerCommand>
132+
{
133+
CreateInsertCommand(connection),
134+
CreateInsertCommand(connection)
135+
};
136+
137+
// Validates the overload that adds a collection of commands as a single atomic group.
138+
// Group 0: Two commands added as a collection.
139+
command.Add(commands);
140+
141+
var (successes, failures) = await GetResults(command.ExecuteNonQueryAsync());
142+
143+
// One group was added, so we expect index 0 to succeed.
144+
Assert.Empty(failures);
145+
Assert.Equivalent(new[] { 0 }, successes);
146+
}
147+
148+
[Fact]
149+
public void BatchWrite_UnsupportedCommandType_Throws()
150+
{
151+
using var connection = _fixture.GetConnection();
152+
var command = connection.CreateBatchWriteCommand();
153+
154+
// SELECT is not a write operation and is not supported by the BatchWrite RPC.
155+
var selectCommand = connection.CreateSelectCommand("SELECT 1");
156+
157+
var exception = Assert.Throws<InvalidOperationException>(() => command.Add(selectCommand));
158+
Assert.Contains("not supported in BatchWrite", exception.Message);
159+
}
160+
161+
/// <summary>
162+
/// Helper to create an insert command with a unique key.
163+
/// </summary>
164+
private SpannerCommand CreateInsertCommand(SpannerConnection connection, string key = null, string value = null) =>
165+
connection.CreateInsertCommand(_fixture.TableName, new SpannerParameterCollection
166+
{
167+
{ _fixture.KeyName, SpannerDbType.String, key ?? IdGenerator.FromGuid() },
168+
{ _fixture.ValueName, SpannerDbType.String, value ?? "v" }
169+
});
170+
171+
/// <summary>
172+
/// Helper to aggregate the indexes of successes and failures from the BatchWrite response stream.
173+
/// Each index corresponds to the 0-based position of the mutation group in the request.
174+
/// </summary>
175+
private static async Task<(List<int> successes, List<int> failures)> GetResults(IAsyncEnumerable<SpannerBatchWriteCommand.BatchWriteResult> responseStream)
176+
{
177+
var successes = new List<int>();
178+
var failures = new List<int>();
179+
await foreach (var response in responseStream)
180+
{
181+
if (response.Status.Code == 0)
182+
{
183+
successes.AddRange(response.Indexes);
184+
}
185+
else
186+
{
187+
failures.AddRange(response.Indexes);
188+
}
189+
}
190+
successes.Sort();
191+
failures.Sort();
192+
return (successes, failures);
193+
}
194+
}
195+
196+
/// <summary>
197+
/// Fixture used to create and maintain the table used for BatchWrite integration tests.
198+
/// </summary>
199+
[CollectionDefinition(nameof(SpannerBatchWriteCommandTableFixture))]
200+
public class SpannerBatchWriteCommandTableFixture : SpannerTableFixture, ICollectionFixture<SpannerBatchWriteCommandTableFixture>
201+
{
202+
public readonly string KeyName = "Key";
203+
public readonly string ValueName = "Value";
204+
205+
public SpannerBatchWriteCommandTableFixture() : base("SpannerBatchWrite")
206+
{
207+
}
208+
209+
protected override void CreateTable()
210+
{
211+
ExecuteDdl($@"CREATE TABLE {TableName} (
212+
{KeyName} STRING(256),
213+
{ValueName} STRING(256),
214+
) PRIMARY KEY ({KeyName})");
215+
}
216+
217+
protected override void PopulateTable(bool fresh)
218+
{
219+
// The BatchWrite tests manage their own data to ensures isolation and predictable results
220+
// for atomicity and conflict scenarios.
221+
}
222+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Google.Cloud.Spanner.V1;
16+
using System;
17+
using System.Collections.Generic;
18+
using System.Linq;
19+
using Xunit;
20+
21+
namespace Google.Cloud.Spanner.Data.Tests;
22+
23+
public class SpannerBatchWriteCommandTests
24+
{
25+
[Fact]
26+
public void Add_ValidCommands()
27+
{
28+
var connection = new SpannerConnection();
29+
var command = connection.CreateBatchWriteCommand();
30+
31+
var cmd1 = connection.CreateInsertCommand("table", new SpannerParameterCollection { { "col", SpannerDbType.String, "val" } });
32+
var cmd2 = connection.CreateUpdateCommand("table", new SpannerParameterCollection { { "col", SpannerDbType.String, "val" } });
33+
34+
command.Add(cmd1, cmd2);
35+
36+
Assert.Single(command.CommandGroups);
37+
Assert.Equal(2, command.CommandGroups[0].Commands.Count);
38+
Assert.Same(cmd1, command.CommandGroups[0].Commands[0]);
39+
Assert.Same(cmd2, command.CommandGroups[0].Commands[1]);
40+
}
41+
42+
[Fact]
43+
public void Add_SeveralGroups()
44+
{
45+
var connection = new SpannerConnection();
46+
var command = connection.CreateBatchWriteCommand();
47+
48+
command.Add(connection.CreateInsertCommand("table"));
49+
command.Add(connection.CreateUpdateCommand("table"));
50+
51+
Assert.Equal(2, command.CommandGroups.Count);
52+
}
53+
54+
[Fact]
55+
public void Add_UnsupportedCommandType_Throws()
56+
{
57+
var connection = new SpannerConnection();
58+
var command = connection.CreateBatchWriteCommand();
59+
var selectCommand = connection.CreateSelectCommand("SELECT 1");
60+
61+
Assert.Throws<InvalidOperationException>(() => command.Add(selectCommand));
62+
}
63+
64+
[Fact]
65+
public void Add_NullCommand_Throws()
66+
{
67+
var connection = new SpannerConnection();
68+
var command = connection.CreateBatchWriteCommand();
69+
70+
Assert.Throws<ArgumentNullException>(() => command.Add((SpannerCommand)null));
71+
}
72+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Google.Cloud.Spanner.V1;
16+
using Google.Rpc;
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Linq;
20+
21+
namespace Google.Cloud.Spanner.Data;
22+
23+
public sealed partial class SpannerBatchWriteCommand
24+
{
25+
/// <summary>
26+
/// Represents the result of a single command group in a BatchWrite operation.
27+
/// </summary>
28+
public sealed class BatchWriteResult
29+
{
30+
/// <summary>
31+
/// An `OK` status indicates success. Any other status indicates a failure.
32+
/// </summary>
33+
public Status Status { get; }
34+
35+
/// <summary>
36+
/// The mutation groups applied in this batch. The values index into the
37+
/// `mutation_groups` field in the corresponding `BatchWriteRequest`.
38+
/// </summary>
39+
public IEnumerable<int> Indexes { get; }
40+
41+
/// <summary>
42+
/// The commit timestamp of the transaction that applied this batch.
43+
/// Present if status is OK and the mutation groups were applied, absent
44+
/// otherwise.
45+
///
46+
/// For mutation groups with conditions, a status=OK and missing
47+
/// commit_timestamp means that the mutation groups were not applied due to the
48+
/// condition not being satisfied after evaluation.
49+
/// </summary>
50+
public DateTime? CommitTimestamp { get; }
51+
52+
internal BatchWriteResult(BatchWriteResponse batchWriteResponse)
53+
{
54+
Status = batchWriteResponse.Status;
55+
Indexes = batchWriteResponse.Indexes;
56+
CommitTimestamp = batchWriteResponse.CommitTimestamp?.ToDateTime();
57+
}
58+
}
59+
}

0 commit comments

Comments
 (0)