Skip to content

Commit 9a1832e

Browse files
author
jmhofer
committed
implemented count operator (#32)
1 parent 176280e commit 9a1832e

File tree

2 files changed

+53
-2
lines changed

2 files changed

+53
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2043,6 +2043,21 @@ public Observable<T> reduce(Func2<? super T, ? super T, ? extends T> accumulator
20432043
return create(OperationScan.scan(this, accumulator)).takeLast(1);
20442044
}
20452045

2046+
/**
2047+
* Returns an Observable that counts the total number of elements in the source Observable.
2048+
* @return an Observable emitting the number of counted elements of the source Observable
2049+
* as its single item.
2050+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229470%28v=vs.103%29.aspx">MSDN: Observable.Count</a>
2051+
*/
2052+
public Observable<Integer> count() {
2053+
return reduce(0, new Func2<Integer, T, Integer>() {
2054+
@Override
2055+
public Integer call(Integer t1, T t2) {
2056+
return t1 + 1;
2057+
}
2058+
});
2059+
}
2060+
20462061
/**
20472062
* Returns a {@link ConnectableObservable} that shares a single subscription to the underlying
20482063
* Observable that will replay all of its items and notifications to any future {@link Observer}.

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.junit.Before;
2828
import org.junit.Test;
2929
import org.mockito.Mock;
30-
import org.mockito.Mockito;
3130
import org.mockito.MockitoAnnotations;
3231

3332
import rx.Observable.OnSubscribeFunc;
@@ -69,10 +68,47 @@ public Subscription onSubscribe(Observer<? super String> Observer) {
6968
verify(aObserver, times(1)).onNext("one");
7069
verify(aObserver, times(1)).onNext("two");
7170
verify(aObserver, times(1)).onNext("three");
72-
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
71+
verify(aObserver, never()).onError(any(Throwable.class));
7372
verify(aObserver, times(1)).onCompleted();
7473
}
7574

75+
@Test
76+
public void testCountAFewItems() {
77+
Observable<String> observable = Observable.from("a", "b", "c", "d");
78+
observable.count().subscribe(w);
79+
// we should be called only once
80+
verify(w, times(1)).onNext(anyInt());
81+
verify(w).onNext(4);
82+
verify(w, never()).onError(any(Throwable.class));
83+
verify(w, times(1)).onCompleted();
84+
}
85+
86+
@Test
87+
public void testCountZeroItems() {
88+
Observable<String> observable = Observable.empty();
89+
observable.count().subscribe(w);
90+
// we should be called only once
91+
verify(w, times(1)).onNext(anyInt());
92+
verify(w).onNext(0);
93+
verify(w, never()).onError(any(Throwable.class));
94+
verify(w, times(1)).onCompleted();
95+
}
96+
97+
@Test
98+
public void testCountError() {
99+
Observable<String> o = Observable.create(new OnSubscribeFunc<String>() {
100+
@Override
101+
public Subscription onSubscribe(Observer<? super String> obsv) {
102+
obsv.onError(new RuntimeException());
103+
return Subscriptions.empty();
104+
}
105+
});
106+
o.count().subscribe(w);
107+
verify(w, never()).onNext(anyInt());
108+
verify(w, never()).onCompleted();
109+
verify(w, times(1)).onError(any(RuntimeException.class));
110+
}
111+
76112
@Test
77113
public void testReduce() {
78114
Observable<Integer> observable = Observable.from(1, 2, 3, 4);

0 commit comments

Comments
 (0)