Skip to content

Commit 59aec0f

Browse files
patmaltfreak4pc
authored andcommitted
Implement bufferWithTrigger. (#191)
1 parent 3075c3f commit 59aec0f

File tree

9 files changed

+196
-2
lines changed

9 files changed

+196
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ Changelog
33

44
master
55
-----
6-
76
- added `partition(_:)` operator
7+
- added `bufferWithTrigger` operator
88

99
3.4.0
1010
-----

Cartfile.resolved

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
github "ReactiveX/RxSwift" "4.2.0"
1+
github "ReactiveX/RxSwift" "4.4.0"

Playground/RxSwiftExtPlayground.playground/Pages/Index.xcplaygroundpage/Contents.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
- [toSortedArray()](mapMany) operator, converts an Observable into another Observable that emits the whole sequence as a single array sorted using the provided closure and then terminates.
4646
- [count(predicate)](count) operator, counts the number of items emitted by an Observable. If predicate exists, then counts the number of items satisfying it.
4747
- [partition](partition) operator, partition a stream into two separate streams of elements that match, and don't match, the provided predicate.
48+
- [bufferWithTrigger()](bufferWithTrigger) Collects the elements of the source observable, and emits them as an array when the trigger emits.
4849
- **UIViewPropertyAnimator** [animate()](UIViewPropertyAnimator.animate) operator, returns a Completable that completes as soon as the animation ends.
4950
- **UIViewPropertyAnimator** [fractionComplete](UIViewPropertyAnimator.fractionComplete) binder, provides a reactive way to bind to `UIViewPropertyAnimator.fractionComplete`.
5051
*/
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*:
2+
> # IMPORTANT: To use `RxSwiftExtPlayground.playground`, please:
3+
4+
1. Make sure you have [Carthage](https://github.com/Carthage/Carthage) installed
5+
1. Fetch Carthage dependencies from shell: `carthage bootstrap --platform ios`
6+
1. Build scheme `RxSwiftExtPlayground` scheme for a simulator target
7+
1. Choose `View > Show Debug Area`
8+
*/
9+
10+
//: [Previous](@previous)
11+
import RxSwift
12+
import RxSwiftExt
13+
/*:
14+
## bufferWithTrigger
15+
16+
Collects the elements of the source observable, and emits them as an array when the boundary emits.
17+
*/
18+
19+
example("bufferWithTrigger") {
20+
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
21+
22+
let signalAtThreeSeconds = Observable<Int>.timer(3, scheduler: MainScheduler.instance).map { _ in () }
23+
let signalAtFiveSeconds = Observable<Int>.timer(5, scheduler: MainScheduler.instance).map { _ in () }
24+
let trigger = Observable.of(signalAtThreeSeconds, signalAtFiveSeconds).merge()
25+
26+
// unlimited buffering of values received while paused
27+
let buffered = observable.bufferWithTrigger(trigger)
28+
29+
buffered.subscribe { print($0) }
30+
31+
playgroundShouldContinueIndefinitely()
32+
}
33+
//: [Next](@next)

Playground/RxSwiftExtPlayground.playground/contents.xcplayground

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
<pages>
44
<page name='Index'/>
55
<page name='apply'/>
6+
<page name='bufferWithTrigger'/>
67
<page name='cascade'/>
78
<page name='catchErrorJustComplete'/>
89
<page name='count'/>

Readme.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ These operators are much like the RxSwift & RxCocoa core operators, but provide
8282
* [withUnretained](#withunretained)
8383
* [count](#count)
8484
* [partition](#partition)
85+
* [bufferWithTrigger](#bufferWithTrigger)
8586

8687
There are two more available operators for `materialize()`'d sequences:
8788

@@ -613,6 +614,11 @@ let numbers = Observable
613614
_ = odds.debug("odds").subscribe() // emits 1, 3, 5
614615
```
615616

617+
#### bufferWithTrigger
618+
Collects the elements of the source observable, and emits them as an array when the trigger emits.
619+
620+
Examples are available in the project's Playground.
621+
616622
Reactive Extensions details
617623
===========
618624

RxSwiftExt.xcodeproj/project.pbxproj

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,12 @@
246246
E39C42101F18B13E007F2ACD /* WeakTarget.swift in Sources */ = {isa = PBXBuildFile; fileRef = 538607CA1E6F367A000361DE /* WeakTarget.swift */; };
247247
E39C42111F18B13E007F2ACD /* WeakTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 538607CB1E6F367A000361DE /* WeakTests.swift */; };
248248
E39C42121F18B13E007F2ACD /* FilterMapTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 98309EB21EDF161700BD07D9 /* FilterMapTests.swift */; };
249+
E62D9D582199D1EF006636D7 /* bufferWithTrigger.swift in Sources */ = {isa = PBXBuildFile; fileRef = E62D9D572199D1EF006636D7 /* bufferWithTrigger.swift */; };
250+
E62D9D592199D2E3006636D7 /* BufferWithTriggerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = E62D9D552199D188006636D7 /* BufferWithTriggerTests.swift */; };
251+
E62D9D5A2199D2E6006636D7 /* BufferWithTriggerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = E62D9D552199D188006636D7 /* BufferWithTriggerTests.swift */; };
252+
E62D9D5B2199D8BB006636D7 /* BufferWithTriggerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = E62D9D552199D188006636D7 /* BufferWithTriggerTests.swift */; };
253+
E62D9D5C2199DC76006636D7 /* bufferWithTrigger.swift in Sources */ = {isa = PBXBuildFile; fileRef = E62D9D572199D1EF006636D7 /* bufferWithTrigger.swift */; };
254+
E62D9D5D2199DC77006636D7 /* bufferWithTrigger.swift in Sources */ = {isa = PBXBuildFile; fileRef = E62D9D572199D1EF006636D7 /* bufferWithTrigger.swift */; };
249255
/* End PBXBuildFile section */
250256

251257
/* Begin PBXContainerItemProxy section */
@@ -368,6 +374,8 @@
368374
E36BDFBA1F387571008C9D56 /* RxTest.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = RxTest.framework; path = Carthage/Build/tvOS/RxTest.framework; sourceTree = "<group>"; };
369375
E39C41D01F18AF84007F2ACD /* RxSwiftExt.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwiftExt.framework; sourceTree = BUILT_PRODUCTS_DIR; };
370376
E39C41F21F18B0EA007F2ACD /* RxSwiftTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = RxSwiftTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
377+
E62D9D552199D188006636D7 /* BufferWithTriggerTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = BufferWithTriggerTests.swift; sourceTree = "<group>"; };
378+
E62D9D572199D1EF006636D7 /* bufferWithTrigger.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = bufferWithTrigger.swift; sourceTree = "<group>"; };
371379
/* End PBXFileReference section */
372380

373381
/* Begin PBXFrameworksBuildPhase section */
@@ -506,6 +514,7 @@
506514
children = (
507515
3DBDE5FB1FBBAE3900DF47F9 /* and.swift */,
508516
5386079A1E6F334B000361DE /* apply.swift */,
517+
E62D9D572199D1EF006636D7 /* bufferWithTrigger.swift */,
509518
5386079B1E6F334B000361DE /* cascade.swift */,
510519
5386079C1E6F334B000361DE /* catchErrorJustComplete.swift */,
511520
B69B45482190C27D00F30418 /* count.swift */,
@@ -544,6 +553,7 @@
544553
C4D2154020118F8B009804AE /* Observable+OfTypeTests.swift */,
545554
3DBDE5FD1FBBB05400DF47F9 /* AndTests.swift */,
546555
538607BB1E6F367A000361DE /* ApplyTests.swift */,
556+
E62D9D552199D188006636D7 /* BufferWithTriggerTests.swift */,
547557
538607BC1E6F367A000361DE /* CascadeTests.swift */,
548558
538607BD1E6F367A000361DE /* CatchErrorJustCompleteTests.swift */,
549559
B69B454C2190C3BC00F30418 /* CountTests.swift */,
@@ -1013,6 +1023,7 @@
10131023
538607B61E6F334B000361DE /* pausable.swift in Sources */,
10141024
98309EAF1EDF14AC00BD07D9 /* flatMapSync.swift in Sources */,
10151025
780CB21520A0ED1C00FD3F39 /* toSortedArray.swift in Sources */,
1026+
E62D9D582199D1EF006636D7 /* bufferWithTrigger.swift in Sources */,
10161027
98309EB11EDF159500BD07D9 /* filterMap.swift in Sources */,
10171028
53F336E81E70CBF700D35D38 /* distinct+RxCocoa.swift in Sources */,
10181029
A23E148721A9EFC000CD5B2F /* partition.swift in Sources */,
@@ -1069,6 +1080,7 @@
10691080
538607E61E6F36A9000361DE /* MapToTests.swift in Sources */,
10701081
5A5FCE411ED5AEC60052A9B5 /* PausableBufferedTests.swift in Sources */,
10711082
1A8741AC20745A91004BB762 /* UIViewPropertyAnimatorTests+Rx.swift in Sources */,
1083+
E62D9D592199D2E3006636D7 /* BufferWithTriggerTests.swift in Sources */,
10721084
3DBDE5FF1FBBB09900DF47F9 /* AndTests.swift in Sources */,
10731085
58C545FD1AE234C7F290334F /* ZipWithTest.swift in Sources */,
10741086
);
@@ -1085,6 +1097,7 @@
10851097
62512C6F1F0EAF950083A89F /* ignoreErrors.swift in Sources */,
10861098
62512C701F0EAF950083A89F /* ignoreWhen.swift in Sources */,
10871099
789682E720408A7500545396 /* mapAt.swift in Sources */,
1100+
E62D9D5C2199DC76006636D7 /* bufferWithTrigger.swift in Sources */,
10881101
62512C6A1F0EAF950083A89F /* apply.swift in Sources */,
10891102
62512C6B1F0EAF950083A89F /* cascade.swift in Sources */,
10901103
62512C671F0EAF820083A89F /* distinct+RxCocoa.swift in Sources */,
@@ -1148,6 +1161,7 @@
11481161
62512C931F0EB1850083A89F /* CascadeTests.swift in Sources */,
11491162
62512C971F0EB1850083A89F /* IgnoreTests.swift in Sources */,
11501163
62512C941F0EB1850083A89F /* CatchErrorJustCompleteTests.swift in Sources */,
1164+
E62D9D5A2199D2E6006636D7 /* BufferWithTriggerTests.swift in Sources */,
11511165
62512C921F0EB1850083A89F /* ApplyTests.swift in Sources */,
11521166
62512CA21F0EB1850083A89F /* WeakTarget.swift in Sources */,
11531167
62512C9D1F0EB1850083A89F /* PausableTests.swift in Sources */,
@@ -1168,6 +1182,7 @@
11681182
E39C41E01F18B08A007F2ACD /* ignoreErrors.swift in Sources */,
11691183
E39C41E11F18B08A007F2ACD /* ignoreWhen.swift in Sources */,
11701184
789682E820408A7700545396 /* mapAt.swift in Sources */,
1185+
E62D9D5D2199DC77006636D7 /* bufferWithTrigger.swift in Sources */,
11711186
E39C41DB1F18B08A007F2ACD /* apply.swift in Sources */,
11721187
E39C41DC1F18B08A007F2ACD /* cascade.swift in Sources */,
11731188
E39C41D81F18B086007F2ACD /* distinct+RxCocoa.swift in Sources */,
@@ -1231,6 +1246,7 @@
12311246
E39C41FE1F18B13A007F2ACD /* MapToTests+RxCocoa.swift in Sources */,
12321247
E39C420A1F18B13E007F2ACD /* OnceTests.swift in Sources */,
12331248
E39C42111F18B13E007F2ACD /* WeakTests.swift in Sources */,
1249+
E62D9D5B2199D8BB006636D7 /* BufferWithTriggerTests.swift in Sources */,
12341250
E39C420D1F18B13E007F2ACD /* RepeatWithBehaviorTests.swift in Sources */,
12351251
E39C42091F18B13E007F2ACD /* NotTests.swift in Sources */,
12361252
E39C42081F18B13E007F2ACD /* Materialized+elementsTests.swift in Sources */,
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//
2+
// bufferWithTrigger.swift
3+
// RxSwiftExt-iOS
4+
//
5+
// Created by Patrick Maltagliati on 11/12/18.
6+
// Copyright © 2018 RxSwiftCommunity. All rights reserved.
7+
//
8+
9+
import Foundation
10+
import RxSwift
11+
12+
extension ObservableType {
13+
/**
14+
Collects the elements of the source observable, and emits them as an array when the trigger emits.
15+
16+
- parameter trigger: The observable sequence used to signal the emission of the buffered items.
17+
- returns: The buffered observable from elements of the source sequence.
18+
*/
19+
public func bufferWithTrigger<U>(_ trigger: Observable<U>) -> Observable<[E]> {
20+
return Observable<[E]>.create { observer in
21+
var buffer: [E] = []
22+
let lock = NSRecursiveLock()
23+
let triggerDisposable = trigger.subscribe { event in
24+
lock.lock(); defer { lock.unlock() }
25+
switch event {
26+
case .next:
27+
observer.onNext(buffer)
28+
buffer = []
29+
default:
30+
break
31+
}
32+
}
33+
let disposable = self.subscribe { event in
34+
lock.lock(); defer { lock.unlock() }
35+
switch event {
36+
case .next(let element):
37+
buffer.append(element)
38+
case .completed:
39+
observer.onNext(buffer)
40+
observer.onCompleted()
41+
case .error(let error):
42+
observer.onError(error)
43+
buffer = []
44+
}
45+
}
46+
return Disposables.create([disposable, triggerDisposable])
47+
}
48+
}
49+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
//
2+
// BufferWithTriggerTests.swift
3+
// RxSwiftExt-iOS
4+
//
5+
// Created by Patrick Maltagliati on 11/12/18.
6+
// Copyright © 2018 RxSwiftCommunity. All rights reserved.
7+
//
8+
9+
import XCTest
10+
11+
import RxSwift
12+
import RxSwiftExt
13+
import RxTest
14+
15+
class BufferWithTriggerTests: XCTestCase {
16+
let testError = NSError(domain: "dummyError", code: -232, userInfo: nil)
17+
let scheduler = TestScheduler(initialClock: 0)
18+
19+
func testBuffersUntilBoundaryEmits() {
20+
let underlying = scheduler.createHotObservable(
21+
[
22+
next(150, 1),
23+
next(201, 2),
24+
next(230, 3),
25+
next(300, 4),
26+
next(350, 5),
27+
next(375, 6),
28+
next(400, 7),
29+
next(430, 8),
30+
completed(500)
31+
]
32+
)
33+
34+
let boundary = scheduler.createHotObservable(
35+
[
36+
next(201, ()),
37+
next(301, ()),
38+
next(401, ())
39+
]
40+
)
41+
42+
let res = scheduler.start(disposed: 1000) {
43+
underlying.bufferWithTrigger(boundary.asObservable())
44+
}
45+
46+
let expected = [
47+
next(201, [2]),
48+
next(301, [3, 4]),
49+
next(401, [5, 6, 7]),
50+
next(500, [8]),
51+
completed(500)
52+
]
53+
XCTAssertEqual(res.events, expected)
54+
55+
XCTAssertEqual(underlying.subscriptions, [Subscription(200, 500)])
56+
}
57+
58+
func testPausedError() {
59+
let underlying = scheduler.createHotObservable(
60+
[
61+
next(150, 1),
62+
next(210, 2),
63+
error(230, testError),
64+
completed(500)
65+
]
66+
)
67+
68+
let boundary = scheduler.createHotObservable(
69+
[
70+
next(201, ()),
71+
next(211, ())
72+
]
73+
)
74+
75+
let res = scheduler.start(disposed: 1000) {
76+
underlying.bufferWithTrigger(boundary.asObservable())
77+
}
78+
79+
let expected = [
80+
next(201, []),
81+
next(211, [2]),
82+
error(230, testError)
83+
]
84+
XCTAssertEqual(res.events, expected)
85+
86+
XCTAssertEqual(underlying.subscriptions, [Subscription(200, 230)])
87+
}
88+
}

0 commit comments

Comments
 (0)