Skip to content

Commit ec43e45

Browse files
committed
Replay-1 semantic to state producer in setup closure.
1 parent cf9367e commit ec43e45

File tree

3 files changed

+121
-31
lines changed

3 files changed

+121
-31
lines changed

Loop/FeedbackLoop.swift

Lines changed: 59 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,37 +6,25 @@ public final class FeedbackLoop<State, Event> {
66
private let token: Lifetime.Token
77

88
public var producer: SignalProducer<State, Never> {
9-
SignalProducer { observer, lifetime in
10-
self.floodgate.withValue { initial, hasStarted -> Void in
11-
if hasStarted {
12-
// The feedback loop has started already, so the initial value has to be manually delivered.
13-
// Uninitialized feedback loop that does not start immediately will emit the initial state
14-
// when `start()` is called.
15-
observer.send(value: initial)
16-
}
17-
18-
lifetime += self.floodgate.stateDidChange.observe(observer)
19-
}
20-
}
9+
floodgate.producer
2110
}
2211

12+
private let feedbacks: [Feedback]
13+
2314
public init(
2415
initial: State,
2516
reduce: @escaping (inout State, Event) -> Void,
2617
feedbacks: [Feedback]
2718
) {
2819
(lifetime, token) = Lifetime.make()
29-
floodgate = Floodgate<State, Event>(state: initial, reducer: reduce)
30-
lifetime.observeEnded(floodgate.dispose)
20+
self.floodgate = Floodgate<State, Event>(state: initial, reducer: reduce)
21+
self.feedbacks = feedbacks
3122

32-
for feedback in feedbacks {
33-
lifetime += feedback
34-
.events(floodgate.stateDidChange.producer, floodgate)
35-
}
23+
lifetime.observeEnded(floodgate.dispose)
3624
}
3725

3826
public func start() {
39-
floodgate.bootstrap()
27+
floodgate.bootstrap(with: feedbacks)
4028
}
4129

4230
public func stop() {
@@ -71,7 +59,58 @@ extension FeedbackLoop {
7159
/// `Feedback(skippingRepeated:effects:)` enqueues events inside `flatMap(.latest)`, so that unprocessed events
7260
/// are automatically removed when the inner producer has switched.
7361
///
74-
/// - important: The `state` producer provided to the setup closure **does not** replay the current state.
62+
/// ## State producer in the `setup` closure
63+
/// The setup closure provides you a `state` producer — it replays the latest state at starting time, and then
64+
/// publishes all state changes.
65+
///
66+
/// Loop guarantees only that this `state` producer is **eventually consistent** with events emitted by your
67+
/// feedback. This means you should not make any strong assumptions on events you enqueued being immediately
68+
/// reflected by `state`.
69+
///
70+
/// For example, if you start the `state` producer again, synchronously after enqueuing an event, the event
71+
/// may not have been processed yet, and therefore the assertion would fail:
72+
/// ```swift
73+
/// Feedback { state, output in
74+
/// state
75+
/// .filter { $0.apples.isEmpty == false }
76+
/// .map(value: Event.eatAllApples)
77+
/// .take(first: 1)
78+
/// .concat(
79+
/// state
80+
/// .take(first: 1)
81+
/// .on(value: { state in
82+
/// guard state.apples.isEmpty else { return }
83+
///
84+
/// // ❌🙅‍♀️ No guarantee that this is true.
85+
/// fatalError("It should have eaten all the apples!")
86+
/// })
87+
/// )
88+
/// .enqueue(to: output)
89+
/// }
90+
/// ```
91+
///
92+
/// You can however expect it to be eventually consistent:
93+
/// ```swift
94+
/// Feedback { state, output in
95+
/// state
96+
/// .filter { $0.apples.isEmpty == false }
97+
/// .map(value: Event.eatAllApples)
98+
/// .take(first: 1)
99+
/// .concat(
100+
/// state
101+
/// .filter { $0.apples.isEmpty }
102+
/// .take(first: 1)
103+
/// .on(value: { state in
104+
/// guard state.apples.isEmpty else { return }
105+
///
106+
/// // ✅👍 We would eventually observe this, when the loop event queue
107+
/// // has caught up with `.eatAppleApples` we enqueued earlier.
108+
/// fatalError("It should have eaten all the apples!")
109+
/// })
110+
/// )
111+
/// .enqueue(to: output)
112+
/// }
113+
/// ```
75114
///
76115
/// - parameters:
77116
/// - setup: The setup closure to construct a data flow producing events in respond to changes from `state`,

Loop/Floodgate.swift

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,42 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
1313

1414
let (stateDidChange, changeObserver) = Signal<State, Never>.pipe()
1515

16+
/// Replay the current value, and then publish the subsequent changes.
17+
var producer: SignalProducer<State, Never> {
18+
SignalProducer { observer, lifetime in
19+
self.withValue { initial, hasStarted -> Void in
20+
observer.send(value: initial)
21+
lifetime += self.stateDidChange.observe(observer)
22+
}
23+
}
24+
}
25+
1626
private let reducerLock = NSLock()
1727
private var state: State
1828
private var hasStarted = false
1929

2030
private let queue = Atomic(QueueState())
2131
private let reducer: (inout State, Event) -> Void
32+
private let feedbackDisposables = CompositeDisposable()
2233

2334
init(state: State, reducer: @escaping (inout State, Event) -> Void) {
2435
self.state = state
2536
self.reducer = reducer
2637
}
2738

28-
func bootstrap() {
29-
reducerLock.lock()
30-
defer { reducerLock.unlock() }
31-
32-
guard !hasStarted else { return }
39+
deinit {
40+
dispose()
41+
}
3342

34-
hasStarted = true
43+
func bootstrap(with feedbacks: [FeedbackLoop<State, Event>.Feedback]) {
44+
for feedback in feedbacks {
45+
// Pass `producer` which has replay-1 semantic.
46+
feedbackDisposables += feedback.events(producer, self)
47+
}
3548

36-
changeObserver.send(value: state)
37-
drainEvents()
49+
reducerLock.perform {
50+
drainEvents()
51+
}
3852
}
3953

4054
override func process(_ event: Event, for token: Token) {
@@ -77,8 +91,14 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
7791
}
7892

7993
func dispose() {
80-
queue.modify {
94+
let shouldDisposeFeedbacks: Bool = queue.modify {
95+
let old = $0.isOuterLifetimeEnded
8196
$0.isOuterLifetimeEnded = true
97+
return old == false
98+
}
99+
100+
if shouldDisposeFeedbacks {
101+
feedbackDisposables.dispose()
82102
}
83103
}
84104

LoopTests/FeedbackLoopSystemTests.swift

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,10 +159,10 @@ class FeedbackLoopSystemTests: XCTestCase {
159159
var values: [String] = []
160160
system
161161
.skipRepeats()
162-
.take(first: 2)
162+
.take(first: 3)
163163
.startWithValues { values.append($0) }
164164

165-
expect(values) == ["initial", "initial_a"]
165+
expect(values) == ["initial", "initial_a", "initial_a_a"]
166166
expect(startCount) == 2
167167
}
168168

@@ -304,4 +304,35 @@ class FeedbackLoopSystemTests: XCTestCase {
304304

305305
expect(results).toEventually(equal([0, 1, 3, 6, 6, 7, 9, 12]))
306306
}
307+
308+
func test_feedback_state_producer_replays_latest_value() {
309+
let system = SignalProducer<Int, Never>.feedbackLoop(
310+
initial: 0,
311+
reduce: { (state: inout Int, event: Int) in
312+
state += event
313+
},
314+
feedbacks: [
315+
FeedbackLoop.Feedback { state, output in
316+
state
317+
.take(first: 1)
318+
.then(SignalProducer(value: 2))
319+
.concat(
320+
// `state` is NOT GUARANTEED to reflect events emitted earlier in the producer chain.
321+
state
322+
.take(first: 3)
323+
.map { $0 + 1000 }
324+
)
325+
.enqueue(to: output)
326+
}
327+
]
328+
)
329+
330+
var results: [Int] = []
331+
332+
system.startWithValues { value in
333+
results.append(value)
334+
}
335+
336+
expect(results) == [0, 2, 1002, 2004, 4006]
337+
}
307338
}

0 commit comments

Comments
 (0)