Implementing Your Own Operators
David Gross edited this page May 24, 2014 · 22 revisions
You can implement your own Observable operators. This page shows you how.
The following example shows how you can chain a custom operator (in this example: myOperator) along with standard RxJava operators by using the lift( ) operator:
Observable foo = barObservable.ofType(Integer).map({it*2}).lift(new myOperator<T>()).map({"transformed by myOperator: " + it});
The following section shows how to form the scaffolding of your operator so that it works correctly with lift( ).
Define your operator as a public class, like so:
import rx.Observable.Operator;
import rx.Subscriber;

public class myOperator<T> implements Operator<T, T> {
    public myOperator( /* any necessary params here */ ) {
        /* any necessary initialization here */
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> s) {
        return new Subscriber<T>(s) {
            @Override
            public void onCompleted() {
                /* add your own onCompleted behavior here, or just pass the completed notification through: */
                if(!s.isUnsubscribed()) {
                    s.onCompleted();
                }
            }

            @Override
            public void onError(Throwable t) {
                /* add your own onError behavior here, or just pass the error notification through: */
                if(!s.isUnsubscribed()) {
                    s.onError(t);
                }
            }

            @Override
            public void onNext(T item) {
                /* this example performs some sort of simple transformation on each incoming item and then passes it along;
                   myOperatorTransformOperation( ) is a placeholder for your own transformation logic */
                if(!s.isUnsubscribed()) {
                    T transformedItem = myOperatorTransformOperation(item);
                    s.onNext(transformedItem);
                }
            }
        };
    }
}
- Your operator should check its Subscriber's isUnsubscribed( ) status before it emits any items to (or sends any notifications to) the Subscriber. Do not waste time generating an item that no Subscriber is interested in seeing.
- Your operator should obey the core tenets of the Observable contract:
  - It may call a Subscriber's onNext( ) method any number of times, but these calls must be non-overlapping.
  - It may call either a Subscriber's onCompleted( ) or onError( ) method, but not both, exactly once, and it may not subsequently call a Subscriber's onNext( ) method.
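The contract rules above can be illustrated with a small, library-free sketch. It does not use RxJava's own classes: SimpleSubscriber, ContractGuard, RecordingSubscriber, and ContractGuardDemo are hypothetical stand-ins invented for this example, and the guard is only one possible way to enforce the rules (check the unsubscribed status before every notification, keep calls non-overlapping, and deliver at most one terminal notification with nothing after it).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a downstream subscriber; NOT RxJava's rx.Subscriber. */
interface SimpleSubscriber<T> {
    void onNext(T item);
    void onError(Throwable t);
    void onCompleted();
    boolean isUnsubscribed();
}

/** Wraps a downstream subscriber and enforces the contract rules described above. */
final class ContractGuard<T> implements SimpleSubscriber<T> {
    private final SimpleSubscriber<T> downstream;
    private boolean terminated; // true once onCompleted or onError has been delivered

    ContractGuard(SimpleSubscriber<T> downstream) {
        this.downstream = downstream;
    }

    // "synchronized" keeps notifications non-overlapping even if several threads emit.
    @Override
    public synchronized void onNext(T item) {
        if (terminated || downstream.isUnsubscribed()) {
            return; // nobody is listening, or the sequence has already ended
        }
        downstream.onNext(item);
    }

    @Override
    public synchronized void onError(Throwable t) {
        if (terminated || downstream.isUnsubscribed()) {
            return;
        }
        terminated = true; // only one terminal notification is ever delivered
        downstream.onError(t);
    }

    @Override
    public synchronized void onCompleted() {
        if (terminated || downstream.isUnsubscribed()) {
            return;
        }
        terminated = true;
        downstream.onCompleted();
    }

    @Override
    public boolean isUnsubscribed() {
        return downstream.isUnsubscribed();
    }
}

/** Hypothetical test double that records every notification it receives. */
final class RecordingSubscriber implements SimpleSubscriber<Integer> {
    final List<String> log = new ArrayList<>();
    boolean unsubscribed; // set to true to simulate an unsubscribed consumer

    @Override public void onNext(Integer item) { log.add("next:" + item); }
    @Override public void onError(Throwable t) { log.add("error"); }
    @Override public void onCompleted() { log.add("completed"); }
    @Override public boolean isUnsubscribed() { return unsubscribed; }
}

final class ContractGuardDemo {
    public static void main(String[] args) {
        RecordingSubscriber recorder = new RecordingSubscriber();
        ContractGuard<Integer> guard = new ContractGuard<>(recorder);

        guard.onNext(1);
        guard.onCompleted();
        guard.onNext(2);                             // dropped: sequence already completed
        guard.onError(new RuntimeException("late")); // dropped: only one terminal notification

        System.out.println(recorder.log); // prints [next:1, completed]
    }
}
```

In a real operator the Subscriber you return from call( ) plays the role of the guard here; the sketch only isolates the bookkeeping so the rules are easy to see.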
Copyright (c) 2016-present, RxJava Contributors.