diff --git a/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs index c37a7e10..86bdbcf7 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs @@ -2,7 +2,7 @@ namespace Ydb.Sdk.Ado.Session; internal interface ISessionSource where TSession : ISession { - ValueTask OpenSession(CancellationToken cancellationToken = default); + ValueTask OpenSession(CancellationToken cancellationToken); void Return(TSession session); } diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs index d59be406..2002ec69 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs @@ -17,10 +17,11 @@ internal class PoolingSession : IPoolingSession private readonly IDriver _driver; private readonly PoolingSessionSource _poolingSessionSource; - private readonly YdbConnectionStringBuilder _settings; private readonly ILogger _logger; - private volatile bool _isBroken; + private volatile bool _isBroken = true; + + private readonly bool _disableServerBalancer; private string SessionId { get; set; } = string.Empty; private long NodeId { get; set; } @@ -30,13 +31,13 @@ internal class PoolingSession : IPoolingSession internal PoolingSession( IDriver driver, PoolingSessionSource poolingSessionSource, - YdbConnectionStringBuilder settings, + bool disableServerBalancer, ILogger logger ) { _driver = driver; _poolingSessionSource = poolingSessionSource; - _settings = settings; + _disableServerBalancer = disableServerBalancer; _logger = logger; } @@ -96,18 +97,16 @@ public async Task RollbackTransaction( } } - public void OnNotSuccessStatusCode(StatusCode code) + public void OnNotSuccessStatusCode(StatusCode statusCode) { - if (code is - StatusCode.Cancelled or + if (statusCode is StatusCode.BadSession or StatusCode.SessionBusy or - StatusCode.InternalError or + StatusCode.SessionExpired or StatusCode.ClientTransportTimeout or - StatusCode.Unavailable or StatusCode.ClientTransportUnavailable) { - _logger.LogWarning("Session[{SessionId}] is deactivated. Reason StatusCode: {Code}", SessionId, code); + _logger.LogWarning("Session[{SessionId}] is deactivated. Reason Status: {Status}", SessionId, statusCode); _isBroken = true; } @@ -117,7 +116,7 @@ public async Task Open(CancellationToken cancellationToken) { var requestSettings = new GrpcRequestSettings { CancellationToken = cancellationToken }; - if (!_settings.DisableServerBalancer) + if (!_disableServerBalancer) { requestSettings.ClientCapabilities.Add(SessionBalancer); } @@ -133,6 +132,7 @@ public async Task Open(CancellationToken cancellationToken) SessionId = response.SessionId; NodeId = response.NodeId; + _isBroken = false; _ = Task.Run(async () => { @@ -188,7 +188,7 @@ public async Task Open(CancellationToken cancellationToken) } catch (YdbException e) { - if (e.Code == StatusCode.Cancelled) + if (e.Code == StatusCode.ClientTransportTimeout) { _logger.LogDebug("AttachStream is cancelled (possible grpcChannel is closing)"); diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs index 26730d89..2d6fb6c1 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs @@ -5,16 +5,16 @@ namespace Ydb.Sdk.Ado.Session; internal class PoolingSessionFactory : IPoolingSessionFactory { private readonly IDriver _driver; - private readonly YdbConnectionStringBuilder _settings; + private readonly bool _disableServerBalancer; private readonly ILogger _logger; - public PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILogger logger) + public PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory) { _driver = driver; - _settings = settings; - _logger = logger; + _disableServerBalancer = settings.DisableServerBalancer; + _logger = loggerFactory.CreateLogger(); } public IPoolingSession NewSession(PoolingSessionSource source) => - new PoolingSession(_driver, source, _settings, _logger); + new PoolingSession(_driver, source, _disableServerBalancer, _logger); } diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs index 86381280..6c63a8c7 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs @@ -31,7 +31,7 @@ internal sealed class PoolingSessionSource : ISessionSource private volatile bool _pruningTimerEnabled; private int _pruningSampleIndex; - private volatile int _numConnectors; + private volatile int _numSessions; private volatile int _idleCount; public PoolingSessionSource( @@ -111,15 +111,15 @@ private async ValueTask RentAsync(CancellationToken cancellatio return session; } - // If we're here, our waiting attempt on the idle connector channel was released with a null - // (or bad connector), or we're in sync mode. Check again if a new idle connector has appeared since we last checked. + // If we're here, our waiting attempt on the idle session channel was released with a null + // (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked. if (TryGetIdleSession(out session)) { return session; } - // We might have closed a connector in the meantime and no longer be at max capacity - // so try to open a new connector and if that fails, loop again. + // We might have closed a session in the meantime and no longer be at max capacity + // so try to open a new session and if that fails, loop again. session = await OpenNewSession(finalToken).ConfigureAwait(false); if (session != null) { @@ -139,9 +139,9 @@ private async ValueTask RentAsync(CancellationToken cancellatio private async ValueTask OpenNewSession(CancellationToken cancellationToken) { - for (var numConnectors = _numConnectors; numConnectors < _maxSessionSize; numConnectors = _numConnectors) + for (var numSessions = _numSessions; numSessions < _maxSessionSize; numSessions = _numSessions) { - if (Interlocked.CompareExchange(ref _numConnectors, numConnectors + 1, numConnectors) != numConnectors) + if (Interlocked.CompareExchange(ref _numSessions, numSessions + 1, numSessions) != numSessions) { continue; } @@ -153,8 +153,8 @@ private async ValueTask RentAsync(CancellationToken cancellatio // Only start pruning if we've incremented open count past _min. // Note that we don't do it only once, on equality, because the thread which incremented open count past _min might get exception - // on NpgsqlConnector.Open due to timeout, CancellationToken or other reasons. - if (numConnectors >= _minSessionSize) + // on NpgsqlSession.Open due to timeout, CancellationToken or other reasons. + if (numSessions >= _minSessionSize) { UpdatePruningTimer(); } @@ -164,9 +164,9 @@ private async ValueTask RentAsync(CancellationToken cancellatio catch { // Physical open failed, decrement the open and busy counter back down. - Interlocked.Decrement(ref _numConnectors); + Interlocked.Decrement(ref _numSessions); - // In case there's a waiting attempt on the channel, we write a null to the idle connector channel + // In case there's a waiting attempt on the channel, we write a null to the idle session channel // to wake it up, so it will try opening (and probably throw immediately) // Statement order is important since we have synchronous completions on the channel. _idleSessionWriter.TryWrite(null); @@ -220,15 +220,15 @@ private void CloseSession(IPoolingSession session) { session.DeleteSession(); - var numConnectors = Interlocked.Decrement(ref _numConnectors); + var numSessions = Interlocked.Decrement(ref _numSessions); - // If a connector has been closed for any reason, we write a null to the idle connector channel to wake up + // If a session has been closed for any reason, we write a null to the idle session channel to wake up // a waiter, who will open a new physical connection // Statement order is important since we have synchronous completions on the channel. _idleSessionWriter.TryWrite(null); // Only turn off the timer one time, when it was this Close that brought Open back to _min. - if (numConnectors == _minSessionSize) + if (numSessions == _minSessionSize) { UpdatePruningTimer(); } @@ -238,13 +238,13 @@ private void UpdatePruningTimer() { lock (_pruningTimer) { - var numConnectors = _numConnectors; - if (numConnectors > _minSessionSize && !_pruningTimerEnabled) + var numSessions = _numSessions; + if (numSessions > _minSessionSize && !_pruningTimerEnabled) { _pruningTimerEnabled = true; _pruningTimer.Change(_pruningSamplingInterval, Timeout.InfiniteTimeSpan); } - else if (numConnectors <= _minSessionSize && _pruningTimerEnabled) + else if (numSessions <= _minSessionSize && _pruningTimerEnabled) { _pruningTimer.Change(Timeout.Infinite, Timeout.Infinite); _pruningSampleIndex = 0; @@ -285,7 +285,7 @@ private static void PruneIdleSessions(object? state) } while (toPrune > 0 && - pool._numConnectors > pool._minSessionSize && + pool._numSessions > pool._minSessionSize && pool._idleSessionReader.TryRead(out var session) && session != null) { diff --git a/src/Ydb.Sdk/src/Pool/SessionPool.cs b/src/Ydb.Sdk/src/Pool/SessionPool.cs index f048b523..28feb64e 100644 --- a/src/Ydb.Sdk/src/Pool/SessionPool.cs +++ b/src/Ydb.Sdk/src/Pool/SessionPool.cs @@ -231,7 +231,7 @@ StatusCode.SessionExpired or StatusCode.ClientTransportTimeout or StatusCode.ClientTransportUnavailable) { - _logger.LogWarning("Session[{SessionId}] is deactivated. Reason StatusCode: {Code}", SessionId, code); + _logger.LogWarning("Session[{SessionId}] is deactivated. Reason Status: {Status}", SessionId, code); IsActive = false; } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs new file mode 100644 index 00000000..93ac3425 --- /dev/null +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs @@ -0,0 +1,294 @@ +using Grpc.Core; +using Moq; +using Xunit; +using Ydb.Issue; +using Ydb.Query; +using Ydb.Query.V1; +using Ydb.Sdk.Ado.Session; +using Ydb.Sdk.Ado.Tests.Utils; + +namespace Ydb.Sdk.Ado.Tests.Session; + +public class PoolingSessionTests +{ + private const long NodeId = 3; + private const string SessionId = "sessionId"; + + private readonly Mock _mockIDriver; + private readonly Mock> _mockAttachStream = new(MockBehavior.Strict); + private readonly PoolingSessionFactory _poolingSessionFactory; + private readonly PoolingSessionSource _poolingSessionSource; + + public PoolingSessionTests() + { + var settings = new YdbConnectionStringBuilder(); + + _mockIDriver = new Mock(MockBehavior.Strict); + _mockIDriver.Setup(driver => driver.LoggerFactory).Returns(TestUtils.LoggerFactory); + _mockIDriver.Setup(driver => driver.ServerStreamCall( + QueryService.AttachSessionMethod, + It.Is(request => request.SessionId.Equals(SessionId)), + It.Is(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId)) + ).ReturnsAsync(_mockAttachStream.Object); + _mockAttachStream.Setup(stream => stream.Dispose()); + _poolingSessionFactory = new PoolingSessionFactory(_mockIDriver.Object, settings, TestUtils.LoggerFactory); + _poolingSessionSource = new PoolingSessionSource(_mockIDriver.Object, _poolingSessionFactory, settings); + } + + [Theory] + [InlineData(StatusCode.Aborted, false)] + [InlineData(StatusCode.BadSession, true)] + [InlineData(StatusCode.SessionBusy, true)] + [InlineData(StatusCode.SessionExpired, true)] + [InlineData(StatusCode.ClientTransportTimeout, true)] + [InlineData(StatusCode.ClientTransportUnavailable, true)] + [InlineData(StatusCode.Overloaded, false)] + public async Task OnNotSuccessStatusCode_WhenStatusCodeIsNotSuccess_UpdateIsBroken(StatusCode statusCode, + bool isError) + { + SetupSuccessCreateSession(); + var tcsSecondMoveAttachStream = new TaskCompletionSource(); + + _mockAttachStream.SetupSequence(attachStream => attachStream.MoveNextAsync(CancellationToken.None)) + .ReturnsAsync(true) + .Returns(new ValueTask(tcsSecondMoveAttachStream.Task)); + _mockAttachStream.SetupSequence(attachStream => attachStream.Current) + .Returns(new SessionState { Status = StatusIds.Types.StatusCode.Success }); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + Assert.True(session.IsBroken); + await session.Open(CancellationToken.None); + Assert.False(session.IsBroken); + session.OnNotSuccessStatusCode(statusCode); + Assert.Equal(isError, session.IsBroken); + tcsSecondMoveAttachStream.TrySetResult(false); + } + + [Fact] + public async Task Open_WhenCreateSessionThrowRpcException_IsBroken() + { + _mockIDriver.Setup(driver => driver.UnaryCall(QueryService.CreateSessionMethod, + It.IsAny(), + It.Is(settings => settings.ClientCapabilities.Contains("session-balancer"))) + ) + .Throws(() => new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + await Assert.ThrowsAsync(() => session.Open(CancellationToken.None)); + Assert.True(session.IsBroken); + } + + [Fact] + public async Task Open_WhenCreateSessionReturnBadRequest_IsBroken() + { + _mockIDriver.Setup(driver => driver.UnaryCall(QueryService.CreateSessionMethod, + It.IsAny(), + It.Is(settings => settings.ClientCapabilities.Contains("session-balancer"))) + ) + .ReturnsAsync( + new CreateSessionResponse + { + Status = StatusIds.Types.StatusCode.BadRequest, + Issues = { new IssueMessage { Message = "Mock Issue Message" } } + } + ); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + var ydbException = await Assert.ThrowsAsync(() => session.Open(CancellationToken.None)); + Assert.Equal("Status: BadRequest, Issues:\n[0] Fatal: Mock Issue Message", ydbException.Message); + Assert.Equal(StatusCode.BadRequest, ydbException.Code); + Assert.True(session.IsBroken); + } + + [Fact] + public async Task Open_WhenAttachStreamFirstMoveNextAsyncThrowException_IsBroken() + { + SetupSuccessCreateSession(); + _mockAttachStream.Setup(attachStream => attachStream.MoveNextAsync(CancellationToken.None)) + .ThrowsAsync(new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + var ydbException = await Assert.ThrowsAsync(() => session.Open(CancellationToken.None)); + Assert.Equal("Transport RPC call error", ydbException.Message); + Assert.Equal(StatusCode.ClientTransportTimeout, ydbException.Code); + Assert.True(session.IsBroken); + } + + [Fact] + public async Task Open_WhenAttachStreamFirstMoveNextIsFalse_IsBroken() + { + SetupSuccessCreateSession(); + _mockAttachStream.Setup(attachStream => attachStream.MoveNextAsync(CancellationToken.None)).ReturnsAsync(false); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + var ydbException = await Assert.ThrowsAsync(() => session.Open(CancellationToken.None)); + Assert.Equal("Attach stream is not started!", ydbException.Message); + Assert.Equal(StatusCode.Cancelled, ydbException.Code); + Assert.True(session.IsBroken); + } + + [Fact] + public async Task Open_WhenAttachStreamFirstCurrentIsBadSession_IsBroken() + { + SetupSuccessCreateSession(); + + _mockAttachStream.Setup(attachStream => attachStream.MoveNextAsync(CancellationToken.None)) + .ReturnsAsync(true); + _mockAttachStream.SetupSequence(attachStream => attachStream.Current) + .Returns(new SessionState + { + Status = StatusIds.Types.StatusCode.BadSession, + Issues = { new IssueMessage { IssueCode = 1, Severity = 1, Message = "Ouch BadSession!" } } + }); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + var ydbException = await Assert.ThrowsAsync(() => session.Open(CancellationToken.None)); + Assert.Equal("Status: BadSession, Issues:\n[1] Error: Ouch BadSession!", ydbException.Message); + Assert.Equal(StatusCode.BadSession, ydbException.Code); + Assert.True(session.IsBroken); + } + + [Fact] + public async Task Open_WhenSuccessOpenThenAttachStreamIsClosed_IsBroken() + { + SetupSuccessCreateSession(); + var tcsSecondMoveAttachStream = SetupAttachStream(); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + await session.Open(CancellationToken.None); + Assert.False(session.IsBroken); + tcsSecondMoveAttachStream.TrySetResult(false); // attach stream is closed + await Task.Delay(500); + Assert.True(session.IsBroken); + } + + [Fact] + public async Task Open_WhenSuccessOpenThenAttachStreamSendRpcException_IsNotBroken() + { + SetupSuccessCreateSession(); + var tcsSecondMoveAttachStream = SetupAttachStream(); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + await session.Open(CancellationToken.None); + Assert.False(session.IsBroken); + tcsSecondMoveAttachStream.SetException( + new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))); // attach stream is closed + await Task.Delay(500); + Assert.True(session.IsBroken); + } + + [Fact] + public async Task Open_WhenSuccessOpenThenAttachStreamSendBadSession_IsNotBroken() + { + SetupSuccessCreateSession(); + var tcsSecondMoveAttachStream = SetupAttachStream(); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + await session.Open(CancellationToken.None); + Assert.False(session.IsBroken); + tcsSecondMoveAttachStream.SetResult(true); // attach stream is closed + await Task.Delay(500); + Assert.True(session.IsBroken); + } + + [Fact] + public async Task Delete_WhenSuccessOpenSession_IsBroken() + { + SetupSuccessCreateSession(); + var tcsSecondMoveAttachStream = SetupAttachStream(); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + await session.Open(CancellationToken.None); + _mockIDriver.Setup(driver => driver.UnaryCall( + QueryService.DeleteSessionMethod, + It.Is(request => request.SessionId.Equals(SessionId)), + It.Is(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId)) + ); + Assert.False(session.IsBroken); + await session.DeleteSession(); + Assert.True(session.IsBroken); + _mockIDriver.Verify(driver => driver.UnaryCall(QueryService.DeleteSessionMethod, + It.IsAny(), It.IsAny())); + tcsSecondMoveAttachStream.TrySetResult(false); + } + + [Fact] + public async Task Delete_WhenSuccessOpenSessionBadSessionInAttachStream_IsBroken() + { + SetupSuccessCreateSession(); + var tcsSecondMoveAttachStream = SetupAttachStream(); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + await session.Open(CancellationToken.None); + _mockIDriver.Setup(driver => driver.UnaryCall( + QueryService.DeleteSessionMethod, + It.Is(request => request.SessionId.Equals(SessionId)), + It.Is(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId)) + ); + Assert.False(session.IsBroken); + session.OnNotSuccessStatusCode(StatusCode.BadSession); + await session.DeleteSession(); + Assert.True(session.IsBroken); + _mockIDriver.Verify(driver => driver.UnaryCall(QueryService.DeleteSessionMethod, + It.IsAny(), It.IsAny()), Times.Never); + tcsSecondMoveAttachStream.TrySetResult(false); + } + + [Fact] + public async Task CommitTransaction_WhenSuccessOpenSession_AssertNodeId_SessionId_YdbException() + { + const string txId = "txId"; + SetupSuccessCreateSession(); + var tcsSecondMoveAttachStream = SetupAttachStream(); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + await session.Open(CancellationToken.None); + _mockIDriver.Setup(driver => driver.UnaryCall( + QueryService.CommitTransactionMethod, + It.Is(request => + request.SessionId.Equals(SessionId) && request.TxId.Equals(txId)), + It.Is(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId)) + ).ThrowsAsync(YdbException.FromServer(StatusIds.Types.StatusCode.Aborted, [])); + Assert.False(session.IsBroken); + var ydbException = await Assert.ThrowsAsync(() => session.CommitTransaction(txId)); + Assert.Equal(StatusCode.Aborted, ydbException.Code); + Assert.Equal("Status: Aborted", ydbException.Message); + tcsSecondMoveAttachStream.TrySetResult(false); + } + + [Fact] + public async Task RollbackTransaction_WhenSuccessOpenSession_AssertNodeId_SessionId_YdbException() + { + const string txId = "txId"; + SetupSuccessCreateSession(); + var tcsSecondMoveAttachStream = SetupAttachStream(); + var session = _poolingSessionFactory.NewSession(_poolingSessionSource); + await session.Open(CancellationToken.None); + _mockIDriver.Setup(driver => driver.UnaryCall( + QueryService.RollbackTransactionMethod, + It.Is(request => + request.SessionId.Equals(SessionId) && request.TxId.Equals(txId)), + It.Is(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId)) + ).ThrowsAsync(YdbException.FromServer(StatusIds.Types.StatusCode.NotFound, [])); + Assert.False(session.IsBroken); + var ydbException = await Assert.ThrowsAsync(() => session.RollbackTransaction(txId)); + Assert.Equal(StatusCode.NotFound, ydbException.Code); + Assert.Equal("Status: NotFound", ydbException.Message); + tcsSecondMoveAttachStream.TrySetResult(false); + } + + private TaskCompletionSource SetupAttachStream() + { + var tcsSecondMoveAttachStream = new TaskCompletionSource(); + + _mockAttachStream.SetupSequence(attachStream => attachStream.MoveNextAsync(CancellationToken.None)) + .ReturnsAsync(true) + .Returns(new ValueTask(tcsSecondMoveAttachStream.Task)); + _mockAttachStream.SetupSequence(attachStream => attachStream.Current) + .Returns(new SessionState { Status = StatusIds.Types.StatusCode.Success }) + .Returns(new SessionState { Status = StatusIds.Types.StatusCode.BadSession }); + return tcsSecondMoveAttachStream; + } + + private void SetupSuccessCreateSession() => _mockIDriver + .Setup(driver => driver.UnaryCall(QueryService.CreateSessionMethod, + It.IsAny(), + It.Is(settings => settings.ClientCapabilities.Contains("session-balancer"))) + ) + .ReturnsAsync( + new CreateSessionResponse + { + Status = StatusIds.Types.StatusCode.Success, + SessionId = SessionId, + NodeId = NodeId + } + ); +}