Skip to content
Merged
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace Ydb.Sdk.Ado.Session;

internal interface ISessionSource<TSession> where TSession : ISession
{
ValueTask<TSession> OpenSession(CancellationToken cancellationToken = default);
ValueTask<TSession> OpenSession(CancellationToken cancellationToken);

void Return(TSession session);
}
24 changes: 12 additions & 12 deletions src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ internal class PoolingSession : IPoolingSession

private readonly IDriver _driver;
private readonly PoolingSessionSource _poolingSessionSource;
private readonly YdbConnectionStringBuilder _settings;
private readonly ILogger<PoolingSession> _logger;

private volatile bool _isBroken;
private volatile bool _isBroken = true;

private readonly bool _disableServerBalancer;

private string SessionId { get; set; } = string.Empty;
private long NodeId { get; set; }
Expand All @@ -30,13 +31,13 @@ internal class PoolingSession : IPoolingSession
internal PoolingSession(
IDriver driver,
PoolingSessionSource poolingSessionSource,
YdbConnectionStringBuilder settings,
bool disableServerBalancer,
ILogger<PoolingSession> logger
)
{
_driver = driver;
_poolingSessionSource = poolingSessionSource;
_settings = settings;
_disableServerBalancer = disableServerBalancer;
_logger = logger;
}

Expand Down Expand Up @@ -96,18 +97,16 @@ public async Task RollbackTransaction(
}
}

public void OnNotSuccessStatusCode(StatusCode code)
public void OnNotSuccessStatusCode(StatusCode statusCode)
{
if (code is
StatusCode.Cancelled or
if (statusCode is
StatusCode.BadSession or
StatusCode.SessionBusy or
StatusCode.InternalError or
StatusCode.SessionExpired or
StatusCode.ClientTransportTimeout or
StatusCode.Unavailable or
StatusCode.ClientTransportUnavailable)
{
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason StatusCode: {Code}", SessionId, code);
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason Status: {Status}", SessionId, statusCode);

_isBroken = true;
}
Expand All @@ -117,7 +116,7 @@ public async Task Open(CancellationToken cancellationToken)
{
var requestSettings = new GrpcRequestSettings { CancellationToken = cancellationToken };

if (!_settings.DisableServerBalancer)
if (!_disableServerBalancer)
{
requestSettings.ClientCapabilities.Add(SessionBalancer);
}
Expand All @@ -133,6 +132,7 @@ public async Task Open(CancellationToken cancellationToken)

SessionId = response.SessionId;
NodeId = response.NodeId;
_isBroken = false;

_ = Task.Run(async () =>
{
Expand Down Expand Up @@ -188,7 +188,7 @@ public async Task Open(CancellationToken cancellationToken)
}
catch (YdbException e)
{
if (e.Code == StatusCode.Cancelled)
if (e.Code == StatusCode.ClientTransportTimeout)
{
_logger.LogDebug("AttachStream is cancelled (possible grpcChannel is closing)");

Expand Down
10 changes: 5 additions & 5 deletions src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ namespace Ydb.Sdk.Ado.Session;
internal class PoolingSessionFactory : IPoolingSessionFactory
{
private readonly IDriver _driver;
private readonly YdbConnectionStringBuilder _settings;
private readonly bool _disableServerBalancer;
private readonly ILogger<PoolingSession> _logger;

public PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILogger<PoolingSession> logger)
public PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory)
{
_driver = driver;
_settings = settings;
_logger = logger;
_disableServerBalancer = settings.DisableServerBalancer;
_logger = loggerFactory.CreateLogger<PoolingSession>();
}

public IPoolingSession NewSession(PoolingSessionSource source) =>
new PoolingSession(_driver, source, _settings, _logger);
new PoolingSession(_driver, source, _disableServerBalancer, _logger);
}
36 changes: 18 additions & 18 deletions src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ internal sealed class PoolingSessionSource : ISessionSource<IPoolingSession>
private volatile bool _pruningTimerEnabled;
private int _pruningSampleIndex;

private volatile int _numConnectors;
private volatile int _numSessions;
private volatile int _idleCount;

public PoolingSessionSource(
Expand Down Expand Up @@ -111,15 +111,15 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio
return session;
}

// If we're here, our waiting attempt on the idle connector channel was released with a null
// (or bad connector), or we're in sync mode. Check again if a new idle connector has appeared since we last checked.
// If we're here, our waiting attempt on the idle session channel was released with a null
// (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked.
if (TryGetIdleSession(out session))
{
return session;
}

// We might have closed a connector in the meantime and no longer be at max capacity
// so try to open a new connector and if that fails, loop again.
// We might have closed a session in the meantime and no longer be at max capacity
// so try to open a new session and if that fails, loop again.
session = await OpenNewSession(finalToken).ConfigureAwait(false);
if (session != null)
{
Expand All @@ -139,9 +139,9 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio

private async ValueTask<IPoolingSession?> OpenNewSession(CancellationToken cancellationToken)
{
for (var numConnectors = _numConnectors; numConnectors < _maxSessionSize; numConnectors = _numConnectors)
for (var numSessions = _numSessions; numSessions < _maxSessionSize; numSessions = _numSessions)
{
if (Interlocked.CompareExchange(ref _numConnectors, numConnectors + 1, numConnectors) != numConnectors)
if (Interlocked.CompareExchange(ref _numSessions, numSessions + 1, numSessions) != numSessions)
{
continue;
}
Expand All @@ -153,8 +153,8 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio

// Only start pruning if we've incremented open count past _min.
// Note that we don't do it only once, on equality, because the thread which incremented open count past _min might get exception
// on NpgsqlConnector.Open due to timeout, CancellationToken or other reasons.
if (numConnectors >= _minSessionSize)
// on NpgsqlSession.Open due to timeout, CancellationToken or other reasons.
if (numSessions >= _minSessionSize)
{
UpdatePruningTimer();
}
Expand All @@ -164,9 +164,9 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio
catch
{
// Physical open failed, decrement the open and busy counter back down.
Interlocked.Decrement(ref _numConnectors);
Interlocked.Decrement(ref _numSessions);

// In case there's a waiting attempt on the channel, we write a null to the idle connector channel
// In case there's a waiting attempt on the channel, we write a null to the idle session channel
// to wake it up, so it will try opening (and probably throw immediately)
// Statement order is important since we have synchronous completions on the channel.
_idleSessionWriter.TryWrite(null);
Expand Down Expand Up @@ -220,15 +220,15 @@ private void CloseSession(IPoolingSession session)
{
session.DeleteSession();

var numConnectors = Interlocked.Decrement(ref _numConnectors);
var numSessions = Interlocked.Decrement(ref _numSessions);

// If a connector has been closed for any reason, we write a null to the idle connector channel to wake up
// If a session has been closed for any reason, we write a null to the idle session channel to wake up
// a waiter, who will open a new physical connection
// Statement order is important since we have synchronous completions on the channel.
_idleSessionWriter.TryWrite(null);

// Only turn off the timer one time, when it was this Close that brought Open back to _min.
if (numConnectors == _minSessionSize)
if (numSessions == _minSessionSize)
{
UpdatePruningTimer();
}
Expand All @@ -238,13 +238,13 @@ private void UpdatePruningTimer()
{
lock (_pruningTimer)
{
var numConnectors = _numConnectors;
if (numConnectors > _minSessionSize && !_pruningTimerEnabled)
var numSessions = _numSessions;
if (numSessions > _minSessionSize && !_pruningTimerEnabled)
{
_pruningTimerEnabled = true;
_pruningTimer.Change(_pruningSamplingInterval, Timeout.InfiniteTimeSpan);
}
else if (numConnectors <= _minSessionSize && _pruningTimerEnabled)
else if (numSessions <= _minSessionSize && _pruningTimerEnabled)
{
_pruningTimer.Change(Timeout.Infinite, Timeout.Infinite);
_pruningSampleIndex = 0;
Expand Down Expand Up @@ -285,7 +285,7 @@ private static void PruneIdleSessions(object? state)
}

while (toPrune > 0 &&
pool._numConnectors > pool._minSessionSize &&
pool._numSessions > pool._minSessionSize &&
pool._idleSessionReader.TryRead(out var session) &&
session != null)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Pool/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ StatusCode.SessionExpired or
StatusCode.ClientTransportTimeout or
StatusCode.ClientTransportUnavailable)
{
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason StatusCode: {Code}", SessionId, code);
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason Status: {Status}", SessionId, code);

IsActive = false;
}
Expand Down
Loading
Loading