Skip to content

Commit 81190ad

Browse files
dev: Added PoolingSessionTests.cs (#479)
1 parent 616dd9f commit 81190ad

File tree

6 files changed

+331
-37
lines changed

6 files changed

+331
-37
lines changed

src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ namespace Ydb.Sdk.Ado.Session;
22

33
internal interface ISessionSource<TSession> where TSession : ISession
44
{
5-
ValueTask<TSession> OpenSession(CancellationToken cancellationToken = default);
5+
ValueTask<TSession> OpenSession(CancellationToken cancellationToken);
66

77
void Return(TSession session);
88
}

src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ internal class PoolingSession : IPoolingSession
1717

1818
private readonly IDriver _driver;
1919
private readonly PoolingSessionSource _poolingSessionSource;
20-
private readonly YdbConnectionStringBuilder _settings;
2120
private readonly ILogger<PoolingSession> _logger;
2221

23-
private volatile bool _isBroken;
22+
private volatile bool _isBroken = true;
23+
24+
private readonly bool _disableServerBalancer;
2425

2526
private string SessionId { get; set; } = string.Empty;
2627
private long NodeId { get; set; }
@@ -30,13 +31,13 @@ internal class PoolingSession : IPoolingSession
3031
internal PoolingSession(
3132
IDriver driver,
3233
PoolingSessionSource poolingSessionSource,
33-
YdbConnectionStringBuilder settings,
34+
bool disableServerBalancer,
3435
ILogger<PoolingSession> logger
3536
)
3637
{
3738
_driver = driver;
3839
_poolingSessionSource = poolingSessionSource;
39-
_settings = settings;
40+
_disableServerBalancer = disableServerBalancer;
4041
_logger = logger;
4142
}
4243

@@ -96,18 +97,16 @@ public async Task RollbackTransaction(
9697
}
9798
}
9899

99-
public void OnNotSuccessStatusCode(StatusCode code)
100+
public void OnNotSuccessStatusCode(StatusCode statusCode)
100101
{
101-
if (code is
102-
StatusCode.Cancelled or
102+
if (statusCode is
103103
StatusCode.BadSession or
104104
StatusCode.SessionBusy or
105-
StatusCode.InternalError or
105+
StatusCode.SessionExpired or
106106
StatusCode.ClientTransportTimeout or
107-
StatusCode.Unavailable or
108107
StatusCode.ClientTransportUnavailable)
109108
{
110-
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason StatusCode: {Code}", SessionId, code);
109+
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason Status: {Status}", SessionId, statusCode);
111110

112111
_isBroken = true;
113112
}
@@ -117,7 +116,7 @@ public async Task Open(CancellationToken cancellationToken)
117116
{
118117
var requestSettings = new GrpcRequestSettings { CancellationToken = cancellationToken };
119118

120-
if (!_settings.DisableServerBalancer)
119+
if (!_disableServerBalancer)
121120
{
122121
requestSettings.ClientCapabilities.Add(SessionBalancer);
123122
}
@@ -133,6 +132,7 @@ public async Task Open(CancellationToken cancellationToken)
133132

134133
SessionId = response.SessionId;
135134
NodeId = response.NodeId;
135+
_isBroken = false;
136136

137137
_ = Task.Run(async () =>
138138
{
@@ -188,7 +188,7 @@ public async Task Open(CancellationToken cancellationToken)
188188
}
189189
catch (YdbException e)
190190
{
191-
if (e.Code == StatusCode.Cancelled)
191+
if (e.Code == StatusCode.ClientTransportTimeout)
192192
{
193193
_logger.LogDebug("AttachStream is cancelled (possible grpcChannel is closing)");
194194

src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@ namespace Ydb.Sdk.Ado.Session;
55
internal class PoolingSessionFactory : IPoolingSessionFactory
66
{
77
private readonly IDriver _driver;
8-
private readonly YdbConnectionStringBuilder _settings;
8+
private readonly bool _disableServerBalancer;
99
private readonly ILogger<PoolingSession> _logger;
1010

11-
public PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILogger<PoolingSession> logger)
11+
public PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory)
1212
{
1313
_driver = driver;
14-
_settings = settings;
15-
_logger = logger;
14+
_disableServerBalancer = settings.DisableServerBalancer;
15+
_logger = loggerFactory.CreateLogger<PoolingSession>();
1616
}
1717

1818
public IPoolingSession NewSession(PoolingSessionSource source) =>
19-
new PoolingSession(_driver, source, _settings, _logger);
19+
new PoolingSession(_driver, source, _disableServerBalancer, _logger);
2020
}

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ internal sealed class PoolingSessionSource : ISessionSource<IPoolingSession>
3131
private volatile bool _pruningTimerEnabled;
3232
private int _pruningSampleIndex;
3333

34-
private volatile int _numConnectors;
34+
private volatile int _numSessions;
3535
private volatile int _idleCount;
3636

3737
public PoolingSessionSource(
@@ -111,15 +111,15 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio
111111
return session;
112112
}
113113

114-
// If we're here, our waiting attempt on the idle connector channel was released with a null
115-
// (or bad connector), or we're in sync mode. Check again if a new idle connector has appeared since we last checked.
114+
// If we're here, our waiting attempt on the idle session channel was released with a null
115+
// (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked.
116116
if (TryGetIdleSession(out session))
117117
{
118118
return session;
119119
}
120120

121-
// We might have closed a connector in the meantime and no longer be at max capacity
122-
// so try to open a new connector and if that fails, loop again.
121+
// We might have closed a session in the meantime and no longer be at max capacity
122+
// so try to open a new session and if that fails, loop again.
123123
session = await OpenNewSession(finalToken).ConfigureAwait(false);
124124
if (session != null)
125125
{
@@ -139,9 +139,9 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio
139139

140140
private async ValueTask<IPoolingSession?> OpenNewSession(CancellationToken cancellationToken)
141141
{
142-
for (var numConnectors = _numConnectors; numConnectors < _maxSessionSize; numConnectors = _numConnectors)
142+
for (var numSessions = _numSessions; numSessions < _maxSessionSize; numSessions = _numSessions)
143143
{
144-
if (Interlocked.CompareExchange(ref _numConnectors, numConnectors + 1, numConnectors) != numConnectors)
144+
if (Interlocked.CompareExchange(ref _numSessions, numSessions + 1, numSessions) != numSessions)
145145
{
146146
continue;
147147
}
@@ -153,8 +153,8 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio
153153

154154
// Only start pruning if we've incremented open count past _min.
155155
// Note that we don't do it only once, on equality, because the thread which incremented open count past _min might get exception
156-
// on NpgsqlConnector.Open due to timeout, CancellationToken or other reasons.
157-
if (numConnectors >= _minSessionSize)
156+
// on NpgsqlSession.Open due to timeout, CancellationToken or other reasons.
157+
if (numSessions >= _minSessionSize)
158158
{
159159
UpdatePruningTimer();
160160
}
@@ -164,9 +164,9 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio
164164
catch
165165
{
166166
// Physical open failed, decrement the open and busy counter back down.
167-
Interlocked.Decrement(ref _numConnectors);
167+
Interlocked.Decrement(ref _numSessions);
168168

169-
// In case there's a waiting attempt on the channel, we write a null to the idle connector channel
169+
// In case there's a waiting attempt on the channel, we write a null to the idle session channel
170170
// to wake it up, so it will try opening (and probably throw immediately)
171171
// Statement order is important since we have synchronous completions on the channel.
172172
_idleSessionWriter.TryWrite(null);
@@ -220,15 +220,15 @@ private void CloseSession(IPoolingSession session)
220220
{
221221
session.DeleteSession();
222222

223-
var numConnectors = Interlocked.Decrement(ref _numConnectors);
223+
var numSessions = Interlocked.Decrement(ref _numSessions);
224224

225-
// If a connector has been closed for any reason, we write a null to the idle connector channel to wake up
225+
// If a session has been closed for any reason, we write a null to the idle session channel to wake up
226226
// a waiter, who will open a new physical connection
227227
// Statement order is important since we have synchronous completions on the channel.
228228
_idleSessionWriter.TryWrite(null);
229229

230230
// Only turn off the timer one time, when it was this Close that brought Open back to _min.
231-
if (numConnectors == _minSessionSize)
231+
if (numSessions == _minSessionSize)
232232
{
233233
UpdatePruningTimer();
234234
}
@@ -238,13 +238,13 @@ private void UpdatePruningTimer()
238238
{
239239
lock (_pruningTimer)
240240
{
241-
var numConnectors = _numConnectors;
242-
if (numConnectors > _minSessionSize && !_pruningTimerEnabled)
241+
var numSessions = _numSessions;
242+
if (numSessions > _minSessionSize && !_pruningTimerEnabled)
243243
{
244244
_pruningTimerEnabled = true;
245245
_pruningTimer.Change(_pruningSamplingInterval, Timeout.InfiniteTimeSpan);
246246
}
247-
else if (numConnectors <= _minSessionSize && _pruningTimerEnabled)
247+
else if (numSessions <= _minSessionSize && _pruningTimerEnabled)
248248
{
249249
_pruningTimer.Change(Timeout.Infinite, Timeout.Infinite);
250250
_pruningSampleIndex = 0;
@@ -285,7 +285,7 @@ private static void PruneIdleSessions(object? state)
285285
}
286286

287287
while (toPrune > 0 &&
288-
pool._numConnectors > pool._minSessionSize &&
288+
pool._numSessions > pool._minSessionSize &&
289289
pool._idleSessionReader.TryRead(out var session) &&
290290
session != null)
291291
{

src/Ydb.Sdk/src/Pool/SessionPool.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ StatusCode.SessionExpired or
231231
StatusCode.ClientTransportTimeout or
232232
StatusCode.ClientTransportUnavailable)
233233
{
234-
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason StatusCode: {Code}", SessionId, code);
234+
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason Status: {Status}", SessionId, code);
235235

236236
IsActive = false;
237237
}

0 commit comments

Comments
 (0)