1- using Microsoft . Extensions . Logging ;
2- using Microsoft . Extensions . Options ;
3- using System . Collections . Concurrent ;
1+ using System . Collections . Concurrent ;
42using System . Diagnostics ;
3+ using System . Diagnostics . CodeAnalysis ;
54using System . Runtime . InteropServices ;
5+ using Microsoft . Extensions . Logging ;
6+ using Microsoft . Extensions . Options ;
67
78namespace ModelContextProtocol . AspNetCore ;
89
910internal sealed partial class StatefulSessionManager (
1011 IOptions < HttpServerTransportOptions > httpServerTransportOptions ,
11- ILogger < StatefulSessionManager > logger ) : ConcurrentDictionary < string , StreamableHttpSession > ( StringComparer . Ordinal )
12+ ILogger < StatefulSessionManager > logger )
1213{
1314 // Workaround for https://github.com/dotnet/runtime/issues/91121. This is fixed in .NET 9 and later.
1415 private readonly ILogger _logger = logger ;
1516
17+ private readonly ConcurrentDictionary < string , StreamableHttpSession > _sessions = new ( StringComparer . Ordinal ) ;
18+
1619 private readonly TimeProvider _timeProvider = httpServerTransportOptions . Value . TimeProvider ;
1720 private readonly TimeSpan _idleTimeout = httpServerTransportOptions . Value . IdleTimeout ;
1821 private readonly long _idleTimeoutTicks = httpServerTransportOptions . Value . IdleTimeout . Ticks ;
1922 private readonly int _maxIdleSessionCount = httpServerTransportOptions . Value . MaxIdleSessionCount ;
2023
21- private readonly SemaphoreSlim _idlePruningLock = new ( 1 , 1 ) ;
22- private readonly List < long > _idleTimeStamps = [ ] ;
24+ private readonly object _idlePruningLock = new ( ) ;
25+ private readonly List < long > _idleTimestamps = [ ] ;
2326 private readonly List < string > _idleSessionIds = [ ] ;
2427 private int _nextIndexToPrune ;
2528
@@ -30,62 +33,72 @@ internal sealed partial class StatefulSessionManager(
3033 public void IncrementIdleSessionCount ( ) => Interlocked . Increment ( ref _currentIdleSessionCount ) ;
3134 public void DecrementIdleSessionCount ( ) => Interlocked . Decrement ( ref _currentIdleSessionCount ) ;
3235
36+ public bool TryGetValue ( string key , [ NotNullWhen ( true ) ] out StreamableHttpSession ? value ) => _sessions . TryGetValue ( key , out value ) ;
37+ public bool TryRemove ( string key , [ NotNullWhen ( true ) ] out StreamableHttpSession ? value ) => _sessions . TryRemove ( key , out value ) ;
38+
3339 public async ValueTask StartNewSessionAsync ( StreamableHttpSession newSession , CancellationToken cancellationToken )
3440 {
35- if ( TryAddSessionImmediately ( newSession ) )
41+ while ( ! TryAddSessionImmediately ( newSession ) )
3642 {
37- return ;
38- }
43+ StreamableHttpSession ? sessionToPrune = null ;
3944
40- await _idlePruningLock . WaitAsync ( cancellationToken ) ;
41- try
42- {
43- while ( ! TryAddSessionImmediately ( newSession ) )
45+ lock ( _idlePruningLock )
4446 {
4547 EnsureIdleSessionsSortedUnsynchronized ( ) ;
4648
4749 while ( _nextIndexToPrune < _idleSessionIds . Count )
4850 {
4951 var pruneId = _idleSessionIds [ _nextIndexToPrune ++ ] ;
50- if ( TryGetValue ( pruneId , out var sessionToPrune ) )
52+ if ( TryGetValue ( pruneId , out sessionToPrune ) )
5153 {
52- if ( ! sessionToPrune . IsActive && TryRemove ( pruneId , out _ ) )
54+ if ( ! sessionToPrune . IsActive && _sessions . TryRemove ( pruneId , out sessionToPrune ) )
5355 {
5456 LogIdleSessionLimit ( pruneId , _maxIdleSessionCount ) ;
55-
56- // We're intentionally waiting for the idle session to be disposed before releasing the _idlePruningLock to
57- // ensure new sessions are not created faster than they're removed when we're at or above the maximum idle session count.
58- await DisposeSessionAsync ( sessionToPrune ) ;
59-
60- // Take one last chance to check if the initialize request was aborted before we incur the cost of managing a new session.
61- cancellationToken . ThrowIfCancellationRequested ( ) ;
62- AddSession ( newSession ) ;
63- return ;
57+ break ;
6458 }
6559 }
6660 }
6761
68- // If we couldn't find any active idle sessions to dispose, start another full prune to repopulate _idleSessionIds.
69- PruneIdleSessionsUnsynchronized ( ) ;
70-
71- if ( _idleSessionIds . Count == 0 && Interlocked . Read ( ref _currentIdleSessionCount ) >= _maxIdleSessionCount )
62+ if ( sessionToPrune is null )
7263 {
73- // This indicates all idle sessions are in the process of being disposed which should not happen during normal operation.
74- // Since there are no idle sessions to prune right now, log a critical error and create the new session anyway.
75- LogTooManyIdleSessionsClosingConcurrently ( newSession . Id , _maxIdleSessionCount , Interlocked . Read ( ref _currentIdleSessionCount ) ) ;
76- AddSession ( newSession ) ;
77- return ;
64+ // If we couldn't find any active idle sessions to dispose, start another full prune to repopulate _idleSessionIds.
65+ PruneIdleSessionsUnsynchronized ( ) ;
66+
67+ if ( _idleSessionIds . Count > 0 )
68+ {
69+ continue ;
70+ }
71+ else
72+ {
73+ if ( Volatile . Read ( ref _currentIdleSessionCount ) >= _maxIdleSessionCount )
74+ {
75+ // This indicates all idle sessions are in the process of being disposed which should not happen during normal operation.
76+ // Since there are no idle sessions to prune right now, log a critical error and create the new session anyway.
77+ LogTooManyIdleSessionsClosingConcurrently ( newSession . Id , _maxIdleSessionCount , Volatile . Read ( ref _currentIdleSessionCount ) ) ;
78+ }
79+
80+ AddSession ( newSession ) ;
81+ return ;
82+ }
7883 }
7984 }
80- }
81- catch
82- {
83- await newSession . DisposeAsync ( ) ;
84- throw ;
85- }
86- finally
87- {
88- _idlePruningLock . Release ( ) ;
85+
86+ try
87+ {
88+ // Since we're at or above the maximum idle session count, we're intentionally waiting for the idle session to be disposed
89+ // before adding a new session to the dictionary to ensure sessions not created faster than they're removed.
90+ await DisposeSessionAsync ( sessionToPrune ) ;
91+
92+ // Take one last chance to check if the initialize request was aborted before we incur the cost of managing a new session.
93+ cancellationToken . ThrowIfCancellationRequested ( ) ;
94+ AddSession ( newSession ) ;
95+ return ;
96+ }
97+ catch
98+ {
99+ await newSession . DisposeAsync ( ) ;
100+ throw ;
101+ }
89102 }
90103 }
91104
@@ -95,15 +108,10 @@ public async ValueTask StartNewSessionAsync(StreamableHttpSession newSession, Ca
95108 /// </summary>
96109 public async Task PruneIdleSessionsAsync ( CancellationToken cancellationToken )
97110 {
98- await _idlePruningLock . WaitAsync ( cancellationToken ) ;
99- try
111+ lock ( _idlePruningLock )
100112 {
101113 PruneIdleSessionsUnsynchronized ( ) ;
102114 }
103- finally
104- {
105- _idlePruningLock . Release ( ) ;
106- }
107115 }
108116
109117 private void PruneIdleSessionsUnsynchronized ( )
@@ -116,11 +124,11 @@ private void PruneIdleSessionsUnsynchronized()
116124
117125 // We clear the lists at the start of pruning rather than the end so we can use them between runs
118126 // to find the most idle sessions to remove one-at-a-time if necessary to make room for new sessions.
119- _idleTimeStamps . Clear ( ) ;
127+ _idleTimestamps . Clear ( ) ;
120128 _idleSessionIds . Clear ( ) ;
121129 _nextIndexToPrune = - 1 ;
122130
123- foreach ( var ( _, session ) in this )
131+ foreach ( var ( _, session ) in _sessions )
124132 {
125133 if ( session . IsActive || session . SessionClosed . IsCancellationRequested )
126134 {
@@ -136,11 +144,11 @@ private void PruneIdleSessionsUnsynchronized()
136144 }
137145
138146 // Add the timestamp and the session
139- _idleTimeStamps . Add ( session . LastActivityTicks ) ;
147+ _idleTimestamps . Add ( session . LastActivityTicks ) ;
140148 _idleSessionIds . Add ( session . Id ) ;
141149 }
142150
143- if ( _idleTimeStamps . Count > _maxIdleSessionCount )
151+ if ( _idleTimestamps . Count > _maxIdleSessionCount )
144152 {
145153 // Sort only if the maximum is breached and sort solely by the timestamp.
146154 EnsureIdleSessionsSortedUnsynchronized ( ) ;
@@ -163,7 +171,7 @@ private void EnsureIdleSessionsSortedUnsynchronized()
163171 return ;
164172 }
165173
166- var timestamps = CollectionsMarshal . AsSpan ( _idleTimeStamps ) ;
174+ var timestamps = CollectionsMarshal . AsSpan ( _idleTimestamps ) ;
167175 timestamps . Sort ( CollectionsMarshal . AsSpan ( _idleSessionIds ) ) ;
168176 _nextIndexToPrune = 0 ;
169177 }
@@ -175,9 +183,9 @@ public async Task DisposeAllSessionsAsync()
175183 {
176184 List < Task > disposeSessionTasks = [ ] ;
177185
178- foreach ( var ( sessionKey , _) in this )
186+ foreach ( var ( sessionKey , _) in _sessions )
179187 {
180- if ( TryRemove ( sessionKey , out var session ) )
188+ if ( _sessions . TryRemove ( sessionKey , out var session ) )
181189 {
182190 disposeSessionTasks . Add ( DisposeSessionAsync ( session ) ) ;
183191 }
@@ -188,7 +196,7 @@ public async Task DisposeAllSessionsAsync()
188196
189197 private bool TryAddSessionImmediately ( StreamableHttpSession session )
190198 {
191- if ( Interlocked . Read ( ref _currentIdleSessionCount ) < _maxIdleSessionCount )
199+ if ( Volatile . Read ( ref _currentIdleSessionCount ) < _maxIdleSessionCount )
192200 {
193201 AddSession ( session ) ;
194202 return true ;
@@ -199,15 +207,15 @@ private bool TryAddSessionImmediately(StreamableHttpSession session)
199207
200208 private void AddSession ( StreamableHttpSession session )
201209 {
202- if ( ! TryAdd ( session . Id , session ) )
210+ if ( ! _sessions . TryAdd ( session . Id , session ) )
203211 {
204212 throw new UnreachableException ( $ "Unreachable given good entropy! Session with ID '{ session . Id } ' has already been created.") ;
205213 }
206214 }
207215
208216 private void RemoveAndCloseSession ( string sessionId )
209217 {
210- if ( ! TryRemove ( sessionId , out var session ) )
218+ if ( ! _sessions . TryRemove ( sessionId , out var session ) )
211219 {
212220 return ;
213221 }
0 commit comments