Skip to content
This repository was archived by the owner on Sep 28, 2025. It is now read-only.

Commit 25f5256

Browse files
committed
feat: Wrap Stream for termination handling
1 parent 176e6dd commit 25f5256

File tree

3 files changed

+374
-115
lines changed

3 files changed

+374
-115
lines changed

src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ public class SourceBroadcaster(ILogger<ISourceBroadcaster> logger, StreamConnect
3030

3131
/// <inheritdoc/>
3232
public StreamHandlerMetrics Metrics => StreamMetricsRecorder.Metrics;
33+
3334
public bool IsMultiView { get; set; }
3435
public CancellationToken CancellationToken { get; } = cancellationToken;
36+
3537
/// <inheritdoc/>
3638
public ConcurrentDictionary<string, IStreamDataToClients> ChannelBroadcasters { get; } = new();
3739

@@ -72,6 +74,7 @@ public bool RemoveChannelBroadcaster(string Id)
7274
{
7375
return ChannelBroadcasters.TryRemove(Id, out _);
7476
}
77+
7578
public async Task<long> SetSourceMultiViewStreamAsync(IChannelBroadcaster channelBroadcaster, CancellationToken cancellationToken)
7679
{
7780
logger.LogInformation("Setting source stream {Name} to {StreamName}", Name, SMStreamInfo.Name);
@@ -269,6 +272,7 @@ private async Task RunPipelineAsync(Stream sourceStream, string name, int buffer
269272
public async Task StopAsync()
270273
{
271274
await _stopLock.WaitAsync().ConfigureAwait(false);
275+
Task? taskToAwait = null;
272276
try
273277
{
274278
if (Interlocked.CompareExchange(ref _isStopped, 1, 0) == 0)
@@ -277,12 +281,29 @@ public async Task StopAsync()
277281
{
278282
_cancellationTokenSource?.Cancel();
279283
}
284+
taskToAwait = _streamingTask;
280285
}
281286
}
282287
finally
283288
{
284289
_stopLock.Release();
285290
}
291+
292+
if (taskToAwait != null)
293+
{
294+
try
295+
{
296+
await taskToAwait.ConfigureAwait(false);
297+
}
298+
catch (OperationCanceledException)
299+
{
300+
logger.LogDebug("Task was already cancelled");
301+
}
302+
catch (Exception ex)
303+
{
304+
logger.LogError(ex, "Error during SourceBroadcaster streaming task completion wait.");
305+
}
306+
}
286307
}
287308

288309
/// <inheritdoc/>

0 commit comments

Comments
 (0)