diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index 93dde306..b33d1641 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,3 +1,4 @@ +- ADO.NET: PoolingSessionSource 2.0 based on Npgsql pooling algorithm. - Added new ADO.NET options: - `MinSessionPool`: The minimum connection pool size. - `SessionIdleTimeout`: The time (in seconds) to wait before closing idle session in the pool if the count of all sessions exceeds `MinSessionPool`. diff --git a/src/Ydb.Sdk/src/Ado/Session/ISession.cs b/src/Ydb.Sdk/src/Ado/Session/ISession.cs index bf5522b3..fc8c70ba 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ISession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ISession.cs @@ -6,6 +6,8 @@ namespace Ydb.Sdk.Ado.Session; internal interface ISession { + bool IsBroken { get; } + ValueTask> ExecuteQuery( string query, Dictionary parameters, diff --git a/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs index db81e3ba..c37a7e10 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs @@ -2,7 +2,7 @@ namespace Ydb.Sdk.Ado.Session; internal interface ISessionSource where TSession : ISession { - ValueTask OpenSession(); + ValueTask OpenSession(CancellationToken cancellationToken = default); void Return(TSession session); } diff --git a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs index c873055a..182dbf44 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs @@ -13,6 +13,8 @@ public ImplicitSession(IDriver driver) _driver = driver; } + public bool IsBroken => false; + public ValueTask> ExecuteQuery( string query, Dictionary parameters, diff --git a/src/Ydb.Sdk/src/Ado/Session/Session.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs similarity index 94% rename from src/Ydb.Sdk/src/Ado/Session/Session.cs rename to src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs index 5a40c326..d59be406 100644 --- a/src/Ydb.Sdk/src/Ado/Session/Session.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs @@ -8,7 +8,7 @@ namespace Ydb.Sdk.Ado.Session; -internal class Session : IPoolingSession +internal class PoolingSession : IPoolingSession { private const string SessionBalancer = "session-balancer"; @@ -18,20 +18,20 @@ internal class Session : IPoolingSession private readonly IDriver _driver; private readonly PoolingSessionSource _poolingSessionSource; private readonly YdbConnectionStringBuilder _settings; - private readonly ILogger _logger; + private readonly ILogger _logger; - private volatile bool _isActive; + private volatile bool _isBroken; private string SessionId { get; set; } = string.Empty; private long NodeId { get; set; } - public bool IsActive => _isActive; + public bool IsBroken => _isBroken; - internal Session( + internal PoolingSession( IDriver driver, PoolingSessionSource poolingSessionSource, YdbConnectionStringBuilder settings, - ILogger logger + ILogger logger ) { _driver = driver; @@ -109,7 +109,7 @@ StatusCode.Unavailable or { _logger.LogWarning("Session[{SessionId}] is deactivated. Reason StatusCode: {Code}", SessionId, code); - _isActive = false; + _isBroken = true; } } @@ -176,7 +176,7 @@ public async Task Open(CancellationToken cancellationToken) OnNotSuccessStatusCode(statusCode); - if (!IsActive) + if (IsBroken) { return; } @@ -204,7 +204,7 @@ public async Task Open(CancellationToken cancellationToken) } finally { - _isActive = false; + _isBroken = true; } }, cancellationToken); @@ -215,7 +215,12 @@ public async Task DeleteSession() { try { - _isActive = false; + if (_isBroken) + { + return; + } + + _isBroken = true; var deleteSessionResponse = await _driver.UnaryCall( QueryService.DeleteSessionMethod, diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs new file mode 100644 index 00000000..26730d89 --- /dev/null +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs @@ -0,0 +1,20 @@ +using Microsoft.Extensions.Logging; + +namespace Ydb.Sdk.Ado.Session; + +internal class PoolingSessionFactory : IPoolingSessionFactory +{ + private readonly IDriver _driver; + private readonly YdbConnectionStringBuilder _settings; + private readonly ILogger _logger; + + public PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILogger logger) + { + _driver = driver; + _settings = settings; + _logger = logger; + } + + public IPoolingSession NewSession(PoolingSessionSource source) => + new PoolingSession(_driver, source, _settings, _logger); +} diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs index 24f49e4a..86381280 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs @@ -1,16 +1,313 @@ +// This file contains session pooling algorithms adapted from Npgsql +// Original source: https://github.com/npgsql/npgsql +// Copyright (c) 2002-2025, Npgsql +// Licence https://github.com/npgsql/npgsql?tab=PostgreSQL-1-ov-file + +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; + namespace Ydb.Sdk.Ado.Session; -internal class PoolingSessionSource : ISessionSource +internal sealed class PoolingSessionSource : ISessionSource { - public ValueTask OpenSession() => throw new NotImplementedException(); + private readonly ILogger _logger; + private readonly IPoolingSessionFactory _sessionFactory; + + private readonly int _minSessionSize; + private readonly int _maxSessionSize; + + private readonly ChannelReader _idleSessionReader; + private readonly ChannelWriter _idleSessionWriter; + + private readonly int _createSessionTimeout; + + private readonly Timer _pruningTimer; + private readonly int _pruningSampleSize; + private readonly int _pruningMedianIndex; + private readonly TimeSpan _pruningSamplingInterval; + private readonly int[] _pruningSamples; + private volatile bool _pruningTimerEnabled; + private int _pruningSampleIndex; + + private volatile int _numConnectors; + private volatile int _idleCount; + + public PoolingSessionSource( + IDriver driver, + IPoolingSessionFactory sessionFactory, + YdbConnectionStringBuilder settings + ) + { + _logger = driver.LoggerFactory.CreateLogger(); + _sessionFactory = sessionFactory; + + _minSessionSize = settings.MinSessionPool; + _maxSessionSize = settings.MaxSessionPool; + if (_minSessionSize > _maxSessionSize) + { + throw new ArgumentException( + $"Connection can't have 'Max Session Pool' {_maxSessionSize} under 'Min Session Pool' {_minSessionSize}"); + } + + var channel = Channel.CreateUnbounded(); + _idleSessionReader = channel.Reader; + _idleSessionWriter = channel.Writer; + + _createSessionTimeout = settings.CreateSessionTimeout; + + if (settings.SessionPruningInterval > settings.SessionIdleTimeout) + { + throw new ArgumentException( + $"YdbConnection can't have {nameof(settings.SessionIdleTimeout)} {settings.SessionIdleTimeout} (in seconds) under {nameof(settings.SessionPruningInterval)} {settings.SessionPruningInterval} (in seconds)"); + } + + _pruningTimer = new Timer(PruneIdleSessions, this, Timeout.Infinite, Timeout.Infinite); + _pruningSampleSize = DivideRoundingUp(settings.SessionIdleTimeout, settings.SessionPruningInterval); + _pruningMedianIndex = DivideRoundingUp(_pruningSampleSize, 2) - 1; // - 1 to go from length to index + _pruningSamplingInterval = TimeSpan.FromSeconds(settings.SessionPruningInterval); + _pruningSamples = new int[_pruningSampleSize]; + _pruningTimerEnabled = false; + } + + public ValueTask OpenSession(CancellationToken cancellationToken) => + TryGetIdleSession(out var session) ? new ValueTask(session) : RentAsync(cancellationToken); + + public void Return(IPoolingSession session) + { + if (session.IsBroken) + { + CloseSession(session); + return; + } + + Interlocked.Increment(ref _idleCount); + _idleSessionWriter.TryWrite(session); + } + + private async ValueTask RentAsync(CancellationToken cancellationToken) + { + using var ctsGetSession = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + if (_createSessionTimeout > 0) + { + ctsGetSession.CancelAfter(TimeSpan.FromSeconds(_createSessionTimeout)); + } + + var finalToken = ctsGetSession.Token; + + try + { + var session = await OpenNewSession(finalToken).ConfigureAwait(false); + if (session != null) + return session; + + while (true) + { + session = await _idleSessionReader.ReadAsync(finalToken).ConfigureAwait(false); + + if (CheckIdleSession(session)) + { + return session; + } + + // If we're here, our waiting attempt on the idle connector channel was released with a null + // (or bad connector), or we're in sync mode. Check again if a new idle connector has appeared since we last checked. + if (TryGetIdleSession(out session)) + { + return session; + } + + // We might have closed a connector in the meantime and no longer be at max capacity + // so try to open a new connector and if that fails, loop again. + session = await OpenNewSession(finalToken).ConfigureAwait(false); + if (session != null) + { + return session; + } + } + } + catch (OperationCanceledException e) + { + throw new YdbException(StatusCode.Cancelled, + $"The connection pool has been exhausted, either raise 'MaxSessionPool' " + + $"(currently {_maxSessionSize}) or 'CreateSessionTimeout' " + + $"(currently {_createSessionTimeout} seconds) in your connection string.", e + ); + } + } + + private async ValueTask OpenNewSession(CancellationToken cancellationToken) + { + for (var numConnectors = _numConnectors; numConnectors < _maxSessionSize; numConnectors = _numConnectors) + { + if (Interlocked.CompareExchange(ref _numConnectors, numConnectors + 1, numConnectors) != numConnectors) + { + continue; + } + + try + { + var session = _sessionFactory.NewSession(this); + await session.Open(cancellationToken); + + // Only start pruning if we've incremented open count past _min. + // Note that we don't do it only once, on equality, because the thread which incremented open count past _min might get exception + // on NpgsqlConnector.Open due to timeout, CancellationToken or other reasons. + if (numConnectors >= _minSessionSize) + { + UpdatePruningTimer(); + } + + return session; + } + catch + { + // Physical open failed, decrement the open and busy counter back down. + Interlocked.Decrement(ref _numConnectors); - public void Return(IPoolingSession session) => throw new NotImplementedException(); + // In case there's a waiting attempt on the channel, we write a null to the idle connector channel + // to wake it up, so it will try opening (and probably throw immediately) + // Statement order is important since we have synchronous completions on the channel. + _idleSessionWriter.TryWrite(null); + + // Just in case we always call UpdatePruningTimer for failed physical open + UpdatePruningTimer(); + + throw; + } + } + + return null; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool TryGetIdleSession([NotNullWhen(true)] out IPoolingSession? session) + { + while (_idleSessionReader.TryRead(out session)) + { + if (CheckIdleSession(session)) + { + return true; + } + } + + return false; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool CheckIdleSession([NotNullWhen(true)] IPoolingSession? session) + { + if (session == null) + { + return false; + } + + // Only decrement when the session has a value. + Interlocked.Decrement(ref _idleCount); + + if (session.IsBroken) + { + CloseSession(session); + + return false; + } + + return true; + } + + private void CloseSession(IPoolingSession session) + { + session.DeleteSession(); + + var numConnectors = Interlocked.Decrement(ref _numConnectors); + + // If a connector has been closed for any reason, we write a null to the idle connector channel to wake up + // a waiter, who will open a new physical connection + // Statement order is important since we have synchronous completions on the channel. + _idleSessionWriter.TryWrite(null); + + // Only turn off the timer one time, when it was this Close that brought Open back to _min. + if (numConnectors == _minSessionSize) + { + UpdatePruningTimer(); + } + } + + private void UpdatePruningTimer() + { + lock (_pruningTimer) + { + var numConnectors = _numConnectors; + if (numConnectors > _minSessionSize && !_pruningTimerEnabled) + { + _pruningTimerEnabled = true; + _pruningTimer.Change(_pruningSamplingInterval, Timeout.InfiniteTimeSpan); + } + else if (numConnectors <= _minSessionSize && _pruningTimerEnabled) + { + _pruningTimer.Change(Timeout.Infinite, Timeout.Infinite); + _pruningSampleIndex = 0; + _pruningTimerEnabled = false; + } + } + } + + private static void PruneIdleSessions(object? state) + { + var pool = (PoolingSessionSource)state!; + var samples = pool._pruningSamples; + int toPrune; + lock (pool._pruningTimer) + { + // Check if we might have been contending with DisablePruning. + if (!pool._pruningTimerEnabled) + return; + + var sampleIndex = pool._pruningSampleIndex; + samples[sampleIndex] = pool._idleCount; + if (sampleIndex != pool._pruningSampleSize - 1) + { + pool._pruningSampleIndex = sampleIndex + 1; + pool._pruningTimer.Change(pool._pruningSamplingInterval, Timeout.InfiniteTimeSpan); + return; + } + + // Calculate median value for pruning, reset index and timer, and release the lock. + Array.Sort(samples); + toPrune = samples[pool._pruningMedianIndex]; + pool._pruningSampleIndex = 0; + pool._pruningTimer.Change(pool._pruningSamplingInterval, Timeout.InfiniteTimeSpan); + } + + if (pool._logger.IsEnabled(LogLevel.Debug)) + { + } + + while (toPrune > 0 && + pool._numConnectors > pool._minSessionSize && + pool._idleSessionReader.TryRead(out var session) && + session != null) + { + if (pool.CheckIdleSession(session)) + { + pool.CloseSession(session); + } + + toPrune--; + } + } + + private static int DivideRoundingUp(int dividend, int divisor) => (dividend + divisor - 1) / divisor; } -internal interface IPoolingSession : ISession +internal interface IPoolingSessionFactory { - bool IsActive { get; } + IPoolingSession NewSession(PoolingSessionSource source); +} +internal interface IPoolingSession : ISession +{ Task Open(CancellationToken cancellationToken); Task DeleteSession();