@@ -193,11 +193,11 @@ private final class Pipe {
193193/// to it as data is received.
194194private func aggregateDataReadFromPipe( pipe: Pipe , forwardingSink: SinkOf < NSData > ? ) -> SignalProducer < NSData , ReactiveTaskError > {
195195 return pipe. transferReadsToProducer ( )
196- |> on ( next: { ( data: dispatch_data_t ) in
197- forwardingSink? . put ( data as NSData )
198- return ( )
199- } )
200196 |> reduce ( nil ) { ( buffer: dispatch_data_t ? , data: dispatch_data_t ) in
197+ // FIXME: This should go into on(next:), but the compiler currently
198+ // crashes when that's attempted.
199+ forwardingSink? . put ( data as NSData )
200+
201201 if let buffer = buffer {
202202 return dispatch_data_create_concat ( buffer, data)
203203 } else {
@@ -232,16 +232,23 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf<
232232 task. environment = env
233233 }
234234
235- var stdinSignal : SignalProducer < ( ) , ReactiveTaskError > = . empty
235+ var stdinProducer : SignalProducer < ( ) , ReactiveTaskError > = . empty
236236
237237 if let input = taskDescription. standardInput {
238238 switch Pipe . create ( ) {
239239 case let . Success( pipe) :
240240 task. standardInput = pipe. unbox. readHandle
241241
242- stdinSignal = pipe. unbox. writeDataFromProducer ( input) |> on ( started: {
242+ // FIXME: This is basically a reimplementation of on(started:)
243+ // to avoid a compiler crash.
244+ stdinProducer = SignalProducer { observer, disposable in
243245 close ( pipe. unbox. readFD)
244- } )
246+
247+ pipe. unbox. writeDataFromProducer ( input) . startWithSignal { signal, signalDisposable in
248+ disposable. addDisposable ( signalDisposable)
249+ signal. observe ( observer)
250+ }
251+ }
245252
246253 case let . Failure( error) :
247254 sendError ( observer, error. unbox)
@@ -250,7 +257,7 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf<
250257
251258 SignalProducer ( result: Pipe . create ( ) )
252259 |> zipWith ( SignalProducer ( result: Pipe . create ( ) ) )
253- |> mergeMap { stdoutPipe, stderrPipe -> SignalProducer < NSData > in
260+ |> joinMap ( . Merge ) { stdoutPipe, stderrPipe -> SignalProducer < NSData , ReactiveTaskError > in
254261 let stdoutProducer = aggregateDataReadFromPipe ( stdoutPipe, standardOutput)
255262 let stderrProducer = aggregateDataReadFromPipe ( stderrPipe, standardError)
256263
@@ -266,38 +273,33 @@ public func launchTask(taskDescription: TaskDescription, standardOutput: SinkOf<
266273 if disposable. disposed {
267274 stdoutPipe. closePipe ( )
268275 stderrPipe. closePipe ( )
269- stdinSignal . start ( ) . dispose ( )
276+ stdinProducer . start ( ) . dispose ( )
270277 return
271278 }
272279
273280 task. launch ( )
274281 close ( stdoutPipe. writeFD)
275282 close ( stderrPipe. writeFD)
276283
277- stdinSignal. startWithSignal { signal, stdinDisposable in
278- disposable. addDisposable ( stdinDisposable)
279-
280- signal. observe ( error: { error in
281- sendError ( observer, error)
282- } )
283- }
284+ let stdinDisposable = stdinProducer. start ( )
285+ disposable. addDisposable ( stdinDisposable)
284286
285287 disposable. addDisposable {
286288 task. terminate ( )
287289 }
288290 }
289291
290- return stdoutProducer
291- |> combineLatestWith ( stderrSignal )
292- |> combineLatestWith ( terminationStatusSignal )
293- |> map { datas , terminationStatus -> ( NSData , NSData , Int32 ) in
294- return ( datas . 0 , datas . 1 , terminationStatus )
295- }
292+ return
293+ combineLatest (
294+ stdoutProducer ,
295+ stderrProducer ,
296+ terminationStatusProducer |> promoteErrors ( ReactiveTaskError . self )
297+ )
296298 |> tryMap { stdoutData, stderrData, terminationStatus -> Result < NSData , ReactiveTaskError > in
297299 if terminationStatus == EXIT_SUCCESS {
298300 return success ( stdoutData)
299301 } else {
300- let errorString = ( stderrData. length > 0 ? String ( data: stderrData, encoding: NSUTF8StringEncoding) : nil )
302+ let errorString = ( stderrData. length > 0 ? NSString ( data: stderrData, encoding: NSUTF8StringEncoding) : nil )
301303 return failure ( . ShellTaskFailed( exitCode: terminationStatus, standardError: errorString) )
302304 }
303305 }
0 commit comments