Skip to content

Commit 93f4f91

Browse files
committed
Merge pull request #44 from Carthage/data-cleanup
Clean up data propogation
2 parents 8f0ea70 + d7d2916 commit 93f4f91

File tree

1 file changed

+20
-78
lines changed

1 file changed

+20
-78
lines changed

ReactiveTask/Task.swift

Lines changed: 20 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private final class Pipe {
140140
///
141141
/// After starting the returned producer, `readFD` should not be used
142142
/// anywhere else, as it may close unexpectedly.
143-
func transferReadsToProducer() -> SignalProducer<dispatch_data_t, TaskError> {
143+
func transferReadsToProducer() -> SignalProducer<NSData, TaskError> {
144144
return SignalProducer { observer, disposable in
145145
dispatch_group_enter(self.group)
146146
let channel = dispatch_io_create(DISPATCH_IO_STREAM, self.readFD, self.queue) { error in
@@ -159,7 +159,7 @@ private final class Pipe {
159159
dispatch_io_set_low_water(channel, 1)
160160
dispatch_io_read(channel, 0, Int.max, self.queue) { (done, data, error) in
161161
if let data = data {
162-
observer.sendNext(data)
162+
observer.sendNext(data as! NSData)
163163
}
164164

165165
if error == ECANCELED {
@@ -230,60 +230,6 @@ private final class Pipe {
230230
}
231231
}
232232

233-
/// Sent when reading from a pipe.
234-
private enum ReadData {
235-
/// A chunk of data, sent as soon as it is received.
236-
case Chunk(NSData)
237-
238-
/// The aggregate of all data sent so far, sent right before completion.
239-
///
240-
/// No further chunks will occur after this has been sent.
241-
case Aggregated(NSData)
242-
243-
/// Convenience constructor for a `Chunk` from `dispatch_data_t`.
244-
static func chunk(data: dispatch_data_t) -> ReadData {
245-
return .Chunk(data as! NSData)
246-
}
247-
248-
/// Convenience constructor for an `Aggregated` from `dispatch_data_t`.
249-
static func aggregated(data: dispatch_data_t?) -> ReadData {
250-
if let data = data {
251-
return .Aggregated(data as! NSData)
252-
} else {
253-
return .Aggregated(NSData())
254-
}
255-
}
256-
}
257-
258-
/// Takes ownership of the read handle from the given pipe, then sends
259-
/// `ReadData` values for all data read.
260-
private func aggregateDataReadFromPipe(pipe: Pipe) -> SignalProducer<ReadData, TaskError> {
261-
let readProducer = pipe.transferReadsToProducer()
262-
263-
return SignalProducer { observer, disposable in
264-
var buffer: dispatch_data_t? = nil
265-
266-
readProducer.startWithSignal { signal, signalDisposable in
267-
disposable.addDisposable(signalDisposable)
268-
269-
signal.observe(Observer(next: { data in
270-
observer.sendNext(.chunk(data))
271-
272-
if let existingBuffer = buffer {
273-
buffer = dispatch_data_create_concat(existingBuffer, data)
274-
} else {
275-
buffer = data
276-
}
277-
}, failed: observer.sendFailed
278-
, completed: {
279-
observer.sendNext(.aggregated(buffer))
280-
observer.sendCompleted()
281-
}, interrupted: observer.sendInterrupted
282-
))
283-
}
284-
}
285-
}
286-
287233
public protocol TaskEventType {
288234
/// The type of value embedded in a `Success` event.
289235
typealias T
@@ -473,8 +419,8 @@ public func launchTask(taskDescription: Task, standardInput: SignalProducer<NSDa
473419

474420
SignalProducer(result: Pipe.create(queue, group) &&& Pipe.create(queue, group))
475421
.flatMap(.Merge) { stdoutPipe, stderrPipe -> SignalProducer<TaskEvent<NSData>, TaskError> in
476-
let stdoutProducer = aggregateDataReadFromPipe(stdoutPipe)
477-
let stderrProducer = aggregateDataReadFromPipe(stderrPipe)
422+
let stdoutProducer = stdoutPipe.transferReadsToProducer()
423+
let stderrProducer = stderrPipe.transferReadsToProducer()
478424

479425
return SignalProducer { observer, disposable in
480426
let (stdoutAggregated, stdoutAggregatedObserver) = SignalProducer<NSData, TaskError>.buffer(1)
@@ -483,38 +429,34 @@ public func launchTask(taskDescription: Task, standardInput: SignalProducer<NSDa
483429
stdoutProducer.startWithSignal { signal, signalDisposable in
484430
disposable += signalDisposable
485431

486-
signal.observe(Observer(next: { readData in
487-
switch readData {
488-
case let .Chunk(data):
489-
observer.sendNext(.StandardOutput(data))
490-
491-
case let .Aggregated(data):
492-
stdoutAggregatedObserver.sendNext(data)
493-
}
432+
let aggregate = NSMutableData()
433+
signal.observe(Observer(next: { data in
434+
observer.sendNext(.StandardOutput(data))
435+
aggregate.appendData(data)
494436
}, failed: { error in
495437
observer.sendFailed(error)
496438
stdoutAggregatedObserver.sendFailed(error)
497-
}, completed: stdoutAggregatedObserver.sendCompleted
498-
, interrupted: stdoutAggregatedObserver.sendInterrupted
439+
}, completed: {
440+
stdoutAggregatedObserver.sendNext(aggregate)
441+
stdoutAggregatedObserver.sendCompleted()
442+
}, interrupted: stdoutAggregatedObserver.sendInterrupted
499443
))
500444
}
501445

502446
stderrProducer.startWithSignal { signal, signalDisposable in
503447
disposable += signalDisposable
504448

505-
signal.observe(Observer(next: { readData in
506-
switch readData {
507-
case let .Chunk(data):
508-
observer.sendNext(.StandardError(data))
509-
510-
case let .Aggregated(data):
511-
stderrAggregatedObserver.sendNext(data)
512-
}
449+
let aggregate = NSMutableData()
450+
signal.observe(Observer(next: { data in
451+
observer.sendNext(.StandardError(data))
452+
aggregate.appendData(data)
513453
}, failed: { error in
514454
observer.sendFailed(error)
515455
stderrAggregatedObserver.sendFailed(error)
516-
}, completed: stderrAggregatedObserver.sendCompleted
517-
, interrupted: stderrAggregatedObserver.sendInterrupted
456+
}, completed: {
457+
stderrAggregatedObserver.sendNext(aggregate)
458+
stderrAggregatedObserver.sendCompleted()
459+
}, interrupted: stderrAggregatedObserver.sendInterrupted
518460
))
519461
}
520462

0 commit comments

Comments
 (0)