Skip to content

Commit ed78a0a

Browse files
authored
Implement lazyMap as an event transformation. (#581)
* Implement `lazyMap` as an event transformation. * Add interruption scheduler test cases for the five async operators.
1 parent bbd5d49 commit ed78a0a

File tree

4 files changed

+104
-11
lines changed

4 files changed

+104
-11
lines changed

Sources/Event.swift

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,53 @@ extension Signal.Event {
699699
}
700700
}
701701

702+
internal static func lazyMap<U>(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> Transformation<U, Error> {
703+
return { action, lifetime in
704+
let box = Atomic<Value?>(nil)
705+
let completionDisposable = SerialDisposable()
706+
let valueDisposable = SerialDisposable()
707+
708+
lifetime += valueDisposable
709+
lifetime += completionDisposable
710+
711+
lifetime.observeEnded {
712+
scheduler.schedule {
713+
action(.interrupted)
714+
}
715+
}
716+
717+
return { event in
718+
switch event {
719+
case let .value(value):
720+
// Schedule only when there is no prior outstanding value.
721+
if box.swap(value) == nil {
722+
valueDisposable.inner = scheduler.schedule {
723+
if let value = box.swap(nil) {
724+
action(.value(transform(value)))
725+
}
726+
}
727+
}
728+
729+
case .completed, .failed:
730+
// Completion and failure should not discard the outstanding
731+
// value.
732+
completionDisposable.inner = scheduler.schedule {
733+
action(event.map(transform))
734+
}
735+
736+
case .interrupted:
737+
// `interrupted` overrides any outstanding value and any
738+
// scheduled completion/failure.
739+
valueDisposable.dispose()
740+
completionDisposable.dispose()
741+
scheduler.schedule {
742+
action(.interrupted)
743+
}
744+
}
745+
}
746+
}
747+
}
748+
702749
internal static func delay(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation<Value, Error> {
703750
precondition(interval >= 0)
704751

Sources/Signal.swift

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -595,10 +595,7 @@ extension Signal {
595595
/// - returns: A signal that sends values obtained using `transform` as this
596596
/// signal sends values.
597597
public func lazyMap<U>(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> Signal<U, Error> {
598-
return flatMap(.latest) { value in
599-
return SignalProducer({ transform(value) })
600-
.start(on: scheduler)
601-
}
598+
return flatMapEvent(Signal.Event.lazyMap(on: scheduler, transform: transform))
602599
}
603600

604601
/// Preserve only values which pass the given closure.

Sources/SignalProducer.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -895,7 +895,7 @@ extension SignalProducer {
895895
/// - returns: A producer that, when started, sends values obtained using
896896
/// `transform` as this producer sends values.
897897
public func lazyMap<U>(on scheduler: Scheduler, transform: @escaping (Value) -> U) -> SignalProducer<U, Error> {
898-
return lift { $0.lazyMap(on: scheduler, transform: transform) }
898+
return core.flatMapEvent(Signal.Event.lazyMap(on: scheduler, transform: transform))
899899
}
900900

901901
/// Preserve only values which pass the given closure.

Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,11 @@ class SignalProducerLiftingSpec: QuickSpec {
313313
}
314314

315315
it("should interrupt ASAP and discard outstanding events") {
316-
testAsyncASAPInterruption { $0.lazyMap(on: $1) { $0 } }
316+
testAsyncASAPInterruption(op: "lazyMap") { $0.lazyMap(on: $1) { $0 } }
317+
}
318+
319+
it("should interrupt on the given scheduler") {
320+
testAsyncInterruptionScheduler(op: "lazyMap") { $0.lazyMap(on: $1) { $0 } }
317321
}
318322
}
319323

@@ -1133,7 +1137,11 @@ class SignalProducerLiftingSpec: QuickSpec {
11331137
}
11341138

11351139
it("should interrupt ASAP and discard outstanding events") {
1136-
testAsyncASAPInterruption { $0.observe(on: $1) }
1140+
testAsyncASAPInterruption(op: "observe(on:)") { $0.observe(on: $1) }
1141+
}
1142+
1143+
it("should interrupt on the given scheduler") {
1144+
testAsyncInterruptionScheduler(op: "observe(on:)") { $0.observe(on: $1) }
11371145
}
11381146
}
11391147

@@ -1200,7 +1208,11 @@ class SignalProducerLiftingSpec: QuickSpec {
12001208
}
12011209

12021210
it("should interrupt ASAP and discard outstanding events") {
1203-
testAsyncASAPInterruption { $0.delay(10.0, on: $1) }
1211+
testAsyncASAPInterruption(op: "delay") { $0.delay(10.0, on: $1) }
1212+
}
1213+
1214+
it("should interrupt on the given scheduler") {
1215+
testAsyncInterruptionScheduler(op: "delay") { $0.delay(10.0, on: $1) }
12041216
}
12051217
}
12061218

@@ -1298,13 +1310,21 @@ class SignalProducerLiftingSpec: QuickSpec {
12981310
}
12991311

13001312
it("should interrupt ASAP and discard outstanding events") {
1301-
testAsyncASAPInterruption { $0.throttle(10.0, on: $1) }
1313+
testAsyncASAPInterruption(op: "throttle") { $0.throttle(10.0, on: $1) }
1314+
}
1315+
1316+
it("should interrupt on the given scheduler") {
1317+
testAsyncInterruptionScheduler(op: "throttle") { $0.throttle(10.0, on: $1) }
13021318
}
13031319
}
13041320

13051321
describe("debounce") {
13061322
it("should interrupt ASAP and discard outstanding events") {
1307-
testAsyncASAPInterruption { $0.delay(10.0, on: $1) }
1323+
testAsyncASAPInterruption(op: "debounce") { $0.debounce(10.0, on: $1) }
1324+
}
1325+
1326+
it("should interrupt on the given scheduler") {
1327+
testAsyncInterruptionScheduler(op: "debounce") { $0.debounce(10.0, on: $1) }
13081328
}
13091329
}
13101330

@@ -2036,7 +2056,36 @@ class SignalProducerLiftingSpec: QuickSpec {
20362056
}
20372057
}
20382058

2059+
private func testAsyncInterruptionScheduler(
2060+
op: String,
2061+
file: FileString = #file,
2062+
line: UInt = #line,
2063+
transform: (SignalProducer<Int, NoError>, TestScheduler) -> SignalProducer<Int, NoError>
2064+
) {
2065+
var isInterrupted = false
2066+
2067+
let scheduler = TestScheduler()
2068+
let producer = transform(SignalProducer(0 ..< 128), scheduler)
2069+
2070+
let failedExpectations = gatherFailingExpectations {
2071+
let disposable = producer.startWithInterrupted { isInterrupted = true }
2072+
expect(isInterrupted) == false
2073+
2074+
disposable.dispose()
2075+
expect(isInterrupted) == false
2076+
2077+
scheduler.run()
2078+
expect(isInterrupted) == true
2079+
}
2080+
2081+
if !failedExpectations.isEmpty {
2082+
fail("The async operator `\(op)` does not interrupt on the appropriate scheduler.",
2083+
location: SourceLocation(file: file, line: line))
2084+
}
2085+
}
2086+
20392087
private func testAsyncASAPInterruption(
2088+
op: String,
20402089
file: FileString = #file,
20412090
line: UInt = #line,
20422091
transform: (SignalProducer<Int, NoError>, TestScheduler) -> SignalProducer<Int, NoError>
@@ -2073,7 +2122,7 @@ private func testAsyncASAPInterruption(
20732122
}
20742123

20752124
if !failedExpectations.isEmpty {
2076-
fail("The ASAP interruption test of the async operator has failed.",
2125+
fail("The ASAP interruption test of the async operator `\(op)` has failed.",
20772126
location: SourceLocation(file: file, line: line))
20782127
}
20792128
}

0 commit comments

Comments
 (0)