diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 37dd25bf9029..7ab7bcd9a9c6 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 3 + "modification": 2 } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java index eef2b154d243..0060cf0ce85d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java @@ -21,11 +21,15 @@ import com.google.auto.value.AutoValue; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Equivalence; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; @@ -41,8 +45,11 @@ * encode/decode it. This class is an identical version that can be used as a PCollection element * type. * - *

Use {@link #from(DataFile, PartitionKey)} to create a {@link SerializableDataFile} and {@link - * #createDataFile(PartitionSpec)} to reconstruct the original {@link DataFile}. + *

NOTE: If you add any new fields here, you need to also update the {@link #equals} and {@link + * #hashCode()} methods. + * + *

Use {@link #from(DataFile, String)} to create a {@link SerializableDataFile} and {@link + * #createDataFile(Map)} to reconstruct the original {@link DataFile}. */ @DefaultSchema(AutoValueSchema.class) @AutoValue @@ -199,4 +206,86 @@ DataFile createDataFile(Map partitionSpecs) { } return output; } + + @Override + public final boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SerializableDataFile that = (SerializableDataFile) o; + return getPath().equals(that.getPath()) + && getFileFormat().equals(that.getFileFormat()) + && getRecordCount() == that.getRecordCount() + && getFileSizeInBytes() == that.getFileSizeInBytes() + && getPartitionPath().equals(that.getPartitionPath()) + && getPartitionSpecId() == that.getPartitionSpecId() + && Objects.equals(getKeyMetadata(), that.getKeyMetadata()) + && Objects.equals(getSplitOffsets(), that.getSplitOffsets()) + && Objects.equals(getColumnSizes(), that.getColumnSizes()) + && Objects.equals(getValueCounts(), that.getValueCounts()) + && Objects.equals(getNullValueCounts(), that.getNullValueCounts()) + && Objects.equals(getNanValueCounts(), that.getNanValueCounts()) + && mapEquals(getLowerBounds(), that.getLowerBounds()) + && mapEquals(getUpperBounds(), that.getUpperBounds()); + } + + private static boolean mapEquals( + @Nullable Map map1, @Nullable Map map2) { + if (map1 == null && map2 == null) { + return true; + } else if (map1 == null || map2 == null) { + return false; + } + Equivalence byteArrayEquivalence = + new Equivalence() { + @Override + protected boolean doEquivalent(byte[] a, byte[] b) { + return Arrays.equals(a, b); + } + + @Override + protected int doHash(byte[] bytes) { + return Arrays.hashCode(bytes); + } + }; + + return Maps.difference(map1, map2, byteArrayEquivalence).areEqual(); + } + + @Override + public final int hashCode() { + int hashCode = + Objects.hash( + getPath(), + getFileFormat(), + getRecordCount(), + getFileSizeInBytes(), + getPartitionPath(), + getPartitionSpecId(), + getKeyMetadata(), + getSplitOffsets(), + getColumnSizes(), + getValueCounts(), + getNullValueCounts(), + getNanValueCounts()); + hashCode = 31 * hashCode + computeMapByteHashCode(getLowerBounds()); + hashCode = 31 * hashCode + computeMapByteHashCode(getUpperBounds()); + return hashCode; + } + + private static int computeMapByteHashCode(@Nullable Map map) { + if (map == null) { + return 0; + } + int hashCode = 0; + for (Map.Entry entry : map.entrySet()) { + int keyHash = entry.getKey().hashCode(); + int valueHash = Arrays.hashCode(entry.getValue()); // content-based hash code + hashCode += keyHash ^ valueHash; + } + return hashCode; + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java new file mode 100644 index 000000000000..983f021fd7ce --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.junit.Test; + +/** + * Test for {@link SerializableDataFile}. More tests can be found in {@link + * org.apache.beam.sdk.io.iceberg.RecordWriterManagerTest}. + */ +public class SerializableDataFileTest { + static final Set FIELDS_SET = + ImmutableSet.builder() + .add("path") + .add("fileFormat") + .add("recordCount") + .add("fileSizeInBytes") + .add("partitionPath") + .add("partitionSpecId") + .add("keyMetadata") + .add("splitOffsets") + .add("columnSizes") + .add("valueCounts") + .add("nullValueCounts") + .add("nanValueCounts") + .add("lowerBounds") + .add("upperBounds") + .build(); + + @Test + public void testFieldsInEqualsMethodInSyncWithGetterFields() { + List getMethodNames = + Arrays.stream(SerializableDataFile.class.getDeclaredMethods()) + .map(Method::getName) + .filter(methodName -> methodName.startsWith("get")) + .collect(Collectors.toList()); + + List lowerCaseFields = + FIELDS_SET.stream().map(String::toLowerCase).collect(Collectors.toList()); + List extras = new ArrayList<>(); + for (String field : getMethodNames) { + if (!lowerCaseFields.contains(field.substring(3).toLowerCase())) { + extras.add(field); + } + } + if (!extras.isEmpty()) { + throw new IllegalStateException( + "Detected new field(s) added to SerializableDataFile: " + + extras + + "\nPlease include the new field(s) in SerializableDataFile's equals() and hashCode() methods, then add them " + + "to this test class's FIELDS_SET."); + } + } +}