Skip to content

Commit f4b59cc

Browse files
feat ADO.NET: create session timeout & cancellation token propagation (#459)
- ADO.NET: session is now deactivated when cancelled. - Fixed bug ADO.NET: throws an `InvalidOperationException` if the connection is broken during the next invocation. - Fixed bug `YdbCommand`: `Execute*` methods now propagate the cancellation token only for initializing YdbDataReader; the token is not passed to the server stream. - `YdbCommand`: Improved cancellation token propagation in `Execute*` methods. - `YdbConnection`: Added cancellation token propagation support in `OpenAsync`. - `YdbDataReader`: Added cancellation token propagation support in `ReadAsync` and `NextResultAsync`. - Added `CreateSessionTimeout` option to ADO.NET session creation; default is 5 seconds.
1 parent 5547288 commit f4b59cc

21 files changed

+325
-118
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
- ADO.NET: session is now deactivated when cancelled.
2+
- Fixed bug ADO.NET: throws an `InvalidOperationException` if the connection is broken during the next invocation.
3+
- Fixed bug `YdbCommand`: `Execute*` methods now propagate the cancellation token only for initializing YdbDataReader; the token is not passed to the server stream.
4+
- `YdbCommand`: Improved cancellation token propagation in `Execute*` methods.
5+
- `YdbConnection`: Added cancellation token propagation support in `OpenAsync`.
6+
- `YdbDataReader`: Added cancellation token propagation support in `ReadAsync` and `NextResultAsync`.
7+
- Added `CreateSessionTimeout` option to ADO.NET session creation; default is 5 seconds.
8+
19
## v0.18.3
210

311
- Added `ConnectTimeout`: time to wait (in seconds) while trying to establish a connection.

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Concurrent;
2+
using Ydb.Sdk.Pool;
23
using Ydb.Sdk.Services.Query;
34

45
namespace Ydb.Sdk.Ado;
@@ -15,7 +16,7 @@ CancellationToken cancellationToken
1516
{
1617
if (Pools.TryGetValue(connectionString.ConnectionString, out var sessionPool))
1718
{
18-
return await sessionPool.GetSession();
19+
return await sessionPool.GetSession(cancellationToken);
1920
}
2021

2122
try
@@ -24,15 +25,21 @@ CancellationToken cancellationToken
2425

2526
if (Pools.TryGetValue(connectionString.ConnectionString, out var pool))
2627
{
27-
return await pool.GetSession();
28+
return await pool.GetSession(cancellationToken);
2829
}
2930

30-
var newSessionPool = new SessionPool(await connectionString.BuildDriver(), connectionString.MaxSessionPool,
31-
disposingDriver: true);
31+
var newSessionPool = new SessionPool(
32+
await connectionString.BuildDriver(),
33+
new SessionPoolConfig(
34+
MaxSessionPool: connectionString.MaxSessionPool,
35+
CreateSessionTimeout: connectionString.CreateSessionTimeout,
36+
DisposeDriver: true
37+
)
38+
);
3239

3340
Pools[connectionString.ConnectionString] = newSessionPool;
3441

35-
return await newSessionPool.GetSession();
42+
return await newSessionPool.GetSession(cancellationToken);
3643
}
3744
finally
3845
{
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace Ydb.Sdk.Ado;
2+
3+
internal static class YdbAdoDefaultSettings
4+
{
5+
internal const string Host = "localhost";
6+
7+
internal const int Port = 2136;
8+
9+
internal const string Database = "/local";
10+
11+
internal const bool UseTls = false;
12+
}

src/Ydb.Sdk/src/Ado/YdbCommand.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
174174
throw new YdbOperationInProgressException(YdbConnection);
175175
}
176176

177-
YdbConnection.EnsureConnectionOpen();
177+
YdbConnection.ThrowIfConnectionClosed();
178178

179179
var ydbParameters = DbParameterCollection.YdbParameters;
180180
var (sql, paramNames) = SqlParser.Parse(
@@ -201,7 +201,6 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
201201
var execSettings = CommandTimeout > 0
202202
? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
203203
: new ExecuteQuerySettings();
204-
execSettings.CancellationToken = cancellationToken;
205204

206205
var transaction = YdbConnection.CurrentTransaction;
207206

@@ -211,9 +210,9 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
211210
}
212211

213212
var ydbDataReader = await YdbDataReader.CreateYdbDataReader(
214-
await YdbConnection.Session.ExecuteQuery(
215-
preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl
216-
), YdbConnection.OnStatus, transaction
213+
await YdbConnection.Session
214+
.ExecuteQuery(preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl),
215+
YdbConnection.OnStatus, transaction, cancellationToken
217216
);
218217

219218
YdbConnection.LastReader = ydbDataReader;

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ internal Session Session
2828
{
2929
get
3030
{
31-
EnsureConnectionOpen();
31+
ThrowIfConnectionClosed();
3232

3333
return _session;
3434
}
@@ -53,7 +53,7 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
5353

5454
protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
5555
{
56-
EnsureConnectionOpen();
56+
ThrowIfConnectionClosed();
5757

5858
return BeginTransaction(isolationLevel switch
5959
{
@@ -66,7 +66,7 @@ protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLev
6666

6767
public YdbTransaction BeginTransaction(TxMode txMode = TxMode.SerializableRw)
6868
{
69-
EnsureConnectionOpen();
69+
ThrowIfConnectionClosed();
7070

7171
if (CurrentTransaction is { Completed: false })
7272
{
@@ -90,7 +90,7 @@ public override void ChangeDatabase(string databaseName)
9090

9191
public override async Task OpenAsync(CancellationToken cancellationToken)
9292
{
93-
EnsureConnectionClosed();
93+
ThrowIfConnectionOpen();
9494

9595
try
9696
{
@@ -100,6 +100,11 @@ public override async Task OpenAsync(CancellationToken cancellationToken)
100100
{
101101
throw e switch
102102
{
103+
OperationCanceledException => throw new YdbException(StatusCode.Cancelled,
104+
$"The connection pool has been exhausted, either raise 'MaxSessionPool' " +
105+
$"(currently {ConnectionStringBuilder.MaxSessionPool}) or 'CreateSessionTimeout' " +
106+
$"(currently {ConnectionStringBuilder.CreateSessionTimeout} seconds) in your connection string.", e
107+
),
103108
Driver.TransportException transportException => new YdbException(transportException),
104109
StatusUnsuccessfulException unsuccessfulException => new YdbException(unsuccessfulException.Status),
105110
_ => e
@@ -147,7 +152,7 @@ public override string ConnectionString
147152
set
148153
#pragma warning restore CS8765 // Nullability of type of parameter doesn't match overridden member (possibly because of nullability attributes).
149154
{
150-
EnsureConnectionClosed();
155+
ThrowIfConnectionOpen();
151156

152157
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
153158
_connectionStringBuilder = value != null ? new YdbConnectionStringBuilder(value) : null;
@@ -181,7 +186,7 @@ public override string ServerVersion
181186
{
182187
get
183188
{
184-
EnsureConnectionOpen();
189+
ThrowIfConnectionClosed();
185190

186191
return string.Empty; // TODO ServerVersion
187192
}
@@ -212,17 +217,17 @@ public override Task<DataTable> GetSchemaAsync(
212217
CancellationToken cancellationToken = default
213218
) => YdbSchema.GetSchemaAsync(this, collectionName, restrictionValues, cancellationToken);
214219

215-
internal void EnsureConnectionOpen()
220+
internal void ThrowIfConnectionClosed()
216221
{
217-
if (ConnectionState == ConnectionState.Closed)
222+
if (ConnectionState is ConnectionState.Closed or ConnectionState.Broken)
218223
{
219224
throw new InvalidOperationException("Connection is closed");
220225
}
221226
}
222227

223-
private void EnsureConnectionClosed()
228+
private void ThrowIfConnectionOpen()
224229
{
225-
if (ConnectionState != ConnectionState.Closed)
230+
if (ConnectionState == ConnectionState.Open)
226231
{
227232
throw new InvalidOperationException("Connection already open");
228233
}

src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.Extensions.Logging;
55
using Microsoft.Extensions.Logging.Abstractions;
66
using Ydb.Sdk.Auth;
7+
using Ydb.Sdk.Pool;
78
using Ydb.Sdk.Transport;
89

910
namespace Ydb.Sdk.Ado;
@@ -24,18 +25,19 @@ public YdbConnectionStringBuilder(string connectionString)
2425
// Init default connection string
2526
private void InitDefaultValues()
2627
{
27-
_host = "localhost";
28-
_port = 2136;
29-
_database = "/local";
30-
_maxSessionPool = 100;
31-
_useTls = false;
28+
_host = YdbAdoDefaultSettings.Host;
29+
_port = YdbAdoDefaultSettings.Port;
30+
_database = YdbAdoDefaultSettings.Database;
31+
_maxSessionPool = SessionPoolDefaultSettings.MaxSessionPool;
32+
_useTls = YdbAdoDefaultSettings.UseTls;
3233
_connectTimeout = GrpcDefaultSettings.ConnectTimeoutSeconds;
33-
_keepAlivePingDelay = GrpcDefaultSettings.DefaultKeepAlivePingSeconds;
34-
_keepAlivePingTimeout = GrpcDefaultSettings.DefaultKeepAlivePingTimeoutSeconds;
35-
_enableMultipleHttp2Connections = false;
34+
_keepAlivePingDelay = GrpcDefaultSettings.KeepAlivePingSeconds;
35+
_keepAlivePingTimeout = GrpcDefaultSettings.KeepAlivePingTimeoutSeconds;
36+
_enableMultipleHttp2Connections = GrpcDefaultSettings.EnableMultipleHttp2Connections;
3637
_maxSendMessageSize = GrpcDefaultSettings.MaxSendMessageSize;
3738
_maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize;
38-
_disableDiscovery = false;
39+
_disableDiscovery = GrpcDefaultSettings.DisableDiscovery;
40+
_createSessionTimeout = SessionPoolDefaultSettings.CreateSessionTimeoutSeconds;
3941
}
4042

4143
public string Host
@@ -246,6 +248,24 @@ public bool DisableDiscovery
246248

247249
private bool _disableDiscovery;
248250

251+
public int CreateSessionTimeout
252+
{
253+
get => _createSessionTimeout;
254+
set
255+
{
256+
if (value < 0)
257+
{
258+
throw new ArgumentOutOfRangeException(nameof(value), value,
259+
"Invalid create session timeout: " + value);
260+
}
261+
262+
_createSessionTimeout = value;
263+
SaveValue(nameof(CreateSessionTimeout), value);
264+
}
265+
}
266+
267+
private int _createSessionTimeout;
268+
249269
public ILoggerFactory? LoggerFactory { get; init; }
250270

251271
public ICredentialsProvider? CredentialsProvider { get; init; }
@@ -410,6 +430,9 @@ static YdbConnectionOption()
410430
"MaxReceiveMessageSize", "Max Receive Message Size");
411431
AddOption(new YdbConnectionOption<bool>(BoolExtractor, (builder, disableDiscovery) =>
412432
builder.DisableDiscovery = disableDiscovery), "DisableDiscovery", "Disable Discovery");
433+
AddOption(new YdbConnectionOption<int>(IntExtractor,
434+
(builder, createSessionTimeout) => builder.CreateSessionTimeout = createSessionTimeout),
435+
"CreateSessionTimeout", "Create Session Timeout");
413436
}
414437

415438
private static void AddOption(YdbConnectionOption option, params string[] keys)

src/Ydb.Sdk/src/Ado/YdbDataReader.cs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace Ydb.Sdk.Ado;
88

99
public sealed class YdbDataReader : DbDataReader, IAsyncEnumerable<YdbDataRecord>
1010
{
11-
private readonly IAsyncEnumerator<ExecuteQueryResponsePart> _stream;
11+
private readonly IServerStream<ExecuteQueryResponsePart> _stream;
1212
private readonly YdbTransaction? _ydbTransaction;
1313
private readonly RepeatedField<IssueMessage> _issueMessagesInStream = new();
1414
private readonly Action<Status> _onNotSuccessStatus;
@@ -53,7 +53,7 @@ private enum State
5353
internal bool IsOpen => ReaderState is State.NewResultSet or State.ReadResultSet;
5454

5555
private YdbDataReader(
56-
IAsyncEnumerator<ExecuteQueryResponsePart> resultSetStream,
56+
IServerStream<ExecuteQueryResponsePart> resultSetStream,
5757
Action<Status> onNotSuccessStatus,
5858
YdbTransaction? ydbTransaction)
5959
{
@@ -63,19 +63,21 @@ private YdbDataReader(
6363
}
6464

6565
internal static async Task<YdbDataReader> CreateYdbDataReader(
66-
IAsyncEnumerator<ExecuteQueryResponsePart> resultSetStream,
66+
IServerStream<ExecuteQueryResponsePart> resultSetStream,
6767
Action<Status> onStatus,
68-
YdbTransaction? ydbTransaction = null)
68+
YdbTransaction? ydbTransaction = null,
69+
CancellationToken cancellationToken = default
70+
)
6971
{
7072
var ydbDataReader = new YdbDataReader(resultSetStream, onStatus, ydbTransaction);
71-
await ydbDataReader.Init();
73+
await ydbDataReader.Init(cancellationToken);
7274

7375
return ydbDataReader;
7476
}
7577

76-
private async Task Init()
78+
private async Task Init(CancellationToken cancellationToken)
7779
{
78-
if (State.IsConsumed == await NextExecPart())
80+
if (State.IsConsumed == await NextExecPart(cancellationToken))
7981
{
8082
throw new YdbException("YDB server closed the stream");
8183
}
@@ -448,7 +450,7 @@ public override async Task<bool> NextResultAsync(CancellationToken cancellationT
448450
State.ReadResultSet => await new Func<Task<State>>(async () =>
449451
{
450452
State state;
451-
while ((state = await NextExecPart()) == State.ReadResultSet)
453+
while ((state = await NextExecPart(cancellationToken)) == State.ReadResultSet)
452454
{
453455
}
454456

@@ -475,7 +477,7 @@ public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
475477
return true;
476478
}
477479

478-
while ((ReaderState = await NextExecPart()) == State.ReadResultSet) // reset _currentRowIndex
480+
while ((ReaderState = await NextExecPart(cancellationToken)) == State.ReadResultSet) // reset _currentRowIndex
479481
{
480482
if (++_currentRowIndex < RowsCount)
481483
{
@@ -521,7 +523,7 @@ public override async Task CloseAsync()
521523
}
522524

523525
_onNotSuccessStatus(new Status(StatusCode.SessionBusy));
524-
await _stream.DisposeAsync();
526+
_stream.Dispose();
525527

526528
if (_ydbTransaction != null)
527529
{
@@ -542,13 +544,13 @@ private YdbValue GetFieldYdbValue(int ordinal)
542544
: ydbValue;
543545
}
544546

545-
private async ValueTask<State> NextExecPart()
547+
private async ValueTask<State> NextExecPart(CancellationToken cancellationToken)
546548
{
547549
try
548550
{
549551
_currentRowIndex = -1;
550552

551-
if (!await _stream.MoveNextAsync())
553+
if (!await _stream.MoveNextAsync(cancellationToken))
552554
{
553555
return State.IsConsumed;
554556
}
@@ -561,7 +563,7 @@ private async ValueTask<State> NextExecPart()
561563
{
562564
OnFailReadStream();
563565

564-
while (await _stream.MoveNextAsync())
566+
while (await _stream.MoveNextAsync(cancellationToken))
565567
{
566568
_issueMessagesInStream.AddRange(_stream.Current.Issues);
567569
}

src/Ydb.Sdk/src/Ado/YdbException.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@ internal YdbException(Driver.TransportException transportException)
1414
}
1515

1616
internal YdbException(Status status, Exception? innerException = null)
17-
: base(status.ToString(), innerException)
17+
: this(status.StatusCode, status.ToString(), innerException)
1818
{
19-
Code = status.StatusCode;
20-
var policy = RetrySettings.DefaultInstance.GetRetryRule(status.StatusCode).Policy;
19+
}
20+
21+
internal YdbException(StatusCode statusCode, string message, Exception? innerException = null)
22+
: base(message, innerException)
23+
{
24+
Code = statusCode;
25+
var policy = RetrySettings.DefaultInstance.GetRetryRule(statusCode).Policy;
2126

2227
IsTransient = policy == RetryPolicy.Unconditional;
2328
IsTransientWhenIdempotent = policy != RetryPolicy.None;

src/Ydb.Sdk/src/DriverConfig.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ public class DriverConfig
1414
TimeSpan.FromSeconds(GrpcDefaultSettings.ConnectTimeoutSeconds);
1515

1616
public TimeSpan KeepAlivePingDelay { get; init; } =
17-
TimeSpan.FromSeconds(GrpcDefaultSettings.DefaultKeepAlivePingSeconds);
17+
TimeSpan.FromSeconds(GrpcDefaultSettings.KeepAlivePingSeconds);
1818

1919
public TimeSpan KeepAlivePingTimeout { get; init; } =
20-
TimeSpan.FromSeconds(GrpcDefaultSettings.DefaultKeepAlivePingTimeoutSeconds);
20+
TimeSpan.FromSeconds(GrpcDefaultSettings.KeepAlivePingTimeoutSeconds);
2121

2222
public string? User { get; init; }
2323
public string? Password { get; init; }
2424

25-
public bool EnableMultipleHttp2Connections { get; init; }
25+
public bool EnableMultipleHttp2Connections { get; init; } = GrpcDefaultSettings.EnableMultipleHttp2Connections;
2626

2727
public int MaxSendMessageSize { get; init; } = GrpcDefaultSettings.MaxSendMessageSize;
2828
public int MaxReceiveMessageSize { get; init; } = GrpcDefaultSettings.MaxReceiveMessageSize;

0 commit comments

Comments
 (0)