|
16 | 16 |
|
17 | 17 | package io.objectbox;
|
18 | 18 |
|
| 19 | +import io.objectbox.exception.DbException; |
| 20 | +import io.objectbox.exception.DbExceptionListener; |
| 21 | +import io.objectbox.exception.DbMaxReadersExceededException; |
| 22 | +import io.objectbox.internal.ObjectBoxThreadPool; |
19 | 23 | import org.junit.Ignore;
|
20 | 24 | import org.junit.Test;
|
21 | 25 |
|
22 | 26 | import java.util.ArrayList;
|
23 | 27 | import java.util.concurrent.Callable;
|
24 | 28 | import java.util.concurrent.CountDownLatch;
|
25 | 29 | import java.util.concurrent.ExecutorService;
|
26 |
| -import java.util.concurrent.Executors; |
27 | 30 | import java.util.concurrent.Future;
|
28 | 31 | import java.util.concurrent.LinkedBlockingQueue;
|
29 | 32 | import java.util.concurrent.TimeUnit;
|
30 | 33 | import java.util.concurrent.atomic.AtomicInteger;
|
31 | 34 |
|
32 |
| -import io.objectbox.exception.DbException; |
33 |
| -import io.objectbox.exception.DbExceptionListener; |
34 |
| -import io.objectbox.exception.DbMaxReadersExceededException; |
35 |
| -import io.objectbox.internal.ObjectBoxThreadPool; |
36 |
| - |
37 |
| -import static org.junit.Assert.*; |
| 35 | +import static org.junit.Assert.assertArrayEquals; |
| 36 | +import static org.junit.Assert.assertEquals; |
| 37 | +import static org.junit.Assert.assertFalse; |
| 38 | +import static org.junit.Assert.assertNotNull; |
| 39 | +import static org.junit.Assert.assertNotSame; |
| 40 | +import static org.junit.Assert.assertSame; |
| 41 | +import static org.junit.Assert.assertTrue; |
| 42 | +import static org.junit.Assert.fail; |
38 | 43 |
|
39 | 44 | public class TransactionTest extends AbstractObjectBoxTest {
|
40 | 45 |
|
@@ -443,40 +448,55 @@ public void testCallInTxAsync_Error() throws InterruptedException {
|
443 | 448 | }
|
444 | 449 |
|
445 | 450 | @Test
|
446 |
| - public void transactionsOnUnboundedThreadPool() throws Exception { |
447 |
| - //Silence the unnecessary debug output and set the max readers |
448 |
| - resetBoxStoreWithoutDebugFlags(100); |
449 |
| - |
450 |
| - runThreadPoolTransactionTest(new ObjectBoxThreadPool(store)); |
| 451 | + public void runInReadTx_unboundedThreadPool() throws Exception { |
| 452 | + runThreadPoolReaderTest( |
| 453 | + () -> store.runInReadTx(() -> { |
| 454 | + }) |
| 455 | + ); |
451 | 456 | }
|
452 | 457 |
|
453 | 458 | @Test
|
454 |
| - public void transactionsOnBoundedThreadPool() throws Exception { |
455 |
| - //Silence the unnecessary debug output and set the max readers |
456 |
| - int maxReaders = 100; |
457 |
| - resetBoxStoreWithoutDebugFlags(maxReaders); |
| 459 | + public void callInReadTx_unboundedThreadPool() throws Exception { |
| 460 | + runThreadPoolReaderTest( |
| 461 | + () -> store.callInReadTx(() -> 1) |
| 462 | + ); |
| 463 | + } |
458 | 464 |
|
459 |
| - runThreadPoolTransactionTest(Executors.newFixedThreadPool(maxReaders)); |
| 465 | + @Test |
| 466 | + public void boxReader_unboundedThreadPool() throws Exception { |
| 467 | + runThreadPoolReaderTest( |
| 468 | + () -> { |
| 469 | + store.boxFor(TestEntity.class).count(); |
| 470 | + store.closeThreadResources(); |
| 471 | + } |
| 472 | + ); |
460 | 473 | }
|
461 | 474 |
|
462 |
| - private void resetBoxStoreWithoutDebugFlags(int maxReaders) { |
463 |
| - // Remove existing store |
| 475 | + /** |
| 476 | + * Tests that a reader is available again after a transaction is closed on a thread. |
| 477 | + * To not exceed max readers this test simply does not allow any two threads |
| 478 | + * to have an active transaction at the same time, e.g. there should always be only one active reader. |
| 479 | + */ |
| 480 | + private void runThreadPoolReaderTest(Runnable runnable) throws Exception { |
| 481 | + // Replace default store: transaction logging disabled and specific max readers. |
464 | 482 | tearDown();
|
| 483 | + store = createBoxStoreBuilder(null) |
| 484 | + .maxReaders(100) |
| 485 | + .debugFlags(0) |
| 486 | + .build(); |
465 | 487 |
|
466 |
| - BoxStoreBuilder builder = createBoxStoreBuilder(false); |
467 |
| - builder.maxReaders = maxReaders; |
468 |
| - builder.debugFlags = 0; |
469 |
| - store = builder.build(); |
470 |
| - } |
| 488 | + // Unbounded thread pool so number of threads run exceeds max readers. |
| 489 | + ExecutorService pool = new ObjectBoxThreadPool(store); |
471 | 490 |
|
472 |
| - private void runThreadPoolTransactionTest(ExecutorService pool) throws Exception { |
473 |
| - //Create a bunch of transactions on a thread pool. We can even run them synchronously. |
474 | 491 | ArrayList<Future<Integer>> txTasks = new ArrayList<>(10000);
|
| 492 | + final Object lock = new Object(); |
475 | 493 | for (int i = 0; i < 10000; i++) {
|
476 | 494 | final int txNumber = i;
|
477 | 495 | txTasks.add(pool.submit(() -> {
|
478 |
| - synchronized (store) { |
479 |
| - return store.callInReadTx(() -> txNumber); |
| 496 | + // Lock to ensure no two threads have an active transaction at the same time. |
| 497 | + synchronized (lock) { |
| 498 | + runnable.run(); |
| 499 | + return txNumber; |
480 | 500 | }
|
481 | 501 | }));
|
482 | 502 | }
|
|
0 commit comments