Skip to content

Commit 6135a9a

Browse files
greenrobotgreenrobot-team
authored andcommitted
Query/QueryPublisher: wait for the query to finish on close #181
Otherwise, Query would destroy its native counterpart while it's still in use.
1 parent 86b2b5c commit 6135a9a

File tree

3 files changed

+45
-4
lines changed

3 files changed

+45
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ For more insights into what changed in the ObjectBox C++ core, [check the Object
77
## 4.3.1 - in development
88

99
- Requires at least Kotlin compiler and standard library 1.7.
10+
- Data Observers: closing a Query now waits on a running publisher to finish its query, preventing a VM crash. [#1147](https://github.com/objectbox/objectbox-java/issues/1147)
1011

1112
## 4.3.0 - 2025-05-13
1213

objectbox-java/src/main/java/io/objectbox/query/Query.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ protected void finalize() throws Throwable {
175175
* Calling any other methods of this afterwards will throw an {@link IllegalStateException}.
176176
*/
177177
public synchronized void close() {
178+
publisher.stopAndAwait(); // Ensure it is done so that the query is not used anymore
178179
if (handle != 0) {
179180
// Closeable recommendation: mark as "closed" before nativeDestroy could throw.
180181
long handleCopy = handle;
@@ -939,6 +940,7 @@ public long remove() {
939940
* See {@link SubscriptionBuilder#observer(DataObserver)} for additional details.
940941
*
941942
* @return A {@link SubscriptionBuilder} to build a subscription.
943+
* @see #publish()
942944
*/
943945
public SubscriptionBuilder<List<T>> subscribe() {
944946
checkOpen();
@@ -958,11 +960,15 @@ public SubscriptionBuilder<List<T>> subscribe(DataSubscriptionList dataSubscript
958960
}
959961

960962
/**
961-
* Publishes the current data to all subscribed @{@link DataObserver}s.
962-
* This is useful triggering observers when new parameters have been set.
963-
* Note, that setParameter methods will NOT be propagated to observers.
963+
* Manually schedules publishing the current results of this query to all {@link #subscribe() subscribed}
964+
* {@link DataObserver observers}, even if the underlying Boxes have not changed.
965+
* <p>
966+
* This is useful to publish new results after changing parameters of this query which would otherwise not trigger
967+
* publishing of new results.
964968
*/
965969
public void publish() {
970+
// Do open check to not silently fail (publisher runnable would just not get scheduled if query is closed)
971+
checkOpen();
966972
publisher.publish();
967973
}
968974

objectbox-java/src/main/java/io/objectbox/query/QueryPublisher.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class QueryPublisher<T> implements DataPublisher<List<T>>, Runnable {
4949
private final Set<DataObserver<List<T>>> observers = new CopyOnWriteArraySet<>();
5050
private final Deque<DataObserver<List<T>>> publishQueue = new ArrayDeque<>();
5151
private volatile boolean publisherRunning = false;
52+
private volatile boolean publisherStopped = false;
5253

5354
private static class SubscribedObservers<T> implements DataObserver<List<T>> {
5455
@Override
@@ -106,6 +107,10 @@ void publish() {
106107
*/
107108
private void queueObserverAndScheduleRun(DataObserver<List<T>> observer) {
108109
synchronized (publishQueue) {
110+
// Check after obtaining the lock as the publisher may have been stopped while waiting on the lock
111+
if (publisherStopped) {
112+
return;
113+
}
109114
publishQueue.add(observer);
110115
if (!publisherRunning) {
111116
publisherRunning = true;
@@ -114,6 +119,31 @@ private void queueObserverAndScheduleRun(DataObserver<List<T>> observer) {
114119
}
115120
}
116121

122+
/**
123+
* Marks this publisher as stopped and if it is currently running waits on it to complete.
124+
* <p>
125+
* After calling this, this publisher will no longer run, even if observers subscribe or publishing is requested.
126+
*/
127+
void stopAndAwait() {
128+
publisherStopped = true;
129+
// Doing wait/notify waiting here; could also use the Future from BoxStore.internalScheduleThread() instead.
130+
// The latter would require another member though, which seems redundant.
131+
synchronized (this) {
132+
while (publisherRunning) {
133+
try {
134+
this.wait();
135+
} catch (InterruptedException e) {
136+
if (publisherRunning) {
137+
// When called by Query.close() throwing here will leak the query. But not throwing would allow
138+
// close() to proceed in destroying the native query while it may still be active (run() of this
139+
// is at the query.find() call), which would trigger a VM crash.
140+
throw new RuntimeException("Interrupted while waiting for publisher to finish", e);
141+
}
142+
}
143+
}
144+
}
145+
}
146+
117147
/**
118148
* Processes publish requests for this query on a single thread to prevent
119149
* older query results getting delivered after newer query results.
@@ -123,7 +153,7 @@ private void queueObserverAndScheduleRun(DataObserver<List<T>> observer) {
123153
@Override
124154
public void run() {
125155
try {
126-
while (true) {
156+
while (!publisherStopped) {
127157
// Get all queued observer(s), stop processing if none.
128158
List<DataObserver<List<T>>> singlePublishObservers = new ArrayList<>();
129159
boolean notifySubscribedObservers = false;
@@ -143,6 +173,7 @@ public void run() {
143173
}
144174

145175
// Query.
176+
if (publisherStopped) break; // Check again to avoid running the query if possible
146177
List<T> result = query.find();
147178

148179
// Notify observer(s).
@@ -160,6 +191,9 @@ public void run() {
160191
} finally {
161192
// Re-set if wrapped code throws, otherwise this publisher can no longer publish.
162193
publisherRunning = false;
194+
synchronized (this) {
195+
this.notifyAll();
196+
}
163197
}
164198
}
165199

0 commit comments

Comments
 (0)