Skip to content

Commit 234946a

Browse files
committed
Merge branch 'main' into ignite-26722
2 parents 7734893 + 6601cd0 commit 234946a

File tree

10 files changed

+81
-28
lines changed

10 files changed

+81
-28
lines changed

modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ class Checkpoint {
3333
/** Number of written dirty pages. */
3434
int writtenPages;
3535

36+
/** Number of fsync-ed files. */
37+
int syncedFiles;
38+
3639
/**
3740
* Constructor.
3841
*
@@ -59,6 +62,7 @@ boolean hasDelta() {
5962
void finishCheckpoint() {
6063
if (hasDelta()) {
6164
writtenPages = progress.writtenPages();
65+
syncedFiles = progress.syncedFiles();
6266

6367
progress.pagesToWrite(null);
6468

modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,7 @@ public interface CheckpointProgress {
8181
* Returns a number of written checkpoint pages.
8282
*/
8383
int writtenPages();
84+
85+
/** Returns a number of fsync-ed checkpoint files. */
86+
int syncedFiles();
8487
}

modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ public class CheckpointProgressImpl implements CheckpointProgress {
6464
/** Counter for fsynced checkpoint pages. */
6565
private final AtomicInteger syncedPagesCntr = new AtomicInteger();
6666

67+
/** Counter for fsynced checkpoint files. */
68+
private final AtomicInteger syncedFilesCntr = new AtomicInteger();
69+
6770
/** Counter for evicted checkpoint pages. */
6871
private final AtomicInteger evictedPagesCntr = new AtomicInteger();
6972

@@ -154,6 +157,16 @@ public AtomicInteger syncedPagesCounter() {
154157
return syncedPagesCntr;
155158
}
156159

160+
/** Returns counter of fsync-ed checkpoint files. */
161+
public AtomicInteger syncedFilesCounter() {
162+
return syncedFilesCntr;
163+
}
164+
165+
@Override
166+
public int syncedFiles() {
167+
return syncedFilesCntr.get();
168+
}
169+
157170
@Override
158171
public AtomicInteger evictedPagesCounter() {
159172
return evictedPagesCntr;
@@ -214,6 +227,7 @@ public void initCounters(int checkpointPages) {
214227
writtenPagesCntr.set(0);
215228
syncedPagesCntr.set(0);
216229
evictedPagesCntr.set(0);
230+
syncedFilesCntr.set(0);
217231
}
218232

219233
/**

modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public class Checkpointer extends IgniteWorker {
121121
private static final String CHECKPOINT_FINISHED_LOG_TEMPLATE = "Checkpoint finished ["
122122
+ "checkpointId={}, "
123123
+ "writtenPages={}, "
124+
+ "fsyncFiles={}, "
124125
+ "pagesWriteTime={}ms, "
125126
+ "fsyncTime={}ms, "
126127
+ "replicatorLogSyncTime={}ms, "
@@ -434,6 +435,7 @@ void doCheckpoint() throws IgniteInternalCheckedException {
434435
CHECKPOINT_FINISHED_LOG_TEMPLATE,
435436
chp.progress.id(),
436437
totalWrittenPages,
438+
chp.syncedFiles,
437439
tracker.pagesWriteDuration(MILLISECONDS),
438440
tracker.fsyncDuration(MILLISECONDS),
439441
tracker.replicatorLogSyncDuration(MILLISECONDS),
@@ -643,9 +645,9 @@ private void fsyncPartitionFiles(
643645
return;
644646
}
645647

646-
fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore);
648+
fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore, currentCheckpointProgress);
647649

648-
fsyncFilePageStoreOnCheckpointThread(filePageStore);
650+
fsyncFilePageStoreOnCheckpointThread(filePageStore, currentCheckpointProgress);
649651

650652
renameDeltaFileOnCheckpointThread(filePageStore, partitionId);
651653

@@ -657,11 +659,16 @@ private void fsyncPartitionFiles(
657659
}
658660
}
659661

660-
private void fsyncFilePageStoreOnCheckpointThread(FilePageStore filePageStore) throws IgniteInternalCheckedException {
662+
private void fsyncFilePageStoreOnCheckpointThread(
663+
FilePageStore filePageStore,
664+
CheckpointProgressImpl currentCheckpointProgress
665+
) throws IgniteInternalCheckedException {
661666
blockingSectionBegin();
662667

663668
try {
664669
filePageStore.sync();
670+
671+
currentCheckpointProgress.syncedFilesCounter().incrementAndGet();
665672
} finally {
666673
blockingSectionEnd();
667674
}
@@ -894,7 +901,10 @@ long nextCheckpointInterval() {
894901
return safeAbs(interval + startDelay);
895902
}
896903

897-
private void fsyncDeltaFilePageStoreOnCheckpointThread(FilePageStore filePageStore) throws IgniteInternalCheckedException {
904+
private void fsyncDeltaFilePageStoreOnCheckpointThread(
905+
FilePageStore filePageStore,
906+
CheckpointProgressImpl currentCheckpointProgress
907+
) throws IgniteInternalCheckedException {
898908
blockingSectionBegin();
899909

900910
try {
@@ -905,6 +915,8 @@ private void fsyncDeltaFilePageStoreOnCheckpointThread(FilePageStore filePageSto
905915
}
906916

907917
deltaFilePageStoreFuture.join().sync();
918+
919+
currentCheckpointProgress.syncedFilesCounter().incrementAndGet();
908920
} finally {
909921
blockingSectionEnd();
910922
}

modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,12 @@ void testCounters() {
9090
assertEquals(0, progressImpl.writtenPagesCounter().get());
9191
assertEquals(0, progressImpl.syncedPagesCounter().get());
9292
assertEquals(0, progressImpl.evictedPagesCounter().get());
93+
assertEquals(0, progressImpl.syncedFilesCounter().get());
9394

9495
progressImpl.writtenPagesCounter().incrementAndGet();
9596
progressImpl.syncedPagesCounter().incrementAndGet();
9697
progressImpl.evictedPagesCounter().incrementAndGet();
98+
progressImpl.syncedFilesCounter().incrementAndGet();
9799

98100
progressImpl.initCounters(100500);
99101

@@ -102,10 +104,12 @@ void testCounters() {
102104
assertEquals(0, progressImpl.writtenPagesCounter().get());
103105
assertEquals(0, progressImpl.syncedPagesCounter().get());
104106
assertEquals(0, progressImpl.evictedPagesCounter().get());
107+
assertEquals(0, progressImpl.syncedFilesCounter().get());
105108

106109
progressImpl.writtenPagesCounter().incrementAndGet();
107110
progressImpl.syncedPagesCounter().incrementAndGet();
108111
progressImpl.evictedPagesCounter().incrementAndGet();
112+
progressImpl.syncedFilesCounter().incrementAndGet();
109113

110114
progressImpl.clearCounters();
111115

@@ -114,6 +118,7 @@ void testCounters() {
114118
assertEquals(0, progressImpl.writtenPagesCounter().get());
115119
assertEquals(0, progressImpl.syncedPagesCounter().get());
116120
assertEquals(0, progressImpl.evictedPagesCounter().get());
121+
assertEquals(0, progressImpl.syncedFilesCounter().get());
117122

118123
progressImpl.currentCheckpointPagesCount(42);
119124

modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class IndexFileManager {
100100

101101
static final ByteOrder BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
102102

103-
private final Path baseDir;
103+
private final Path indexFilesDir;
104104

105105
/**
106106
* Current index file ordinal (used to generate index file names).
@@ -114,8 +114,14 @@ class IndexFileManager {
114114
*/
115115
private final Map<Long, GroupIndexMeta> groupIndexMetas = new ConcurrentHashMap<>();
116116

117-
IndexFileManager(Path baseDir) {
118-
this.baseDir = baseDir;
117+
IndexFileManager(Path baseDir) throws IOException {
118+
indexFilesDir = baseDir.resolve("index");
119+
120+
Files.createDirectories(indexFilesDir);
121+
}
122+
123+
Path indexFilesDir() {
124+
return indexFilesDir;
119125
}
120126

121127
/**
@@ -124,7 +130,7 @@ class IndexFileManager {
124130
Path saveIndexMemtable(ReadModeIndexMemTable indexMemTable) throws IOException {
125131
String fileName = indexFileName(curFileOrdinal, 0);
126132

127-
Path tmpFilePath = baseDir.resolve(fileName + ".tmp");
133+
Path tmpFilePath = indexFilesDir.resolve(fileName + ".tmp");
128134

129135
try (var os = new BufferedOutputStream(Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE))) {
130136
byte[] headerBytes = serializeHeaderAndFillMetadata(indexMemTable);
@@ -161,7 +167,7 @@ SegmentFilePointer getSegmentFilePointer(long groupId, long logIndex) throws IOE
161167
return null;
162168
}
163169

164-
Path indexFile = baseDir.resolve(indexFileName(indexFileMeta.indexFileOrdinal(), 0));
170+
Path indexFile = indexFilesDir.resolve(indexFileName(indexFileMeta.indexFileOrdinal(), 0));
165171

166172
// Index file payload is a 0-based array, which indices correspond to the [fileMeta.firstLogIndex, fileMeta.lastLogIndex) range.
167173
long payloadArrayIndex = logIndex - indexFileMeta.firstLogIndexInclusive();

modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.io.IOException;
2424
import java.nio.ByteBuffer;
25+
import java.nio.file.Files;
2526
import java.nio.file.Path;
2627
import java.util.concurrent.atomic.AtomicReference;
2728
import org.apache.ignite.internal.close.ManuallyCloseable;
@@ -90,7 +91,7 @@ class SegmentFileManager implements ManuallyCloseable {
9091
*/
9192
static final byte[] SWITCH_SEGMENT_RECORD = new byte[8]; // 8 zero bytes.
9293

93-
private final Path baseDir;
94+
private final Path segmentFilesDir;
9495

9596
/** Configured size of a segment file. */
9697
private final long fileSize;
@@ -124,12 +125,15 @@ class SegmentFileManager implements ManuallyCloseable {
124125
*/
125126
private boolean isStopped;
126127

127-
SegmentFileManager(String nodeName, Path baseDir, long fileSize, int stripes, FailureProcessor failureProcessor) {
128+
SegmentFileManager(String nodeName, Path baseDir, long fileSize, int stripes, FailureProcessor failureProcessor) throws IOException {
128129
if (fileSize <= HEADER_RECORD.length) {
129130
throw new IllegalArgumentException("File size must be greater than the header size: " + fileSize);
130131
}
131132

132-
this.baseDir = baseDir;
133+
this.segmentFilesDir = baseDir.resolve("segments");
134+
135+
Files.createDirectories(segmentFilesDir);
136+
133137
this.fileSize = fileSize;
134138
this.stripes = stripes;
135139

@@ -144,8 +148,16 @@ void start() throws IOException {
144148
currentSegmentFile.set(allocateNewSegmentFile(0));
145149
}
146150

151+
Path segmentFilesDir() {
152+
return segmentFilesDir;
153+
}
154+
155+
Path indexFilesDir() {
156+
return indexFileManager.indexFilesDir();
157+
}
158+
147159
private SegmentFileWithMemtable allocateNewSegmentFile(int fileOrdinal) throws IOException {
148-
Path path = baseDir.resolve(segmentFileName(fileOrdinal, 0));
160+
Path path = segmentFilesDir.resolve(segmentFileName(fileOrdinal, 0));
149161

150162
SegmentFile segmentFile = SegmentFile.createNew(path, fileSize);
151163

@@ -385,7 +397,7 @@ private long maxEntrySize() {
385397
return null;
386398
}
387399

388-
Path path = baseDir.resolve(segmentFileName(segmentFilePointer.fileOrdinal(), 0));
400+
Path path = segmentFilesDir.resolve(segmentFileName(segmentFilePointer.fileOrdinal(), 0));
389401

390402
// TODO: Add a cache for recently accessed segment files, see https://issues.apache.org/jira/browse/IGNITE-26622.
391403
SegmentFile segmentFile = SegmentFile.openExisting(path);

modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManagerTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class IndexFileManagerTest extends IgniteAbstractTest {
3636
private IndexFileManager indexFileManager;
3737

3838
@BeforeEach
39-
void setUp() {
39+
void setUp() throws IOException {
4040
indexFileManager = new IndexFileManager(workDir);
4141
}
4242

@@ -48,9 +48,9 @@ void testIndexFileNaming() throws IOException {
4848
Path path1 = indexFileManager.saveIndexMemtable(memtable);
4949
Path path2 = indexFileManager.saveIndexMemtable(memtable);
5050

51-
assertThat(path0, is(workDir.resolve("index-0000000000-0000000000.bin")));
52-
assertThat(path1, is(workDir.resolve("index-0000000001-0000000000.bin")));
53-
assertThat(path2, is(workDir.resolve("index-0000000002-0000000000.bin")));
51+
assertThat(path0, is(indexFileManager.indexFilesDir().resolve("index-0000000000-0000000000.bin")));
52+
assertThat(path1, is(indexFileManager.indexFilesDir().resolve("index-0000000001-0000000000.bin")));
53+
assertThat(path2, is(indexFileManager.indexFilesDir().resolve("index-0000000002-0000000000.bin")));
5454
}
5555

5656
@Test

modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -376,21 +376,18 @@ private Path findSoleSegmentFile() throws IOException {
376376
}
377377

378378
private List<Path> segmentFiles() throws IOException {
379-
try (Stream<Path> files = Files.list(workDir)) {
380-
return files
381-
.filter(p -> p.getFileName().toString().startsWith("segment"))
382-
.sorted()
383-
.collect(toList());
379+
try (Stream<Path> files = Files.list(fileManager.segmentFilesDir())) {
380+
return files.sorted().collect(toList());
384381
}
385382
}
386383

387384
private List<Path> indexFiles() throws IOException {
388-
try (Stream<Path> files = Files.list(workDir)) {
385+
try (Stream<Path> files = Files.list(fileManager.indexFilesDir())) {
389386
return files
390387
.filter(p -> {
391388
String fileName = p.getFileName().toString();
392389

393-
return fileName.startsWith("index") && !fileName.endsWith(".tmp");
390+
return !fileName.endsWith(".tmp");
394391
})
395392
.sorted()
396393
.collect(toList());

modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegstoreLogStorageTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ void tearDown() throws Exception {
4242

4343
@Override
4444
protected LogStorage newLogStorage() {
45-
segmentFileManager = new SegmentFileManager(NODE_NAME, path, SEGMENT_SIZE, 1, new NoOpFailureManager());
45+
try {
46+
segmentFileManager = new SegmentFileManager(NODE_NAME, path, SEGMENT_SIZE, 1, new NoOpFailureManager());
4647

47-
logStorage = new SegstoreLogStorage(GROUP_ID, segmentFileManager);
48+
logStorage = new SegstoreLogStorage(GROUP_ID, segmentFileManager);
4849

49-
try {
5050
segmentFileManager.start();
5151
} catch (IOException e) {
5252
throw new RuntimeException(e);

0 commit comments

Comments
 (0)