Skip to content

Commit 63c946b

Browse files
Merge pull request #184 from benjchristensen/issue-57-last
Convert 'last' from non-blocking to blocking to match Rx.Net
2 parents 5a53462 + 573ba90 commit 63c946b

File tree

5 files changed

+113
-116
lines changed

5 files changed

+113
-116
lines changed

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import rx.Notification;
3030
import rx.Observable;
3131
import rx.Observer;
3232
import rx.Subscription;
33+
import rx.subscriptions.Subscriptions;
3334
import rx.util.functions.Func1;
3435

3536
def class ObservableTests {
@@ -61,8 +62,12 @@ def class ObservableTests {
6162

6263
@Test
6364
public void testLast() {
64-
new TestFactory().getObservable().last().subscribe({ result -> a.received(result)});
65-
verify(a, times(1)).received("hello_1");
65+
assertEquals("three", Observable.toObservable("one", "two", "three").last())
66+
}
67+
68+
@Test
69+
public void testLastWithPredicate() {
70+
assertEquals("two", Observable.toObservable("one", "two", "three").last({ x -> x.length() == 3}))
6671
}
6772

6873
@Test
@@ -175,6 +180,12 @@ def class ObservableTests {
175180
verify(a, times(0)).received(3);
176181
}
177182

183+
@Test
184+
public void testTakeLast() {
185+
new TestFactory().getObservable().takeLast(1).subscribe({ result -> a.received(result)});
186+
verify(a, times(1)).received("hello_1");
187+
}
188+
178189
@Test
179190
public void testTakeWhileViaGroovy() {
180191
Observable.takeWhile(Observable.toObservable(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)});
@@ -280,7 +291,7 @@ def class ObservableTests {
280291
observer.onCompleted();
281292
}
282293
}).start();
283-
return Observable.noOpSubscription();
294+
return Subscriptions.empty();
284295
}
285296
}
286297

language-adaptors/rxjava-jruby/src/main/java/rx/lang/jruby/JRubyAdaptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void testFilterViaGroovy() {
8484

8585
@Test
8686
public void testLast() {
87-
String script = "mockApiCall.getObservable().last().subscribe(lambda{|result| a.received(result)})";
87+
String script = "mockApiCall.getObservable().takeLast(1).subscribe(lambda{|result| a.received(result)})";
8888
runGroovyScript(script);
8989
verify(assertion, times(1)).received("hello_1");
9090
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,10 @@ class UnitTestSuite extends JUnitSuite {
160160
verify(assertion, times(1)).received(List(2,4,6,8))
161161
}
162162

163-
@Test def testLast() {
163+
@Test def testTakeLast() {
164164
val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
165-
numbers.last().subscribe((callback: Int) => {
166-
println("testLast: onNext -> got " + callback)
165+
numbers.takeLast(1).subscribe((callback: Int) => {
166+
println("testTakeLast: onNext -> got " + callback)
167167
assertion.received(callback)
168168
})
169169
verify(assertion, times(1)).received(9)

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 95 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import rx.operators.OperationDefer;
4242
import rx.operators.OperationDematerialize;
4343
import rx.operators.OperationFilter;
44-
import rx.operators.OperationLast;
4544
import rx.operators.OperationMap;
4645
import rx.operators.OperationMaterialize;
4746
import rx.operators.OperationMerge;
@@ -537,7 +536,6 @@ public Subscription call(Observer<T> t1) {
537536
}
538537
}
539538

540-
541539
/**
542540
* an Observable that calls {@link Observer#onError(Exception)} when the Observer subscribes.
543541
*
@@ -807,18 +805,44 @@ public static <T> Observable<T> just(T value) {
807805
}
808806

809807
/**
810-
* Takes the last item emitted by a source Observable and returns an Observable that emits only
811-
* that item as its sole emission.
812-
* <p>
813-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.png">
808+
* Returns the last element of an observable sequence with a specified source.
809+
*
810+
* @param that
811+
* the source Observable
812+
* @return the last element in the observable sequence.
813+
*/
814+
public static <T> T last(final Observable<T> that) {
815+
T result = null;
816+
for (T value : that.toIterable()) {
817+
result = value;
818+
}
819+
return result;
820+
}
821+
822+
/**
823+
* Returns the last element of an observable sequence that matches the predicate.
824+
*
825+
* @param that
826+
* the source Observable
827+
* @param predicate
828+
* a predicate function to evaluate for elements in the sequence.
829+
* @return the last element in the observable sequence.
830+
*/
831+
public static <T> T last(final Observable<T> that, final Func1<T, Boolean> predicate) {
832+
return last(that.filter(predicate));
833+
}
834+
835+
/**
836+
* Returns the last element of an observable sequence that matches the predicate.
814837
*
815838
* @param that
816839
* the source Observable
817-
* @return an Observable that emits a single item, which is identical to the last item emitted
818-
* by the source Observable
840+
* @param predicate
841+
* a predicate function to evaluate for elements in the sequence.
842+
* @return the last element in the observable sequence.
819843
*/
820-
public static <T> Observable<T> last(final Observable<T> that) {
821-
return _create(OperationLast.last(that));
844+
public static <T> T last(final Observable<T> that, final Object predicate) {
845+
return last(that.filter(predicate));
822846
}
823847

824848
/**
@@ -1363,7 +1387,7 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Ex
13631387
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
13641388
*/
13651389
public static <T> Observable<T> reduce(Observable<T> sequence, Func2<T, T, T> accumulator) {
1366-
return last(_create(OperationScan.scan(sequence, accumulator)));
1390+
return takeLast(_create(OperationScan.scan(sequence, accumulator)), 1);
13671391
}
13681392

13691393
/**
@@ -1435,7 +1459,7 @@ public T call(T t1, T t2) {
14351459
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
14361460
*/
14371461
public static <T> Observable<T> reduce(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
1438-
return last(_create(OperationScan.scan(sequence, initialValue, accumulator)));
1462+
return takeLast(_create(OperationScan.scan(sequence, initialValue, accumulator)), 1);
14391463
}
14401464

14411465
/**
@@ -2364,17 +2388,44 @@ public Boolean call(T t1) {
23642388
}
23652389

23662390
/**
2367-
* Converts an Observable that emits a sequence of objects into one that only emits the last
2368-
* object in this sequence before completing.
2369-
* <p>
2370-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.png">
2391+
* Returns the last element of an observable sequence with a specified source.
23712392
*
2372-
* @return an Observable that emits only the last item emitted by the original Observable
2393+
* @return the last element in the observable sequence.
23732394
*/
2374-
public Observable<T> last() {
2395+
public T last() {
23752396
return last(this);
23762397
}
23772398

2399+
/**
2400+
* Returns the last element of an observable sequence that matches the predicate.
2401+
*
2402+
* @param predicate
2403+
* a predicate function to evaluate for elements in the sequence.
2404+
* @return the last element in the observable sequence.
2405+
*/
2406+
public T last(final Func1<T, Boolean> predicate) {
2407+
return last(this, predicate);
2408+
}
2409+
2410+
/**
2411+
* Returns the last element of an observable sequence that matches the predicate.
2412+
*
2413+
* @param predicate
2414+
* a predicate function to evaluate for elements in the sequence.
2415+
* @return the last element in the observable sequence.
2416+
*/
2417+
public T last(final Object predicate) {
2418+
@SuppressWarnings("rawtypes")
2419+
final FuncN _f = Functions.from(predicate);
2420+
2421+
return last(this, new Func1<T, Boolean>() {
2422+
@Override
2423+
public Boolean call(T args) {
2424+
return (Boolean) _f.call(args);
2425+
}
2426+
});
2427+
}
2428+
23782429
/**
23792430
* Returns the last element, or a default value if no value is found.
23802431
*
@@ -3333,6 +3384,32 @@ public Boolean call(String args) {
33333384
});
33343385
}
33353386

3387+
@Test
3388+
public void testLast() {
3389+
Observable<String> obs = Observable.toObservable("one", "two", "three");
3390+
3391+
assertEquals("three", obs.last());
3392+
}
3393+
3394+
@Test
3395+
public void testLastWithPredicate() {
3396+
Observable<String> obs = Observable.toObservable("one", "two", "three");
3397+
3398+
assertEquals("two", obs.last(new Func1<String, Boolean>() {
3399+
@Override
3400+
public Boolean call(String s) {
3401+
return s.length() == 3;
3402+
}
3403+
}));
3404+
}
3405+
3406+
@Test
3407+
public void testLastEmptyObservable() {
3408+
Observable<String> obs = Observable.toObservable();
3409+
3410+
assertNull(obs.last());
3411+
}
3412+
33363413
private static class TestException extends RuntimeException {
33373414
private static final long serialVersionUID = 1L;
33383415
}

rxjava-core/src/main/java/rx/operators/OperationLast.java

Lines changed: 0 additions & 91 deletions
This file was deleted.

0 commit comments

Comments
 (0)