@@ -235,6 +235,60 @@ public void transform_inOrderOfPublish() {
235235 assertEquals (2 , (int ) placing .get (1 ));
236236 }
237237
238+ @ Test
239+ public void queryCloseWaitsOnPublisher () throws InterruptedException {
240+ CountDownLatch beforeBlockPublisher = new CountDownLatch (1 );
241+ CountDownLatch blockPublisher = new CountDownLatch (1 );
242+ CountDownLatch beforeQueryClose = new CountDownLatch (1 );
243+ CountDownLatch afterQueryClose = new CountDownLatch (1 );
244+
245+ AtomicBoolean publisherBlocked = new AtomicBoolean (false );
246+ AtomicBoolean waitedBeforeQueryClose = new AtomicBoolean (false );
247+
248+ new Thread (() -> {
249+ Query <TestEntity > query = box .query ().build ();
250+ query .subscribe ()
251+ .onlyChanges () // prevent initial publish call
252+ .observer (data -> {
253+ beforeBlockPublisher .countDown ();
254+ try {
255+ publisherBlocked .set (blockPublisher .await (1 , TimeUnit .SECONDS ));
256+ } catch (InterruptedException e ) {
257+ throw new RuntimeException ("Observer was interrupted while waiting" , e );
258+ }
259+ });
260+
261+ // Trigger the query publisher, prepare so it runs its loop, incl. the query, at least twice
262+ // and block it from completing the first loop using the observer.
263+ query .publish ();
264+ query .publish ();
265+
266+ try {
267+ waitedBeforeQueryClose .set (beforeQueryClose .await (1 , TimeUnit .SECONDS ));
268+ } catch (InterruptedException e ) {
269+ throw new RuntimeException ("Thread was interrupted while waiting before closing query" , e );
270+ }
271+ query .close ();
272+ afterQueryClose .countDown ();
273+ }).start ();
274+
275+ // Wait for observer to block the publisher
276+ assertTrue (beforeBlockPublisher .await (1 , TimeUnit .SECONDS ));
277+ // Start closing the query
278+ beforeQueryClose .countDown ();
279+
280+ // While the publisher is blocked, the query close call should block
281+ assertFalse (afterQueryClose .await (100 , TimeUnit .MILLISECONDS ));
282+
283+ // After the publisher is unblocked and can stop, the query close call should complete
284+ blockPublisher .countDown ();
285+ assertTrue (afterQueryClose .await (100 , TimeUnit .MILLISECONDS ));
286+
287+ // Verify latches were triggered due to reaching 0, not due to timeout
288+ assertTrue (publisherBlocked .get ());
289+ assertTrue (waitedBeforeQueryClose .get ());
290+ }
291+
238292 private void putTestEntitiesScalars () {
239293 putTestEntities (10 , null , 2000 );
240294 }
0 commit comments