Skip to content

Commit 30a4fa2

Browse files
dev: Prepare ISession for switching implementation (#481)
- Optimization: On BadSession, do not invoke the `DeleteSession()` method. - Canceling AttachStream after calling the `DeleteSession` method. - Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).
1 parent aeb4954 commit 30a4fa2

File tree

19 files changed

+155
-172
lines changed

19 files changed

+155
-172
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
- Optimization: On BadSession, do not invoke the `DeleteSession()` method.
2+
- Canceling AttachStream after calling the `DeleteSession` method.
3+
- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).
14
- Fixed bug: Grpc.Core.StatusCode.Cancelled was mapped to server's Canceled status.
25
- ADO.NET: PoolingSessionSource 2.0 based on Npgsql pooling algorithm.
36
- Added new ADO.NET options:

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Concurrent;
2+
using Ydb.Sdk.Ado.Session;
23
using Ydb.Sdk.Pool;
34
using Ydb.Sdk.Services.Query;
45

@@ -9,7 +10,7 @@ internal static class PoolManager
910
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
1011
private static readonly ConcurrentDictionary<string, SessionPool> Pools = new();
1112

12-
internal static async Task<Services.Query.Session> GetSession(
13+
internal static async Task<ISession> GetSession(
1314
YdbConnectionStringBuilder connectionString,
1415
CancellationToken cancellationToken
1516
)

src/Ydb.Sdk/src/Ado/Session/ISession.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ namespace Ydb.Sdk.Ado.Session;
66

77
internal interface ISession
88
{
9+
IDriver Driver { get; }
10+
911
bool IsBroken { get; }
1012

1113
ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(

src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@ namespace Ydb.Sdk.Ado.Session;
66

77
internal class ImplicitSession : ISession
88
{
9-
private readonly IDriver _driver;
10-
119
public ImplicitSession(IDriver driver)
1210
{
13-
_driver = driver;
11+
Driver = driver;
1412
}
1513

14+
public IDriver Driver { get; }
1615
public bool IsBroken => false;
1716

1817
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
@@ -36,7 +35,7 @@ public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
3635
};
3736
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));
3837

39-
return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
38+
return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
4039
}
4140

4241
public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>

src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,19 @@ internal class PoolingSession : IPoolingSession
1515
private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5);
1616
private static readonly CreateSessionRequest CreateSessionRequest = new();
1717

18-
private readonly IDriver _driver;
1918
private readonly PoolingSessionSource _poolingSessionSource;
2019
private readonly ILogger<PoolingSession> _logger;
20+
private readonly CancellationTokenSource _attachStreamLifecycleCts = new();
2121

2222
private volatile bool _isBroken = true;
23+
private volatile bool _isBadSession;
2324

2425
private readonly bool _disableServerBalancer;
2526

2627
private string SessionId { get; set; } = string.Empty;
2728
private long NodeId { get; set; }
2829

30+
public IDriver Driver { get; }
2931
public bool IsBroken => _isBroken;
3032

3133
internal PoolingSession(
@@ -35,10 +37,10 @@ internal PoolingSession(
3537
ILogger<PoolingSession> logger
3638
)
3739
{
38-
_driver = driver;
3940
_poolingSessionSource = poolingSessionSource;
4041
_disableServerBalancer = disableServerBalancer;
4142
_logger = logger;
43+
Driver = driver;
4244
}
4345

4446
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
@@ -60,15 +62,15 @@ public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
6062
};
6163
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));
6264

63-
return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
65+
return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
6466
}
6567

6668
public async Task CommitTransaction(
6769
string txId,
6870
CancellationToken cancellationToken = default
6971
)
7072
{
71-
var response = await _driver.UnaryCall(
73+
var response = await Driver.UnaryCall(
7274
QueryService.CommitTransactionMethod,
7375
new CommitTransactionRequest { SessionId = SessionId, TxId = txId },
7476
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
@@ -85,7 +87,7 @@ public async Task RollbackTransaction(
8587
CancellationToken cancellationToken = default
8688
)
8789
{
88-
var response = await _driver.UnaryCall(
90+
var response = await Driver.UnaryCall(
8991
QueryService.RollbackTransactionMethod,
9092
new RollbackTransactionRequest { SessionId = SessionId, TxId = txId },
9193
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
@@ -99,6 +101,8 @@ public async Task RollbackTransaction(
99101

100102
public void OnNotSuccessStatusCode(StatusCode statusCode)
101103
{
104+
_isBadSession = _isBadSession || statusCode is StatusCode.BadSession;
105+
102106
if (statusCode is
103107
StatusCode.BadSession or
104108
StatusCode.SessionBusy or
@@ -121,7 +125,7 @@ public async Task Open(CancellationToken cancellationToken)
121125
requestSettings.ClientCapabilities.Add(SessionBalancer);
122126
}
123127

124-
var response = await _driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings);
128+
var response = await Driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings);
125129

126130
if (response.Status.IsNotSuccess())
127131
{
@@ -138,7 +142,7 @@ public async Task Open(CancellationToken cancellationToken)
138142
{
139143
try
140144
{
141-
using var stream = await _driver.ServerStreamCall(
145+
using var stream = await Driver.ServerStreamCall(
142146
QueryService.AttachSessionMethod,
143147
new AttachSessionRequest { SessionId = SessionId },
144148
new GrpcRequestSettings { NodeId = NodeId }
@@ -161,10 +165,12 @@ public async Task Open(CancellationToken cancellationToken)
161165

162166
completeTask.SetResult();
163167

168+
var lifecycleAttachToken = _attachStreamLifecycleCts.Token;
169+
164170
try
165171
{
166172
// ReSharper disable once MethodSupportsCancellation
167-
while (await stream.MoveNextAsync())
173+
while (await stream.MoveNextAsync(lifecycleAttachToken))
168174
{
169175
var sessionState = stream.Current;
170176

@@ -215,14 +221,16 @@ public async Task DeleteSession()
215221
{
216222
try
217223
{
218-
if (_isBroken)
224+
_isBroken = true;
225+
_attachStreamLifecycleCts.CancelAfter(DeleteSessionTimeout);
226+
227+
if (_isBadSession)
219228
{
220229
return;
221230
}
222231

223-
_isBroken = true;
224-
225-
var deleteSessionResponse = await _driver.UnaryCall(
232+
_isBadSession = true;
233+
var deleteSessionResponse = await Driver.UnaryCall(
226234
QueryService.DeleteSessionMethod,
227235
new DeleteSessionRequest { SessionId = SessionId },
228236
new GrpcRequestSettings { TransportTimeout = DeleteSessionTimeout, NodeId = NodeId }

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
1-
// This file contains session pooling algorithms adapted from Npgsql
2-
// Original source: https://github.com/npgsql/npgsql
3-
// Copyright (c) 2002-2025, Npgsql
4-
// Licence https://github.com/npgsql/npgsql?tab=PostgreSQL-1-ov-file
5-
61
using System.Diagnostics.CodeAnalysis;
72
using System.Runtime.CompilerServices;
83
using System.Threading.Channels;
@@ -92,45 +87,34 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio
9287

9388
var finalToken = ctsGetSession.Token;
9489

95-
try
90+
var session = await OpenNewSession(finalToken).ConfigureAwait(false);
91+
if (session != null)
92+
return session;
93+
94+
while (true)
9695
{
97-
var session = await OpenNewSession(finalToken).ConfigureAwait(false);
98-
if (session != null)
99-
return session;
96+
session = await _idleSessionReader.ReadAsync(finalToken).ConfigureAwait(false);
10097

101-
while (true)
98+
if (CheckIdleSession(session))
10299
{
103-
session = await _idleSessionReader.ReadAsync(finalToken).ConfigureAwait(false);
104-
105-
if (CheckIdleSession(session))
106-
{
107-
return session;
108-
}
100+
return session;
101+
}
109102

110-
// If we're here, our waiting attempt on the idle session channel was released with a null
111-
// (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked.
112-
if (TryGetIdleSession(out session))
113-
{
114-
return session;
115-
}
103+
// If we're here, our waiting attempt on the idle session channel was released with a null
104+
// (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked.
105+
if (TryGetIdleSession(out session))
106+
{
107+
return session;
108+
}
116109

117-
// We might have closed a session in the meantime and no longer be at max capacity
118-
// so try to open a new session and if that fails, loop again.
119-
session = await OpenNewSession(finalToken).ConfigureAwait(false);
120-
if (session != null)
121-
{
122-
return session;
123-
}
110+
// We might have closed a session in the meantime and no longer be at max capacity
111+
// so try to open a new session and if that fails, loop again.
112+
session = await OpenNewSession(finalToken).ConfigureAwait(false);
113+
if (session != null)
114+
{
115+
return session;
124116
}
125117
}
126-
catch (OperationCanceledException e)
127-
{
128-
throw new YdbException(StatusCode.Cancelled,
129-
$"The connection pool has been exhausted, either raise 'MaxSessionPool' " +
130-
$"(currently {_maxSessionSize}) or 'CreateSessionTimeout' " +
131-
$"(currently {_createSessionTimeout} seconds) in your connection string.", e
132-
);
133-
}
134118
}
135119

136120
private async ValueTask<IPoolingSession?> OpenNewSession(CancellationToken cancellationToken)

src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs

Lines changed: 0 additions & 12 deletions
This file was deleted.

src/Ydb.Sdk/src/Ado/YdbCommand.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Diagnostics.CodeAnalysis;
44
using System.Text;
55
using Ydb.Sdk.Ado.Internal;
6-
using Ydb.Sdk.Services.Query;
76
using Ydb.Sdk.Value;
87

98
namespace Ydb.Sdk.Ado;
@@ -206,8 +205,8 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
206205
preparedSql.Append(sql);
207206

208207
var execSettings = CommandTimeout > 0
209-
? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
210-
: new ExecuteQuerySettings();
208+
? new GrpcRequestSettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
209+
: new GrpcRequestSettings();
211210

212211
var transaction = YdbConnection.CurrentTransaction;
213212

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Data;
22
using System.Data.Common;
33
using System.Diagnostics.CodeAnalysis;
4+
using Ydb.Sdk.Ado.Session;
45
using Ydb.Sdk.Services.Query;
56
using static System.Data.IsolationLevel;
67

@@ -24,7 +25,7 @@ private YdbConnectionStringBuilder ConnectionStringBuilder
2425
[param: AllowNull] init => _connectionStringBuilder = value;
2526
}
2627

27-
internal Services.Query.Session Session
28+
internal ISession Session
2829
{
2930
get
3031
{
@@ -35,7 +36,7 @@ internal Services.Query.Session Session
3536
private set => _session = value;
3637
}
3738

38-
private Services.Query.Session _session = null!;
39+
private ISession _session = null!;
3940

4041
public YdbConnection()
4142
{
@@ -91,8 +92,18 @@ public override void ChangeDatabase(string databaseName)
9192
public override async Task OpenAsync(CancellationToken cancellationToken)
9293
{
9394
ThrowIfConnectionOpen();
94-
95-
Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken);
95+
try
96+
{
97+
Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken);
98+
}
99+
catch (OperationCanceledException e)
100+
{
101+
throw new YdbException(StatusCode.Cancelled,
102+
$"The connection pool has been exhausted, either raise 'MaxSessionPool' " +
103+
$"(currently {ConnectionStringBuilder.MaxSessionPool}) or 'CreateSessionTimeout' " +
104+
$"(currently {ConnectionStringBuilder.CreateSessionTimeout} seconds) in your connection string.", e
105+
);
106+
}
96107

97108
OnStateChange(ClosedToOpenEventArgs);
98109

@@ -124,7 +135,7 @@ public override async Task CloseAsync()
124135
}
125136
finally
126137
{
127-
await _session.Release();
138+
_session.Close();
128139
}
129140
}
130141

@@ -152,7 +163,7 @@ internal void OnNotSuccessStatusCode(StatusCode code)
152163
{
153164
_session.OnNotSuccessStatusCode(code);
154165

155-
if (!_session.IsActive)
166+
if (_session.IsBroken)
156167
{
157168
ConnectionState = ConnectionState.Broken;
158169
}

src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ public YdbConnectionStringBuilder(string connectionString)
2525
// Init default connection string
2626
private void InitDefaultValues()
2727
{
28-
_host = YdbAdoDefaultSettings.Host;
29-
_port = YdbAdoDefaultSettings.Port;
30-
_database = YdbAdoDefaultSettings.Database;
28+
_host = "localhost";
29+
_port = 2136;
30+
_database = "/local";
3131
_minSessionPool = 0;
3232
_maxSessionPool = SessionPoolDefaultSettings.MaxSessionPool;
3333
_createSessionTimeout = SessionPoolDefaultSettings.CreateSessionTimeoutSeconds;
3434
_sessionIdleTimeout = 300;
3535
_sessionPruningInterval = 10;
36-
_useTls = YdbAdoDefaultSettings.UseTls;
36+
_useTls = false;
3737
_connectTimeout = GrpcDefaultSettings.ConnectTimeoutSeconds;
3838
_keepAlivePingDelay = GrpcDefaultSettings.KeepAlivePingSeconds;
3939
_keepAlivePingTimeout = GrpcDefaultSettings.KeepAlivePingTimeoutSeconds;

0 commit comments

Comments
 (0)