@@ -192,25 +192,35 @@ private final class Pipe {
192192/// If `forwardingSink` is non-nil, each incremental piece of data will be sent
193193/// to it as data is received.
194194private func aggregateDataReadFromPipe( pipe: Pipe , forwardingSink: SinkOf < NSData > ? ) -> SignalProducer < NSData , ReactiveTaskError > {
195- return pipe. transferReadsToProducer ( )
196- |> reduce ( nil ) { ( buffer: dispatch_data_t ? , data: dispatch_data_t ) in
197- // FIXME: This should go into on(next:), but the compiler currently
198- // crashes when that's attempted.
199- forwardingSink? . put ( data as NSData )
200-
201- if let buffer = buffer {
202- return dispatch_data_create_concat ( buffer, data)
203- } else {
204- return data
205- }
206- }
207- |> map { ( data: dispatch_data_t ? ) -> NSData in
208- if let data = data {
209- return data as NSData
210- } else {
211- return NSData ( )
212- }
195+ let readProducer = pipe. transferReadsToProducer ( )
196+
197+ return SignalProducer { observer, disposable in
198+ var buffer : dispatch_data_t ? = nil
199+
200+ readProducer. startWithSignal { signal, signalDisposable in
201+ disposable. addDisposable ( signalDisposable)
202+
203+ signal. observe ( next: { data in
204+ forwardingSink? . put ( data as NSData )
205+
206+ if let existingBuffer = buffer {
207+ buffer = dispatch_data_create_concat ( existingBuffer, data)
208+ } else {
209+ buffer = data
210+ }
211+ } , error: { error in
212+ sendError ( observer, error)
213+ } , completed: {
214+ if let buffer = buffer {
215+ sendNext ( observer, buffer as NSData )
216+ } else {
217+ sendNext ( observer, NSData ( ) )
218+ }
219+
220+ sendCompleted ( observer)
221+ } )
213222 }
223+ }
214224}
215225
216226/// Launches a new shell task, using the parameters from `taskDescription`.
0 commit comments