Skip to content

Commit 56a5a3a

Browse files
authored
ObserveOn, LazyMap, Delay (#815)
1 parent 70ad4da commit 56a5a3a

File tree

6 files changed

+264
-82
lines changed

6 files changed

+264
-82
lines changed

ReactiveSwift.xcodeproj/project.pbxproj

Lines changed: 97 additions & 0 deletions
Large diffs are not rendered by default.

Sources/Event.swift

Lines changed: 6 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -405,96 +405,20 @@ extension Signal.Event {
405405
}
406406

407407
internal static func observe(on scheduler: Scheduler) -> Transformation<Value, Error> {
408-
return { action, lifetime in
409-
lifetime.observeEnded {
410-
scheduler.schedule {
411-
action(.interrupted)
412-
}
413-
}
414-
415-
return Signal.Observer { event in
416-
scheduler.schedule {
417-
if !lifetime.hasEnded {
418-
action(event)
419-
}
420-
}
421-
}
408+
return { downstream, lifetime in
409+
Operators.ObserveOn(downstream: downstream, downstreamLifetime: lifetime, target: scheduler)
422410
}
423411
}
424412

425413
internal static func lazyMap<U>(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> Transformation<U, Error> {
426-
return { action, lifetime in
427-
let box = Atomic<Value?>(nil)
428-
let completionDisposable = SerialDisposable()
429-
let valueDisposable = SerialDisposable()
430-
431-
lifetime += valueDisposable
432-
lifetime += completionDisposable
433-
434-
lifetime.observeEnded {
435-
scheduler.schedule {
436-
action(.interrupted)
437-
}
438-
}
439-
440-
return Signal.Observer { event in
441-
switch event {
442-
case let .value(value):
443-
// Schedule only when there is no prior outstanding value.
444-
if box.swap(value) == nil {
445-
valueDisposable.inner = scheduler.schedule {
446-
if let value = box.swap(nil) {
447-
action(.value(transform(value)))
448-
}
449-
}
450-
}
451-
452-
case .completed, .failed:
453-
// Completion and failure should not discard the outstanding
454-
// value.
455-
completionDisposable.inner = scheduler.schedule {
456-
action(event.map(transform))
457-
}
458-
459-
case .interrupted:
460-
// `interrupted` overrides any outstanding value and any
461-
// scheduled completion/failure.
462-
valueDisposable.dispose()
463-
completionDisposable.dispose()
464-
scheduler.schedule {
465-
action(.interrupted)
466-
}
467-
}
468-
}
414+
return { downstream, lifetime in
415+
Operators.LazyMap(downstream: downstream, downstreamLifetime: lifetime, target: scheduler, transform: transform)
469416
}
470417
}
471418

472419
internal static func delay(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation<Value, Error> {
473-
precondition(interval >= 0)
474-
475-
return { action, lifetime in
476-
lifetime.observeEnded {
477-
scheduler.schedule {
478-
action(.interrupted)
479-
}
480-
}
481-
482-
return Signal.Observer { event in
483-
switch event {
484-
case .failed, .interrupted:
485-
scheduler.schedule {
486-
action(event)
487-
}
488-
489-
case .value, .completed:
490-
let date = scheduler.currentDate.addingTimeInterval(interval)
491-
scheduler.schedule(after: date) {
492-
if !lifetime.hasEnded {
493-
action(event)
494-
}
495-
}
496-
}
497-
}
420+
return { downstream, lifetime in
421+
Operators.Delay(downstream: downstream, downstreamLifetime: lifetime, target: scheduler, interval: interval)
498422
}
499423
}
500424

Sources/Observers/Delay.swift

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import Foundation
2+
3+
extension Operators {
4+
internal final class Delay<Value, Error: Swift.Error>: UnaryAsyncOperator<Value, Value, Error> {
5+
let interval: TimeInterval
6+
let targetWithClock: DateScheduler
7+
8+
init(
9+
downstream: Observer<Value, Error>,
10+
downstreamLifetime: Lifetime,
11+
target: DateScheduler,
12+
interval: TimeInterval
13+
) {
14+
precondition(interval >= 0)
15+
16+
self.interval = interval
17+
self.targetWithClock = target
18+
super.init(downstream: downstream, downstreamLifetime: downstreamLifetime, target: target)
19+
}
20+
21+
22+
override func receive(_ value: Value) {
23+
guard isActive else { return }
24+
25+
targetWithClock.schedule(after: computeNextDate()) {
26+
self.unscheduledSend(value)
27+
}
28+
}
29+
30+
override func terminate(_ termination: Termination<Error>) {
31+
if case .completed = termination {
32+
targetWithClock.schedule(after: computeNextDate()) {
33+
super.terminate(.completed)
34+
}
35+
} else {
36+
super.terminate(termination)
37+
}
38+
}
39+
40+
private func computeNextDate() -> Date {
41+
targetWithClock.currentDate.addingTimeInterval(interval)
42+
}
43+
}
44+
}

Sources/Observers/LazyMap.swift

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
extension Operators {
2+
internal final class LazyMap<Value, NewValue, Error: Swift.Error>: UnaryAsyncOperator<Value, NewValue, Error> {
3+
let transform: (Value) -> NewValue
4+
let box = Atomic<Value?>(nil)
5+
let valueDisposable = SerialDisposable()
6+
7+
init(
8+
downstream: Observer<NewValue, Error>,
9+
downstreamLifetime: Lifetime,
10+
target: Scheduler,
11+
transform: @escaping (Value) -> NewValue
12+
) {
13+
self.transform = transform
14+
super.init(downstream: downstream, downstreamLifetime: downstreamLifetime, target: target)
15+
16+
downstreamLifetime += valueDisposable
17+
}
18+
19+
override func receive(_ value: Value) {
20+
// Schedule only when there is no prior outstanding value.
21+
if box.swap(value) == nil {
22+
valueDisposable.inner = target.schedule {
23+
if let value = self.box.swap(nil) {
24+
self.unscheduledSend(self.transform(value))
25+
}
26+
}
27+
}
28+
}
29+
30+
override func terminate(_ termination: Termination<Error>) {
31+
if case .interrupted = termination {
32+
// `interrupted` immediately cancels any scheduled value.
33+
//
34+
// On the other hand, completion and failure does not cancel anything, and is scheduled to run after any
35+
// scheduled value. `valueDisposable` will naturally be disposed by `downstreamLifetime` as soon as the
36+
// downstream has processed the termination.
37+
valueDisposable.dispose()
38+
}
39+
40+
super.terminate(termination)
41+
}
42+
}
43+
}

Sources/Observers/ObserveOn.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
extension Operators {
2+
internal final class ObserveOn<Value, Error: Swift.Error>: UnaryAsyncOperator<Value, Value, Error> {
3+
override func receive(_ value: Value) {
4+
target.schedule {
5+
guard !self.downstreamLifetime.hasEnded else { return }
6+
self.unscheduledSend(value)
7+
}
8+
}
9+
}
10+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
internal class UnaryAsyncOperator<InputValue, OutputValue, Error: Swift.Error>: Observer<InputValue, Error> {
2+
let downstreamLifetime: Lifetime
3+
let target: Scheduler
4+
5+
/// Whether or not the downstream observer can still receive values or termination.
6+
///
7+
/// - note: This is a thread-safe atomic read. So you can use it in any part of `receive(_:)` or `terminate(_:)` to
8+
/// attempt to early out before expensive scheduling or computation.
9+
var isActive: Bool { state.is(.active) }
10+
11+
// Direct access is discouraged for subclasses by keeping this private.
12+
private let downstream: Observer<OutputValue, Error>
13+
private let state: UnsafeAtomicState<AsyncOperatorState>
14+
15+
public init(
16+
downstream: Observer<OutputValue, Error>,
17+
downstreamLifetime: Lifetime,
18+
target: Scheduler
19+
) {
20+
self.downstream = downstream
21+
self.downstreamLifetime = downstreamLifetime
22+
self.target = target
23+
self.state = UnsafeAtomicState(.active)
24+
25+
super.init()
26+
27+
downstreamLifetime.observeEnded {
28+
if self.state.tryTransition(from: .active, to: .terminated) {
29+
target.schedule {
30+
downstream.terminate(.interrupted)
31+
}
32+
}
33+
}
34+
}
35+
36+
deinit {
37+
state.deinitialize()
38+
}
39+
40+
open override func receive(_ value: InputValue) { fatalError() }
41+
42+
/// Send a value to the downstream without any implicit scheduling on `target`.
43+
///
44+
/// - important: Subclasses must invoke this only after having hopped onto the target scheduler.
45+
final func unscheduledSend(_ value: OutputValue) {
46+
downstream.receive(value)
47+
}
48+
49+
open override func terminate(_ termination: Termination<Error>) {
50+
// The atomic transition here must happen **after** we hop onto the target scheduler. This is to preserve the timing
51+
// behaviour observed in previous versions of ReactiveSwift.
52+
53+
target.schedule {
54+
if self.state.tryTransition(from: .active, to: .terminated) {
55+
self.downstream.terminate(termination)
56+
}
57+
}
58+
}
59+
}
60+
61+
private enum AsyncOperatorState: Int32 {
62+
case active
63+
case terminated
64+
}

0 commit comments

Comments
 (0)