Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<dep.lucene.version>9.12.0</dep.lucene.version>
<dep.assertj-core.version>3.8.0</dep.assertj-core.version>
<dep.parquet.version>1.16.0</dep.parquet.version>
<dep.iceberg.version>1.10.0</dep.iceberg.version>
<dep.iceberg.version>1.10.1</dep.iceberg.version>
<dep.asm.version>9.7.1</dep.asm.version>
<dep.gcs.version>1.9.17</dep.gcs.version>
<dep.alluxio.version>313</dep.alluxio.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Optional;
import java.util.OptionalLong;

import static java.util.Objects.requireNonNull;

Expand All @@ -30,6 +31,9 @@ public class CommitTaskData
private final FileFormat fileFormat;
private final Optional<String> referencedDataFile;
private final FileContent content;
private final OptionalLong contentOffset;
private final OptionalLong contentSizeInBytes;
private final OptionalLong recordCount;

@JsonCreator
public CommitTaskData(
Expand All @@ -40,7 +44,10 @@ public CommitTaskData(
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("fileFormat") FileFormat fileFormat,
@JsonProperty("referencedDataFile") String referencedDataFile,
@JsonProperty("content") FileContent content)
@JsonProperty("content") FileContent content,
@JsonProperty("contentOffset") OptionalLong contentOffset,
@JsonProperty("contentSizeInBytes") OptionalLong contentSizeInBytes,
@JsonProperty("recordCount") OptionalLong recordCount)
{
this.path = requireNonNull(path, "path is null");
this.fileSizeInBytes = fileSizeInBytes;
Expand All @@ -50,6 +57,24 @@ public CommitTaskData(
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.referencedDataFile = Optional.ofNullable(referencedDataFile);
this.content = requireNonNull(content, "content is null");
this.contentOffset = contentOffset != null ? contentOffset : OptionalLong.empty();
this.contentSizeInBytes = contentSizeInBytes != null ? contentSizeInBytes : OptionalLong.empty();
this.recordCount = recordCount != null ? recordCount : OptionalLong.empty();
}

/**
 * Convenience constructor for commit tasks that carry no deletion-vector
 * metadata: {@code contentOffset}, {@code contentSizeInBytes} and
 * {@code recordCount} are all initialized to {@link OptionalLong#empty()}
 * by delegating to the full {@code @JsonCreator} constructor.
 *
 * @param path               location of the written file
 * @param fileSizeInBytes    size of the written file in bytes
 * @param metrics            file metrics wrapper; must not be null
 * @param partitionSpecId    id of the partition spec the file was written under
 * @param partitionDataJson  serialized partition data, if the table is partitioned
 * @param fileFormat         format of the written file; must not be null
 * @param referencedDataFile data file referenced by a delete file, or null
 * @param content            Iceberg file content type; must not be null
 */
public CommitTaskData(
        String path,
        long fileSizeInBytes,
        MetricsWrapper metrics,
        int partitionSpecId,
        Optional<String> partitionDataJson,
        FileFormat fileFormat,
        String referencedDataFile,
        FileContent content)
{
    // Delegate with empty OptionalLongs: these fields are only populated for
    // deletion-vector (Puffin) delete files.
    this(path, fileSizeInBytes, metrics, partitionSpecId, partitionDataJson,
            fileFormat, referencedDataFile, content,
            OptionalLong.empty(), OptionalLong.empty(), OptionalLong.empty());
}

@JsonProperty
Expand Down Expand Up @@ -99,4 +124,22 @@ public FileContent getContent()
{
return content;
}

/**
 * Offset of this task's content within its container file, if any.
 * Expected to be present for deletion-vector (Puffin) delete files and
 * empty otherwise — TODO confirm against writers that populate it.
 */
@JsonProperty
public OptionalLong getContentOffset()
{
    return contentOffset;
}

/**
 * Size in bytes of this task's content within its container file, if any.
 * Expected to be present for deletion-vector (Puffin) delete files and
 * empty otherwise — TODO confirm against writers that populate it.
 */
@JsonProperty
public OptionalLong getContentSizeInBytes()
{
    return contentSizeInBytes;
}

/**
 * Number of records covered by this task's content, if known.
 * Expected to be present for deletion-vector (Puffin) delete files and
 * empty otherwise — TODO confirm against writers that populate it.
 */
@JsonProperty
public OptionalLong getRecordCount()
{
    return recordCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,16 @@ private static Object getIcebergLiteralValue(Type type, Marker marker)
return toIntExact(((Long) marker.getValue()));
}

if (type instanceof TimestampType || type instanceof TimeType) {
if (type instanceof TimestampType) {
TimestampType tsType = (TimestampType) type;
long value = (Long) marker.getValue();
if (tsType.getPrecision() == MILLISECONDS) {
return MILLISECONDS.toMicros(value);
}
return value;
}

if (type instanceof TimeType) {
return MILLISECONDS.toMicros((Long) marker.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ public enum FileFormat
ORC("orc", true),
PARQUET("parquet", true),
AVRO("avro", true),
METADATA("metadata.json", false);
METADATA("metadata.json", false),
PUFFIN("puffin", false),
DWRF("dwrf", true);

private final String ext;
private final boolean splittable;
Expand Down Expand Up @@ -61,6 +63,9 @@ public static FileFormat fromIcebergFileFormat(org.apache.iceberg.FileFormat for
case METADATA:
prestoFileFormat = METADATA;
break;
case PUFFIN:
prestoFileFormat = PUFFIN;
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + format);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,9 @@ protected static void validateTableForPresto(BaseTable table, Optional<Long> tab
schema = metadata.schema();
}

// Reject schema default values (initial-default / write-default)
for (Types.NestedField field : schema.columns()) {
if (field.initialDefault() != null || field.writeDefault() != null) {
throw new PrestoException(NOT_SUPPORTED, "Iceberg v3 column default values are not supported");
}
}
// Iceberg v3 column default values (initial-default / write-default) are supported.
// The Iceberg library handles applying defaults when reading files that were written
// before a column with a default was added via schema evolution.

// Reject Iceberg table encryption
if (!metadata.encryptionKeys().isEmpty() || snapshot.keyId() != null || metadata.properties().containsKey("encryption.key-id")) {
Expand Down Expand Up @@ -1524,8 +1521,23 @@ public Optional<ConnectorOutputMetadata> finishDeleteWithOutput(ConnectorSession
.ofPositionDeletes()
.withPath(task.getPath())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withFormat(FileFormat.fromString(task.getFileFormat().name()))
.withMetrics(task.getMetrics().metrics());
.withFormat(FileFormat.fromString(task.getFileFormat().name()));

if (task.getFileFormat() == com.facebook.presto.iceberg.FileFormat.PUFFIN) {
builder.withRecordCount(task.getRecordCount().orElseThrow(() ->
new VerifyException("recordCount required for deletion vector")));
builder.withContentOffset(task.getContentOffset().orElseThrow(() ->
new VerifyException("contentOffset required for deletion vector")));
builder.withContentSizeInBytes(task.getContentSizeInBytes().orElseThrow(() ->
new VerifyException("contentSizeInBytes required for deletion vector")));
}
else {
builder.withMetrics(task.getMetrics().metrics());
}

if (task.getReferencedDataFile().isPresent()) {
builder.withReferencedDataFile(task.getReferencedDataFile().get());
}

if (!spec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
import com.facebook.presto.iceberg.procedure.RemoveOrphanFiles;
import com.facebook.presto.iceberg.procedure.RewriteDataFilesProcedure;
import com.facebook.presto.iceberg.procedure.RewriteDeleteFilesProcedure;
import com.facebook.presto.iceberg.procedure.RewriteManifestsProcedure;
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
import com.facebook.presto.iceberg.procedure.RollbackToTimestampProcedure;
Expand Down Expand Up @@ -195,6 +196,7 @@ protected void setup(Binder binder)
procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ManifestFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteDeleteFilesProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteManifestsProcedure.class).in(Scopes.SINGLETON);

// for orc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.presto.hive.HiveTransactionHandle;
import com.facebook.presto.iceberg.function.IcebergBucketFunction;
import com.facebook.presto.iceberg.function.VariantFunctions;
import com.facebook.presto.iceberg.function.changelog.ApplyChangelogFunction;
import com.facebook.presto.iceberg.transaction.IcebergTransactionManager;
import com.facebook.presto.iceberg.transaction.IcebergTransactionMetadata;
Expand Down Expand Up @@ -256,6 +257,7 @@ public Set<Class<?>> getSystemFunctions()
.add(ApplyChangelogFunction.class)
.add(IcebergBucketFunction.class)
.add(IcebergBucketFunction.Bucket.class)
.add(VariantFunctions.class)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public enum IcebergErrorCode
ICEBERG_INVALID_MATERIALIZED_VIEW(18, EXTERNAL),
ICEBERG_INVALID_SPEC_ID(19, EXTERNAL),
ICEBERG_TRANSACTION_CONFLICT_ERROR(20, EXTERNAL),
ICEBERG_WRITER_CLOSE_ERROR(21, EXTERNAL),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.facebook.presto.iceberg.delete.DeleteFile;
import com.facebook.presto.iceberg.delete.DeleteFilter;
import com.facebook.presto.iceberg.delete.IcebergDeletePageSink;
import com.facebook.presto.iceberg.delete.IcebergDeletionVectorPageSink;
import com.facebook.presto.iceberg.delete.PositionDeleteFilter;
import com.facebook.presto.iceberg.delete.RowPredicate;
import com.facebook.presto.memory.context.AggregatedMemoryContext;
Expand All @@ -70,6 +71,7 @@
import com.facebook.presto.parquet.predicate.Predicate;
import com.facebook.presto.parquet.reader.ParquetReader;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
Expand Down Expand Up @@ -863,17 +865,33 @@ else if (subColumn.getId() == MERGE_PARTITION_DATA.getId()) {
verify(storageProperties.isPresent(), "storageProperties are null");

LocationProvider locationProvider = getLocationProvider(table.getSchemaTableName(), outputPath.get(), storageProperties.get());
Supplier<IcebergDeletePageSink> deleteSinkSupplier = () -> new IcebergDeletePageSink(
partitionSpec,
split.getPartitionDataJson(),
locationProvider,
fileWriterFactory,
hdfsEnvironment,
hdfsContext,
jsonCodec,
session,
split.getPath(),
split.getFileFormat());
int tableFormatVersion = Integer.parseInt(
storageProperties.get().getOrDefault("format-version", "2"));
Supplier<ConnectorPageSink> deleteSinkSupplier;
if (tableFormatVersion >= 3) {
deleteSinkSupplier = () -> new IcebergDeletionVectorPageSink(
partitionSpec,
split.getPartitionDataJson(),
locationProvider,
hdfsEnvironment,
hdfsContext,
jsonCodec,
session,
split.getPath());
}
else {
deleteSinkSupplier = () -> new IcebergDeletePageSink(
partitionSpec,
split.getPartitionDataJson(),
locationProvider,
fileWriterFactory,
hdfsEnvironment,
hdfsContext,
jsonCodec,
session,
split.getPath(),
split.getFileFormat());
}
boolean storeDeleteFilePath = icebergColumns.contains(DELETE_FILE_PATH_COLUMN_HANDLE);
Supplier<List<DeleteFilter>> deleteFilters = memoize(() -> {
// If equality deletes are optimized into a join they don't need to be applied here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
Expand Down Expand Up @@ -47,7 +46,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterators.limit;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -126,13 +124,6 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
PartitionSpec spec = task.spec();
Optional<PartitionData> partitionData = partitionDataFromStructLike(spec, task.file().partition());

// Validate no PUFFIN deletion vectors (Iceberg v3 feature not yet supported)
for (org.apache.iceberg.DeleteFile deleteFile : task.deletes()) {
if (deleteFile.format() == org.apache.iceberg.FileFormat.PUFFIN) {
throw new PrestoException(NOT_SUPPORTED, "Iceberg deletion vectors (PUFFIN format) are not supported");
}
}

// TODO: We should leverage residual expression and convert that to TupleDomain.
// The predicate here is used by readers for predicate push down at reader level,
// so when we do not use residual expression, we are just wasting CPU cycles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.hive.HivePartitionKey;
import com.facebook.presto.iceberg.delete.DeleteFilter;
import com.facebook.presto.iceberg.delete.IcebergDeletePageSink;
import com.facebook.presto.iceberg.delete.RowPredicate;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.UpdatablePageSource;
Expand Down Expand Up @@ -71,8 +71,8 @@ public class IcebergUpdateablePageSource
implements UpdatablePageSource
{
private final ConnectorPageSource delegate;
private final Supplier<IcebergDeletePageSink> deleteSinkSupplier;
private IcebergDeletePageSink positionDeleteSink;
private final Supplier<ConnectorPageSink> deleteSinkSupplier;
private ConnectorPageSink positionDeleteSink;
private final Supplier<Optional<RowPredicate>> deletePredicate;
private final Supplier<List<DeleteFilter>> deleteFilters;

Expand Down Expand Up @@ -107,7 +107,7 @@ public IcebergUpdateablePageSource(
ConnectorPageSource delegate,
// represents the columns output by the delegate page source
List<IcebergColumnHandle> delegateColumns,
Supplier<IcebergDeletePageSink> deleteSinkSupplier,
Supplier<ConnectorPageSink> deleteSinkSupplier,
Supplier<Optional<RowPredicate>> deletePredicate,
Supplier<List<DeleteFilter>> deleteFilters,
Supplier<IcebergPageSink> updatedRowPageSinkSupplier,
Expand Down Expand Up @@ -295,7 +295,7 @@ public void updateRows(Page page, List<Integer> columnValueAndRowIdChannels)
public CompletableFuture<Collection<Slice>> finish()
{
return Optional.ofNullable(positionDeleteSink)
.map(IcebergDeletePageSink::finish)
.map(ConnectorPageSink::finish)
.orElseGet(() -> completedFuture(ImmutableList.of()))
.thenCombine(
Optional.ofNullable(updatedRowPageSink).map(IcebergPageSink::finish)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public final class IcebergUtil
{
private static final Logger log = Logger.get(IcebergUtil.class);
public static final int MIN_FORMAT_VERSION_FOR_DELETE = 2;
public static final int MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS = 2;
public static final int MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS = 3;
public static final int MAX_SUPPORTED_FORMAT_VERSION = 3;

public static final long DOUBLE_POSITIVE_ZERO = 0x0000000000000000L;
Expand Down Expand Up @@ -779,7 +779,10 @@ public static Domain createDomainFromIcebergPartitionValue(
case TIME:
case TIMESTAMP:
return singleValue(prestoType, MICROSECONDS.toMillis((Long) value));
case TIMESTAMP_NANO:
return singleValue(prestoType, Math.floorDiv((Long) value, 1000L));
case STRING:
case VARIANT:
return singleValue(prestoType, utf8Slice(value.toString()));
case FLOAT:
return singleValue(prestoType, (long) floatToRawIntBits((Float) value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public static Object getValue(JsonNode partitionValue, Type type)
return partitionValue.asInt();
case LONG:
case TIMESTAMP:
case TIMESTAMP_NANO:
case TIME:
return partitionValue.asLong();
case FLOAT:
Expand All @@ -175,6 +176,7 @@ public static Object getValue(JsonNode partitionValue, Type type)
}
return partitionValue.doubleValue();
case STRING:
case VARIANT:
return partitionValue.asText();
case FIXED:
case BINARY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ private Object convert(Object value, Type type)
if (value == null) {
return null;
}
if (type instanceof Types.StringType) {
if (type instanceof Types.StringType || type.isVariantType()) {
return value.toString();
}
if (type instanceof Types.BinaryType) {
Expand All @@ -303,6 +303,9 @@ private Object convert(Object value, Type type)
return MICROSECONDS.toMillis((long) value);
}
}
if (type instanceof Types.TimestampNanoType) {
return Math.floorDiv((long) value, 1000L);
}
if (type instanceof Types.TimeType) {
return MICROSECONDS.toMillis((long) value);
}
Expand Down
Loading
Loading