Skip to content

Commit 350ea66

Browse files
authored
Debounce, Throttle, CollectEvery (#816)
1 parent 56a5a3a commit 350ea66

File tree

6 files changed

+288
-155
lines changed

6 files changed

+288
-155
lines changed

ReactiveSwift.xcodeproj/project.pbxproj

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,18 @@
197197
9A2D5D0E259F8D1F005682ED /* ScanMap.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D0C259F8D1F005682ED /* ScanMap.swift */; };
198198
9A2D5D0F259F8D1F005682ED /* ScanMap.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D0C259F8D1F005682ED /* ScanMap.swift */; };
199199
9A2D5D10259F8D1F005682ED /* ScanMap.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D0C259F8D1F005682ED /* ScanMap.swift */; };
200+
9A2D5D53259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; };
201+
9A2D5D54259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; };
202+
9A2D5D55259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; };
203+
9A2D5D56259FA000005682ED /* Throttle.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D52259FA000005682ED /* Throttle.swift */; };
204+
9A2D5D5D259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; };
205+
9A2D5D5E259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; };
206+
9A2D5D5F259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; };
207+
9A2D5D60259FA0DD005682ED /* Debounce.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D5C259FA0DD005682ED /* Debounce.swift */; };
208+
9A2D5D67259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; };
209+
9A2D5D68259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; };
210+
9A2D5D69259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; };
211+
9A2D5D6A259FA59E005682ED /* CollectEvery.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5D66259FA59E005682ED /* CollectEvery.swift */; };
200212
9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
201213
9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
202214
9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
@@ -424,6 +436,9 @@
424436
9A2D5CF8259F8634005682ED /* UniqueValues.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UniqueValues.swift; sourceTree = "<group>"; };
425437
9A2D5D02259F8C39005682ED /* Reduce.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Reduce.swift; sourceTree = "<group>"; };
426438
9A2D5D0C259F8D1F005682ED /* ScanMap.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ScanMap.swift; sourceTree = "<group>"; };
439+
9A2D5D52259FA000005682ED /* Throttle.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Throttle.swift; sourceTree = "<group>"; };
440+
9A2D5D5C259FA0DD005682ED /* Debounce.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Debounce.swift; sourceTree = "<group>"; };
441+
9A2D5D66259FA59E005682ED /* CollectEvery.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CollectEvery.swift; sourceTree = "<group>"; };
427442
9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UninhabitedTypeGuards.swift; sourceTree = "<group>"; };
428443
9A681A9D1E5A241B00B097CF /* DeprecationSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DeprecationSpec.swift; sourceTree = "<group>"; };
429444
9A9100DE1E0E6E620093E346 /* ValidatingProperty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingProperty.swift; sourceTree = "<group>"; };
@@ -571,6 +586,9 @@
571586
9A2D5C80259F7E3E005682ED /* DematerializeResults.swift */,
572587
9A2D5C8A259F7ED5005682ED /* Dematerialize.swift */,
573588
9A2D5C76259F7D3D005682ED /* AttemptMap.swift */,
589+
9A2D5D52259FA000005682ED /* Throttle.swift */,
590+
9A2D5D5C259FA0DD005682ED /* Debounce.swift */,
591+
9A2D5D66259FA59E005682ED /* CollectEvery.swift */,
574592
);
575593
path = Observers;
576594
sourceTree = "<group>";
@@ -1029,6 +1047,7 @@
10291047
57A4D1B11BA13D7A00F7D4B1 /* Optional.swift in Sources */,
10301048
57A4D1B41BA13D7A00F7D4B1 /* Disposable.swift in Sources */,
10311049
57A4D1B61BA13D7A00F7D4B1 /* Event.swift in Sources */,
1050+
9A2D5D6A259FA59E005682ED /* CollectEvery.swift in Sources */,
10321051
57A4D1B81BA13D7A00F7D4B1 /* Scheduler.swift in Sources */,
10331052
9A9100E21E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
10341053
9A2D5CF2259F85AE005682ED /* SkipRepeats.swift in Sources */,
@@ -1045,6 +1064,9 @@
10451064
9A2D5CE8259F852B005682ED /* CombinePrevious.swift in Sources */,
10461065
9A67963E1F6059440058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
10471066
9A2D5D06259F8C39005682ED /* Reduce.swift in Sources */,
1067+
9A2D5D56259FA000005682ED /* Throttle.swift in Sources */,
1068+
9A67963E1F6059440058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
1069+
9A2D5D60259FA0DD005682ED /* Debounce.swift in Sources */,
10481070
9AFA491424E9A196003D263C /* Map.swift in Sources */,
10491071
9A2D5D33259F942B005682ED /* LazyMap.swift in Sources */,
10501072
9A2D5CFC259F8634005682ED /* UniqueValues.swift in Sources */,
@@ -1111,6 +1133,7 @@
11111133
A9F793341B60D0140026BCBA /* Optional.swift in Sources */,
11121134
A9B315BC1B3940810001CB9C /* Disposable.swift in Sources */,
11131135
A9B315BE1B3940810001CB9C /* Event.swift in Sources */,
1136+
9A2D5D69259FA59E005682ED /* CollectEvery.swift in Sources */,
11141137
A9B315C01B3940810001CB9C /* Scheduler.swift in Sources */,
11151138
9A9100E11E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
11161139
9A2D5CF1259F85AE005682ED /* SkipRepeats.swift in Sources */,
@@ -1130,6 +1153,9 @@
11301153
9AFA491324E9A196003D263C /* Map.swift in Sources */,
11311154
9A2D5CFB259F8634005682ED /* UniqueValues.swift in Sources */,
11321155
9A2D5C65259F7B47005682ED /* MaterializeAsResult.swift in Sources */,
1156+
9A2D5D55259FA000005682ED /* Throttle.swift in Sources */,
1157+
9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
1158+
9A2D5D5F259FA0DD005682ED /* Debounce.swift in Sources */,
11331159
9AFA491324E9A196003D263C /* Map.swift in Sources */,
11341160
9A2D5D32259F942B005682ED /* LazyMap.swift in Sources */,
11351161
9A2D5C8D259F7ED5005682ED /* Dematerialize.swift in Sources */,
@@ -1166,6 +1192,7 @@
11661192
D871D69F1B3B29A40070F16C /* Optional.swift in Sources */,
11671193
D08C54B61A69A3DB00AD8286 /* Event.swift in Sources */,
11681194
D0C312D319EF2A5800984962 /* Disposable.swift in Sources */,
1195+
9A2D5D67259FA59E005682ED /* CollectEvery.swift in Sources */,
11691196
9A9100DF1E0E6E620093E346 /* ValidatingProperty.swift in Sources */,
11701197
EBCC7DBC1BBF010C00A2AE92 /* Signal.Observer.swift in Sources */,
11711198
9A2D5CEF259F85AE005682ED /* SkipRepeats.swift in Sources */,
@@ -1182,6 +1209,9 @@
11821209
9A2D5CE5259F852B005682ED /* CombinePrevious.swift in Sources */,
11831210
9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
11841211
9A2D5D03259F8C39005682ED /* Reduce.swift in Sources */,
1212+
9A2D5D53259FA000005682ED /* Throttle.swift in Sources */,
1213+
9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
1214+
9A2D5D5D259FA0DD005682ED /* Debounce.swift in Sources */,
11851215
9AFA491124E9A196003D263C /* Map.swift in Sources */,
11861216
9A2D5D30259F942B005682ED /* LazyMap.swift in Sources */,
11871217
9A2D5CF9259F8634005682ED /* UniqueValues.swift in Sources */,
@@ -1248,6 +1278,7 @@
12481278
D08C54B41A69A2AF00AD8286 /* Signal.swift in Sources */,
12491279
D8E84A671B3B32FB00C3E831 /* Optional.swift in Sources */,
12501280
D0C312D419EF2A5800984962 /* Disposable.swift in Sources */,
1281+
9A2D5D68259FA59E005682ED /* CollectEvery.swift in Sources */,
12511282
D08C54B91A69A9D100AD8286 /* SignalProducer.swift in Sources */,
12521283
9A9100E01E0E6E670093E346 /* ValidatingProperty.swift in Sources */,
12531284
9A2D5CF0259F85AE005682ED /* SkipRepeats.swift in Sources */,
@@ -1267,6 +1298,9 @@
12671298
9AFA491224E9A196003D263C /* Map.swift in Sources */,
12681299
9A2D5CFA259F8634005682ED /* UniqueValues.swift in Sources */,
12691300
9A2D5C64259F7B47005682ED /* MaterializeAsResult.swift in Sources */,
1301+
9A2D5D54259FA000005682ED /* Throttle.swift in Sources */,
1302+
9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */,
1303+
9A2D5D5E259FA0DD005682ED /* Debounce.swift in Sources */,
12701304
9AFA491224E9A196003D263C /* Map.swift in Sources */,
12711305
9A2D5D31259F942B005682ED /* LazyMap.swift in Sources */,
12721306
9A2D5C8C259F7ED5005682ED /* Dematerialize.swift in Sources */,

Sources/Event.swift

Lines changed: 19 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -423,170 +423,37 @@ extension Signal.Event {
423423
}
424424

425425
internal static func throttle(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation<Value, Error> {
426-
precondition(interval >= 0)
427-
428-
return { action, lifetime in
429-
let state: Atomic<ThrottleState<Value>> = Atomic(ThrottleState())
430-
let schedulerDisposable = SerialDisposable()
431-
432-
lifetime.observeEnded {
433-
schedulerDisposable.dispose()
434-
scheduler.schedule { action(.interrupted) }
435-
}
436-
437-
return Signal.Observer { event in
438-
guard let value = event.value else {
439-
schedulerDisposable.inner = scheduler.schedule {
440-
action(event)
441-
}
442-
return
443-
}
444-
445-
let scheduleDate: Date = state.modify { state in
446-
state.pendingValue = value
447-
448-
let proposedScheduleDate: Date
449-
if let previousDate = state.previousDate, previousDate <= scheduler.currentDate {
450-
proposedScheduleDate = previousDate.addingTimeInterval(interval)
451-
} else {
452-
proposedScheduleDate = scheduler.currentDate
453-
}
454-
455-
return proposedScheduleDate < scheduler.currentDate ? scheduler.currentDate : proposedScheduleDate
456-
}
457-
458-
schedulerDisposable.inner = scheduler.schedule(after: scheduleDate) {
459-
if let pendingValue = state.modify({ $0.retrieveValue(date: scheduleDate) }) {
460-
action(.value(pendingValue))
461-
}
462-
}
463-
}
426+
return { downstream, lifetime in
427+
Operators.Throttle(downstream: downstream, downstreamLifetime: lifetime, target: scheduler, interval: interval)
464428
}
465429
}
466430

467431
internal static func debounce(_ interval: TimeInterval, on scheduler: DateScheduler, discardWhenCompleted: Bool) -> Transformation<Value, Error> {
468-
precondition(interval >= 0)
469-
470-
return { action, lifetime in
471-
let state: Atomic<ThrottleState<Value>> = Atomic(ThrottleState(previousDate: scheduler.currentDate, pendingValue: nil))
472-
let d = SerialDisposable()
473-
474-
lifetime.observeEnded {
475-
d.dispose()
476-
scheduler.schedule { action(.interrupted) }
477-
}
478-
479-
return Signal.Observer { event in
480-
switch event {
481-
case let .value(value):
482-
state.modify { state in
483-
state.pendingValue = value
484-
}
485-
let date = scheduler.currentDate.addingTimeInterval(interval)
486-
d.inner = scheduler.schedule(after: date) {
487-
if let pendingValue = state.modify({ $0.retrieveValue(date: date) }) {
488-
action(.value(pendingValue))
489-
}
490-
}
491-
492-
case .completed:
493-
d.inner = scheduler.schedule {
494-
let pending: (value: Value, previousDate: Date)? = state.modify { state in
495-
defer { state.pendingValue = nil }
496-
guard let pendingValue = state.pendingValue, let previousDate = state.previousDate else { return nil }
497-
return (pendingValue, previousDate)
498-
}
499-
if !discardWhenCompleted, let (pendingValue, previousDate) = pending {
500-
scheduler.schedule(after: previousDate.addingTimeInterval(interval)) {
501-
action(.value(pendingValue))
502-
action(.completed)
503-
}
504-
} else {
505-
action(.completed)
506-
}
507-
}
508-
509-
case .failed, .interrupted:
510-
d.inner = scheduler.schedule {
511-
action(event)
512-
}
513-
}
514-
}
432+
return { downstream, lifetime in
433+
Operators.Debounce(
434+
downstream: downstream,
435+
downstreamLifetime: lifetime,
436+
target: scheduler,
437+
interval: interval,
438+
discardWhenCompleted: discardWhenCompleted
439+
)
515440
}
516441
}
517442

518443
internal static func collect(every interval: DispatchTimeInterval, on scheduler: DateScheduler, skipEmpty: Bool, discardWhenCompleted: Bool) -> Transformation<[Value], Error> {
519-
return { action, lifetime in
520-
let state = Atomic<CollectEveryState<Value>>(.init(skipEmpty: skipEmpty))
521-
let d = SerialDisposable()
522-
523-
d.inner = scheduler.schedule(after: scheduler.currentDate.addingTimeInterval(interval), interval: interval, leeway: interval * 0.1) {
524-
let (currentValues, isCompleted) = state.modify { ($0.collect(), $0.isCompleted) }
525-
if let currentValues = currentValues {
526-
action(.value(currentValues))
527-
}
528-
if isCompleted {
529-
action(.completed)
530-
}
531-
}
532-
533-
lifetime.observeEnded {
534-
d.dispose()
535-
scheduler.schedule { action(.interrupted) }
536-
}
537-
538-
return Signal.Observer { event in
539-
switch event {
540-
case let .value(value):
541-
state.modify { $0.values.append(value) }
542-
case let .failed(error):
543-
d.inner = scheduler.schedule { action(.failed(error)) }
544-
case .completed where !discardWhenCompleted:
545-
state.modify { $0.isCompleted = true }
546-
case .completed:
547-
d.inner = scheduler.schedule { action(.completed) }
548-
case .interrupted:
549-
d.inner = scheduler.schedule { action(.interrupted) }
550-
}
551-
}
444+
return { downstream, lifetime in
445+
Operators.CollectEvery(
446+
downstream: downstream,
447+
downstreamLifetime: lifetime,
448+
target: scheduler,
449+
interval: interval,
450+
skipEmpty: skipEmpty,
451+
discardWhenCompleted: discardWhenCompleted
452+
)
552453
}
553454
}
554455
}
555456

556-
private struct CollectEveryState<Value> {
557-
let skipEmpty: Bool
558-
var values: [Value] = []
559-
var isCompleted: Bool = false
560-
561-
init(skipEmpty: Bool) {
562-
self.skipEmpty = skipEmpty
563-
}
564-
565-
var hasValues: Bool {
566-
return !values.isEmpty || !skipEmpty
567-
}
568-
569-
mutating func collect() -> [Value]? {
570-
guard hasValues else { return nil }
571-
defer { values.removeAll() }
572-
return values
573-
}
574-
}
575-
576-
private struct ThrottleState<Value> {
577-
var previousDate: Date?
578-
var pendingValue: Value?
579-
580-
mutating func retrieveValue(date: Date) -> Value? {
581-
defer {
582-
if pendingValue != nil {
583-
pendingValue = nil
584-
previousDate = date
585-
}
586-
}
587-
return pendingValue
588-
}
589-
}
590457

591458
extension Signal.Event where Error == Never {
592459
internal static func promoteError<F>(_: F.Type) -> Transformation<Value, F> {

Sources/Observers/CollectEvery.swift

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import Dispatch
2+
3+
extension Operators {
4+
internal final class CollectEvery<Value, Error: Swift.Error>: UnaryAsyncOperator<Value, [Value], Error> {
5+
let interval: DispatchTimeInterval
6+
let discardWhenCompleted: Bool
7+
let targetWithClock: DateScheduler
8+
9+
private let state: Atomic<CollectEveryState<Value>>
10+
private let timerDisposable = SerialDisposable()
11+
12+
init(
13+
downstream: Observer<[Value], Error>,
14+
downstreamLifetime: Lifetime,
15+
target: DateScheduler,
16+
interval: DispatchTimeInterval,
17+
skipEmpty: Bool,
18+
discardWhenCompleted: Bool
19+
) {
20+
self.interval = interval
21+
self.discardWhenCompleted = discardWhenCompleted
22+
self.targetWithClock = target
23+
self.state = Atomic(CollectEveryState(skipEmpty: skipEmpty))
24+
25+
super.init(downstream: downstream, downstreamLifetime: downstreamLifetime, target: target)
26+
27+
downstreamLifetime += timerDisposable
28+
29+
let initialDate = targetWithClock.currentDate.addingTimeInterval(interval)
30+
timerDisposable.inner = targetWithClock.schedule(after: initialDate, interval: interval, leeway: interval * 0.1) {
31+
let (currentValues, isCompleted) = self.state.modify { ($0.collect(), $0.isCompleted) }
32+
33+
if let currentValues = currentValues {
34+
self.unscheduledSend(currentValues)
35+
}
36+
37+
if isCompleted {
38+
self.unscheduledTerminate(.completed)
39+
}
40+
}
41+
}
42+
43+
override func receive(_ value: Value) {
44+
state.modify { $0.values.append(value) }
45+
}
46+
47+
override func terminate(_ termination: Termination<Error>) {
48+
guard isActive else { return }
49+
50+
if case .completed = termination, !discardWhenCompleted {
51+
state.modify { $0.isCompleted = true }
52+
} else {
53+
timerDisposable.dispose()
54+
super.terminate(termination)
55+
}
56+
}
57+
}
58+
}
59+
60+
private struct CollectEveryState<Value> {
61+
let skipEmpty: Bool
62+
var values: [Value] = []
63+
var isCompleted: Bool = false
64+
65+
init(skipEmpty: Bool) {
66+
self.skipEmpty = skipEmpty
67+
}
68+
69+
var hasValues: Bool {
70+
return !values.isEmpty || !skipEmpty
71+
}
72+
73+
mutating func collect() -> [Value]? {
74+
guard hasValues else { return nil }
75+
defer { values.removeAll() }
76+
return values
77+
}
78+
}

0 commit comments

Comments
 (0)