Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

using Google.Cloud.ClientTesting;
using Google.Cloud.Spanner.Common.V1;
using Google.Cloud.Spanner.V1;
using Google.Cloud.Spanner.V1.Internal.Logging;
using System;
using System.Threading.Tasks;

namespace Google.Cloud.Spanner.Data.CommonTesting;

Expand All @@ -39,4 +41,6 @@ public abstract class CloudSpannerFixtureBase<TDatabase> : CloudProjectFixtureBa
protected CloudSpannerFixtureBase(Func<string, TDatabase> databaseFactory) => Database = databaseFactory(ProjectId);

public SpannerConnection GetConnection(Logger logger = null, bool logCommitStats = false) => Database.GetConnection(logger, logCommitStats);

public async Task<ManagedSession> GetManagedSession() => await Database.GetManagedSession().ConfigureAwait(false);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
using Google.Api.Gax.ResourceNames;
using Google.Cloud.Spanner.Admin.Instance.V1;
using Google.Cloud.Spanner.Common.V1;
using Google.Cloud.Spanner.V1;
using Google.Cloud.Spanner.V1.Internal.Logging;
using Grpc.Core;
using System;
using System.Threading.Tasks;

namespace Google.Cloud.Spanner.Data.CommonTesting;

Expand All @@ -27,6 +29,8 @@ namespace Google.Cloud.Spanner.Data.CommonTesting;
/// </summary>
public abstract class SpannerTestDatabaseBase
{
private ManagedSession _multiplexSession;

/// <summary>
/// The Spanner Host name to connect to. It is read from the environment variable "TEST_SPANNER_HOST".
/// </summary>
Expand Down Expand Up @@ -175,7 +179,31 @@ protected void MaybeCreateInstanceOnEmulator(string projectId)
public SpannerConnection GetConnection(Logger logger, bool logCommitStats = false) =>
new SpannerConnection(new SpannerConnectionStringBuilder(ConnectionString)
{
SessionPoolManager = SessionPoolManager.Create(new V1.SessionPoolOptions(), logger),
SessionPoolManager = SessionPoolManager.Create(new ManagedSessionOptions(), logger),
LogCommitStats = logCommitStats
});

public async Task<ManagedSession> GetManagedSession()
{
if (_multiplexSession != null && GetEnvironmentVariableOrDefault("SPANNER_EMULATOR_HOST", null) == null)
{
// Only return the same multiplex session if we are NOT testing on the emulator
// The emulator does not handle concurrent transactions on a single multiplex session well
return _multiplexSession;
}

var options = new ManagedSessionOptions();

_multiplexSession = await CreateMultiplexSession(options).ConfigureAwait(false);

return _multiplexSession;
}

private async Task<ManagedSession> CreateMultiplexSession(ManagedSessionOptions options)
{
var poolManager = SessionPoolManager.Create(options);
var muxSession = await poolManager.AcquireManagedSessionAsync(SpannerClientCreationOptions, DatabaseName, null);

return muxSession;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ StringValue STRING(MAX),
var dropCommand = connection.CreateDdlCommand($"DROP DATABASE {dbName}");
await dropCommand.ExecuteNonQueryAsync();
}

await SessionPoolHelpers.ShutdownPoolAsync(builder.WithDatabase(dbName));
}

[Fact]
Expand Down Expand Up @@ -146,8 +144,6 @@ StringValue STRING(MAX),
var dropCommand = connection.CreateDdlCommand($"DROP DATABASE {dbName}");
await dropCommand.ExecuteNonQueryAsync();
}

await SessionPoolHelpers.ShutdownPoolAsync(builder.WithDatabase(dbName));
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 Google LLC
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ private async Task AssertEmptyMutationsFailAsync(SpannerCommand emptyMutation)
// "the amount of values does not match the number of columns in the key".
if (!_fixture.RunningOnEmulator) // The message is different on the emulator.
{
Assert.Contains("does not specify any value", exception.RpcException.Message);
// This error is expected for Multiplex Sesions as they expect a mutation key during commit for mutation only transactions
Assert.Contains("Failed to initialize transaction due to invalid mutation key.", exception.RpcException.Message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ public async Task BadDbName()
Assert.Equal(ErrorCode.NotFound, e.ErrorCode);
Assert.False(e.IsTransientSpannerFault());
}

// Shut the pool associated with the bad database down, to avoid seeing spurious connection failures
// later in the log.
await SessionPoolHelpers.ShutdownPoolAsync(connectionString);
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,30 +150,21 @@ private async Task RunStress(Func<SpannerConnectionStringBuilder, Task> func, in
// The maximum roundtrip time for Spanner (and MySQL) is about 200ms per
// write. If we initialize with the target sustained # sessions,
// we shouldn't see any more sessions created.
int countToPreWarm = Math.Min(TargetQps / 4, 800);
var options = new SessionPoolOptions
{
MaximumActiveSessions = Math.Max(countToPreWarm + 50, 400),
MinimumPooledSessions = countToPreWarm,
MaximumConcurrentSessionCreates = Math.Min(countToPreWarm, 50)
};
var options = new ManagedSessionOptions();

var sessionPoolManager = SessionPoolManager.Create(options);
var connectionStringBuilder = new SpannerConnectionStringBuilder(_fixture.ConnectionString)
{
SessionPoolManager = sessionPoolManager,
MaximumGrpcChannels = Math.Max(4, 8 * TargetQps / 2000)
};
var pool = await connectionStringBuilder.AcquireSessionPoolAsync();
var managedSession = await connectionStringBuilder.AcquireManagedSessionAsync();
var logger = Logger.DefaultLogger;
logger.ResetPerformanceData();

logger.Info("Prewarming session pool for stress test");
// Prewarm step: allow up to 30 seconds for the session pool to be populated.
var cancellationToken = new CancellationTokenSource(30000).Token;
await pool.WhenPoolReady(_fixture.DatabaseName, cancellationToken);

logger.Info($"Prewarm complete. Pool stats: {pool.GetSegmentStatisticsSnapshot(_fixture.DatabaseName)}");

// Now run the test, with performance logging enabled, but without debug logging.
// (Debug logging can write a lot to our log file, breaking the test.)
Expand All @@ -193,8 +184,6 @@ private async Task RunStress(Func<SpannerConnectionStringBuilder, Task> func, in
}
logger.Info($"Spanner latency = {latencyMs}ms");

await SessionPoolHelpers.ShutdownPoolAsync(connectionStringBuilder);

// Spanner latency with 100 qps simulated is usually around 75ms.
// We allow for a latency multiplier from callers, because callers may be executing,
// more than one command. In particular, with inline transactions, mutation commits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,50 +86,6 @@ private async Task IncrementByOneAsync(SpannerConnection connection, bool orphan
}
}

[Fact]
public async Task Commit_ReturnsToPool()
{
using var connection = new SpannerConnection(_fixture.ConnectionString);
await connection.OpenAsync();

using var transaction = await connection.BeginTransactionAsync();
using var command = connection.CreateSelectCommand($"SELECT Int64Value FROM {_fixture.TableName} WHERE K=@k");
command.Parameters.Add("k", SpannerDbType.String, _key);
command.Transaction = transaction;

var value = await command.ExecuteScalarAsync();

transaction.Commit();

var poolStatistics = connection.GetSessionPoolSegmentStatistics();

// Because the session is eagerly returned to the pool after a commit, there shouldn't
// be any active sessions even before we dispose of the transaction explicitly.
Assert.Equal(0, poolStatistics.ActiveSessionCount);
}

[Fact]
public async Task Rollback_ReturnsToPool()
{
using var connection = new SpannerConnection(_fixture.ConnectionString);
await connection.OpenAsync();

using var transaction = await connection.BeginTransactionAsync();
using var command = connection.CreateSelectCommand($"SELECT Int64Value FROM {_fixture.TableName} WHERE K=@k");
command.Parameters.Add("k", SpannerDbType.String, _key);
command.Transaction = transaction;

var value = await command.ExecuteScalarAsync();

transaction.Rollback();

var poolStatistics = connection.GetSessionPoolSegmentStatistics();

// Because the session is eagerly returned to the pool after a rollback, there shouldn't
// be any active sessions even before we dispose of the transaction explicitly.
Assert.Equal(0, poolStatistics.ActiveSessionCount);
}

[Fact]
public async Task DetachOnDisposeTransactionIsDetached()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,17 @@ public class ExecuteHelperTests
public ExecuteHelperTests(SpannerDatabaseFixture fixture) =>
_fixture = fixture;

[Fact]
public Task SessionNotFound() => WithSessionPool(async pool =>
{
var session = await pool.Client.CreateSessionAsync(_fixture.DatabaseName);
await pool.Client.DeleteSessionAsync(session.SessionName);

// The session doesn't become invalid immediately after deletion.
// Wait for a minute to ensure the session is really expired.
await Task.Delay(TimeSpan.FromMinutes(1));

var request = new ExecuteSqlRequest
{
Sql = $"SELECT 1",
Session = session.Name
};
var exception = await Assert.ThrowsAsync<RpcException>(() => pool.Client.ExecuteSqlAsync(request));
Assert.True(ExecuteHelper.IsSessionExpiredError(exception));
});

// This code is separated out in case we need more tests. It's really just fluff.
private async Task WithSessionPool(Func<SessionPool, Task> action)
private async Task WithManagedSession(Func<ManagedSession, Task> action)
{
var builder = new SpannerConnectionStringBuilder(_fixture.ConnectionString);
var pool = await builder.AcquireSessionPoolAsync();
var managedSession = await builder.AcquireManagedSessionAsync();
try
{
await action(pool);
await action(managedSession);
}
finally
{
builder.SessionPoolManager.Release(pool);
}
}
}
Loading
Loading