Skip to content
This repository was archived by the owner on Sep 28, 2025. It is now read-only.

Commit 176e6dd

Browse files
committed
fix: Use shared token on underlying stream process
This way, when the stop command is called, the underlying stream process should be signaled to cancel right away too.
1 parent ce2e286 commit 176e6dd

File tree

2 files changed

+35
-4
lines changed

2 files changed

+35
-4
lines changed

src/StreamMaster.Streams/Broadcasters/SourceBroadcaster.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,20 +106,21 @@ public async Task<long> SetSourceStreamAsync(SMStreamInfo SMStreamInfo, Cancella
106106

107107
this.SMStreamInfo = SMStreamInfo;
108108

109+
// Start a new streaming task
110+
_cancellationTokenSource = new CancellationTokenSource();
111+
109112
Stopwatch stopwatch = Stopwatch.StartNew();
110113
try
111114
{
112115
(Stream? stream, int processId, ProxyStreamError? error) =
113-
await streamFactory.GetStream(SMStreamInfo, cancellationToken).ConfigureAwait(false);
116+
await streamFactory.GetStream(SMStreamInfo, _cancellationTokenSource.Token).ConfigureAwait(false);
114117
stopwatch.Stop();
115118
if (stream == null || error != null)
116119
{
117120
logger.LogError("Could not create source stream for channel broadcaster: {Id} {Name} {Error}", SMStreamInfo.Id, SMStreamInfo.Name, error?.Message);
118121
return 0;
119122
}
120123

121-
// Start a new streaming task
122-
_cancellationTokenSource = new CancellationTokenSource();
123124
_streamingTask = Task.Run(() => RunPipelineAsync(stream, SMStreamInfo.Name, cancellationToken: _cancellationTokenSource.Token), _cancellationTokenSource.Token);
124125
return stopwatch.ElapsedMilliseconds;
125126
}
@@ -193,6 +194,11 @@ private async Task RunPipelineAsync(Stream sourceStream, string name, int buffer
193194
timeoutCts?.Dispose();
194195
}
195196

197+
if (cancellationToken.IsCancellationRequested)
198+
{
199+
break;
200+
}
201+
196202
if (bytesRead == 0)
197203
{
198204
if (!hasReadData)

src/StreamMaster.Streams/Factories/CommandExecutor.cs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string s
3434

3535
using var registration = cancellationToken.Register(() =>
3636
{
37+
logger.LogDebug("Cancellation requested for Stream process");
3738
GracefullyTerminateProcess();
3839
});
3940

@@ -46,7 +47,20 @@ public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string s
4647
return new GetStreamResult(null, -1, error);
4748
}
4849

50+
if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
51+
{
52+
try
53+
{
54+
Process.Start("setpgrp", $"{_process.Id}");
55+
}
56+
catch (Exception ex)
57+
{
58+
logger.LogWarning(ex, "Failed to set process group for {ProcessId}", _process.Id);
59+
}
60+
}
61+
4962
string stderrFilePath = Path.Combine(BuildInfo.CommandErrorFolder, $"stderr_{_process.Id}.log");
63+
Directory.CreateDirectory(Path.GetDirectoryName(stderrFilePath)!);
5064
errorWriter = new StreamWriter(stderrFilePath, append: true, Encoding.UTF8);
5165

5266
// Clean up older logs to keep only the latest 10
@@ -64,6 +78,7 @@ public GetStreamResult ExecuteCommand(CommandProfileDto commandProfile, string s
6478
}
6579
};
6680
_process.BeginErrorReadLine();
81+
_process.EnableRaisingEvents = true; // Ensure Exited event is raised
6782
_process.Exited += Process_Exited;
6883

6984
stopwatch.Stop();
@@ -242,6 +257,16 @@ private static void ConfigureProcess(Process process, string commandExec, string
242257
process.StartInfo.RedirectStandardOutput = true;
243258
process.StartInfo.RedirectStandardError = true;
244259
process.StartInfo.StandardOutputEncoding = Encoding.UTF8;
260+
process.StartInfo.StandardErrorEncoding = Encoding.UTF8;
261+
process.StartInfo.WindowStyle = ProcessWindowStyle.Hidden;
262+
263+
if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
264+
{
265+
process.StartInfo.Environment["SM_PROCESS_ID"] = process.Id.ToString();
266+
process.StartInfo.Environment["SM_PROCESS_TYPE"] = "STREAM";
267+
}
268+
269+
process.EnableRaisingEvents = true;
245270
}
246271

247272
/// <summary>
@@ -273,4 +298,4 @@ public void Dispose()
273298
_disposed = true;
274299
GC.SuppressFinalize(this);
275300
}
276-
}
301+
}

0 commit comments

Comments
 (0)