1919import org .junit .Before ;
2020import org .junit .Test ;
2121
22- import java .util .ArrayList ;
22+ import java .util .Arrays ;
2323import java .util .List ;
2424import java .util .concurrent .CopyOnWriteArrayList ;
2525import java .util .concurrent .CountDownLatch ;
26+ import java .util .concurrent .TimeUnit ;
2627
2728import io .objectbox .AbstractObjectBoxTest ;
2829import io .objectbox .Box ;
2930import io .objectbox .TestEntity ;
3031import io .objectbox .reactive .DataObserver ;
31- import io .objectbox .reactive .DataTransformer ;
3232
3333
3434import static io .objectbox .TestEntity_ .simpleInt ;
3535import static org .junit .Assert .assertEquals ;
36+ import static org .junit .Assert .assertTrue ;
3637
37- public class QueryObserverTest extends AbstractObjectBoxTest implements DataObserver < List < TestEntity >> {
38+ public class QueryObserverTest extends AbstractObjectBoxTest {
3839
3940 private Box <TestEntity > box ;
40- private List <List <TestEntity >> receivedChanges = new CopyOnWriteArrayList <>();
41- private CountDownLatch latch = new CountDownLatch (1 );
4241
4342 @ Before
4443 public void setUpBox () {
@@ -52,80 +51,117 @@ public void testObserver() {
5251 assertEquals (0 , query .count ());
5352
5453 // Initial data on subscription.
55- query .subscribe ().observer (this );
56- assertLatchCountedDown (latch , 5 );
57- assertEquals (1 , receivedChanges .size ());
58- assertEquals (0 , receivedChanges .get (0 ).size ());
54+ TestObserver <List <TestEntity >> testObserver = new TestObserver <>();
55+ query .subscribe ().observer (testObserver );
56+ testObserver .assertLatchCountedDown ();
57+ assertEquals (1 , testObserver .receivedChanges .size ());
58+ assertEquals (0 , testObserver .receivedChanges .get (0 ).size ());
5959
6060 // On put.
61- receivedChanges .clear ();
62- latch = new CountDownLatch ( 1 );
61+ testObserver . receivedChanges .clear ();
62+ testObserver . resetLatch ( );
6363 putTestEntitiesScalars ();
64- assertLatchCountedDown (latch , 5 );
65- assertEquals (1 , receivedChanges .size ());
66- assertEquals (3 , receivedChanges .get (0 ).size ());
64+ testObserver . assertLatchCountedDown ();
65+ assertEquals (1 , testObserver . receivedChanges .size ());
66+ assertEquals (3 , testObserver . receivedChanges .get (0 ).size ());
6767
6868 // On remove all.
69- receivedChanges .clear ();
70- latch = new CountDownLatch ( 1 );
69+ testObserver . receivedChanges .clear ();
70+ testObserver . resetLatch ( );
7171 box .removeAll ();
72- assertLatchCountedDown (latch , 5 );
73- assertEquals (1 , receivedChanges .size ());
74- assertEquals (0 , receivedChanges .get (0 ).size ());
72+ testObserver . assertLatchCountedDown ();
73+ assertEquals (1 , testObserver . receivedChanges .size ());
74+ assertEquals (0 , testObserver . receivedChanges .get (0 ).size ());
7575 }
7676
7777 @ Test
7878 public void testSingle () throws InterruptedException {
7979 putTestEntitiesScalars ();
8080 int [] valuesInt = {2003 , 2007 , 2002 };
8181 Query <TestEntity > query = box .query ().in (simpleInt , valuesInt ).build ();
82- query .subscribe ().single ().observer (this );
83- assertLatchCountedDown (latch , 5 );
84- assertEquals (1 , receivedChanges .size ());
85- assertEquals (3 , receivedChanges .get (0 ).size ());
8682
87- receivedChanges .clear ();
83+ TestObserver <List <TestEntity >> testObserver = new TestObserver <>();
84+ query .subscribe ().single ().observer (testObserver );
85+ testObserver .assertLatchCountedDown ();
86+ assertEquals (1 , testObserver .receivedChanges .size ());
87+ assertEquals (3 , testObserver .receivedChanges .get (0 ).size ());
88+
89+ testObserver .receivedChanges .clear ();
8890 putTestEntities (1 );
8991 Thread .sleep (20 );
90- assertEquals (0 , receivedChanges .size ());
92+ assertEquals (0 , testObserver . receivedChanges .size ());
9193 }
9294
9395 @ Test
9496 public void testTransformer () throws InterruptedException {
9597 int [] valuesInt = {2003 , 2007 , 2002 };
9698 Query <TestEntity > query = box .query ().in (simpleInt , valuesInt ).build ();
9799 assertEquals (0 , query .count ());
98- final List <Integer > receivedSums = new ArrayList <>();
100+ TestObserver <Integer > testObserver = new TestObserver <>();
99101
100102 query .subscribe ().transform (source -> {
101103 int sum = 0 ;
102104 for (TestEntity entity : source ) {
103105 sum += entity .getSimpleInt ();
104106 }
105107 return sum ;
106- }).observer (data -> {
107- receivedSums .add (data );
108- latch .countDown ();
109- });
110- assertLatchCountedDown (latch , 5 );
108+ }).observer (testObserver );
109+ testObserver .assertLatchCountedDown ();
111110
112- latch = new CountDownLatch ( 1 );
111+ testObserver . resetLatch ( );
113112 putTestEntitiesScalars ();
114- assertLatchCountedDown (latch , 5 );
113+ testObserver . assertLatchCountedDown ();
115114 Thread .sleep (20 );
116115
117- assertEquals (2 , receivedSums .size ());
118- assertEquals (0 , (int ) receivedSums .get (0 ));
119- assertEquals (2003 + 2007 + 2002 , (int ) receivedSums .get (1 ));
116+ assertEquals (2 , testObserver . receivedChanges .size ());
117+ assertEquals (0 , (int ) testObserver . receivedChanges .get (0 ));
118+ assertEquals (2003 + 2007 + 2002 , (int ) testObserver . receivedChanges .get (1 ));
120119 }
121120
122121 private void putTestEntitiesScalars () {
123122 putTestEntities (10 , null , 2000 );
124123 }
125124
126- @ Override
127- public void onData (List <TestEntity > queryResult ) {
128- receivedChanges .add (queryResult );
129- latch .countDown ();
125+ public static class TestObserver <T > implements DataObserver <T > {
126+
127+ List <T > receivedChanges = new CopyOnWriteArrayList <>();
128+ CountDownLatch latch = new CountDownLatch (1 );
129+
130+ private void log (String message ) {
131+ System .out .println ("TestObserver: " + message );
132+ }
133+
134+ void printEvents () {
135+ int count = receivedChanges .size ();
136+ log ("Received " + count + " event(s):" );
137+ for (int i = 0 ; i < count ; i ++) {
138+ T receivedChange = receivedChanges .get (i );
139+ if (receivedChange instanceof List ) {
140+ List <?> list = (List <?>) receivedChange ;
141+ log ((i + 1 ) + "/" + count + ": size=" + list .size ()
142+ + "; items=" + Arrays .toString (list .toArray ()));
143+ }
144+ }
145+ }
146+
147+ void resetLatch () {
148+ latch = new CountDownLatch (1 );
149+ }
150+
151+ void assertLatchCountedDown () {
152+ try {
153+ assertTrue (latch .await (5 , TimeUnit .SECONDS ));
154+ } catch (InterruptedException e ) {
155+ throw new RuntimeException (e );
156+ }
157+ printEvents ();
158+ }
159+
160+ @ Override
161+ public void onData (T data ) {
162+ receivedChanges .add (data );
163+ latch .countDown ();
164+ }
165+
130166 }
131167}
0 commit comments