@@ -140,7 +140,7 @@ private final class Pipe {
140140 ///
141141 /// After starting the returned producer, `readFD` should not be used
142142 /// anywhere else, as it may close unexpectedly.
143- func transferReadsToProducer( ) -> SignalProducer < dispatch_data_t , TaskError > {
143+ func transferReadsToProducer( ) -> SignalProducer < NSData , TaskError > {
144144 return SignalProducer { observer, disposable in
145145 dispatch_group_enter ( self . group)
146146 let channel = dispatch_io_create ( DISPATCH_IO_STREAM, self . readFD, self . queue) { error in
@@ -159,7 +159,7 @@ private final class Pipe {
159159 dispatch_io_set_low_water ( channel, 1 )
160160 dispatch_io_read ( channel, 0 , Int . max, self . queue) { ( done, data, error) in
161161 if let data = data {
162- observer. sendNext ( data)
162+ observer. sendNext ( data as! NSData )
163163 }
164164
165165 if error == ECANCELED {
@@ -239,20 +239,13 @@ private enum ReadData {
239239 ///
240240 /// No further chunks will occur after this has been sent.
241241 case Aggregated( NSData )
242+ }
242243
243- /// Convenience constructor for a `Chunk` from `dispatch_data_t`.
244- static func chunk( data: dispatch_data_t ) -> ReadData {
245- return . Chunk( data as! NSData )
246- }
247-
248- /// Convenience constructor for an `Aggregated` from `dispatch_data_t`.
249- static func aggregated( data: dispatch_data_t ? ) -> ReadData {
250- if let data = data {
251- return . Aggregated( data as! NSData )
252- } else {
253- return . Aggregated( NSData ( ) )
254- }
255- }
244+ private func + ( lhs: NSData , rhs: NSData ) -> NSData {
245+ let result = NSMutableData ( )
246+ result. appendData ( lhs)
247+ result. appendData ( rhs)
248+ return result
256249}
257250
258251/// Takes ownership of the read handle from the given pipe, then sends
@@ -261,22 +254,22 @@ private func aggregateDataReadFromPipe(pipe: Pipe) -> SignalProducer<ReadData, T
261254 let readProducer = pipe. transferReadsToProducer ( )
262255
263256 return SignalProducer { observer, disposable in
264- var buffer : dispatch_data_t ? = nil
257+ var buffer : NSData ? = nil
265258
266259 readProducer. startWithSignal { signal, signalDisposable in
267260 disposable. addDisposable ( signalDisposable)
268261
269262 signal. observe ( Observer ( next: { data in
270- observer. sendNext ( . chunk ( data) )
263+ observer. sendNext ( . Chunk ( data) )
271264
272265 if let existingBuffer = buffer {
273- buffer = dispatch_data_create_concat ( existingBuffer, data)
266+ buffer = existingBuffer + data
274267 } else {
275268 buffer = data
276269 }
277270 } , failed: observer. sendFailed
278271 , completed: {
279- observer. sendNext ( . aggregated ( buffer) )
272+ observer. sendNext ( . Aggregated ( buffer ?? NSData ( ) ) )
280273 observer. sendCompleted ( )
281274 } , interrupted: observer. sendInterrupted
282275 ) )
0 commit comments