@@ -18,7 +18,7 @@ internal partial class RemoteJSRuntime : JSRuntime
1818 private readonly CircuitOptions _options ;
1919 private readonly ILogger < RemoteJSRuntime > _logger ;
2020 private CircuitClientProxy _clientProxy ;
21- private readonly ConcurrentDictionary < long , DotNetStreamReference > _pendingDotNetToJSStreams = new ( ) ;
21+ private readonly ConcurrentDictionary < long , CancelableDotNetStreamReference > _pendingDotNetToJSStreams = new ( ) ;
2222 private bool _permanentlyDisconnected ;
2323 private readonly long _maximumIncomingBytes ;
2424 private int _byteArraysToBeRevivedTotalBytes ;
@@ -152,21 +152,27 @@ protected override void ReceiveByteArray(int id, byte[] data)
152152
153153 protected override async Task TransmitStreamAsync ( long streamId , DotNetStreamReference dotNetStreamReference )
154154 {
155- if ( ! _pendingDotNetToJSStreams . TryAdd ( streamId , dotNetStreamReference ) )
155+ var cancelableStreamReference = new CancelableDotNetStreamReference ( dotNetStreamReference ) ;
156+ if ( _pendingDotNetToJSStreams . TryAdd ( streamId , cancelableStreamReference ) )
156157 {
157158 throw new ArgumentException ( $ "The stream { streamId } is already pending.") ;
158159 }
159160
160161 // SignalR only supports streaming being initiated from the JS side, so we have to ask it to
161162 // start the stream. We'll give it a maximum of 10 seconds to do so, after which we give up
162163 // and discard it.
163- var cancellationToken = new CancellationTokenSource ( TimeSpan . FromSeconds ( 10 ) ) . Token ;
164- cancellationToken . Register ( ( ) =>
164+ CancellationTokenSource cancellationTokenSource = new ( TimeSpan . FromSeconds ( 10 ) ) ;
165+
166+ // Store CTS to dispose later.
167+ cancelableStreamReference . CancellationTokenSource = cancellationTokenSource ;
168+
169+ cancellationTokenSource . Token . Register ( ( ) =>
165170 {
166171 // If by now the stream hasn't been claimed for sending, stop tracking it
167- if ( _pendingDotNetToJSStreams . TryRemove ( streamId , out var timedOutStream ) && ! timedOutStream . LeaveOpen )
172+ if ( _pendingDotNetToJSStreams . TryRemove ( streamId , out var timedOutCancelableStreamReference ) )
168173 {
169- timedOutStream . Stream . Dispose ( ) ;
174+ timedOutCancelableStreamReference . StreamReference . Dispose ( ) ;
175+ timedOutCancelableStreamReference . CancellationTokenSource ? . Dispose ( ) ;
170176 }
171177 } ) ;
172178
@@ -175,8 +181,13 @@ protected override async Task TransmitStreamAsync(long streamId, DotNetStreamRef
175181
176182 public bool TryClaimPendingStreamForSending ( long streamId , out DotNetStreamReference pendingStream )
177183 {
178- if ( _pendingDotNetToJSStreams . TryRemove ( streamId , out pendingStream ) )
184+ if ( _pendingDotNetToJSStreams . TryRemove ( streamId , out var cancelableStreamReference ) )
179185 {
186+ pendingStream = cancelableStreamReference . StreamReference ;
187+
188+ // Dispose CTS for claimed Stream.
189+ cancelableStreamReference . CancellationTokenSource ? . Dispose ( ) ;
190+
180191 return true ;
181192 }
182193
@@ -193,6 +204,18 @@ public void MarkPermanentlyDisconnected()
193204 protected override async Task < Stream > ReadJSDataAsStreamAsync ( IJSStreamReference jsStreamReference , long totalLength , CancellationToken cancellationToken = default )
194205 => await RemoteJSDataStream . CreateRemoteJSDataStreamAsync ( this , jsStreamReference , totalLength , _maximumIncomingBytes , _options . JSInteropDefaultCallTimeout , cancellationToken ) ;
195206
207+ private class CancelableDotNetStreamReference
208+ {
209+ public CancelableDotNetStreamReference ( DotNetStreamReference streamReference )
210+ {
211+ StreamReference = streamReference ;
212+ }
213+
214+ public CancellationTokenSource ? CancellationTokenSource { get ; set ; }
215+
216+ public DotNetStreamReference StreamReference { get ; }
217+ }
218+
196219 public static partial class Log
197220 {
198221 [ LoggerMessage ( 1 , LogLevel . Debug , "Begin invoke JS interop '{AsyncHandle}': '{FunctionIdentifier}'" , EventName = "BeginInvokeJS" ) ]
0 commit comments