Skip to content

Commit 5d94e52

Browse files
committed
Implement the 'Start' operator
1 parent ceeab36 commit 5d94e52

File tree

2 files changed

+139
-0
lines changed

2 files changed

+139
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
import rx.util.Timestamped;
119119
import rx.util.functions.Action0;
120120
import rx.util.functions.Action1;
121+
import rx.util.functions.Async;
121122
import rx.util.functions.Func0;
122123
import rx.util.functions.Func1;
123124
import rx.util.functions.Func2;
@@ -6269,4 +6270,77 @@ public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Fun
62696270
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<TDuration>> durationSelector) {
62706271
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
62716272
}
6273+
6274+
/**
6275+
* Invokes the action asynchronously, surfacing the result through an observable sequence.
6276+
* <p>
6277+
* Note: The action is called immediately, not during the subscription of the resulting
6278+
* sequence. Multiple subscriptions to the resulting sequence can observe the
6279+
* action's outcome.
6280+
*
6281+
* @param action
6282+
* Action to run asynchronously.
6283+
* @return An observable sequence exposing a null value upon completion of the action,
6284+
* or an exception.
6285+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229265(v=vs.103).aspx">MSDN: Observable.Start</a>
6286+
*/
6287+
public static Observable<Void> start(Action0 action) {
6288+
return Async.toAsync(action).call();
6289+
}
6290+
6291+
/**
6292+
* Invokes the action asynchronously on the specified scheduler, surfacing the
6293+
* result through an observable sequence.
6294+
* <p>
6295+
* Note: The action is called immediately, not during the subscription of the resulting
6296+
* sequence. Multiple subscriptions to the resulting sequence can observe the
6297+
* action's outcome.
6298+
*
6299+
* @param action
6300+
* Action to run asynchronously.
6301+
* @param scheduler
6302+
* Scheduler to run the function on.
6303+
* @return An observable sequence exposing a null value upon completion of the action,
6304+
* or an exception.
6305+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211971(v=vs.103).aspx">MSDN: Observable.Start</a>
6306+
*/
6307+
public static Observable<Void> start(Action0 action, Scheduler scheduler) {
6308+
return Async.toAsync(action, scheduler).call();
6309+
}
6310+
6311+
/**
6312+
* Invokes the specified function asynchronously, surfacing the result through an observable sequence.
6313+
* <p>
6314+
* Note: The function is called immediately, not during the subscription of the resulting
6315+
* sequence. Multiple subscriptions to the resulting sequence can observe the
6316+
* function's result.
6317+
*
6318+
* @param func
6319+
* Function to run asynchronously.
6320+
* @return An observable sequence exposing the function's result value, or an exception.
6321+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229036(v=vs.103).aspx">MSDN: Observable.Start</a>
6322+
*/
6323+
public static <T> Observable<T> start(Func0<T> func) {
6324+
return Async.toAsync(func).call();
6325+
}
6326+
6327+
/**
6328+
* Invokes the specified function asynchronously on the specified scheduler, surfacing
6329+
* the result through an observable sequence.
6330+
* <p>
6331+
* Note: The function is called immediately, not during the subscription of the resulting
6332+
* sequence. Multiple subscriptions to the resulting sequence can observe the
6333+
* function's result.
6334+
*
6335+
* @param func
6336+
* Function to run asynchronously.
6337+
* @param scheduler
6338+
* Scheduler to run the function on.
6339+
* @return An observable sequence exposing the function's result value, or an exception.
6340+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211721(v=vs.103).aspx">MSDN: Observable.Start</a>
6341+
*/
6342+
public static <T> Observable<T> start(Func0<T> func, Scheduler scheduler) {
6343+
return Async.toAsync(func, scheduler).call();
6344+
}
6345+
62726346
}

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.mockito.InOrder;
3434
import org.mockito.Mock;
3535
import org.mockito.MockitoAnnotations;
36+
import org.mockito.invocation.InvocationOnMock;
37+
import org.mockito.stubbing.Answer;
3638

3739
import rx.Observable.OnSubscribeFunc;
3840
import rx.concurrency.TestScheduler;
@@ -41,6 +43,7 @@
4143
import rx.subscriptions.Subscriptions;
4244
import rx.util.functions.Action0;
4345
import rx.util.functions.Action1;
46+
import rx.util.functions.Func0;
4447
import rx.util.functions.Func1;
4548
import rx.util.functions.Func2;
4649

@@ -947,4 +950,66 @@ public void testRangeWithScheduler() {
947950
inOrder.verify(aObserver, times(1)).onCompleted();
948951
inOrder.verifyNoMoreInteractions();
949952
}
953+
954+
@Test
955+
public void testStartWithAction() {
956+
TestScheduler scheduler = new TestScheduler();
957+
958+
Action0 action = mock(Action0.class);
959+
Observable<Void> observable = Observable.start(action, scheduler);
960+
scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
961+
962+
assertEquals(null, observable.toBlockingObservable().single());
963+
assertEquals(null, observable.toBlockingObservable().single());
964+
verify(action, times(1)).call();
965+
}
966+
967+
@Test(expected = RuntimeException.class)
968+
public void testStartWithActionError() {
969+
Action0 action = new Action0() {
970+
@Override
971+
public void call() {
972+
throw new RuntimeException("Some error");
973+
}
974+
};
975+
976+
Observable<Void> observable = Observable.start(action);
977+
observable.toBlockingObservable().single();
978+
}
979+
980+
@Test
981+
public void testStartWithFunc() {
982+
TestScheduler scheduler = new TestScheduler();
983+
984+
@SuppressWarnings("unchecked")
985+
Func0<String> func = (Func0<String>) mock(Func0.class);
986+
doAnswer(new Answer<String>() {
987+
988+
@Override
989+
public String answer(InvocationOnMock invocation) throws Throwable {
990+
return "one";
991+
}
992+
993+
}).when(func).call();
994+
995+
Observable<String> observable = Observable.start(func, scheduler);
996+
scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
997+
998+
assertEquals("one", observable.toBlockingObservable().single());
999+
assertEquals("one", observable.toBlockingObservable().single());
1000+
verify(func, times(1)).call();
1001+
}
1002+
1003+
@Test(expected = RuntimeException.class)
1004+
public void testStartWithFuncError() {
1005+
Func0<String> func = new Func0<String>() {
1006+
@Override
1007+
public String call() {
1008+
throw new RuntimeException("Some error");
1009+
}
1010+
};
1011+
1012+
Observable<String> observable = Observable.start(func);
1013+
observable.toBlockingObservable().single();
1014+
}
9501015
}

0 commit comments

Comments
 (0)