Skip to content

Commit 34aa540

Browse files
committed
Borrow the start and observe overloads from neilpa/Rex
1 parent e8df67a commit 34aa540

File tree

3 files changed

+110
-69
lines changed

3 files changed

+110
-69
lines changed

ReactiveTask.xcodeproj/project.pbxproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
objects = {
88

99
/* Begin PBXBuildFile section */
10+
CD0E570A1BBD2A31009BB2AE /* Extensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = CD0E57091BBD2A31009BB2AE /* Extensions.swift */; settings = {ASSET_TAGS = (); }; };
1011
D02130921AF87B6500B9EC20 /* Result.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = D02130911AF87B6500B9EC20 /* Result.framework */; };
1112
D0BFEA5E1A2D1E5E00E23194 /* ReactiveTask.h in Headers */ = {isa = PBXBuildFile; fileRef = D0BFEA5D1A2D1E5E00E23194 /* ReactiveTask.h */; settings = {ATTRIBUTES = (Public, ); }; };
1213
D0BFEA641A2D1E5E00E23194 /* ReactiveTask.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = D0BFEA581A2D1E5E00E23194 /* ReactiveTask.framework */; };
@@ -30,6 +31,7 @@
3031
/* End PBXContainerItemProxy section */
3132

3233
/* Begin PBXFileReference section */
34+
CD0E57091BBD2A31009BB2AE /* Extensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Extensions.swift; sourceTree = "<group>"; };
3335
D02130911AF87B6500B9EC20 /* Result.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; path = Result.framework; sourceTree = BUILT_PRODUCTS_DIR; };
3436
D0BFEA581A2D1E5E00E23194 /* ReactiveTask.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = ReactiveTask.framework; sourceTree = BUILT_PRODUCTS_DIR; };
3537
D0BFEA5C1A2D1E5E00E23194 /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
@@ -111,6 +113,7 @@
111113
children = (
112114
D0BFEA5D1A2D1E5E00E23194 /* ReactiveTask.h */,
113115
D0BFEAA31A2D216600E23194 /* Errors.swift */,
116+
CD0E57091BBD2A31009BB2AE /* Extensions.swift */,
114117
D0BFEA9F1A2D212000E23194 /* Task.swift */,
115118
D0BFEA5B1A2D1E5E00E23194 /* Supporting Files */,
116119
);
@@ -321,6 +324,7 @@
321324
buildActionMask = 2147483647;
322325
files = (
323326
D0BFEAA41A2D216600E23194 /* Errors.swift in Sources */,
327+
CD0E570A1BBD2A31009BB2AE /* Extensions.swift in Sources */,
324328
D0BFEAA01A2D212000E23194 /* Task.swift in Sources */,
325329
);
326330
runOnlyForDeploymentPostprocessing = 0;

ReactiveTask/Extensions.swift

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
//
2+
// Extensions.swift
3+
// ReactiveTask
4+
//
5+
// Created by Syo Ikeda on 10/1/15.
6+
// Copyright © 2015 Carthage. All rights reserved.
7+
//
8+
9+
import ReactiveCocoa
10+
11+
extension Signal {
12+
13+
/// Bring back the `observe` overload. The `observeNext` or pattern matching
14+
/// on `observe(Event)` is still annoying in practice and more verbose. This is
15+
/// also likely to change in a later RAC 4 alpha.
16+
internal func observe(next next: (T -> ())? = nil, error: (E -> ())? = nil, completed: (() -> ())? = nil, interrupted: (() -> ())? = nil) -> Disposable? {
17+
return self.observe { (event: Event<T, E>) in
18+
switch event {
19+
case let .Next(value):
20+
next?(value)
21+
case let .Error(err):
22+
error?(err)
23+
case .Completed:
24+
completed?()
25+
case .Interrupted:
26+
interrupted?()
27+
}
28+
}
29+
}
30+
}
31+
32+
extension SignalProducer {
33+
34+
/// Bring back the `start` overload. The `startNext` or pattern matching
35+
/// on `start(Event)` is annoying in practice and more verbose. This is also
36+
/// likely to change in a later RAC 4 alpha.
37+
internal func start(next next: (T -> ())? = nil, error: (E -> ())? = nil, completed: (() -> ())? = nil, interrupted: (() -> ())? = nil) -> Disposable? {
38+
return self.start { (event: Event<T, E>) in
39+
switch event {
40+
case let .Next(value):
41+
next?(value)
42+
case let .Error(err):
43+
error?(err)
44+
case .Completed:
45+
completed?()
46+
case .Interrupted:
47+
interrupted?()
48+
}
49+
}
50+
}
51+
}

ReactiveTask/Task.swift

Lines changed: 55 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -184,26 +184,21 @@ private final class Pipe {
184184
producer.startWithSignal { signal, producerDisposable in
185185
disposable.addDisposable(producerDisposable)
186186

187-
signal.observe {
188-
switch $0 {
189-
case let .Next(data):
190-
let dispatchData = dispatch_data_create(data.bytes, data.length, self.queue, nil)
191-
192-
dispatch_io_write(channel, 0, dispatchData, self.queue) { (done, data, error) in
193-
if error == ECANCELED {
194-
sendInterrupted(observer)
195-
} else if error != 0 {
196-
sendError(observer, .POSIXError(error))
197-
}
187+
signal.observe(next: { data in
188+
let dispatchData = dispatch_data_create(data.bytes, data.length, self.queue, nil)
189+
190+
dispatch_io_write(channel, 0, dispatchData, self.queue) { (done, data, error) in
191+
if error == ECANCELED {
192+
sendInterrupted(observer)
193+
} else if error != 0 {
194+
sendError(observer, .POSIXError(error))
198195
}
199-
case .Completed:
200-
dispatch_io_close(channel, 0)
201-
case .Interrupted:
202-
sendInterrupted(observer)
203-
default:
204-
break
205196
}
206-
}
197+
}, completed: {
198+
dispatch_io_close(channel, 0)
199+
}, interrupted: {
200+
sendInterrupted(observer)
201+
})
207202
}
208203

209204
disposable.addDisposable {
@@ -249,25 +244,22 @@ private func aggregateDataReadFromPipe(pipe: Pipe) -> SignalProducer<ReadData, R
249244
readProducer.startWithSignal { signal, signalDisposable in
250245
disposable.addDisposable(signalDisposable)
251246

252-
signal.observe {
253-
switch $0 {
254-
case let .Next(data):
255-
sendNext(observer, .chunk(data))
247+
signal.observe(next: { data in
248+
sendNext(observer, .chunk(data))
256249

257-
if let existingBuffer = buffer {
258-
buffer = dispatch_data_create_concat(existingBuffer, data)
259-
} else {
260-
buffer = data
261-
}
262-
case let .Error(error):
263-
sendError(observer, error)
264-
case .Completed:
265-
sendNext(observer, .aggregated(buffer))
266-
sendCompleted(observer)
267-
case .Interrupted:
268-
sendInterrupted(observer)
250+
if let existingBuffer = buffer {
251+
buffer = dispatch_data_create_concat(existingBuffer, data)
252+
} else {
253+
buffer = data
269254
}
270-
}
255+
}, error: { error in
256+
sendError(observer, error)
257+
}, completed: {
258+
sendNext(observer, .aggregated(buffer))
259+
sendCompleted(observer)
260+
}, interrupted: {
261+
sendInterrupted(observer)
262+
})
271263
}
272264
}
273265
}
@@ -454,49 +446,43 @@ public func launchTask(taskDescription: TaskDescription) -> SignalProducer<TaskE
454446
stdoutProducer.startWithSignal { signal, signalDisposable in
455447
disposable += signalDisposable
456448

457-
signal.observe {
458-
switch $0 {
459-
case let .Next(readData):
460-
switch readData {
461-
case let .Chunk(data):
462-
sendNext(observer, .StandardOutput(data))
449+
signal.observe(next: { readData in
450+
switch readData {
451+
case let .Chunk(data):
452+
sendNext(observer, .StandardOutput(data))
463453

464-
case let .Aggregated(data):
465-
sendNext(stdoutAggregatedSink, data)
466-
}
467-
case let .Error(error):
468-
sendError(observer, error)
469-
sendError(stdoutAggregatedSink, error)
470-
case .Completed:
471-
sendCompleted(stdoutAggregatedSink)
472-
case .Interrupted:
473-
sendInterrupted(stdoutAggregatedSink)
454+
case let .Aggregated(data):
455+
sendNext(stdoutAggregatedSink, data)
474456
}
475-
}
457+
}, error: { error in
458+
sendError(observer, error)
459+
sendError(stdoutAggregatedSink, error)
460+
}, completed: {
461+
sendCompleted(stdoutAggregatedSink)
462+
}, interrupted: {
463+
sendInterrupted(stdoutAggregatedSink)
464+
})
476465
}
477466

478467
stderrProducer.startWithSignal { signal, signalDisposable in
479468
disposable += signalDisposable
480469

481-
signal.observe {
482-
switch $0 {
483-
case let .Next(readData):
484-
switch readData {
485-
case let .Chunk(data):
486-
sendNext(observer, .StandardError(data))
470+
signal.observe(next: { readData in
471+
switch readData {
472+
case let .Chunk(data):
473+
sendNext(observer, .StandardError(data))
487474

488-
case let .Aggregated(data):
489-
sendNext(stderrAggregatedSink, data)
490-
}
491-
case let .Error(error):
492-
sendError(observer, error)
493-
sendError(stderrAggregatedSink, error)
494-
case .Completed:
495-
sendCompleted(stderrAggregatedSink)
496-
case .Interrupted:
497-
sendInterrupted(stderrAggregatedSink)
475+
case let .Aggregated(data):
476+
sendNext(stderrAggregatedSink, data)
498477
}
499-
}
478+
}, error: { error in
479+
sendError(observer, error)
480+
sendError(stderrAggregatedSink, error)
481+
}, completed: {
482+
sendCompleted(stderrAggregatedSink)
483+
}, interrupted: {
484+
sendInterrupted(stderrAggregatedSink)
485+
})
500486
}
501487

502488
task.standardOutput = stdoutPipe.writeHandle

0 commit comments

Comments
 (0)