44
55import javax .annotation .Nullable ;
66
7+ import io .objectbox .annotation .apihint .Beta ;
78import io .objectbox .annotation .apihint .Internal ;
89
910/**
@@ -37,7 +38,7 @@ public class SubscriptionBuilder<T> {
3738 private DataTransformer <T , Object > transformer ;
3839 private Scheduler scheduler ;
3940 private ErrorObserver errorObserver ;
40- // private boolean sync;
41+ // private boolean sync;
4142
4243
4344 @ Internal
@@ -47,13 +48,13 @@ public SubscriptionBuilder(DataPublisher<T> publisher, @Nullable Object param, E
4748 this .threadPool = threadPool ;
4849 }
4950
50- // public Observable<T> runFirst(Runnable firstRunnable) {
51- // if (firstRunnable != null) {
52- // throw new IllegalStateException("Only one asyncRunnable allowed");
53- // }
54- // this.firstRunnable = firstRunnable;
55- // return this;
56- // }
51+ // public Observable<T> runFirst(Runnable firstRunnable) {
52+ // if (firstRunnable != null) {
53+ // throw new IllegalStateException("Only one asyncRunnable allowed");
54+ // }
55+ // this.firstRunnable = firstRunnable;
56+ // return this;
57+ // }
5758
5859 /**
5960 * Uses a weak reference for the observer.
@@ -75,10 +76,10 @@ public SubscriptionBuilder<T> onlyChanges() {
7576 return this ;
7677 }
7778
78- // public Observable<T> sync() {
79- // sync = true;
80- // return this;
81- // }
79+ // public Observable<T> sync() {
80+ // sync = true;
81+ // return this;
82+ // }
8283
8384 /**
8485 * Transforms the original data from the publisher to something that is more helpful to your application.
@@ -99,8 +100,9 @@ public <TO> SubscriptionBuilder<TO> transform(final DataTransformer<T, TO> trans
99100 }
100101
101102 /**
102- * The given {@link ErrorObserver} is notified when the {@link DataTransformer} ({@link #transform(DataTransformer)}) or
103- * {@link DataObserver} ({@link #observer(DataObserver)}) threw an exception.
103+ * The given {@link ErrorObserver} is notified when the {@link DataTransformer}
104+ * ({@link #transform(DataTransformer)}) or {@link DataObserver} ({@link #observer(DataObserver)})
105+ * threw an exception.
104106 */
105107 public SubscriptionBuilder <T > onError (ErrorObserver errorObserver ) {
106108 if (this .errorObserver != null ) {
@@ -126,6 +128,8 @@ public SubscriptionBuilder<T> on(Scheduler scheduler) {
126128
127129 /**
128130 * The given observer is subscribed to the publisher. This method MUST be called to complete a subscription.
131+ * <p>
132+ * Note: you must keep the returned {@link DataSubscription} to cancel it.
129133 *
130134 * @return an subscription object used for canceling further notifications to the observer
131135 */
@@ -147,20 +151,31 @@ public DataSubscription observer(DataObserver<T> observer) {
147151 observer = new ActionObserver (subscription );
148152 }
149153
150- if (single ) {
151- if (onlyChanges ) {
154+ if (single ) {
155+ if (onlyChanges ) {
152156 throw new IllegalStateException ("Illegal combination of single() and onlyChanges()" );
153157 }
154158 publisher .publishSingle (observer , publisherParam );
155159 } else {
156160 publisher .subscribe (observer , publisherParam );
157- if (!onlyChanges ) {
161+ if (!onlyChanges ) {
158162 publisher .publishSingle (observer , publisherParam );
159163 }
160164 }
161165 return subscription ;
162166 }
163167
168+ /**
169+ * Convenience for calling {@link #observer(DataObserver)} with adding the resulting {@link DataSubscription} to the
170+ * given {@link DataSubscriptionList}.
171+ */
172+ @ Beta
173+ public DataSubscription observer (DataObserver <T > observer , DataSubscriptionList dataSubscriptionList ) {
174+ DataSubscription dataSubscription = observer (observer );
175+ dataSubscriptionList .add (dataSubscription );
176+ return dataSubscription ;
177+ }
178+
164179 class ActionObserver implements DataObserver <T >, DelegatingObserver <T > {
165180 private final DataSubscriptionImpl subscription ;
166181 private SchedulerRunOnError schedulerRunOnError ;
0 commit comments