Skip to content

Commit 6feb366

Browse files
committed
Move take(first:).
1 parent ea5fd45 commit 6feb366

File tree

4 files changed

+80
-44
lines changed

4 files changed

+80
-44
lines changed

Sources/Event.swift

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,4 +263,28 @@ extension Signal.Event {
263263
}
264264
}
265265
}
266+
267+
internal static func take(first count: Int) -> (@escaping Signal<Value, Error>.Observer.Action) -> (Signal<Value, Error>.Event) -> Void {
268+
assert(count >= 1)
269+
270+
return { action in
271+
var taken = 0
272+
273+
return { event in
274+
guard let value = event.value else {
275+
action(event)
276+
return
277+
}
278+
279+
if taken < count {
280+
taken += 1
281+
action(.value(value))
282+
}
283+
284+
if taken == count {
285+
action(.completed)
286+
}
287+
}
288+
}
289+
}
266290
}

Sources/Observer.swift

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,27 @@ extension Signal {
3030
/// - parameters:
3131
/// - observer: The observer to transform.
3232
/// - transform: The transform.
33-
internal init<U, E: Swift.Error>(_ observer: Signal<U, E>.Observer, _ transform: @escaping (@escaping Signal<U, E>.Observer.Action) -> Action) {
34-
self.action = transform(observer.action)
33+
/// - disposable: The disposable to be disposed of when the transformation
34+
/// yields any terminal event that is not originated from the
35+
/// upstream.
36+
internal init<U, E: Swift.Error>(
37+
_ observer: Signal<U, E>.Observer,
38+
_ transform: @escaping (@escaping Signal<U, E>.Observer.Action) -> Action,
39+
_ disposable: Disposable
40+
) {
41+
var hasDeliveredTerminalEvent = false
42+
43+
self.action = transform { event in
44+
if !hasDeliveredTerminalEvent {
45+
observer.action(event)
46+
47+
if event.isTerminating {
48+
hasDeliveredTerminalEvent = true
49+
disposable.dispose()
50+
}
51+
}
52+
}
53+
3554
self.wrapped = observer.interruptsOnDeinit ? observer : nil
3655
self.interruptsOnDeinit = false
3756
}

Sources/Signal.swift

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ extension Signal {
539539
/// - returns: A signal that forwards events yielded by the action.
540540
internal func flatMapEvent<U, E>(_ transform: @escaping (@escaping Signal<U, E>.Observer.Action) -> (Event) -> Void) -> Signal<U, E> {
541541
return Signal<U, E> { observer in
542-
return self.observe(.init(observer, transform))
542+
return self.observe(.init(observer, transform, NopDisposable.shared))
543543
}
544544
}
545545

@@ -642,31 +642,8 @@ extension Signal {
642642
/// - returns: A signal that will yield the first `count` values from `self`
643643
public func take(first count: Int) -> Signal<Value, Error> {
644644
precondition(count >= 0)
645-
646-
return Signal { observer in
647-
if count == 0 {
648-
observer.sendCompleted()
649-
return nil
650-
}
651-
652-
var taken = 0
653-
654-
return self.observe { event in
655-
guard let value = event.value else {
656-
observer.action(event)
657-
return
658-
}
659-
660-
if taken < count {
661-
taken += 1
662-
observer.send(value: value)
663-
}
664-
665-
if taken == count {
666-
observer.sendCompleted()
667-
}
668-
}
669-
}
645+
guard count >= 1 else { return .empty }
646+
return flatMapEvent(Signal.Event.take(first: count))
670647
}
671648
}
672649

Sources/SignalProducer.swift

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ public struct SignalProducer<Value, Error: Swift.Error> {
7070
let interruptHandle = AnyDisposable(observer.sendInterrupted)
7171

7272
return SignalProducerCore.Instance(signal: signal,
73-
observerDidSetup: observerDidSetup,
74-
interruptHandle: interruptHandle)
73+
observerDidSetup: observerDidSetup,
74+
interruptHandle: interruptHandle)
7575
})
7676
}
7777

@@ -192,12 +192,17 @@ public struct SignalProducer<Value, Error: Swift.Error> {
192192
self.init([ first, second ] + tail)
193193
}
194194

195-
/// A producer for a Signal that will immediately complete without sending
196-
/// any values.
195+
/// A producer for a Signal that immediately completes without sending any values.
197196
public static var empty: SignalProducer {
198197
return SignalProducer(GeneratorCore { observer, _ in observer.sendCompleted() })
199198
}
200199

200+
/// A producer for a Signal that immediately interrupts when started, without
201+
/// sending any values.
202+
internal static var interrupted: SignalProducer {
203+
return SignalProducer(GeneratorCore { observer, _ in observer.sendInterrupted() })
204+
}
205+
201206
/// A producer for a Signal that never sends any events to its observers.
202207
public static var never: SignalProducer {
203208
return self.init { observer, lifetime in
@@ -243,7 +248,17 @@ internal class SignalProducerCore<Value, Error: Swift.Error> {
243248
fatalError()
244249
}
245250

246-
func start(_ observer: Signal<Value, Error>.Observer) -> Disposable {
251+
/// Start the producer with an observer created by the given generator.
252+
///
253+
/// The created observer **must** manaully dispose of the given upstream interrupt
254+
/// handle iff it performs any event transformation that might result in a terminal
255+
/// event.
256+
///
257+
/// - parameters:
258+
/// - generator: The closure to generate an observer.
259+
///
260+
/// - returns: A disposable to interrupt the started producer instance.
261+
func start(_ generator: (_ upstreamInterruptHandle: Disposable) -> Signal<Value, Error>.Observer) -> Disposable {
247262
fatalError()
248263
}
249264

@@ -269,9 +284,9 @@ private final class SignalCore<Value, Error: Swift.Error>: SignalProducerCore<Va
269284
self._make = action
270285
}
271286

272-
override func start(_ observer: Signal<Value, Error>.Observer) -> Disposable {
287+
override func start(_ generator: (Disposable) -> Signal<Value, Error>.Observer) -> Disposable {
273288
let instance = makeInstance()
274-
instance.signal.observe(observer)
289+
instance.signal.observe(generator(instance.interruptHandle))
275290
instance.observerDidSetup()
276291
return instance.interruptHandle
277292
}
@@ -308,8 +323,8 @@ private final class TransformerCore<Value, Error: Swift.Error, SourceValue, Sour
308323
self.transform = transform
309324
}
310325

311-
internal override func start(_ observer: Signal<Value, Error>.Observer) -> Disposable {
312-
return source.start(.init(observer, transform))
326+
internal override func start(_ generator: (Disposable) -> Signal<Value, Error>.Observer) -> Disposable {
327+
return source.start { Signal.Observer(generator($0), transform, $0) }
313328
}
314329

315330
internal override func flatMapEvent<U, E>(_ transform: @escaping (@escaping Signal<U, E>.Observer.Action) -> (Signal<Value, Error>.Event) -> Void) -> SignalProducer<U, E> {
@@ -321,12 +336,12 @@ private final class TransformerCore<Value, Error: Swift.Error, SourceValue, Sour
321336
internal override func makeInstance() -> Instance {
322337
let product = source.makeInstance()
323338
let signal = Signal<Value, Error> { observer in
324-
return product.signal.observe(.init(observer, transform))
339+
return product.signal.observe(.init(observer, transform, product.interruptHandle))
325340
}
326341

327342
return Instance(signal: signal,
328-
observerDidSetup: product.observerDidSetup,
329-
interruptHandle: product.interruptHandle)
343+
observerDidSetup: product.observerDidSetup,
344+
interruptHandle: product.interruptHandle)
330345
}
331346
}
332347

@@ -347,11 +362,11 @@ private final class GeneratorCore<Value, Error: Swift.Error>: SignalProducerCore
347362
self.generator = generator
348363
}
349364

350-
internal override func start(_ observer: Signal<Value, Error>.Observer) -> Disposable {
365+
internal override func start(_ observerGenerator: (Disposable) -> Signal<Value, Error>.Observer) -> Disposable {
351366
// Object allocation is a considerable overhead. So unless the core is configured
352367
// to be disposable, we would reuse the already-disposed, shared `NopDisposable`.
353368
let d: Disposable = isDisposable ? _SimpleDisposable() : NopDisposable.shared
354-
generator(observer, d)
369+
generator(observerGenerator(d), d)
355370
return d
356371
}
357372

@@ -469,7 +484,7 @@ extension SignalProducer {
469484
/// - returns: A disposable to interrupt the produced `Signal`.
470485
@discardableResult
471486
public func start(_ observer: Signal<Value, Error>.Observer = .init()) -> Disposable {
472-
return core.start(observer)
487+
return core.start { _ in observer }
473488
}
474489

475490
/// Create a `Signal` from `self`, and observe the `Signal` for all events
@@ -864,7 +879,8 @@ extension SignalProducer {
864879
/// - returns: A producer that, when started, will yield the first `count`
865880
/// values from `self`.
866881
public func take(first count: Int) -> SignalProducer<Value, Error> {
867-
return lift { $0.take(first: count) }
882+
guard count >= 1 else { return .interrupted }
883+
return core.flatMapEvent(Signal.Event.take(first: count))
868884
}
869885

870886
/// Yield an array of values when `self` completes.

0 commit comments

Comments
 (0)