|
45 | 45 |
|
46 | 46 | import org.junit.Assert;
|
47 | 47 | import org.junit.Test;
|
| 48 | +import org.awaitility.Awaitility; |
48 | 49 | import org.slf4j.Logger;
|
49 | 50 | import org.slf4j.LoggerFactory;
|
50 | 51 |
|
@@ -118,6 +119,18 @@ Future<Void> submitPendingAntiCompaction(PendingAntiCompaction.AcquireResult res
|
118 | 119 | }
|
119 | 120 | }
|
120 | 121 |
|
| 122 | + private void waitForExecutorShutdown(ExecutorService es) |
| 123 | + { |
| 124 | + es.shutdown(); |
| 125 | + try { |
| 126 | + if (!es.awaitTermination(10, TimeUnit.SECONDS)) { |
| 127 | + logger.warn("Executor did not terminate in time, forcing shutdown"); |
| 128 | + es.shutdownNow(); |
| 129 | + } |
| 130 | + } catch (InterruptedException e) { |
| 131 | + Thread.currentThread().interrupt(); |
| 132 | + } |
| 133 | + } |
121 | 134 | /**
|
122 | 135 | * verify the pending anti compaction happy path
|
123 | 136 | */
|
@@ -158,7 +171,7 @@ public void successCase() throws Exception
|
158 | 171 | }
|
159 | 172 | finally
|
160 | 173 | {
|
161 |
| - executor.shutdown(); |
| 174 | + waitForExecutorShutdown(executor); |
162 | 175 | }
|
163 | 176 |
|
164 | 177 | assertEquals(3, cfs.getLiveSSTables().size());
|
@@ -491,7 +504,7 @@ public void testBlockedAcquisition() throws ExecutionException, InterruptedExcep
|
491 | 504 | }
|
492 | 505 | finally
|
493 | 506 | {
|
494 |
| - es.shutdown(); |
| 507 | + waitForExecutorShutdown(es); |
495 | 508 | ISSTableScanner.closeAllAndPropagate(scanners, null);
|
496 | 509 | }
|
497 | 510 | }
|
@@ -568,44 +581,55 @@ public void onFailure(Throwable throwable)
|
568 | 581 | }
|
569 | 582 | finally
|
570 | 583 | {
|
571 |
| - es.shutdown(); |
| 584 | + waitForExecutorShutdown(es); |
572 | 585 | }
|
573 | 586 | }
|
574 | 587 |
|
575 | 588 | @Test
|
576 |
| - public void testSSTablePredicateOngoingAntiCompaction() |
| 589 | + public void testSSTablePredicateOngoingAntiCompaction() throws InterruptedException |
577 | 590 | {
|
578 | 591 | ColumnFamilyStore cfs = MockSchema.newCFS();
|
579 |
| - cfs.disableAutoCompaction(); |
580 |
| - List<SSTableReader> sstables = new ArrayList<>(); |
581 |
| - List<SSTableReader> repairedSSTables = new ArrayList<>(); |
582 |
| - List<SSTableReader> pendingSSTables = new ArrayList<>(); |
583 |
| - for (int i = 1; i <= 10; i++) |
584 |
| - { |
585 |
| - SSTableReader sstable = MockSchema.sstable(i, i * 10, i * 10 + 9, cfs); |
586 |
| - sstables.add(sstable); |
587 |
| - } |
588 |
| - for (int i = 1; i <= 10; i++) |
| 592 | + try |
589 | 593 | {
|
590 |
| - SSTableReader sstable = MockSchema.sstable(i + 10, i * 10, i * 10 + 9, cfs); |
591 |
| - AbstractPendingRepairTest.mutateRepaired(sstable, System.currentTimeMillis()); |
592 |
| - repairedSSTables.add(sstable); |
| 594 | + cfs.disableAutoCompaction(); |
| 595 | + List<SSTableReader> sstables = new ArrayList<>(); |
| 596 | + List<SSTableReader> repairedSSTables = new ArrayList<>(); |
| 597 | + List<SSTableReader> pendingSSTables = new ArrayList<>(); |
| 598 | + for (int i = 1; i <= 10; i++) |
| 599 | + { |
| 600 | + SSTableReader sstable = MockSchema.sstable(i, i * 10, i * 10 + 9, cfs); |
| 601 | + sstables.add(sstable); |
| 602 | + } |
| 603 | + for (int i = 1; i <= 10; i++) |
| 604 | + { |
| 605 | + SSTableReader sstable = MockSchema.sstable(i + 10, i * 10, i * 10 + 9, cfs); |
| 606 | + AbstractPendingRepairTest.mutateRepaired(sstable, System.currentTimeMillis()); |
| 607 | + repairedSSTables.add(sstable); |
| 608 | + } |
| 609 | + for (int i = 1; i <= 10; i++) |
| 610 | + { |
| 611 | + SSTableReader sstable = MockSchema.sstable(i + 20, i * 10, i * 10 + 9, cfs); |
| 612 | + AbstractPendingRepairTest.mutateRepaired(sstable, nextTimeUUID(), false); |
| 613 | + pendingSSTables.add(sstable); |
| 614 | + } |
| 615 | + |
| 616 | + cfs.addSSTables(sstables); |
| 617 | + cfs.addSSTables(repairedSSTables); |
| 618 | + |
| 619 | + // if we are compacting the non-repaired non-pending sstables, we should get an error |
| 620 | + tryPredicate(cfs, sstables, null, true); |
| 621 | + // make sure we don't try to grab pending or repaired sstables; |
| 622 | + tryPredicate(cfs, repairedSSTables, sstables, false); |
| 623 | + tryPredicate(cfs, pendingSSTables, sstables, false); |
593 | 624 | }
|
594 |
| - for (int i = 1; i <= 10; i++) |
| 625 | + finally |
595 | 626 | {
|
596 |
| - SSTableReader sstable = MockSchema.sstable(i + 20, i * 10, i * 10 + 9, cfs); |
597 |
| - AbstractPendingRepairTest.mutateRepaired(sstable, nextTimeUUID(), false); |
598 |
| - pendingSSTables.add(sstable); |
| 627 | + // Wait for any ongoing operations to complete |
| 628 | + Awaitility.await() |
| 629 | + .atMost(10, TimeUnit.SECONDS) |
| 630 | + .pollInterval(100, TimeUnit.MILLISECONDS) |
| 631 | + .until(() -> getCompactionsFor(cfs).isEmpty()); |
599 | 632 | }
|
600 |
| - |
601 |
| - cfs.addSSTables(sstables); |
602 |
| - cfs.addSSTables(repairedSSTables); |
603 |
| - |
604 |
| - // if we are compacting the non-repaired non-pending sstables, we should get an error |
605 |
| - tryPredicate(cfs, sstables, null, true); |
606 |
| - // make sure we don't try to grab pending or repaired sstables; |
607 |
| - tryPredicate(cfs, repairedSSTables, sstables, false); |
608 |
| - tryPredicate(cfs, pendingSSTables, sstables, false); |
609 | 633 | }
|
610 | 634 |
|
611 | 635 | private void tryPredicate(ColumnFamilyStore cfs, List<SSTableReader> compacting, List<SSTableReader> expectedLive, boolean shouldFail)
|
@@ -642,8 +666,8 @@ public boolean isGlobal()
|
642 | 666 | @Test
|
643 | 667 | public void testRetries() throws InterruptedException, ExecutionException, TimeoutException
|
644 | 668 | {
|
645 |
| - ColumnFamilyStore cfs = MockSchema.newCFS(); |
646 |
| - cfs.addSSTable(MockSchema.sstable(1, true, cfs)); |
| 669 | + makeSSTables(1); |
| 670 | + |
647 | 671 | CountDownLatch cdl = new CountDownLatch(5);
|
648 | 672 | ExecutorPlus es = executorFactory().sequential("test");
|
649 | 673 | AbstractTableOperation operation = new AbstractTableOperation()
|
@@ -687,15 +711,15 @@ protected PendingAntiCompaction.AcquireResult acquireSSTables()
|
687 | 711 | }
|
688 | 712 | finally
|
689 | 713 | {
|
690 |
| - es.shutdown(); |
| 714 | + waitForExecutorShutdown(es); |
691 | 715 | }
|
692 | 716 | }
|
693 | 717 |
|
694 | 718 | @Test
|
695 | 719 | public void testRetriesTimeout() throws InterruptedException, ExecutionException, IOException, TimeoutException
|
696 | 720 | {
|
697 |
| - ColumnFamilyStore cfs = MockSchema.newCFS(); |
698 |
| - cfs.addSSTable(MockSchema.sstable(1, true, cfs)); |
| 721 | + makeSSTables(1); |
| 722 | + |
699 | 723 | ExecutorPlus es = executorFactory().sequential("test");
|
700 | 724 | TableOperation operation = new AbstractTableOperation()
|
701 | 725 | {
|
@@ -725,7 +749,7 @@ public boolean apply(SSTableReader sstable)
|
725 | 749 | }
|
726 | 750 | finally
|
727 | 751 | {
|
728 |
| - es.shutdown(); |
| 752 | + waitForExecutorShutdown(es); |
729 | 753 | }
|
730 | 754 | }
|
731 | 755 |
|
@@ -754,7 +778,7 @@ public void testWith2i() throws ExecutionException, InterruptedException, Timeou
|
754 | 778 | }
|
755 | 779 | finally
|
756 | 780 | {
|
757 |
| - es.shutdown(); |
| 781 | + waitForExecutorShutdown(es); |
758 | 782 | }
|
759 | 783 | }
|
760 | 784 |
|
|
0 commit comments