Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,7 @@ public static List<Partition> listPartitionsFromFileSystem(Table table) {
table.newReadBuilder().newScan().listPartitionEntries();
List<Partition> partitions = new ArrayList<>(partitionEntries.size());
for (PartitionEntry entry : partitionEntries) {
partitions.add(
new Partition(
computer.generatePartValues(entry.partition()),
entry.recordCount(),
entry.fileSizeInBytes(),
entry.fileCount(),
entry.lastFileCreationTime()));
partitions.add(entry.toPartition(computer));
}
return partitions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;

Expand All @@ -49,7 +51,8 @@ public RenamingSnapshotCommit(SnapshotManager snapshotManager, Lock lock) {
}

@Override
public boolean commit(Snapshot snapshot, String branch) throws Exception {
public boolean commit(Snapshot snapshot, String branch, List<Partition> statistics)
throws Exception {
Path newSnapshotPath =
snapshotManager.branch().equals(branch)
? snapshotManager.snapshotPath(snapshot.id())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
package org.apache.paimon.catalog;

import org.apache.paimon.Snapshot;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.utils.SnapshotManager;

import java.io.Serializable;
import java.util.List;

/** Interface to commit a snapshot atomically. */
public interface SnapshotCommit extends AutoCloseable {

boolean commit(Snapshot snapshot, String branch) throws Exception;
boolean commit(Snapshot snapshot, String branch, List<Partition> statistics) throws Exception;

/** Factory to create {@link SnapshotCommit}. */
interface Factory extends Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.InternalRowPartitionComputer;

import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -83,6 +85,15 @@ public PartitionEntry merge(PartitionEntry entry) {
Math.max(lastFileCreationTime, entry.lastFileCreationTime));
}

/**
 * Converts this entry into a {@link Partition}.
 *
 * @param computer used to generate the partition values from this entry's partition row
 * @return a new {@link Partition} built from the generated partition values together with this
 *     entry's record count, file size in bytes, file count and last file creation time
 */
public Partition toPartition(InternalRowPartitionComputer computer) {
    return new Partition(
            computer.generatePartValues(this.partition),
            this.recordCount,
            this.fileSizeInBytes,
            this.fileCount,
            this.lastFileCreationTime);
}

/**
 * Creates a {@link PartitionEntry} from a single manifest entry.
 *
 * <p>Delegates to {@code fromDataFile}, passing the entry's partition, kind and file.
 *
 * @param entry the manifest entry to convert
 * @return the {@link PartitionEntry} produced by {@code fromDataFile}
 */
public static PartitionEntry fromManifestEntry(ManifestEntry entry) {
return fromDataFile(entry.partition(), entry.kind(), entry.file());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
Expand All @@ -55,6 +57,7 @@
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
Expand All @@ -78,6 +81,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.manifest.ManifestEntry.recordCount;
Expand Down Expand Up @@ -137,6 +141,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final BucketMode bucketMode;
private final long commitTimeout;
private final int commitMaxRetries;
private final InternalRowPartitionComputer partitionComputer;

private boolean ignoreEmptyCommit;
private CommitMetrics commitMetrics;
Expand Down Expand Up @@ -198,6 +203,12 @@ public FileStoreCommitImpl(
this.commitCallbacks = commitCallbacks;
this.commitMaxRetries = commitMaxRetries;
this.commitTimeout = commitTimeout;
this.partitionComputer =
new InternalRowPartitionComputer(
options.partitionDefaultName(),
partitionType,
partitionType.getFieldNames().toArray(new String[0]),
options.legacyPartitionName());

this.ignoreEmptyCommit = true;
this.commitMetrics = null;
Expand Down Expand Up @@ -492,7 +503,7 @@ public void overwrite(
attempts +=
tryCommit(
compactTableFiles,
Collections.emptyList(),
emptyList(),
compactDvIndexFiles,
committable.identifier(),
committable.watermark(),
Expand All @@ -508,9 +519,9 @@ public void overwrite(
if (this.commitMetrics != null) {
reportCommit(
appendTableFiles,
Collections.emptyList(),
emptyList(),
compactTableFiles,
Collections.emptyList(),
emptyList(),
commitDuration,
generatedSnapshot,
attempts);
Expand Down Expand Up @@ -550,23 +561,12 @@ public void dropPartitions(List<Map<String, String>> partitions, long commitIden
}

tryOverwrite(
partitionFilter,
Collections.emptyList(),
Collections.emptyList(),
commitIdentifier,
null,
new HashMap<>());
partitionFilter, emptyList(), emptyList(), commitIdentifier, null, new HashMap<>());
}

@Override
public void truncateTable(long commitIdentifier) {
tryOverwrite(
null,
Collections.emptyList(),
Collections.emptyList(),
commitIdentifier,
null,
new HashMap<>());
tryOverwrite(null, emptyList(), emptyList(), commitIdentifier, null, new HashMap<>());
}

@Override
Expand Down Expand Up @@ -597,9 +597,9 @@ public FileStoreCommit withMetrics(CommitMetrics metrics) {
public void commitStatistics(Statistics stats, long commitIdentifier) {
String statsFileName = statsFileHandler.writeStats(stats);
tryCommit(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
emptyList(),
emptyList(),
emptyList(),
commitIdentifier,
null,
Collections.emptyMap(),
Expand Down Expand Up @@ -809,7 +809,7 @@ private int tryOverwrite(

return tryCommit(
changesWithOverwrite,
Collections.emptyList(),
emptyList(),
indexChangesWithOverwrite,
identifier,
watermark,
Expand Down Expand Up @@ -887,6 +887,7 @@ CommitResult tryCommitOnce(
Snapshot newSnapshot;
String baseManifestList = null;
String deltaManifestList = null;
List<PartitionEntry> deltaStatistics = null;
String changelogManifestList = null;
String oldIndexManifest = null;
String indexManifest = null;
Expand Down Expand Up @@ -933,6 +934,7 @@ CommitResult tryCommitOnce(

boolean rewriteIndexManifest = true;
if (retryResult != null) {
deltaStatistics = retryResult.deltaStatistics;
deltaManifestList = retryResult.deltaManifestList;
changelogManifestList = retryResult.changelogManifestList;
if (Objects.equals(oldIndexManifest, retryResult.oldIndexManifest)) {
Expand All @@ -944,6 +946,7 @@ CommitResult tryCommitOnce(
}
} else {
// write new delta files into manifest files
deltaStatistics = new ArrayList<>(PartitionEntry.merge(deltaFiles));
deltaManifestList = manifestList.write(manifestFile.write(deltaFiles));

// write changelog into manifest files
Expand Down Expand Up @@ -1009,7 +1012,7 @@ CommitResult tryCommitOnce(
e);
}

if (commitSnapshotImpl(newSnapshot)) {
if (commitSnapshotImpl(newSnapshot, deltaStatistics)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
String.format(
Expand All @@ -1031,6 +1034,7 @@ CommitResult tryCommitOnce(
newSnapshotId, commitUser, identifier, commitKind.name(), commitTime));
cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, mergeAfterManifests);
return new RetryResult(
deltaStatistics,
deltaManifestList,
changelogManifestList,
oldIndexManifest,
Expand Down Expand Up @@ -1115,7 +1119,7 @@ private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult la
}

String baseManifestList = manifestList.write(mergeAfterManifests);
String deltaManifestList = manifestList.write(Collections.emptyList());
String deltaManifestList = manifestList.write(emptyList());

// prepare snapshot file
Snapshot newSnapshot =
Expand All @@ -1137,17 +1141,21 @@ private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult la
latestSnapshot.watermark(),
latestSnapshot.statistics());

if (!commitSnapshotImpl(newSnapshot)) {
if (!commitSnapshotImpl(newSnapshot, emptyList())) {
return new ManifestCompactResult(
baseManifestList, deltaManifestList, mergeBeforeManifests, mergeAfterManifests);
} else {
return new SuccessManifestCompactResult();
}
}

private boolean commitSnapshotImpl(Snapshot newSnapshot) {
private boolean commitSnapshotImpl(Snapshot newSnapshot, List<PartitionEntry> deltaStatistics) {
try {
return snapshotCommit.commit(newSnapshot, branchName);
List<Partition> statistics = new ArrayList<>(deltaStatistics.size());
for (PartitionEntry entry : deltaStatistics) {
statistics.add(entry.toPartition(partitionComputer));
}
return snapshotCommit.commit(newSnapshot, branchName, statistics);
} catch (Throwable e) {
// exception when performing the atomic rename,
// we cannot clean up because we can't determine the success
Expand Down Expand Up @@ -1517,6 +1525,7 @@ public boolean isSuccess() {

private class RetryResult implements CommitResult {

private final List<PartitionEntry> deltaStatistics;
private final String deltaManifestList;
private final String changelogManifestList;

Expand All @@ -1527,12 +1536,14 @@ private class RetryResult implements CommitResult {
private final List<SimpleFileEntry> baseDataFiles;

private RetryResult(
List<PartitionEntry> deltaStatistics,
String deltaManifestList,
String changelogManifestList,
String oldIndexManifest,
String newIndexManifest,
Snapshot latestSnapshot,
List<SimpleFileEntry> baseDataFiles) {
this.deltaStatistics = deltaStatistics;
this.deltaManifestList = deltaManifestList;
this.changelogManifestList = changelogManifestList;
this.oldIndexManifest = oldIndexManifest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import java.util.Map;
import java.util.Objects;

/** Entry representing a partition. */
/**
* Statistics of a partition. Fields may be negative, which indicates that some data has been
* removed.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@Public
public class Partition implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,9 @@ public Optional<Snapshot> loadSnapshot(Identifier identifier) throws TableNotExi
return Optional.of(response.getSnapshot());
}

public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) {
CommitTableRequest request = new CommitTableRequest(identifier, snapshot);
public boolean commitSnapshot(
Identifier identifier, Snapshot snapshot, List<Partition> statistics) {
CommitTableRequest request = new CommitTableRequest(identifier, snapshot, statistics);
CommitTableResponse response =
client.post(
resourcePaths.commitTable(identifier.getDatabaseName()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.utils.SnapshotManager;

import java.util.List;

/** Factory to create {@link SnapshotCommit} for REST Catalog. */
public class RESTSnapshotCommitFactory implements SnapshotCommit.Factory {

Expand All @@ -39,11 +42,11 @@ public SnapshotCommit create(Identifier identifier, SnapshotManager snapshotMana
RESTCatalog catalog = loader.load();
return new SnapshotCommit() {
@Override
public boolean commit(Snapshot snapshot, String branch) {
public boolean commit(Snapshot snapshot, String branch, List<Partition> statistics) {
Identifier newIdentifier =
new Identifier(
identifier.getDatabaseName(), identifier.getTableName(), branch);
return catalog.commitSnapshot(newIdentifier, snapshot);
return catalog.commitSnapshot(newIdentifier, snapshot, statistics);
}

@Override
Expand Down
Loading
Loading