@@ -392,146 +392,147 @@ extension Signal where Value: TaskEventType {
392392 }
393393}
394394
395- /// Launches a new shell task.
396- ///
397- /// - Parameters:
398- /// - task: The task to launch.
399- /// - standardInput: Data to stream to standard input of the launched process. If nil, stdin will
400- /// be inherited from the parent process.
401- ///
402- /// - Returns: A producer that will launch the task when started, then send
403- /// `TaskEvent`s as execution proceeds.
404- public func launchTask( _ task: Task , standardInput: SignalProducer < Data , NoError > ? = nil ) -> SignalProducer < TaskEvent < Data > , TaskError > {
405- return SignalProducer { observer, disposable in
406- let queue = DispatchQueue ( label: task. description, attributes: [ ] )
407- let group = Task . group
408-
409- let process = Process ( )
410- process. launchPath = task. launchPath
411- process. arguments = task. arguments
412-
413- if let cwd = task. workingDirectoryPath {
414- process. currentDirectoryPath = cwd
415- }
395+ extension Task {
396+ /// Launches a new shell task.
397+ ///
398+ /// - Parameters:
399+ /// - standardInput: Data to stream to standard input of the launched process. If nil, stdin will
400+ /// be inherited from the parent process.
401+ ///
402+ /// - Returns: A producer that will launch the receiver when started, then send
403+ /// `TaskEvent`s as execution proceeds.
404+ public func launch( standardInput: SignalProducer < Data , NoError > ? = nil ) -> SignalProducer < TaskEvent < Data > , TaskError > {
405+ return SignalProducer { observer, disposable in
406+ let queue = DispatchQueue ( label: self . description, attributes: [ ] )
407+ let group = Task . group
416408
417- if let env = task. environment {
418- process. environment = env
419- }
409+ let process = Process ( )
410+ process. launchPath = self . launchPath
411+ process. arguments = self . arguments
412+
413+ if let cwd = self . workingDirectoryPath {
414+ process. currentDirectoryPath = cwd
415+ }
420416
421- var stdinProducer : SignalProducer < ( ) , TaskError > = . empty
417+ if let env = self . environment {
418+ process. environment = env
419+ }
422420
423- if let input = standardInput {
424- switch Pipe . create ( queue, group) {
425- case let . success( pipe) :
426- process. standardInput = pipe. readHandle
421+ var stdinProducer : SignalProducer < ( ) , TaskError > = . empty
427422
428- stdinProducer = pipe. writeDataFromProducer ( input) . on ( started: {
429- close ( pipe. readFD)
430- } )
423+ if let input = standardInput {
424+ switch Pipe . create ( queue, group) {
425+ case let . success( pipe) :
426+ process. standardInput = pipe. readHandle
431427
432- case let . failure( error) :
433- observer. send ( error: error)
434- return
428+ stdinProducer = pipe. writeDataFromProducer ( input) . on ( started: {
429+ close ( pipe. readFD)
430+ } )
431+
432+ case let . failure( error) :
433+ observer. send ( error: error)
434+ return
435+ }
435436 }
436- }
437437
438- SignalProducer ( result: Pipe . create ( queue, group) &&& Pipe . create ( queue, group) )
439- . flatMap ( . merge) { stdoutPipe, stderrPipe -> SignalProducer < TaskEvent < Data > , TaskError > in
440- let stdoutProducer = stdoutPipe. transferReadsToProducer ( )
441- let stderrProducer = stderrPipe. transferReadsToProducer ( )
442-
443- enum Aggregation {
444- case value( Data )
445- case failed( TaskError )
446- case interrupted
447-
448- var producer : Pipe . ReadProducer {
449- switch self {
450- case let . value( data) :
451- return . init( value: data)
452- case let . failed( error) :
453- return . init( error: error)
454- case . interrupted:
455- return SignalProducer { observer, _ in
456- observer. sendInterrupted ( )
438+ SignalProducer ( result: Pipe . create ( queue, group) &&& Pipe . create ( queue, group) )
439+ . flatMap ( . merge) { stdoutPipe, stderrPipe -> SignalProducer < TaskEvent < Data > , TaskError > in
440+ let stdoutProducer = stdoutPipe. transferReadsToProducer ( )
441+ let stderrProducer = stderrPipe. transferReadsToProducer ( )
442+
443+ enum Aggregation {
444+ case value( Data )
445+ case failed( TaskError )
446+ case interrupted
447+
448+ var producer : Pipe . ReadProducer {
449+ switch self {
450+ case let . value( data) :
451+ return . init( value: data)
452+ case let . failed( error) :
453+ return . init( error: error)
454+ case . interrupted:
455+ return SignalProducer { observer, _ in
456+ observer. sendInterrupted ( )
457+ }
457458 }
458459 }
459460 }
460- }
461-
462- return SignalProducer { observer, disposable in
463- func startAggregating( producer: Pipe . ReadProducer , chunk: @escaping ( Data ) -> TaskEvent < Data > ) -> Pipe . ReadProducer {
464- let aggregated = MutableProperty < Aggregation ? > ( nil )
465461
466- producer. startWithSignal { signal, signalDisposable in
467- disposable += signalDisposable
462+ return SignalProducer { observer, disposable in
463+ func startAggregating( producer: Pipe . ReadProducer , chunk: @escaping ( Data ) -> TaskEvent < Data > ) -> Pipe . ReadProducer {
464+ let aggregated = MutableProperty < Aggregation ? > ( nil )
465+
466+ producer. startWithSignal { signal, signalDisposable in
467+ disposable += signalDisposable
468+
469+ var aggregate = Data ( )
470+ signal. observe ( Observer ( value: { data in
471+ observer. send ( value: chunk ( data) )
472+ aggregate. append ( data)
473+ } , failed: { error in
474+ observer. send ( error: error)
475+ aggregated. value = . failed( error)
476+ } , completed: {
477+ aggregated. value = . value( aggregate)
478+ } , interrupted: {
479+ aggregated. value = . interrupted
480+ } ) )
481+ }
468482
469- var aggregate = Data ( )
470- signal. observe ( Observer ( value: { data in
471- observer. send ( value: chunk ( data) )
472- aggregate. append ( data)
473- } , failed: { error in
474- observer. send ( error: error)
475- aggregated. value = . failed( error)
476- } , completed: {
477- aggregated. value = . value( aggregate)
478- } , interrupted: {
479- aggregated. value = . interrupted
480- } ) )
483+ return aggregated. producer
484+ . skipNil ( )
485+ . flatMap ( . concat) { $0. producer }
481486 }
482487
483- return aggregated. producer
484- . skipNil ( )
485- . flatMap ( . concat) { $0. producer }
486- }
487-
488- let stdoutAggregated = startAggregating ( producer: stdoutProducer, chunk: TaskEvent . standardOutput)
489- let stderrAggregated = startAggregating ( producer: stderrProducer, chunk: TaskEvent . standardError)
490-
491- process. standardOutput = stdoutPipe. writeHandle
492- process. standardError = stderrPipe. writeHandle
493-
494- group. enter ( )
495- process. terminationHandler = { nstask in
496- let terminationStatus = nstask. terminationStatus
497- if terminationStatus == EXIT_SUCCESS {
498- // Wait for stderr to finish, then pass
499- // through stdout.
500- disposable += stderrAggregated
501- . then ( stdoutAggregated)
502- . map ( TaskEvent . success)
503- . start ( observer)
504- } else {
505- // Wait for stdout to finish, then pass
506- // through stderr.
507- disposable += stdoutAggregated
508- . then ( stderrAggregated)
509- . flatMap ( . concat) { data -> SignalProducer < TaskEvent < Data > , TaskError > in
510- let errorString = ( data. count > 0 ? String ( data: data, encoding: . utf8) : nil )
511- return SignalProducer ( error: . shellTaskFailed( task, exitCode: terminationStatus, standardError: errorString) )
512- }
513- . start ( observer)
488+ let stdoutAggregated = startAggregating ( producer: stdoutProducer, chunk: TaskEvent . standardOutput)
489+ let stderrAggregated = startAggregating ( producer: stderrProducer, chunk: TaskEvent . standardError)
490+
491+ process. standardOutput = stdoutPipe. writeHandle
492+ process. standardError = stderrPipe. writeHandle
493+
494+ group. enter ( )
495+ process. terminationHandler = { nstask in
496+ let terminationStatus = nstask. terminationStatus
497+ if terminationStatus == EXIT_SUCCESS {
498+ // Wait for stderr to finish, then pass
499+ // through stdout.
500+ disposable += stderrAggregated
501+ . then ( stdoutAggregated)
502+ . map ( TaskEvent . success)
503+ . start ( observer)
504+ } else {
505+ // Wait for stdout to finish, then pass
506+ // through stderr.
507+ disposable += stdoutAggregated
508+ . then ( stderrAggregated)
509+ . flatMap ( . concat) { data -> SignalProducer < TaskEvent < Data > , TaskError > in
510+ let errorString = ( data. count > 0 ? String ( data: data, encoding: . utf8) : nil )
511+ return SignalProducer ( error: . shellTaskFailed( self , exitCode: terminationStatus, standardError: errorString) )
512+ }
513+ . start ( observer)
514+ }
515+ group. leave ( )
514516 }
515- group. leave ( )
516- }
517-
518- observer. send ( value: . launch( task) )
519- process. launch ( )
520- close ( stdoutPipe. writeFD)
521- close ( stderrPipe. writeFD)
517+
518+ observer. send ( value: . launch( self ) )
519+ process. launch ( )
520+ close ( stdoutPipe. writeFD)
521+ close ( stderrPipe. writeFD)
522522
523- stdinProducer. startWithSignal { signal, signalDisposable in
524- disposable += signalDisposable
525- }
523+ stdinProducer. startWithSignal { signal, signalDisposable in
524+ disposable += signalDisposable
525+ }
526526
527- let _ = disposable. add {
528- process. terminate ( )
527+ let _ = disposable. add {
528+ process. terminate ( )
529+ }
529530 }
530531 }
531- }
532- . startWithSignal { signal , taskDisposable in
533- disposable . add ( taskDisposable )
534- signal . observe ( observer )
535- }
532+ . startWithSignal { signal , taskDisposable in
533+ disposable . add ( taskDisposable)
534+ signal . observe ( observer )
535+ }
536+ }
536537 }
537538}
0 commit comments