Skip to content

Commit 769a65b

Browse files
next changes
1 parent 3601d20 commit 769a65b

File tree

7 files changed

+207
-87
lines changed

7 files changed

+207
-87
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
- Feat ADO.NET: `YdbDataSource.OpenRetryableConnectionAsync` opens a retryable connection with automatic retries for transient failures.
12
- Fixed bug ADO.NET/PoolManager: `SemaphoreSlim.WaitAsync` over-release on cancellation.
23
- Feat ADO.NET: Mark `YdbConnection.State` as `Broken` when the underlying session is broken, including background deactivation.
34
- Feat ADO.NET: Added YdbDataSource `ExecuteAsync` and `ExecuteInTransaction` convenience methods.

src/Ydb.Sdk/src/Ado/Session/RetryableSession.cs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ internal sealed class InMemoryServerStream : IServerStream<ExecuteQueryResponseP
6060
private readonly GrpcRequestSettings _settings;
6161

6262
private List<ExecuteQueryResponsePart>? _responses;
63-
private int _iterator;
63+
private int _index = -1;
6464

6565
public InMemoryServerStream(
6666
ISessionSource sessionSource,
@@ -78,12 +78,7 @@ public InMemoryServerStream(
7878

7979
public async Task<bool> MoveNextAsync(CancellationToken cancellationToken = default)
8080
{
81-
if (_responses is not null)
82-
{
83-
return ++_iterator < _responses.Count;
84-
}
85-
86-
_responses = await _ydbRetryPolicyExecutor.ExecuteAsync<List<ExecuteQueryResponsePart>>(async ct =>
81+
_responses ??= await _ydbRetryPolicyExecutor.ExecuteAsync<List<ExecuteQueryResponsePart>>(async ct =>
8782
{
8883
using var session = await _sessionSource.OpenSession(ct);
8984

@@ -113,12 +108,16 @@ public async Task<bool> MoveNextAsync(CancellationToken cancellationToken = defa
113108
}
114109
}, cancellationToken);
115110

116-
return _responses.Count > 0;
111+
if (_index + 1 >= _responses.Count)
112+
return false;
113+
114+
_index++;
115+
return true;
117116
}
118117

119-
public ExecuteQueryResponsePart Current => _responses is not null && _iterator < _responses.Count
120-
? _responses[_iterator]
121-
: throw new InvalidOperationException("No response found");
118+
public ExecuteQueryResponsePart Current => _responses is not null && _index < _responses.Count
119+
? _responses[_index]
120+
: throw new InvalidOperationException("Enumeration has not started or has already finished");
122121

123122
public void Dispose()
124123
{
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
using Xunit;
2+
using Ydb.Query;
3+
using Ydb.Sdk.Ado.Session;
4+
5+
namespace Ydb.Sdk.Ado.Tests.Session;
6+
7+
internal class MockPoolingSessionFactory(int maxSessionSize) : IPoolingSessionFactory<MockPoolingSession>
8+
{
9+
private int _sessionOpened;
10+
private int _numSession;
11+
12+
internal int SessionOpenedCount => Volatile.Read(ref _sessionOpened);
13+
internal int NumSession => Volatile.Read(ref _numSession);
14+
15+
internal Func<int, Task> Open { private get; init; } = _ => Task.CompletedTask;
16+
internal Func<int, bool> IsBroken { private get; init; } = _ => false;
17+
internal Func<ValueTask> Dispose { private get; init; } = () => ValueTask.CompletedTask;
18+
19+
internal Func<int, IServerStream<ExecuteQueryResponsePart>> ExecuteQuery { private get; init; } =
20+
_ => throw new NotImplementedException();
21+
22+
public MockPoolingSession NewSession(PoolingSessionSource<MockPoolingSession> source) =>
23+
new(source,
24+
async sessionCountOpened =>
25+
{
26+
await Open(sessionCountOpened);
27+
28+
Assert.True(Interlocked.Increment(ref _numSession) <= maxSessionSize);
29+
30+
await Task.Yield();
31+
},
32+
() =>
33+
{
34+
Assert.True(Interlocked.Decrement(ref _numSession) >= 0);
35+
36+
return Task.CompletedTask;
37+
},
38+
IsBroken,
39+
ExecuteQuery,
40+
Interlocked.Increment(ref _sessionOpened)
41+
);
42+
43+
public ValueTask DisposeAsync() => Dispose();
44+
}
45+
46+
internal class MockPoolingSession(
47+
PoolingSessionSource<MockPoolingSession> source,
48+
Func<int, Task> mockOpen,
49+
Func<Task> mockDeleteSession,
50+
Func<int, bool> mockIsBroken,
51+
Func<int, IServerStream<ExecuteQueryResponsePart>> executeQuery,
52+
int sessionId
53+
) : PoolingSessionBase<MockPoolingSession>(source)
54+
{
55+
private bool _isBroken;
56+
57+
public int SessionId => sessionId;
58+
public override IDriver Driver => null!;
59+
public override bool IsBroken => _isBroken || mockIsBroken(sessionId);
60+
61+
internal override Task Open(CancellationToken cancellationToken) => mockOpen(sessionId);
62+
internal override Task DeleteSession() => mockDeleteSession();
63+
64+
public override ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
65+
string query,
66+
Dictionary<string, TypedValue> parameters,
67+
GrpcRequestSettings settings,
68+
TransactionControl? txControl
69+
) => new(executeQuery(sessionId));
70+
71+
public override Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>
72+
throw new NotImplementedException();
73+
74+
public override Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) =>
75+
throw new NotImplementedException();
76+
77+
public override void OnNotSuccessStatusCode(StatusCode code) => _isBroken = true;
78+
}
79+
80+
internal static class ISessionExtension
81+
{
82+
internal static int SessionId(this ISession session) => ((MockPoolingSession)session).SessionId;
83+
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs

Lines changed: 0 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System.Collections.Concurrent;
22
using Xunit;
3-
using Ydb.Query;
43
using Ydb.Sdk.Ado.Session;
54

65
namespace Ydb.Sdk.Ado.Tests.Session;
@@ -388,76 +387,3 @@ public async Task CheckIdleSession_WhenIsBrokenInStack_CreateNewSession()
388387
Assert.Equal(maxSessionSize + 1, mockFactory.SessionOpenedCount);
389388
}
390389
}
391-
392-
internal static class ISessionExtension
393-
{
394-
internal static int SessionId(this ISession session) => ((MockPoolingSession)session).SessionId;
395-
}
396-
397-
internal class MockPoolingSessionFactory(int maxSessionSize) : IPoolingSessionFactory<MockPoolingSession>
398-
{
399-
private int _sessionOpened;
400-
private int _numSession;
401-
402-
internal int SessionOpenedCount => Volatile.Read(ref _sessionOpened);
403-
internal int NumSession => Volatile.Read(ref _numSession);
404-
405-
internal Func<int, Task> Open { private get; init; } = _ => Task.CompletedTask;
406-
internal Func<int, bool> IsBroken { private get; init; } = _ => false;
407-
internal Func<ValueTask> Dispose { private get; init; } = () => ValueTask.CompletedTask;
408-
409-
public MockPoolingSession NewSession(PoolingSessionSource<MockPoolingSession> source) =>
410-
new(source,
411-
async sessionCountOpened =>
412-
{
413-
await Open(sessionCountOpened);
414-
415-
Assert.True(Interlocked.Increment(ref _numSession) <= maxSessionSize);
416-
417-
await Task.Yield();
418-
},
419-
() =>
420-
{
421-
Assert.True(Interlocked.Decrement(ref _numSession) >= 0);
422-
423-
return Task.CompletedTask;
424-
},
425-
sessionNum => IsBroken(sessionNum),
426-
Interlocked.Increment(ref _sessionOpened)
427-
);
428-
429-
public ValueTask DisposeAsync() => Dispose();
430-
}
431-
432-
internal class MockPoolingSession(
433-
PoolingSessionSource<MockPoolingSession> source,
434-
Func<int, Task> mockOpen,
435-
Func<Task> mockDeleteSession,
436-
Func<int, bool> mockIsBroken,
437-
int sessionNum
438-
) : PoolingSessionBase<MockPoolingSession>(source)
439-
{
440-
public int SessionId => sessionNum;
441-
public override IDriver Driver => null!;
442-
public override bool IsBroken => mockIsBroken(sessionNum);
443-
444-
internal override Task Open(CancellationToken cancellationToken) => mockOpen(sessionNum);
445-
internal override Task DeleteSession() => mockDeleteSession();
446-
447-
public override ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
448-
string query,
449-
Dictionary<string, TypedValue> parameters,
450-
GrpcRequestSettings settings,
451-
TransactionControl? txControl
452-
) => throw new NotImplementedException();
453-
454-
public override Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>
455-
throw new NotImplementedException();
456-
457-
public override Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) =>
458-
throw new NotImplementedException();
459-
460-
public override void OnNotSuccessStatusCode(StatusCode code)
461-
{
462-
}
463-
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public PoolingSessionTests()
4242
[InlineData(StatusCode.SessionExpired, true)]
4343
[InlineData(StatusCode.ClientTransportTimeout, true)]
4444
[InlineData(StatusCode.ClientTransportUnavailable, true)]
45+
[InlineData(StatusCode.ClientTransportResourceExhausted, true)]
46+
[InlineData(StatusCode.ClientTransportUnknown, true)]
4547
[InlineData(StatusCode.Overloaded, false)]
4648
public async Task OnNotSuccessStatusCode_WhenStatusCodeIsNotSuccess_UpdateIsBroken(StatusCode statusCode,
4749
bool isError)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
using Xunit;
2+
using Ydb.Query;
3+
using Ydb.Sdk.Ado.Internal;
4+
using Ydb.Sdk.Ado.RetryPolicy;
5+
using Ydb.Sdk.Ado.Session;
6+
using Ydb.Sdk.Ado.Tests.Utils;
7+
8+
namespace Ydb.Sdk.Ado.Tests.Session;
9+
10+
public class RetryableSessionTests
11+
{
12+
[Fact]
13+
public async Task MoveNextAsync_WhenRetryableStatus_RetriesUpToMaxAttempts_ThenThrows()
14+
{
15+
var factory = new MockPoolingSessionFactory(1)
16+
{
17+
IsBroken = _ => false,
18+
ExecuteQuery = _ => new MockAsyncEnumerator<ExecuteQueryResponsePart>(
19+
new List<ExecuteQueryResponsePart> { new() { Status = StatusIds.Types.StatusCode.BadSession } })
20+
};
21+
22+
var retryableSession = new RetryableSession(new PoolingSessionSource<MockPoolingSession>(
23+
factory,
24+
new YdbConnectionStringBuilder { MaxSessionPool = 1 }),
25+
new YdbRetryPolicyExecutor(new YdbRetryPolicy(new YdbRetryPolicyConfig { MaxAttempts = 5 }))
26+
);
27+
28+
var inMemoryStream = await retryableSession.ExecuteQuery(
29+
"SELECT * FROM session",
30+
new Dictionary<string, TypedValue>(),
31+
new GrpcRequestSettings(),
32+
null
33+
);
34+
35+
Assert.Equal(StatusCode.BadSession,
36+
(await Assert.ThrowsAsync<YdbException>(async () => await inMemoryStream.MoveNextAsync())).Code);
37+
Assert.Equal(5, factory.SessionOpenedCount);
38+
}
39+
40+
[Fact]
41+
public async Task MoveNextAsync_WhenNonRetryable_ThrowsWithoutRetry()
42+
{
43+
var factory = new MockPoolingSessionFactory(1)
44+
{
45+
IsBroken = _ => false,
46+
ExecuteQuery = _ => new MockAsyncEnumerator<ExecuteQueryResponsePart>(
47+
new List<ExecuteQueryResponsePart> { new() { Status = StatusIds.Types.StatusCode.Unauthorized } })
48+
};
49+
50+
var retryableSession = new RetryableSession(new PoolingSessionSource<MockPoolingSession>(
51+
factory,
52+
new YdbConnectionStringBuilder { MaxSessionPool = 1 }),
53+
new YdbRetryPolicyExecutor(new YdbRetryPolicy(new YdbRetryPolicyConfig { MaxAttempts = 5 }))
54+
);
55+
56+
var inMemoryStream = await retryableSession.ExecuteQuery(
57+
"SELECT * FROM session",
58+
new Dictionary<string, TypedValue>(),
59+
new GrpcRequestSettings(),
60+
null
61+
);
62+
63+
Assert.Equal(StatusCode.Unauthorized,
64+
(await Assert.ThrowsAsync<YdbException>(async () => await inMemoryStream.MoveNextAsync())).Code);
65+
Assert.Equal(1, factory.SessionOpenedCount);
66+
}
67+
68+
[Fact]
69+
public async Task MoveNextAsync_SucceedsOnThirdAttempt_StopsRetrying()
70+
{
71+
var attempt = 0;
72+
var factory = new MockPoolingSessionFactory(1)
73+
{
74+
IsBroken = _ => false,
75+
ExecuteQuery = _ =>
76+
{
77+
attempt++;
78+
if (attempt < 3)
79+
return new MockAsyncEnumerator<ExecuteQueryResponsePart>(
80+
new List<ExecuteQueryResponsePart>
81+
{
82+
new() { Status = StatusIds.Types.StatusCode.BadSession }
83+
});
84+
return new MockAsyncEnumerator<ExecuteQueryResponsePart>(
85+
new List<ExecuteQueryResponsePart> { new() { Status = StatusIds.Types.StatusCode.Success } }
86+
);
87+
}
88+
};
89+
90+
var retryableSession = new RetryableSession(new PoolingSessionSource<MockPoolingSession>(
91+
factory,
92+
new YdbConnectionStringBuilder { MaxSessionPool = 1 }),
93+
new YdbRetryPolicyExecutor(new YdbRetryPolicy(new YdbRetryPolicyConfig { MaxAttempts = 5 }))
94+
);
95+
var inMemoryStream = await retryableSession.ExecuteQuery(
96+
"SELECT * FROM session",
97+
new Dictionary<string, TypedValue>(),
98+
new GrpcRequestSettings(),
99+
null
100+
);
101+
102+
var hasItem = await inMemoryStream.MoveNextAsync();
103+
Assert.True(hasItem);
104+
Assert.False(inMemoryStream.Current.Status.IsNotSuccess());
105+
106+
Assert.Equal(3, factory.SessionOpenedCount);
107+
}
108+
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbParameterTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,14 +218,15 @@ public async Task Decimal_WhenDecimalIsScaleAndPrecision_ReturnDecimal(string? v
218218
[InlineData("-98.765", 5, 2)]
219219
[InlineData("100.01", 5, 1)]
220220
[InlineData("100000", 5, 0)]
221+
[InlineData("12345678901", 10, 0)]
221222
public async Task Decimal_WhenNotRepresentableBySystemDecimal_ThrowsOverflowException(string value, byte precision,
222223
byte scale)
223224
{
224225
await using var ydbConnection = await CreateOpenConnectionAsync();
225226
var tableName = $"DecimalOverflowTable__{Random.Shared.Next()}";
226227
var decimalValue = decimal.Parse(value, CultureInfo.InvariantCulture);
227228
await new YdbCommand(ydbConnection)
228-
{ CommandText = $"CREATE TABLE {tableName}(d Decimal(5,2), PRIMARY KEY(d))" }
229+
{ CommandText = $"CREATE TABLE {tableName}(d Decimal({precision}, {scale}), PRIMARY KEY(d))" }
229230
.ExecuteNonQueryAsync();
230231

231232
Assert.Equal($"Value {decimalValue} does not fit Decimal({precision}, {scale})",
@@ -234,7 +235,7 @@ public async Task Decimal_WhenNotRepresentableBySystemDecimal_ThrowsOverflowExce
234235
CommandText = $"INSERT INTO {tableName}(d) VALUES (@d);",
235236
Parameters =
236237
{
237-
new YdbParameter("d", DbType.Decimal, 123.456m)
238+
new YdbParameter("d", DbType.Decimal, decimal.Parse(value, CultureInfo.InvariantCulture))
238239
{ Value = decimalValue, Precision = precision, Scale = scale }
239240
}
240241
}.ExecuteNonQueryAsync())).Message);

0 commit comments

Comments
 (0)