Skip to content

Commit ff75e09

Browse files
authored
feat: Spanner non-awaiting DDL (#15280)
* feat: Spanner non-awaiting DDL Adds a method to execute DDL on Spanner without having the client awaiting the long-running operation to finish. This makes it possible to for example start the creation of a secondary index using the client library, without having the library waiting for that operation to finish. Creating a secondary index on a table with a large amount of data can take a very long time (several days).
1 parent cc5abf1 commit ff75e09

File tree

3 files changed

+123
-14
lines changed

3 files changed

+123
-14
lines changed

apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.IntegrationTests/AdminTests.cs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
using Google.Api.Gax.Grpc;
16+
using Google.Cloud.Spanner.Admin.Database.V1;
1517
using Google.Cloud.Spanner.Data.CommonTesting;
18+
using Google.LongRunning;
19+
using Google.Protobuf;
20+
using Google.Protobuf.WellKnownTypes;
1621
using System;
1722
using System.Threading.Tasks;
1823
using Xunit;
@@ -252,5 +257,64 @@ AlbumTitle STRING(MAX),
252257
await dropCommand.ExecuteNonQueryAsync();
253258
}
254259
}
260+
261+
[Fact]
262+
public async Task StartDdlReturnsOperationName()
263+
{
264+
string dbName = GenerateDatabaseName();
265+
var builder = new SpannerConnectionStringBuilder(_fixture.Database.NoDbConnectionString);
266+
var connectionOptions = new SpannerClientCreationOptions(builder);
267+
var adminClientBuilder = connectionOptions.CreateDatabaseAdminClientBuilder();
268+
var adminClient = await adminClientBuilder.BuildAsync();
269+
var channel = adminClientBuilder.LastCreatedChannel;
270+
271+
try
272+
{
273+
using (var connection = new SpannerConnection(builder))
274+
{
275+
var createDbCommand = connection.CreateDdlCommand($"CREATE DATABASE {dbName}");
276+
var operationName = await createDbCommand.StartDdlAsync();
277+
Assert.False(string.IsNullOrEmpty(operationName));
278+
279+
await HandleLro<Database, CreateDatabaseMetadata>(
280+
adminClient.CreateDatabaseOperationsClient, operationName);
281+
}
282+
283+
using (var connection = new SpannerConnection(builder.WithDatabase(dbName)))
284+
{
285+
var createTableCommand = connection.CreateDdlCommand(
286+
"CREATE TABLE Singers (SingerId INT64 PRIMARY KEY, Name STRING(1024))");
287+
var operationName = await createTableCommand.StartDdlAsync();
288+
Assert.False(string.IsNullOrEmpty(operationName));
289+
290+
await HandleLro<Empty, UpdateDatabaseDdlMetadata>(
291+
adminClient.UpdateDatabaseDdlOperationsClient, operationName);
292+
}
293+
294+
using (var connection = new SpannerConnection(builder))
295+
{
296+
var dropCommand = connection.CreateDdlCommand($"DROP DATABASE {dbName}");
297+
var operationName = await dropCommand.StartDdlAsync();
298+
// DropDatabase does not return a long-running operation.
299+
Assert.Null(operationName);
300+
}
301+
}
302+
finally
303+
{
304+
channel?.Shutdown();
305+
}
306+
307+
async Task HandleLro<TResponse, TMetadata>(OperationsClient client, string operationName)
308+
where TResponse : class, IMessage<TResponse>, new()
309+
where TMetadata : class, IMessage<TMetadata>, new()
310+
{
311+
var rawOperation = await client.GetOperationAsync(operationName);
312+
var operation = new Operation<TResponse, TMetadata>(rawOperation, client);
313+
var completedOperation = await operation.PollUntilCompletedAsync();
314+
315+
Assert.True(completedOperation.IsCompleted);
316+
Assert.Null(completedOperation.Exception);
317+
}
318+
}
255319
}
256320
}

apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.ExecutableCommand.cs

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
using Google.Cloud.Spanner.Admin.Database.V1;
1818
using Google.Cloud.Spanner.Common.V1;
1919
using Google.Cloud.Spanner.V1;
20+
using Google.LongRunning;
2021
using Google.Protobuf;
2122
using Google.Protobuf.WellKnownTypes;
2223
using Grpc.Core;
@@ -235,10 +236,30 @@ private async Task<SpannerDataReader> ExecuteDmlReaderAsync(CommandBehavior beha
235236
}
236237

237238
private async Task<int> ExecuteDdlAsync(CancellationToken cancellationToken)
239+
{
240+
await ExecuteDdlAsync(pollUntilCompleted: true, cancellationToken).ConfigureAwait(false);
241+
return 0;
242+
}
243+
244+
/// <summary>
245+
/// Starts a DDL operation, but does not wait for the long-running operation to finish.
246+
/// </summary>
247+
/// <returns>
248+
/// The name of the long-running operation that was created or null if the DDL statement did not
249+
/// create a long-running operation.
250+
/// </returns>
251+
internal async Task<string> StartDdlAsync(CancellationToken cancellationToken)
252+
{
253+
var operation = await ExecuteDdlAsync(pollUntilCompleted: false, cancellationToken).ConfigureAwait(false);
254+
return operation?.Name;
255+
}
256+
257+
private async Task<Operation> ExecuteDdlAsync(bool pollUntilCompleted, CancellationToken cancellationToken)
238258
{
239259
string commandText = CommandTextBuilder.CommandText;
240260
var builder = Connection.Builder;
241261
var connectionOptions = new SpannerClientCreationOptions(builder);
262+
Operation operation = null;
242263

243264
// Create the builder separately from actually building, so we can note the channel that it created.
244265
// (This is fairly unpleasant, but we'll try to improve this in the next version of GAX.)
@@ -257,12 +278,9 @@ private async Task<int> ExecuteDdlAsync(CancellationToken cancellationToken)
257278
ExtraStatements = { CommandTextBuilder.ExtraStatements ?? new string[0] },
258279
ProtoDescriptors = CommandTextBuilder.ProtobufDescriptors?.ToByteString() ?? ByteString.Empty,
259280
};
260-
var response = await databaseAdminClient.CreateDatabaseAsync(request).ConfigureAwait(false);
261-
response = await response.PollUntilCompletedAsync().ConfigureAwait(false);
262-
if (response.IsFaulted)
263-
{
264-
throw SpannerException.FromOperationFailedException(response.Exception);
265-
}
281+
var createDbOperation = await databaseAdminClient.CreateDatabaseAsync(request).ConfigureAwait(false);
282+
var response = await HandleLro(createDbOperation).ConfigureAwait(false);
283+
operation = response.RpcMessage;
266284
}
267285
else if (CommandTextBuilder.IsDropDatabaseCommand)
268286
{
@@ -293,13 +311,9 @@ private async Task<int> ExecuteDdlAsync(CancellationToken cancellationToken)
293311
Statements = { commandText, CommandTextBuilder.ExtraStatements ?? Enumerable.Empty<string>() },
294312
ProtoDescriptors = CommandTextBuilder.ProtobufDescriptors?.ToByteString() ?? ByteString.Empty,
295313
};
296-
297-
var response = await databaseAdminClient.UpdateDatabaseDdlAsync(request).ConfigureAwait(false);
298-
response = await response.PollUntilCompletedAsync().ConfigureAwait(false);
299-
if (response.IsFaulted)
300-
{
301-
throw SpannerException.FromOperationFailedException(response.Exception);
302-
}
314+
var updateDdlOperation = await databaseAdminClient.UpdateDatabaseDdlAsync(request).ConfigureAwait(false);
315+
var response = await HandleLro(updateDdlOperation).ConfigureAwait(false);
316+
operation = response.RpcMessage;
303317
}
304318
}
305319
catch (RpcException gRpcException)
@@ -312,7 +326,22 @@ private async Task<int> ExecuteDdlAsync(CancellationToken cancellationToken)
312326
channel?.Shutdown();
313327
}
314328

315-
return 0;
329+
return operation;
330+
331+
async Task<Operation<TResponse, TMetadata>> HandleLro<TResponse, TMetadata>(Operation<TResponse, TMetadata> operationToPoll)
332+
where TResponse : class, IMessage<TResponse>, new()
333+
where TMetadata : class, IMessage<TMetadata>, new()
334+
{
335+
if (pollUntilCompleted)
336+
{
337+
operationToPoll = await operationToPoll.PollUntilCompletedAsync().ConfigureAwait(false);
338+
}
339+
if (operationToPoll.IsFaulted)
340+
{
341+
throw SpannerException.FromOperationFailedException(operationToPoll.Exception);
342+
}
343+
return operationToPoll;
344+
}
316345
}
317346

318347
private async Task<int> ExecuteMutationsAsync(CancellationToken cancellationToken)

apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,22 @@ public long ExecutePartitionedUpdate() =>
513513
public Task<long> ExecutePartitionedUpdateAsync(CancellationToken cancellationToken = default) =>
514514
CreateExecutableCommand().ExecutePartitionedUpdateAsync(cancellationToken);
515515

516+
/// <summary>
517+
/// Executes this command as DDL, but does not wait for the execution of the DDL statements to finish. The
518+
/// method returns the name of the long-running operation that was started. The cancellation token can only be
519+
/// used to cancel the request to start the execution of the DDL statements. It cannot be used to cancel the
520+
/// long-running operation once it has been started.
521+
/// The command must contain one or more DDL statements;
522+
/// <see cref="SpannerConnection.CreateDdlCommand(string, string[])"/> for details.
523+
/// </summary>
524+
/// <param name="cancellationToken">An optional token for canceling the call.</param>
525+
/// <returns>
526+
/// The name of the long-running operation that was started for the DDL statement(s).
527+
/// Note: The ID is empty for DropDatabase commands.
528+
/// </returns>
529+
public Task<string> StartDdlAsync(CancellationToken cancellationToken = default) =>
530+
CreateExecutableCommand().StartDdlAsync(cancellationToken);
531+
516532
/// <summary>
517533
/// Creates an executable command that captures all the necessary information from this command.
518534
/// </summary>

0 commit comments

Comments
 (0)