Skip to content

Commit 75ac84c

Browse files
committed
IGNITE-26722 after review #1.1
1 parent cfce96c commit 75ac84c

File tree

1 file changed

+29
-23
lines changed
  • modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction

1 file changed

+29
-23
lines changed

modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@
3030
import java.util.UUID;
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.ConcurrentLinkedQueue;
33-
import java.util.concurrent.CountDownLatch;
3433
import java.util.concurrent.LinkedBlockingQueue;
3534
import java.util.concurrent.ThreadPoolExecutor;
3635
import java.util.concurrent.TimeUnit;
37-
import java.util.concurrent.atomic.AtomicReference;
3836
import java.util.concurrent.locks.Lock;
3937
import org.apache.ignite.internal.failure.FailureContext;
4038
import org.apache.ignite.internal.failure.FailureManager;
@@ -94,8 +92,10 @@ public class Compactor extends IgniteWorker {
9492

9593
private final PartitionDestructionLockManager partitionDestructionLockManager;
9694

97-
/** Latch for pausing and resuming the compaction, {@code null} if there was no pause. */
98-
private final AtomicReference<CountDownLatch> pauseLatchRef = new AtomicReference<>();
95+
private final Object pauseMux = new Object();
96+
97+
/** Guarded by {@link #pauseMux}. */
98+
private boolean paused;
9999

100100
/**
101101
* Creates new ignite worker with given parameters.
@@ -506,40 +506,46 @@ private DeltaFileForCompaction(
506506
* in parallel and subsequent calls will strictly be calls after {@link #resume}.
507507
*/
508508
public void pause() {
509-
boolean casResult = pauseLatchRef.compareAndSet(null, new CountDownLatch(1));
509+
synchronized (pauseMux) {
510+
assert !paused : "It is expected that a further pause will only occur after resume";
510511

511-
assert casResult : "It is expected that there will be no parallel pause, resume or previous one has ended: " + pauseLatchRef.get();
512+
paused = true;
513+
}
512514
}
513515

514516
/** Resumes the compactor if it was paused. It is expected that this method will not be called multiple times in parallel. */
515517
public void resume() {
516-
CountDownLatch latch = pauseLatchRef.get();
517-
518-
boolean casResult = pauseLatchRef.compareAndSet(latch, null);
518+
synchronized (pauseMux) {
519+
if (paused) {
520+
paused = false;
519521

520-
assert casResult : "It is expected that there will be no parallel pause or resume";
521-
522-
if (latch != null) {
523-
latch.countDown();
522+
pauseMux.notifyAll();
523+
}
524524
}
525525
}
526526

527527
/** Must be called before each IO operation to provide other IO components with resources. */
528528
private void pauseCompactionIfNeeded() {
529-
CountDownLatch latch = pauseLatchRef.get();
529+
boolean interrupted = false;
530530

531-
if (latch != null) {
532-
blockingSectionBegin();
531+
synchronized (pauseMux) {
532+
while (paused) {
533+
blockingSectionBegin();
533534

534-
try {
535-
latch.await();
536-
} catch (InterruptedException e) {
537-
LOG.debug("Compactor pause was interrupted", e);
535+
try {
536+
pauseMux.wait();
537+
} catch (InterruptedException e) {
538+
LOG.debug("Compactor pause was interrupted", e);
538539

539-
Thread.currentThread().interrupt();
540-
} finally {
541-
blockingSectionEnd();
540+
interrupted = true;
541+
} finally {
542+
blockingSectionEnd();
543+
}
542544
}
543545
}
546+
547+
if (interrupted) {
548+
Thread.currentThread().interrupt();
549+
}
544550
}
545551
}

0 commit comments

Comments
 (0)