@@ -84,6 +84,8 @@ public func ==(lhs: Task, rhs: Task) -> Bool {
8484
8585/// A private class used to encapsulate a Unix pipe.
8686private final class Pipe {
87+ typealias ReadProducer = SignalProducer < NSData , TaskError >
88+
8789 /// The file descriptor for reading data.
8890 let readFD : Int32
8991
@@ -140,7 +142,7 @@ private final class Pipe {
140142 ///
141143 /// After starting the returned producer, `readFD` should not be used
142144 /// anywhere else, as it may close unexpectedly.
143- func transferReadsToProducer( ) -> SignalProducer < NSData , TaskError > {
145+ func transferReadsToProducer( ) -> ReadProducer {
144146 return SignalProducer { observer, disposable in
145147 dispatch_group_enter ( self . group)
146148 let channel = dispatch_io_create ( DISPATCH_IO_STREAM, self . readFD, self . queue) { error in
@@ -424,7 +426,7 @@ public func launchTask(task: Task, standardInput: SignalProducer<NSData, NoError
424426 case Failed( TaskError )
425427 case Interrupted
426428
427- var producer : SignalProducer < NSData , TaskError > {
429+ var producer : Pipe . ReadProducer {
428430 switch self {
429431 case let . Value( data) :
430432 return . init( value: data)
@@ -439,70 +441,53 @@ public func launchTask(task: Task, standardInput: SignalProducer<NSData, NoError
439441 }
440442
441443 return SignalProducer { observer, disposable in
442- let stdoutAggregated = MutableProperty < Aggregation ? > ( nil )
443- let stderrAggregated = MutableProperty < Aggregation ? > ( nil )
444-
445- stdoutProducer. startWithSignal { signal, signalDisposable in
446- disposable += signalDisposable
444+ func startAggregating( producer: Pipe . ReadProducer ) -> Pipe . ReadProducer {
445+ let aggregated = MutableProperty < Aggregation ? > ( nil )
446+
447+ producer. startWithSignal { signal, signalDisposable in
448+ disposable += signalDisposable
449+
450+ let aggregate = NSMutableData ( )
451+ signal. observe ( Observer ( next: { data in
452+ observer. sendNext ( . StandardOutput( data) )
453+ aggregate. appendData ( data)
454+ } , failed: { error in
455+ observer. sendFailed ( error)
456+ aggregated. value = . Failed( error)
457+ } , completed: {
458+ aggregated. value = . Value( aggregate)
459+ } , interrupted: {
460+ aggregated. value = . Interrupted
461+ } ) )
462+ }
447463
448- let aggregate = NSMutableData ( )
449- signal. observe ( Observer ( next: { data in
450- observer. sendNext ( . StandardOutput( data) )
451- aggregate. appendData ( data)
452- } , failed: { error in
453- observer. sendFailed ( error)
454- stdoutAggregated. value = . Failed( error)
455- } , completed: {
456- stdoutAggregated. value = . Value( aggregate)
457- } , interrupted: {
458- stdoutAggregated. value = . Interrupted
459- } ) )
464+ return aggregated. producer
465+ . ignoreNil ( )
466+ . promoteErrors ( TaskError . self)
467+ . flatMap ( . Concat) { $0. producer }
460468 }
461469
462- stderrProducer. startWithSignal { signal, signalDisposable in
463- disposable += signalDisposable
464-
465- let aggregate = NSMutableData ( )
466- signal. observe ( Observer ( next: { data in
467- observer. sendNext ( . StandardError( data) )
468- aggregate. appendData ( data)
469- } , failed: { error in
470- observer. sendFailed ( error)
471- stderrAggregated. value = . Failed( error)
472- } , completed: {
473- stderrAggregated. value = . Value( aggregate)
474- } , interrupted: {
475- stderrAggregated. value = . Interrupted
476- } ) )
477- }
470+ let stdoutAggregated = startAggregating ( stdoutProducer)
471+ let stderrAggregated = startAggregating ( stderrProducer)
478472
479473 rawTask. standardOutput = stdoutPipe. writeHandle
480474 rawTask. standardError = stderrPipe. writeHandle
481475
482476 dispatch_group_enter ( group)
483477 rawTask. terminationHandler = { nstask in
484- func getProducer( property: MutableProperty < Aggregation ? > ) -> SignalProducer < NSData , TaskError > {
485- return property. producer
486- . ignoreNil ( )
487- . promoteErrors ( TaskError . self)
488- . flatMap ( . Concat) { $0. producer }
489- }
490- let stdoutAggregatedProducer = getProducer ( stdoutAggregated)
491- let stderrAggregatedProducer = getProducer ( stderrAggregated)
492-
493478 let terminationStatus = nstask. terminationStatus
494479 if terminationStatus == EXIT_SUCCESS {
495480 // Wait for stderr to finish, then pass
496481 // through stdout.
497- disposable += stderrAggregatedProducer
498- . then ( stdoutAggregatedProducer )
482+ disposable += stderrAggregated
483+ . then ( stdoutAggregated )
499484 . map ( TaskEvent . Success)
500485 . start ( observer)
501486 } else {
502487 // Wait for stdout to finish, then pass
503488 // through stderr.
504- disposable += stdoutAggregatedProducer
505- . then ( stderrAggregatedProducer )
489+ disposable += stdoutAggregated
490+ . then ( stderrAggregated )
506491 . flatMap ( . Concat) { data -> SignalProducer < TaskEvent < NSData > , TaskError > in
507492 let errorString = ( data. length > 0 ? NSString ( data: data, encoding: NSUTF8StringEncoding) as? String : nil )
508493 return SignalProducer ( error: . ShellTaskFailed( task, exitCode: terminationStatus, standardError: errorString) )
0 commit comments