@@ -79,11 +79,7 @@ public async Task HighContention_OpenClose_NotCanceledException()
7979 {
8080 const int highContentionTasks = 100 ;
8181 const int maxSessionSize = highContentionTasks / 2 ;
82- var mockPoolingSessionFactory = new MockPoolingSessionFactory ( maxSessionSize )
83- {
84- Open = async _ => await Task . Yield ( )
85- } ;
86-
82+ var mockPoolingSessionFactory = new MockPoolingSessionFactory ( maxSessionSize ) ;
8783 var sessionSource = new PoolingSessionSource < MockPoolingSession > (
8884 mockPoolingSessionFactory , new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize }
8985 ) ;
@@ -215,10 +211,7 @@ public async Task StressTest_HighContention_OpenClose()
215211 const int highContentionTasks = maxSessionSize * 5 ;
216212
217213 var mockFactory = new MockPoolingSessionFactory ( maxSessionSize )
218- {
219- Open = async _ => await Task . Yield ( ) ,
220- IsBroken = ( ) => Random . Shared . NextDouble ( ) < 0.05
221- } ;
214+ { IsBroken = ( ) => Random . Shared . NextDouble ( ) < 0.05 } ;
222215 var settings = new YdbConnectionStringBuilder
223216 { MaxSessionPool = maxSessionSize , MinSessionPool = minSessionSize } ;
224217 var sessionSource = new PoolingSessionSource < MockPoolingSession > ( mockFactory , settings ) ;
@@ -245,6 +238,88 @@ public async Task StressTest_HighContention_OpenClose()
245238
246239 await Task . WhenAll ( workers ) ;
247240 }
241+
242+ [ Fact ]
243+ public async Task Get_Session_From_Exhausted_Pool ( )
244+ {
245+ var mockFactory = new MockPoolingSessionFactory ( 1 ) ;
246+ var settings = new YdbConnectionStringBuilder
247+ {
248+ MaxSessionPool = 1 ,
249+ MinSessionPool = 0
250+ } ;
251+
252+ var sessionSource = new PoolingSessionSource < MockPoolingSession > ( mockFactory , settings ) ;
253+ var session = await sessionSource . OpenSession ( ) ;
254+ var cts = new CancellationTokenSource ( ) ;
255+ cts . CancelAfter ( 500 ) ;
256+
257+ await Assert . ThrowsAsync < TaskCanceledException > ( async ( ) => await sessionSource . OpenSession ( cts . Token ) ) ;
258+ await session . Close ( ) ;
259+
260+ Assert . Equal ( 1 , mockFactory . NumSession ) ;
261+ Assert . Equal ( 1 , mockFactory . SessionOpenedCount ) ;
262+ }
263+
264+ [ Fact ]
265+ public async Task Return_IsBroken_Session ( )
266+ {
267+ const int maxSessionSize = 10 ;
268+ var mockFactory = new MockPoolingSessionFactory ( maxSessionSize ) { IsBroken = ( ) => true } ;
269+ var settings = new YdbConnectionStringBuilder
270+ {
271+ MaxSessionPool = maxSessionSize ,
272+ MinSessionPool = 0
273+ } ;
274+ var sessionSource = new PoolingSessionSource < MockPoolingSession > ( mockFactory , settings ) ;
275+
276+ for ( var it = 0 ; it < maxSessionSize * 2 ; it ++ )
277+ {
278+ var session = await sessionSource . OpenSession ( ) ;
279+ await session . Close ( ) ;
280+ }
281+
282+ Assert . Equal ( 0 , mockFactory . NumSession ) ;
283+ Assert . Equal ( maxSessionSize * 2 , mockFactory . SessionOpenedCount ) ;
284+ }
285+
286+ [ Fact ]
287+ public async Task CheckIdleSession_WhenIsBrokenInStack_CreateNewSession ( )
288+ {
289+ var isBroken = false ;
290+ const int maxSessionSize = 10 ;
291+ var mockFactory = new MockPoolingSessionFactory ( maxSessionSize ) { IsBroken = ( ) => isBroken } ;
292+ var settings = new YdbConnectionStringBuilder
293+ {
294+ MaxSessionPool = maxSessionSize ,
295+ MinSessionPool = 0
296+ } ;
297+ var sessionSource = new PoolingSessionSource < MockPoolingSession > ( mockFactory , settings ) ;
298+
299+ var openSessions = new List < ISession > ( ) ;
300+ for ( var it = 0 ; it < maxSessionSize ; it ++ )
301+ {
302+ openSessions . Add ( await sessionSource . OpenSession ( ) ) ;
303+ }
304+
305+ foreach ( var session in openSessions )
306+ {
307+ await session . Close ( ) ;
308+ }
309+
310+ Assert . Equal ( maxSessionSize , mockFactory . NumSession ) ;
311+
312+ isBroken = true ;
313+ for ( var it = 0 ; it < maxSessionSize ; it ++ )
314+ {
315+ var session = await sessionSource . OpenSession ( ) ;
316+ isBroken = false ;
317+ await session . Close ( ) ;
318+ }
319+
320+ Assert . Equal ( 1 , mockFactory . NumSession ) ;
321+ Assert . Equal ( maxSessionSize + 1 , mockFactory . SessionOpenedCount ) ;
322+ }
248323}
249324
250325internal static class ISessionExtension
@@ -261,26 +336,24 @@ internal class MockPoolingSessionFactory(int maxSessionSize) : IPoolingSessionFa
261336 internal int NumSession => Volatile . Read ( ref _numSession ) ;
262337
263338 internal Func < int , Task > Open { private get ; init ; } = _ => Task . CompletedTask ;
264-
265- internal Func < int , Task > DeleteSession { private get ; init ; } = _ => Task . CompletedTask ;
266-
267339 internal Func < bool > IsBroken { private get ; init ; } = ( ) => false ;
268-
269340 internal Func < ValueTask > Dispose { private get ; init ; } = ( ) => ValueTask . CompletedTask ;
270341
271342 public MockPoolingSession NewSession ( PoolingSessionSource < MockPoolingSession > source ) =>
272343 new ( source ,
273- sessionCountOpened =>
344+ async sessionCountOpened =>
274345 {
346+ await Open ( sessionCountOpened ) ;
347+
275348 Assert . True ( Interlocked . Increment ( ref _numSession ) <= maxSessionSize ) ;
276349
277- return Open ( sessionCountOpened ) ;
350+ await Task . Yield ( ) ;
278351 } ,
279- sessionCountOpened =>
352+ ( ) =>
280353 {
281- Interlocked . Decrement ( ref _numSession ) ;
354+ Assert . True ( Interlocked . Decrement ( ref _numSession ) >= 0 ) ;
282355
283- return DeleteSession ( sessionCountOpened ) ;
356+ return Task . CompletedTask ;
284357 } ,
285358 IsBroken ,
286359 Interlocked . Increment ( ref _sessionOpened )
@@ -292,7 +365,7 @@ public MockPoolingSession NewSession(PoolingSessionSource<MockPoolingSession> so
292365internal class MockPoolingSession (
293366 PoolingSessionSource < MockPoolingSession > source ,
294367 Func < int , Task > mockOpen ,
295- Func < int , Task > mockDeleteSession ,
368+ Func < Task > mockDeleteSession ,
296369 Func < bool > mockIsBroken ,
297370 int sessionNum
298371) : PoolingSessionBase < MockPoolingSession > ( source )
@@ -302,7 +375,7 @@ int sessionNum
302375 public override bool IsBroken => mockIsBroken ( ) ;
303376
304377 internal override Task Open ( CancellationToken cancellationToken ) => mockOpen ( sessionNum ) ;
305- internal override Task DeleteSession ( ) => mockDeleteSession ( sessionNum ) ;
378+ internal override Task DeleteSession ( ) => mockDeleteSession ( ) ;
306379
307380 public override ValueTask < IServerStream < ExecuteQueryResponsePart > > ExecuteQuery (
308381 string query ,
0 commit comments