diff --git a/src/Components/Server/src/Circuits/RemoteJSRuntime.cs b/src/Components/Server/src/Circuits/RemoteJSRuntime.cs index 34c76baada6f..4ced7924b291 100644 --- a/src/Components/Server/src/Circuits/RemoteJSRuntime.cs +++ b/src/Components/Server/src/Circuits/RemoteJSRuntime.cs @@ -18,7 +18,7 @@ internal partial class RemoteJSRuntime : JSRuntime private readonly CircuitOptions _options; private readonly ILogger _logger; private CircuitClientProxy _clientProxy; - private readonly ConcurrentDictionary _pendingDotNetToJSStreams = new(); + private readonly ConcurrentDictionary _pendingDotNetToJSStreams = new(); private bool _permanentlyDisconnected; private readonly long _maximumIncomingBytes; private int _byteArraysToBeRevivedTotalBytes; @@ -152,7 +152,8 @@ protected override void ReceiveByteArray(int id, byte[] data) protected override async Task TransmitStreamAsync(long streamId, DotNetStreamReference dotNetStreamReference) { - if (!_pendingDotNetToJSStreams.TryAdd(streamId, dotNetStreamReference)) + var cancelableStreamReference = new CancelableDotNetStreamReference(dotNetStreamReference); + if (!_pendingDotNetToJSStreams.TryAdd(streamId, cancelableStreamReference)) { throw new ArgumentException($"The stream {streamId} is already pending."); } @@ -160,13 +161,18 @@ protected override async Task TransmitStreamAsync(long streamId, DotNetStreamRef // SignalR only supports streaming being initiated from the JS side, so we have to ask it to // start the stream. We'll give it a maximum of 10 seconds to do so, after which we give up // and discard it. - var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token; - cancellationToken.Register(() => + CancellationTokenSource cancellationTokenSource = new(TimeSpan.FromSeconds(10)); + + // Store CTS to dispose later. + cancelableStreamReference.CancellationTokenSource = cancellationTokenSource; + + cancellationTokenSource.Token.Register(() => { // If by now the stream hasn't been claimed for sending, stop tracking it - if (_pendingDotNetToJSStreams.TryRemove(streamId, out var timedOutStream) && !timedOutStream.LeaveOpen) + if (_pendingDotNetToJSStreams.TryRemove(streamId, out var timedOutCancelableStreamReference)) { - timedOutStream.Stream.Dispose(); + timedOutCancelableStreamReference.StreamReference.Dispose(); + timedOutCancelableStreamReference.CancellationTokenSource?.Dispose(); } }); @@ -175,8 +181,13 @@ protected override async Task TransmitStreamAsync(long streamId, DotNetStreamRef public bool TryClaimPendingStreamForSending(long streamId, out DotNetStreamReference pendingStream) { - if (_pendingDotNetToJSStreams.TryRemove(streamId, out pendingStream)) + if (_pendingDotNetToJSStreams.TryRemove(streamId, out var cancelableStreamReference)) { + pendingStream = cancelableStreamReference.StreamReference; + + // Dispose CTS for claimed Stream. + cancelableStreamReference.CancellationTokenSource?.Dispose(); + return true; } @@ -193,6 +204,18 @@ public void MarkPermanentlyDisconnected() protected override async Task ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, CancellationToken cancellationToken = default) => await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(this, jsStreamReference, totalLength, _maximumIncomingBytes, _options.JSInteropDefaultCallTimeout, cancellationToken); + private class CancelableDotNetStreamReference + { + public CancelableDotNetStreamReference(DotNetStreamReference streamReference) + { + StreamReference = streamReference; + } + + public CancellationTokenSource? CancellationTokenSource { get; set; } + + public DotNetStreamReference StreamReference { get; } + } + public static partial class Log { [LoggerMessage(1, LogLevel.Debug, "Begin invoke JS interop '{AsyncHandle}': '{FunctionIdentifier}'", EventName = "BeginInvokeJS")]