Skip to content

Commit 8562161

Browse files
authored
TakeFirst, TakeLast, TakeWhile, SkipFirst, SkipWhile (#813)
1 parent 8d24d3d commit 8562161

File tree

7 files changed

+198
-79
lines changed

7 files changed

+198
-79
lines changed

ReactiveSwift.xcodeproj/project.pbxproj

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,26 @@
6969
9A1D067D1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
7070
9A1D067E1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
7171
9A1D067F1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
72+
9A2D5C9F259F8059005682ED /* TakeFirst.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5C9E259F8059005682ED /* TakeFirst.swift */; };
73+
9A2D5CA0259F8059005682ED /* TakeFirst.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5C9E259F8059005682ED /* TakeFirst.swift */; };
74+
9A2D5CA1259F8059005682ED /* TakeFirst.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5C9E259F8059005682ED /* TakeFirst.swift */; };
75+
9A2D5CA2259F8059005682ED /* TakeFirst.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5C9E259F8059005682ED /* TakeFirst.swift */; };
76+
9A2D5CAE259F8112005682ED /* TakeLast.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CAD259F8112005682ED /* TakeLast.swift */; };
77+
9A2D5CAF259F8112005682ED /* TakeLast.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CAD259F8112005682ED /* TakeLast.swift */; };
78+
9A2D5CB0259F8112005682ED /* TakeLast.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CAD259F8112005682ED /* TakeLast.swift */; };
79+
9A2D5CB1259F8112005682ED /* TakeLast.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CAD259F8112005682ED /* TakeLast.swift */; };
80+
9A2D5CB8259F8199005682ED /* TakeWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CB7259F8199005682ED /* TakeWhile.swift */; };
81+
9A2D5CB9259F8199005682ED /* TakeWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CB7259F8199005682ED /* TakeWhile.swift */; };
82+
9A2D5CBA259F8199005682ED /* TakeWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CB7259F8199005682ED /* TakeWhile.swift */; };
83+
9A2D5CBB259F8199005682ED /* TakeWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CB7259F8199005682ED /* TakeWhile.swift */; };
84+
9A2D5CC2259F81FC005682ED /* SkipFirst.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CC1259F81FC005682ED /* SkipFirst.swift */; };
85+
9A2D5CC3259F81FC005682ED /* SkipFirst.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CC1259F81FC005682ED /* SkipFirst.swift */; };
86+
9A2D5CC4259F81FC005682ED /* SkipFirst.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CC1259F81FC005682ED /* SkipFirst.swift */; };
87+
9A2D5CC5259F81FC005682ED /* SkipFirst.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CC1259F81FC005682ED /* SkipFirst.swift */; };
88+
9A2D5CCC259F8263005682ED /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CCB259F8263005682ED /* SkipWhile.swift */; };
89+
9A2D5CCD259F8263005682ED /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CCB259F8263005682ED /* SkipWhile.swift */; };
90+
9A2D5CCE259F8263005682ED /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CCB259F8263005682ED /* SkipWhile.swift */; };
91+
9A2D5CCF259F8263005682ED /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5CCB259F8263005682ED /* SkipWhile.swift */; };
7292
9A2D5C4F259F7B21005682ED /* MapError.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5C4E259F7B21005682ED /* MapError.swift */; };
7393
9A2D5C50259F7B21005682ED /* MapError.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5C4E259F7B21005682ED /* MapError.swift */; };
7494
9A2D5C51259F7B21005682ED /* MapError.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A2D5C4E259F7B21005682ED /* MapError.swift */; };
@@ -288,6 +308,11 @@
288308
9A1A4F981E16961C006F3039 /* ValidatingPropertySpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingPropertySpec.swift; sourceTree = "<group>"; };
289309
9A1B824020835EEC00EB7C09 /* ResultExtensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ResultExtensions.swift; sourceTree = "<group>"; };
290310
9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UnidirectionalBindingSpec.swift; sourceTree = "<group>"; };
311+
9A2D5C9E259F8059005682ED /* TakeFirst.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TakeFirst.swift; sourceTree = "<group>"; };
312+
9A2D5CAD259F8112005682ED /* TakeLast.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TakeLast.swift; sourceTree = "<group>"; };
313+
9A2D5CB7259F8199005682ED /* TakeWhile.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = TakeWhile.swift; sourceTree = "<group>"; };
314+
9A2D5CC1259F81FC005682ED /* SkipFirst.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SkipFirst.swift; sourceTree = "<group>"; };
315+
9A2D5CCB259F8263005682ED /* SkipWhile.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SkipWhile.swift; sourceTree = "<group>"; };
291316
9A2D5C4E259F7B21005682ED /* MapError.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MapError.swift; sourceTree = "<group>"; };
292317
9A2D5C58259F7B31005682ED /* Materialize.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Materialize.swift; sourceTree = "<group>"; };
293318
9A2D5C62259F7B47005682ED /* MaterializeAsResult.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MaterializeAsResult.swift; sourceTree = "<group>"; };
@@ -447,6 +472,11 @@
447472
9AFA491A24E9A925003D263C /* Filter.swift */,
448473
9AFA491F24E9A988003D263C /* CompactMap.swift */,
449474
9AFA492424E9B15C003D263C /* Operators.swift */,
475+
9A2D5C9E259F8059005682ED /* TakeFirst.swift */,
476+
9A2D5CAD259F8112005682ED /* TakeLast.swift */,
477+
9A2D5CB7259F8199005682ED /* TakeWhile.swift */,
478+
9A2D5CC1259F81FC005682ED /* SkipFirst.swift */,
479+
9A2D5CCB259F8263005682ED /* SkipWhile.swift */,
450480
9A2D5C4E259F7B21005682ED /* MapError.swift */,
451481
9A2D5C58259F7B31005682ED /* Materialize.swift */,
452482
9A2D5C62259F7B47005682ED /* MaterializeAsResult.swift */,
@@ -948,6 +978,7 @@
948978
57A4D1B61BA13D7A00F7D4B1 /* Event.swift in Sources */,
949979
57A4D1B81BA13D7A00F7D4B1 /* Scheduler.swift in Sources */,
950980
9A9100E21E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
981+
9A2D5CBB259F8199005682ED /* TakeWhile.swift in Sources */,
951982
57A4D1B91BA13D7A00F7D4B1 /* Action.swift in Sources */,
952983
57A4D1BA1BA13D7A00F7D4B1 /* Property.swift in Sources */,
953984
9A2D5C5C259F7B31005682ED /* Materialize.swift in Sources */,
@@ -964,15 +995,19 @@
964995
57A4D1BE1BA13D7A00F7D4B1 /* Bag.swift in Sources */,
965996
9A1B824420835EEC00EB7C09 /* ResultExtensions.swift in Sources */,
966997
57A4D1C01BA13D7A00F7D4B1 /* FoundationExtensions.swift in Sources */,
998+
9A2D5CC5259F81FC005682ED /* SkipFirst.swift in Sources */,
967999
9AFA490F24E9A0C4003D263C /* Observer.swift in Sources */,
9681000
D85C652D1C0E70E5005A77AD /* Flatten.swift in Sources */,
9691001
9ABCB1881D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */,
9701002
EBCC7DBF1BBF01E200A2AE92 /* Signal.Observer.swift in Sources */,
9711003
9A2D5C7A259F7D3D005682ED /* AttemptMap.swift in Sources */,
9721004
C79B64801CD52E4E003F2376 /* EventLogger.swift in Sources */,
1005+
9A2D5CA2259F8059005682ED /* TakeFirst.swift in Sources */,
9731006
9AFA492324E9A988003D263C /* CompactMap.swift in Sources */,
9741007
4A0E11021D2A92720065D310 /* Lifetime.swift in Sources */,
1008+
9A2D5CB1259F8112005682ED /* TakeLast.swift in Sources */,
9751009
BE9CF3981D751B71003AE479 /* UnidirectionalBinding.swift in Sources */,
1010+
9A2D5CCF259F8263005682ED /* SkipWhile.swift in Sources */,
9761011
9A2D5C84259F7E3E005682ED /* DematerializeResults.swift in Sources */,
9771012
9A2D5C52259F7B21005682ED /* MapError.swift in Sources */,
9781013
);
@@ -1015,6 +1050,7 @@
10151050
A9B315BE1B3940810001CB9C /* Event.swift in Sources */,
10161051
A9B315C01B3940810001CB9C /* Scheduler.swift in Sources */,
10171052
9A9100E11E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
1053+
9A2D5CBA259F8199005682ED /* TakeWhile.swift in Sources */,
10181054
A9B315C11B3940810001CB9C /* Action.swift in Sources */,
10191055
A9B315C21B3940810001CB9C /* Property.swift in Sources */,
10201056
9A2D5C5B259F7B31005682ED /* Materialize.swift in Sources */,
@@ -1031,15 +1067,19 @@
10311067
A9B315C61B3940810001CB9C /* Bag.swift in Sources */,
10321068
9A1B824320835EEC00EB7C09 /* ResultExtensions.swift in Sources */,
10331069
A9B315C81B3940810001CB9C /* FoundationExtensions.swift in Sources */,
1070+
9A2D5CC4259F81FC005682ED /* SkipFirst.swift in Sources */,
10341071
9AFA490E24E9A0C4003D263C /* Observer.swift in Sources */,
10351072
D85C652C1C0E70E4005A77AD /* Flatten.swift in Sources */,
10361073
9ABCB1871D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */,
10371074
EBCC7DBE1BBF01E200A2AE92 /* Signal.Observer.swift in Sources */,
10381075
9A2D5C79259F7D3D005682ED /* AttemptMap.swift in Sources */,
10391076
C79B647F1CD52E4D003F2376 /* EventLogger.swift in Sources */,
1077+
9A2D5CA1259F8059005682ED /* TakeFirst.swift in Sources */,
10401078
9AFA492224E9A988003D263C /* CompactMap.swift in Sources */,
10411079
4A0E11011D2A92720065D310 /* Lifetime.swift in Sources */,
1080+
9A2D5CB0259F8112005682ED /* TakeLast.swift in Sources */,
10421081
BE9CF3971D751B71003AE479 /* UnidirectionalBinding.swift in Sources */,
1082+
9A2D5CCE259F8263005682ED /* SkipWhile.swift in Sources */,
10431083
9A2D5C83259F7E3E005682ED /* DematerializeResults.swift in Sources */,
10441084
9A2D5C51259F7B21005682ED /* MapError.swift in Sources */,
10451085
);
@@ -1054,6 +1094,7 @@
10541094
D0C312D319EF2A5800984962 /* Disposable.swift in Sources */,
10551095
9A9100DF1E0E6E620093E346 /* ValidatingProperty.swift in Sources */,
10561096
EBCC7DBC1BBF010C00A2AE92 /* Signal.Observer.swift in Sources */,
1097+
9A2D5CB8259F8199005682ED /* TakeWhile.swift in Sources */,
10571098
D03B4A3D19F4C39A009E02AC /* FoundationExtensions.swift in Sources */,
10581099
9A090C141DA0309E00EE97CA /* Reactive.swift in Sources */,
10591100
9A2D5C59259F7B31005682ED /* Materialize.swift in Sources */,
@@ -1070,15 +1111,19 @@
10701111
D0C312E719EF2A5800984962 /* Scheduler.swift in Sources */,
10711112
9A1B824120835EEC00EB7C09 /* ResultExtensions.swift in Sources */,
10721113
D0C312CD19EF2A5800984962 /* Atomic.swift in Sources */,
1114+
9A2D5CC2259F81FC005682ED /* SkipFirst.swift in Sources */,
10731115
9AFA490C24E9A0C4003D263C /* Observer.swift in Sources */,
10741116
D08C54BA1A69C54300AD8286 /* Property.swift in Sources */,
10751117
D0D11AB91A6AE87700C1F8B1 /* Action.swift in Sources */,
10761118
C79B647C1CD52E23003F2376 /* EventLogger.swift in Sources */,
10771119
9A2D5C77259F7D3D005682ED /* AttemptMap.swift in Sources */,
10781120
9ABCB1851D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */,
1121+
9A2D5C9F259F8059005682ED /* TakeFirst.swift in Sources */,
10791122
9AFA492024E9A988003D263C /* CompactMap.swift in Sources */,
10801123
D08C54B81A69A9D000AD8286 /* SignalProducer.swift in Sources */,
1124+
9A2D5CAE259F8112005682ED /* TakeLast.swift in Sources */,
10811125
BE9CF3951D751B6B003AE479 /* UnidirectionalBinding.swift in Sources */,
1126+
9A2D5CCC259F8263005682ED /* SkipWhile.swift in Sources */,
10821127
9A2D5C81259F7E3E005682ED /* DematerializeResults.swift in Sources */,
10831128
9A2D5C4F259F7B21005682ED /* MapError.swift in Sources */,
10841129
);
@@ -1121,6 +1166,7 @@
11211166
D0C312D419EF2A5800984962 /* Disposable.swift in Sources */,
11221167
D08C54B91A69A9D100AD8286 /* SignalProducer.swift in Sources */,
11231168
9A9100E01E0E6E670093E346 /* ValidatingProperty.swift in Sources */,
1169+
9A2D5CB9259F8199005682ED /* TakeWhile.swift in Sources */,
11241170
9ABCB1861D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */,
11251171
EBCC7DBD1BBF01E100A2AE92 /* Signal.Observer.swift in Sources */,
11261172
9A2D5C5A259F7B31005682ED /* Materialize.swift in Sources */,
@@ -1137,15 +1183,19 @@
11371183
D03B4A3E19F4C39A009E02AC /* FoundationExtensions.swift in Sources */,
11381184
9A1B824220835EEC00EB7C09 /* ResultExtensions.swift in Sources */,
11391185
D08C54B71A69A3DB00AD8286 /* Event.swift in Sources */,
1186+
9A2D5CC3259F81FC005682ED /* SkipFirst.swift in Sources */,
11401187
9AFA490D24E9A0C4003D263C /* Observer.swift in Sources */,
11411188
C79B647D1CD52E4A003F2376 /* EventLogger.swift in Sources */,
11421189
D0C312CE19EF2A5800984962 /* Atomic.swift in Sources */,
11431190
D0C312E819EF2A5800984962 /* Scheduler.swift in Sources */,
11441191
9A2D5C78259F7D3D005682ED /* AttemptMap.swift in Sources */,
11451192
D0C312D019EF2A5800984962 /* Bag.swift in Sources */,
1193+
9A2D5CA0259F8059005682ED /* TakeFirst.swift in Sources */,
11461194
9AFA492124E9A988003D263C /* CompactMap.swift in Sources */,
11471195
D0D11ABA1A6AE87700C1F8B1 /* Action.swift in Sources */,
1196+
9A2D5CAF259F8112005682ED /* TakeLast.swift in Sources */,
11481197
BE9CF3961D751B70003AE479 /* UnidirectionalBinding.swift in Sources */,
1198+
9A2D5CCD259F8263005682ED /* SkipWhile.swift in Sources */,
11491199
9A2D5C82259F7E3E005682ED /* DematerializeResults.swift in Sources */,
11501200
9A2D5C50259F7B21005682ED /* MapError.swift in Sources */,
11511201
);

Sources/Event.swift

Lines changed: 10 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -273,101 +273,32 @@ extension Signal.Event where Error == Swift.Error {
273273

274274
extension Signal.Event {
275275
internal static func take(first count: Int) -> Transformation<Value, Error> {
276-
assert(count >= 1)
277-
278-
return { action, _ in
279-
var taken = 0
280-
281-
return Signal.Observer { event in
282-
guard let value = event.value else {
283-
action(event)
284-
return
285-
}
286-
287-
if taken < count {
288-
taken += 1
289-
action(.value(value))
290-
}
291-
292-
if taken == count {
293-
action(.completed)
294-
}
295-
}
276+
return { downstream, _ in
277+
Operators.TakeFirst(downstream: downstream, count: count)
296278
}
297279
}
298280

299281
internal static func take(last count: Int) -> Transformation<Value, Error> {
300-
return { action, _ in
301-
var buffer: [Value] = []
302-
buffer.reserveCapacity(count)
303-
304-
return Signal.Observer { event in
305-
switch event {
306-
case let .value(value):
307-
// To avoid exceeding the reserved capacity of the buffer,
308-
// we remove then add. Remove elements until we have room to
309-
// add one more.
310-
while (buffer.count + 1) > count {
311-
buffer.remove(at: 0)
312-
}
313-
314-
buffer.append(value)
315-
case let .failed(error):
316-
action(.failed(error))
317-
case .completed:
318-
buffer.forEach { action(.value($0)) }
319-
action(.completed)
320-
case .interrupted:
321-
action(.interrupted)
322-
}
323-
}
282+
return { downstream, _ in
283+
Operators.TakeLast(downstream: downstream, count: count)
324284
}
325285
}
326286

327287
internal static func take(while shouldContinue: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
328-
return { action, _ in
329-
return Signal.Observer { event in
330-
if let value = event.value, !shouldContinue(value) {
331-
action(.completed)
332-
} else {
333-
action(event)
334-
}
335-
}
288+
return { downstream, _ in
289+
Operators.TakeWhile(downstream: downstream, shouldContinue: shouldContinue)
336290
}
337291
}
338292

339293
internal static func skip(first count: Int) -> Transformation<Value, Error> {
340-
precondition(count > 0)
341-
342-
return { action, _ in
343-
var skipped = 0
344-
345-
return Signal.Observer { event in
346-
if case .value = event, skipped < count {
347-
skipped += 1
348-
} else {
349-
action(event)
350-
}
351-
}
294+
return { downstream, _ in
295+
Operators.SkipFirst(downstream: downstream, count: count)
352296
}
353297
}
354298

355299
internal static func skip(while shouldContinue: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
356-
return { action, _ in
357-
var isSkipping = true
358-
359-
return Signal.Observer { event in
360-
switch event {
361-
case let .value(value):
362-
isSkipping = isSkipping && shouldContinue(value)
363-
if !isSkipping {
364-
fallthrough
365-
}
366-
367-
case .failed, .completed, .interrupted:
368-
action(event)
369-
}
370-
}
300+
return { downstream, _ in
301+
Operators.SkipWhile(downstream: downstream, shouldContinueToSkip: shouldContinue)
371302
}
372303
}
373304
}

Sources/Observers/SkipFirst.swift

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
extension Operators {
2+
internal final class SkipFirst<Value, Error: Swift.Error>: Observer<Value, Error> {
3+
let downstream: Observer<Value, Error>
4+
let count: Int
5+
var skipped: Int = 0
6+
7+
init(downstream: Observer<Value, Error>, count: Int) {
8+
precondition(count >= 1)
9+
10+
self.downstream = downstream
11+
self.count = count
12+
}
13+
14+
override func receive(_ value: Value) {
15+
if skipped < count {
16+
skipped += 1
17+
} else {
18+
downstream.receive(value)
19+
}
20+
}
21+
22+
override func terminate(_ termination: Termination<Error>) {
23+
downstream.terminate(termination)
24+
}
25+
}
26+
}

Sources/Observers/SkipWhile.swift

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
extension Operators {
2+
internal final class SkipWhile<Value, Error: Swift.Error>: Observer<Value, Error> {
3+
let downstream: Observer<Value, Error>
4+
let shouldContinueToSkip: (Value) -> Bool
5+
var isSkipping = true
6+
7+
init(downstream: Observer<Value, Error>, shouldContinueToSkip: @escaping (Value) -> Bool) {
8+
self.downstream = downstream
9+
self.shouldContinueToSkip = shouldContinueToSkip
10+
}
11+
12+
override func receive(_ value: Value) {
13+
isSkipping = isSkipping && shouldContinueToSkip(value)
14+
15+
if !isSkipping {
16+
downstream.receive(value)
17+
}
18+
}
19+
20+
override func terminate(_ termination: Termination<Error>) {
21+
downstream.terminate(termination)
22+
}
23+
}
24+
}

Sources/Observers/TakeFirst.swift

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
extension Operators {
2+
internal final class TakeFirst<Value, Error: Swift.Error>: Observer<Value, Error> {
3+
let downstream: Observer<Value, Error>
4+
let count: Int
5+
var taken: Int = 0
6+
7+
init(downstream: Observer<Value, Error>, count: Int) {
8+
precondition(count >= 1)
9+
10+
self.downstream = downstream
11+
self.count = count
12+
}
13+
14+
override func receive(_ value: Value) {
15+
if taken < count {
16+
taken += 1
17+
downstream.receive(value)
18+
}
19+
20+
if taken == count {
21+
downstream.terminate(.completed)
22+
}
23+
}
24+
25+
override func terminate(_ termination: Termination<Error>) {
26+
downstream.terminate(termination)
27+
}
28+
}
29+
}

Sources/Observers/TakeLast.swift

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
extension Operators {
2+
internal final class TakeLast<Value, Error: Swift.Error>: Observer<Value, Error> {
3+
let downstream: Observer<Value, Error>
4+
let count: Int
5+
var buffer: [Value] = []
6+
7+
init(downstream: Observer<Value, Error>, count: Int) {
8+
precondition(count >= 1)
9+
10+
self.downstream = downstream
11+
self.count = count
12+
13+
buffer.reserveCapacity(count)
14+
}
15+
16+
override func receive(_ value: Value) {
17+
// To avoid exceeding the reserved capacity of the buffer,
18+
// we remove then add. Remove elements until we have room to
19+
// add one more.
20+
while (buffer.count + 1) > count {
21+
buffer.remove(at: 0)
22+
}
23+
24+
buffer.append(value)
25+
}
26+
27+
override func terminate(_ termination: Termination<Error>) {
28+
if case .completed = termination {
29+
buffer.forEach(downstream.receive)
30+
buffer = []
31+
}
32+
33+
downstream.terminate(termination)
34+
}
35+
}
36+
}

0 commit comments

Comments
 (0)