2929import io .objectbox .Box ;
3030import io .objectbox .TestEntity ;
3131import io .objectbox .reactive .DataObserver ;
32+ import io .objectbox .reactive .DataSubscription ;
3233
3334
3435import static io .objectbox .TestEntity_ .simpleInt ;
3536import static org .junit .Assert .assertEquals ;
37+ import static org .junit .Assert .assertFalse ;
3638import static org .junit .Assert .assertTrue ;
3739
3840public class QueryObserverTest extends AbstractObjectBoxTest {
@@ -44,6 +46,44 @@ public void setUpBox() {
4446 box = getTestEntityBox ();
4547 }
4648
49+ @ Test
50+ public void observer_removeDuringCallback_works () throws InterruptedException {
51+ SelfRemovingObserver testObserver = new SelfRemovingObserver ();
52+ // Note: use onlyChanges to not trigger observer on subscribing.
53+ testObserver .dataSubscription = box .query ().build ()
54+ .subscribe ()
55+ .onlyChanges ()
56+ .observer (testObserver );
57+
58+ // Trigger event.
59+ putTestEntitiesScalars ();
60+
61+ // Should have gotten past dataSubscription.cancel() without crashing.
62+ assertTrue (testObserver .latch .await (5 , TimeUnit .SECONDS ));
63+
64+ // Just to make sure: trigger another event, should not be received.
65+ testObserver .latch = new CountDownLatch (1 );
66+ putTestEntitiesScalars ();
67+ assertFalse (testObserver .latch .await (5 , TimeUnit .SECONDS ));
68+ }
69+
70+ private static class SelfRemovingObserver implements DataObserver <List <TestEntity >> {
71+
72+ CountDownLatch latch = new CountDownLatch (1 );
73+ DataSubscription dataSubscription ;
74+
75+ @ Override
76+ public void onData (List <TestEntity > data ) {
77+ if (dataSubscription != null ) {
78+ System .out .println ("Cancelling subscription" );
79+ dataSubscription .cancel ();
80+ dataSubscription = null ;
81+ }
82+ // Once here, cancel did not crash.
83+ latch .countDown ();
84+ }
85+ }
86+
4787 @ Test
4888 public void testObserver () {
4989 int [] valuesInt = {2003 , 2007 , 2002 };
0 commit comments