@@ -426,7 +426,6 @@ extension PipeConfiguration {
426
426
private func runPipeline( ) async throws -> CollectedResult < Output , Error > {
427
427
// Create a pipe for standard error
428
428
let sharedErrorPipe = try FileDescriptor . pipe ( )
429
- let sharedErrorPipeOutput = FileDescriptorOutput ( fileDescriptor: sharedErrorPipe. writeEnd, closeAfterSpawningProcess: false )
430
429
431
430
return try await withThrowingTaskGroup ( of: CollectedPipeResult . self, returning: CollectedResult< Output, Error> . self ) { group in
432
431
// Collect error output from all stages
@@ -465,7 +464,7 @@ extension PipeConfiguration {
465
464
configuration,
466
465
input: self . input,
467
466
output: . fileDescriptor( writeEnd, closeAfterSpawningProcess: true ) ,
468
- error: sharedErrorPipeOutput
467
+ error: FileDescriptorOutput ( fileDescriptor : sharedErrorPipe . writeEnd , closeAfterSpawningProcess : false )
469
468
)
470
469
471
470
taskResult = PipelineTaskResult . success (
@@ -565,6 +564,10 @@ extension PipeConfiguration {
565
564
}
566
565
}
567
566
567
+ // Close outputs in case the function did not
568
+ try await outWriter. finish ( )
569
+ try await errWriter. finish ( )
570
+
568
571
return 0
569
572
}
570
573
@@ -600,7 +603,7 @@ extension PipeConfiguration {
600
603
configuration,
601
604
input: . fileDescriptor( readEnd, closeAfterSpawningProcess: true ) ,
602
605
output: . fileDescriptor( writeEnd, closeAfterSpawningProcess: true ) ,
603
- error: sharedErrorPipeOutput
606
+ error: FileDescriptorOutput ( fileDescriptor : sharedErrorPipe . writeEnd , closeAfterSpawningProcess : false )
604
607
)
605
608
606
609
taskResult = PipelineTaskResult . success (
@@ -678,6 +681,10 @@ extension PipeConfiguration {
678
681
}
679
682
}
680
683
684
+ // Close outputs in case the function did not
685
+ try await outWriter. finish ( )
686
+ try await errWriter. finish ( )
687
+
681
688
return 0
682
689
}
683
690
@@ -712,7 +719,7 @@ extension PipeConfiguration {
712
719
configuration,
713
720
input: . fileDescriptor( readEnd, closeAfterSpawningProcess: true ) ,
714
721
output: self . output,
715
- error: sharedErrorPipeOutput
722
+ error: FileDescriptorOutput ( fileDescriptor : sharedErrorPipe . writeEnd , closeAfterSpawningProcess : false )
716
723
)
717
724
return PipelineTaskResult . success ( lastIndex, SendableCollectedResult ( finalResult) )
718
725
case . replaceStdout:
@@ -844,6 +851,11 @@ extension PipeConfiguration {
844
851
return ( retVal, . none)
845
852
}
846
853
854
+ // FIXME: determine how best to handle these writers so that the function doesn't finish them, and it doesn't cause deadlock
855
+ // Close outputs in case the function did not
856
+ //try await outWriter.finish()
857
+ //try await errWriter.finish()
858
+
847
859
var exitCode : UInt32 = 0
848
860
var output : Output . OutputType ? = nil
849
861
for try await r in group {
0 commit comments