Skip to content

Commit 9c1ea01

Browse files
committed
1 parent 0dd4831 commit 9c1ea01

File tree

4 files changed

+120
-15
lines changed

4 files changed

+120
-15
lines changed

modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.concurrent.CompletableFuture;
5151
import java.util.concurrent.ExecutionException;
5252
import java.util.concurrent.TimeUnit;
53+
import java.util.concurrent.atomic.AtomicReference;
5354
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
5455
import org.apache.ignite.internal.util.worker.IgniteWorker;
5556
import org.junit.jupiter.api.Test;
@@ -217,4 +218,11 @@ void testByteBufferToByteArray() {
217218
ByteBuffer smallDirectBuffer = bigDirectBuffer.position(1).limit(4).slice();
218219
assertArrayEquals(new byte[] {1, 2, 3}, byteBufferToByteArray(smallDirectBuffer));
219220
}
221+
222+
@Test
223+
void test() {
224+
AtomicReference<Integer> reference = new AtomicReference<>();
225+
226+
assertTrue(reference.compareAndSet(null, null));
227+
}
220228
}

modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,8 @@ void doCheckpoint() throws IgniteInternalCheckedException {
346346
Checkpoint chp = null;
347347

348348
try {
349+
compactor.pause();
350+
349351
var tracker = new CheckpointMetricsTracker();
350352

351353
tracker.onCheckpointStart();
@@ -410,6 +412,8 @@ void doCheckpoint() throws IgniteInternalCheckedException {
410412
chp.progress.reason()
411413
);
412414
}
415+
416+
compactor.resume();
413417
}
414418

415419
currentCheckpointProgress.setPagesWriteTimeMillis(
@@ -546,6 +550,8 @@ private boolean writePages(
546550

547551
tracker.onFsyncEnd();
548552

553+
compactor.resume();
554+
549555
compactor.triggerCompaction();
550556

551557
if (shutdownNow.getAsBoolean()) {

modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import java.util.UUID;
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.ConcurrentLinkedQueue;
33+
import java.util.concurrent.CountDownLatch;
3334
import java.util.concurrent.LinkedBlockingQueue;
3435
import java.util.concurrent.ThreadPoolExecutor;
3536
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicReference;
3638
import java.util.concurrent.locks.Lock;
3739
import org.apache.ignite.internal.failure.FailureContext;
3840
import org.apache.ignite.internal.failure.FailureManager;
@@ -92,6 +94,9 @@ public class Compactor extends IgniteWorker {
9294

9395
private final PartitionDestructionLockManager partitionDestructionLockManager;
9496

97+
/** Latch for pausing and resuming the compaction, {@code null} if there was no pause. */
98+
private final AtomicReference<CountDownLatch> pauseLatchRef = new AtomicReference<>();
99+
95100
/**
96101
* Creates new ignite worker with given parameters.
97102
*
@@ -355,6 +360,8 @@ public void cancel() {
355360
// Do not interrupt runner thread.
356361
isCancelled.set(true);
357362

363+
resume();
364+
358365
synchronized (mux) {
359366
mux.notifyAll();
360367
}
@@ -396,6 +403,8 @@ void mergeDeltaFileToMainFile(
396403

397404
long pageOffset = deltaFilePageStore.pageOffset(pageIndex);
398405

406+
pauseCompactionIfNeeded();
407+
399408
// pageIndex instead of pageId, only for debugging in case of errors
400409
// since we do not know the pageId until we read it from the pageOffset.
401410
boolean read = deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageIndex, pageOffset, buffer.rewind(), false);
@@ -417,6 +426,8 @@ void mergeDeltaFileToMainFile(
417426
return;
418427
}
419428

429+
pauseCompactionIfNeeded();
430+
420431
filePageStore.write(pageId, buffer.rewind());
421432

422433
tracker.onDataPageWritten();
@@ -433,6 +444,8 @@ void mergeDeltaFileToMainFile(
433444
return;
434445
}
435446

447+
pauseCompactionIfNeeded();
448+
436449
filePageStore.sync();
437450

438451
// Removing the delta file page store from a file page store.
@@ -448,6 +461,8 @@ void mergeDeltaFileToMainFile(
448461

449462
deltaFilePageStore.markMergedToFilePageStore();
450463

464+
pauseCompactionIfNeeded();
465+
451466
deltaFilePageStore.stop(true);
452467

453468
boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore);
@@ -485,4 +500,45 @@ private DeltaFileForCompaction(
485500
this.deltaFilePageStoreIo = deltaFilePageStoreIo;
486501
}
487502
}
503+
504+
/**
505+
* Pauses the compactor until it is resumed or compactor or stopped. It is expected that this method will not be called multiple times
506+
* in parallel and subsequent calls will strictly be calls after {@link #resume}.
507+
*/
508+
public void pause() {
509+
boolean casResult = pauseLatchRef.compareAndSet(null, new CountDownLatch(1));
510+
511+
assert casResult : "It is expected that there will be no parallel pause, resume or previous one has ended: " + pauseLatchRef.get();
512+
}
513+
514+
/** Resumes the compactor if it was paused. It is expected that this method will not be called multiple times in parallel. */
515+
public void resume() {
516+
CountDownLatch latch = pauseLatchRef.get();
517+
518+
boolean casResult = pauseLatchRef.compareAndSet(latch, null);
519+
520+
assert casResult : "It is expected that there will be no parallel pause or resume";
521+
522+
if (latch != null) {
523+
latch.countDown();
524+
}
525+
}
526+
527+
private void pauseCompactionIfNeeded() {
528+
CountDownLatch latch = pauseLatchRef.get();
529+
530+
if (latch != null) {
531+
blockingSectionBegin();
532+
533+
try {
534+
latch.await();
535+
} catch (InterruptedException e) {
536+
LOG.debug("Compactor pause was interrupted", e);
537+
538+
Thread.currentThread().interrupt();
539+
} finally {
540+
blockingSectionEnd();
541+
}
542+
}
543+
}
488544
}

modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2121
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
2222
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
23+
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
24+
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
2325
import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
26+
import static org.hamcrest.MatcherAssert.assertThat;
2427
import static org.junit.jupiter.api.Assertions.assertFalse;
2528
import static org.junit.jupiter.api.Assertions.assertNull;
2629
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -91,21 +94,8 @@ void testStartAndStop() throws Exception {
9194
void testMergeDeltaFileToMainFile() throws Throwable {
9295
Compactor compactor = newCompactor();
9396

94-
FilePageStore filePageStore = mock(FilePageStore.class);
95-
DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class);
96-
97-
when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
98-
99-
when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0});
100-
101-
when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean()))
102-
.then(answer -> {
103-
ByteBuffer buffer = answer.getArgument(2);
104-
105-
PageIo.setPageId(bufferAddress(buffer), 1);
106-
107-
return true;
108-
});
97+
DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo();
98+
FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo);
10999

110100
compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker());
111101

@@ -209,6 +199,26 @@ void testCancel() throws Exception {
209199
waitDeltaFilesFuture.get(100, MILLISECONDS);
210200
}
211201

202+
@Test
203+
void testPauseResume() throws Exception {
204+
Compactor compactor = spy(newCompactor());
205+
206+
compactor.pause();
207+
208+
DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo();
209+
FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo);
210+
211+
CompletableFuture<?> mergeDeltaFileToMainFileFuture = runAsync(
212+
() -> compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker())
213+
);
214+
215+
assertThat(mergeDeltaFileToMainFileFuture, willTimeoutFast());
216+
217+
compactor.resume();
218+
219+
assertThat(mergeDeltaFileToMainFileFuture, willCompleteSuccessfully());
220+
}
221+
212222
private Compactor newCompactor() {
213223
return newCompactor(mock(FilePageStoreManager.class));
214224
}
@@ -224,4 +234,29 @@ private Compactor newCompactor(FilePageStoreManager filePageStoreManager) {
224234
new PartitionDestructionLockManager()
225235
);
226236
}
237+
238+
private static DeltaFilePageStoreIo createDeltaFilePageStoreIo() throws Exception {
239+
DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class);
240+
241+
when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0});
242+
243+
when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean()))
244+
.then(answer -> {
245+
ByteBuffer buffer = answer.getArgument(2);
246+
247+
PageIo.setPageId(bufferAddress(buffer), 1);
248+
249+
return true;
250+
});
251+
252+
return deltaFilePageStoreIo;
253+
}
254+
255+
private static FilePageStore createFilePageStore(DeltaFilePageStoreIo deltaFilePageStoreIo) {
256+
FilePageStore filePageStore = mock(FilePageStore.class);
257+
258+
when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
259+
260+
return filePageStore;
261+
}
227262
}

0 commit comments

Comments
 (0)