Skip to content

Commit 89b6819

Browse files
committed
Handle recent changes in the ReactiveCocoa API affecting the use of observe() and start()
1 parent efa21a8 commit 89b6819

File tree

1 file changed

+69
-55
lines changed

1 file changed

+69
-55
lines changed

ReactiveTask/Task.swift

Lines changed: 69 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -184,21 +184,26 @@ private final class Pipe {
184184
producer.startWithSignal { signal, producerDisposable in
185185
disposable.addDisposable(producerDisposable)
186186

187-
signal.observe(next: { data in
188-
let dispatchData = dispatch_data_create(data.bytes, data.length, self.queue, nil)
189-
190-
dispatch_io_write(channel, 0, dispatchData, self.queue) { (done, data, error) in
191-
if error == ECANCELED {
192-
sendInterrupted(observer)
193-
} else if error != 0 {
194-
sendError(observer, .POSIXError(error))
187+
signal.observe {
188+
switch $0 {
189+
case let .Next(data):
190+
let dispatchData = dispatch_data_create(data.bytes, data.length, self.queue, nil)
191+
192+
dispatch_io_write(channel, 0, dispatchData, self.queue) { (done, data, error) in
193+
if error == ECANCELED {
194+
sendInterrupted(observer)
195+
} else if error != 0 {
196+
sendError(observer, .POSIXError(error))
197+
}
195198
}
199+
case .Completed:
200+
dispatch_io_close(channel, 0)
201+
case .Interrupted:
202+
sendInterrupted(observer)
203+
default:
204+
break
196205
}
197-
}, completed: {
198-
dispatch_io_close(channel, 0)
199-
}, interrupted: {
200-
sendInterrupted(observer)
201-
})
206+
}
202207
}
203208

204209
disposable.addDisposable {
@@ -244,22 +249,25 @@ private func aggregateDataReadFromPipe(pipe: Pipe) -> SignalProducer<ReadData, R
244249
readProducer.startWithSignal { signal, signalDisposable in
245250
disposable.addDisposable(signalDisposable)
246251

247-
signal.observe(next: { data in
248-
sendNext(observer, .chunk(data))
252+
signal.observe {
253+
switch $0 {
254+
case let .Next(data):
255+
sendNext(observer, .chunk(data))
249256

250-
if let existingBuffer = buffer {
251-
buffer = dispatch_data_create_concat(existingBuffer, data)
252-
} else {
253-
buffer = data
257+
if let existingBuffer = buffer {
258+
buffer = dispatch_data_create_concat(existingBuffer, data)
259+
} else {
260+
buffer = data
261+
}
262+
case let .Error(error):
263+
sendError(observer, error)
264+
case .Completed:
265+
sendNext(observer, .aggregated(buffer))
266+
sendCompleted(observer)
267+
case .Interrupted:
268+
sendInterrupted(observer)
254269
}
255-
}, error: { error in
256-
sendError(observer, error)
257-
}, completed: {
258-
sendNext(observer, .aggregated(buffer))
259-
sendCompleted(observer)
260-
}, interrupted: {
261-
sendInterrupted(observer)
262-
})
270+
}
263271
}
264272
}
265273
}
@@ -446,43 +454,49 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskE
446454
stdoutProducer.startWithSignal { signal, signalDisposable in
447455
disposable += signalDisposable
448456

449-
signal.observe(next: { readData in
450-
switch readData {
451-
case let .Chunk(data):
452-
sendNext(observer, .StandardOutput(data))
457+
signal.observe {
458+
switch $0 {
459+
case let .Next(readData):
460+
switch readData {
461+
case let .Chunk(data):
462+
sendNext(observer, .StandardOutput(data))
453463

454-
case let .Aggregated(data):
455-
sendNext(stdoutAggregatedSink, data)
464+
case let .Aggregated(data):
465+
sendNext(stdoutAggregatedSink, data)
466+
}
467+
case let .Error(error):
468+
sendError(observer, error)
469+
sendError(stdoutAggregatedSink, error)
470+
case .Completed:
471+
sendCompleted(stdoutAggregatedSink)
472+
case .Interrupted:
473+
sendInterrupted(stdoutAggregatedSink)
456474
}
457-
}, error: { error in
458-
sendError(observer, error)
459-
sendError(stdoutAggregatedSink, error)
460-
}, completed: {
461-
sendCompleted(stdoutAggregatedSink)
462-
}, interrupted: {
463-
sendInterrupted(stdoutAggregatedSink)
464-
})
475+
}
465476
}
466477

467478
stderrProducer.startWithSignal { signal, signalDisposable in
468479
disposable += signalDisposable
469480

470-
signal.observe(next: { readData in
471-
switch readData {
472-
case let .Chunk(data):
473-
sendNext(observer, .StandardError(data))
481+
signal.observe {
482+
switch $0 {
483+
case let .Next(readData):
484+
switch readData {
485+
case let .Chunk(data):
486+
sendNext(observer, .StandardError(data))
474487

475-
case let .Aggregated(data):
476-
sendNext(stderrAggregatedSink, data)
488+
case let .Aggregated(data):
489+
sendNext(stderrAggregatedSink, data)
490+
}
491+
case let .Error(error):
492+
sendError(observer, error)
493+
sendError(stderrAggregatedSink, error)
494+
case .Completed:
495+
sendCompleted(stderrAggregatedSink)
496+
case .Interrupted:
497+
sendInterrupted(stderrAggregatedSink)
477498
}
478-
}, error: { error in
479-
sendError(observer, error)
480-
sendError(stderrAggregatedSink, error)
481-
}, completed: {
482-
sendCompleted(stderrAggregatedSink)
483-
}, interrupted: {
484-
sendInterrupted(stderrAggregatedSink)
485-
})
499+
}
486500
}
487501

488502
task.standardOutput = stdoutPipe.writeHandle

0 commit comments

Comments
 (0)