Skip to content

Commit ae57645

Browse files
KirillKurdyukovLiamHamsters
authored andcommitted
dev: Prepare ISession for switching implementation (ydb-platform#481)
- Optimization: On BadSession, do not invoke the `DeleteSession()` method. - Canceling AttachStream after calling the `DeleteSession` method. - Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).
1 parent 1241e8f commit ae57645

File tree

18 files changed

+138
-166
lines changed

18 files changed

+138
-166
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
- Optimization: On BadSession, do not invoke the `DeleteSession()` method.
2+
- Canceling AttachStream after calling the `DeleteSession` method.
3+
- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).
14
- Fixed bug: Grpc.Core.StatusCode.Cancelled was mapped to server's Canceled status.
25
- ADO.NET: PoolingSessionSource 2.0 based on Npgsql pooling algorithm.
36
- Added new ADO.NET options:

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Concurrent;
2+
using Ydb.Sdk.Ado.Session;
23
using Ydb.Sdk.Pool;
34
using Ydb.Sdk.Services.Query;
45

@@ -9,7 +10,7 @@ internal static class PoolManager
910
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
1011
private static readonly ConcurrentDictionary<string, SessionPool> Pools = new();
1112

12-
internal static async Task<Services.Query.Session> GetSession(
13+
internal static async Task<ISession> GetSession(
1314
YdbConnectionStringBuilder connectionString,
1415
CancellationToken cancellationToken
1516
)

src/Ydb.Sdk/src/Ado/Session/ISession.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ namespace Ydb.Sdk.Ado.Session;
66

77
internal interface ISession
88
{
9+
IDriver Driver { get; }
10+
911
bool IsBroken { get; }
1012

1113
ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(

src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@ namespace Ydb.Sdk.Ado.Session;
66

77
internal class ImplicitSession : ISession
88
{
9-
private readonly IDriver _driver;
10-
119
public ImplicitSession(IDriver driver)
1210
{
13-
_driver = driver;
11+
Driver = driver;
1412
}
1513

14+
public IDriver Driver { get; }
1615
public bool IsBroken => false;
1716

1817
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
@@ -36,7 +35,7 @@ public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
3635
};
3736
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));
3837

39-
return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
38+
return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
4039
}
4140

4241
public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>

src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,19 @@ internal class PoolingSession : IPoolingSession
1515
private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5);
1616
private static readonly CreateSessionRequest CreateSessionRequest = new();
1717

18-
private readonly IDriver _driver;
1918
private readonly PoolingSessionSource _poolingSessionSource;
2019
private readonly ILogger<PoolingSession> _logger;
20+
private readonly CancellationTokenSource _attachStreamLifecycleCts = new();
2121

2222
private volatile bool _isBroken = true;
23+
private volatile bool _isBadSession;
2324

2425
private readonly bool _disableServerBalancer;
2526

2627
private string SessionId { get; set; } = string.Empty;
2728
private long NodeId { get; set; }
2829

30+
public IDriver Driver { get; }
2931
public bool IsBroken => _isBroken;
3032

3133
internal PoolingSession(
@@ -35,10 +37,10 @@ internal PoolingSession(
3537
ILogger<PoolingSession> logger
3638
)
3739
{
38-
_driver = driver;
3940
_poolingSessionSource = poolingSessionSource;
4041
_disableServerBalancer = disableServerBalancer;
4142
_logger = logger;
43+
Driver = driver;
4244
}
4345

4446
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
@@ -60,15 +62,15 @@ public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
6062
};
6163
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));
6264

63-
return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
65+
return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
6466
}
6567

6668
public async Task CommitTransaction(
6769
string txId,
6870
CancellationToken cancellationToken = default
6971
)
7072
{
71-
var response = await _driver.UnaryCall(
73+
var response = await Driver.UnaryCall(
7274
QueryService.CommitTransactionMethod,
7375
new CommitTransactionRequest { SessionId = SessionId, TxId = txId },
7476
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
@@ -85,7 +87,7 @@ public async Task RollbackTransaction(
8587
CancellationToken cancellationToken = default
8688
)
8789
{
88-
var response = await _driver.UnaryCall(
90+
var response = await Driver.UnaryCall(
8991
QueryService.RollbackTransactionMethod,
9092
new RollbackTransactionRequest { SessionId = SessionId, TxId = txId },
9193
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
@@ -99,6 +101,8 @@ public async Task RollbackTransaction(
99101

100102
public void OnNotSuccessStatusCode(StatusCode statusCode)
101103
{
104+
_isBadSession = _isBadSession || statusCode is StatusCode.BadSession;
105+
102106
if (statusCode is
103107
StatusCode.BadSession or
104108
StatusCode.SessionBusy or
@@ -121,7 +125,7 @@ public async Task Open(CancellationToken cancellationToken)
121125
requestSettings.ClientCapabilities.Add(SessionBalancer);
122126
}
123127

124-
var response = await _driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings);
128+
var response = await Driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings);
125129

126130
if (response.Status.IsNotSuccess())
127131
{
@@ -138,7 +142,7 @@ public async Task Open(CancellationToken cancellationToken)
138142
{
139143
try
140144
{
141-
using var stream = await _driver.ServerStreamCall(
145+
using var stream = await Driver.ServerStreamCall(
142146
QueryService.AttachSessionMethod,
143147
new AttachSessionRequest { SessionId = SessionId },
144148
new GrpcRequestSettings { NodeId = NodeId }
@@ -161,10 +165,12 @@ public async Task Open(CancellationToken cancellationToken)
161165

162166
completeTask.SetResult();
163167

168+
var lifecycleAttachToken = _attachStreamLifecycleCts.Token;
169+
164170
try
165171
{
166172
// ReSharper disable once MethodSupportsCancellation
167-
while (await stream.MoveNextAsync())
173+
while (await stream.MoveNextAsync(lifecycleAttachToken))
168174
{
169175
var sessionState = stream.Current;
170176

@@ -215,14 +221,16 @@ public async Task DeleteSession()
215221
{
216222
try
217223
{
218-
if (_isBroken)
224+
_isBroken = true;
225+
_attachStreamLifecycleCts.CancelAfter(DeleteSessionTimeout);
226+
227+
if (_isBadSession)
219228
{
220229
return;
221230
}
222231

223-
_isBroken = true;
224-
225-
var deleteSessionResponse = await _driver.UnaryCall(
232+
_isBadSession = true;
233+
var deleteSessionResponse = await Driver.UnaryCall(
226234
QueryService.DeleteSessionMethod,
227235
new DeleteSessionRequest { SessionId = SessionId },
228236
new GrpcRequestSettings { TransportTimeout = DeleteSessionTimeout, NodeId = NodeId }

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
1-
// This file contains session pooling algorithms adapted from Npgsql
2-
// Original source: https://github.com/npgsql/npgsql
3-
// Copyright (c) 2002-2025, Npgsql
4-
// Licence https://github.com/npgsql/npgsql?tab=PostgreSQL-1-ov-file
5-
61
using System.Diagnostics.CodeAnalysis;
72
using System.Runtime.CompilerServices;
83
using System.Threading.Channels;
@@ -92,45 +87,34 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio
9287

9388
var finalToken = ctsGetSession.Token;
9489

95-
try
90+
var session = await OpenNewSession(finalToken).ConfigureAwait(false);
91+
if (session != null)
92+
return session;
93+
94+
while (true)
9695
{
97-
var session = await OpenNewSession(finalToken).ConfigureAwait(false);
98-
if (session != null)
99-
return session;
96+
session = await _idleSessionReader.ReadAsync(finalToken).ConfigureAwait(false);
10097

101-
while (true)
98+
if (CheckIdleSession(session))
10299
{
103-
session = await _idleSessionReader.ReadAsync(finalToken).ConfigureAwait(false);
104-
105-
if (CheckIdleSession(session))
106-
{
107-
return session;
108-
}
100+
return session;
101+
}
109102

110-
// If we're here, our waiting attempt on the idle session channel was released with a null
111-
// (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked.
112-
if (TryGetIdleSession(out session))
113-
{
114-
return session;
115-
}
103+
// If we're here, our waiting attempt on the idle session channel was released with a null
104+
// (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked.
105+
if (TryGetIdleSession(out session))
106+
{
107+
return session;
108+
}
116109

117-
// We might have closed a session in the meantime and no longer be at max capacity
118-
// so try to open a new session and if that fails, loop again.
119-
session = await OpenNewSession(finalToken).ConfigureAwait(false);
120-
if (session != null)
121-
{
122-
return session;
123-
}
110+
// We might have closed a session in the meantime and no longer be at max capacity
111+
// so try to open a new session and if that fails, loop again.
112+
session = await OpenNewSession(finalToken).ConfigureAwait(false);
113+
if (session != null)
114+
{
115+
return session;
124116
}
125117
}
126-
catch (OperationCanceledException e)
127-
{
128-
throw new YdbException(StatusCode.Cancelled,
129-
$"The connection pool has been exhausted, either raise 'MaxSessionPool' " +
130-
$"(currently {_maxSessionSize}) or 'CreateSessionTimeout' " +
131-
$"(currently {_createSessionTimeout} seconds) in your connection string.", e
132-
);
133-
}
134118
}
135119

136120
private async ValueTask<IPoolingSession?> OpenNewSession(CancellationToken cancellationToken)

src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs

Lines changed: 0 additions & 12 deletions
This file was deleted.

src/Ydb.Sdk/src/Ado/YdbCommand.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Diagnostics.CodeAnalysis;
44
using System.Text;
55
using Ydb.Sdk.Ado.Internal;
6-
using Ydb.Sdk.Services.Query;
76
using Ydb.Sdk.Value;
87

98
namespace Ydb.Sdk.Ado;
@@ -206,8 +205,8 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
206205
preparedSql.Append(sql);
207206

208207
var execSettings = CommandTimeout > 0
209-
? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
210-
: new ExecuteQuerySettings();
208+
? new GrpcRequestSettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
209+
: new GrpcRequestSettings();
211210

212211
var transaction = YdbConnection.CurrentTransaction;
213212

src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ public YdbConnectionStringBuilder(string connectionString)
2525
// Init default connection string
2626
private void InitDefaultValues()
2727
{
28-
_host = YdbAdoDefaultSettings.Host;
29-
_port = YdbAdoDefaultSettings.Port;
30-
_database = YdbAdoDefaultSettings.Database;
28+
_host = "localhost";
29+
_port = 2136;
30+
_database = "/local";
3131
_minSessionPool = 0;
3232
_maxSessionPool = SessionPoolDefaultSettings.MaxSessionPool;
3333
_createSessionTimeout = SessionPoolDefaultSettings.CreateSessionTimeoutSeconds;
3434
_sessionIdleTimeout = 300;
3535
_sessionPruningInterval = 10;
36-
_useTls = YdbAdoDefaultSettings.UseTls;
36+
_useTls = false;
3737
_connectTimeout = GrpcDefaultSettings.ConnectTimeoutSeconds;
3838
_keepAlivePingDelay = GrpcDefaultSettings.KeepAlivePingSeconds;
3939
_keepAlivePingTimeout = GrpcDefaultSettings.KeepAlivePingTimeoutSeconds;

src/Ydb.Sdk/src/Ado/YdbSchema.cs

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
using System.Globalization;
44
using Ydb.Scheme;
55
using Ydb.Scheme.V1;
6+
using Ydb.Sdk.Ado.Internal;
67
using Ydb.Sdk.Ado.Schema;
78
using Ydb.Sdk.Services.Table;
9+
using Ydb.Table;
10+
using Ydb.Table.V1;
811

912
namespace Ydb.Sdk.Ado;
1013

@@ -59,10 +62,28 @@ public static async Task<YdbTable> DescribeTable(
5962
{
6063
try
6164
{
62-
var describeResponse = await ydbConnection.Session
63-
.DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings);
65+
describeTableSettings ??= new DescribeTableSettings();
6466

65-
return new YdbTable(tableName, describeResponse);
67+
var describeResponse = await ydbConnection.Session.Driver.UnaryCall(
68+
TableService.DescribeTableMethod,
69+
new DescribeTableRequest
70+
{
71+
Path = WithSuffix(ydbConnection.Database) + tableName,
72+
IncludeTableStats = describeTableSettings.IncludeTableStats,
73+
IncludePartitionStats = describeTableSettings.IncludePartitionStats,
74+
IncludeShardKeyBounds = describeTableSettings.IncludeShardKeyBounds
75+
},
76+
describeTableSettings
77+
);
78+
79+
if (describeResponse.Operation.Status.IsNotSuccess())
80+
{
81+
throw YdbException.FromServer(describeResponse.Operation.Status, describeResponse.Operation.Issues);
82+
}
83+
84+
var describeResult = describeResponse.Operation.Result.Unpack<DescribeTableResult>();
85+
86+
return new YdbTable(tableName, describeResult);
6687
}
6788
catch (YdbException e)
6889
{
@@ -241,31 +262,20 @@ private static async Task AppendDescribeTable(
241262
string? tableType,
242263
Action<YdbTable, string> appendInTable)
243264
{
244-
try
245-
{
246-
var describeResponse = await ydbConnection.Session
247-
.DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings);
248-
var ydbTable = new YdbTable(tableName, describeResponse);
265+
var ydbTable = await DescribeTable(ydbConnection, tableName, describeTableSettings);
249266

250-
var type = ydbTable.IsSystem
251-
? "SYSTEM_TABLE"
252-
: ydbTable.Type switch
253-
{
254-
YdbTable.TableType.Table => "TABLE",
255-
YdbTable.TableType.ColumnTable => "COLUMN_TABLE",
256-
YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE",
257-
_ => throw new ArgumentOutOfRangeException(nameof(tableType))
258-
};
259-
if (type.IsPattern(tableType))
267+
var type = ydbTable.IsSystem
268+
? "SYSTEM_TABLE"
269+
: ydbTable.Type switch
260270
{
261-
appendInTable(ydbTable, type);
262-
}
263-
}
264-
catch (YdbException e)
271+
YdbTable.TableType.Table => "TABLE",
272+
YdbTable.TableType.ColumnTable => "COLUMN_TABLE",
273+
YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE",
274+
_ => throw new ArgumentOutOfRangeException(nameof(tableType))
275+
};
276+
if (type.IsPattern(tableType))
265277
{
266-
ydbConnection.OnNotSuccessStatusCode(e.Code);
267-
268-
throw;
278+
appendInTable(ydbTable, type);
269279
}
270280
}
271281

0 commit comments

Comments
 (0)