@@ -137,7 +137,7 @@ private static void extractFieldNames(SqlNode node, Set<String> fieldNames) {
* <p>Note: This utility currently supports only top-level fields within the filter expression.
* Nested field references are not supported.
*/
static Expression convert(@Nullable String filter, Schema schema) {
public static Expression convert(@Nullable String filter, Schema schema) {
if (filter == null) {
return Expressions.alwaysTrue();
}
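An illustrative caller of the now-public convert method, as a minimal sketch (the enclosing utility class is not named in this diff, so FilterUtils is assumed here, and the filter string is only a plausible example of the supported top-level-field syntax):

import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Types;

class ConvertFilterSketch {
  static Expression example() {
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()));
    // Only top-level fields are supported; a null filter yields Expressions.alwaysTrue().
    return FilterUtils.convert("id > 100", schema); // "FilterUtils" is an assumed class name
  }
}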
@@ -72,7 +72,11 @@ public class ReadUtils {
"parquet.read.support.class",
"parquet.crypto.factory.class");

static ParquetReader<Record> createReader(FileScanTask task, Table table, Schema schema) {
public static ParquetReader<Record> createReader(FileScanTask task, Table table) {
return createReader(task, table, table.schema());
}

public static ParquetReader<Record> createReader(FileScanTask task, Table table, Schema schema) {
String filePath = task.file().path().toString();
InputFile inputFile;
try (FileIO io = table.io()) {
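Another sketch, assuming a FileScanTask and Table are already in scope: the new two-argument overload defaults to the table's current schema, and the returned ParquetReader is drained until read() yields null.

import java.io.IOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.parquet.hadoop.ParquetReader;

class ReadTaskSketch {
  static long countRecords(FileScanTask task, Table table) throws IOException {
    long count = 0;
    // Schema defaults to table.schema() with the new overload.
    try (ParquetReader<Record> reader = ReadUtils.createReader(task, table)) {
      Record record;
      while ((record = reader.read()) != null) {
        count++;
      }
    }
    return count;
  }
}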
@@ -21,11 +21,6 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -36,15 +31,13 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
@@ -56,7 +49,6 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.transforms.Transforms;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,7 +95,6 @@ class DestinationState {
final Cache<PartitionKey, RecordWriter> writers;
private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
private final List<Exception> exceptions = Lists.newArrayList();
private final InternalRecordWrapper wrapper; // wrapper that facilitates partitioning

@@ -114,9 +105,6 @@ class DestinationState {
this.routingPartitionKey = new PartitionKey(spec, schema);
this.wrapper = new InternalRecordWrapper(schema.asStruct());
this.table = table;
for (PartitionField partitionField : spec.fields()) {
partitionFieldMap.put(partitionField.name(), partitionField);
}

// build a cache of RecordWriters.
// writers will expire after 1 min of idle time.
@@ -126,7 +114,6 @@
.expireAfterAccess(1, TimeUnit.MINUTES)
.removalListener(
(RemovalNotification<PartitionKey, RecordWriter> removal) -> {
final PartitionKey pk = Preconditions.checkStateNotNull(removal.getKey());
final RecordWriter recordWriter =
Preconditions.checkStateNotNull(removal.getValue());
try {
@@ -142,9 +129,7 @@
throw rethrow;
}
openWriters--;
String partitionPath = getPartitionDataPath(pk.toPath(), partitionFieldMap);
dataFiles.add(
SerializableDataFile.from(recordWriter.getDataFile(), partitionPath));
dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), spec));
})
.build();
}
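The writer cache in this constructor leans on Guava's removal listener to close an idle writer and capture its data file on eviction; a simplified, self-contained sketch of that pattern (with String keys and AutoCloseable values standing in for PartitionKey and RecordWriter) looks like this:

import java.util.concurrent.TimeUnit;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;

class WriterCacheSketch {
  static Cache<String, AutoCloseable> newWriterCache() {
    return CacheBuilder.newBuilder()
        .expireAfterAccess(1, TimeUnit.MINUTES)
        .removalListener(
            (RemovalNotification<String, AutoCloseable> removal) -> {
              try {
                // In DestinationState this is where the evicted writer is closed and its
                // DataFile is recorded via SerializableDataFile.from(dataFile, spec).
                removal.getValue().close();
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            })
        .build();
  }
}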
@@ -210,39 +195,6 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
}
}

/**
* Returns an equivalent partition path that is made up of partition data. Needed to reconstruct a
* {@link DataFile}.
*/
@VisibleForTesting
static String getPartitionDataPath(
String partitionPath, Map<String, PartitionField> partitionFieldMap) {
if (partitionPath.isEmpty() || partitionFieldMap.isEmpty()) {
return partitionPath;
}
List<String> resolved = new ArrayList<>();
for (String partition : Splitter.on('/').splitToList(partitionPath)) {
List<String> nameAndValue = Splitter.on('=').splitToList(partition);
String name = nameAndValue.get(0);
String value = nameAndValue.get(1);
String transformName =
Preconditions.checkArgumentNotNull(partitionFieldMap.get(name)).transform().toString();
if (Transforms.month().toString().equals(transformName)) {
int month = YearMonth.parse(value).getMonthValue();
value = String.valueOf(month);
} else if (Transforms.hour().toString().equals(transformName)) {
long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER));
value = String.valueOf(hour);
}
resolved.add(name + "=" + value);
}
return String.join("/", resolved);
}

private static final DateTimeFormatter HOUR_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);

private final Catalog catalog;
private final String filePrefix;
private final long maxFileSize;
@@ -17,25 +17,40 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Equivalence;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
@@ -48,12 +63,12 @@
* <p>NOTE: If you add any new fields here, you need to also update the {@link #equals} and {@link
* #hashCode()} methods.
*
* <p>Use {@link #from(DataFile, String)} to create a {@link SerializableDataFile} and {@link
* <p>Use {@link #from(DataFile, PartitionSpec)} to create a {@link SerializableDataFile} and {@link
* #createDataFile(Map)} to reconstruct the original {@link DataFile}.
*/
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class SerializableDataFile {
public abstract class SerializableDataFile {
public static Builder builder() {
return new AutoValue_SerializableDataFile.Builder();
}
@@ -87,7 +102,7 @@ public static Builder builder() {
abstract @Nullable Map<Integer, byte[]> getUpperBounds();

@AutoValue.Builder
abstract static class Builder {
public abstract static class Builder {
abstract Builder setPath(String path);

abstract Builder setFileFormat(String fileFormat);
@@ -119,14 +134,25 @@ abstract static class Builder {
abstract SerializableDataFile build();
}

public static SerializableDataFile from(DataFile f, Map<Integer, PartitionSpec> specs) {
return from(
f,
Preconditions.checkStateNotNull(
specs.get(f.specId()),
"Could not create a SerializableDataFile because DataFile is written using a partition spec id '%s' that is not found in the provided specs: %s",
f.specId(),
specs.keySet()));
}

/**
* Create a {@link SerializableDataFile} from a {@link DataFile} and its associated {@link
* PartitionSpec}.
*/
static SerializableDataFile from(DataFile f, String partitionPath) {
public static SerializableDataFile from(DataFile f, PartitionSpec spec) {
String partitionPath = getPartitionDataPath(f.partition(), spec);

return SerializableDataFile.builder()
.setPath(f.path().toString())
return builder()
.setPath(f.location())
.setFileFormat(f.format().toString())
.setRecordCount(f.recordCount())
.setFileSizeInBytes(f.fileSizeInBytes())
@@ -150,7 +176,7 @@ static SerializableDataFile from(DataFile f, String partitionPath) {
* it from Beam-compatible types.
*/
@SuppressWarnings("nullness")
DataFile createDataFile(Map<Integer, PartitionSpec> partitionSpecs) {
public DataFile createDataFile(Map<Integer, PartitionSpec> partitionSpecs) {
PartitionSpec partitionSpec =
checkStateNotNull(
partitionSpecs.get(getPartitionSpecId()),
@@ -180,6 +206,43 @@ DataFile createDataFile(Map<Integer, PartitionSpec> partitionSpecs) {
.build();
}

/**
* Returns an equivalent partition path built from raw partition data values, which is needed to
* reconstruct a {@link DataFile}.
*/
@VisibleForTesting
static String getPartitionDataPath(StructLike partitionKey, PartitionSpec spec) {
String partitionPath = spec.partitionToPath(partitionKey);

Map<String, Transform<?, ?>> transforms =
spec.fields().stream()
.collect(Collectors.toMap(PartitionField::name, PartitionField::transform));

if (partitionPath.isEmpty() || transforms.isEmpty()) {
return partitionPath;
}
List<String> resolved = new ArrayList<>();
for (String partition : Splitter.on('/').splitToList(partitionPath)) {
List<String> nameAndValue = Splitter.on('=').splitToList(partition);
String name = nameAndValue.get(0);
String value = nameAndValue.get(1);
String transformName = checkArgumentNotNull(transforms.get(name)).toString();
if (Transforms.month().toString().equals(transformName)) {
int month = YearMonth.parse(value).getMonthValue();
value = String.valueOf(month);
} else if (Transforms.hour().toString().equals(transformName)) {
long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER));
value = String.valueOf(hour);
}
resolved.add(name + "=" + value);
}
return String.join("/", resolved);
}

private static final DateTimeFormatter HOUR_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
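A small worked example of the value rewriting above, with inputs chosen purely for illustration (the "ts_month"/"ts_hour" field names are not from this PR): a month value such as 1970-02 becomes its month number, and an hour value such as 1970-01-02-01 becomes its hour offset from the epoch.

import java.time.LocalDateTime;
import java.time.YearMonth;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

class PartitionValueSketch {
  public static void main(String[] args) {
    LocalDateTime epoch = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
    DateTimeFormatter hourFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
    // "ts_month=1970-02" is rewritten to "ts_month=2"
    System.out.println(YearMonth.parse("1970-02").getMonthValue()); // prints 2
    // "ts_hour=1970-01-02-01" is rewritten to "ts_hour=25" (hours since the epoch)
    System.out.println(
        ChronoUnit.HOURS.between(epoch, LocalDateTime.parse("1970-01-02-01", hourFormatter))); // prints 25
  }
}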

// ByteBuddyUtils has trouble converting Map value type ByteBuffer
// to byte[] and back to ByteBuffer, so we perform these conversions manually
// TODO(https://github.com/apache/beam/issues/32701)
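Putting the two halves together, the intended round trip is: capture a freshly written DataFile with from, ship the serializable form through the pipeline, and rebuild it with createDataFile. A sketch, assuming a Table and a written DataFile are available:

import java.util.Map;
import org.apache.beam.sdk.io.iceberg.SerializableDataFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;

class DataFileRoundTripSketch {
  static DataFile roundTrip(Table table, DataFile written) {
    Map<Integer, PartitionSpec> specs = table.specs();
    // The new overload looks up the matching spec by the file's spec id.
    SerializableDataFile serializable = SerializableDataFile.from(written, specs);
    // ...after being serialized and shuffled by the runner, the file is rebuilt downstream:
    return serializable.createDataFile(specs);
  }
}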
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg.maintenance;

import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.SerializableDataFile;
import org.apache.beam.sdk.io.iceberg.SnapshotInfo;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CommitFiles extends DoFn<KV<Void, CoGbkResult>, SnapshotInfo> {
private static final Logger LOG = LoggerFactory.getLogger(CommitFiles.class);
private static final Counter numFilesCommitted =
Metrics.counter(CommitFiles.class, "numFilesCommitted");
private static final Counter numFilesRemoved =
Metrics.counter(CommitFiles.class, "numFilesRemoved");
private final String tableIdentifier;
private final IcebergCatalogConfig catalogConfig;
private final TupleTag<SerializableDataFile> newFiles;
private final TupleTag<SerializableDataFile> oldFiles;

CommitFiles(
String tableIdentifier,
IcebergCatalogConfig catalogConfig,
TupleTag<SerializableDataFile> newFiles,
TupleTag<SerializableDataFile> oldFiles) {
this.tableIdentifier = tableIdentifier;
this.catalogConfig = catalogConfig;
this.newFiles = newFiles;
this.oldFiles = oldFiles;
}

@ProcessElement
public void process(@Element KV<Void, CoGbkResult> element, OutputReceiver<SnapshotInfo> output) {
Table table = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier));

Iterable<SerializableDataFile> newSerializedFiles = element.getValue().getAll(newFiles);
Iterable<SerializableDataFile> oldSerializedFiles = element.getValue().getAll(oldFiles);

if (!newSerializedFiles.iterator().hasNext()) {
Preconditions.checkState(
!oldSerializedFiles.iterator().hasNext(),
"No new files were added, so expected no files planned for deletion, "
+ "but received %s files planned for deletion.",
Iterables.size(oldSerializedFiles));
LOG.info(RewriteDataFiles.REWRITE_PREFIX + "Received no rewritten files. Skipping commit.");
return;
}

RewriteFiles rewriteFiles = table.newRewrite();

int numAddFiles = 0;
for (SerializableDataFile newFile : newSerializedFiles) {
DataFile f = newFile.createDataFile(table.specs());
rewriteFiles.addFile(f);
numAddFiles++;
}

int numRemoveFiles = 0;
for (SerializableDataFile oldFile : oldSerializedFiles) {
DataFile f = oldFile.createDataFile(table.specs());
rewriteFiles.deleteFile(f);
numRemoveFiles++;
}

rewriteFiles.commit();
Snapshot snapshot = table.currentSnapshot();
numFilesCommitted.inc(numAddFiles);
numFilesRemoved.inc(numRemoveFiles);
LOG.info(
"Committed rewrite and created new snapshot for table '{}': {}", tableIdentifier, snapshot);
output.output(SnapshotInfo.fromSnapshot(snapshot));
}
}
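For context, one hypothetical way to wire this DoFn (the actual composition presumably lives in the rewrite transform referenced above, e.g. RewriteDataFiles): the rewritten and replaced file streams are keyed on Void, joined with CoGroupByKey, and handed to CommitFiles. Names below that are not part of this PR are assumptions.

package org.apache.beam.sdk.io.iceberg.maintenance;

import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.SerializableDataFile;
import org.apache.beam.sdk.io.iceberg.SnapshotInfo;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

class CommitFilesWiringSketch {
  static PCollection<SnapshotInfo> commit(
      PCollection<KV<Void, SerializableDataFile>> rewrittenFiles,
      PCollection<KV<Void, SerializableDataFile>> replacedFiles,
      String tableIdentifier,
      IcebergCatalogConfig catalogConfig) {
    TupleTag<SerializableDataFile> newFiles = new TupleTag<SerializableDataFile>() {};
    TupleTag<SerializableDataFile> oldFiles = new TupleTag<SerializableDataFile>() {};
    return KeyedPCollectionTuple.of(newFiles, rewrittenFiles)
        .and(oldFiles, replacedFiles)
        .apply(CoGroupByKey.<Void>create())
        .apply(ParDo.of(new CommitFiles(tableIdentifier, catalogConfig, newFiles, oldFiles)));
  }
}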