Skip to content

Commit 0f23a15

Browse files
SherifMakhloufakarnokd
authored andcommitted
Adds Observable.sorted method (#4264)
* Adds Observable.sorted() method * moves sorted tests to OperatorToObservableSortedListTest * replace observer with testSubscriber * adds a note about long or infinite sources * makes inner class static
1 parent 515d7ce commit 0f23a15

File tree

2 files changed

+146
-24
lines changed

2 files changed

+146
-24
lines changed

src/main/java/rx/Observable.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11438,6 +11438,57 @@ public final Observable<List<T>> toSortedList(Func2<? super T, ? super T, Intege
1143811438
return lift(new OperatorToObservableSortedList<T>(sortFunction, initialCapacity));
1143911439
}
1144011440

11441+
/**
11442+
* Returns an Observable that emits the events emitted by source Observable, in a
11443+
* sorted order. Each item emitted by the Observable must implement {@link Comparable} with respect to all
11444+
* other items in the sequence.
11445+
*
11446+
* <p>Note that calling {@code sorted} with long, non-terminating or infinite sources
11447+
* might cause {@link OutOfMemoryError}
11448+
*
11449+
* <dl>
11450+
* <dt><b>Backpressure:</b></dt>
11451+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Observable} in an
11452+
* unbounded manner (i.e., without applying backpressure to it).</dd>
11453+
* <dt><b>Scheduler:</b></dt>
11454+
* <dd>{@code sorted} does not operate by default on a particular {@link Scheduler}.</dd>
11455+
* </dl>
11456+
*
11457+
* @throws ClassCastException
11458+
* if any item emitted by the Observable does not implement {@link Comparable} with respect to
11459+
* all other items emitted by the Observable
11460+
* @return an Observable that emits the items emitted by the source Observable in sorted order
11461+
*/
11462+
@Experimental
11463+
public final Observable<T> sorted(){
11464+
return toSortedList().flatMapIterable(UtilityFunctions.<List<T>>identity());
11465+
}
11466+
11467+
/**
11468+
* Returns an Observable that emits the events emitted by source Observable, in a
11469+
* sorted order based on a specified comparison function.
11470+
*
11471+
* <p>Note that calling {@code sorted} with long, non-terminating or infinite sources
11472+
* might cause {@link OutOfMemoryError}
11473+
*
11474+
* <dl>
11475+
* <dt><b>Backpressure:</b></dt>
11476+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Observable} in an
11477+
* unbounded manner (i.e., without applying backpressure to it).</dd>
11478+
* <dt><b>Scheduler:</b></dt>
11479+
* <dd>{@code sorted} does not operate by default on a particular {@link Scheduler}.</dd>
11480+
* </dl>
11481+
*
11482+
* @param sortFunction
11483+
* a function that compares two items emitted by the source Observable and returns an Integer
11484+
* that indicates their sort order
11485+
* @return an Observable that emits the items emitted by the source Observable in sorted order
11486+
*/
11487+
@Experimental
11488+
public final Observable<T> sorted(Func2<? super T, ? super T, Integer> sortFunction) {
11489+
return toSortedList(sortFunction).flatMapIterable(UtilityFunctions.<List<T>>identity());
11490+
}
11491+
1144111492
/**
1144211493
* Modifies the source Observable so that subscribers will unsubscribe from it on a specified
1144311494
* {@link Scheduler}.

src/test/java/rx/internal/operators/OperatorToObservableSortedListTest.java

Lines changed: 95 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,11 @@ public void testSortedList() {
4040
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
4141
Observable<List<Integer>> observable = w.toSortedList();
4242

43-
@SuppressWarnings("unchecked")
44-
Observer<List<Integer>> observer = mock(Observer.class);
45-
observable.subscribe(observer);
46-
verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5));
47-
verify(observer, Mockito.never()).onError(any(Throwable.class));
48-
verify(observer, times(1)).onCompleted();
43+
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<List<Integer>>();
44+
observable.subscribe(testSubscriber);
45+
testSubscriber.assertValue(Arrays.asList(1,2,3,4,5));
46+
testSubscriber.assertNoErrors();
47+
testSubscriber.assertCompleted();
4948
}
5049

5150
@Test
@@ -154,12 +153,11 @@ public void testSortedListCapacity() {
154153
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
155154
Observable<List<Integer>> observable = w.toSortedList(4);
156155

157-
@SuppressWarnings("unchecked")
158-
Observer<List<Integer>> observer = mock(Observer.class);
159-
observable.subscribe(observer);
160-
verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5));
161-
verify(observer, Mockito.never()).onError(any(Throwable.class));
162-
verify(observer, times(1)).onCompleted();
156+
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<List<Integer>>();
157+
observable.subscribe(testSubscriber);
158+
testSubscriber.assertValue(Arrays.asList(1,2,3,4,5));
159+
testSubscriber.assertNoErrors();
160+
testSubscriber.assertCompleted();
163161
}
164162

165163
@Test
@@ -172,12 +170,11 @@ public Integer call(Integer t1, Integer t2) {
172170
}
173171
});
174172

175-
@SuppressWarnings("unchecked")
176-
Observer<List<Integer>> observer = mock(Observer.class);
177-
observable.subscribe(observer);
178-
verify(observer, times(1)).onNext(Arrays.asList(5, 4, 3, 2, 1));
179-
verify(observer, Mockito.never()).onError(any(Throwable.class));
180-
verify(observer, times(1)).onCompleted();
173+
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<List<Integer>>();
174+
observable.subscribe(testSubscriber);
175+
testSubscriber.assertValue(Arrays.asList(5, 4, 3, 2, 1));
176+
testSubscriber.assertNoErrors();
177+
testSubscriber.assertCompleted();
181178
}
182179

183180
@Test
@@ -190,11 +187,85 @@ public Integer call(Integer t1, Integer t2) {
190187
}
191188
}, 4);
192189

193-
@SuppressWarnings("unchecked")
194-
Observer<List<Integer>> observer = mock(Observer.class);
195-
observable.subscribe(observer);
196-
verify(observer, times(1)).onNext(Arrays.asList(5, 4, 3, 2, 1));
197-
verify(observer, Mockito.never()).onError(any(Throwable.class));
198-
verify(observer, times(1)).onCompleted();
190+
TestSubscriber<List<Integer>> testSubscriber = new TestSubscriber<List<Integer>>();
191+
observable.subscribe(testSubscriber);
192+
testSubscriber.assertValue(Arrays.asList(5, 4, 3, 2, 1));
193+
testSubscriber.assertNoErrors();
194+
testSubscriber.assertCompleted();
195+
}
196+
197+
@Test
198+
public void testSorted() {
199+
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
200+
Observable<Integer> observable = w.sorted();
201+
202+
TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>();
203+
observable.subscribe(testSubscriber);
204+
testSubscriber.assertValues(1,2,3,4,5);
205+
testSubscriber.assertNoErrors();
206+
testSubscriber.assertCompleted();
207+
}
208+
209+
@Test
210+
public void testSortedWithCustomFunction() {
211+
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
212+
Observable<Integer> observable = w.sorted(new Func2<Integer, Integer, Integer>() {
213+
214+
@Override
215+
public Integer call(Integer t1, Integer t2) {
216+
return t2 - t1;
217+
}
218+
219+
});
220+
221+
TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>();
222+
observable.subscribe(testSubscriber);
223+
testSubscriber.assertValues(5,4,3,2,1);
224+
testSubscriber.assertNoErrors();
225+
testSubscriber.assertCompleted();
226+
}
227+
228+
@Test
229+
public void testSortedCustomComparator() {
230+
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
231+
Observable<Integer> observable = w.sorted(new Func2<Integer, Integer, Integer>() {
232+
@Override
233+
public Integer call(Integer t1, Integer t2) {
234+
return t1.compareTo(t2);
235+
}
236+
237+
});
238+
239+
TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>();
240+
observable.subscribe(testSubscriber);
241+
testSubscriber.assertValues(1,2,3,4,5);
242+
testSubscriber.assertNoErrors();
243+
testSubscriber.assertCompleted();
244+
}
245+
246+
@Test
247+
public void testSortedWithNonComparable() {
248+
NonComparable n1 = new NonComparable(1,"a");
249+
NonComparable n2 = new NonComparable(2,"b");
250+
NonComparable n3 = new NonComparable(3,"c");
251+
Observable<NonComparable> w = Observable.just(n1,n2,n3);
252+
253+
Observable<NonComparable> observable = w.sorted();
254+
255+
TestSubscriber<NonComparable> testSubscriber = new TestSubscriber<NonComparable>();
256+
observable.subscribe(testSubscriber);
257+
testSubscriber.assertNoValues();
258+
testSubscriber.assertError(ClassCastException.class);
259+
testSubscriber.assertNotCompleted();
260+
}
261+
262+
private final static class NonComparable{
263+
public int i;
264+
public String s;
265+
266+
NonComparable(int i, String s){
267+
this.i = i;
268+
this.s = s;
269+
}
199270
}
200271
}

0 commit comments

Comments
 (0)