Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.paimon.tag.SuccessFileTagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -176,6 +177,11 @@ public SnapshotManager snapshotManager() {
snapshotCache);
}

@Override
public ChangelogManager changelogManager() {
return new ChangelogManager(fileIO, options.path(), options.branch());
}

@Override
public ManifestFile.Factory manifestFileFactory() {
return manifestFileFactory(false);
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
Expand All @@ -61,6 +62,8 @@ public interface FileStore<T> {

SnapshotManager snapshotManager();

ChangelogManager changelogManager();

RowType partitionType();

CoreOptions options();
Expand Down
5 changes: 4 additions & 1 deletion paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -61,7 +62,9 @@
*/
@Public
@JsonIgnoreProperties(ignoreUnknown = true)
public class Snapshot {
public class Snapshot implements Serializable {

private static final long serialVersionUID = 1L;

public static final long FIRST_SNAPSHOT_ID = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public CommitMessage doCompact(FileStoreTable table, AppendOnlyFileStoreWrite wr
UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer =
AppendDeletionFileMaintainer.forUnawareAppend(
table.store().newIndexFileHandler(),
table.snapshotManager().latestSnapshotId(),
table.snapshotManager().latestSnapshot(),
partition);
compactAfter.addAll(
write.compactRewrite(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,7 @@ private UnawareAppendDeletionFileMaintainer dvMaintainer(BinaryRow partition) {
synchronized (this) {
maintainer =
AppendDeletionFileMaintainer.forUnawareAppend(
indexFileHandler,
snapshotManager.latestSnapshotId(),
partition);
indexFileHandler, snapshotManager.latestSnapshot(), partition);
}
cache.put(partition, maintainer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.deletionvectors;

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.IndexFileHandler;
Expand Down Expand Up @@ -152,23 +153,23 @@ public Factory(IndexFileHandler handler) {
}

public DeletionVectorsMaintainer createOrRestore(
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
@Nullable Snapshot snapshot, BinaryRow partition, int bucket) {
List<IndexFileMeta> indexFiles =
snapshotId == null
snapshot == null
? Collections.emptyList()
: handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket);
: handler.scan(snapshot, DELETION_VECTORS_INDEX, partition, bucket);
Map<String, DeletionVector> deletionVectors =
new HashMap<>(handler.readAllDeletionVectors(indexFiles));
return createOrRestore(deletionVectors);
}

@VisibleForTesting
public DeletionVectorsMaintainer createOrRestore(
@Nullable Long snapshotId, BinaryRow partition) {
@Nullable Snapshot snapshot, BinaryRow partition) {
List<IndexFileMeta> indexFiles =
snapshotId == null
snapshot == null
? Collections.emptyList()
: handler.scanEntries(snapshotId, DELETION_VECTORS_INDEX, partition)
: handler.scanEntries(snapshot, DELETION_VECTORS_INDEX, partition)
.stream()
.map(IndexManifestEntry::indexFile)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.deletionvectors.append;

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
Expand Down Expand Up @@ -53,21 +54,21 @@ public interface AppendDeletionFileMaintainer {

static BucketedAppendDeletionFileMaintainer forBucketedAppend(
IndexFileHandler indexFileHandler,
@Nullable Long snapshotId,
@Nullable Snapshot snapshot,
BinaryRow partition,
int bucket) {
// bucket should have only one deletion file, so here we should read old deletion vectors,
// overwrite the entire deletion file of the bucket when writing deletes.
DeletionVectorsMaintainer maintainer =
new DeletionVectorsMaintainer.Factory(indexFileHandler)
.createOrRestore(snapshotId, partition, bucket);
.createOrRestore(snapshot, partition, bucket);
return new BucketedAppendDeletionFileMaintainer(partition, bucket, maintainer);
}

static UnawareAppendDeletionFileMaintainer forUnawareAppend(
IndexFileHandler indexFileHandler, @Nullable Long snapshotId, BinaryRow partition) {
IndexFileHandler indexFileHandler, @Nullable Snapshot snapshot, BinaryRow partition) {
Map<String, DeletionFile> deletionFiles =
indexFileHandler.scanDVIndex(snapshotId, partition, UNAWARE_BUCKET);
indexFileHandler.scanDVIndex(snapshot, partition, UNAWARE_BUCKET);
return new UnawareAppendDeletionFileMaintainer(indexFileHandler, partition, deletionFiles);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@

/** migrate iceberg table to paimon table. */
public class IcebergMigrator implements Migrator {

private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrator.class);

private final ThreadPoolExecutor executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.index;

import org.apache.paimon.KeyValue;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
Expand All @@ -44,14 +45,14 @@ public class HashIndexMaintainer implements IndexMaintainer<KeyValue> {

private HashIndexMaintainer(
IndexFileHandler fileHandler,
@Nullable Long snapshotId,
@Nullable Snapshot snapshot,
BinaryRow partition,
int bucket) {
this.fileHandler = fileHandler;
IntHashSet hashcode = new IntHashSet();
if (snapshotId != null) {
if (snapshot != null) {
Optional<IndexFileMeta> indexFile =
fileHandler.scanHashIndex(snapshotId, partition, bucket);
fileHandler.scanHashIndex(snapshot, partition, bucket);
if (indexFile.isPresent()) {
IndexFileMeta file = indexFile.get();
hashcode = new IntHashSet((int) file.rowCount());
Expand Down Expand Up @@ -115,8 +116,8 @@ public Factory(IndexFileHandler handler) {

@Override
public IndexMaintainer<KeyValue> createOrRestore(
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
return new HashIndexMaintainer(handler, snapshotId, partition, bucket);
@Nullable Snapshot snapshot, BinaryRow partition, int bucket) {
return new HashIndexMaintainer(handler, snapshot, partition, bucket);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ public DeletionVectorsIndexFile deletionVectorsIndex() {
return this.deletionVectorsIndex;
}

public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow partition, int bucket) {
List<IndexFileMeta> result = scan(snapshotId, HASH_INDEX, partition, bucket);
public Optional<IndexFileMeta> scanHashIndex(
Snapshot snapshot, BinaryRow partition, int bucket) {
List<IndexFileMeta> result = scan(snapshot, HASH_INDEX, partition, bucket);
if (result.size() > 1) {
throw new IllegalArgumentException(
"Find multiple hash index files for one bucket: " + result);
Expand All @@ -85,11 +86,10 @@ public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow partitio
}

public Map<String, DeletionFile> scanDVIndex(
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
if (snapshotId == null) {
@Nullable Snapshot snapshot, BinaryRow partition, int bucket) {
if (snapshot == null) {
return Collections.emptyMap();
}
Snapshot snapshot = snapshotManager.snapshot(snapshotId);
String indexManifest = snapshot.indexManifest();
if (indexManifest == null) {
return Collections.emptyMap();
Expand Down Expand Up @@ -136,9 +136,9 @@ public List<IndexManifestEntry> scan(String indexType) {
}

public List<IndexFileMeta> scan(
long snapshotId, String indexType, BinaryRow partition, int bucket) {
Snapshot snapshot, String indexType, BinaryRow partition, int bucket) {
List<IndexFileMeta> result = new ArrayList<>();
for (IndexManifestEntry file : scanEntries(snapshotId, indexType, partition)) {
for (IndexManifestEntry file : scanEntries(snapshot, indexType, partition)) {
if (file.bucket() == bucket) {
result.add(file.indexFile());
}
Expand Down Expand Up @@ -171,7 +171,7 @@ public List<IndexManifestEntry> scanEntries() {
}

public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow partition) {
Long snapshot = snapshotManager.latestSnapshotId();
Snapshot snapshot = snapshotManager.latestSnapshot();
if (snapshot == null) {
return Collections.emptyList();
}
Expand All @@ -180,13 +180,8 @@ public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow partitio
}

public List<IndexManifestEntry> scanEntries(
long snapshotId, String indexType, BinaryRow partition) {
return scanEntries(snapshotId, indexType, Collections.singleton(partition));
}

public List<IndexManifestEntry> scanEntries(
long snapshot, String indexType, Set<BinaryRow> partitions) {
return scanEntries(snapshotManager.snapshot(snapshot), indexType, partitions);
Snapshot snapshot, String indexType, BinaryRow partition) {
return scanEntries(snapshot, indexType, Collections.singleton(partition));
}

public List<IndexManifestEntry> scanEntries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.index;

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;

import javax.annotation.Nullable;
Expand All @@ -34,6 +35,6 @@ public interface IndexMaintainer<T> {
/** Factory to restore {@link IndexMaintainer}. */
interface Factory<T> {
IndexMaintainer<T> createOrRestore(
@Nullable Long snapshotId, BinaryRow partition, int bucket);
@Nullable Snapshot snapshot, BinaryRow partition, int bucket);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ public void restore(List<State<T>> states) {
for (State<T> state : states) {
RecordWriter<T> writer =
createWriter(
state.baseSnapshotId,
state.partition,
state.bucket,
state.dataFiles,
Expand Down Expand Up @@ -426,24 +425,23 @@ public WriterContainer<T> createWriterContainer(
}
}

Long latestSnapshotId = snapshotManager.latestSnapshotId();
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
List<DataFileMeta> restoreFiles = new ArrayList<>();
if (!ignorePreviousFiles && latestSnapshotId != null) {
restoreFiles = scanExistingFileMetas(latestSnapshotId, partition, bucket);
if (!ignorePreviousFiles && latestSnapshot != null) {
restoreFiles = scanExistingFileMetas(latestSnapshot, partition, bucket);
}
IndexMaintainer<T> indexMaintainer =
indexFactory == null
? null
: indexFactory.createOrRestore(
ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
ignorePreviousFiles ? null : latestSnapshot, partition, bucket);
DeletionVectorsMaintainer deletionVectorsMaintainer =
dvMaintainerFactory == null
? null
: dvMaintainerFactory.createOrRestore(
ignorePreviousFiles ? null : latestSnapshotId, partition, bucket);
ignorePreviousFiles ? null : latestSnapshot, partition, bucket);
RecordWriter<T> writer =
createWriter(
latestSnapshotId,
partition.copy(),
bucket,
restoreFiles,
Expand All @@ -454,7 +452,10 @@ public WriterContainer<T> createWriterContainer(
writer.withInsertOnly(isInsertOnly);
notifyNewWriter(writer);
return new WriterContainer<>(
writer, indexMaintainer, deletionVectorsMaintainer, latestSnapshotId);
writer,
indexMaintainer,
deletionVectorsMaintainer,
latestSnapshot == null ? null : latestSnapshot.id());
}

@Override
Expand All @@ -469,10 +470,10 @@ public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) {
}

private List<DataFileMeta> scanExistingFileMetas(
long snapshotId, BinaryRow partition, int bucket) {
Snapshot snapshot, BinaryRow partition, int bucket) {
List<DataFileMeta> existingFileMetas = new ArrayList<>();
List<ManifestEntry> files =
scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files();
scan.withSnapshot(snapshot).withPartitionBucket(partition, bucket).plan().files();
for (ManifestEntry entry : files) {
if (entry.totalBuckets() != totalBuckets) {
String partInfo =
Expand Down Expand Up @@ -513,7 +514,6 @@ public ExecutorService getCompactExecutor() {
protected void notifyNewWriter(RecordWriter<T> writer) {}

protected abstract RecordWriter<T> createWriter(
@Nullable Long snapshotId,
BinaryRow partition,
int bucket,
List<DataFileMeta> restoreFiles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ public AppendOnlyFileStoreWrite(

@Override
protected RecordWriter<InternalRow> createWriter(
@Nullable Long snapshotId,
BinaryRow partition,
int bucket,
List<DataFileMeta> restoredFiles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ public KeyValueFileStoreWrite(

@Override
protected MergeTreeWriter createWriter(
@Nullable Long snapshotId,
BinaryRow partition,
int bucket,
List<DataFileMeta> restoreFiles,
Expand Down
Loading