Skip to content

Commit 65f1fa0

Browse files
authored
Fix Effect.throttle data races and scheduler value return (#669)
* Fix `Effect.throttle` data races and scheduler value return The `Effect.throttle` operator had multiple data races when updating `throttleTimes` and `throttleValues` shared state, because: 1. `Effect.throttle` can be called from any scheduler. 2. The internal `flatMap` runs on the current chain scheduler, but the throttled (delayed) value runs on the passed in `scheduler` parameter (which can be different). By protecting shared state with a lock (similar to the cancellables'), these data races should be addressed. Additionally, the `Effect.throttle` should return all values in the `scheduler` passed in, so that the API contract is honored and values come from where callers expect them to. ## Changes - Add new `throttleLock` `recursive lock to protect `throttleTimes` and `throttleValues` shared state in `Effect.throttle` operator. - Ensure all values in `Effect.throttle` come from the passed in `scheduler`. * Fix `Effect.throttle` tests, Use `sync` lock helper
1 parent 30f3027 commit 65f1fa0

File tree

2 files changed

+44
-23
lines changed

2 files changed

+44
-23
lines changed

Sources/ComposableArchitecture/Effects/Throttling.swift

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,34 +19,41 @@ extension Effect {
1919
scheduler: S,
2020
latest: Bool
2121
) -> Effect where S: Scheduler {
22-
self.flatMap { value -> AnyPublisher<Output, Failure> in
23-
guard let throttleTime = throttleTimes[id] as! S.SchedulerTimeType? else {
24-
throttleTimes[id] = scheduler.now
25-
throttleValues[id] = nil
26-
return Just(value).setFailureType(to: Failure.self).eraseToAnyPublisher()
27-
}
22+
self.receive(on: scheduler)
23+
.flatMap { value -> AnyPublisher<Output, Failure> in
24+
throttleLock.lock()
25+
defer { throttleLock.unlock() }
2826

29-
let value = latest ? value : (throttleValues[id] as! Output? ?? value)
30-
throttleValues[id] = value
27+
guard let throttleTime = throttleTimes[id] as! S.SchedulerTimeType? else {
28+
throttleTimes[id] = scheduler.now
29+
throttleValues[id] = nil
30+
return Just(value).setFailureType(to: Failure.self).eraseToAnyPublisher()
31+
}
3132

32-
guard throttleTime.distance(to: scheduler.now) < interval else {
33-
throttleTimes[id] = scheduler.now
34-
throttleValues[id] = nil
35-
return Just(value).setFailureType(to: Failure.self).eraseToAnyPublisher()
36-
}
33+
let value = latest ? value : (throttleValues[id] as! Output? ?? value)
34+
throttleValues[id] = value
3735

38-
return Just(value)
39-
.delay(
40-
for: scheduler.now.distance(to: throttleTime.advanced(by: interval)), scheduler: scheduler
41-
)
42-
.handleEvents(receiveOutput: { _ in throttleTimes[id] = scheduler.now })
43-
.setFailureType(to: Failure.self)
44-
.eraseToAnyPublisher()
45-
}
46-
.eraseToEffect()
47-
.cancellable(id: id, cancelInFlight: true)
36+
guard throttleTime.distance(to: scheduler.now) < interval else {
37+
throttleTimes[id] = scheduler.now
38+
throttleValues[id] = nil
39+
return Just(value).setFailureType(to: Failure.self).eraseToAnyPublisher()
40+
}
41+
42+
return Just(value)
43+
.delay(
44+
for: scheduler.now.distance(to: throttleTime.advanced(by: interval)), scheduler: scheduler
45+
)
46+
.handleEvents(
47+
receiveOutput: { _ in throttleLock.sync { throttleTimes[id] = scheduler.now } }
48+
)
49+
.setFailureType(to: Failure.self)
50+
.eraseToAnyPublisher()
51+
}
52+
.eraseToEffect()
53+
.cancellable(id: id, cancelInFlight: true)
4854
}
4955
}
5056

5157
var throttleTimes: [AnyHashable: Any] = [:]
5258
var throttleValues: [AnyHashable: Any] = [:]
59+
let throttleLock = NSRecursiveLock()

Tests/ComposableArchitectureTests/EffectThrottleTests.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,15 @@ final class EffectThrottleTests: XCTestCase {
2626

2727
runThrottledEffect(value: 1)
2828

29+
scheduler.advance()
30+
2931
// A value emits right away.
3032
XCTAssertEqual(values, [1])
3133

3234
runThrottledEffect(value: 2)
3335

36+
scheduler.advance()
37+
3438
// A second value is throttled.
3539
XCTAssertEqual(values, [1])
3640

@@ -76,11 +80,15 @@ final class EffectThrottleTests: XCTestCase {
7680

7781
runThrottledEffect(value: 1)
7882

83+
scheduler.advance()
84+
7985
// A value emits right away.
8086
XCTAssertEqual(values, [1])
8187

8288
runThrottledEffect(value: 2)
8389

90+
scheduler.advance()
91+
8492
// A second value is throttled.
8593
XCTAssertEqual(values, [1])
8694

@@ -124,13 +132,17 @@ final class EffectThrottleTests: XCTestCase {
124132

125133
runThrottledEffect(value: 1)
126134

135+
scheduler.advance()
136+
127137
// A value emits right away.
128138
XCTAssertEqual(values, [1])
129139

130140
scheduler.advance(by: 2)
131141

132142
runThrottledEffect(value: 2)
133143

144+
scheduler.advance()
145+
134146
// A second value is emitted right away.
135147
XCTAssertEqual(values, [1, 2])
136148
}
@@ -156,6 +168,8 @@ final class EffectThrottleTests: XCTestCase {
156168

157169
runThrottledEffect(value: 1)
158170

171+
scheduler.advance()
172+
159173
// A value emits right away.
160174
XCTAssertEqual(values, [1])
161175

0 commit comments

Comments
 (0)