@@ -419,9 +419,28 @@ public func launchTask(task: Task, standardInput: SignalProducer<NSData, NoError
419419 let stdoutProducer = stdoutPipe. transferReadsToProducer ( )
420420 let stderrProducer = stderrPipe. transferReadsToProducer ( )
421421
422+ enum Aggregation {
423+ case Value( NSData )
424+ case Failed( TaskError )
425+ case Interrupted
426+
427+ var producer : SignalProducer < NSData , TaskError > {
428+ switch self {
429+ case let . Value( data) :
430+ return . init( value: data)
431+ case let . Failed( error) :
432+ return . init( error: error)
433+ case . Interrupted:
434+ return SignalProducer { observer, _ in
435+ observer. sendInterrupted ( )
436+ }
437+ }
438+ }
439+ }
440+
422441 return SignalProducer { observer, disposable in
423- let ( stdoutAggregated, stdoutAggregatedObserver ) = SignalProducer < NSData , TaskError > . buffer ( 1 )
424- let ( stderrAggregated, stderrAggregatedObserver ) = SignalProducer < NSData , TaskError > . buffer ( 1 )
442+ let stdoutAggregated = MutableProperty < Aggregation ? > ( nil )
443+ let stderrAggregated = MutableProperty < Aggregation ? > ( nil )
425444
426445 stdoutProducer. startWithSignal { signal, signalDisposable in
427446 disposable += signalDisposable
@@ -432,12 +451,12 @@ public func launchTask(task: Task, standardInput: SignalProducer<NSData, NoError
432451 aggregate. appendData ( data)
433452 } , failed: { error in
434453 observer. sendFailed ( error)
435- stdoutAggregatedObserver . sendFailed ( error)
454+ stdoutAggregated . value = . Failed ( error)
436455 } , completed: {
437- stdoutAggregatedObserver . sendNext ( aggregate)
438- stdoutAggregatedObserver . sendCompleted ( )
439- } , interrupted : stdoutAggregatedObserver . sendInterrupted
440- ) )
456+ stdoutAggregated . value = . Value ( aggregate)
457+ } , interrupted : {
458+ stdoutAggregated . value = . Interrupted
459+ } ) )
441460 }
442461
443462 stderrProducer. startWithSignal { signal, signalDisposable in
@@ -449,32 +468,41 @@ public func launchTask(task: Task, standardInput: SignalProducer<NSData, NoError
449468 aggregate. appendData ( data)
450469 } , failed: { error in
451470 observer. sendFailed ( error)
452- stderrAggregatedObserver . sendFailed ( error)
471+ stderrAggregated . value = . Failed ( error)
453472 } , completed: {
454- stderrAggregatedObserver . sendNext ( aggregate)
455- stderrAggregatedObserver . sendCompleted ( )
456- } , interrupted : stderrAggregatedObserver . sendInterrupted
457- ) )
473+ stderrAggregated . value = . Value ( aggregate)
474+ } , interrupted : {
475+ stderrAggregated . value = . Interrupted
476+ } ) )
458477 }
459478
460479 rawTask. standardOutput = stdoutPipe. writeHandle
461480 rawTask. standardError = stderrPipe. writeHandle
462481
463482 dispatch_group_enter ( group)
464483 rawTask. terminationHandler = { nstask in
484+ func getProducer( property: MutableProperty < Aggregation ? > ) -> SignalProducer < NSData , TaskError > {
485+ return property. producer
486+ . ignoreNil ( )
487+ . promoteErrors ( TaskError . self)
488+ . flatMap ( . Concat) { $0. producer }
489+ }
490+ let stdoutAggregatedProducer = getProducer ( stdoutAggregated)
491+ let stderrAggregatedProducer = getProducer ( stderrAggregated)
492+
465493 let terminationStatus = nstask. terminationStatus
466494 if terminationStatus == EXIT_SUCCESS {
467495 // Wait for stderr to finish, then pass
468496 // through stdout.
469- disposable += stderrAggregated
470- . then ( stdoutAggregated )
497+ disposable += stderrAggregatedProducer
498+ . then ( stdoutAggregatedProducer )
471499 . map ( TaskEvent . Success)
472500 . start ( observer)
473501 } else {
474502 // Wait for stdout to finish, then pass
475503 // through stderr.
476- disposable += stdoutAggregated
477- . then ( stderrAggregated )
504+ disposable += stdoutAggregatedProducer
505+ . then ( stderrAggregatedProducer )
478506 . flatMap ( . Concat) { data -> SignalProducer < TaskEvent < NSData > , TaskError > in
479507 let errorString = ( data. length > 0 ? NSString ( data: data, encoding: NSUTF8StringEncoding) as? String : nil )
480508 return SignalProducer ( error: . ShellTaskFailed( task, exitCode: terminationStatus, standardError: errorString) )
0 commit comments