Skip to content

Commit 741e28b

Browse files
committed
feat(ado): add ImplicitSession with PoolManager integration and stress-tested lifecycle
1 parent c6d6a59 commit 741e28b

File tree

6 files changed

+288
-149
lines changed

6 files changed

+288
-149
lines changed

src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ namespace Ydb.Sdk.Ado.Session;
55

66
internal class ImplicitSession : ISession
77
{
8-
private readonly Action? _onClose;
8+
private readonly ImplicitSessionSource _source;
99

10-
public ImplicitSession(IDriver driver, Action? onClose = null)
10+
public ImplicitSession(IDriver driver, ImplicitSessionSource source)
1111
{
12-
Driver = driver;
13-
_onClose = onClose;
12+
Driver = driver ?? throw new ArgumentNullException(nameof(driver));
13+
_source = source ?? throw new ArgumentNullException(nameof(source));
1414
}
1515

1616
public IDriver Driver { get; }
@@ -50,7 +50,7 @@ public void OnNotSuccessStatusCode(StatusCode code)
5050
{
5151
}
5252

53-
public void Close() => _onClose?.Invoke();
53+
public void Close() => _source.ReleaseLease();
5454

5555
private static YdbException NotSupportedTransaction =>
5656
new(StatusCode.BadRequest, "Transactions are not supported in implicit sessions");

src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)
2020
if (!TryAcquireLease())
2121
throw new ObjectDisposedException(nameof(ImplicitSessionSource));
2222

23-
return new ValueTask<ISession>(new ImplicitSession(_driver, ReleaseLease));
23+
return new ValueTask<ISession>(new ImplicitSession(_driver, this));
2424
}
2525

2626
private bool TryAcquireLease()
@@ -39,23 +39,21 @@ private bool TryAcquireLease()
3939
return true;
4040
}
4141

42-
private void ReleaseLease()
43-
{
44-
if (Interlocked.Decrement(ref _activeLeaseCount) == 0)
45-
{
46-
_onBecameEmpty?.Invoke();
47-
}
48-
}
42+
internal void ReleaseLease() => Interlocked.Decrement(ref _activeLeaseCount);
4943

5044
public ValueTask DisposeAsync()
5145
{
5246
Interlocked.Exchange(ref _isDisposed, 1);
5347

54-
if (Volatile.Read(ref _activeLeaseCount) == 0)
48+
var spinner = new SpinWait();
49+
while (Volatile.Read(ref _activeLeaseCount) != 0)
5550
{
56-
_onBecameEmpty?.Invoke();
51+
spinner.SpinOnce();
5752
}
5853

54+
_onBecameEmpty?.Invoke();
55+
5956
return ValueTask.CompletedTask;
6057
}
58+
6159
}

src/Ydb.Sdk/src/Ado/YdbCommand.cs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -216,19 +216,9 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
216216
throw new InvalidOperationException("Transaction mismatched! (Maybe using another connection)");
217217
}
218218

219-
var execResult = await YdbConnection.Session.ExecuteQuery(
220-
preparedSql.ToString(),
221-
ydbParameters,
222-
execSettings,
223-
transaction?.TransactionControl
224-
);
225-
226-
var ydbDataReader = await YdbDataReader.CreateYdbDataReader(
227-
execResult,
228-
YdbConnection.OnNotSuccessStatusCode,
229-
transaction,
230-
cancellationToken
231-
);
219+
var ydbDataReader = await YdbDataReader.CreateYdbDataReader(await YdbConnection.Session.ExecuteQuery(
220+
preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl
221+
), YdbConnection.OnNotSuccessStatusCode, transaction, cancellationToken);
232222

233223
YdbConnection.LastReader = ydbDataReader;
234224
YdbConnection.LastCommand = CommandText;
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
using Moq;
2+
using Xunit;
3+
using Ydb.Sdk.Ado.Session;
4+
5+
namespace Ydb.Sdk.Ado.Tests.Session;
6+
7+
public class YdbImplicitStressTests : TestBase
8+
{
9+
private static IDriver DummyDriver() => new Mock<IDriver>(MockBehavior.Strict).Object;
10+
11+
[Fact(Timeout = 30_000)]
12+
public async Task Dispose_WaitsForAllLeases_AndSignalsOnEmptyExactlyOnce()
13+
{
14+
var driver = DummyDriver();
15+
16+
var onEmptyCalls = 0;
17+
var opened = 0;
18+
var closed = 0;
19+
20+
var source = new ImplicitSessionSource(driver, onEmpty: () => Interlocked.Increment(ref onEmptyCalls));
21+
22+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
23+
24+
var workers = Enumerable.Range(0, 200).Select(async i =>
25+
{
26+
var rnd = new Random(unchecked(i ^ Environment.TickCount));
27+
for (var j = 0; j < 10; j++)
28+
{
29+
try
30+
{
31+
var s = await source.OpenSession(cts.Token);
32+
Interlocked.Increment(ref opened);
33+
34+
await Task.Delay(rnd.Next(0, 5), cts.Token);
35+
36+
s.Close();
37+
Interlocked.Increment(ref closed);
38+
}
39+
catch (ObjectDisposedException)
40+
{
41+
}
42+
}
43+
}).ToArray();
44+
45+
var disposer = Task.Run(async () =>
46+
{
47+
await Task.Delay(10, cts.Token);
48+
await source.DisposeAsync();
49+
}, cts.Token);
50+
51+
await Task.WhenAll(workers.Append(disposer));
52+
53+
Assert.True(opened > 0);
54+
Assert.Equal(opened, closed);
55+
Assert.Equal(1, Volatile.Read(ref onEmptyCalls));
56+
57+
await Assert.ThrowsAsync<ObjectDisposedException>(
58+
() => source.OpenSession(CancellationToken.None).AsTask());
59+
}
60+
61+
[Fact(Timeout = 30_000)]
62+
public async Task Stress_Counts_AreBalanced()
63+
{
64+
var driver = DummyDriver();
65+
66+
var opened = 0;
67+
var closed = 0;
68+
var onEmptyCalls = 0;
69+
70+
var source = new ImplicitSessionSource(driver, onEmpty: () => Interlocked.Increment(ref onEmptyCalls));
71+
72+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
73+
74+
var workers = Enumerable.Range(0, 200).Select(async i =>
75+
{
76+
var rnd = new Random(unchecked(i ^ Environment.TickCount));
77+
for (var j = 0; j < 10; j++)
78+
{
79+
try
80+
{
81+
var s = await source.OpenSession(cts.Token);
82+
Interlocked.Increment(ref opened);
83+
84+
await Task.Delay(rnd.Next(0, 3), cts.Token);
85+
86+
s.Close();
87+
Interlocked.Increment(ref closed);
88+
}
89+
catch (ObjectDisposedException)
90+
{
91+
}
92+
}
93+
}).ToArray();
94+
95+
var disposer = Task.Run(async () => await source.DisposeAsync(), cts.Token);
96+
97+
await Task.WhenAll(workers.Append(disposer));
98+
99+
Assert.Equal(opened, closed);
100+
Assert.Equal(1, onEmptyCalls);
101+
Assert.True(opened > 0);
102+
103+
await Assert.ThrowsAsync<ObjectDisposedException>(
104+
() => source.OpenSession(CancellationToken.None).AsTask());
105+
}
106+
107+
[Fact(Timeout = 30_000)]
108+
public async Task Open_RacingWithDispose_StateRemainsConsistent()
109+
{
110+
var driver = DummyDriver();
111+
112+
var onEmptyCalls = 0;
113+
var source = new ImplicitSessionSource(driver, onEmpty: () => Interlocked.Increment(ref onEmptyCalls));
114+
115+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
116+
117+
var opens = Enumerable.Range(0, 1000).Select(async _ =>
118+
{
119+
try
120+
{
121+
var s = await source.OpenSession(cts.Token);
122+
s.Close();
123+
return 1;
124+
}
125+
catch (ObjectDisposedException)
126+
{
127+
return 0;
128+
}
129+
}).ToArray();
130+
131+
var disposeTask = Task.Run(async () =>
132+
{
133+
await Task.Yield();
134+
await source.DisposeAsync();
135+
}, cts.Token);
136+
137+
await Task.WhenAll(opens.Append(disposeTask));
138+
139+
Assert.Equal(1, Volatile.Read(ref onEmptyCalls));
140+
141+
await Assert.ThrowsAsync<ObjectDisposedException>(
142+
() => source.OpenSession(CancellationToken.None).AsTask());
143+
}
144+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
using Xunit;
2+
3+
namespace Ydb.Sdk.Ado.Tests.Session;
4+
5+
public class YdbImplictConnectionTests : TestBase
6+
{
7+
[Fact]
8+
public async Task ImplicitSession_SimpleScalar_Works()
9+
{
10+
await using var connection = CreateConnection();
11+
connection.ConnectionString += ";EnableImplicitSession=true";
12+
await connection.OpenAsync();
13+
14+
var cmd = connection.CreateCommand();
15+
cmd.CommandText = "SELECT 40 + 2;";
16+
var scalar = await cmd.ExecuteScalarAsync();
17+
Assert.Equal(42, Convert.ToInt32(scalar));
18+
}
19+
20+
[Fact]
21+
public async Task ImplicitSession_RepeatedScalars_WorksManyTimes()
22+
{
23+
await using var connection = CreateConnection();
24+
connection.ConnectionString += ";EnableImplicitSession=true";
25+
await connection.OpenAsync();
26+
27+
for (var i = 0; i < 30; i++)
28+
{
29+
var cmd = connection.CreateCommand();
30+
cmd.CommandText = $"SELECT {i};";
31+
var scalar = await cmd.ExecuteScalarAsync();
32+
Assert.Equal(i, Convert.ToInt32(scalar));
33+
}
34+
}
35+
36+
[Fact]
37+
public void ImplicitSession_ConcurrentCommand_IsStillBlockedByBusyCheck()
38+
{
39+
using var connection = CreateConnection();
40+
connection.ConnectionString += ";EnableImplicitSession=true";
41+
connection.Open();
42+
43+
var cmd = connection.CreateCommand();
44+
cmd.CommandText = "SELECT 1; SELECT 1;";
45+
using var reader = cmd.ExecuteReader();
46+
47+
var ex = Assert.Throws<YdbOperationInProgressException>(() => cmd.ExecuteReader());
48+
Assert.Equal("A command is already in progress: SELECT 1; SELECT 1;", ex.Message);
49+
}
50+
51+
[Fact]
52+
public async Task ImplicitSession_DisallowsTransactions_And_AllowsNonTransactionalCommands()
53+
{
54+
var table = $"Implicit_{Guid.NewGuid():N}";
55+
56+
await using var connection = CreateConnection();
57+
connection.ConnectionString += ";EnableImplicitSession=true";
58+
await connection.OpenAsync();
59+
60+
try
61+
{
62+
await using (var create = connection.CreateCommand())
63+
{
64+
create.CommandText = $"""
65+
CREATE TABLE {table} (
66+
Id Int32,
67+
Name Text,
68+
PRIMARY KEY (Id)
69+
)
70+
""";
71+
await create.ExecuteNonQueryAsync();
72+
}
73+
74+
await using (var insert = connection.CreateCommand())
75+
{
76+
insert.CommandText = $"INSERT INTO {table} (Id, Name) VALUES (1, 'A');";
77+
await insert.ExecuteNonQueryAsync();
78+
}
79+
80+
var tx = connection.BeginTransaction();
81+
await using (var insertTx = connection.CreateCommand())
82+
{
83+
insertTx.Transaction = tx;
84+
insertTx.CommandText = $"INSERT INTO {table} (Id, Name) VALUES (2, 'B');";
85+
var ex = await Assert.ThrowsAsync<YdbException>(async () => await insertTx.ExecuteNonQueryAsync());
86+
Assert.Contains("Transactions are not supported in implicit sessions", ex.Message);
87+
}
88+
89+
await tx.RollbackAsync();
90+
91+
await using (var check = connection.CreateCommand())
92+
{
93+
check.CommandText = $"SELECT COUNT(*) FROM {table};";
94+
var count = Convert.ToInt32(await check.ExecuteScalarAsync());
95+
Assert.Equal(1, count);
96+
}
97+
}
98+
finally
99+
{
100+
await using var drop = connection.CreateCommand();
101+
drop.CommandText = $"DROP TABLE {table}";
102+
await drop.ExecuteNonQueryAsync();
103+
}
104+
}
105+
106+
[Fact]
107+
public async Task ImplicitSession_Cancellation_AfterFirstResult_StillReturnsFirst()
108+
{
109+
await using var connection = CreateConnection();
110+
connection.ConnectionString += ";EnableImplicitSession=true";
111+
await connection.OpenAsync();
112+
113+
var cmd = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1;" };
114+
using var cts = new CancellationTokenSource();
115+
116+
var reader = await cmd.ExecuteReaderAsync(cts.Token);
117+
118+
await reader.ReadAsync(cts.Token);
119+
Assert.Equal(1, reader.GetValue(0));
120+
Assert.True(await reader.NextResultAsync(cts.Token));
121+
122+
await cts.CancelAsync();
123+
124+
await reader.ReadAsync(cts.Token);
125+
Assert.Equal(1, reader.GetValue(0));
126+
Assert.False(await reader.NextResultAsync());
127+
}
128+
}

0 commit comments

Comments
 (0)