Skip to content

Commit be64b16

Browse files
updates
1 parent 4498eee commit be64b16

File tree

8 files changed

+51
-41
lines changed

8 files changed

+51
-41
lines changed

src/Ydb.Sdk/src/Ado/Session/ISession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
2323

2424
void OnNotSuccessStatusCode(StatusCode code);
2525

26-
ValueTask Close();
26+
void Close();
2727
}

src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ public void OnNotSuccessStatusCode(StatusCode code)
4848
{
4949
}
5050

51-
public ValueTask Close() => ValueTask.CompletedTask;
51+
public void Close()
52+
{
53+
}
5254

5355
private static YdbException NotSupportedTransaction =>
5456
new(StatusCode.BadRequest, "Transactions are not supported in implicit sessions");

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,13 @@ private void WakeUpWaiter()
185185
} // wake up waiter!
186186
}
187187

188-
public ValueTask Return(T session)
188+
public void Return(T session)
189189
{
190190
if (session.IsBroken || IsDisposed)
191191
{
192192
CloseSession(session);
193193

194-
return IsDisposed ? TryDisposeCore() : ValueTask.CompletedTask;
194+
return;
195195
}
196196

197197
// Statement order is important
@@ -202,15 +202,13 @@ public ValueTask Return(T session)
202202
{
203203
if (waiter.TrySetResult(session))
204204
{
205-
return ValueTask.CompletedTask;
205+
return;
206206
}
207207
}
208208

209209
_idleSessions.Push(session);
210210

211211
WakeUpWaiter();
212-
213-
return ValueTask.CompletedTask;
214212
}
215213

216214
private void CloseSession(T session)
@@ -273,11 +271,24 @@ public async ValueTask DisposeAsync()
273271
}
274272
}
275273

276-
await TryDisposeCore();
277-
}
274+
var spinWait = new SpinWait();
275+
do
276+
{
277+
for (var i = 0; i < _maxSessionSize; i++)
278+
{
279+
var session = Volatile.Read(ref _sessions[i]);
278280

279-
private ValueTask TryDisposeCore() =>
280-
_numSessions == 0 ? _sessionFactory.DisposeAsync() : ValueTask.CompletedTask;
281+
if (session != null && session.CompareAndSet(PoolingSessionState.In, PoolingSessionState.Clean))
282+
{
283+
CloseSession(session);
284+
}
285+
}
286+
287+
spinWait.SpinOnce();
288+
} while (_numSessions > 0);
289+
290+
await _sessionFactory.DisposeAsync();
291+
}
281292
}
282293

283294
internal interface IPoolingSessionFactory<T> : IAsyncDisposable where T : PoolingSessionBase<T>
@@ -330,5 +341,5 @@ public abstract ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
330341

331342
public abstract void OnNotSuccessStatusCode(StatusCode code);
332343

333-
public ValueTask Close() => _source.Return((T)this);
344+
public void Close() => _source.Return((T)this);
334345
}

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public override async Task CloseAsync()
143143
}
144144
finally
145145
{
146-
await _session.Close();
146+
_session.Close();
147147
}
148148
}
149149

src/Ydb.Sdk/src/Services/Query/SessionPool.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
180180

181181
public new void OnNotSuccessStatusCode(StatusCode code) => base.OnNotSuccessStatusCode(code);
182182

183-
public ValueTask Close() => Release();
183+
public void Close() => Release().AsTask().GetAwaiter().GetResult();
184184

185185
public async Task CommitTransaction(string txId, CancellationToken cancellationToken = default)
186186
{

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public void Setup()
2525
public async Task SingleThreaded_OpenClose()
2626
{
2727
var session = await _poolingSessionSource.OpenSession();
28-
await session.Close();
28+
session.Close();
2929
}
3030

3131
[Benchmark]
@@ -39,7 +39,7 @@ public async Task MultiThreaded_OpenClose()
3939
{
4040
var session = await _poolingSessionSource.OpenSession();
4141
await Task.Yield();
42-
await session.Close();
42+
session.Close();
4343
});
4444
}
4545

@@ -58,7 +58,7 @@ public async Task HighContention_OpenClose()
5858
{
5959
var session = await _poolingSessionSource.OpenSession();
6060
await Task.Yield();
61-
await session.Close();
61+
session.Close();
6262
});
6363
}
6464

@@ -79,7 +79,7 @@ public async Task SessionReuse_Pattern()
7979
{
8080
var session = await _poolingSessionSource.OpenSession();
8181
await Task.Yield();
82-
await session.Close();
82+
session.Close();
8383
}
8484
});
8585
}
@@ -102,7 +102,7 @@ public async Task SessionReuse_HighContention_Pattern()
102102
{
103103
var session = await _poolingSessionSource.OpenSession();
104104
await Task.Yield();
105-
await session.Close();
105+
session.Close();
106106
}
107107
});
108108
}
@@ -124,7 +124,7 @@ public async Task SessionReuse_HighIterations_Pattern()
124124
{
125125
var session = await _poolingSessionSource.OpenSession();
126126
await Task.Yield();
127-
await session.Close();
127+
session.Close();
128128
}
129129
});
130130
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public async Task Reuse_Session_Before_Creating_new()
2020
new YdbConnectionStringBuilder());
2121
var session = await sessionSource.OpenSession();
2222
var sessionId = session.SessionId();
23-
await session.Close();
23+
session.Close();
2424
session = await sessionSource.OpenSession();
2525
Assert.Equal(sessionId, session.SessionId());
2626
}
@@ -58,7 +58,7 @@ public async Task Creating_Session_Throw_Exception()
5858
// ReSharper disable once AccessToModifiedClosure
5959
Interlocked.Increment(ref countSuccess);
6060
Assert.True(session.SessionId() > maxSessionSize * 2);
61-
await session.Close();
61+
session.Close();
6262
}
6363
catch (YdbException e)
6464
{
@@ -95,7 +95,7 @@ public async Task HighContention_OpenClose_NotCanceledException()
9595
var session = await sessionSource.OpenSession();
9696
Assert.True(session.SessionId() <= maxSessionSize);
9797
await Task.Yield();
98-
await session.Close();
98+
session.Close();
9999
});
100100
}
101101

@@ -124,26 +124,25 @@ public async Task DisposeAsync_Cancel_WaitersSession()
124124
waitingSessionTasks.Add(Task.Run(async () =>
125125
{
126126
var session = await sessionSource.OpenSession();
127-
await session.Close();
127+
session.Close();
128128
}));
129129
}
130130

131-
await sessionSource.DisposeAsync();
131+
var disposeTask = Task.Run(async () => await sessionSource.DisposeAsync());
132132
Assert.Equal(maxSessionSize, mockFactory.NumSession);
133-
Assert.Equal("Session Source is disposed.",
134-
(await Assert.ThrowsAsync<YdbException>(async () => await sessionSource.OpenSession())).Message);
135133

136134
for (var i = 0; i < maxSessionSize; i++)
137135
{
138-
await Assert.ThrowsAsync<TaskCanceledException>(() => waitingSessionTasks[i]);
136+
openSessions[i].Close();
139137
}
140138

139+
await disposeTask;
140+
Assert.Equal(0, mockFactory.NumSession);
141141
for (var i = 0; i < maxSessionSize; i++)
142142
{
143-
await openSessions[i].Close();
143+
await Assert.ThrowsAnyAsync<Exception>(() => waitingSessionTasks[i]);
144144
}
145145

146-
Assert.Equal(0, mockFactory.NumSession);
147146
Assert.Equal("Session Source is disposed.",
148147
(await Assert.ThrowsAsync<YdbException>(async () => await sessionSource.OpenSession())).Message);
149148
}
@@ -184,7 +183,7 @@ public async Task StressTest_DisposeAsync_Close_Driver()
184183
{
185184
var session = await sessionSource.OpenSession();
186185
await Task.Yield();
187-
await session.Close();
186+
session.Close();
188187
}
189188
catch (YdbException e)
190189
{
@@ -226,7 +225,7 @@ public async Task IdleTimeout_MinSessionSize_CloseNumSessionsMinusMinSessionCoun
226225

227226
foreach (var it in openSessions)
228227
{
229-
await it.Close();
228+
it.Close();
230229
}
231230

232231
await Task.Delay(TimeSpan.FromSeconds(idleTimeoutSeconds * 5)); // cleaning idle sessions
@@ -240,7 +239,7 @@ public async Task IdleTimeout_MinSessionSize_CloseNumSessionsMinusMinSessionCoun
240239

241240
foreach (var it in openSessionTasks)
242241
{
243-
await (await it).Close();
242+
(await it).Close();
244243
}
245244

246245
Assert.Equal(minSessionSize, mockFactory.NumSession);
@@ -273,7 +272,7 @@ public async Task StressTest_HighContention_OpenClose()
273272
while (!cts.IsCancellationRequested)
274273
{
275274
var session = await sessionSource.OpenSession(cts.Token);
276-
await session.Close();
275+
session.Close();
277276
await Task.Delay(Random.Shared.Next(maxSessionSize), cts.Token);
278277
}
279278
}
@@ -302,7 +301,7 @@ public async Task Get_Session_From_Exhausted_Pool()
302301
cts.CancelAfter(500);
303302

304303
await Assert.ThrowsAsync<TaskCanceledException>(async () => await sessionSource.OpenSession(cts.Token));
305-
await session.Close();
304+
session.Close();
306305

307306
Assert.Equal(1, mockFactory.NumSession);
308307
Assert.Equal(1, mockFactory.SessionOpenedCount);
@@ -323,7 +322,7 @@ public async Task Return_IsBroken_Session()
323322
for (var it = 0; it < maxSessionSize * 2; it++)
324323
{
325324
var session = await sessionSource.OpenSession();
326-
await session.Close();
325+
session.Close();
327326
}
328327

329328
Assert.Equal(0, mockFactory.NumSession);
@@ -352,7 +351,7 @@ public async Task CheckIdleSession_WhenIsBrokenInStack_CreateNewSession()
352351

353352
foreach (var session in openSessions)
354353
{
355-
await session.Close();
354+
session.Close();
356355
}
357356

358357
Assert.Equal(maxSessionSize, mockFactory.NumSession);
@@ -362,7 +361,7 @@ public async Task CheckIdleSession_WhenIsBrokenInStack_CreateNewSession()
362361
{
363362
var session = await sessionSource.OpenSession();
364363
isBroken = false;
365-
await session.Close();
364+
session.Close();
366365
}
367366

368367
Assert.Equal(1, mockFactory.NumSession);

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,10 +293,8 @@ private List<Task> GenerateTasks(string connectionString) => Enumerable.Range(0,
293293
ydbConnection.ConnectionString = connectionString;
294294
await ydbConnection.OpenAsync();
295295
}
296-
catch (YdbException e)
296+
catch (YdbException)
297297
{
298-
Assert.Equal(StatusCode.Unspecified, e.Code);
299-
Assert.Equal("Session Source is disposed.", e.Message);
300298
Interlocked.Add(ref _counter, i);
301299
return;
302300
}

0 commit comments

Comments
 (0)