Skip to content

Commit 8cd524f

Browse files
feat ADO.NET: create session timeout & cancellation token propagation
1 parent 5547288 commit 8cd524f

15 files changed

+187
-102
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
- `YdbCommand`: Cancellation token propagation in `Execute*` methods
2+
- `YdbConnection`: Cancellation token propagation in `OpenAsync` method.
3+
- `YdbDataReader`: Cancellation token propagation in `ReadAsync` and `NextResultAsync` methods.
4+
- Added `CreateSessionTimeout` to ADO.NET when creating a session. (Default 5 seconds.)
5+
16
## v0.18.3
27

38
- Added `ConnectTimeout`: time to wait (in seconds) while trying to establish a connection.

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Concurrent;
2+
using Ydb.Sdk.Pool;
23
using Ydb.Sdk.Services.Query;
34

45
namespace Ydb.Sdk.Ado;
@@ -15,7 +16,7 @@ CancellationToken cancellationToken
1516
{
1617
if (Pools.TryGetValue(connectionString.ConnectionString, out var sessionPool))
1718
{
18-
return await sessionPool.GetSession();
19+
return await sessionPool.GetSession(cancellationToken);
1920
}
2021

2122
try
@@ -24,15 +25,21 @@ CancellationToken cancellationToken
2425

2526
if (Pools.TryGetValue(connectionString.ConnectionString, out var pool))
2627
{
27-
return await pool.GetSession();
28+
return await pool.GetSession(cancellationToken);
2829
}
2930

30-
var newSessionPool = new SessionPool(await connectionString.BuildDriver(), connectionString.MaxSessionPool,
31-
disposingDriver: true);
31+
var newSessionPool = new SessionPool(
32+
await connectionString.BuildDriver(),
33+
new SessionPoolConfig(
34+
MaxSessionPool: connectionString.MaxSessionPool,
35+
CreateSessionTimeout: connectionString.CreateSessionTimeout,
36+
DisposeDriver: true
37+
)
38+
);
3239

3340
Pools[connectionString.ConnectionString] = newSessionPool;
3441

35-
return await newSessionPool.GetSession();
42+
return await newSessionPool.GetSession(cancellationToken);
3643
}
3744
finally
3845
{
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace Ydb.Sdk.Ado;
2+
3+
internal static class YdbAdoDefaultSettings
4+
{
5+
internal const string Host = "localhost";
6+
7+
internal const int Port = 2136;
8+
9+
internal const string Database = "/local";
10+
11+
internal const bool UseTls = false;
12+
}

src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.Extensions.Logging;
55
using Microsoft.Extensions.Logging.Abstractions;
66
using Ydb.Sdk.Auth;
7+
using Ydb.Sdk.Pool;
78
using Ydb.Sdk.Transport;
89

910
namespace Ydb.Sdk.Ado;
@@ -24,18 +25,19 @@ public YdbConnectionStringBuilder(string connectionString)
2425
// Init default connection string
2526
private void InitDefaultValues()
2627
{
27-
_host = "localhost";
28-
_port = 2136;
29-
_database = "/local";
30-
_maxSessionPool = 100;
31-
_useTls = false;
28+
_host = YdbAdoDefaultSettings.Host;
29+
_port = YdbAdoDefaultSettings.Port;
30+
_database = YdbAdoDefaultSettings.Database;
31+
_maxSessionPool = SessionPoolDefaultSettings.MaxSessionPool;
32+
_useTls = YdbAdoDefaultSettings.UseTls;
3233
_connectTimeout = GrpcDefaultSettings.ConnectTimeoutSeconds;
33-
_keepAlivePingDelay = GrpcDefaultSettings.DefaultKeepAlivePingSeconds;
34-
_keepAlivePingTimeout = GrpcDefaultSettings.DefaultKeepAlivePingTimeoutSeconds;
35-
_enableMultipleHttp2Connections = false;
34+
_keepAlivePingDelay = GrpcDefaultSettings.KeepAlivePingSeconds;
35+
_keepAlivePingTimeout = GrpcDefaultSettings.KeepAlivePingTimeoutSeconds;
36+
_enableMultipleHttp2Connections = GrpcDefaultSettings.EnableMultipleHttp2Connections;
3637
_maxSendMessageSize = GrpcDefaultSettings.MaxSendMessageSize;
3738
_maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize;
38-
_disableDiscovery = false;
39+
_disableDiscovery = GrpcDefaultSettings.DisableDiscovery;
40+
_createSessionTimeout = SessionPoolDefaultSettings.CreateSessionTimeoutSeconds;
3941
}
4042

4143
public string Host
@@ -246,6 +248,24 @@ public bool DisableDiscovery
246248

247249
private bool _disableDiscovery;
248250

251+
public int CreateSessionTimeout
252+
{
253+
get => _createSessionTimeout;
254+
set
255+
{
256+
if (value < 0)
257+
{
258+
throw new ArgumentOutOfRangeException(nameof(value), value,
259+
"Invalid create session timeout: " + value);
260+
}
261+
262+
_createSessionTimeout = value;
263+
SaveValue(nameof(CreateSessionTimeout), value);
264+
}
265+
}
266+
267+
private int _createSessionTimeout;
268+
249269
public ILoggerFactory? LoggerFactory { get; init; }
250270

251271
public ICredentialsProvider? CredentialsProvider { get; init; }
@@ -410,6 +430,9 @@ static YdbConnectionOption()
410430
"MaxReceiveMessageSize", "Max Receive Message Size");
411431
AddOption(new YdbConnectionOption<bool>(BoolExtractor, (builder, disableDiscovery) =>
412432
builder.DisableDiscovery = disableDiscovery), "DisableDiscovery", "Disable Discovery");
433+
AddOption(new YdbConnectionOption<int>(IntExtractor,
434+
(builder, createSessionTimeout) => builder.CreateSessionTimeout = createSessionTimeout),
435+
"CreateSessionTimeout", "Create Session Timeout");
413436
}
414437

415438
private static void AddOption(YdbConnectionOption option, params string[] keys)

src/Ydb.Sdk/src/Ado/YdbDataReader.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace Ydb.Sdk.Ado;
88

99
public sealed class YdbDataReader : DbDataReader, IAsyncEnumerable<YdbDataRecord>
1010
{
11-
private readonly IAsyncEnumerator<ExecuteQueryResponsePart> _stream;
11+
private readonly IServerStream<ExecuteQueryResponsePart> _stream;
1212
private readonly YdbTransaction? _ydbTransaction;
1313
private readonly RepeatedField<IssueMessage> _issueMessagesInStream = new();
1414
private readonly Action<Status> _onNotSuccessStatus;
@@ -53,7 +53,7 @@ private enum State
5353
internal bool IsOpen => ReaderState is State.NewResultSet or State.ReadResultSet;
5454

5555
private YdbDataReader(
56-
IAsyncEnumerator<ExecuteQueryResponsePart> resultSetStream,
56+
IServerStream<ExecuteQueryResponsePart> resultSetStream,
5757
Action<Status> onNotSuccessStatus,
5858
YdbTransaction? ydbTransaction)
5959
{
@@ -63,7 +63,7 @@ private YdbDataReader(
6363
}
6464

6565
internal static async Task<YdbDataReader> CreateYdbDataReader(
66-
IAsyncEnumerator<ExecuteQueryResponsePart> resultSetStream,
66+
IServerStream<ExecuteQueryResponsePart> resultSetStream,
6767
Action<Status> onStatus,
6868
YdbTransaction? ydbTransaction = null)
6969
{
@@ -75,7 +75,7 @@ internal static async Task<YdbDataReader> CreateYdbDataReader(
7575

7676
private async Task Init()
7777
{
78-
if (State.IsConsumed == await NextExecPart())
78+
if (State.IsConsumed == await NextExecPart(CancellationToken.None))
7979
{
8080
throw new YdbException("YDB server closed the stream");
8181
}
@@ -448,7 +448,7 @@ public override async Task<bool> NextResultAsync(CancellationToken cancellationT
448448
State.ReadResultSet => await new Func<Task<State>>(async () =>
449449
{
450450
State state;
451-
while ((state = await NextExecPart()) == State.ReadResultSet)
451+
while ((state = await NextExecPart(cancellationToken)) == State.ReadResultSet)
452452
{
453453
}
454454

@@ -475,7 +475,7 @@ public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
475475
return true;
476476
}
477477

478-
while ((ReaderState = await NextExecPart()) == State.ReadResultSet) // reset _currentRowIndex
478+
while ((ReaderState = await NextExecPart(cancellationToken)) == State.ReadResultSet) // reset _currentRowIndex
479479
{
480480
if (++_currentRowIndex < RowsCount)
481481
{
@@ -521,7 +521,7 @@ public override async Task CloseAsync()
521521
}
522522

523523
_onNotSuccessStatus(new Status(StatusCode.SessionBusy));
524-
await _stream.DisposeAsync();
524+
_stream.Dispose();
525525

526526
if (_ydbTransaction != null)
527527
{
@@ -542,13 +542,13 @@ private YdbValue GetFieldYdbValue(int ordinal)
542542
: ydbValue;
543543
}
544544

545-
private async ValueTask<State> NextExecPart()
545+
private async ValueTask<State> NextExecPart(CancellationToken cancellationToken)
546546
{
547547
try
548548
{
549549
_currentRowIndex = -1;
550550

551-
if (!await _stream.MoveNextAsync())
551+
if (!await _stream.MoveNextAsync(cancellationToken))
552552
{
553553
return State.IsConsumed;
554554
}
@@ -561,7 +561,7 @@ private async ValueTask<State> NextExecPart()
561561
{
562562
OnFailReadStream();
563563

564-
while (await _stream.MoveNextAsync())
564+
while (await _stream.MoveNextAsync(cancellationToken))
565565
{
566566
_issueMessagesInStream.AddRange(_stream.Current.Issues);
567567
}

src/Ydb.Sdk/src/DriverConfig.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ public class DriverConfig
1414
TimeSpan.FromSeconds(GrpcDefaultSettings.ConnectTimeoutSeconds);
1515

1616
public TimeSpan KeepAlivePingDelay { get; init; } =
17-
TimeSpan.FromSeconds(GrpcDefaultSettings.DefaultKeepAlivePingSeconds);
17+
TimeSpan.FromSeconds(GrpcDefaultSettings.KeepAlivePingSeconds);
1818

1919
public TimeSpan KeepAlivePingTimeout { get; init; } =
20-
TimeSpan.FromSeconds(GrpcDefaultSettings.DefaultKeepAlivePingTimeoutSeconds);
20+
TimeSpan.FromSeconds(GrpcDefaultSettings.KeepAlivePingTimeoutSeconds);
2121

2222
public string? User { get; init; }
2323
public string? Password { get; init; }
2424

25-
public bool EnableMultipleHttp2Connections { get; init; }
25+
public bool EnableMultipleHttp2Connections { get; init; } = GrpcDefaultSettings.EnableMultipleHttp2Connections;
2626

2727
public int MaxSendMessageSize { get; init; } = GrpcDefaultSettings.MaxSendMessageSize;
2828
public int MaxReceiveMessageSize { get; init; } = GrpcDefaultSettings.MaxReceiveMessageSize;

src/Ydb.Sdk/src/GrpcDefaultSettings.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,20 @@ internal static class GrpcDefaultSettings
55
/// <summary>
66
/// Default interval (in seconds) for sending keep-alive ping messages.
77
/// </summary>
8-
internal const int DefaultKeepAlivePingSeconds = 10;
8+
internal const int KeepAlivePingSeconds = 10;
99

1010
/// <summary>
1111
/// Default timeout (in seconds) for receiving a response to a keep-alive ping.
1212
/// </summary>
13-
internal const int DefaultKeepAlivePingTimeoutSeconds = 10;
13+
internal const int KeepAlivePingTimeoutSeconds = 10;
1414

1515
internal const int MaxSendMessageSize = 64 * 1024 * 1024; // 64 Mb
1616

1717
internal const int MaxReceiveMessageSize = 64 * 1024 * 1024; // 64 Mb
1818

1919
internal const int ConnectTimeoutSeconds = 5;
20+
21+
internal const bool EnableMultipleHttp2Connections = false;
22+
23+
internal const bool DisableDiscovery = false;
2024
}

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
4545
public Task RequestStreamComplete();
4646
}
4747

48+
public interface IServerStream<out TResponse> : IDisposable
49+
{
50+
public ValueTask<bool> MoveNextAsync(CancellationToken cancellationToken = default);
51+
52+
public TResponse Current { get; }
53+
}
54+
4855
public abstract class BaseDriver : IDriver
4956
{
5057
private readonly ICredentialsProvider? _credentialsProvider;
@@ -200,7 +207,7 @@ public async ValueTask DisposeAsync()
200207
}
201208
}
202209

203-
public sealed class ServerStream<TResponse> : IAsyncEnumerator<TResponse>, IAsyncEnumerable<TResponse>
210+
public sealed class ServerStream<TResponse> : IServerStream<TResponse>
204211
{
205212
private readonly AsyncServerStreamingCall<TResponse> _stream;
206213
private readonly Action<RpcException> _rpcErrorAction;
@@ -211,18 +218,11 @@ internal ServerStream(AsyncServerStreamingCall<TResponse> stream, Action<RpcExce
211218
_rpcErrorAction = rpcErrorAction;
212219
}
213220

214-
public ValueTask DisposeAsync()
215-
{
216-
_stream.Dispose();
217-
218-
return default;
219-
}
220-
221-
public async ValueTask<bool> MoveNextAsync()
221+
public async ValueTask<bool> MoveNextAsync(CancellationToken cancellationToken = default)
222222
{
223223
try
224224
{
225-
return await _stream.ResponseStream.MoveNext(CancellationToken.None);
225+
return await _stream.ResponseStream.MoveNext(cancellationToken);
226226
}
227227
catch (RpcException e)
228228
{
@@ -234,7 +234,7 @@ public async ValueTask<bool> MoveNextAsync()
234234

235235
public TResponse Current => _stream.ResponseStream.Current;
236236

237-
public IAsyncEnumerator<TResponse> GetAsyncEnumerator(CancellationToken cancellationToken = new()) => this;
237+
public void Dispose() => _stream.Dispose();
238238
}
239239

240240
internal class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>

0 commit comments

Comments
 (0)