Skip to content

Commit dacfe93

Browse files
authored
Merge pull request #505 from ReactiveCocoa/anders/migrate-operators
Improve TransformerCore and move `take(first:)`.
2 parents ea5fd45 + 323a13d commit dacfe93

File tree

4 files changed

+91
-53
lines changed

4 files changed

+91
-53
lines changed

Sources/Event.swift

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,9 @@ extension Signal.Event: EventProtocol {
180180
}
181181

182182
extension Signal.Event {
183-
internal static func filter(_ isIncluded: @escaping (Value) -> Bool) -> (@escaping Signal<Value, Error>.Observer.Action) -> (Signal<Value, Error>.Event) -> Void {
183+
internal typealias Transformation<U, E: Swift.Error> = (@escaping Signal<U, E>.Observer.Action) -> (Signal<Value, Error>.Event) -> Void
184+
185+
internal static func filter(_ isIncluded: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
184186
return { action in
185187
return { event in
186188
switch event {
@@ -202,7 +204,7 @@ extension Signal.Event {
202204
}
203205
}
204206

205-
internal static func filterMap<U>(_ transform: @escaping (Value) -> U?) -> (@escaping Signal<U, Error>.Observer.Action) -> (Signal<Value, Error>.Event) -> Void {
207+
internal static func filterMap<U>(_ transform: @escaping (Value) -> U?) -> Transformation<U, Error> {
206208
return { action in
207209
return { event in
208210
switch event {
@@ -224,7 +226,7 @@ extension Signal.Event {
224226
}
225227
}
226228

227-
internal static func map<U>(_ transform: @escaping (Value) -> U) -> (@escaping Signal<U, Error>.Observer.Action) -> (Signal<Value, Error>.Event) -> Void {
229+
internal static func map<U>(_ transform: @escaping (Value) -> U) -> Transformation<U, Error> {
228230
return { action in
229231
return { event in
230232
switch event {
@@ -244,7 +246,7 @@ extension Signal.Event {
244246
}
245247
}
246248

247-
internal static func mapError<E>(_ transform: @escaping (Error) -> E) -> (@escaping Signal<Value, E>.Observer.Action) -> (Signal<Value, Error>.Event) -> Void {
249+
internal static func mapError<E>(_ transform: @escaping (Error) -> E) -> Transformation<Value, E> {
248250
return { action in
249251
return { event in
250252
switch event {
@@ -263,4 +265,28 @@ extension Signal.Event {
263265
}
264266
}
265267
}
268+
269+
internal static func take(first count: Int) -> Transformation<Value, Error> {
270+
assert(count >= 1)
271+
272+
return { action in
273+
var taken = 0
274+
275+
return { event in
276+
guard let value = event.value else {
277+
action(event)
278+
return
279+
}
280+
281+
if taken < count {
282+
taken += 1
283+
action(.value(value))
284+
}
285+
286+
if taken == count {
287+
action(.completed)
288+
}
289+
}
290+
}
291+
}
266292
}

Sources/Observer.swift

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,27 @@ extension Signal {
3030
/// - parameters:
3131
/// - observer: The observer to transform.
3232
/// - transform: The transform.
33-
internal init<U, E: Swift.Error>(_ observer: Signal<U, E>.Observer, _ transform: @escaping (@escaping Signal<U, E>.Observer.Action) -> Action) {
34-
self.action = transform(observer.action)
33+
/// - disposable: The disposable to be disposed of when the `TransformerCore`
34+
/// yields any terminal event. If `observer` is a `Signal` input
35+
/// observer, this can be omitted.
36+
internal init<U, E: Swift.Error>(
37+
_ observer: Signal<U, E>.Observer,
38+
_ transform: @escaping Event.Transformation<U, E>,
39+
_ disposable: Disposable? = nil
40+
) {
41+
var hasDeliveredTerminalEvent = false
42+
43+
self.action = transform { event in
44+
if !hasDeliveredTerminalEvent {
45+
observer.action(event)
46+
47+
if event.isTerminating {
48+
hasDeliveredTerminalEvent = true
49+
disposable?.dispose()
50+
}
51+
}
52+
}
53+
3554
self.wrapped = observer.interruptsOnDeinit ? observer : nil
3655
self.interruptsOnDeinit = false
3756
}

Sources/Signal.swift

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -537,9 +537,9 @@ extension Signal {
537537
/// closure.
538538
///
539539
/// - returns: A signal that forwards events yielded by the action.
540-
internal func flatMapEvent<U, E>(_ transform: @escaping (@escaping Signal<U, E>.Observer.Action) -> (Event) -> Void) -> Signal<U, E> {
540+
internal func flatMapEvent<U, E>(_ transform: @escaping Event.Transformation<U, E>) -> Signal<U, E> {
541541
return Signal<U, E> { observer in
542-
return self.observe(.init(observer, transform))
542+
return self.observe(Signal.Observer(observer, transform))
543543
}
544544
}
545545

@@ -642,31 +642,8 @@ extension Signal {
642642
/// - returns: A signal that will yield the first `count` values from `self`
643643
public func take(first count: Int) -> Signal<Value, Error> {
644644
precondition(count >= 0)
645-
646-
return Signal { observer in
647-
if count == 0 {
648-
observer.sendCompleted()
649-
return nil
650-
}
651-
652-
var taken = 0
653-
654-
return self.observe { event in
655-
guard let value = event.value else {
656-
observer.action(event)
657-
return
658-
}
659-
660-
if taken < count {
661-
taken += 1
662-
observer.send(value: value)
663-
}
664-
665-
if taken == count {
666-
observer.sendCompleted()
667-
}
668-
}
669-
}
645+
guard count >= 1 else { return .empty }
646+
return flatMapEvent(Signal.Event.take(first: count))
670647
}
671648
}
672649

Sources/SignalProducer.swift

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ public struct SignalProducer<Value, Error: Swift.Error> {
7070
let interruptHandle = AnyDisposable(observer.sendInterrupted)
7171

7272
return SignalProducerCore.Instance(signal: signal,
73-
observerDidSetup: observerDidSetup,
74-
interruptHandle: interruptHandle)
73+
observerDidSetup: observerDidSetup,
74+
interruptHandle: interruptHandle)
7575
})
7676
}
7777

@@ -192,12 +192,17 @@ public struct SignalProducer<Value, Error: Swift.Error> {
192192
self.init([ first, second ] + tail)
193193
}
194194

195-
/// A producer for a Signal that will immediately complete without sending
196-
/// any values.
195+
/// A producer for a Signal that immediately completes without sending any values.
197196
public static var empty: SignalProducer {
198197
return SignalProducer(GeneratorCore { observer, _ in observer.sendCompleted() })
199198
}
200199

200+
/// A producer for a Signal that immediately interrupts when started, without
201+
/// sending any values.
202+
internal static var interrupted: SignalProducer {
203+
return SignalProducer(GeneratorCore { observer, _ in observer.sendInterrupted() })
204+
}
205+
201206
/// A producer for a Signal that never sends any events to its observers.
202207
public static var never: SignalProducer {
203208
return self.init { observer, lifetime in
@@ -243,7 +248,17 @@ internal class SignalProducerCore<Value, Error: Swift.Error> {
243248
fatalError()
244249
}
245250

246-
func start(_ observer: Signal<Value, Error>.Observer) -> Disposable {
251+
/// Start the producer with an observer created by the given generator.
252+
///
253+
/// The created observer **must** manaully dispose of the given upstream interrupt
254+
/// handle iff it performs any event transformation that might result in a terminal
255+
/// event.
256+
///
257+
/// - parameters:
258+
/// - generator: The closure to generate an observer.
259+
///
260+
/// - returns: A disposable to interrupt the started producer instance.
261+
func start(_ generator: (_ upstreamInterruptHandle: Disposable) -> Signal<Value, Error>.Observer) -> Disposable {
247262
fatalError()
248263
}
249264

@@ -257,7 +272,7 @@ internal class SignalProducerCore<Value, Error: Swift.Error> {
257272
/// closure.
258273
///
259274
/// - returns: A producer that forwards events yielded by the action.
260-
internal func flatMapEvent<U, E>(_ transform: @escaping (@escaping Signal<U, E>.Observer.Action) -> (Signal<Value, Error>.Event) -> Void) -> SignalProducer<U, E> {
275+
internal func flatMapEvent<U, E>(_ transform: @escaping Signal<Value, Error>.Event.Transformation<U, E>) -> SignalProducer<U, E> {
261276
return SignalProducer<U, E>(TransformerCore(source: self, transform: transform))
262277
}
263278
}
@@ -269,9 +284,9 @@ private final class SignalCore<Value, Error: Swift.Error>: SignalProducerCore<Va
269284
self._make = action
270285
}
271286

272-
override func start(_ observer: Signal<Value, Error>.Observer) -> Disposable {
287+
override func start(_ generator: (Disposable) -> Signal<Value, Error>.Observer) -> Disposable {
273288
let instance = makeInstance()
274-
instance.signal.observe(observer)
289+
instance.signal.observe(generator(instance.interruptHandle))
275290
instance.observerDidSetup()
276291
return instance.interruptHandle
277292
}
@@ -301,18 +316,18 @@ private final class SignalCore<Value, Error: Swift.Error>: SignalProducerCore<Va
301316
/// - note: This core does not use `Signal` unless it is requested via `makeInstance()`.
302317
private final class TransformerCore<Value, Error: Swift.Error, SourceValue, SourceError: Swift.Error>: SignalProducerCore<Value, Error> {
303318
private let source: SignalProducerCore<SourceValue, SourceError>
304-
private let transform: (@escaping Signal<Value, Error>.Observer.Action) -> (Signal<SourceValue, SourceError>.Event) -> Void
319+
private let transform: Signal<SourceValue, SourceError>.Event.Transformation<Value, Error>
305320

306-
init(source: SignalProducerCore<SourceValue, SourceError>, transform: @escaping (@escaping Signal<Value, Error>.Observer.Action) -> (Signal<SourceValue, SourceError>.Event) -> Void) {
321+
init(source: SignalProducerCore<SourceValue, SourceError>, transform: @escaping Signal<SourceValue, SourceError>.Event.Transformation<Value, Error>) {
307322
self.source = source
308323
self.transform = transform
309324
}
310325

311-
internal override func start(_ observer: Signal<Value, Error>.Observer) -> Disposable {
312-
return source.start(.init(observer, transform))
326+
internal override func start(_ generator: (Disposable) -> Signal<Value, Error>.Observer) -> Disposable {
327+
return source.start { Signal.Observer(generator($0), transform, $0) }
313328
}
314329

315-
internal override func flatMapEvent<U, E>(_ transform: @escaping (@escaping Signal<U, E>.Observer.Action) -> (Signal<Value, Error>.Event) -> Void) -> SignalProducer<U, E> {
330+
internal override func flatMapEvent<U, E>(_ transform: @escaping Signal<Value, Error>.Event.Transformation<U, E>) -> SignalProducer<U, E> {
316331
return SignalProducer<U, E>(TransformerCore<U, E, SourceValue, SourceError>(source: source) { [innerTransform = self.transform] action in
317332
return innerTransform(transform(action))
318333
})
@@ -321,12 +336,12 @@ private final class TransformerCore<Value, Error: Swift.Error, SourceValue, Sour
321336
internal override func makeInstance() -> Instance {
322337
let product = source.makeInstance()
323338
let signal = Signal<Value, Error> { observer in
324-
return product.signal.observe(.init(observer, transform))
339+
return product.signal.observe(Signal.Observer(observer, transform))
325340
}
326341

327342
return Instance(signal: signal,
328-
observerDidSetup: product.observerDidSetup,
329-
interruptHandle: product.interruptHandle)
343+
observerDidSetup: product.observerDidSetup,
344+
interruptHandle: product.interruptHandle)
330345
}
331346
}
332347

@@ -347,11 +362,11 @@ private final class GeneratorCore<Value, Error: Swift.Error>: SignalProducerCore
347362
self.generator = generator
348363
}
349364

350-
internal override func start(_ observer: Signal<Value, Error>.Observer) -> Disposable {
365+
internal override func start(_ observerGenerator: (Disposable) -> Signal<Value, Error>.Observer) -> Disposable {
351366
// Object allocation is a considerable overhead. So unless the core is configured
352367
// to be disposable, we would reuse the already-disposed, shared `NopDisposable`.
353368
let d: Disposable = isDisposable ? _SimpleDisposable() : NopDisposable.shared
354-
generator(observer, d)
369+
generator(observerGenerator(d), d)
355370
return d
356371
}
357372

@@ -469,7 +484,7 @@ extension SignalProducer {
469484
/// - returns: A disposable to interrupt the produced `Signal`.
470485
@discardableResult
471486
public func start(_ observer: Signal<Value, Error>.Observer = .init()) -> Disposable {
472-
return core.start(observer)
487+
return core.start { _ in observer }
473488
}
474489

475490
/// Create a `Signal` from `self`, and observe the `Signal` for all events
@@ -864,7 +879,8 @@ extension SignalProducer {
864879
/// - returns: A producer that, when started, will yield the first `count`
865880
/// values from `self`.
866881
public func take(first count: Int) -> SignalProducer<Value, Error> {
867-
return lift { $0.take(first: count) }
882+
guard count >= 1 else { return .interrupted }
883+
return core.flatMapEvent(Signal.Event.take(first: count))
868884
}
869885

870886
/// Yield an array of values when `self` completes.

0 commit comments

Comments
 (0)