3535import io .objectbox .reactive .SubscriptionBuilder ;
3636
3737/**
38- * A {@link DataPublisher} that subscribes to an ObjectClassPublisher if there is at least one observer.
39- * Publishing is requested if the ObjectClassPublisher reports changes, a subscription is
40- * {@link SubscriptionBuilder#observer(DataObserver) observed} or {@link Query#publish()} is called.
41- * For publishing the query is re-run and the result delivered to the current observers.
42- * Results are published on a single thread, one at a time, in the order publishing was requested.
38+ * A {@link DataPublisher} that {@link BoxStore#subscribe(Class) subscribes to the Box} of its associated {@link Query}
39+ * while there is at least one observer (see {@link #subscribe(DataObserver, Object)} and
40+ * {@link #unsubscribe(DataObserver, Object)}).
41+ * <p>
42+ * Publishing is requested if the Box reports changes, a subscription is
43+ * {@link SubscriptionBuilder#observer(DataObserver) observed} (if {@link #publishSingle(DataObserver, Object)} is
44+ * called) or {@link Query#publish()} (calls {@link #publish()}) is called.
45+ * <p>
46+ * For publishing the query is re-run and the result data is delivered to the current observers.
47+ * <p>
48+ * Data is passed to observers on a single thread ({@link BoxStore#internalScheduleThread(Runnable)}), one at a time, in
49+ * the order observers were added.
4350 */
4451@ Internal
4552class QueryPublisher <T > implements DataPublisher <List <T >>, Runnable {
4653
54+ /**
55+ * If enabled, logs states of the publisher runnable. Useful to debug a query subscription.
56+ */
57+ public static boolean LOG_STATES = false ;
4758 private final Query <T > query ;
4859 private final Box <T > box ;
4960 private final Set <DataObserver <List <T >>> observers = new CopyOnWriteArraySet <>();
@@ -152,9 +163,11 @@ void stopAndAwait() {
152163 */
153164 @ Override
154165 public void run () {
166+ log ("started" );
155167 try {
156168 while (!publisherStopped ) {
157169 // Get all queued observer(s), stop processing if none.
170+ log ("checking for observers" );
158171 List <DataObserver <List <T >>> singlePublishObservers = new ArrayList <>();
159172 boolean notifySubscribedObservers = false ;
160173 synchronized (publishQueue ) {
@@ -173,10 +186,12 @@ public void run() {
173186 }
174187
175188 // Query.
189+ log ("running query" );
176190 if (publisherStopped ) break ; // Check again to avoid running the query if possible
177191 List <T > result = query .find ();
178192
179193 // Notify observer(s).
194+ log ("notifying observers" );
180195 for (DataObserver <List <T >> observer : singlePublishObservers ) {
181196 observer .onData (result );
182197 }
@@ -189,6 +204,7 @@ public void run() {
189204 }
190205 }
191206 } finally {
207+ log ("stopped" );
192208 // Re-set if wrapped code throws, otherwise this publisher can no longer publish.
193209 publisherRunning = false ;
194210 synchronized (this ) {
@@ -206,4 +222,8 @@ public synchronized void unsubscribe(DataObserver<List<T>> observer, @Nullable O
206222 }
207223 }
208224
225+ private static void log (String message ) {
226+ if (LOG_STATES ) System .out .println ("QueryPublisher: " + message );
227+ }
228+
209229}
0 commit comments