Skip to content

Commit 4e3c369

Browse files
feat(Spanner.V1): Add support for BatchWrite.
1 parent f4d2505 commit 4e3c369

File tree

3 files changed

+49
-1
lines changed

3 files changed

+49
-1
lines changed

apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedSession.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
using Google.Api.Gax;
16+
using Google.Api.Gax.Grpc;
1617
using System.Threading;
1718
using System.Threading.Tasks;
1819

@@ -49,6 +50,28 @@ public async Task<ManagedTransaction> BeginTransactionAsync(TransactionOptions t
4950
return ManagedTransaction.FromTransactionOptions(_options, session, transactionOptions, singleUse, shared);
5051
}
5152

53+
/// <summary>
54+
/// Executes a BatchWrite RPC asynchronously, returning a stream of responses.
55+
/// </summary>
56+
/// <remarks>
57+
/// This method modifies the <paramref name="request"/> to include the session name from this managed session.
58+
/// </remarks>
59+
/// <param name="request">The batch write request. Must not be null.</param>
60+
/// <param name="callSettings">If not null, applies overrides to this RPC call.</param>
61+
/// <returns>A task representing the asynchronous operation. When the task completes, the result is the response stream of <see cref="BatchWriteResponse"/> objects.</returns>
62+
public async Task<AsyncResponseStream<BatchWriteResponse>> BatchWriteAsync(BatchWriteRequest request, CallSettings callSettings)
63+
{
64+
GaxPreconditions.CheckNotNull(request, nameof(request));
65+
CancellationToken cancellationToken = callSettings.CancellationToken ?? default;
66+
var session = await _lifecycleManager.GetFreshSessionAsync(cancellationToken).ConfigureAwait(false);
67+
68+
// Populate the request with the current session name.
69+
request.Session = session.SessionName.ToString();
70+
71+
var response = _options.Client.BatchWrite(request, callSettings);
72+
return response.GetResponseStream();
73+
}
74+
5275
/// <summary>
5376
/// Ensures the underlying Spanner session is fresh.
5477
/// </summary>

apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/SessionPool.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
using Google.Api.Gax;
16+
using Google.Api.Gax.Grpc;
1617
using Google.Cloud.Spanner.Common.V1;
1718
using Google.Cloud.Spanner.V1.Internal.Logging;
1819
using Google.Protobuf;
@@ -118,6 +119,23 @@ public SessionPoolSegmentStatistics GetSegmentStatisticsSnapshot(SessionPoolSegm
118119
public SessionPoolSegmentStatistics GetSegmentStatisticsSnapshot(DatabaseName databaseName) =>
119120
GetSegmentStatisticsSnapshot(SessionPoolSegmentKey.Create(databaseName));
120121

122+
/// <summary>
123+
/// Executes a BatchWrite RPC asynchronously, returning a stream of responses.
124+
/// </summary>
125+
/// <param name="request">The batch write request. Must not be null.</param>
126+
/// <param name="key">The session pool segment key. Must not be null.</param>
127+
/// <param name="callSettings">If not null, applies overrides to this RPC call.</param>
128+
/// <returns>A task representing the asynchronous operation. When the task completes, the result is the response stream of <see cref="BatchWriteResponse"/> objects.</returns>
129+
[Obsolete("Use ManagedSession.BatchWriteAsync instead.")]
130+
public async Task<AsyncResponseStream<BatchWriteResponse>> BatchWriteAsync(BatchWriteRequest request, SessionPoolSegmentKey key, CallSettings callSettings)
131+
{
132+
GaxPreconditions.CheckNotNull(request, nameof(request));
133+
GaxPreconditions.CheckNotNull(key, nameof(key));
134+
135+
var managedSession = _managedSessions.GetOrAdd(key, CreateManagedSession);
136+
return await managedSession.BatchWriteAsync(request, callSettings).ConfigureAwait(false);
137+
}
138+
121139
/// <summary>
122140
/// Asynchronously acquires a session that will handle transaction creation as needed.
123141
/// This is equivalent to calling <see cref="AcquireSessionAsync(SessionPoolSegmentKey, TransactionOptions, CancellationToken)"/>
@@ -308,4 +326,4 @@ public Task ShutdownPoolAsync(SessionPoolSegmentKey key, CancellationToken cance
308326
.WithDatabaseRole(key.DatabaseRole)
309327
.WithTimeout(Options.Timeout));
310328
}
311-
}
329+
}

apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/SpannerClientPartial.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ partial void Modify_PartitionReadRequest(ref PartitionReadRequest request, ref C
166166
ApplyRequestIdHeader(ref settings);
167167
}
168168

169+
partial void Modify_BatchWriteRequest(ref BatchWriteRequest request, ref CallSettings settings)
170+
{
171+
ApplyResourcePrefixHeaderFromSession(ref settings, request.Session);
172+
MaybeApplyRouteToLeaderHeader(ref settings);
173+
ApplyRequestIdHeader(ref settings);
174+
}
175+
169176
internal static void ApplyResourcePrefixHeaderFromDatabase(ref CallSettings settings, string resource)
170177
{
171178
// If we haven't been given a resource name, just leave the request as it is.

0 commit comments

Comments
 (0)