16
16
17
17
package io .objectbox ;
18
18
19
+ import io .objectbox .exception .DbException ;
20
+ import io .objectbox .exception .DbExceptionListener ;
21
+ import io .objectbox .exception .DbMaxReadersExceededException ;
19
22
import org .junit .Ignore ;
20
23
import org .junit .Test ;
21
24
25
+ import java .util .ArrayList ;
22
26
import java .util .concurrent .Callable ;
23
27
import java .util .concurrent .CountDownLatch ;
28
+ import java .util .concurrent .ExecutorService ;
29
+ import java .util .concurrent .Executors ;
30
+ import java .util .concurrent .Future ;
24
31
import java .util .concurrent .LinkedBlockingQueue ;
32
+ import java .util .concurrent .ThreadPoolExecutor ;
25
33
import java .util .concurrent .TimeUnit ;
26
34
import java .util .concurrent .atomic .AtomicInteger ;
27
35
28
- import javax .annotation .Nullable ;
29
-
30
- import io .objectbox .exception .DbException ;
31
- import io .objectbox .exception .DbExceptionListener ;
32
- import io .objectbox .exception .DbMaxReadersExceededException ;
33
-
34
- import static org .junit .Assert .*;
36
+ import static org .junit .Assert .assertArrayEquals ;
37
+ import static org .junit .Assert .assertEquals ;
38
+ import static org .junit .Assert .assertFalse ;
39
+ import static org .junit .Assert .assertNotNull ;
40
+ import static org .junit .Assert .assertNotSame ;
41
+ import static org .junit .Assert .assertSame ;
42
+ import static org .junit .Assert .assertTrue ;
43
+ import static org .junit .Assert .fail ;
35
44
36
45
public class TransactionTest extends AbstractObjectBoxTest {
37
46
@@ -41,9 +50,9 @@ private void prepareOneEntryWith1230() {
41
50
KeyValueCursor cursor = transaction .createKeyValueCursor ();
42
51
cursor .put (123 , new byte []{1 , 2 , 3 , 0 });
43
52
cursor .close ();
44
- assertEquals ( true , transaction .isActive ());
53
+ assertTrue ( transaction .isActive ());
45
54
transaction .commit ();
46
- assertEquals ( false , transaction .isActive ());
55
+ assertFalse ( transaction .isActive ());
47
56
}
48
57
49
58
@ Test
@@ -83,17 +92,17 @@ public void testReadTransactionWhileWriting() {
83
92
cursorRead .close ();
84
93
85
94
// commit writing
86
- assertEquals ( true , txRead .isReadOnly ());
87
- assertEquals ( false , txWrite .isReadOnly ());
95
+ assertTrue ( txRead .isReadOnly ());
96
+ assertFalse ( txWrite .isReadOnly ());
88
97
89
- assertEquals ( true , txWrite .isActive ());
98
+ assertTrue ( txWrite .isActive ());
90
99
txWrite .commit ();
91
- assertEquals ( false , txWrite .isActive ());
100
+ assertFalse ( txWrite .isActive ());
92
101
93
102
// commit reading
94
- assertEquals ( true , txRead .isActive ());
103
+ assertTrue ( txRead .isActive ());
95
104
txRead .abort ();
96
- assertEquals ( false , txRead .isActive ());
105
+ assertFalse ( txRead .isActive ());
97
106
98
107
// start reading again and get the new value
99
108
txRead = store .beginReadTx ();
@@ -118,7 +127,7 @@ public void testTransactionReset() {
118
127
assertArrayEquals (new byte []{3 , 2 , 1 , 0 }, cursor .get (123 ));
119
128
cursor .close ();
120
129
transaction .reset ();
121
- assertEquals ( true , transaction .isActive ());
130
+ assertTrue ( transaction .isActive ());
122
131
123
132
cursor = transaction .createKeyValueCursor ();
124
133
assertArrayEquals (new byte []{1 , 2 , 3 , 0 }, cursor .get (123 ));
@@ -145,7 +154,7 @@ public void testTransactionReset() {
145
154
assertArrayEquals (new byte []{3 , 2 , 1 , 0 }, cursor .get (123 ));
146
155
cursor .close ();
147
156
transaction .reset ();
148
- assertEquals ( true , transaction .isActive ());
157
+ assertTrue ( transaction .isActive ());
149
158
150
159
cursor = transaction .createKeyValueCursor ();
151
160
assertArrayEquals (new byte []{3 , 2 , 1 , 0 }, cursor .get (123 ));
@@ -439,5 +448,78 @@ public void testCallInTxAsync_Error() throws InterruptedException {
439
448
assertNotNull (result );
440
449
}
441
450
451
+ @ Test
452
+ public void transaction_unboundedThreadPool () throws Exception {
453
+ runThreadPoolReaderTest (
454
+ () -> {
455
+ Transaction tx = store .beginReadTx ();
456
+ tx .close ();
457
+ }
458
+ );
459
+ }
460
+
461
+ @ Test
462
+ public void runInReadTx_unboundedThreadPool () throws Exception {
463
+ runThreadPoolReaderTest (
464
+ () -> store .runInReadTx (() -> {
465
+ })
466
+ );
467
+ }
468
+
469
+ @ Test
470
+ public void callInReadTx_unboundedThreadPool () throws Exception {
471
+ runThreadPoolReaderTest (
472
+ () -> store .callInReadTx (() -> 1 )
473
+ );
474
+ }
442
475
476
+ @ Test
477
+ public void boxReader_unboundedThreadPool () throws Exception {
478
+ runThreadPoolReaderTest (
479
+ () -> {
480
+ store .boxFor (TestEntity .class ).count ();
481
+ store .closeThreadResources ();
482
+ }
483
+ );
484
+ }
485
+
486
+ /**
487
+ * Tests that a reader is available again after a transaction is closed on a thread.
488
+ * To not exceed max readers this test simply does not allow any two threads
489
+ * to have an active transaction at the same time, e.g. there should always be only one active reader.
490
+ */
491
+ private void runThreadPoolReaderTest (Runnable runnable ) throws Exception {
492
+ // Replace default store: transaction logging disabled and specific max readers.
493
+ tearDown ();
494
+ store = createBoxStoreBuilder (null )
495
+ .maxReaders (100 )
496
+ .debugFlags (0 )
497
+ .noReaderThreadLocals () // This is the essential flag to make this test work
498
+ .build ();
499
+
500
+ // Unbounded (but throttled) thread pool so number of threads run exceeds max readers.
501
+ int numThreads = TestUtils .isWindows () ? 300 : 1000 ; // Less on Windows; had some resource issues on Windows CI
502
+ ExecutorService pool = Executors .newCachedThreadPool ();
503
+
504
+ ArrayList <Future <Integer >> txTasks = new ArrayList <>(10000 );
505
+ final Object lock = new Object ();
506
+ for (int i = 0 ; i < 10000 ; i ++) {
507
+ final int txNumber = i ;
508
+ txTasks .add (pool .submit (() -> {
509
+ // Lock to ensure no two threads have an active transaction at the same time.
510
+ synchronized (lock ) {
511
+ runnable .run ();
512
+ return txNumber ;
513
+ }
514
+ }));
515
+ if (pool instanceof ThreadPoolExecutor && ((ThreadPoolExecutor ) pool ).getActiveCount () > numThreads ) {
516
+ Thread .sleep (1 ); // Throttle processing to limit thread resources
517
+ }
518
+ }
519
+
520
+ //Iterate through all the txTasks and make sure all transactions succeeded.
521
+ for (Future <Integer > txTask : txTasks ) {
522
+ txTask .get (1 , TimeUnit .MINUTES ); // 1s would be enough for normally, but use 1 min to allow debug sessions
523
+ }
524
+ }
443
525
}
0 commit comments