Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Google.Api.Gax.Grpc;
using Google.Cloud.Spanner.Admin.Database.V1;
using Google.Cloud.Spanner.Data.CommonTesting;
using Google.LongRunning;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using System;
using System.Threading.Tasks;
using Xunit;
Expand Down Expand Up @@ -252,5 +257,64 @@ AlbumTitle STRING(MAX),
await dropCommand.ExecuteNonQueryAsync();
}
}

[Fact]
public async Task StartDdlReturnsOperationName()
{
string dbName = GenerateDatabaseName();
var builder = new SpannerConnectionStringBuilder(_fixture.Database.NoDbConnectionString);
var connectionOptions = new SpannerClientCreationOptions(builder);
var adminClientBuilder = connectionOptions.CreateDatabaseAdminClientBuilder();
var adminClient = await adminClientBuilder.BuildAsync();
var channel = adminClientBuilder.LastCreatedChannel;

try
{
using (var connection = new SpannerConnection(builder))
{
var createDbCommand = connection.CreateDdlCommand($"CREATE DATABASE {dbName}");
var operationName = await createDbCommand.StartDdlAsync();
Assert.False(string.IsNullOrEmpty(operationName));

await HandleLro<Database, CreateDatabaseMetadata>(
adminClient.CreateDatabaseOperationsClient, operationName);
}

using (var connection = new SpannerConnection(builder.WithDatabase(dbName)))
{
var createTableCommand = connection.CreateDdlCommand(
"CREATE TABLE Singers (SingerId INT64 PRIMARY KEY, Name STRING(1024))");
var operationName = await createTableCommand.StartDdlAsync();
Assert.False(string.IsNullOrEmpty(operationName));

await HandleLro<Empty, UpdateDatabaseDdlMetadata>(
adminClient.UpdateDatabaseDdlOperationsClient, operationName);
}

using (var connection = new SpannerConnection(builder))
{
var dropCommand = connection.CreateDdlCommand($"DROP DATABASE {dbName}");
var operationName = await dropCommand.StartDdlAsync();
// DropDatabase does not return a long-running operation.
Assert.Null(operationName);
}
}
finally
{
channel?.Shutdown();
}

async Task HandleLro<TResponse, TMetadata>(OperationsClient client, string operationName)
where TResponse : class, IMessage<TResponse>, new()
where TMetadata : class, IMessage<TMetadata>, new()
{
var rawOperation = await client.GetOperationAsync(operationName);
var operation = new Operation<TResponse, TMetadata>(rawOperation, client);
var completedOperation = await operation.PollUntilCompletedAsync();

Assert.True(completedOperation.IsCompleted);
Assert.Null(completedOperation.Exception);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using Google.Cloud.Spanner.Admin.Database.V1;
using Google.Cloud.Spanner.Common.V1;
using Google.Cloud.Spanner.V1;
using Google.LongRunning;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
Expand Down Expand Up @@ -235,10 +236,30 @@ private async Task<SpannerDataReader> ExecuteDmlReaderAsync(CommandBehavior beha
}

private async Task<int> ExecuteDdlAsync(CancellationToken cancellationToken)
{
await ExecuteDdlAsync(pollUntilCompleted: true, cancellationToken).ConfigureAwait(false);
return 0;
}

/// <summary>
/// Starts a DDL operation, but does not wait for the long-running operation to finish.
/// </summary>
/// <returns>
/// The name of the long-running operation that was created or null if the DDL statement did not
/// create a long-running operation.
/// </returns>
internal async Task<string> StartDdlAsync(CancellationToken cancellationToken)
{
var operation = await ExecuteDdlAsync(pollUntilCompleted: false, cancellationToken).ConfigureAwait(false);
return operation?.Name;
}

private async Task<Operation> ExecuteDdlAsync(bool pollUntilCompleted, CancellationToken cancellationToken)
{
string commandText = CommandTextBuilder.CommandText;
var builder = Connection.Builder;
var connectionOptions = new SpannerClientCreationOptions(builder);
Operation operation = null;

// Create the builder separately from actually building, so we can note the channel that it created.
// (This is fairly unpleasant, but we'll try to improve this in the next version of GAX.)
Expand All @@ -257,12 +278,9 @@ private async Task<int> ExecuteDdlAsync(CancellationToken cancellationToken)
ExtraStatements = { CommandTextBuilder.ExtraStatements ?? new string[0] },
ProtoDescriptors = CommandTextBuilder.ProtobufDescriptors?.ToByteString() ?? ByteString.Empty,
};
var response = await databaseAdminClient.CreateDatabaseAsync(request).ConfigureAwait(false);
response = await response.PollUntilCompletedAsync().ConfigureAwait(false);
if (response.IsFaulted)
{
throw SpannerException.FromOperationFailedException(response.Exception);
}
var createDbOperation = await databaseAdminClient.CreateDatabaseAsync(request).ConfigureAwait(false);
var response = await HandleLro(createDbOperation).ConfigureAwait(false);
operation = response.RpcMessage;
}
else if (CommandTextBuilder.IsDropDatabaseCommand)
{
Expand Down Expand Up @@ -293,13 +311,9 @@ private async Task<int> ExecuteDdlAsync(CancellationToken cancellationToken)
Statements = { commandText, CommandTextBuilder.ExtraStatements ?? Enumerable.Empty<string>() },
ProtoDescriptors = CommandTextBuilder.ProtobufDescriptors?.ToByteString() ?? ByteString.Empty,
};

var response = await databaseAdminClient.UpdateDatabaseDdlAsync(request).ConfigureAwait(false);
response = await response.PollUntilCompletedAsync().ConfigureAwait(false);
if (response.IsFaulted)
{
throw SpannerException.FromOperationFailedException(response.Exception);
}
var updateDdlOperation = await databaseAdminClient.UpdateDatabaseDdlAsync(request).ConfigureAwait(false);
var response = await HandleLro(updateDdlOperation).ConfigureAwait(false);
operation = response.RpcMessage;
}
}
catch (RpcException gRpcException)
Expand All @@ -312,7 +326,22 @@ private async Task<int> ExecuteDdlAsync(CancellationToken cancellationToken)
channel?.Shutdown();
}

return 0;
return operation;

async Task<Operation<TResponse, TMetadata>> HandleLro<TResponse, TMetadata>(Operation<TResponse, TMetadata> operationToPoll)
where TResponse : class, IMessage<TResponse>, new()
where TMetadata : class, IMessage<TMetadata>, new()
{
if (pollUntilCompleted)
{
operationToPoll = await operationToPoll.PollUntilCompletedAsync().ConfigureAwait(false);
}
if (operationToPoll.IsFaulted)
{
throw SpannerException.FromOperationFailedException(operationToPoll.Exception);
}
return operationToPoll;
}
}

private async Task<int> ExecuteMutationsAsync(CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,22 @@ public long ExecutePartitionedUpdate() =>
public Task<long> ExecutePartitionedUpdateAsync(CancellationToken cancellationToken = default) =>
CreateExecutableCommand().ExecutePartitionedUpdateAsync(cancellationToken);

/// <summary>
/// Executes this command as DDL, but does not wait for the execution of the DDL statements to finish. The
/// method returns the name of the long-running operation that was started. The cancellation token can only be
/// used to cancel the request to start the execution of the DDL statements. It cannot be used to cancel the
/// long-running operation once it has been started.
/// The command must contain one or more DDL statements;
/// <see cref="SpannerConnection.CreateDdlCommand(string, string[])"/> for details.
/// </summary>
/// <param name="cancellationToken">An optional token for canceling the call.</param>
/// <returns>
/// The name of the long-running operation that was started for the DDL statement(s).
/// Note: The ID is empty for DropDatabase commands.
/// </returns>
public Task<string> StartDdlAsync(CancellationToken cancellationToken = default) =>
CreateExecutableCommand().StartDdlAsync(cancellationToken);

/// <summary>
/// Creates an executable command that captures all the necessary information from this command.
/// </summary>
Expand Down
Loading