Skip to content

Commit 0152075

Browse files
Core, Spark: Add row lineage metadata columns, and surface them in SparkTable metadata columns (apache#12596)
1 parent db34c1c commit 0152075

File tree

3 files changed: +65 additions, −9 deletions

core/src/main/java/org/apache/iceberg/MetadataColumns.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,37 @@ private MetadataColumns() {}
9393
"_commit_snapshot_id",
9494
Types.LongType.get(),
9595
"Commit snapshot ID");
96+
public static final NestedField ROW_ID =
97+
NestedField.optional(
98+
Integer.MAX_VALUE - 107,
99+
"_row_id",
100+
Types.LongType.get(),
101+
"Implicit row ID that is automatically assigned");
102+
public static final NestedField LAST_UPDATED_SEQUENCE_NUMBER =
103+
NestedField.optional(
104+
Integer.MAX_VALUE - 108,
105+
"_last_updated_sequence_number",
106+
Types.LongType.get(),
107+
"Sequence number when the row was last updated");
96108

97109
private static final Map<String, NestedField> META_COLUMNS =
98110
ImmutableMap.of(
99111
FILE_PATH.name(), FILE_PATH,
100112
ROW_POSITION.name(), ROW_POSITION,
101113
IS_DELETED.name(), IS_DELETED,
102-
SPEC_ID.name(), SPEC_ID);
114+
SPEC_ID.name(), SPEC_ID,
115+
ROW_ID.name(), ROW_ID,
116+
LAST_UPDATED_SEQUENCE_NUMBER.name(), LAST_UPDATED_SEQUENCE_NUMBER);
103117

104118
private static final Set<Integer> META_IDS =
105119
ImmutableSet.of(
106120
FILE_PATH.fieldId(),
107121
ROW_POSITION.fieldId(),
108122
IS_DELETED.fieldId(),
109123
SPEC_ID.fieldId(),
110-
PARTITION_COLUMN_ID);
124+
PARTITION_COLUMN_ID,
125+
ROW_ID.fieldId(),
126+
LAST_UPDATED_SEQUENCE_NUMBER.fieldId());
111127

112128
public static Set<Integer> metadataFieldIds() {
113129
return META_IDS;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.iceberg.TableOperations;
4040
import org.apache.iceberg.TableProperties;
4141
import org.apache.iceberg.TableScan;
42+
import org.apache.iceberg.TableUtil;
4243
import org.apache.iceberg.exceptions.ValidationException;
4344
import org.apache.iceberg.expressions.Evaluator;
4445
import org.apache.iceberg.expressions.Expression;
@@ -48,6 +49,7 @@
4849
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
4950
import org.apache.iceberg.io.CloseableIterable;
5051
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
52+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
5153
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
5254
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
5355
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -256,13 +258,23 @@ public Set<TableCapability> capabilities() {
256258
@Override
257259
public MetadataColumn[] metadataColumns() {
258260
DataType sparkPartitionType = SparkSchemaUtil.convert(Partitioning.partitionType(table()));
259-
return new MetadataColumn[] {
260-
new SparkMetadataColumn(MetadataColumns.SPEC_ID.name(), DataTypes.IntegerType, false),
261-
new SparkMetadataColumn(MetadataColumns.PARTITION_COLUMN_NAME, sparkPartitionType, true),
262-
new SparkMetadataColumn(MetadataColumns.FILE_PATH.name(), DataTypes.StringType, false),
263-
new SparkMetadataColumn(MetadataColumns.ROW_POSITION.name(), DataTypes.LongType, false),
264-
new SparkMetadataColumn(MetadataColumns.IS_DELETED.name(), DataTypes.BooleanType, false)
265-
};
261+
ImmutableList.Builder<SparkMetadataColumn> metadataColumns = ImmutableList.builder();
262+
metadataColumns.add(
263+
new SparkMetadataColumn(MetadataColumns.SPEC_ID.name(), DataTypes.IntegerType, false),
264+
new SparkMetadataColumn(MetadataColumns.PARTITION_COLUMN_NAME, sparkPartitionType, true),
265+
new SparkMetadataColumn(MetadataColumns.FILE_PATH.name(), DataTypes.StringType, false),
266+
new SparkMetadataColumn(MetadataColumns.ROW_POSITION.name(), DataTypes.LongType, false),
267+
new SparkMetadataColumn(MetadataColumns.IS_DELETED.name(), DataTypes.BooleanType, false));
268+
269+
if (TableUtil.formatVersion(table()) >= 3) {
270+
metadataColumns.add(
271+
new SparkMetadataColumn(MetadataColumns.ROW_ID.name(), DataTypes.LongType, true));
272+
metadataColumns.add(
273+
new SparkMetadataColumn(
274+
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), DataTypes.LongType, true));
275+
}
276+
277+
return metadataColumns.build().toArray(SparkMetadataColumn[]::new);
266278
}
267279

268280
@Override

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
5959
import org.apache.iceberg.spark.TestBase;
6060
import org.apache.iceberg.types.Types;
61+
import org.apache.spark.sql.AnalysisException;
6162
import org.apache.spark.sql.Dataset;
6263
import org.apache.spark.sql.Encoders;
6364
import org.apache.spark.sql.Row;
@@ -94,12 +95,17 @@ public static Object[][] parameters() {
9495
{FileFormat.PARQUET, true, 1},
9596
{FileFormat.PARQUET, false, 2},
9697
{FileFormat.PARQUET, true, 2},
98+
{FileFormat.PARQUET, false, 3},
99+
{FileFormat.PARQUET, true, 3},
97100
{FileFormat.AVRO, false, 1},
98101
{FileFormat.AVRO, false, 2},
102+
{FileFormat.AVRO, false, 3},
99103
{FileFormat.ORC, false, 1},
100104
{FileFormat.ORC, true, 1},
101105
{FileFormat.ORC, false, 2},
102106
{FileFormat.ORC, true, 2},
107+
{FileFormat.ORC, false, 3},
108+
{FileFormat.ORC, true, 3},
103109
};
104110
}
105111

@@ -311,6 +317,28 @@ public void testConflictingColumns() {
311317
sql("SELECT _spec_id, _partition, _renamed_spec_id FROM %s", TABLE_NAME));
312318
}
313319

320+
@TestTemplate
321+
public void testRowLineageColumnsResolvedInV3OrHigher() {
322+
if (formatVersion >= 3) {
323+
// Test against an empty table to ensure column resolution in formats supporting row lineage
324+
// and so that the test doesn't have to change with inheritance
325+
assertEquals(
326+
"Rows must match",
327+
ImmutableList.of(),
328+
sql("SELECT _row_id, _last_updated_sequence_number, id FROM %s", TABLE_NAME));
329+
} else {
330+
// Should fail to resolve row lineage metadata columns in V1/V2 tables
331+
assertThatThrownBy(() -> sql("SELECT _row_id FROM %s", TABLE_NAME))
332+
.isInstanceOf(AnalysisException.class)
333+
.hasMessageContaining(
334+
"A column or function parameter with name `_row_id` cannot be resolved");
335+
assertThatThrownBy(() -> sql("SELECT _last_updated_sequence_number FROM %s", TABLE_NAME))
336+
.isInstanceOf(AnalysisException.class)
337+
.hasMessageContaining(
338+
"A column or function parameter with name `_last_updated_sequence_number` cannot be resolved");
339+
}
340+
}
341+
314342
private void createAndInitTable() throws IOException {
315343
Map<String, String> properties = Maps.newHashMap();
316344
properties.put(FORMAT_VERSION, String.valueOf(formatVersion));

0 commit comments

Comments
 (0)