Skip to content

Commit e166a5a

Browse files
Merge branch '181-query-publisher-crash' into 'dev'
QueryPublisher: wait for the query to finish on close #181 See merge request objectbox/objectbox-java!159
2 parents 86b2b5c + d77bd7f commit e166a5a

File tree

6 files changed

+134
-9
lines changed

6 files changed

+134
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ For more insights into what changed in the ObjectBox C++ core, [check the Object
77
## 4.3.1 - in development
88

99
- Requires at least Kotlin compiler and standard library 1.7.
10+
- Data Observers: closing a Query now waits on a running publisher to finish its query, preventing a VM crash. [#1147](https://github.com/objectbox/objectbox-java/issues/1147)
1011

1112
## 4.3.0 - 2025-05-13
1213

objectbox-java/src/main/java/io/objectbox/query/InternalAccess.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,12 @@ public static <T> void nativeFindFirst(Query<T> query, long cursorHandle) {
2929
query.nativeFindFirst(query.handle, cursorHandle);
3030
}
3131

32+
/**
33+
* See {@link QueryPublisher#LOG_STATES}.
34+
*/
35+
@Internal
36+
public static void queryPublisherLogStates() {
37+
QueryPublisher.LOG_STATES = true;
38+
}
39+
3240
}

objectbox-java/src/main/java/io/objectbox/query/Query.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ protected void finalize() throws Throwable {
175175
* Calling any other methods of this afterwards will throw an {@link IllegalStateException}.
176176
*/
177177
public synchronized void close() {
178+
publisher.stopAndAwait(); // Ensure it is done so that the query is not used anymore
178179
if (handle != 0) {
179180
// Closeable recommendation: mark as "closed" before nativeDestroy could throw.
180181
long handleCopy = handle;
@@ -939,6 +940,7 @@ public long remove() {
939940
* See {@link SubscriptionBuilder#observer(DataObserver)} for additional details.
940941
*
941942
* @return A {@link SubscriptionBuilder} to build a subscription.
943+
* @see #publish()
942944
*/
943945
public SubscriptionBuilder<List<T>> subscribe() {
944946
checkOpen();
@@ -958,11 +960,15 @@ public SubscriptionBuilder<List<T>> subscribe(DataSubscriptionList dataSubscript
958960
}
959961

960962
/**
961-
* Publishes the current data to all subscribed @{@link DataObserver}s.
962-
* This is useful triggering observers when new parameters have been set.
963-
* Note, that setParameter methods will NOT be propagated to observers.
963+
* Manually schedules publishing the current results of this query to all {@link #subscribe() subscribed}
964+
* {@link DataObserver observers}, even if the underlying Boxes have not changed.
965+
* <p>
966+
* This is useful to publish new results after changing parameters of this query which would otherwise not trigger
967+
* publishing of new results.
964968
*/
965969
public void publish() {
970+
// Do open check to not silently fail (publisher runnable would just not get scheduled if query is closed)
971+
checkOpen();
966972
publisher.publish();
967973
}
968974

objectbox-java/src/main/java/io/objectbox/query/QueryPublisher.java

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,32 @@
3535
import io.objectbox.reactive.SubscriptionBuilder;
3636

3737
/**
38-
* A {@link DataPublisher} that subscribes to an ObjectClassPublisher if there is at least one observer.
39-
* Publishing is requested if the ObjectClassPublisher reports changes, a subscription is
40-
* {@link SubscriptionBuilder#observer(DataObserver) observed} or {@link Query#publish()} is called.
41-
* For publishing the query is re-run and the result delivered to the current observers.
42-
* Results are published on a single thread, one at a time, in the order publishing was requested.
38+
* A {@link DataPublisher} that {@link BoxStore#subscribe(Class) subscribes to the Box} of its associated {@link Query}
39+
* while there is at least one observer (see {@link #subscribe(DataObserver, Object)} and
40+
* {@link #unsubscribe(DataObserver, Object)}).
41+
* <p>
42+
* Publishing is requested if the Box reports changes, a subscription is
43+
* {@link SubscriptionBuilder#observer(DataObserver) observed} (if {@link #publishSingle(DataObserver, Object)} is
44+
* called) or {@link Query#publish()} (calls {@link #publish()}) is called.
45+
* <p>
46+
* For publishing the query is re-run and the result data is delivered to the current observers.
47+
* <p>
48+
* Data is passed to observers on a single thread ({@link BoxStore#internalScheduleThread(Runnable)}), one at a time, in
49+
* the order observers were added.
4350
*/
4451
@Internal
4552
class QueryPublisher<T> implements DataPublisher<List<T>>, Runnable {
4653

54+
/**
55+
* If enabled, logs states of the publisher runnable. Useful to debug a query subscription.
56+
*/
57+
public static boolean LOG_STATES = false;
4758
private final Query<T> query;
4859
private final Box<T> box;
4960
private final Set<DataObserver<List<T>>> observers = new CopyOnWriteArraySet<>();
5061
private final Deque<DataObserver<List<T>>> publishQueue = new ArrayDeque<>();
5162
private volatile boolean publisherRunning = false;
63+
private volatile boolean publisherStopped = false;
5264

5365
private static class SubscribedObservers<T> implements DataObserver<List<T>> {
5466
@Override
@@ -106,6 +118,10 @@ void publish() {
106118
*/
107119
private void queueObserverAndScheduleRun(DataObserver<List<T>> observer) {
108120
synchronized (publishQueue) {
121+
// Check after obtaining the lock as the publisher may have been stopped while waiting on the lock
122+
if (publisherStopped) {
123+
return;
124+
}
109125
publishQueue.add(observer);
110126
if (!publisherRunning) {
111127
publisherRunning = true;
@@ -114,6 +130,31 @@ private void queueObserverAndScheduleRun(DataObserver<List<T>> observer) {
114130
}
115131
}
116132

133+
/**
134+
* Marks this publisher as stopped and if it is currently running waits on it to complete.
135+
* <p>
136+
* After calling this, this publisher will no longer run, even if observers subscribe or publishing is requested.
137+
*/
138+
void stopAndAwait() {
139+
publisherStopped = true;
140+
// Doing wait/notify waiting here; could also use the Future from BoxStore.internalScheduleThread() instead.
141+
// The latter would require another member though, which seems redundant.
142+
synchronized (this) {
143+
while (publisherRunning) {
144+
try {
145+
this.wait();
146+
} catch (InterruptedException e) {
147+
if (publisherRunning) {
148+
// When called by Query.close() throwing here will leak the query. But not throwing would allow
149+
// close() to proceed in destroying the native query while it may still be active (run() of this
150+
// is at the query.find() call), which would trigger a VM crash.
151+
throw new RuntimeException("Interrupted while waiting for publisher to finish", e);
152+
}
153+
}
154+
}
155+
}
156+
}
157+
117158
/**
118159
* Processes publish requests for this query on a single thread to prevent
119160
* older query results getting delivered after newer query results.
@@ -122,9 +163,11 @@ private void queueObserverAndScheduleRun(DataObserver<List<T>> observer) {
122163
*/
123164
@Override
124165
public void run() {
166+
log("started");
125167
try {
126-
while (true) {
168+
while (!publisherStopped) {
127169
// Get all queued observer(s), stop processing if none.
170+
log("checking for observers");
128171
List<DataObserver<List<T>>> singlePublishObservers = new ArrayList<>();
129172
boolean notifySubscribedObservers = false;
130173
synchronized (publishQueue) {
@@ -143,9 +186,12 @@ public void run() {
143186
}
144187

145188
// Query.
189+
log("running query");
190+
if (publisherStopped) break; // Check again to avoid running the query if possible
146191
List<T> result = query.find();
147192

148193
// Notify observer(s).
194+
log("notifying observers");
149195
for (DataObserver<List<T>> observer : singlePublishObservers) {
150196
observer.onData(result);
151197
}
@@ -158,8 +204,12 @@ public void run() {
158204
}
159205
}
160206
} finally {
207+
log("stopped");
161208
// Re-set if wrapped code throws, otherwise this publisher can no longer publish.
162209
publisherRunning = false;
210+
synchronized (this) {
211+
this.notifyAll();
212+
}
163213
}
164214
}
165215

@@ -172,4 +222,8 @@ public synchronized void unsubscribe(DataObserver<List<T>> observer, @Nullable O
172222
}
173223
}
174224

225+
private static void log(String message) {
226+
if (LOG_STATES) System.out.println("QueryPublisher: " + message);
227+
}
228+
175229
}

tests/objectbox-java-test/src/test/java/io/objectbox/AbstractObjectBoxTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import io.objectbox.model.ExternalPropertyType;
4646
import io.objectbox.model.PropertyFlags;
4747
import io.objectbox.model.PropertyType;
48+
import io.objectbox.query.InternalAccess;
4849

4950

5051
import static org.junit.Assert.assertEquals;
@@ -92,6 +93,7 @@ static void printProcessId() {
9293
public void setUp() throws IOException {
9394
Cursor.TRACK_CREATION_STACK = true;
9495
Transaction.TRACK_CREATION_STACK = true;
96+
InternalAccess.queryPublisherLogStates();
9597

9698
// Note: is logged, so create before logging.
9799
boxStoreDir = prepareTempDir("object-store-test");

tests/objectbox-java-test/src/test/java/io/objectbox/query/QueryObserverTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,60 @@ public void transform_inOrderOfPublish() {
235235
assertEquals(2, (int) placing.get(1));
236236
}
237237

238+
@Test
239+
public void queryCloseWaitsOnPublisher() throws InterruptedException {
240+
CountDownLatch beforeBlockPublisher = new CountDownLatch(1);
241+
CountDownLatch blockPublisher = new CountDownLatch(1);
242+
CountDownLatch beforeQueryClose = new CountDownLatch(1);
243+
CountDownLatch afterQueryClose = new CountDownLatch(1);
244+
245+
AtomicBoolean publisherBlocked = new AtomicBoolean(false);
246+
AtomicBoolean waitedBeforeQueryClose = new AtomicBoolean(false);
247+
248+
new Thread(() -> {
249+
Query<TestEntity> query = box.query().build();
250+
query.subscribe()
251+
.onlyChanges() // prevent initial publish call
252+
.observer(data -> {
253+
beforeBlockPublisher.countDown();
254+
try {
255+
publisherBlocked.set(blockPublisher.await(1, TimeUnit.SECONDS));
256+
} catch (InterruptedException e) {
257+
throw new RuntimeException("Observer was interrupted while waiting", e);
258+
}
259+
});
260+
261+
// Trigger the query publisher, prepare so it runs its loop, incl. the query, at least twice
262+
// and block it from completing the first loop using the observer.
263+
query.publish();
264+
query.publish();
265+
266+
try {
267+
waitedBeforeQueryClose.set(beforeQueryClose.await(1, TimeUnit.SECONDS));
268+
} catch (InterruptedException e) {
269+
throw new RuntimeException("Thread was interrupted while waiting before closing query", e);
270+
}
271+
query.close();
272+
afterQueryClose.countDown();
273+
}).start();
274+
275+
// Wait for observer to block the publisher
276+
assertTrue(beforeBlockPublisher.await(1, TimeUnit.SECONDS));
277+
// Start closing the query
278+
beforeQueryClose.countDown();
279+
280+
// While the publisher is blocked, the query close call should block
281+
assertFalse(afterQueryClose.await(100, TimeUnit.MILLISECONDS));
282+
283+
// After the publisher is unblocked and can stop, the query close call should complete
284+
blockPublisher.countDown();
285+
assertTrue(afterQueryClose.await(100, TimeUnit.MILLISECONDS));
286+
287+
// Verify latches were triggered due to reaching 0, not due to timeout
288+
assertTrue(publisherBlocked.get());
289+
assertTrue(waitedBeforeQueryClose.get());
290+
}
291+
238292
private void putTestEntitiesScalars() {
239293
putTestEntities(10, null, 2000);
240294
}

0 commit comments

Comments
 (0)