|
17 | 17 | package io.objectbox.query; |
18 | 18 |
|
19 | 19 | import java.util.ArrayDeque; |
| 20 | +import java.util.ArrayList; |
20 | 21 | import java.util.Deque; |
21 | 22 | import java.util.List; |
22 | 23 | import java.util.Set; |
@@ -49,13 +50,13 @@ class QueryPublisher<T> implements DataPublisher<List<T>>, Runnable { |
49 | 50 | private final Deque<DataObserver<List<T>>> publishQueue = new ArrayDeque<>(); |
50 | 51 | private volatile boolean publisherRunning = false; |
51 | 52 |
|
52 | | - private static class AllObservers<T> implements DataObserver<List<T>> { |
| 53 | + private static class SubscribedObservers<T> implements DataObserver<List<T>> { |
53 | 54 | @Override |
54 | 55 | public void onData(List<T> data) { |
55 | 56 | } |
56 | 57 | } |
57 | | - /** Placeholder observer if all observers should be notified. */ |
58 | | - private final AllObservers<T> ALL_OBSERVERS = new AllObservers<>(); |
| 58 | + /** Placeholder observer if subscribed observers should be notified. */ |
| 59 | + private final SubscribedObservers<T> SUBSCRIBED_OBSERVERS = new SubscribedObservers<>(); |
59 | 60 |
|
60 | 61 | private DataObserver<Class<T>> objectClassObserver; |
61 | 62 | private DataSubscription objectClassSubscription; |
@@ -108,42 +109,55 @@ public void publishSingle(DataObserver<List<T>> observer, @Nullable Object param |
108 | 109 |
|
109 | 110 | void publish() { |
110 | 111 | synchronized (publishQueue) { |
111 | | - publishQueue.add(ALL_OBSERVERS); |
| 112 | + publishQueue.add(SUBSCRIBED_OBSERVERS); |
112 | 113 | if (!publisherRunning) { |
113 | 114 | publisherRunning = true; |
114 | 115 | box.getStore().internalScheduleThread(this); |
115 | 116 | } |
116 | 117 | } |
117 | 118 | } |
118 | 119 |
|
| 120 | + /** |
| 121 | + * Processes publish requests for this query on a single thread to prevent |
| 122 | + * older query results getting delivered after newer query results. |
| 123 | + * To speed up processing each loop publishes to all queued observers instead of just the next in line. |
| 124 | + * This reduces time spent querying and waiting for DataObserver.onData() and their potential DataTransformers. |
| 125 | + */ |
119 | 126 | @Override |
120 | 127 | public void run() { |
121 | | - /* |
122 | | - * Process publish requests for this query on a single thread to avoid an older request |
123 | | - * racing a new one (and causing outdated results to be delivered last). |
124 | | - */ |
125 | 128 | try { |
126 | 129 | while (true) { |
127 | | - // Get next observer(s). |
128 | | - DataObserver<List<T>> observer; |
| 130 | + // Get all queued observer(s), stop processing if none. |
| 131 | + List<DataObserver<List<T>>> singlePublishObservers = new ArrayList<>(); |
| 132 | + boolean notifySubscribedObservers = false; |
129 | 133 | synchronized (publishQueue) { |
130 | | - observer = publishQueue.pollFirst(); |
131 | | - if (observer == null) { |
| 134 | + DataObserver<List<T>> nextObserver; |
| 135 | + while ((nextObserver = publishQueue.poll()) != null) { |
| 136 | + if (SUBSCRIBED_OBSERVERS.equals(nextObserver)) { |
| 137 | + notifySubscribedObservers = true; |
| 138 | + } else { |
| 139 | + singlePublishObservers.add(nextObserver); |
| 140 | + } |
| 141 | + } |
| 142 | + if (!notifySubscribedObservers && singlePublishObservers.isEmpty()) { |
132 | 143 | publisherRunning = false; |
133 | | - break; |
| 144 | + break; // Stop. |
134 | 145 | } |
135 | 146 | } |
136 | 147 |
|
137 | | - // Query, then notify observer(s). |
| 148 | + // Query. |
138 | 149 | List<T> result = query.find(); |
139 | | - if (ALL_OBSERVERS.equals(observer)) { |
| 150 | + |
| 151 | + // Notify observer(s). |
| 152 | + for (DataObserver<List<T>> observer : singlePublishObservers) { |
| 153 | + observer.onData(result); |
| 154 | + } |
| 155 | + if (notifySubscribedObservers) { |
140 | 156 | // Use current list of observers to avoid notifying unsubscribed observers. |
141 | 157 | Set<DataObserver<List<T>>> observers = this.observers; |
142 | 158 | for (DataObserver<List<T>> dataObserver : observers) { |
143 | 159 | dataObserver.onData(result); |
144 | 160 | } |
145 | | - } else { |
146 | | - observer.onData(result); |
147 | 161 | } |
148 | 162 | } |
149 | 163 | } finally { |
|
0 commit comments