diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java index fd008701c548..4dde6d083894 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java @@ -137,7 +137,7 @@ private static void extractFieldNames(SqlNode node, Set fieldNames) { *

Note: This utility currently supports only top-level fields within the filter expression. * Nested field references are not supported. */ - static Expression convert(@Nullable String filter, Schema schema) { + public static Expression convert(@Nullable String filter, Schema schema) { if (filter == null) { return Expressions.alwaysTrue(); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java index 4b127fcdef22..c51d77c733c0 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java @@ -72,7 +72,11 @@ public class ReadUtils { "parquet.read.support.class", "parquet.crypto.factory.class"); - static ParquetReader createReader(FileScanTask task, Table table, Schema schema) { + public static ParquetReader createReader(FileScanTask task, Table table) { + return createReader(task, table, table.schema()); + } + + public static ParquetReader createReader(FileScanTask task, Table table, Schema schema) { String filePath = task.file().path().toString(); InputFile inputFile; try (FileIO io = table.io()) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index b1e8a825601d..d50b58c13cf4 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -21,11 +21,6 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import java.io.IOException; -import java.time.LocalDateTime; -import java.time.YearMonth; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -36,7 +31,6 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; @@ -44,7 +38,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.iceberg.DataFile; import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; @@ -56,7 +49,6 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.transforms.Transforms; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +95,6 @@ class DestinationState { final Cache writers; private final List dataFiles = Lists.newArrayList(); @VisibleForTesting final Map writerCounts = Maps.newHashMap(); - private final Map partitionFieldMap = Maps.newHashMap(); private final 
List exceptions = Lists.newArrayList(); private final InternalRecordWrapper wrapper; // wrapper that facilitates partitioning @@ -114,9 +105,6 @@ class DestinationState { this.routingPartitionKey = new PartitionKey(spec, schema); this.wrapper = new InternalRecordWrapper(schema.asStruct()); this.table = table; - for (PartitionField partitionField : spec.fields()) { - partitionFieldMap.put(partitionField.name(), partitionField); - } // build a cache of RecordWriters. // writers will expire after 1 min of idle time. @@ -126,7 +114,6 @@ class DestinationState { .expireAfterAccess(1, TimeUnit.MINUTES) .removalListener( (RemovalNotification removal) -> { - final PartitionKey pk = Preconditions.checkStateNotNull(removal.getKey()); final RecordWriter recordWriter = Preconditions.checkStateNotNull(removal.getValue()); try { @@ -142,9 +129,7 @@ class DestinationState { throw rethrow; } openWriters--; - String partitionPath = getPartitionDataPath(pk.toPath(), partitionFieldMap); - dataFiles.add( - SerializableDataFile.from(recordWriter.getDataFile(), partitionPath)); + dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), spec)); }) .build(); } @@ -210,39 +195,6 @@ private RecordWriter createWriter(PartitionKey partitionKey) { } } - /** - * Returns an equivalent partition path that is made up of partition data. Needed to reconstruct a - * {@link DataFile}. - */ - @VisibleForTesting - static String getPartitionDataPath( - String partitionPath, Map partitionFieldMap) { - if (partitionPath.isEmpty() || partitionFieldMap.isEmpty()) { - return partitionPath; - } - List resolved = new ArrayList<>(); - for (String partition : Splitter.on('/').splitToList(partitionPath)) { - List nameAndValue = Splitter.on('=').splitToList(partition); - String name = nameAndValue.get(0); - String value = nameAndValue.get(1); - String transformName = - Preconditions.checkArgumentNotNull(partitionFieldMap.get(name)).transform().toString(); - if (Transforms.month().toString().equals(transformName)) { - int month = YearMonth.parse(value).getMonthValue(); - value = String.valueOf(month); - } else if (Transforms.hour().toString().equals(transformName)) { - long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER)); - value = String.valueOf(hour); - } - resolved.add(name + "=" + value); - } - return String.join("/", resolved); - } - - private static final DateTimeFormatter HOUR_FORMATTER = - DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"); - private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC); - private final Catalog catalog; private final String filePrefix; private final long maxFileSize; diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java index 0060cf0ce85d..cdb9413859b1 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java @@ -17,25 +17,40 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; import java.nio.ByteBuffer; +import java.time.LocalDateTime; +import java.time.YearMonth; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import 
java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Equivalence; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.transforms.Transform; +import org.apache.iceberg.transforms.Transforms; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -48,12 +63,12 @@ *

NOTE: If you add any new fields here, you need to also update the {@link #equals} and {@link * #hashCode()} methods. * - *

Use {@link #from(DataFile, String)} to create a {@link SerializableDataFile} and {@link + *

Use {@link #from(DataFile, PartitionSpec)} to create a {@link SerializableDataFile} and {@link * #createDataFile(Map)} to reconstruct the original {@link DataFile}. */ @DefaultSchema(AutoValueSchema.class) @AutoValue -abstract class SerializableDataFile { +public abstract class SerializableDataFile { public static Builder builder() { return new AutoValue_SerializableDataFile.Builder(); } @@ -87,7 +102,7 @@ public static Builder builder() { abstract @Nullable Map getUpperBounds(); @AutoValue.Builder - abstract static class Builder { + public abstract static class Builder { abstract Builder setPath(String path); abstract Builder setFileFormat(String fileFormat); @@ -119,14 +134,25 @@ abstract static class Builder { abstract SerializableDataFile build(); } + public static SerializableDataFile from(DataFile f, Map specs) { + return from( + f, + Preconditions.checkStateNotNull( + specs.get(f.specId()), + "Could not create a SerializableDataFile because DataFile is written using a partition spec id '%s' that is not found in the provided specs: %s", + f.specId(), + specs.keySet())); + } + /** * Create a {@link SerializableDataFile} from a {@link DataFile} and its associated {@link * PartitionKey}. */ - static SerializableDataFile from(DataFile f, String partitionPath) { + public static SerializableDataFile from(DataFile f, PartitionSpec spec) { + String partitionPath = getPartitionDataPath(f.partition(), spec); - return SerializableDataFile.builder() - .setPath(f.path().toString()) + return builder() + .setPath(f.location()) .setFileFormat(f.format().toString()) .setRecordCount(f.recordCount()) .setFileSizeInBytes(f.fileSizeInBytes()) @@ -150,7 +176,7 @@ static SerializableDataFile from(DataFile f, String partitionPath) { * it from Beam-compatible types. */ @SuppressWarnings("nullness") - DataFile createDataFile(Map partitionSpecs) { + public DataFile createDataFile(Map partitionSpecs) { PartitionSpec partitionSpec = checkStateNotNull( partitionSpecs.get(getPartitionSpecId()), @@ -180,6 +206,43 @@ DataFile createDataFile(Map partitionSpecs) { .build(); } + /** + * Returns an equivalent partition path that is made up of partition data. Needed to reconstruct a + * {@link DataFile}. 
+ */ + @VisibleForTesting + static String getPartitionDataPath(StructLike partitionKey, PartitionSpec spec) { + String partitionPath = spec.partitionToPath(partitionKey); + + Map> transforms = + spec.fields().stream() + .collect(Collectors.toMap(PartitionField::name, PartitionField::transform)); + + if (partitionPath.isEmpty() || transforms.isEmpty()) { + return partitionPath; + } + List resolved = new ArrayList<>(); + for (String partition : Splitter.on('/').splitToList(partitionPath)) { + List nameAndValue = Splitter.on('=').splitToList(partition); + String name = nameAndValue.get(0); + String value = nameAndValue.get(1); + String transformName = checkArgumentNotNull(transforms.get(name)).toString(); + if (Transforms.month().toString().equals(transformName)) { + int month = YearMonth.parse(value).getMonthValue(); + value = String.valueOf(month); + } else if (Transforms.hour().toString().equals(transformName)) { + long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER)); + value = String.valueOf(hour); + } + resolved.add(name + "=" + value); + } + return String.join("/", resolved); + } + + private static final DateTimeFormatter HOUR_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"); + private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC); + // ByteBuddyUtils has trouble converting Map value type ByteBuffer // to byte[] and back to ByteBuffer, so we perform these conversions manually // TODO(https://github.com/apache/beam/issues/32701) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/CommitFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/CommitFiles.java new file mode 100644 index 000000000000..fb61305e826e --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/CommitFiles.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
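As an aside on the helper above: the value normalization it applies to `month` and `hour` transform segments can be reproduced with plain `java.time`. A minimal, standalone sketch (the sample path values and field names are hypothetical, not taken from this PR):

```java
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class PartitionValueConversionDemo {
  public static void main(String[] args) {
    // getPartitionDataPath turns a month-transform segment such as "ts_month=2024-03"
    // into the parsed month value.
    int month = YearMonth.parse("2024-03").getMonthValue(); // 3

    // An hour-transform segment such as "ts_hour=1970-01-02-00" becomes the number of
    // hours since the Unix epoch, using the same HOUR_FORMATTER/EPOCH constants as above.
    DateTimeFormatter hourFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
    LocalDateTime epoch = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
    long hours =
        ChronoUnit.HOURS.between(epoch, LocalDateTime.parse("1970-01-02-00", hourFormatter)); // 24

    System.out.println("month value: " + month + ", hour value: " + hours);
  }
}
```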
+ */ +package org.apache.beam.sdk.io.iceberg.maintenance; + +import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig; +import org.apache.beam.sdk.io.iceberg.SerializableDataFile; +import org.apache.beam.sdk.io.iceberg.SnapshotInfo; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CommitFiles extends DoFn, SnapshotInfo> { + private static final Logger LOG = LoggerFactory.getLogger(CommitFiles.class); + private static final Counter numFilesCommitted = + Metrics.counter(CommitFiles.class, "numFilesCommitted"); + private static final Counter numFilesRemoved = + Metrics.counter(CommitFiles.class, "numFilesRemoved"); + private final String tableIdentifier; + private final IcebergCatalogConfig catalogConfig; + private final TupleTag newFiles; + private final TupleTag oldFiles; + + CommitFiles( + String tableIdentifier, + IcebergCatalogConfig catalogConfig, + TupleTag newFiles, + TupleTag oldFiles) { + this.tableIdentifier = tableIdentifier; + this.catalogConfig = catalogConfig; + this.newFiles = newFiles; + this.oldFiles = oldFiles; + } + + @ProcessElement + public void process(@Element KV element, OutputReceiver output) { + Table table = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)); + + Iterable newSerializedFiles = element.getValue().getAll(newFiles); + Iterable oldSerializedFiles = element.getValue().getAll(oldFiles); + + if (!newSerializedFiles.iterator().hasNext()) { + Preconditions.checkState( + !oldSerializedFiles.iterator().hasNext(), + "No new files were added, so expected no files planned for deletion, " + + "but received %s files planned for deletion.", + Iterables.size(oldSerializedFiles)); + LOG.info(RewriteDataFiles.REWRITE_PREFIX + "Received no rewritten files. 
Skipping commit."); + return; + } + + RewriteFiles rewriteFiles = table.newRewrite(); + + int numAddFiles = 0; + for (SerializableDataFile newFile : newSerializedFiles) { + DataFile f = newFile.createDataFile(table.specs()); + rewriteFiles.addFile(f); + numAddFiles++; + } + + int numRemoveFiles = 0; + for (SerializableDataFile oldFile : oldSerializedFiles) { + DataFile f = oldFile.createDataFile(table.specs()); + rewriteFiles.deleteFile(f); + numRemoveFiles++; + } + + rewriteFiles.commit(); + Snapshot snapshot = table.currentSnapshot(); + numFilesCommitted.inc(numAddFiles); + numFilesRemoved.inc(numRemoveFiles); + LOG.info( + "Committed rewrite and created new snapshot for table '{}': {}", tableIdentifier, snapshot); + output.output(SnapshotInfo.fromSnapshot(snapshot)); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/FileScanTaskSorter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/FileScanTaskSorter.java new file mode 100644 index 000000000000..f2e9dad9b013 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/FileScanTaskSorter.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg.maintenance; + +import static org.apache.iceberg.NullOrder.NULLS_FIRST; +import static org.apache.iceberg.NullOrder.NULLS_LAST; + +import com.google.common.collect.Lists; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortField; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Conversions; +import org.checkerframework.checker.nullness.qual.Nullable; + +class FileScanTaskSorter { + + /** Simple sorting by table's sort order using min/max values from file statistics */ + static List sortByTableSortOrder( + CloseableIterable tasks, SortOrder sortOrder) { + + List taskList = Lists.newArrayList(tasks); + if (sortOrder.fields().isEmpty()) { + return taskList; + } + + taskList.sort(new SortOrderComparator(sortOrder)); + + return taskList; + } + + /** Simple sorting by a single column's min value */ + static List sortByColumn(CloseableIterable tasks, SortField field) + throws IOException { + + try { + List taskList = Lists.newArrayList(tasks); + taskList.sort( + (task1, task2) -> { + Object min1 = getMinValue(task1, field.sourceId()); + Object min2 = getMinValue(task2, field.sourceId()); + return compareNullableValues(min1, min2, field.nullOrder()); + }); + return taskList; + } finally { + tasks.close(); + } + } + + private static class SortOrderComparator implements Comparator { + private final List sortFields; + + SortOrderComparator(SortOrder sortOrder) { + this.sortFields = sortOrder.fields(); + } + + @Override + public int compare(FileScanTask task1, FileScanTask task2) { + for (SortField sortField : sortFields) { + int fieldId = sortField.sourceId(); + + int comparison; + if (sortField.direction() == SortDirection.ASC) { + comparison = + compareNullableValues( + getMinValue(task1, fieldId), getMinValue(task2, fieldId), sortField.nullOrder()); + + } else { + comparison = + compareNullableValues( + getMaxValue(task2, fieldId), getMaxValue(task1, fieldId), sortField.nullOrder()); + } + + if (comparison != 0) { + return comparison; + } + } + + // if all sort columns are equal, choose the task that starts with an earlier offset + return Long.compare(task1.start(), task2.start()); + } + } + + @SuppressWarnings("unchecked") + private static int compareNullableValues( + @Nullable Object min1, @Nullable Object min2, NullOrder nullOrder) { + if (min1 == null && min2 == null) { + return 0; + } + if (nullOrder == NULLS_FIRST) { + if (min1 == null) { + return -1; + } + if (min2 == null) { + return 1; + } + } else if (nullOrder == NULLS_LAST) { + if (min1 == null) { + return 1; + } + if (min2 == null) { + return -1; + } + } + + if (min1 instanceof Comparable && min2 instanceof Comparable) { + Comparable comparable1 = (Comparable) min1; + return comparable1.compareTo(min2); + } + return 0; + } + + private static @Nullable Object getMinValue(FileScanTask task, int columnId) { + @Nullable ByteBuffer value = task.file().lowerBounds().get(columnId); + if (value == null) { + return null; + } + return Conversions.fromByteBuffer(task.schema().findType(columnId), value); + } + + private static @Nullable Object getMaxValue(FileScanTask task, int columnId) { + @Nullable ByteBuffer value = task.file().upperBounds().get(columnId); + if (value == null) { + return null; + } + return 
Conversions.fromByteBuffer(task.schema().findType(columnId), value); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/IcebergMaintenance.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/IcebergMaintenance.java new file mode 100644 index 000000000000..204f049cffce --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/IcebergMaintenance.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.maintenance; + +import java.util.Map; +import java.util.UUID; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig; +import org.apache.beam.sdk.io.iceberg.SnapshotInfo; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.PCollection; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.catalog.TableIdentifier; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergMaintenance { + private static final Logger LOG = LoggerFactory.getLogger(IcebergMaintenance.class); + public static final String MAINTENANCE_PREFIX = "[Maintenance]"; + private static final String IMPULSE = "Impulse"; + private final Pipeline pipeline; + private PCollection maintenance; + private final SerializableTable table; + private final String tableIdentifier; + private final IcebergCatalogConfig catalogConfig; + private final String operationId; + + private IcebergMaintenance( + String tableIdentifier, + Map catalogConfig, + @Nullable PipelineOptions pipelineOptions, + @Nullable Pipeline pipeline) { + this.tableIdentifier = tableIdentifier; + this.catalogConfig = IcebergCatalogConfig.builder().setCatalogProperties(catalogConfig).build(); + + if (pipeline == null) { + PipelineOptions options = + pipelineOptions != null ? 
pipelineOptions : PipelineOptionsFactory.create(); + LOG.info( + MAINTENANCE_PREFIX + " Building a new {} pipeline to run maintenance for table '{}'.", + options.getRunner().getSimpleName(), + tableIdentifier); + pipeline = Pipeline.create(options); + } + this.pipeline = pipeline; + + this.table = + (SerializableTable) + SerializableTable.copyOf( + this.catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier))); + Snapshot snapshot = + Preconditions.checkStateNotNull( + table.currentSnapshot(), + "Iceberg maintenance requires a valid snapshot, but table '%s' has none. " + + "Ensure that a write operation has been successfully committed.", + tableIdentifier); + this.maintenance = this.pipeline.apply(IMPULSE, Create.of(SnapshotInfo.fromSnapshot(snapshot))); + this.operationId = UUID.randomUUID().toString(); + } + + public static IcebergMaintenance create( + String tableIdentifier, Map catalogConfig) { + return new IcebergMaintenance(tableIdentifier, catalogConfig, null, null); + } + + public static IcebergMaintenance create( + String tableIdentifier, + Map catalogConfig, + @Nullable PipelineOptions pipelineOptions) { + return new IcebergMaintenance(tableIdentifier, catalogConfig, pipelineOptions, null); + } + + public static IcebergMaintenance create( + String tableIdentifier, Map catalogConfig, @Nullable Pipeline pipeline) { + return new IcebergMaintenance(tableIdentifier, catalogConfig, null, pipeline); + } + + public IcebergMaintenance rewriteDataFiles() { + return rewriteDataFiles(RewriteDataFiles.Configuration.builder().build()); + } + + public IcebergMaintenance rewriteDataFiles(RewriteDataFiles.Configuration rewriteConfig) { + checkNotAddedYet(RewriteDataFiles.class); + LOG.info( + MAINTENANCE_PREFIX + " Adding {} task with config: {}", + RewriteDataFiles.class.getSimpleName(), + rewriteConfig); + maintenance = + maintenance.apply( + RewriteDataFiles.create( + tableIdentifier, table, catalogConfig, rewriteConfig, operationId)); + return this; + } + + public PipelineResult run() { + checkNotEmpty(); + LOG.info( + MAINTENANCE_PREFIX + " Running maintenance on table {} with operation-id: {}", + tableIdentifier, + operationId); + return pipeline.run(); + } + + private void checkNotAddedYet(Class transform) { + pipeline.traverseTopologically( + new Pipeline.PipelineVisitor.Defaults() { + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + @Nullable PTransform nodeT = node.getTransform(); + if (nodeT != null && nodeT.getClass().equals(transform)) { + throw new IllegalStateException( + String.format( + "A '%s' task can only be applied once per maintenance operation. Please remove the duplicate task.", + transform.getSimpleName())); + } + return CompositeBehavior.ENTER_TRANSFORM; + } + }); + } + + public void checkNotEmpty() { + boolean[] isEmpty = new boolean[] {true}; + pipeline.traverseTopologically( + new Pipeline.PipelineVisitor.Defaults() { + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + if (node.getTransform() != null && !node.getFullName().startsWith(IMPULSE)) { + isEmpty[0] = false; + return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; + } + return CompositeBehavior.ENTER_TRANSFORM; + } + }); + if (isEmpty[0]) { + throw new IllegalStateException( + String.format( + "Maintenance operation for Iceberg table '%s' is empty. 
Please apply at least one task.", + tableIdentifier)); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/PlanDataFileRewrites.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/PlanDataFileRewrites.java new file mode 100644 index 000000000000..ac6d4ff7df03 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/PlanDataFileRewrites.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.maintenance; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.io.iceberg.FilterUtils; +import org.apache.beam.sdk.io.iceberg.SerializableDataFile; +import org.apache.beam.sdk.io.iceberg.SnapshotInfo; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.expressions.Expression; +import org.checkerframework.checker.nullness.qual.Nullable; + +class PlanDataFileRewrites extends PTransform, PCollectionTuple> { + private final SerializableTable table; + private final RewriteDataFiles.Configuration rewriteConfig; + static final TupleTag REWRITE_GROUPS = new TupleTag<>() {}; + static final TupleTag OLD_FILES = new TupleTag<>() {}; + + PlanDataFileRewrites(SerializableTable table, RewriteDataFiles.Configuration rewriteConfig) { + this.table = table; + this.rewriteConfig = rewriteConfig; + } + + @Override + public PCollectionTuple expand(PCollection input) { + + return input.apply( + ParDo.of(new ScanAndCreatePlan(table, rewriteConfig)) + .withOutputTags(REWRITE_GROUPS, TupleTagList.of(OLD_FILES))); + } + + static class ScanAndCreatePlan extends DoFn { + private final SerializableTable table; + private final RewriteDataFiles.Configuration rewriteConfig; + private final @Nullable Expression filter; + private static final Counter plannedFilesToRewrite = + Metrics.counter(ScanAndCreatePlan.class, "plannedFilesToRewrite"); + private static final Counter plannedBytesToRewrite = + Metrics.counter(ScanAndCreatePlan.class, "plannedBytesToRewrite"); + private static final Counter plannedPartitionsToRewrite = + Metrics.counter(ScanAndCreatePlan.class, 
"plannedPartitionsToRewrite"); + private static final Counter plannedOutputFiles = + Metrics.counter(ScanAndCreatePlan.class, "plannedOutputFiles"); + + ScanAndCreatePlan(SerializableTable table, RewriteDataFiles.Configuration rewriteConfig) { + this.table = table; + this.rewriteConfig = rewriteConfig; + this.filter = + rewriteConfig.getFilter() != null + ? FilterUtils.convert(rewriteConfig.getFilter(), table.schema()) + : null; + } + + @ProcessElement + public void process(MultiOutputReceiver output) { + SortAwareBinPackRewriteFilePlanner planner = + new SortAwareBinPackRewriteFilePlanner(table, filter, rewriteConfig); + planner.init(firstNonNull(rewriteConfig.getRewriteOptions(), Collections.emptyMap())); + int totalFilesToRewrite = 0; + long totalBytesToRewrite = 0L; + int totalPlannedOutputFiles = 0; + Set partitionPaths = new HashSet<>(); + + for (RewriteFileGroup group : planner.beamPlan()) { + output.get(REWRITE_GROUPS).output(group); + + partitionPaths.add(group.getPartitionPath()); + totalBytesToRewrite += group.getTotalInputFileByteSize(); + totalFilesToRewrite += group.numInputFiles(); + totalPlannedOutputFiles++; + } + for (DataFile file : planner.getOldFiles()) { + SerializableDataFile serializableFile = SerializableDataFile.from(file, table.specs()); + output.get(OLD_FILES).output(serializableFile); + } + + plannedFilesToRewrite.inc(totalFilesToRewrite); + plannedBytesToRewrite.inc(totalBytesToRewrite); + plannedPartitionsToRewrite.inc(partitionPaths.size()); + plannedOutputFiles.inc(totalPlannedOutputFiles); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/RewriteDataFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/RewriteDataFiles.java new file mode 100644 index 000000000000..5ee4e7942029 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/RewriteDataFiles.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg.maintenance; + +import static org.apache.beam.sdk.io.iceberg.maintenance.PlanDataFileRewrites.OLD_FILES; +import static org.apache.beam.sdk.io.iceberg.maintenance.PlanDataFileRewrites.REWRITE_GROUPS; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Map; +import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig; +import org.apache.beam.sdk.io.iceberg.SerializableDataFile; +import org.apache.beam.sdk.io.iceberg.SnapshotInfo; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.iceberg.SerializableTable; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.dataflow.qual.Pure; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RewriteDataFiles + extends PTransform, PCollection> { + private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFiles.class); + public static final String REWRITE_PREFIX = + IcebergMaintenance.MAINTENANCE_PREFIX + "[RewriteDataFiles] "; + private final String tableIdentifier; + private final SerializableTable table; + private final IcebergCatalogConfig catalogConfig; + private final Configuration rewriteConfig; + private final String operationId; + + RewriteDataFiles( + String tableIdentifier, + SerializableTable table, + IcebergCatalogConfig catalogConfig, + Configuration rewriteConfig, + String operationId) { + this.tableIdentifier = tableIdentifier; + this.table = table; + this.catalogConfig = catalogConfig; + this.rewriteConfig = rewriteConfig; + this.operationId = operationId; + } + + static RewriteDataFiles create( + String tableIdentifier, + SerializableTable table, + IcebergCatalogConfig catalogConfig, + @Nullable Configuration configuration, + String operationId) { + return new RewriteDataFiles( + tableIdentifier, + table, + catalogConfig, + configuration != null ? 
configuration : Configuration.builder().build(), + operationId); + } + + @Override + public PCollection expand(PCollection impulse) { + LOG.info( + REWRITE_PREFIX + "Running {} task with operation-id: {}", + RewriteDataFiles.class.getSimpleName(), + operationId); + PCollectionTuple plan = + impulse.apply("Scan and Plan Rewrite", new PlanDataFileRewrites(table, rewriteConfig)); + PCollection> rewrittenFiles = + plan.get(REWRITE_GROUPS) + .apply("Redistribute RewriteFileGroups", Redistribute.arbitrarily()) + .setCoder(FILE_GROUP_CODER) + .apply("Rewrite Groups", ParDo.of(new RewriteDoFn(table, operationId))) + .setCoder(SERIALIZABLE_DF_CODER) + .apply("Add Key to New Files", WithKeys.of((Void) null)); + + PCollection> oldFiles = + plan.get(OLD_FILES) + .setCoder(SERIALIZABLE_DF_CODER) + .apply("Add Key to Old Files", WithKeys.of((Void) null)); + + return KeyedPCollectionTuple.of(ADD_DATA_FILES, rewrittenFiles) + .and(DELETE_DATA_FILES, oldFiles) + .apply(CoGroupByKey.create()) + .apply( + "Single Commit", + ParDo.of( + new CommitFiles(tableIdentifier, catalogConfig, ADD_DATA_FILES, DELETE_DATA_FILES))) + .setCoder(SNAPSHOT_CODER); + } + + @AutoValue + @DefaultSchema(AutoValueSchema.class) + public abstract static class Configuration implements Serializable { + public static Builder builder() { + return new AutoValue_RewriteDataFiles_Configuration.Builder(); + } + + @SchemaFieldDescription( + "A snapshot ID used for planning and as the starting snapshot id for commit validation when replacing the files") + @Pure + public abstract @Nullable Long getSnapshotId(); + + @SchemaFieldDescription( + "Property used when scanning for data files to rewrite (default is false)") + @Pure + public abstract @Nullable Boolean getCaseSensitive(); + + @Pure + public abstract @Nullable String getFilter(); + + @Pure + public abstract @Nullable Map getRewriteOptions(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setSnapshotId(@Nullable Long snapshotId); + + public abstract Builder setCaseSensitive(@Nullable Boolean caseSensitive); + + public abstract Builder setFilter(@Nullable String filter); + + public abstract Builder setRewriteOptions(@Nullable Map options); + + public abstract Configuration build(); + } + } + + private static final TupleTag ADD_DATA_FILES = new TupleTag<>(); + private static final TupleTag DELETE_DATA_FILES = new TupleTag<>(); + + static final SchemaCoder FILE_GROUP_CODER; + static SchemaCoder SERIALIZABLE_DF_CODER; + static SchemaCoder SNAPSHOT_CODER; + + static { + try { + FILE_GROUP_CODER = SchemaRegistry.createDefault().getSchemaCoder(RewriteFileGroup.class); + SERIALIZABLE_DF_CODER = + SchemaRegistry.createDefault().getSchemaCoder(SerializableDataFile.class); + SNAPSHOT_CODER = SchemaRegistry.createDefault().getSchemaCoder(SnapshotInfo.class); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/RewriteDoFn.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/RewriteDoFn.java new file mode 100644 index 000000000000..82dfd76944eb --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/RewriteDoFn.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.maintenance; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.iceberg.ReadUtils; +import org.apache.beam.sdk.io.iceberg.SerializableDataFile; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ScanTaskParser; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.GenericDeleteFilter; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.util.PropertyUtil; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RewriteDoFn extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(RewriteDoFn.class); + private final String operationId; + private final SerializableTable table; + private final FileFormat format; + private @MonotonicNonNull WriterFactory writerFactory; + private static final Counter activeRewriters = + Metrics.counter(RewriteDoFn.class, "activeRewriters"); + private static final Distribution filesToRewriteByteSize = + Metrics.distribution(RewriteDoFn.class, "filesToRewriteByteSize"); + private static final Distribution outputFileByteSize = + Metrics.distribution(RewriteDoFn.class, "outputFileByteSize"); + + RewriteDoFn(SerializableTable table, String operationId) { + this.operationId = operationId; + this.table = table; + + String formatString = + PropertyUtil.propertyAsString( + table.properties(), + TableProperties.DEFAULT_FILE_FORMAT, + TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + this.format = FileFormat.fromString(formatString); + } + + @Setup + public void start() { + long workerId = ThreadLocalRandom.current().nextLong(); + LOG.info( + RewriteDataFiles.REWRITE_PREFIX + "Starting up a RewriteDoFn with worker-id: " + workerId); + writerFactory = new WriterFactory(format, Long.MAX_VALUE, workerId, operationId); + } + + @ProcessElement + public void processElement( + @Element RewriteFileGroup group, OutputReceiver output) + throws IOException { + try (TaskWriter writer = createWriter()) { + activeRewriters.inc(); + for (String jsonTask : group.getJsonTasks()) { + FileScanTask fileScanTask = ScanTaskParser.fromJson(jsonTask, true); + if (fileScanTask.start() == 0) { + // A single underlying data file can be split into multiple FileScanTasks (e.g., in a + // split plan). 
We only track the file's total size once, when processing the task + // that starts at offset 0. + filesToRewriteByteSize.update(fileScanTask.length()); + } + + try (CloseableIterable iterable = ReadUtils.createReader(fileScanTask, table)) { + GenericDeleteFilter deleteFilter = + new GenericDeleteFilter(table.io(), fileScanTask, table.schema(), table.schema()); + CloseableIterable reader = deleteFilter.filter(iterable); + + for (Record record : reader) { + writer.write(record); + } + } + } + + DataFile[] datafiles = writer.dataFiles(); + for (DataFile dataFile : datafiles) { + SerializableDataFile serializableDataFile = + SerializableDataFile.from(dataFile, table.specs()); + output.output(serializableDataFile); + + outputFileByteSize.update(dataFile.fileSizeInBytes()); + } + } + activeRewriters.dec(); + } + + TaskWriter createWriter() { + checkStateNotNull(writerFactory).init(table); + return checkStateNotNull(writerFactory).create(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/RewriteFileGroup.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/RewriteFileGroup.java new file mode 100644 index 000000000000..fd6cc499b598 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/RewriteFileGroup.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
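A design note on the JSON handling above: each `FileScanTask` is shipped between the planning and rewriting stages as a JSON string (it is stored in `RewriteFileGroup`, defined next, via `ScanTaskParser.toJson`), presumably because the group is encoded with a Beam `SchemaCoder` and the task object itself is not schema-friendly. A minimal sketch of the round trip, mirroring the calls made in this PR:

```java
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ScanTaskParser;

class FileScanTaskRoundTrip {
  // Serialize as in RewriteFileGroup.Builder.setFileScanTasks, parse back as in
  // RewriteDoFn.processElement; the boolean flag is the same one RewriteDoFn passes.
  static FileScanTask roundTrip(FileScanTask task) {
    String json = ScanTaskParser.toJson(task);
    return ScanTaskParser.fromJson(json, true);
  }
}
```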
+ */ +package org.apache.beam.sdk.io.iceberg.maintenance; + +import com.google.auto.value.AutoValue; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.RewriteJobOrder; +import org.apache.iceberg.ScanTaskParser; + +@AutoValue +@DefaultSchema(AutoValueSchema.class) +public abstract class RewriteFileGroup { + static Builder builder() { + return new AutoValue_RewriteFileGroup.Builder(); + } + + int numInputFiles() { + return getJsonTasks().size(); + } + + abstract int getGlobalIndex(); + + abstract int getPartitionIndex(); + + abstract String getPartitionPath(); + + abstract List getJsonTasks(); + + abstract int getOutputSpecId(); + + abstract long getWriteMaxFileSize(); + + abstract long getInputSplitSize(); + + abstract int getExpectedOutputFiles(); + + abstract long getTotalInputFileByteSize(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setGlobalIndex(int globalIndex); + + abstract Builder setPartitionIndex(int partitionIndex); + + abstract Builder setPartitionPath(String partitionPath); + + abstract Builder setJsonTasks(List jsonTasks); + + Builder setFileScanTasks(List tasks) { + long byteSize = tasks.stream().mapToLong(FileScanTask::length).sum(); + return setTotalInputFileByteSize(byteSize) + .setJsonTasks(tasks.stream().map(ScanTaskParser::toJson).collect(Collectors.toList())); + } + + abstract Builder setOutputSpecId(int outputSpecId); + + abstract Builder setWriteMaxFileSize(long writeMaxFileSize); + + abstract Builder setInputSplitSize(long inputSplitSize); + + abstract Builder setExpectedOutputFiles(int expectedOutputFiles); + + abstract Builder setTotalInputFileByteSize(long byteSize); + + abstract RewriteFileGroup build(); + } + + public static Comparator comparator(RewriteJobOrder rewriteJobOrder) { + switch (rewriteJobOrder) { + case BYTES_ASC: + return Comparator.comparing(RewriteFileGroup::getTotalInputFileByteSize); + case BYTES_DESC: + return Comparator.comparing( + RewriteFileGroup::getTotalInputFileByteSize, Comparator.reverseOrder()); + case FILES_ASC: + return Comparator.comparing(RewriteFileGroup::numInputFiles); + case FILES_DESC: + return Comparator.comparing(RewriteFileGroup::numInputFiles, Comparator.reverseOrder()); + default: + return (unused, unused2) -> 0; + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/SortAwareBinPackRewriteFilePlanner.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/SortAwareBinPackRewriteFilePlanner.java new file mode 100644 index 000000000000..85164b394a39 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/SortAwareBinPackRewriteFilePlanner.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.maintenance; + +import static org.apache.beam.sdk.io.iceberg.maintenance.RewriteDataFiles.REWRITE_PREFIX; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; +import static org.apache.iceberg.util.PropertyUtil.propertyAsLong; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.iceberg.maintenance.RewriteDataFiles.Configuration; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.RewriteJobOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.actions.BinPackRewriteFilePlanner; +import org.apache.iceberg.actions.RewriteDataFiles; +import org.apache.iceberg.actions.SizeBasedFileRewritePlanner; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.BinPacking; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.StructLikeMap; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Groups specified data files in the {@link Table} into {@link RewriteFileGroup}s. The files are + * grouped by partition and bin-packed into fixed-size bins based on their size. Extends {@link + * BinPackRewriteFilePlanner} (a {@link SizeBasedFileRewritePlanner}) with sort-aware planning, an + * optional limit on the number of files to rewrite, and {@link + * RewriteDataFiles#REWRITE_JOB_ORDER} handling. + */ +public class SortAwareBinPackRewriteFilePlanner extends BinPackRewriteFilePlanner { + /** + * The max number of files to be rewritten (if not provided, all eligible files are rewritten). + */ + public static final String MAX_FILES_TO_REWRITE = "max-files-to-rewrite"; + + private static final Logger LOG = + LoggerFactory.getLogger(SortAwareBinPackRewriteFilePlanner.class); + + private final Expression filter; + private final @Nullable Long snapshotId; + private final boolean caseSensitive; + private final List oldFiles = Lists.newArrayList(); + private final Integer maxFilesToRewrite; + private final RewriteJobOrder rewriteJobOrder; + private final boolean rewriteAll; + private final long targetFileSize; + + /** + * Creates the planner for the given table. 
+ * + * @param table to plan for + * @param filter used to remove files from the plan + * @param rewriteConfig used to configure the rewrite planner + */ + public SortAwareBinPackRewriteFilePlanner( + Table table, @Nullable Expression filter, Configuration rewriteConfig) { + super(table); + this.filter = filter != null ? filter : Expressions.alwaysTrue(); + this.snapshotId = rewriteConfig.getSnapshotId(); + this.caseSensitive = Boolean.TRUE.equals(rewriteConfig.getCaseSensitive()); + + Map options = + firstNonNull(rewriteConfig.getRewriteOptions(), Collections.emptyMap()); + this.rewriteJobOrder = + RewriteJobOrder.fromName( + PropertyUtil.propertyAsString( + options, + RewriteDataFiles.REWRITE_JOB_ORDER, + RewriteDataFiles.REWRITE_JOB_ORDER_DEFAULT)); + this.maxFilesToRewrite = PropertyUtil.propertyAsNullableInt(options, MAX_FILES_TO_REWRITE); + Preconditions.checkArgument( + maxFilesToRewrite == null || maxFilesToRewrite > 0, + "Cannot set %s to %s, the value must be a positive integer.", + MAX_FILES_TO_REWRITE, + maxFilesToRewrite); + this.rewriteAll = PropertyUtil.propertyAsBoolean(options, REWRITE_ALL, REWRITE_ALL_DEFAULT); + this.targetFileSize = + propertyAsLong( + options, + TARGET_FILE_SIZE_BYTES, + propertyAsLong( + table.properties(), + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT)); + } + + public List beamPlan() { + StructLikeMap>> plan = planFileGroups(); + RewriteExecutionContext ctx = new RewriteExecutionContext(); + List selectedFileGroups = Lists.newArrayList(); + AtomicInteger fileCountRunner = new AtomicInteger(); + + plan.entrySet().stream() + .filter(e -> !e.getValue().isEmpty()) + .forEach( + entry -> { + StructLike partition = entry.getKey(); + entry + .getValue() + .forEach( + fileScanTasks -> { + long inputSize = inputSize(fileScanTasks); + if (maxFilesToRewrite == null) { + selectedFileGroups.add( + newRewriteGroup( + ctx, + partition, + fileScanTasks, + inputSplitSize(inputSize), + expectedOutputFiles(inputSize))); + } else if (fileCountRunner.get() < maxFilesToRewrite) { + int remainingSize = maxFilesToRewrite - fileCountRunner.get(); + int scanTasksToRewrite = Math.min(fileScanTasks.size(), remainingSize); + selectedFileGroups.add( + newRewriteGroup( + ctx, + partition, + fileScanTasks.subList(0, scanTasksToRewrite), + inputSplitSize(inputSize), + expectedOutputFiles(inputSize))); + fileCountRunner.getAndAdd(scanTasksToRewrite); + } + fileScanTasks.forEach(task -> oldFiles.add(task.file())); + }); + }); + return selectedFileGroups.stream() + .sorted(RewriteFileGroup.comparator(rewriteJobOrder)) + .collect(Collectors.toList()); + } + + public List getOldFiles() { + return oldFiles; + } + + private StructLikeMap>> planFileGroups() { + TableScan scan = + table().newScan().filter(filter).caseSensitive(caseSensitive).ignoreResiduals(); + + if (snapshotId != null) { + scan = scan.useSnapshot(snapshotId); + } + + Types.StructType partitionType = table().spec().partitionType(); + CloseableIterable fileScanTasks = scan.planFiles(); + + try { + List sortedTasks = + FileScanTaskSorter.sortByTableSortOrder(fileScanTasks, table().sortOrder()); + StructLikeMap> filesByPartition = + groupByPartition(table(), partitionType, sortedTasks); + return filesByPartition.transformValues(tasks -> ImmutableList.copyOf(planFileGroups(tasks))); + } finally { + try { + fileScanTasks.close(); + } catch (IOException io) { + LOG.error("Cannot properly close file 
+  @Override
+  public Iterable<List<FileScanTask>> planFileGroups(Iterable<FileScanTask> tasks) {
+    Iterable<FileScanTask> filteredTasks = rewriteAll ? tasks : filterFiles(tasks);
+    BinPacking.ListPacker<FileScanTask> packer =
+        new BinPacking.ListPacker<>(targetFileSize, 1, false);
+    List<List<FileScanTask>> groups = packer.pack(filteredTasks, ContentScanTask::length);
+    return rewriteAll ? groups : filterFileGroups(groups);
+  }
+
+  private RewriteFileGroup newRewriteGroup(
+      RewriteExecutionContext ctx,
+      StructLike partition,
+      List<FileScanTask> tasks,
+      long inputSplitSize,
+      int expectedOutputFiles) {
+    return RewriteFileGroup.builder()
+        .setGlobalIndex(ctx.currentGlobalIndex())
+        .setPartitionIndex(ctx.currentPartitionIndex(partition))
+        .setPartitionPath(table().spec().partitionToPath(partition))
+        .setFileScanTasks(tasks)
+        .setOutputSpecId(outputSpecId())
+        .setWriteMaxFileSize(writeMaxFileSize())
+        .setInputSplitSize(inputSplitSize)
+        .setExpectedOutputFiles(expectedOutputFiles)
+        .build();
+  }
+
+  protected static class RewriteExecutionContext {
+    private final Map<StructLike, Integer> partitionIndexMap;
+    private final AtomicInteger groupIndex;
+
+    protected RewriteExecutionContext() {
+      this.partitionIndexMap = Maps.newConcurrentMap();
+      this.groupIndex = new AtomicInteger(1);
+    }
+
+    protected int currentGlobalIndex() {
+      return groupIndex.getAndIncrement();
+    }
+
+    protected int currentPartitionIndex(StructLike partition) {
+      return partitionIndexMap.merge(partition, 1, Integer::sum);
+    }
+  }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/WriterFactory.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/WriterFactory.java
new file mode 100644
index 000000000000..378ae85e6f54
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/maintenance/WriterFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.maintenance;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitionedFanoutWriter;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+class WriterFactory {
+  private final long targetFileSizeBytes;
+  private final String operationId;
+  private final long workerId;
+  private final FileFormat format;
+  private @MonotonicNonNull OutputFileFactory outputFileFactory;
+  private @MonotonicNonNull Table table;
+
+  WriterFactory(FileFormat format, long targetFileSizeBytes, long workerId, String operationId) {
+    this.format = format;
+    this.targetFileSizeBytes = targetFileSizeBytes;
+    this.operationId = operationId;
+    this.workerId = workerId;
+  }
+
+  void init(Table table) {
+    if (outputFileFactory == null) {
+      this.table = table;
+
+      outputFileFactory =
+          OutputFileFactory.builderFor(table, 0, workerId)
+              .format(format)
+              .ioSupplier(table::io)
+              .defaultSpec(table.spec())
+              .operationId(operationId)
+              .build();
+    }
+  }
+
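+  /**
+   * Creates a {@link TaskWriter} for the initialized table: an {@link UnpartitionedWriter} for
+   * unpartitioned tables, otherwise a fanout writer that routes each {@link Record} to its
+   * partition. {@link #init(Table)} must be called before the first writer is created.
+   */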
+  TaskWriter<Record> create() {
+    Table table = checkStateNotNull(this.table);
+    FileAppenderFactory<Record> appenderFactory = new GenericAppenderFactory(table.schema());
+
+    if (table.spec().isUnpartitioned()) {
+      return new UnpartitionedWriter<>(
+          table.spec(),
+          format,
+          appenderFactory,
+          checkStateNotNull(outputFileFactory),
+          table.io(),
+          targetFileSizeBytes);
+    } else {
+      return new RecordPartitionedFanoutWriter(
+          table.spec(),
+          format,
+          appenderFactory,
+          checkStateNotNull(outputFileFactory),
+          table.io(),
+          targetFileSizeBytes,
+          table.schema());
+    }
+  }
+
+  private static class RecordPartitionedFanoutWriter extends PartitionedFanoutWriter<Record> {
+
+    private final PartitionKey partitionKey;
+    private final InternalRecordWrapper recordWrapper;
+
+    RecordPartitionedFanoutWriter(
+        PartitionSpec spec,
+        FileFormat format,
+        FileAppenderFactory<Record> appenderFactory,
+        OutputFileFactory fileFactory,
+        FileIO io,
+        long targetFileSize,
+        Schema schema) {
+      super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+      this.partitionKey = new PartitionKey(spec, schema);
+      this.recordWrapper = new InternalRecordWrapper(schema.asStruct());
+    }
+
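+    // Partition transforms operate on Iceberg's internal data representations (e.g. dates as
+    // epoch days), so each record is wrapped with InternalRecordWrapper before the partition key
+    // is computed. The same PartitionKey instance is re-populated for every row.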
+    @Override
+    protected PartitionKey partition(Record row) {
+      partitionKey.partition(recordWrapper.wrap(row));
+      return partitionKey;
+    }
+  }
+}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 36b74967f0b2..5f429ebef246 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -36,7 +36,6 @@
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
@@ -363,15 +362,8 @@ public void testSerializableDataFileRoundTripEquality() throws IOException {
     DataFile datafile = writer.getDataFile();
     assertEquals(2L, datafile.recordCount());
 
-    Map<String, PartitionField> partitionFieldMap = new HashMap<>();
-    for (PartitionField partitionField : PARTITION_SPEC.fields()) {
-      partitionFieldMap.put(partitionField.name(), partitionField);
-    }
-
-    String partitionPath =
-        RecordWriterManager.getPartitionDataPath(partitionKey.toPath(), partitionFieldMap);
     DataFile roundTripDataFile =
-        SerializableDataFile.from(datafile, partitionPath)
+        SerializableDataFile.from(datafile, PARTITION_SPEC)
             .createDataFile(ImmutableMap.of(PARTITION_SPEC.specId(), PARTITION_SPEC));
 
     checkDataFileEquality(datafile, roundTripDataFile);
@@ -403,14 +395,8 @@ public void testRecreateSerializableDataAfterUpdatingPartitionSpec() throws IOEx
     writer.close();
 
     // fetch data file and its serializable version
-    Map<String, PartitionField> partitionFieldMap = new HashMap<>();
-    for (PartitionField partitionField : PARTITION_SPEC.fields()) {
-      partitionFieldMap.put(partitionField.name(), partitionField);
-    }
-    String partitionPath =
-        RecordWriterManager.getPartitionDataPath(partitionKey.toPath(), partitionFieldMap);
     DataFile datafile = writer.getDataFile();
-    SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionPath);
+    SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, PARTITION_SPEC);
     assertEquals(2L, datafile.recordCount());
     assertEquals(serializableDataFile.getPartitionSpecId(), datafile.specId());