diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index b06c542629be..4e404aa86c83 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -55,6 +55,7 @@ import org.apache.paimon.tag.SuccessFileTagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -176,6 +177,11 @@ public SnapshotManager snapshotManager() { snapshotCache); } + @Override + public ChangelogManager changelogManager() { + return new ChangelogManager(fileIO, options.path(), options.branch()); + } + @Override public ManifestFile.Factory manifestFileFactory() { return manifestFileFactory(false); diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index e50d4ada1397..6cd170f4e6c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -39,6 +39,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -61,6 +62,8 @@ public interface FileStore { SnapshotManager snapshotManager(); + ChangelogManager changelogManager(); + RowType partitionType(); CoreOptions options(); diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index baee7bad950e..3b53315d1838 100644 
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -33,6 +33,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.Serializable; import java.util.Map; import java.util.Objects; @@ -61,7 +62,9 @@ */ @Public @JsonIgnoreProperties(ignoreUnknown = true) -public class Snapshot { +public class Snapshot implements Serializable { + + private static final long serialVersionUID = 1L; public static final long FIRST_SNAPSHOT_ID = 1; diff --git a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java index 54c41aa428d1..b510f8346e01 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java @@ -79,7 +79,7 @@ public CommitMessage doCompact(FileStoreTable table, AppendOnlyFileStoreWrite wr UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer = AppendDeletionFileMaintainer.forUnawareAppend( table.store().newIndexFileHandler(), - table.snapshotManager().latestSnapshotId(), + table.snapshotManager().latestSnapshot(), partition); compactAfter.addAll( write.compactRewrite( diff --git a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java index 490bda9d4cf1..b386c6c4edc7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java @@ -355,9 +355,7 @@ private UnawareAppendDeletionFileMaintainer dvMaintainer(BinaryRow partition) { synchronized (this) { maintainer = AppendDeletionFileMaintainer.forUnawareAppend( - indexFileHandler, - 
snapshotManager.latestSnapshotId(), - partition); + indexFileHandler, snapshotManager.latestSnapshot(), partition); } cache.put(partition, maintainer); } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java index bfb94b4481d0..7c578506d8e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java @@ -18,6 +18,7 @@ package org.apache.paimon.deletionvectors; +import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.index.IndexFileHandler; @@ -152,11 +153,11 @@ public Factory(IndexFileHandler handler) { } public DeletionVectorsMaintainer createOrRestore( - @Nullable Long snapshotId, BinaryRow partition, int bucket) { + @Nullable Snapshot snapshot, BinaryRow partition, int bucket) { List indexFiles = - snapshotId == null + snapshot == null ? Collections.emptyList() - : handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket); + : handler.scan(snapshot, DELETION_VECTORS_INDEX, partition, bucket); Map deletionVectors = new HashMap<>(handler.readAllDeletionVectors(indexFiles)); return createOrRestore(deletionVectors); @@ -164,11 +165,11 @@ public DeletionVectorsMaintainer createOrRestore( @VisibleForTesting public DeletionVectorsMaintainer createOrRestore( - @Nullable Long snapshotId, BinaryRow partition) { + @Nullable Snapshot snapshot, BinaryRow partition) { List indexFiles = - snapshotId == null + snapshot == null ? 
Collections.emptyList() - : handler.scanEntries(snapshotId, DELETION_VECTORS_INDEX, partition) + : handler.scanEntries(snapshot, DELETION_VECTORS_INDEX, partition) .stream() .map(IndexManifestEntry::indexFile) .collect(Collectors.toList()); diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java index 2c59fe022bbb..4455605e9a12 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java @@ -18,6 +18,7 @@ package org.apache.paimon.deletionvectors.append; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; @@ -53,21 +54,21 @@ public interface AppendDeletionFileMaintainer { static BucketedAppendDeletionFileMaintainer forBucketedAppend( IndexFileHandler indexFileHandler, - @Nullable Long snapshotId, + @Nullable Snapshot snapshot, BinaryRow partition, int bucket) { // bucket should have only one deletion file, so here we should read old deletion vectors, // overwrite the entire deletion file of the bucket when writing deletes. 
DeletionVectorsMaintainer maintainer = new DeletionVectorsMaintainer.Factory(indexFileHandler) - .createOrRestore(snapshotId, partition, bucket); + .createOrRestore(snapshot, partition, bucket); return new BucketedAppendDeletionFileMaintainer(partition, bucket, maintainer); } static UnawareAppendDeletionFileMaintainer forUnawareAppend( - IndexFileHandler indexFileHandler, @Nullable Long snapshotId, BinaryRow partition) { + IndexFileHandler indexFileHandler, @Nullable Snapshot snapshot, BinaryRow partition) { Map deletionFiles = - indexFileHandler.scanDVIndex(snapshotId, partition, UNAWARE_BUCKET); + indexFileHandler.scanDVIndex(snapshot, partition, UNAWARE_BUCKET); return new UnawareAppendDeletionFileMaintainer(indexFileHandler, partition, deletionFiles); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java index 4aff0ded5f1d..8fa08287d826 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java @@ -77,6 +77,7 @@ /** migrate iceberg table to paimon table. 
*/ public class IcebergMigrator implements Migrator { + private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrator.class); private final ThreadPoolExecutor executor; diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java index 41e4865253b6..b7040176df66 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java @@ -19,6 +19,7 @@ package org.apache.paimon.index; import org.apache.paimon.KeyValue; +import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; @@ -44,14 +45,14 @@ public class HashIndexMaintainer implements IndexMaintainer { private HashIndexMaintainer( IndexFileHandler fileHandler, - @Nullable Long snapshotId, + @Nullable Snapshot snapshot, BinaryRow partition, int bucket) { this.fileHandler = fileHandler; IntHashSet hashcode = new IntHashSet(); - if (snapshotId != null) { + if (snapshot != null) { Optional indexFile = - fileHandler.scanHashIndex(snapshotId, partition, bucket); + fileHandler.scanHashIndex(snapshot, partition, bucket); if (indexFile.isPresent()) { IndexFileMeta file = indexFile.get(); hashcode = new IntHashSet((int) file.rowCount()); @@ -115,8 +116,8 @@ public Factory(IndexFileHandler handler) { @Override public IndexMaintainer createOrRestore( - @Nullable Long snapshotId, BinaryRow partition, int bucket) { - return new HashIndexMaintainer(handler, snapshotId, partition, bucket); + @Nullable Snapshot snapshot, BinaryRow partition, int bucket) { + return new HashIndexMaintainer(handler, snapshot, partition, bucket); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index 
8b0e5c5021f6..f04b40b2ff65 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -75,8 +75,9 @@ public DeletionVectorsIndexFile deletionVectorsIndex() { return this.deletionVectorsIndex; } - public Optional scanHashIndex(long snapshotId, BinaryRow partition, int bucket) { - List result = scan(snapshotId, HASH_INDEX, partition, bucket); + public Optional scanHashIndex( + Snapshot snapshot, BinaryRow partition, int bucket) { + List result = scan(snapshot, HASH_INDEX, partition, bucket); if (result.size() > 1) { throw new IllegalArgumentException( "Find multiple hash index files for one bucket: " + result); @@ -85,11 +86,10 @@ public Optional scanHashIndex(long snapshotId, BinaryRow partitio } public Map scanDVIndex( - @Nullable Long snapshotId, BinaryRow partition, int bucket) { - if (snapshotId == null) { + @Nullable Snapshot snapshot, BinaryRow partition, int bucket) { + if (snapshot == null) { return Collections.emptyMap(); } - Snapshot snapshot = snapshotManager.snapshot(snapshotId); String indexManifest = snapshot.indexManifest(); if (indexManifest == null) { return Collections.emptyMap(); @@ -136,9 +136,9 @@ public List scan(String indexType) { } public List scan( - long snapshotId, String indexType, BinaryRow partition, int bucket) { + Snapshot snapshot, String indexType, BinaryRow partition, int bucket) { List result = new ArrayList<>(); - for (IndexManifestEntry file : scanEntries(snapshotId, indexType, partition)) { + for (IndexManifestEntry file : scanEntries(snapshot, indexType, partition)) { if (file.bucket() == bucket) { result.add(file.indexFile()); } @@ -171,7 +171,7 @@ public List scanEntries() { } public List scanEntries(String indexType, BinaryRow partition) { - Long snapshot = snapshotManager.latestSnapshotId(); + Snapshot snapshot = snapshotManager.latestSnapshot(); if (snapshot == null) { return Collections.emptyList(); } @@ -180,13 
+180,8 @@ public List scanEntries(String indexType, BinaryRow partitio } public List scanEntries( - long snapshotId, String indexType, BinaryRow partition) { - return scanEntries(snapshotId, indexType, Collections.singleton(partition)); - } - - public List scanEntries( - long snapshot, String indexType, Set partitions) { - return scanEntries(snapshotManager.snapshot(snapshot), indexType, partitions); + Snapshot snapshot, String indexType, BinaryRow partition) { + return scanEntries(snapshot, indexType, Collections.singleton(partition)); } public List scanEntries( diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java index ba881d6140b6..3a4e232e0402 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexMaintainer.java @@ -18,6 +18,7 @@ package org.apache.paimon.index; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import javax.annotation.Nullable; @@ -34,6 +35,6 @@ public interface IndexMaintainer { /** Factory to restore {@link IndexMaintainer}. 
*/ interface Factory { IndexMaintainer createOrRestore( - @Nullable Long snapshotId, BinaryRow partition, int bucket); + @Nullable Snapshot snapshot, BinaryRow partition, int bucket); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 9b7c1a8fbab8..0d651a22c894 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -367,7 +367,6 @@ public void restore(List> states) { for (State state : states) { RecordWriter writer = createWriter( - state.baseSnapshotId, state.partition, state.bucket, state.dataFiles, @@ -426,24 +425,23 @@ public WriterContainer createWriterContainer( } } - Long latestSnapshotId = snapshotManager.latestSnapshotId(); + Snapshot latestSnapshot = snapshotManager.latestSnapshot(); List restoreFiles = new ArrayList<>(); - if (!ignorePreviousFiles && latestSnapshotId != null) { - restoreFiles = scanExistingFileMetas(latestSnapshotId, partition, bucket); + if (!ignorePreviousFiles && latestSnapshot != null) { + restoreFiles = scanExistingFileMetas(latestSnapshot, partition, bucket); } IndexMaintainer indexMaintainer = indexFactory == null ? null : indexFactory.createOrRestore( - ignorePreviousFiles ? null : latestSnapshotId, partition, bucket); + ignorePreviousFiles ? null : latestSnapshot, partition, bucket); DeletionVectorsMaintainer deletionVectorsMaintainer = dvMaintainerFactory == null ? null : dvMaintainerFactory.createOrRestore( - ignorePreviousFiles ? null : latestSnapshotId, partition, bucket); + ignorePreviousFiles ? 
null : latestSnapshot, partition, bucket); RecordWriter writer = createWriter( - latestSnapshotId, partition.copy(), bucket, restoreFiles, @@ -454,7 +452,10 @@ public WriterContainer createWriterContainer( writer.withInsertOnly(isInsertOnly); notifyNewWriter(writer); return new WriterContainer<>( - writer, indexMaintainer, deletionVectorsMaintainer, latestSnapshotId); + writer, + indexMaintainer, + deletionVectorsMaintainer, + latestSnapshot == null ? null : latestSnapshot.id()); } @Override @@ -469,10 +470,10 @@ public FileStoreWrite withMetricRegistry(MetricRegistry metricRegistry) { } private List scanExistingFileMetas( - long snapshotId, BinaryRow partition, int bucket) { + Snapshot snapshot, BinaryRow partition, int bucket) { List existingFileMetas = new ArrayList<>(); List files = - scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files(); + scan.withSnapshot(snapshot).withPartitionBucket(partition, bucket).plan().files(); for (ManifestEntry entry : files) { if (entry.totalBuckets() != totalBuckets) { String partInfo = @@ -513,7 +514,6 @@ public ExecutorService getCompactExecutor() { protected void notifyNewWriter(RecordWriter writer) {} protected abstract RecordWriter createWriter( - @Nullable Long snapshotId, BinaryRow partition, int bucket, List restoreFiles, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index b3007a36202c..bff9bbae5c67 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -102,7 +102,6 @@ public AppendOnlyFileStoreWrite( @Override protected RecordWriter createWriter( - @Nullable Long snapshotId, BinaryRow partition, int bucket, List restoredFiles, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 0d2f824282c3..fa0dfa45f5b4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -176,7 +176,6 @@ public KeyValueFileStoreWrite( @Override protected MergeTreeWriter createWriter( - @Nullable Long snapshotId, BinaryRow partition, int bucket, List restoreFiles, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 8be994414b6d..7a2dbcf47b6e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -30,6 +30,7 @@ import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.FileSystemBranchManager; @@ -60,10 +61,10 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyList; +import static org.apache.paimon.utils.ChangelogManager.CHANGELOG_PREFIX; import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX; -import static org.apache.paimon.utils.SnapshotManager.CHANGELOG_PREFIX; -import static org.apache.paimon.utils.SnapshotManager.EARLIEST; -import static org.apache.paimon.utils.SnapshotManager.LATEST; +import static org.apache.paimon.utils.HintFileUtils.EARLIEST; +import static org.apache.paimon.utils.HintFileUtils.LATEST; import static org.apache.paimon.utils.SnapshotManager.SNAPSHOT_PREFIX; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; @@ -133,6 +134,7 @@ protected void cleanSnapshotDir( 
for (String branch : branches) { FileStoreTable branchTable = table.switchToBranch(branch); SnapshotManager snapshotManager = branchTable.snapshotManager(); + ChangelogManager changelogManager = branchTable.changelogManager(); // specially handle the snapshot directory List> nonSnapshotFiles = @@ -146,7 +148,7 @@ protected void cleanSnapshotDir( // specially handle the changelog directory List> nonChangelogFiles = - tryGetNonChangelogFiles(snapshotManager.changelogDirectory(), this::oldEnough); + tryGetNonChangelogFiles(changelogManager.changelogDirectory(), this::oldEnough); nonChangelogFiles.forEach( nonChangelogFile -> cleanFile( @@ -234,10 +236,11 @@ protected void cleanFile(Path path) { protected Set safelyGetAllSnapshots(String branch) throws IOException { FileStoreTable branchTable = table.switchToBranch(branch); SnapshotManager snapshotManager = branchTable.snapshotManager(); + ChangelogManager changelogManager = branchTable.changelogManager(); TagManager tagManager = branchTable.tagManager(); Set readSnapshots = new HashSet<>(snapshotManager.safelyGetAllSnapshots()); readSnapshots.addAll(tagManager.taggedSnapshots()); - readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs()); + readSnapshots.addAll(changelogManager.safelyGetAllChangelogs()); return readSnapshots; } diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java index b7cc16625122..9adc6e904701 100644 --- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java @@ -97,7 +97,6 @@ public PostponeBucketFileStoreWrite( @Override protected PostponeBucketWriter createWriter( - @Nullable Long snapshotId, BinaryRow partition, int bucket, List restoreFiles, diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index 3ee0d5fa9b01..e00b2a398cf8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -43,6 +43,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -79,6 +80,12 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public ChangelogManager changelogManager() { + privilegeChecker.assertCanSelectOrInsert(identifier); + return wrapped.changelogManager(); + } + @Override public RowType partitionType() { return wrapped.partitionType(); diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 52c806c7c53b..716c2432891b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -37,6 +37,7 @@ import org.apache.paimon.table.source.StreamDataTableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -44,7 +45,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; /** {@link FileStoreTable} with privilege checks. 
*/ public class PrivilegedFileStoreTable extends DelegatedFileStoreTable { @@ -66,9 +66,15 @@ public SnapshotManager snapshotManager() { } @Override - public OptionalLong latestSnapshotId() { + public ChangelogManager changelogManager() { privilegeChecker.assertCanSelectOrInsert(identifier); - return wrapped.latestSnapshotId(); + return wrapped.changelogManager(); + } + + @Override + public Optional latestSnapshot() { + privilegeChecker.assertCanSelectOrInsert(identifier); + return wrapped.latestSnapshot(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 58c5d2b9848d..abcc21bb11c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -45,6 +45,7 @@ import org.apache.paimon.types.ReassignFieldId; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.StringUtils; @@ -272,7 +273,8 @@ public TableSchema commitChanges(List changes) Catalog.ColumnNotExistException { SnapshotManager snapshotManager = new SnapshotManager(fileIO, tableRoot, branch, null, null); - boolean hasSnapshots = (snapshotManager.latestSnapshotId() != null); + LazyField hasSnapshots = + new LazyField<>(() -> snapshotManager.latestSnapshot() != null); while (true) { TableSchema oldTableSchema = @@ -289,7 +291,7 @@ public TableSchema commitChanges(List changes) for (SchemaChange change : changes) { if (change instanceof SetOption) { SetOption setOption = (SetOption) change; - if (hasSnapshots) { + if (hasSnapshots.get()) { checkAlterTableOption( setOption.key(), oldOptions.get(setOption.key()), @@ -299,7 +301,7 @@ public TableSchema commitChanges(List changes) 
newOptions.put(setOption.key(), setOption.value()); } else if (change instanceof RemoveOption) { RemoveOption removeOption = (RemoveOption) change; - if (hasSnapshots) { + if (hasSnapshots.get()) { checkResetTableOption(removeOption.key()); } newOptions.remove(removeOption.key()); diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java index 5cb88f7257a7..6bd9be262f6c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsFileHandler.java @@ -54,11 +54,11 @@ public String writeStats(Statistics stats) { * @return stats */ public Optional readStats() { - Long latestSnapshotId = snapshotManager.latestSnapshotId(); - if (latestSnapshotId == null) { + Snapshot latestSnapshot = snapshotManager.latestSnapshot(); + if (latestSnapshot == null) { throw new IllegalStateException("Unable to obtain the latest snapshot"); } - return readStats(latestSnapshotId); + return readStats(latestSnapshot); } /** @@ -66,10 +66,6 @@ public Optional readStats() { * * @return stats */ - public Optional readStats(long snapshotId) { - return readStats(snapshotManager.snapshot(snapshotId)); - } - public Optional readStats(Snapshot snapshot) { String file = snapshot.statistics(); return file == null ? 
Optional.empty() : Optional.of(readStats(file)); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 3ed1ee937766..ebeccd24aec3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -61,6 +61,7 @@ import org.apache.paimon.tag.TagPreview; import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.CatalogBranchManager; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileSystemBranchManager; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Preconditions; @@ -83,7 +84,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; import java.util.SortedMap; import java.util.function.BiConsumer; @@ -145,9 +145,9 @@ public void setStatsCache(Cache cache) { } @Override - public OptionalLong latestSnapshotId() { - Long snapshot = store().snapshotManager().latestSnapshotId(); - return snapshot == null ? 
OptionalLong.empty() : OptionalLong.of(snapshot); + public Optional latestSnapshot() { + Snapshot snapshot = store().snapshotManager().latestSnapshot(); + return Optional.ofNullable(snapshot); } @Override @@ -259,6 +259,7 @@ public SnapshotReader newSnapshotReader() { tableSchema, coreOptions(), snapshotManager(), + changelogManager(), splitGenerator(), nonPartitionFilterConsumer(), DefaultValueAssigner.create(tableSchema), @@ -282,6 +283,7 @@ public StreamDataTableScan newStreamScan() { coreOptions(), newSnapshotReader(), snapshotManager(), + changelogManager(), supportStreamingReadOverwrite(), DefaultValueAssigner.create(tableSchema)); } @@ -419,16 +421,27 @@ public SnapshotManager snapshotManager() { return store().snapshotManager(); } + @Override + public ChangelogManager changelogManager() { + return store().changelogManager(); + } + @Override public ExpireSnapshots newExpireSnapshots() { return new ExpireSnapshotsImpl( - snapshotManager(), store().newSnapshotDeletion(), store().newTagManager()); + snapshotManager(), + changelogManager(), + store().newSnapshotDeletion(), + store().newTagManager()); } @Override public ExpireSnapshots newExpireChangelog() { return new ExpireChangelogImpl( - snapshotManager(), tagManager(), store().newChangelogDeletion()); + snapshotManager(), + changelogManager(), + tagManager(), + store().newChangelogDeletion()); } @Override @@ -750,6 +763,7 @@ public FileStoreTable switchToBranch(String branchName) { private RollbackHelper rollbackHelper() { return new RollbackHelper( snapshotManager(), + changelogManager(), tagManager(), fileIO, store().newSnapshotDeletion(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java index 75e3d6fe3d88..dd0ab68254f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java @@ -24,6 +24,7 @@ import 
org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -39,6 +40,8 @@ public interface DataTable extends InnerTable { SnapshotManager snapshotManager(); + ChangelogManager changelogManager(); + SchemaManager schemaManager(); TagManager tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index 0a548941bedc..0e0967fe897a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -40,6 +40,7 @@ import org.apache.paimon.table.source.StreamDataTableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SimpleFileReader; import org.apache.paimon.utils.SnapshotManager; @@ -50,7 +51,6 @@ import java.time.Duration; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; /** Delegated {@link FileStoreTable}. 
*/ public abstract class DelegatedFileStoreTable implements FileStoreTable { @@ -95,6 +95,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public ChangelogManager changelogManager() { + return wrapped.changelogManager(); + } + @Override public SchemaManager schemaManager() { return wrapped.schemaManager(); @@ -156,8 +161,8 @@ public Optional statistics() { } @Override - public OptionalLong latestSnapshotId() { - return wrapped.latestSnapshotId(); + public Optional latestSnapshot() { + return wrapped.latestSnapshot(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index 1ffa7485aee5..bf4f8665ad49 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -24,6 +24,7 @@ import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.options.ExpireConfig; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -45,6 +46,7 @@ public class ExpireChangelogImpl implements ExpireSnapshots { public static final Logger LOG = LoggerFactory.getLogger(ExpireChangelogImpl.class); private final SnapshotManager snapshotManager; + private final ChangelogManager changelogManager; private final ConsumerManager consumerManager; private final ChangelogDeletion changelogDeletion; private final TagManager tagManager; @@ -53,9 +55,11 @@ public class ExpireChangelogImpl implements ExpireSnapshots { public ExpireChangelogImpl( SnapshotManager snapshotManager, + ChangelogManager changelogManager, TagManager tagManager, ChangelogDeletion changelogDeletion) { this.snapshotManager = snapshotManager; + 
this.changelogManager = changelogManager; this.tagManager = tagManager; this.consumerManager = new ConsumerManager( @@ -90,11 +94,11 @@ public int expire() { return 0; } - Long latestChangelogId = snapshotManager.latestLongLivedChangelogId(); + Long latestChangelogId = changelogManager.latestLongLivedChangelogId(); if (latestChangelogId == null) { return 0; } - Long earliestChangelogId = snapshotManager.earliestLongLivedChangelogId(); + Long earliestChangelogId = changelogManager.earliestLongLivedChangelogId(); if (earliestChangelogId == null) { return 0; } @@ -123,8 +127,8 @@ public int expire() { maxExclusive = Math.min(maxExclusive, latestChangelogId); for (long id = min; id <= maxExclusive; id++) { - if (snapshotManager.longLivedChangelogExists(id) - && olderThanMills <= snapshotManager.longLivedChangelog(id).timeMillis()) { + if (changelogManager.longLivedChangelogExists(id) + && olderThanMills <= changelogManager.longLivedChangelog(id).timeMillis()) { return expireUntil(earliestChangelogId, id); } } @@ -140,13 +144,13 @@ public int expireUntil(long earliestId, long endExclusiveId) { List skippingSnapshots = findSkippingTags(taggedSnapshots, earliestId, endExclusiveId); - skippingSnapshots.add(snapshotManager.changelog(endExclusiveId)); + skippingSnapshots.add(changelogManager.changelog(endExclusiveId)); Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); for (long id = earliestId; id < endExclusiveId; id++) { if (LOG.isDebugEnabled()) { LOG.debug("Ready to delete changelog files from changelog #" + id); } - Changelog changelog = snapshotManager.longLivedChangelog(id); + Changelog changelog = changelogManager.longLivedChangelog(id); Predicate skipper; try { skipper = changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id); @@ -161,7 +165,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { changelogDeletion.cleanUnusedDataFiles(changelog, skipper); changelogDeletion.cleanUnusedManifests(changelog, 
manifestSkippSet); - snapshotManager.fileIO().deleteQuietly(snapshotManager.longLivedChangelogPath(id)); + changelogManager.fileIO().deleteQuietly(changelogManager.longLivedChangelogPath(id)); } changelogDeletion.cleanEmptyDirectories(); @@ -171,7 +175,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { private void writeEarliestHintFile(long earliest) { try { - snapshotManager.commitLongLivedChangelogEarliestHint(earliest); + changelogManager.commitLongLivedChangelogEarliestHint(earliest); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index 9f67a637ff42..7c63ab7e54bb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -25,6 +25,7 @@ import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.options.ExpireConfig; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -50,6 +51,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots { private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsImpl.class); private final SnapshotManager snapshotManager; + private final ChangelogManager changelogManager; private final ConsumerManager consumerManager; private final SnapshotDeletion snapshotDeletion; private final TagManager tagManager; @@ -58,9 +60,11 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots { public ExpireSnapshotsImpl( SnapshotManager snapshotManager, + ChangelogManager changelogManager, SnapshotDeletion snapshotDeletion, TagManager tagManager) { this.snapshotManager = snapshotManager; + this.changelogManager = 
changelogManager; this.consumerManager = new ConsumerManager( snapshotManager.fileIO(), @@ -135,7 +139,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { // No expire happens: // write the hint file in order to see the earliest snapshot directly next time // should avoid duplicate writes when the file exists - if (snapshotManager.readHint(SnapshotManager.EARLIEST) == null) { + if (snapshotManager.earliestFileNotExists()) { writeEarliestHint(earliestId); } @@ -261,8 +265,8 @@ public int expireUntil(long earliestId, long endExclusiveId) { private void commitChangelog(Changelog changelog) { try { - snapshotManager.commitChangelog(changelog, changelog.id()); - snapshotManager.commitLongLivedChangelogLatestHint(changelog.id()); + changelogManager.commitChangelog(changelog, changelog.id()); + changelogManager.commitLongLivedChangelogLatestHint(changelog.id()); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java index 7796b36ecb3c..3fcf52db4893 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java @@ -41,7 +41,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalLong; /** * A file format table refers to a directory that contains multiple files of the same format, where @@ -253,7 +252,7 @@ default Optional statistics() { } @Override - default OptionalLong latestSnapshotId() { + default Optional latestSnapshot() { throw new UnsupportedOperationException(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index e0e1a8e2b62d..cb6e3d3fb337 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalLong; /** Readonly table which only provide implementation for scan and read. */ public interface ReadonlyTable extends InnerTable { @@ -110,8 +109,8 @@ default StreamDataTableScan newStreamScan() { } @Override - default OptionalLong latestSnapshotId() { - return OptionalLong.empty(); + default Optional latestSnapshot() { + return Optional.empty(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java index d5482c6f5388..4812b9409227 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java @@ -25,6 +25,7 @@ import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -49,6 +50,7 @@ public class RollbackHelper { private static final Logger LOG = LoggerFactory.getLogger(RollbackHelper.class); private final SnapshotManager snapshotManager; + private final ChangelogManager changelogManager; private final TagManager tagManager; private final FileIO fileIO; private final SnapshotDeletion snapshotDeletion; @@ -57,12 +59,14 @@ public class RollbackHelper { public RollbackHelper( SnapshotManager snapshotManager, + ChangelogManager changelogManager, TagManager tagManager, FileIO fileIO, SnapshotDeletion snapshotDeletion, ChangelogDeletion changelogDeletion, TagDeletion tagDeletion) { this.snapshotManager = snapshotManager; + this.changelogManager = changelogManager; this.tagManager = tagManager; this.fileIO = fileIO; this.snapshotDeletion = 
snapshotDeletion; @@ -143,8 +147,8 @@ private List cleanSnapshotsDataFiles(Snapshot retainedSnapshot) { } private List cleanLongLivedChangelogDataFiles(Snapshot retainedSnapshot) { - Long earliest = snapshotManager.earliestLongLivedChangelogId(); - Long latest = snapshotManager.latestLongLivedChangelogId(); + Long earliest = changelogManager.earliestLongLivedChangelogId(); + Long latest = changelogManager.latestLongLivedChangelogId(); if (earliest == null || latest == null) { return Collections.emptyList(); } @@ -153,7 +157,7 @@ private List cleanLongLivedChangelogDataFiles(Snapshot retainedSnapsh List toBeCleaned = new ArrayList<>(); long to = Math.max(earliest, retainedSnapshot.id() + 1); for (long i = latest; i >= to; i--) { - toBeCleaned.add(snapshotManager.changelog(i)); + toBeCleaned.add(changelogManager.changelog(i)); } // modify the latest hint @@ -162,9 +166,9 @@ private List cleanLongLivedChangelogDataFiles(Snapshot retainedSnapsh if (to == earliest) { // all changelog has been cleaned, so we do not know the actual latest id // set to -1 - snapshotManager.commitLongLivedChangelogLatestHint(-1); + changelogManager.commitLongLivedChangelogLatestHint(-1); } else { - snapshotManager.commitLongLivedChangelogLatestHint(to - 1); + changelogManager.commitLongLivedChangelogLatestHint(to - 1); } } } catch (IOException e) { @@ -174,7 +178,7 @@ private List cleanLongLivedChangelogDataFiles(Snapshot retainedSnapsh // delete data files of changelog for (Changelog changelog : toBeCleaned) { // delete changelog files first, cannot be read now - fileIO.deleteQuietly(snapshotManager.longLivedChangelogPath(changelog.id())); + fileIO.deleteQuietly(changelogManager.longLivedChangelogPath(changelog.id())); // clean the deleted file changelogDeletion.cleanUnusedDataFiles(changelog, manifestEntry -> false); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 2d0a038a4e41..e72073317c49 
100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -38,7 +38,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalLong; /** * A table provides basic abstraction for table type and table scan and table read. @@ -93,9 +92,9 @@ default String uuid() { /** Copy this table with adding dynamic options. */ Table copy(Map dynamicOptions); - /** Get the latest snapshot id for this table, or empty if there are no snapshots. */ + /** Get the latest snapshot for this table, or empty if there are no snapshots. */ @Experimental - OptionalLong latestSnapshotId(); + Optional latestSnapshot(); /** Get the {@link Snapshot} from snapshot id. */ @Experimental diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 5bb9ba1378cb..c56b4ec2f092 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -47,6 +47,7 @@ import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner; import org.apache.paimon.tag.Tag; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; @@ -130,6 +131,7 @@ public CoreOptions options() { protected StartingScanner createStartingScanner(boolean isStreaming) { SnapshotManager snapshotManager = snapshotReader.snapshotManager(); + ChangelogManager changelogManager = snapshotReader.changelogManager(); CoreOptions.StreamScanMode type = options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE); switch (type) { @@ -149,6 +151,7 @@ protected 
StartingScanner createStartingScanner(boolean isStreaming) { if (consumer.isPresent()) { return new ContinuousFromSnapshotStartingScanner( snapshotManager, + changelogManager, consumer.get().nextSnapshot(), options.changelogLifecycleDecoupled()); } @@ -178,6 +181,7 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { return isStreaming ? new ContinuousFromTimestampStartingScanner( snapshotManager, + changelogManager, startupMillis, options.changelogLifecycleDecoupled()) : new StaticFromTimestampStartingScanner(snapshotManager, startupMillis); @@ -189,6 +193,7 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { return isStreaming ? new ContinuousFromSnapshotStartingScanner( snapshotManager, + changelogManager, options.scanSnapshotId(), options.changelogLifecycleDecoupled()) : new StaticFromSnapshotStartingScanner( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java index 5d92d6f7070e..f3581b114cc7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java @@ -35,6 +35,7 @@ import org.apache.paimon.table.source.snapshot.StartingScanner; import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult; import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.NextSnapshotFetcher; import org.apache.paimon.utils.SnapshotManager; @@ -74,6 +75,7 @@ public DataTableStreamScan( CoreOptions options, SnapshotReader snapshotReader, SnapshotManager snapshotManager, + ChangelogManager changelogManager, boolean supportStreamingReadOverwrite, DefaultValueAssigner defaultValueAssigner) { super(options, snapshotReader); @@ -82,7 +84,8 @@ 
public DataTableStreamScan( this.supportStreamingReadOverwrite = supportStreamingReadOverwrite; this.defaultValueAssigner = defaultValueAssigner; this.nextSnapshotProvider = - new NextSnapshotFetcher(snapshotManager, options.changelogLifecycleDecoupled()); + new NextSnapshotFetcher( + snapshotManager, changelogManager, options.changelogLifecycleDecoupled()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java index d8e614222857..04d86626c1c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.source.snapshot; import org.apache.paimon.CoreOptions; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.SnapshotManager; /** @@ -28,10 +29,15 @@ public class ContinuousFromSnapshotStartingScanner extends AbstractStartingScanner { private final boolean changelogDecoupled; + private final ChangelogManager changelogManager; public ContinuousFromSnapshotStartingScanner( - SnapshotManager snapshotManager, long snapshotId, boolean changelogDecoupled) { + SnapshotManager snapshotManager, + ChangelogManager changelogManager, + long snapshotId, + boolean changelogDecoupled) { super(snapshotManager); + this.changelogManager = changelogManager; this.startingSnapshotId = snapshotId; this.changelogDecoupled = changelogDecoupled; } @@ -50,7 +56,7 @@ public Result scan(SnapshotReader snapshotReader) { private Long getEarliestId() { Long earliestId; if (changelogDecoupled) { - Long earliestChangelogId = snapshotManager.earliestLongLivedChangelogId(); + Long earliestChangelogId = changelogManager.earliestLongLivedChangelogId(); earliestId = 
earliestChangelogId == null ? snapshotManager.earliestSnapshotId() diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java index 941174835537..1946da1e0c7f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java @@ -19,11 +19,20 @@ package org.apache.paimon.table.source.snapshot; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.utils.ChangelogManager; +import org.apache.paimon.utils.FunctionWithException; import org.apache.paimon.utils.SnapshotManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; + +import static org.apache.paimon.utils.SnapshotManager.EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM; + /** * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a * streaming read. 
@@ -33,16 +42,22 @@ public class ContinuousFromTimestampStartingScanner extends AbstractStartingScan private static final Logger LOG = LoggerFactory.getLogger(ContinuousFromTimestampStartingScanner.class); + private final ChangelogManager changelogManager; private final long startupMillis; private final boolean startFromChangelog; public ContinuousFromTimestampStartingScanner( - SnapshotManager snapshotManager, long startupMillis, boolean changelogDecoupled) { + SnapshotManager snapshotManager, + ChangelogManager changelogManager, + long startupMillis, + boolean changelogDecoupled) { super(snapshotManager); + this.changelogManager = changelogManager; this.startupMillis = startupMillis; this.startFromChangelog = changelogDecoupled; this.startingSnapshotId = - this.snapshotManager.earlierThanTimeMills(startupMillis, startFromChangelog); + earlierThanTimeMills( + snapshotManager, changelogManager, startupMillis, startFromChangelog); } @Override @@ -57,11 +72,112 @@ public StartingContext startingContext() { @Override public Result scan(SnapshotReader snapshotReader) { Long startingSnapshotId = - snapshotManager.earlierThanTimeMills(startupMillis, startFromChangelog); + earlierThanTimeMills( + snapshotManager, changelogManager, startupMillis, startFromChangelog); if (startingSnapshotId == null) { LOG.debug("There is currently no snapshot. Waiting for snapshot generation."); return new NoSnapshot(); } return new NextSnapshot(startingSnapshotId + 1); } + + /** + * Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be + * returned if all snapshots are equal to or later than the timestamp mills. 
+ */ + public static @Nullable Long earlierThanTimeMills( + SnapshotManager snapshotManager, + ChangelogManager changelogManager, + long timestampMills, + boolean startFromChangelog) { + Long latest = snapshotManager.latestSnapshotId(); + if (latest == null) { + return null; + } + + Snapshot earliestSnapshot = + earliestSnapshot(snapshotManager, changelogManager, startFromChangelog, latest); + if (earliestSnapshot == null) { + return latest - 1; + } + + if (earliestSnapshot.timeMillis() >= timestampMills) { + return earliestSnapshot.id() - 1; + } + + long earliest = earliestSnapshot.id(); + while (earliest < latest) { + long mid = (earliest + latest + 1) / 2; + Snapshot snapshot = + startFromChangelog + ? changelogOrSnapshot(snapshotManager, changelogManager, mid) + : snapshotManager.snapshot(mid); + if (snapshot.timeMillis() < timestampMills) { + earliest = mid; + } else { + latest = mid - 1; + } + } + return earliest; + } + + private static @Nullable Snapshot earliestSnapshot( + SnapshotManager snapshotManager, + ChangelogManager changelogManager, + boolean includeChangelog, + @Nullable Long stopSnapshotId) { + Long snapshotId = null; + if (includeChangelog) { + snapshotId = changelogManager.earliestLongLivedChangelogId(); + } + if (snapshotId == null) { + snapshotId = snapshotManager.earliestSnapshotId(); + } + if (snapshotId == null) { + return null; + } + + if (stopSnapshotId == null) { + stopSnapshotId = snapshotId + EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM; + } + + FunctionWithException snapshotFunction = + includeChangelog + ? s -> tryGetChangelogOrSnapshot(snapshotManager, changelogManager, s) + : snapshotManager::tryGetSnapshot; + + do { + try { + return snapshotFunction.apply(snapshotId); + } catch (FileNotFoundException e) { + snapshotId++; + if (snapshotId > stopSnapshotId) { + return null; + } + LOG.warn( + "The earliest snapshot or changelog was once identified but disappeared. " + + "It might have been expired by other jobs operating on this table. 
" + + "Searching for the second earliest snapshot or changelog instead. "); + } + } while (true); + } + + private static Snapshot tryGetChangelogOrSnapshot( + SnapshotManager snapshotManager, ChangelogManager changelogManager, long snapshotId) + throws FileNotFoundException { + if (changelogManager.longLivedChangelogExists(snapshotId)) { + return changelogManager.tryGetChangelog(snapshotId); + } else { + return snapshotManager.tryGetSnapshot(snapshotId); + } + } + + private static Snapshot changelogOrSnapshot( + SnapshotManager snapshotManager, ChangelogManager changelogManager, long snapshotId) { + if (changelogManager.longLivedChangelogExists(snapshotId)) { + return changelogManager.changelog(snapshotId); + } else { + return snapshotManager.snapshot(snapshotId); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java index 1189bc68efa2..73adedc122d8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.source.snapshot; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.utils.SnapshotManager; @@ -30,9 +31,14 @@ public class FullStartingScanner extends ReadPlanStartingScanner { private static final Logger LOG = LoggerFactory.getLogger(FullStartingScanner.class); + private Snapshot startingSnapshot; + public FullStartingScanner(SnapshotManager snapshotManager) { super(snapshotManager); - this.startingSnapshotId = snapshotManager.latestSnapshotId(); + this.startingSnapshot = snapshotManager.latestSnapshot(); + if (this.startingSnapshot != null) { + this.startingSnapshotId = startingSnapshot.id(); + } } @Override @@ -42,14 +48,14 @@ 
public ScanMode startingScanMode() { @Override public SnapshotReader configure(SnapshotReader snapshotReader) { - if (startingSnapshotId == null) { + if (startingSnapshot == null) { // try to get first snapshot again - startingSnapshotId = snapshotManager.latestSnapshotId(); + startingSnapshot = snapshotManager.latestSnapshot(); } - if (startingSnapshotId == null) { + if (startingSnapshot == null) { LOG.debug("There is currently no snapshot. Waiting for snapshot generation."); return null; } - return snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId); + return snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshot); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index 3329ab95fcc9..0cc42088304a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -33,6 +33,7 @@ import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.SnapshotManager; @@ -51,6 +52,8 @@ public interface SnapshotReader { SnapshotManager snapshotManager(); + ChangelogManager changelogManager(); + ManifestsReader manifestsReader(); List readManifest(ManifestFileMeta manifest); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 5a4a79e0d702..649dc5b1e490 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -46,6 +46,7 @@ import org.apache.paimon.table.source.PlanImpl; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.table.source.SplitGenerator; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Pair; @@ -79,6 +80,7 @@ public class SnapshotReaderImpl implements SnapshotReader { private final CoreOptions options; private final boolean deletionVectors; private final SnapshotManager snapshotManager; + private final ChangelogManager changelogManager; private final ConsumerManager consumerManager; private final SplitGenerator splitGenerator; private final BiConsumer nonPartitionFilterConsumer; @@ -95,6 +97,7 @@ public SnapshotReaderImpl( TableSchema tableSchema, CoreOptions options, SnapshotManager snapshotManager, + ChangelogManager changelogManager, SplitGenerator splitGenerator, BiConsumer nonPartitionFilterConsumer, DefaultValueAssigner defaultValueAssigner, @@ -106,6 +109,7 @@ public SnapshotReaderImpl( this.options = options; this.deletionVectors = options.deletionVectorsEnabled(); this.snapshotManager = snapshotManager; + this.changelogManager = changelogManager; this.consumerManager = new ConsumerManager( snapshotManager.fileIO(), @@ -130,6 +134,11 @@ public SnapshotManager snapshotManager() { return snapshotManager; } + @Override + public ChangelogManager changelogManager() { + return changelogManager; + } + @Override public ManifestsReader manifestsReader() { return scan.manifestsReader(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 005535094ef5..57e18fe46237 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -59,6 +59,7 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.ProjectedRow; @@ -75,7 +76,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalLong; import java.util.stream.Collectors; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -106,8 +106,8 @@ public AuditLogTable(FileStoreTable wrapped) { } @Override - public OptionalLong latestSnapshotId() { - return wrapped.latestSnapshotId(); + public Optional latestSnapshot() { + return wrapped.latestSnapshot(); } @Override @@ -188,6 +188,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public ChangelogManager changelogManager() { + return wrapped.changelogManager(); + } + @Override public SchemaManager schemaManager() { return wrapped.schemaManager(); @@ -255,6 +260,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public ChangelogManager changelogManager() { + return wrapped.changelogManager(); + } + @Override public ManifestsReader manifestsReader() { return wrapped.manifestsReader(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java index 31cecbfb15c2..de6b29dee48b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java @@ -50,6 +50,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.BranchManager; 
+import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.IteratorRecordReader; import org.apache.paimon.utils.SimpleFileReader; import org.apache.paimon.utils.SnapshotManager; @@ -61,7 +62,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.OptionalLong; +import java.util.Optional; import static org.apache.paimon.utils.SerializationUtils.newBytesType; import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; @@ -112,8 +113,8 @@ public CompactBucketsTable(FileStoreTable wrapped, boolean isContinuous, String } @Override - public OptionalLong latestSnapshotId() { - return wrapped.latestSnapshotId(); + public Optional latestSnapshot() { + return wrapped.latestSnapshot(); } @Override @@ -146,6 +147,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public ChangelogManager changelogManager() { + return wrapped.changelogManager(); + } + @Override public SchemaManager schemaManager() { return wrapped.schemaManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java index 522335aaa6c9..f3fafaa91e51 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java @@ -50,6 +50,7 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.IteratorRecordReader; import org.apache.paimon.utils.SimpleFileReader; import org.apache.paimon.utils.SnapshotManager; @@ -60,7 +61,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.OptionalLong; +import java.util.Optional; import static 
org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK; import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE; @@ -98,8 +99,8 @@ public FileMonitorTable(FileStoreTable wrapped) { } @Override - public OptionalLong latestSnapshotId() { - return wrapped.latestSnapshotId(); + public Optional latestSnapshot() { + return wrapped.latestSnapshot(); } @Override @@ -132,6 +133,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public ChangelogManager changelogManager() { + return wrapped.changelogManager(); + } + @Override public SchemaManager schemaManager() { return wrapped.schemaManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index 5308005053c8..fc9527c19568 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -38,13 +38,14 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.SimpleFileReader; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; import java.util.List; import java.util.Map; -import java.util.OptionalLong; +import java.util.Optional; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -68,8 +69,8 @@ public ReadOptimizedTable(FileStoreTable wrapped) { } @Override - public OptionalLong latestSnapshotId() { - return wrapped.latestSnapshotId(); + public Optional latestSnapshot() { + return wrapped.latestSnapshot(); } @Override @@ -147,6 +148,7 @@ public StreamDataTableScan newStreamScan() { coreOptions(), newSnapshotReader(), snapshotManager(), + changelogManager(), wrapped.supportStreamingReadOverwrite(), 
DefaultValueAssigner.create(wrapped.schema())); } @@ -166,6 +168,11 @@ public SnapshotManager snapshotManager() { return wrapped.snapshotManager(); } + @Override + public ChangelogManager changelogManager() { + return wrapped.changelogManager(); + } + @Override public SchemaManager schemaManager() { return wrapped.schemaManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChangelogManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChangelogManager.java new file mode 100644 index 000000000000..ece4de657712 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChangelogManager.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.utils; + +import org.apache.paimon.Changelog; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static org.apache.paimon.utils.BranchManager.branchPath; +import static org.apache.paimon.utils.FileUtils.listVersionedFiles; +import static org.apache.paimon.utils.HintFileUtils.commitEarliestHint; +import static org.apache.paimon.utils.HintFileUtils.commitLatestHint; +import static org.apache.paimon.utils.HintFileUtils.findEarliest; +import static org.apache.paimon.utils.HintFileUtils.findLatest; +import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; +import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute; + +/** + * Manager for {@link Changelog}, providing utility methods related to paths and changelog hints. 
+ */ +public class ChangelogManager implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ChangelogManager.class); + + public static final String CHANGELOG_PREFIX = "changelog-"; + + private final FileIO fileIO; + private final Path tablePath; + private final String branch; + + public ChangelogManager(FileIO fileIO, Path tablePath, @Nullable String branchName) { + this.fileIO = fileIO; + this.tablePath = tablePath; + this.branch = BranchManager.normalizeBranch(branchName); + } + + public FileIO fileIO() { + return fileIO; + } + + public @Nullable Long latestLongLivedChangelogId() { + try { + return findLatest( + fileIO, changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath); + } catch (IOException e) { + throw new RuntimeException("Failed to find latest changelog id", e); + } + } + + public @Nullable Long earliestLongLivedChangelogId() { + try { + return findEarliest( + fileIO, changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath); + } catch (IOException e) { + throw new RuntimeException("Failed to find earliest changelog id", e); + } + } + + public boolean longLivedChangelogExists(long snapshotId) { + Path path = longLivedChangelogPath(snapshotId); + try { + return fileIO.exists(path); + } catch (IOException e) { + throw new RuntimeException( + "Failed to determine if changelog #" + snapshotId + " exists in path " + path, + e); + } + } + + public Changelog longLivedChangelog(long snapshotId) { + return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId)); + } + + public Changelog changelog(long snapshotId) { + Path changelogPath = longLivedChangelogPath(snapshotId); + return Changelog.fromPath(fileIO, changelogPath); + } + + public Path longLivedChangelogPath(long snapshotId) { + return new Path( + branchPath(tablePath, branch) + "/changelog/" + CHANGELOG_PREFIX + snapshotId); + } + + public Path changelogDirectory() { + return new 
Path(branchPath(tablePath, branch) + "/changelog"); + } + + public void commitChangelog(Changelog changelog, long id) throws IOException { + fileIO.writeFile(longLivedChangelogPath(id), changelog.toJson(), true); + } + + public void commitLongLivedChangelogLatestHint(long snapshotId) throws IOException { + commitLatestHint(fileIO, snapshotId, changelogDirectory()); + } + + public void commitLongLivedChangelogEarliestHint(long snapshotId) throws IOException { + commitEarliestHint(fileIO, snapshotId, changelogDirectory()); + } + + public Changelog tryGetChangelog(long snapshotId) throws FileNotFoundException { + Path changelogPath = longLivedChangelogPath(snapshotId); + return Changelog.tryFromPath(fileIO, changelogPath); + } + + public Iterator changelogs() throws IOException { + return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX) + .map(this::changelog) + .sorted(Comparator.comparingLong(Changelog::id)) + .iterator(); + } + + public List safelyGetAllChangelogs() throws IOException { + List paths = + listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX) + .map(this::longLivedChangelogPath) + .collect(Collectors.toList()); + + List changelogs = Collections.synchronizedList(new ArrayList<>(paths.size())); + collectSnapshots( + path -> { + try { + changelogs.add(Changelog.fromJson(fileIO.readFileUtf8(path))); + } catch (IOException e) { + if (!(e instanceof FileNotFoundException)) { + throw new RuntimeException(e); + } + } + }, + paths); + + return changelogs; + } + + private static void collectSnapshots(Consumer pathConsumer, List paths) + throws IOException { + ExecutorService executor = + createCachedThreadPool( + Runtime.getRuntime().availableProcessors(), "CHANGELOG_COLLECTOR"); + + try { + randomlyOnlyExecute(executor, pathConsumer, paths); + } catch (RuntimeException e) { + throw new IOException(e); + } finally { + executor.shutdown(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java new file mode 100644 index 000000000000..734de7aeeff7 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.function.BinaryOperator; +import java.util.function.Function; + +import static org.apache.paimon.utils.FileUtils.listVersionedFiles; + +/** Utils for hint files. 
*/ +public class HintFileUtils { + + public static final String EARLIEST = "EARLIEST"; + public static final String LATEST = "LATEST"; + + private static final int READ_HINT_RETRY_NUM = 3; + private static final int READ_HINT_RETRY_INTERVAL = 1; + + @Nullable + public static Long findLatest(FileIO fileIO, Path dir, String prefix, Function file) + throws IOException { + Long snapshotId = readHint(fileIO, LATEST, dir); + if (snapshotId != null && snapshotId > 0) { + long nextSnapshot = snapshotId + 1; + // it is the latest only if there is no next one + if (!fileIO.exists(file.apply(nextSnapshot))) { + return snapshotId; + } + } + return findByListFiles(fileIO, Math::max, dir, prefix); + } + + @Nullable + public static Long findEarliest( + FileIO fileIO, Path dir, String prefix, Function file) throws IOException { + Long snapshotId = readHint(fileIO, EARLIEST, dir); + // it is the earliest only if it is not null and it exists + if (snapshotId != null && fileIO.exists(file.apply(snapshotId))) { + return snapshotId; + } + + return findByListFiles(fileIO, Math::min, dir, prefix); + } + + public static Long readHint(FileIO fileIO, String fileName, Path dir) { + Path path = new Path(dir, fileName); + int retryNumber = 0; + while (retryNumber++ < READ_HINT_RETRY_NUM) { + try { + return fileIO.readOverwrittenFileUtf8(path).map(Long::parseLong).orElse(null); + } catch (Exception ignored) { + } + try { + TimeUnit.MILLISECONDS.sleep(READ_HINT_RETRY_INTERVAL); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + return null; + } + + public static Long findByListFiles( + FileIO fileIO, BinaryOperator reducer, Path dir, String prefix) + throws IOException { + return listVersionedFiles(fileIO, dir, prefix).reduce(reducer).orElse(null); + } + + public static void commitLatestHint(FileIO fileIO, long id, Path dir) throws IOException { + commitHint(fileIO, id, LATEST, dir); + } + + public static void commitEarliestHint(FileIO fileIO,
long id, Path dir) throws IOException { + commitHint(fileIO, id, EARLIEST, dir); + } + + public static void deleteLatestHint(FileIO fileIO, Path dir) throws IOException { + Path hintFile = new Path(dir, LATEST); + fileIO.delete(hintFile, false); + } + + public static void commitHint(FileIO fileIO, long id, String fileName, Path dir) + throws IOException { + Path hintFile = new Path(dir, fileName); + int loopTime = 3; + while (loopTime-- > 0) { + try { + fileIO.overwriteFileUtf8(hintFile, String.valueOf(id)); + return; + } catch (IOException e) { + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(1000) + 500); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + // throw root cause + throw new RuntimeException(e); + } + if (loopTime == 0) { + throw e; + } + } + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java index d0a317df5379..205846b1b50a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java @@ -30,11 +30,17 @@ public class NextSnapshotFetcher { public static final Logger LOG = LoggerFactory.getLogger(NextSnapshotFetcher.class); + private final SnapshotManager snapshotManager; + private final ChangelogManager changelogManager; private final boolean changelogDecoupled; - public NextSnapshotFetcher(SnapshotManager snapshotManager, boolean changelogDecoupled) { + public NextSnapshotFetcher( + SnapshotManager snapshotManager, + ChangelogManager changelogManager, + boolean changelogDecoupled) { this.snapshotManager = snapshotManager; + this.changelogManager = changelogManager; this.changelogDecoupled = changelogDecoupled; } @@ -63,7 +69,7 @@ public Snapshot getNextSnapshot(long nextSnapshotId) { return null; } - if (!changelogDecoupled || !snapshotManager.longLivedChangelogExists(nextSnapshotId)) { + 
if (!changelogDecoupled || !changelogManager.longLivedChangelogExists(nextSnapshotId)) { throw new OutOfRangeException( String.format( "The snapshot with id %d has expired. You can: " @@ -71,6 +77,6 @@ public Snapshot getNextSnapshot(long nextSnapshotId) { + "2. use consumer-id to ensure that unconsumed snapshots will not be expired.", nextSnapshotId)); } - return snapshotManager.changelog(nextSnapshotId); + return changelogManager.changelog(nextSnapshotId); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index fc2013f019b4..d48821aa9c17 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -18,7 +18,6 @@ package org.apache.paimon.utils; -import org.apache.paimon.Changelog; import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -44,9 +43,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -66,12 +62,8 @@ public class SnapshotManager implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class); public static final String SNAPSHOT_PREFIX = "snapshot-"; - public static final String CHANGELOG_PREFIX = "changelog-"; - public static final String EARLIEST = "EARLIEST"; - public static final String LATEST = "LATEST"; - private static final int EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM = 3; - private static final int READ_HINT_RETRY_NUM = 3; - private static final int READ_HINT_RETRY_INTERVAL = 1; + + public static final int EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM = 3; private final 
FileIO fileIO; private final Path tablePath; @@ -112,15 +104,6 @@ public String branch() { return branch; } - public Path changelogDirectory() { - return new Path(branchPath(tablePath, branch) + "/changelog"); - } - - public Path longLivedChangelogPath(long snapshotId) { - return new Path( - branchPath(tablePath, branch) + "/changelog/" + CHANGELOG_PREFIX + snapshotId); - } - public Path snapshotPath(long snapshotId) { return new Path( branchPath(tablePath, branch) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); @@ -160,20 +143,6 @@ public Snapshot tryGetSnapshot(long snapshotId) throws FileNotFoundException { return snapshot; } - public Changelog changelog(long snapshotId) { - Path changelogPath = longLivedChangelogPath(snapshotId); - return Changelog.fromPath(fileIO, changelogPath); - } - - private Changelog tryGetChangelog(long snapshotId) throws FileNotFoundException { - Path changelogPath = longLivedChangelogPath(snapshotId); - return Changelog.tryFromPath(fileIO, changelogPath); - } - - public Changelog longLivedChangelog(long snapshotId) { - return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId)); - } - public boolean snapshotExists(long snapshotId) { Path path = snapshotPath(snapshotId); try { @@ -193,17 +162,6 @@ public void deleteSnapshot(long snapshotId) { fileIO().deleteQuietly(path); } - public boolean longLivedChangelogExists(long snapshotId) { - Path path = longLivedChangelogPath(snapshotId); - try { - return fileIO.exists(path); - } catch (IOException e) { - throw new RuntimeException( - "Failed to determine if changelog #" + snapshotId + " exists in path " + path, - e); - } - } - public @Nullable Snapshot latestSnapshot() { if (snapshotLoader != null) { try { @@ -228,18 +186,11 @@ public boolean longLivedChangelogExists(long snapshotId) { } public @Nullable Snapshot earliestSnapshot() { - return earliestSnapshot(false, null); + return earliestSnapshot(null); } - private @Nullable Snapshot earliestSnapshot( - boolean includeChangelog, 
@Nullable Long stopSnapshotId) { - Long snapshotId = null; - if (includeChangelog) { - snapshotId = earliestLongLivedChangelogId(); - } - if (snapshotId == null) { - snapshotId = earliestSnapshotId(); - } + private @Nullable Snapshot earliestSnapshot(@Nullable Long stopSnapshotId) { + Long snapshotId = earliestSnapshotId(); if (snapshotId == null) { return null; } @@ -248,12 +199,9 @@ public boolean longLivedChangelogExists(long snapshotId) { stopSnapshotId = snapshotId + EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM; } - FunctionWithException snapshotFunction = - includeChangelog ? this::tryGetChangelogOrSnapshot : this::tryGetSnapshot; - do { try { - return snapshotFunction.apply(snapshotId); + return tryGetSnapshot(snapshotId); } catch (FileNotFoundException e) { snapshotId++; if (snapshotId > stopSnapshotId) { @@ -267,6 +215,10 @@ public boolean longLivedChangelogExists(long snapshotId) { } while (true); } + public boolean earliestFileNotExists() { + return HintFileUtils.readHint(fileIO, HintFileUtils.EARLIEST, snapshotDirectory()) == null; + } + public @Nullable Long earliestSnapshotId() { try { return findEarliest(snapshotDirectory(), SNAPSHOT_PREFIX, this::snapshotPath); @@ -275,27 +227,6 @@ public boolean longLivedChangelogExists(long snapshotId) { } } - public @Nullable Long earliestLongLivedChangelogId() { - try { - return findEarliest( - changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath); - } catch (IOException e) { - throw new RuntimeException("Failed to find earliest changelog id", e); - } - } - - public @Nullable Long latestLongLivedChangelogId() { - try { - return findLatest(changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath); - } catch (IOException e) { - throw new RuntimeException("Failed to find latest changelog id", e); - } - } - - public @Nullable Long latestChangelogId() { - return latestSnapshotId(); - } - public @Nullable Long pickOrLatest(Predicate predicate) { Long latestId = latestSnapshotId(); Long earliestId = 
earliestSnapshotId(); @@ -315,53 +246,6 @@ public boolean longLivedChangelogExists(long snapshotId) { return latestId; } - private Snapshot changelogOrSnapshot(long snapshotId) { - if (longLivedChangelogExists(snapshotId)) { - return changelog(snapshotId); - } else { - return snapshot(snapshotId); - } - } - - private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundException { - if (longLivedChangelogExists(snapshotId)) { - return tryGetChangelog(snapshotId); - } else { - return tryGetSnapshot(snapshotId); - } - } - - /** - * Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be - * returned if all snapshots are equal to or later than the timestamp mills. - */ - public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) { - Long latest = latestSnapshotId(); - if (latest == null) { - return null; - } - - Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog, latest); - if (earliestSnapshot == null) { - return latest - 1; - } - - if (earliestSnapshot.timeMillis() >= timestampMills) { - return earliestSnapshot.id() - 1; - } - - long earliest = earliestSnapshot.id(); - while (earliest < latest) { - long mid = (earliest + latest + 1) / 2; - if (changelogOrSnapshot(mid).timeMillis() < timestampMills) { - earliest = mid; - } else { - latest = mid - 1; - } - } - return earliest; - } - /** * Returns a {@link Snapshot} whoes commit time is earlier than or equal to given timestamp * mills. If there is no such a snapshot, returns null. 
@@ -372,7 +256,7 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE return null; } - Snapshot earliestSnapShot = earliestSnapshot(false, latest); + Snapshot earliestSnapShot = earliestSnapshot(latest); if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) { return earliestSnapShot; } @@ -437,7 +321,7 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE return null; } - Snapshot earliestSnapShot = earliestSnapshot(false, latest); + Snapshot earliestSnapShot = earliestSnapshot(latest); if (earliestSnapShot == null) { return null; } @@ -502,7 +386,7 @@ private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundE return null; } - Snapshot earliestSnapShot = earliestSnapshot(false, latest); + Snapshot earliestSnapShot = earliestSnapshot(latest); if (earliestSnapShot == null) { return null; } @@ -625,13 +509,6 @@ public Iterator snapshotsWithinRange( .iterator(); } - public Iterator changelogs() throws IOException { - return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX) - .map(this::changelog) - .sorted(Comparator.comparingLong(Changelog::id)) - .iterator(); - } - /** * If {@link FileNotFoundException} is thrown when reading the snapshot file, this snapshot may * be deleted by other processes, so just skip this snapshot. 
@@ -656,29 +533,7 @@ public List safelyGetAllSnapshots() throws IOException { return snapshots; } - public List safelyGetAllChangelogs() throws IOException { - List paths = - listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX) - .map(this::longLivedChangelogPath) - .collect(Collectors.toList()); - - List changelogs = Collections.synchronizedList(new ArrayList<>(paths.size())); - collectSnapshots( - path -> { - try { - changelogs.add(Changelog.fromJson(fileIO.readFileUtf8(path))); - } catch (IOException e) { - if (!(e instanceof FileNotFoundException)) { - throw new RuntimeException(e); - } - } - }, - paths); - - return changelogs; - } - - private void collectSnapshots(Consumer pathConsumer, List paths) + private static void collectSnapshots(Consumer pathConsumer, List paths) throws IOException { ExecutorService executor = createCachedThreadPool( @@ -770,10 +625,6 @@ public List findSnapshotsForIdentifiers( return matchedSnapshots; } - public void commitChangelog(Changelog changelog, long id) throws IOException { - fileIO.writeFile(longLivedChangelogPath(id), changelog.toJson(), true); - } - /** * Traversal snapshots from latest to earliest safely, this is applied on the writer side * because the committer may delete obsolete snapshots, which may cause the writer to encounter @@ -818,53 +669,12 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { private @Nullable Long findLatest(Path dir, String prefix, Function file) throws IOException { - Long snapshotId = readHint(LATEST, dir); - if (snapshotId != null && snapshotId > 0) { - long nextSnapshot = snapshotId + 1; - // it is the latest only there is no next one - if (!fileIO.exists(file.apply(nextSnapshot))) { - return snapshotId; - } - } - return findByListFiles(Math::max, dir, prefix); + return HintFileUtils.findLatest(fileIO, dir, prefix, file); } private @Nullable Long findEarliest(Path dir, String prefix, Function file) throws IOException { - Long snapshotId = 
readHint(EARLIEST, dir); - // null and it is the earliest only it exists - if (snapshotId != null && fileIO.exists(file.apply(snapshotId))) { - return snapshotId; - } - - return findByListFiles(Math::min, dir, prefix); - } - - public Long readHint(String fileName) { - return readHint(fileName, snapshotDirectory()); - } - - public Long readHint(String fileName, Path dir) { - Path path = new Path(dir, fileName); - int retryNumber = 0; - while (retryNumber++ < READ_HINT_RETRY_NUM) { - try { - return fileIO.readOverwrittenFileUtf8(path).map(Long::parseLong).orElse(null); - } catch (Exception ignored) { - } - try { - TimeUnit.MILLISECONDS.sleep(READ_HINT_RETRY_INTERVAL); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - return null; - } - - private Long findByListFiles(BinaryOperator reducer, Path dir, String prefix) - throws IOException { - return listVersionedFiles(fileIO, dir, prefix).reduce(reducer).orElse(null); + return HintFileUtils.findEarliest(fileIO, dir, prefix, file); } public static int findPreviousSnapshot(List sortedSnapshots, long targetSnapshotId) { @@ -887,46 +697,14 @@ public static int findPreviousOrEqualSnapshot( } public void deleteLatestHint() throws IOException { - Path snapshotDir = snapshotDirectory(); - Path hintFile = new Path(snapshotDir, LATEST); - fileIO.delete(hintFile, false); + HintFileUtils.deleteLatestHint(fileIO, snapshotDirectory()); } public void commitLatestHint(long snapshotId) throws IOException { - commitHint(snapshotId, LATEST, snapshotDirectory()); - } - - public void commitLongLivedChangelogLatestHint(long snapshotId) throws IOException { - commitHint(snapshotId, LATEST, changelogDirectory()); - } - - public void commitLongLivedChangelogEarliestHint(long snapshotId) throws IOException { - commitHint(snapshotId, EARLIEST, changelogDirectory()); + HintFileUtils.commitLatestHint(fileIO, snapshotId, snapshotDirectory()); } public void commitEarliestHint(long 
snapshotId) throws IOException { - commitHint(snapshotId, EARLIEST, snapshotDirectory()); - } - - private void commitHint(long snapshotId, String fileName, Path dir) throws IOException { - Path hintFile = new Path(dir, fileName); - int loopTime = 3; - while (loopTime-- > 0) { - try { - fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId)); - return; - } catch (IOException e) { - try { - Thread.sleep(ThreadLocalRandom.current().nextInt(1000) + 500); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - // throw root cause - throw new RuntimeException(e); - } - if (loopTime == 0) { - throw e; - } - } - } + HintFileUtils.commitEarliestHint(fileIO, snapshotId, snapshotDirectory()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java index b48288a8da14..41f5e2dd952e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/SnapshotTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.api.Test; @@ -53,6 +54,10 @@ public static SnapshotManager newSnapshotManager(FileIO fileIO, Path tablePath) return newSnapshotManager(fileIO, tablePath, DEFAULT_MAIN_BRANCH); } + public static ChangelogManager newChangelogManager(FileIO fileIO, Path tablePath) { + return new ChangelogManager(fileIO, tablePath, DEFAULT_MAIN_BRANCH); + } + public static SnapshotManager newSnapshotManager(FileIO fileIO, Path tablePath, String branch) { return new SnapshotManager(fileIO, tablePath, branch, null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java index e6db51589408..94f5777c1097 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java @@ -115,8 +115,8 @@ public CommitMessage removeIndexFiles( } public List scanDVIndexFiles(BinaryRow partition, int bucket) { - Long lastSnapshotId = snapshotManager().latestSnapshotId(); - return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX, partition, bucket); + Snapshot latestSnapshot = snapshotManager().latestSnapshot(); + return fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, partition, bucket); } public UnawareAppendDeletionFileMaintainer createDVIFMaintainer( @@ -126,10 +126,10 @@ public UnawareAppendDeletionFileMaintainer createDVIFMaintainer( } public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow partition, int bucket) { - Long lastSnapshotId = snapshotManager().latestSnapshotId(); + Snapshot latestSnapshot = snapshotManager().latestSnapshot(); DeletionVectorsMaintainer.Factory factory = new DeletionVectorsMaintainer.Factory(fileHandler); - return factory.createOrRestore(lastSnapshotId, partition, bucket); + return factory.createOrRestore(latestSnapshot, partition, bucket); } public CommitMessageImpl writeDVIndexFiles( diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 62f2c38e1392..2187418ee0cd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -57,9 +57,11 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.DataFilePathFactories; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.HintFileUtils; import org.apache.paimon.utils.Pair; import 
org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; @@ -161,6 +163,7 @@ public FileStoreCommitImpl newCommit() { public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long millisRetained) { return new ExpireSnapshotsImpl( snapshotManager(), + changelogManager(), newSnapshotDeletion(), new TagManager(fileIO, options.path())) .config( @@ -174,6 +177,7 @@ public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long mi public ExpireSnapshots newExpire(ExpireConfig expireConfig) { return new ExpireSnapshotsImpl( snapshotManager(), + changelogManager(), newSnapshotDeletion(), new TagManager(fileIO, options.path())) .config(expireConfig); @@ -183,6 +187,7 @@ public ExpireSnapshots newChangelogExpire(ExpireConfig config) { ExpireChangelogImpl impl = new ExpireChangelogImpl( snapshotManager(), + changelogManager(), new TagManager(fileIO, options.path()), newChangelogDeletion()); impl.config(config); @@ -493,32 +498,33 @@ public void assertCleaned() throws IOException { // - latest should < true_latest // - earliest should < true_earliest SnapshotManager snapshotManager = snapshotManager(); + ChangelogManager changelogManager = changelogManager(); Path snapshotDir = snapshotManager.snapshotDirectory(); - Path earliest = new Path(snapshotDir, SnapshotManager.EARLIEST); - Path latest = new Path(snapshotDir, SnapshotManager.LATEST); + Path earliest = new Path(snapshotDir, HintFileUtils.EARLIEST); + Path latest = new Path(snapshotDir, HintFileUtils.LATEST); if (actualFiles.remove(earliest)) { - long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST); + long earliestId = snapshotManager.earliestSnapshotId(); fileIO.delete(earliest, false); assertThat(earliestId <= snapshotManager.earliestSnapshotId()).isTrue(); } if (actualFiles.remove(latest)) { - long latestId = snapshotManager.readHint(SnapshotManager.LATEST); + long latestId = snapshotManager.latestSnapshotId(); fileIO.delete(latest, false); 
assertThat(latestId <= snapshotManager.latestSnapshotId()).isTrue(); } - Path changelogDir = snapshotManager.changelogDirectory(); - Path earliestChangelog = new Path(changelogDir, SnapshotManager.EARLIEST); - Path latestChangelog = new Path(changelogDir, SnapshotManager.LATEST); + Path changelogDir = changelogManager.changelogDirectory(); + Path earliestChangelog = new Path(changelogDir, HintFileUtils.EARLIEST); + Path latestChangelog = new Path(changelogDir, HintFileUtils.LATEST); if (actualFiles.remove(earliestChangelog)) { - long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST, changelogDir); + long earliestId = changelogManager.earliestLongLivedChangelogId(); fileIO.delete(earliest, false); - assertThat(earliestId <= snapshotManager.earliestLongLivedChangelogId()).isTrue(); + assertThat(earliestId <= changelogManager.earliestLongLivedChangelogId()).isTrue(); } if (actualFiles.remove(latestChangelog)) { - long latestId = snapshotManager.readHint(SnapshotManager.LATEST, changelogDir); + long latestId = changelogManager.latestLongLivedChangelogId(); fileIO.delete(latest, false); - assertThat(latestId <= snapshotManager.latestLongLivedChangelogId()).isTrue(); + assertThat(latestId <= changelogManager.latestLongLivedChangelogId()).isTrue(); } // for easier debugging @@ -539,6 +545,7 @@ private Set getFilesInUse() { schemaManager.listAllIds().forEach(id -> result.add(schemaManager.toSchemaPath(id))); SnapshotManager snapshotManager = snapshotManager(); + ChangelogManager changelogManager = changelogManager(); Long latestSnapshotId = snapshotManager.latestSnapshotId(); if (latestSnapshotId == null) { @@ -548,7 +555,7 @@ private Set getFilesInUse() { long firstInUseSnapshotId = Snapshot.FIRST_SNAPSHOT_ID; for (long id = latestSnapshotId - 1; id >= Snapshot.FIRST_SNAPSHOT_ID; id--) { if (!snapshotManager.snapshotExists(id) - && !snapshotManager.longLivedChangelogExists(id)) { + && !changelogManager.longLivedChangelogExists(id)) { firstInUseSnapshotId = 
id + 1; break; } @@ -565,6 +572,7 @@ public Set getFilesInUse(long snapshotId) { return getFilesInUse( snapshotId, snapshotManager(), + changelogManager(), fileIO, pathFactory(), manifestListFactory().create(), @@ -574,6 +582,7 @@ public Set getFilesInUse(long snapshotId) { public static Set getFilesInUse( long snapshotId, SnapshotManager snapshotManager, + ChangelogManager changelogManager, FileIO fileIO, FileStorePathFactory pathFactory, ManifestList manifestList, @@ -585,16 +594,18 @@ public static Set getFilesInUse( getSnapshotFileInUse( snapshotId, snapshotManager, + changelogManager, fileIO, pathFactory, manifestList, manifestFile); result.addAll(files); - } else if (snapshotManager.longLivedChangelogExists(snapshotId)) { + } else if (changelogManager.longLivedChangelogExists(snapshotId)) { Set files = getChangelogFileInUse( snapshotId, snapshotManager, + changelogManager, fileIO, pathFactory, manifestList, @@ -611,6 +622,7 @@ public static Set getFilesInUse( private static Set getSnapshotFileInUse( long snapshotId, SnapshotManager snapshotManager, + ChangelogManager changelogManager, FileIO fileIO, FileStorePathFactory pathFactory, ManifestList manifestList, @@ -621,7 +633,7 @@ private static Set getSnapshotFileInUse( boolean produceChangelog = options.changelogProducer() != CoreOptions.ChangelogProducer.NONE; // The option from the table may not align with the expiration config - boolean changelogDecoupled = snapshotManager.earliestLongLivedChangelogId() != null; + boolean changelogDecoupled = changelogManager.earliestLongLivedChangelogId() != null; Path snapshotPath = snapshotManager.snapshotPath(snapshotId); Snapshot snapshot = Snapshot.fromPath(fileIO, snapshotPath); @@ -683,6 +695,7 @@ private static Set getSnapshotFileInUse( private static Set getChangelogFileInUse( long changelogId, SnapshotManager snapshotManager, + ChangelogManager changelogManager, FileIO fileIO, FileStorePathFactory pathFactory, ManifestList manifestList, @@ -691,7 +704,7 @@ 
private static Set getChangelogFileInUse( SchemaManager schemaManager = new SchemaManager(fileIO, snapshotManager.tablePath()); CoreOptions options = new CoreOptions(schemaManager.latest().get().options()); - Path changelogPath = snapshotManager.longLivedChangelogPath(changelogId); + Path changelogPath = changelogManager.longLivedChangelogPath(changelogId); Changelog changelog = Changelog.fromPath(fileIO, changelogPath); // changelog file diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java index 308a1ea97153..07446faab66d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.deletionvectors; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.PrimaryKeyTableTestBase; import org.apache.paimon.compact.CompactDeletionFile; import org.apache.paimon.data.BinaryRow; @@ -99,8 +100,8 @@ public void test1() { BatchTableCommit commit = table.newBatchWriteBuilder().newCommit(); commit.commit(Collections.singletonList(commitMessage)); - Long lastSnapshotId = table.snapshotManager().latestSnapshotId(); - dvMaintainer = factory.createOrRestore(lastSnapshotId, BinaryRow.EMPTY_ROW, 0); + Snapshot latestSnapshot = table.snapshotManager().latestSnapshot(); + dvMaintainer = factory.createOrRestore(latestSnapshot, BinaryRow.EMPTY_ROW, 0); DeletionVector deletionVector2 = dvMaintainer.deletionVectorOf("f1").get(); assertThat(deletionVector2.isDeleted(1)).isTrue(); assertThat(deletionVector2.isDeleted(2)).isFalse(); @@ -120,8 +121,8 @@ public void test1() { commit = table.newBatchWriteBuilder().newCommit(); commit.commit(Collections.singletonList(commitMessage)); - lastSnapshotId = table.snapshotManager().latestSnapshotId(); 
- dvMaintainer = factory.createOrRestore(lastSnapshotId, BinaryRow.EMPTY_ROW, 0); + latestSnapshot = table.snapshotManager().latestSnapshot(); + dvMaintainer = factory.createOrRestore(latestSnapshot, BinaryRow.EMPTY_ROW, 0); DeletionVector deletionVector3 = dvMaintainer.deletionVectorOf("f1").get(); assertThat(deletionVector3.isDeleted(1)).isTrue(); assertThat(deletionVector3.isDeleted(2)).isTrue(); diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index f29eb55113f5..934f48e06756 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.iceberg; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.FileSystemCatalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; @@ -213,7 +214,7 @@ public void testRetryCreateMetadata() throws Exception { write.compact(BinaryRow.EMPTY_ROW, 0, true); List commitMessages2 = write.prepareCommit(true, 2); commit.commit(2, commitMessages2); - assertThat(table.latestSnapshotId()).hasValue(3L); + assertThat(table.latestSnapshot()).isPresent().map(Snapshot::id).hasValue(3L); IcebergPathFactory pathFactory = new IcebergPathFactory(new Path(table.location(), "metadata")); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index ed1489ca7eb3..cdc71dcf8fe4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -41,6 +41,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.ExpireSnapshots; 
import org.apache.paimon.table.ExpireSnapshotsImpl; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -68,6 +69,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; +import static org.apache.paimon.utils.HintFileUtils.EARLIEST; import static org.assertj.core.api.Assertions.assertThat; /** Base test class for {@link ExpireSnapshotsImpl}. */ @@ -79,12 +81,14 @@ public class ExpireSnapshotsTest { @TempDir java.nio.file.Path tempExternalPath; protected TestFileStore store; protected SnapshotManager snapshotManager; + protected ChangelogManager changelogManager; @BeforeEach public void beforeEach() throws Exception { gen = new TestKeyValueGenerator(); store = createStore(); snapshotManager = store.snapshotManager(); + changelogManager = store.changelogManager(); SchemaManager schemaManager = new SchemaManager(fileIO, new Path(tempDir.toUri())); schemaManager.createTable( new Schema( @@ -494,7 +498,7 @@ public void testExpireWithNumber() throws Exception { // validate earliest hint file Path snapshotDir = snapshotManager.snapshotDirectory(); - Path earliest = new Path(snapshotDir, SnapshotManager.EARLIEST); + Path earliest = new Path(snapshotDir, EARLIEST); assertThat(fileIO.exists(earliest)).isTrue(); @@ -603,9 +607,9 @@ public void testChangelogOutLivedSnapshot() throws Exception { int latestSnapshotId = snapshotManager.latestSnapshotId().intValue(); int earliestSnapshotId = snapshotManager.earliestSnapshotId().intValue(); - int latestLongLivedChangelogId = snapshotManager.latestLongLivedChangelogId().intValue(); + int latestLongLivedChangelogId = changelogManager.latestLongLivedChangelogId().intValue(); int earliestLongLivedChangelogId = - snapshotManager.earliestLongLivedChangelogId().intValue(); + changelogManager.earliestLongLivedChangelogId().intValue(); // 2 snapshot in 
/snapshot assertThat(latestSnapshotId - earliestSnapshotId).isEqualTo(1); @@ -618,9 +622,9 @@ public void testChangelogOutLivedSnapshot() throws Exception { assertThat(snapshotManager.latestSnapshotId().intValue()).isEqualTo(latestSnapshotId); assertThat(snapshotManager.earliestSnapshotId().intValue()).isEqualTo(earliestSnapshotId); - assertThat(snapshotManager.latestLongLivedChangelogId()) + assertThat(changelogManager.latestLongLivedChangelogId()) .isEqualTo(snapshotManager.earliestSnapshotId() - 1); - assertThat(snapshotManager.earliestLongLivedChangelogId()) + assertThat(changelogManager.earliestLongLivedChangelogId()) .isEqualTo(snapshotManager.earliestSnapshotId() - 1); store.assertCleaned(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index ffad8358e870..b0b506b9304e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -41,6 +41,7 @@ import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.table.ExpireSnapshotsImpl; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; @@ -647,6 +648,7 @@ public void testExpireWithDeletingTags() throws Exception { TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 2); tagManager = new TagManager(fileIO, store.options().path()); SnapshotManager snapshotManager = store.snapshotManager(); + ChangelogManager changelogManager = store.changelogManager(); TestKeyValueGenerator gen = new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED); BinaryRow partition = gen.getPartition(gen.next()); @@ -674,7 +676,8 @@ public void 
testExpireWithDeletingTags() throws Exception { // action: expire snapshot 1 -> delete tag1 -> expire snapshot 2 // result: exist A & B (because of tag2) ExpireSnapshots expireSnapshots = - new ExpireSnapshotsImpl(snapshotManager, store.newSnapshotDeletion(), tagManager); + new ExpireSnapshotsImpl( + snapshotManager, changelogManager, store.newSnapshotDeletion(), tagManager); expireSnapshots .config( ExpireConfig.builder() diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 9e4ba30eb878..b40dc32d5368 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -85,6 +85,7 @@ import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; +import static org.apache.paimon.utils.HintFileUtils.LATEST; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -165,7 +166,7 @@ public void testLatestHint() throws Exception { testRandomConcurrentNoConflict(1, false, CoreOptions.ChangelogProducer.NONE); SnapshotManager snapshotManager = createStore(false, 1).snapshotManager(); Path snapshotDir = snapshotManager.snapshotDirectory(); - Path latest = new Path(snapshotDir, SnapshotManager.LATEST); + Path latest = new Path(snapshotDir, LATEST); assertThat(new LocalFileIO().exists(latest)).isTrue(); @@ -740,7 +741,7 @@ public void testIndexFiles() throws Exception { // assert part1 List part1Index = - indexFileHandler.scanEntries(snapshot.id(), HASH_INDEX, part1); + indexFileHandler.scanEntries(snapshot, HASH_INDEX, part1); assertThat(part1Index.size()).isEqualTo(2); IndexManifestEntry indexManifestEntry 
= @@ -755,7 +756,7 @@ public void testIndexFiles() throws Exception { // assert part2 List part2Index = - indexFileHandler.scanEntries(snapshot.id(), HASH_INDEX, part2); + indexFileHandler.scanEntries(snapshot, HASH_INDEX, part2); assertThat(part2Index.size()).isEqualTo(1); assertThat(part2Index.get(0).bucket()).isEqualTo(2); assertThat(indexFileHandler.readHashIndexList(part2Index.get(0).indexFile())) @@ -767,7 +768,7 @@ public void testIndexFiles() throws Exception { snapshot = store.snapshotManager().latestSnapshot(); // assert update part1 - part1Index = indexFileHandler.scanEntries(snapshot.id(), HASH_INDEX, part1); + part1Index = indexFileHandler.scanEntries(snapshot, HASH_INDEX, part1); assertThat(part1Index.size()).isEqualTo(2); indexManifestEntry = @@ -781,7 +782,7 @@ public void testIndexFiles() throws Exception { .containsExactlyInAnyOrder(6, 8); // assert scan one bucket - Optional file = indexFileHandler.scanHashIndex(snapshot.id(), part1, 0); + Optional file = indexFileHandler.scanHashIndex(snapshot, part1, 0); assertThat(file).isPresent(); assertThat(indexFileHandler.readHashIndexList(file.get())).containsExactlyInAnyOrder(1, 4); @@ -790,9 +791,9 @@ public void testIndexFiles() throws Exception { store.overwriteData( Collections.singletonList(record1), gen::getPartition, kv -> 0, new HashMap<>()); snapshot = store.snapshotManager().latestSnapshot(); - file = indexFileHandler.scanHashIndex(snapshot.id(), part1, 0); + file = indexFileHandler.scanHashIndex(snapshot, part1, 0); assertThat(file).isEmpty(); - file = indexFileHandler.scanHashIndex(snapshot.id(), part2, 2); + file = indexFileHandler.scanHashIndex(snapshot, part2, 2); assertThat(file).isPresent(); // overwrite all partitions @@ -800,7 +801,7 @@ public void testIndexFiles() throws Exception { store.overwriteData( Collections.singletonList(record1), gen::getPartition, kv -> 0, new HashMap<>()); snapshot = store.snapshotManager().latestSnapshot(); - file = 
indexFileHandler.scanHashIndex(snapshot.id(), part2, 2); + file = indexFileHandler.scanHashIndex(snapshot, part2, 2); assertThat(file).isEmpty(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index 775f8a4d37ad..e56cb58222e0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -368,7 +368,7 @@ private void validate( // validate changelog if (table.coreOptions().changelogProducer() == CoreOptions.ChangelogProducer.INPUT) { List changelogs = new ArrayList<>(); - table.snapshotManager().changelogs().forEachRemaining(changelogs::add); + table.changelogManager().changelogs().forEachRemaining(changelogs::add); validateChangelog( changelogs.stream() .sorted(Comparator.comparingLong(Changelog::id)) diff --git a/paimon-core/src/test/java/org/apache/paimon/privilege/PrivilegedCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/privilege/PrivilegedCatalogTest.java index 86dbb02cdd6b..b162b5814209 100644 --- a/paimon-core/src/test/java/org/apache/paimon/privilege/PrivilegedCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/privilege/PrivilegedCatalogTest.java @@ -27,7 +27,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; -import java.util.OptionalLong; +import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -35,6 +35,7 @@ /** Tests for {@link PrivilegedCatalog}. 
*/ public class PrivilegedCatalogTest extends FileSystemCatalogTest { + private static final String PASSWORD_ROOT = "123456"; private static final String USERNAME_TEST_USER = "test_user"; private static final String PASSWORD_TEST_USER = "test_password"; @@ -61,7 +62,7 @@ public void testGetTable() throws Exception { FileStoreTable dataTable = (FileStoreTable) userCatalog.getTable(identifier); assertNoPrivilege(dataTable::snapshotManager); - assertNoPrivilege(dataTable::latestSnapshotId); + assertNoPrivilege(dataTable::latestSnapshot); assertNoPrivilege(() -> dataTable.snapshot(0)); rootCatalog.grantPrivilegeOnTable(USERNAME_TEST_USER, identifier, PrivilegeType.SELECT); @@ -69,7 +70,7 @@ public void testGetTable() throws Exception { FileStoreTable dataTable2 = (FileStoreTable) userCatalog.getTable(identifier); assertThat(dataTable2.snapshotManager().latestSnapshotId()).isNull(); - assertThat(dataTable2.latestSnapshotId()).isEqualTo(OptionalLong.empty()); + assertThat(dataTable2.latestSnapshot()).isEqualTo(Optional.empty()); assertThatThrownBy(() -> dataTable2.snapshot(0)).isNotNull(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index bcf13d49222b..78ecbb4d11ca 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -119,6 +119,8 @@ import static org.apache.paimon.CoreOptions.WRITE_ONLY; import static org.apache.paimon.SnapshotTest.newSnapshotManager; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; +import static org.apache.paimon.utils.HintFileUtils.EARLIEST; +import static org.apache.paimon.utils.HintFileUtils.LATEST; import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; import static 
org.assertj.core.api.Assertions.assertThatExceptionOfType; @@ -1475,6 +1477,7 @@ public void testAsyncExpireExecutionMode() throws Exception { TestFileStore.getFilesInUse( latestSnapshotId, snapshotManager, + table.changelogManager(), table.fileIO(), store.pathFactory(), store.manifestListFactory().create(), @@ -1485,8 +1488,8 @@ public void testAsyncExpireExecutionMode() throws Exception { .filter(Files::isRegularFile) .filter(p -> !p.getFileName().toString().startsWith("snapshot")) .filter(p -> !p.getFileName().toString().startsWith("schema")) - .filter(p -> !p.getFileName().toString().equals(SnapshotManager.LATEST)) - .filter(p -> !p.getFileName().toString().equals(SnapshotManager.EARLIEST)) + .filter(p -> !p.getFileName().toString().equals(LATEST)) + .filter(p -> !p.getFileName().toString().equals(EARLIEST)) .map(p -> new Path(TraceableFileIO.SCHEME + "://" + p.toString())) .filter(p -> !filesInUse.contains(p)) .collect(Collectors.toList()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java index 46093f2cba0d..db00fdf35a34 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.table.sink.StreamTableCommit; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.types.RowKind; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TraceableFileIO; @@ -42,6 +43,7 @@ public class ContinuousFromTimestampStartingScannerTest extends ScannerTestBase @Test public void testScan() throws Exception { SnapshotManager snapshotManager = 
table.snapshotManager(); + ChangelogManager changelogManager = table.changelogManager(); StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser); @@ -67,7 +69,8 @@ public void testScan() throws Exception { long timestamp = snapshotManager.snapshot(3).timeMillis(); ContinuousFromTimestampStartingScanner scanner = - new ContinuousFromTimestampStartingScanner(snapshotManager, timestamp, false); + new ContinuousFromTimestampStartingScanner( + snapshotManager, changelogManager, timestamp, false); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); assertThat(result.nextSnapshotId()).isEqualTo(3); @@ -81,7 +84,10 @@ public void testNoSnapshot() { SnapshotManager snapshotManager = table.snapshotManager(); ContinuousFromTimestampStartingScanner scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, System.currentTimeMillis(), false); + snapshotManager, + table.changelogManager(), + System.currentTimeMillis(), + false); assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class); } @@ -101,7 +107,8 @@ public void testNoSnapshotBeforeTimestamp() throws Exception { long timestamp = snapshotManager.snapshot(1).timeMillis(); ContinuousFromTimestampStartingScanner scanner = - new ContinuousFromTimestampStartingScanner(snapshotManager, timestamp, false); + new ContinuousFromTimestampStartingScanner( + snapshotManager, table.changelogManager(), timestamp, false); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); // next snapshot @@ -131,6 +138,7 @@ public void testScanFromChangelog(String changelogProducer) throws Exception { + "/" + UUID.randomUUID())); SnapshotManager snapshotManager = table.snapshotManager(); + ChangelogManager changelogManager = table.changelogManager(); StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser); @@ 
-154,25 +162,34 @@ public void testScanFromChangelog(String changelogProducer) throws Exception { commit.commit(2, write.prepareCommit(true, 2)); assertThat(snapshotManager.latestSnapshotId()).isEqualTo(3); - assertThat(snapshotManager.earliestLongLivedChangelogId()).isEqualTo(1); + assertThat(changelogManager.earliestLongLivedChangelogId()).isEqualTo(1); assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(2); ContinuousFromTimestampStartingScanner scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.snapshot(3).timeMillis(), true); + snapshotManager, + table.changelogManager(), + snapshotManager.snapshot(3).timeMillis(), + true); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); assertThat(result.nextSnapshotId()).isEqualTo(3); scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.snapshot(2).timeMillis(), true); + snapshotManager, + table.changelogManager(), + snapshotManager.snapshot(2).timeMillis(), + true); assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId()) .isEqualTo(2); scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.changelog(1).timeMillis(), true); + snapshotManager, + changelogManager, + changelogManager.changelog(1).timeMillis(), + true); assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId()) .isEqualTo(1); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index 3628a1ea5790..e1fcd92547f5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import 
org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingScanner; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -41,6 +42,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.apache.paimon.SnapshotTest.newChangelogManager; import static org.apache.paimon.SnapshotTest.newSnapshotManager; import static org.apache.paimon.utils.FileSystemBranchManager.DEFAULT_MAIN_BRANCH; import static org.assertj.core.api.Assertions.assertThat; @@ -127,7 +129,9 @@ public void testEarlierThanTimeMillis(boolean isRaceCondition) throws IOExceptio // pick a random time equal to one of the snapshots time = millis.get(random.nextInt(numSnapshots)); } - Long actual = snapshotManager.earlierThanTimeMills(time, false); + Long actual = + ContinuousFromTimestampStartingScanner.earlierThanTimeMills( + snapshotManager, null, time, false); if (millis.get(numSnapshots - 1) < time) { if (isRaceCondition && millis.size() == 1) { @@ -452,11 +456,13 @@ public void testLongLivedChangelog() throws Exception { FileIO localFileIO = LocalFileIO.create(); SnapshotManager snapshotManager = newSnapshotManager(localFileIO, new Path(tempDir.toString())); + ChangelogManager changelogManager = + newChangelogManager(localFileIO, new Path(tempDir.toString())); long millis = 1L; for (long i = 1; i <= 5; i++) { Changelog changelog = createChangelogWithMillis(i, millis + i * 1000); localFileIO.tryToWriteAtomic( - snapshotManager.longLivedChangelogPath(i), changelog.toJson()); + changelogManager.longLivedChangelogPath(i), changelog.toJson()); } for (long i = 6; i <= 10; i++) { @@ -464,19 +470,18 @@ public void testLongLivedChangelog() throws Exception { localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); } - Assertions.assertThat(snapshotManager.earliestLongLivedChangelogId()).isEqualTo(1); - Assertions.assertThat(snapshotManager.latestChangelogId()).isEqualTo(10); - 
Assertions.assertThat(snapshotManager.latestLongLivedChangelogId()).isEqualTo(5); + Assertions.assertThat(changelogManager.earliestLongLivedChangelogId()).isEqualTo(1); + Assertions.assertThat(changelogManager.latestLongLivedChangelogId()).isEqualTo(5); Assertions.assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(6); Assertions.assertThat(snapshotManager.latestSnapshotId()).isEqualTo(10); - Assertions.assertThat(snapshotManager.changelog(1)).isNotNull(); + Assertions.assertThat(changelogManager.changelog(1)).isNotNull(); } @Test public void testCommitChangelogWhenSameChangelogCommitTwice() throws IOException { FileIO localFileIO = LocalFileIO.create(); - SnapshotManager snapshotManager = - newSnapshotManager(localFileIO, new Path(tempDir.toString())); + ChangelogManager snapshotManager = + newChangelogManager(localFileIO, new Path(tempDir.toString())); long id = 1L; Changelog changelog = createChangelogWithMillis(id, 1L); snapshotManager.commitChangelog(changelog, id); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index d82b54470b59..876c51244cae 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -1545,7 +1545,7 @@ private void alterTableStatisticsInternal( Table table = catalog.getTable(toIdentifier(tablePath)); checkArgument( table instanceof FileStoreTable, "Now only support analyze FileStoreTable."); - if (!table.latestSnapshotId().isPresent()) { + if (!table.latestSnapshot().isPresent()) { LOG.info("Skipping analyze table because the snapshot is null."); return; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java index 72041811d6f4..684de7a3d2dc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java @@ -28,6 +28,7 @@ import org.apache.paimon.table.source.snapshot.FullStartingScanner; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.table.source.snapshot.StartingScanner; +import org.apache.paimon.utils.ChangelogManager; import org.apache.paimon.utils.SnapshotManager; import org.slf4j.Logger; @@ -53,6 +54,7 @@ public LookupDataTableScan( CoreOptions options, SnapshotReader snapshotReader, SnapshotManager snapshotManager, + ChangelogManager changelogManager, boolean supportStreamingReadOverwrite, DefaultValueAssigner defaultValueAssigner, LookupStreamScanMode lookupScanMode) { @@ -60,6 +62,7 @@ public LookupDataTableScan( options, snapshotReader, snapshotManager, + changelogManager, supportStreamingReadOverwrite, defaultValueAssigner); this.startupMode = options.startupMode(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java index 090399706330..386a9e5f4c76 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java @@ -78,6 +78,7 @@ public StreamDataTableScan newStreamScan() { wrapped.coreOptions(), wrapped.newSnapshotReader(), wrapped.snapshotManager(), + wrapped.changelogManager(), wrapped.supportStreamingReadOverwrite(), DefaultValueAssigner.create(wrapped.schema()), lookupScanMode); diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index 95be75c19aea..478a44c8860b 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -248,7 +248,7 @@ case class PaimonSparkWriter(table: FileStoreTable) { def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVectors]): Seq[CommitMessage] = { val sparkSession = deletionVectors.sparkSession import sparkSession.implicits._ - val snapshotId = table.snapshotManager().latestSnapshotId(); + val snapshot = table.snapshotManager().latestSnapshot() val serializedCommits = deletionVectors .groupByKey(_.partitionAndBucket) .mapGroups { @@ -260,11 +260,11 @@ case class PaimonSparkWriter(table: FileStoreTable) { if (dvIndexFileMaintainer == null) { val partition = SerializationUtils.deserializeBinaryRow(sdv.partition) dvIndexFileMaintainer = if (bucketMode == BUCKET_UNAWARE) { - AppendDeletionFileMaintainer.forUnawareAppend(indexHandler, snapshotId, partition) + AppendDeletionFileMaintainer.forUnawareAppend(indexHandler, snapshot, partition) } else { AppendDeletionFileMaintainer.forBucketedAppend( indexHandler, - snapshotId, + snapshot, partition, sdv.bucket) } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala index ec5526f20e1d..1c2038d227a1 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala @@ -685,7 +685,7 @@ class DeletionVectorTest extends PaimonSparkTestBase 
with AdaptiveSparkPlanHelpe partitions.flatMap { partition => dvMaintainerFactory - .createOrRestore(table.snapshotManager().latestSnapshotId(), partition) + .createOrRestore(table.snapshotManager().latestSnapshot(), partition) .deletionVectors() .asScala }.toMap