@@ -27,7 +27,8 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
         }
     }
 
-    private let reducerLock = NSLock()
+    private let reducerLock = RecursiveLock()
+
     private var state: State
     private var hasStarted = false
 
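The definition of `RecursiveLock` is not part of this diff. Based on the call sites in the next hunk (`perform` and `tryPerform` closures that are handed a reentrancy flag, with `tryPerform` returning `false` when the lock is busy), it presumably has roughly the shape sketched below. This is a minimal sketch under those assumptions, not the library's actual implementation; the `NSRecursiveLock` plus an ownership-depth counter is just one way to surface the `isReentrant` flag.

```swift
import Foundation

// A minimal sketch of a closure-based recursive lock whose callers are told
// whether the acquisition was reentrant. The shape is inferred from the call
// sites in the diff; the implementation details are assumptions.
final class RecursiveLock {
    private let lock = NSRecursiveLock()
    private var depth = 0  // Nested acquisitions by the owning thread; mutated only while locked.

    /// Runs `action` while holding the lock, passing `true` when the calling
    /// thread already held it (i.e. the acquisition is reentrant).
    func perform<Result>(_ action: (_ isReentrant: Bool) -> Result) -> Result {
        lock.lock()
        depth += 1
        defer {
            depth -= 1
            lock.unlock()
        }
        return action(depth > 1)
    }

    /// Attempts to acquire the lock without blocking. Returns `false` if another
    /// thread holds the lock; otherwise returns whatever `action` returns.
    func tryPerform(_ action: (_ isReentrant: Bool) -> Bool) -> Bool {
        guard lock.`try`() else { return false }
        depth += 1
        defer {
            depth -= 1
            lock.unlock()
        }
        return action(depth > 1)
    }
}
```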
@@ -53,48 +54,60 @@ final class Floodgate<State, Event>: FeedbackEventConsumer<Event> {
             )
         }
 
-        reducerLock.perform {
+        reducerLock.perform { reentrant in
+            assert(reentrant == false)
             drainEvents()
         }
     }
 
     override func process(_ event: Event, for token: Token) {
         enqueue(event, for: token)
 
-        if reducerLock.try() {
-            repeat {
-                drainEvents()
-                reducerLock.unlock()
-            } while queue.withValue({ $0.hasEvents }) && reducerLock.try()
-            // ^^^
-            // Restart the event draining after we unlock the reducer lock, iff:
-            //
-            // 1. the queue still has unprocessed events; and
-            // 2. no concurrent actor has taken the reducer lock, which implies no event draining would be started
-            //    unless we take active action.
-            //
-            // This eliminates a race condition in the following sequence of operations:
-            //
-            // | Thread A                           | Thread B                           |
-            // |------------------------------------|------------------------------------|
-            // | concurrent dequeue: no item        |                                    |
-            // |                                    | concurrent enqueue                 |
-            // |                                    | trylock lock: BUSY                 |
-            // | unlock lock                        |                                    |
-            // |                                    |                                    |
-            // |     <<< The enqueued event is left unprocessed. >>>                     |
+        var continueToDrain = false
+
+        repeat {
+            // We use a recursive lock to guard the reducer, so as to allow state access via `withValue` to be
+            // reentrant. But in order to not deadlock in ReactiveSwift, reentrant calls MUST NOT drain the queue.
+            // Otherwise, `consume(_:)` will eventually invoke the reducer and send out a state via `changeObserver`,
+            // leading to a deadlock.
             //
-            // The trylock-unlock duo has a synchronize-with relationship, which ensures that Thread A must see any
-            // concurrent enqueue that *happens before* the trylock.
-        }
+            // If we know that the call is reentrant, we can confidently skip draining the queue anyway, because the
+            // outermost call (the one that first acquires the lock on the current lock owner thread) is already
+            // looping to exhaustively drain the queue.
+            continueToDrain = reducerLock.tryPerform { isReentrant in
+                guard isReentrant == false else { return false }
+                drainEvents()
+                return true
+            }
+        } while queue.withValue({ $0.hasEvents }) && continueToDrain
+        // ^^^
+        // Restart the event draining after we unlock the reducer lock, iff:
+        //
+        // 1. the queue still has unprocessed events; and
+        // 2. no concurrent actor has taken the reducer lock, which implies no event draining would be started
+        //    unless we take active action.
+        //
+        // This eliminates a race condition in the following sequence of operations:
+        //
+        // | Thread A                           | Thread B                           |
+        // |------------------------------------|------------------------------------|
+        // | concurrent dequeue: no item        |                                    |
+        // |                                    | concurrent enqueue                 |
+        // |                                    | trylock lock: BUSY                 |
+        // | unlock lock                        |                                    |
+        // |                                    |                                    |
+        // |     <<< The enqueued event is left unprocessed. >>>                     |
+        //
+        // The trylock-unlock duo has a synchronize-with relationship, which ensures that Thread A must see any
+        // concurrent enqueue that *happens before* the trylock.
     }
 
     override func dequeueAllEvents(for token: Token) {
         queue.modify { $0.events.removeAll(where: { _, t in t == token }) }
     }
 
     func withValue<Result>(_ action: (State, Bool) -> Result) -> Result {
-        reducerLock.perform { action(state, hasStarted) }
+        reducerLock.perform { _ in action(state, hasStarted) }
     }
 
     func dispose() {
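To make the reentrancy comment concrete, here is a small self-contained model of the same pattern, built on the `RecursiveLock` sketch above. Everything in it (`TinyFloodgate`, `onStateChange`, the `Int` state) is illustrative and not part of the library; it only shows why a state read via `withValue` from inside the drain is now safe, while a reentrant acquisition still refuses to drain and leaves that work to the outermost call.

```swift
import Foundation

// Illustrative only: a toy consumer using the RecursiveLock sketched above.
// `TinyFloodgate`, `onStateChange` and the Int-based state are stand-ins, not
// library types. The real queue is an `Atomic`; thread safety of the array is
// elided to keep the sketch short.
final class TinyFloodgate {
    private let lock = RecursiveLock()
    private var queue: [Int] = []
    private var state = 0

    var onStateChange: ((Int) -> Void)?

    func process(_ event: Int) {
        queue.append(event)

        var continueToDrain = false
        repeat {
            continueToDrain = lock.tryPerform { isReentrant in
                // A reentrant acquisition means this thread is already inside
                // `drainEvents()` below; the outermost call keeps draining, so
                // draining here would re-enter the reducer mid-emission.
                guard isReentrant == false else { return false }
                drainEvents()
                return true
            }
        } while !queue.isEmpty && continueToDrain
    }

    func withValue<Result>(_ action: (Int) -> Result) -> Result {
        // Safe to call from inside `onStateChange` now that the lock is recursive.
        lock.perform { _ in action(state) }
    }

    private func drainEvents() {
        while !queue.isEmpty {
            let event = queue.removeFirst()
            state += event            // the "reducer"
            onStateChange?(state)     // may synchronously call back into `withValue`
        }
    }
}

// Usage: the observer reads state mid-drain. With the old non-recursive lock,
// this nested acquisition on the same thread would deadlock.
let gate = TinyFloodgate()
gate.onStateChange = { newState in
    let snapshot = gate.withValue { $0 }
    assert(snapshot == newState)
}
gate.process(1)
gate.process(2)
```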
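The race-condition table that the comment preserves is easier to follow against a version that lacks the re-check. The snippet below is an illustrative anti-pattern, not code from this library: a single trylock/drain/unlock with no `repeat ... while` re-check, annotated with the rows of the table.

```swift
import Foundation

// Anti-pattern for contrast: enqueue-then-drain without the re-check loop.
// `LossyConsumer` and the Int events are illustrative; the real queue is an
// `Atomic`, and `events` here is not itself thread-safe.
final class LossyConsumer {
    private let lock = NSLock()
    private var events: [Int] = []
    private(set) var processed: [Int] = []

    func process(_ event: Int) {
        events.append(event)            // Thread B: concurrent enqueue

        if lock.`try`() {               // Thread B: trylock: BUSY -> skips draining entirely
            while !events.isEmpty {     // Thread A: dequeues until it sees no item
                processed.append(events.removeFirst())
            }
            lock.unlock()               // Thread A: unlock
            // If Thread B enqueued after Thread A's last emptiness check but
            // Thread B's trylock failed, that event now sits in `events`
            // unprocessed until some later `process` call happens to drain it.
        }
    }
}
```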