diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTask.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTask.java index c880adbb860e..638a67fd9593 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTask.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTask.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; @@ -53,6 +54,7 @@ static Builder builder() { return new AutoValue_ReadTask.Builder(); } + @SchemaFieldNumber("0") abstract List getFileScanTaskJsons(); @SchemaIgnore diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTaskDescriptor.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTaskDescriptor.java index b7a9be32aba2..899e7f99d903 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTaskDescriptor.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTaskDescriptor.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /** Describes the table a {@link ReadTask} belongs to. */ @@ -46,6 +47,7 @@ static Builder builder() { return new AutoValue_ReadTaskDescriptor.Builder(); } + @SchemaFieldNumber("0") abstract String getTableIdentifierString(); @AutoValue.Builder diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SnapshotInfo.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SnapshotInfo.java index aa19ca1b2710..bab5405cd4a5 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SnapshotInfo.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SnapshotInfo.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -98,22 +99,31 @@ public TableIdentifier getTableIdentifier() { return cachedTableIdentifier; } + @SchemaFieldNumber("0") public abstract long getSequenceNumber(); + @SchemaFieldNumber("1") public abstract long getSnapshotId(); + @SchemaFieldNumber("2") public abstract @Nullable Long getParentId(); + @SchemaFieldNumber("3") public abstract long getTimestampMillis(); + @SchemaFieldNumber("4") public abstract @Nullable String getOperation(); + @SchemaFieldNumber("5") public abstract @Nullable Map getSummary(); + @SchemaFieldNumber("6") public abstract @Nullable String getManifestListLocation(); + @SchemaFieldNumber("7") public abstract @Nullable Integer getSchemaId(); + @SchemaFieldNumber("8") public abstract @Nullable String getTableIdentifierString(); @AutoValue.Builder diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java index 78d48aacf2b7..949e205bf18a 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.OUTPUT_TAG; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; import java.util.HashMap; import java.util.List; @@ -28,7 +29,9 @@ import java.util.UUID; import java.util.stream.Collectors; import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -150,4 +153,65 @@ public void testReadUsingManagedTransform() throws Exception { testPipeline.run(); } + + @Test + public void testSnapshotInfoSchemaFieldNumbers() throws NoSuchSchemaException { + Schema schema = SchemaRegistry.createDefault().getSchema(SnapshotInfo.class); + assertEquals(9, schema.getFieldCount()); + + assertEquals( + Schema.Field.of("sequenceNumber", Schema.FieldType.INT64) + .withDescription(schema.getField(0).getDescription()) + .withNullable(false), + schema.getField(0)); + + assertEquals( + Schema.Field.of("snapshotId", Schema.FieldType.INT64) + .withDescription(schema.getField(1).getDescription()) + .withNullable(false), + schema.getField(1)); + + assertEquals( + Schema.Field.of("parentId", Schema.FieldType.INT64) + .withDescription(schema.getField(2).getDescription()) + .withNullable(true), + schema.getField(2)); + + assertEquals( + Schema.Field.of("timestampMillis", Schema.FieldType.INT64) + .withDescription(schema.getField(3).getDescription()) + .withNullable(false), + schema.getField(3)); + + assertEquals( + Schema.Field.of("operation", Schema.FieldType.STRING) + .withDescription(schema.getField(4).getDescription()) + .withNullable(true), + schema.getField(4)); + + assertEquals( + Schema.Field.of( + "summary", Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING)) + .withDescription(schema.getField(5).getDescription()) + .withNullable(true), + schema.getField(5)); + + assertEquals( + Schema.Field.of("manifestListLocation", Schema.FieldType.STRING) + .withDescription(schema.getField(6).getDescription()) + .withNullable(true), + schema.getField(6)); + + assertEquals( + Schema.Field.of("schemaId", Schema.FieldType.INT32) + .withDescription(schema.getField(7).getDescription()) + .withNullable(true), + schema.getField(7)); + + assertEquals( + Schema.Field.of("tableIdentifierString", Schema.FieldType.STRING) + .withDescription(schema.getField(8).getDescription()) + .withNullable(true), + schema.getField(8)); + } }