diff --git a/manifest.go b/manifest.go
index 3c135d1dd..baa538b3b 100644
--- a/manifest.go
+++ b/manifest.go
@@ -303,6 +303,8 @@ func (m *manifestFileV1) Partitions() []FieldSummary {
 	return *m.PartitionList
 }
 
+func (*manifestFileV1) FirstRowID() *int64 { return nil }
+
 func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
 	return fetchManifestEntries(m, fs, discardDeleted)
 }
@@ -323,7 +325,7 @@ type manifestFile struct {
 	DeletedRowsCount   int64           `avro:"deleted_rows_count"`
 	PartitionList      *[]FieldSummary `avro:"partitions"`
 	Key                []byte          `avro:"key_metadata"`
-	FirstRowId         *int64          `avro:"first_row_id"`
+	FirstRowIDValue    *int64          `avro:"first_row_id"`
 
 	version int `avro:"-"`
 }
@@ -400,6 +402,8 @@ func (m *manifestFile) Partitions() []FieldSummary {
 	return *m.PartitionList
 }
 
+func (m *manifestFile) FirstRowID() *int64 { return m.FirstRowIDValue }
+
 func (m *manifestFile) HasAddedFiles() bool    { return m.AddedFilesCount != 0 }
 func (m *manifestFile) HasExistingFiles() bool { return m.ExistingFilesCount != 0 }
 func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
@@ -520,6 +524,9 @@ type ManifestFile interface {
 	// field in the spec. Each field in the list corresponds to a field in
 	// the manifest file's partition spec.
 	Partitions() []FieldSummary
+	// FirstRowID returns the first _row_id assigned to rows in this manifest (v3+ data manifests only).
+	// Returns nil for v1/v2 or for delete manifests.
+	FirstRowID() *int64
 
 	// HasAddedFiles returns true if AddedDataFiles > 0 or if it was null.
 	HasAddedFiles() bool
@@ -595,6 +602,14 @@ type ManifestReader struct {
 	schemaLoaded        bool
 	partitionSpec       PartitionSpec
 	partitionSpecLoaded bool
+
+	// inheritRowIDs controls whether this reader should apply First Row ID inheritance
+	// for v3 data manifests (spec: First Row ID Inheritance).
+	inheritRowIDs bool
+	// nextFirstRowID tracks the next first_row_id to assign when reading v3 data
+	// manifests; used for First Row ID inheritance (null data file first_row_id
+	// gets manifest's first_row_id + sum of preceding files' record_count).
+	nextFirstRowID int64
 }
 
 // NewManifestReader returns a value that can read the contents of an avro manifest
@@ -654,15 +669,25 @@ func NewManifestReader(file ManifestFile, in io.Reader) (*ManifestReader, error)
 	}
 	fieldNameToID, fieldIDToType, fieldIDToSize := getFieldIDMap(sc)
 
+	inheritRowIDs := formatVersion >= 3 &&
+		content == ManifestContentData &&
+		file.FirstRowID() != nil
+	var nextFirstRowID int64
+	if inheritRowIDs {
+		nextFirstRowID = *file.FirstRowID()
+	}
+
 	return &ManifestReader{
-		dec:           dec,
-		file:          file,
-		formatVersion: formatVersion,
-		isFallback:    isFallback,
-		content:       content,
-		fieldNameToID: fieldNameToID,
-		fieldIDToType: fieldIDToType,
-		fieldIDToSize: fieldIDToSize,
+		dec:            dec,
+		file:           file,
+		formatVersion:  formatVersion,
+		isFallback:     isFallback,
+		content:        content,
+		fieldNameToID:  fieldNameToID,
+		fieldIDToType:  fieldIDToType,
+		fieldIDToSize:  fieldIDToSize,
+		inheritRowIDs:  inheritRowIDs,
+		nextFirstRowID: nextFirstRowID,
 	}, nil
 }
 
@@ -764,6 +789,17 @@ func (c *ManifestReader) ReadEntry() (ManifestEntry, error) {
 		tmp = tmp.(*fallbackManifestEntry).toEntry()
 	}
 	tmp.inherit(c.file)
+	// Apply first_row_id inheritance for v3 data manifests (spec: First Row ID Inheritance).
+	if c.inheritRowIDs {
+		if df, ok := tmp.DataFile().(*dataFile); ok {
+			if df.FirstRowIDField == nil {
+				id := c.nextFirstRowID
+				df.FirstRowIDField = &id
+			}
+			// Advance for every data file, null or explicit, to match Java semantics.
+			c.nextFirstRowID += df.Count()
+		}
+	}
 	if fieldToIDMap, ok := tmp.DataFile().(hasFieldToIDMap); ok {
 		fieldToIDMap.setFieldNameToIDMap(c.fieldNameToID)
 		fieldToIDMap.setFieldIDToLogicalTypeMap(c.fieldIDToType)
@@ -1441,10 +1477,10 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
 		wrapped := *(file.(*manifestFile))
 		if m.version == 3 {
 			// Ref: https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168
-			if wrapped.Content == ManifestContentData && wrapped.FirstRowId == nil {
+			if wrapped.Content == ManifestContentData && wrapped.FirstRowIDValue == nil {
 				if m.nextRowID != nil {
 					firstRowID := *m.nextRowID
-					wrapped.FirstRowId = &firstRowID
+					wrapped.FirstRowIDValue = &firstRowID
 					*m.nextRowID += wrapped.ExistingRowsCount + wrapped.AddedRowsCount
 				}
 			}
diff --git a/manifest_test.go b/manifest_test.go
index dfe3eb5ad..de37f0aab 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -481,12 +481,15 @@ type ManifestTestSuite struct {
 }
 
 func (m *ManifestTestSuite) writeManifestList() {
-	m.Require().NoError(WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, 0, manifestFileRecordsV1))
+	err := WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, 0, manifestFileRecordsV1)
+	m.Require().NoError(err)
 	unassignedSequenceNum := int64(-1)
-	m.Require().NoError(WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, 0, manifestFileRecordsV2))
+	err = WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, 0, manifestFileRecordsV2)
+	m.Require().NoError(err)
 	v3SequenceNum := int64(5)
 	firstRowID := int64(1000)
-	m.Require().NoError(WriteManifestList(3, &m.v3ManifestList, snapshotID, nil, &v3SequenceNum, firstRowID, manifestFileRecordsV3))
+	err = WriteManifestList(3, &m.v3ManifestList, snapshotID, nil, &v3SequenceNum, firstRowID, manifestFileRecordsV3)
+	m.Require().NoError(err)
 }
 
 func (m *ManifestTestSuite) writeManifestEntries() {
@@ -760,6 +763,10 @@ func (m *ManifestTestSuite) TestReadManifestListV3() {
 	m.Nil(list[0].KeyMetadata())
 	m.Zero(list[0].PartitionSpecID())
 
+	// V3 manifest list assigns first_row_id to data manifests
+	m.Require().NotNil(list[0].FirstRowID(), "v3 data manifest should have first_row_id")
+	m.EqualValues(1000, *list[0].FirstRowID())
+
 	part := list[0].Partitions()[0]
 	m.True(part.ContainsNull)
 	m.False(*part.ContainsNaN)
@@ -767,6 +774,64 @@ func (m *ManifestTestSuite) TestReadManifestListV3() {
 	m.Equal([]byte{0x02, 0x00, 0x00, 0x00}, *part.UpperBound)
 }
 
+func (m *ManifestTestSuite) TestV3DataManifestFirstRowIDInheritance() {
+	// Build a v3 data manifest with two entries that have null first_row_id.
+	partitionSpec := NewPartitionSpecID(1,
+		PartitionField{FieldID: 1000, SourceID: 1, Name: "x", Transform: IdentityTransform{}})
+	firstCount, secondCount := int64(10), int64(20)
+	entriesWithNullFirstRowID := []ManifestEntry{
+		&manifestEntry{
+			EntryStatus: EntryStatusADDED,
+			Snapshot:    &entrySnapshotID,
+			Data: &dataFile{
+				Content:          EntryContentData,
+				Path:             "/data/file1.parquet",
+				Format:           ParquetFile,
+				PartitionData:    map[string]any{"x": int(1)},
+				RecordCount:      firstCount,
+				FileSize:         1000,
+				BlockSizeInBytes: 64 * 1024,
+				FirstRowIDField:  nil, // null so reader will inherit
+			},
+		},
+		&manifestEntry{
+			EntryStatus: EntryStatusADDED,
+			Snapshot:    &entrySnapshotID,
+			Data: &dataFile{
+				Content:          EntryContentData,
+				Path:             "/data/file2.parquet",
+				Format:           ParquetFile,
+				PartitionData:    map[string]any{"x": int(2)},
+				RecordCount:      secondCount,
+				FileSize:         2000,
+				BlockSizeInBytes: 64 * 1024,
+				FirstRowIDField:  nil,
+			},
+		},
+	}
+	var manifestBuf bytes.Buffer
+	_, err := WriteManifest("/manifest.avro", &manifestBuf, 3, partitionSpec, testSchema, entrySnapshotID, entriesWithNullFirstRowID)
+	m.Require().NoError(err)
+
+	manifestFirstRowID := int64(1000)
+	file := &manifestFile{
+		version:         3,
+		Path:            "/manifest.avro",
+		Content:         ManifestContentData,
+		FirstRowIDValue: &manifestFirstRowID,
+	}
+	entries, err := ReadManifest(file, bytes.NewReader(manifestBuf.Bytes()), false)
+	m.Require().NoError(err)
+	m.Require().Len(entries, 2)
+
+	// First entry gets manifest's first_row_id
+	m.Require().NotNil(entries[0].DataFile().FirstRowID())
+	m.EqualValues(1000, *entries[0].DataFile().FirstRowID())
+	// Second entry gets previous + previous file's record_count
+	m.Require().NotNil(entries[1].DataFile().FirstRowID())
+	m.EqualValues(1000+firstCount, *entries[1].DataFile().FirstRowID())
+}
+
 func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() {
 	// This prevents a regression that could be caused by using a schema cache
 	// across multiple read/write operations of an avro file. While it may sound
@@ -1529,11 +1594,11 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterPersistsPerManifestFirstRowI
 	m.Require().True(ok, "expected v3 manifest file type")
 	secondManifest, ok := list[1].(*manifestFile)
 	m.Require().True(ok, "expected v3 manifest file type")
-	m.Require().NotNil(firstManifest.FirstRowId)
-	m.Require().NotNil(secondManifest.FirstRowId)
+	m.Require().NotNil(firstManifest.FirstRowID(), "first manifest should have first_row_id")
+	m.Require().NotNil(secondManifest.FirstRowID(), "second manifest should have first_row_id")
 
-	m.EqualValues(5000, *firstManifest.FirstRowId) // start of first range
-	m.EqualValues(5015, *secondManifest.FirstRowId)
+	m.EqualValues(5000, *firstManifest.FirstRowID()) // start of first range
+	m.EqualValues(5015, *secondManifest.FirstRowID())
 	m.EqualValues(5022, *writer.NextRowID())
 }
 
diff --git a/metadata_columns.go b/metadata_columns.go
new file mode 100644
index 000000000..f5cf44dc4
--- /dev/null
+++ b/metadata_columns.go
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use it except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+// Row lineage metadata column field IDs (v3+). Reserved IDs are Integer.MAX_VALUE - 107 and 108
+// per the Iceberg spec (Metadata Columns / Row Lineage).
+const (
+	// RowIDFieldID is the field ID for _row_id (optional long). A unique long identifier for every row.
+	RowIDFieldID = 2147483540
+	// LastUpdatedSequenceNumberFieldID is the field ID for _last_updated_sequence_number (optional long).
+	// The sequence number of the commit that last updated the row.
+	LastUpdatedSequenceNumberFieldID = 2147483539
+)
+
+// Row lineage metadata column names (v3+).
+const (
+	RowIDColumnName                     = "_row_id"
+	LastUpdatedSequenceNumberColumnName = "_last_updated_sequence_number"
+)
+
+// RowID returns a NestedField for _row_id (optional long) for use in schemas that include row lineage.
+func RowID() NestedField {
+	return NestedField{
+		ID:       RowIDFieldID,
+		Name:     RowIDColumnName,
+		Required: false,
+		Doc:      "Implicit row ID that is automatically assigned",
+		Type:     Int64Type{},
+	}
+}
+
+// LastUpdatedSequenceNumber returns a NestedField for _last_updated_sequence_number (optional long).
+func LastUpdatedSequenceNumber() NestedField {
+	return NestedField{
+		ID:       LastUpdatedSequenceNumberFieldID,
+		Name:     LastUpdatedSequenceNumberColumnName,
+		Required: false,
+		Doc:      "Sequence number when the row was last updated",
+		Type:     Int64Type{},
+	}
+}
+
+// IsMetadataColumn returns true if the field ID is a reserved metadata column (e.g. row lineage).
+func IsMetadataColumn(fieldID int) bool {
+	return fieldID == RowIDFieldID || fieldID == LastUpdatedSequenceNumberFieldID
+}
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index 30d56477f..7432d043b 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -40,6 +40,7 @@ import (
 
 const (
 	ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
+	ScanOptionRowLineageEnabled  = "row_lineage.enabled"
 )
 
 var PositionalDeleteArrowSchema, _ = SchemaToArrowSchema(iceberg.PositionalDeleteSchema, nil, true, false)
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, fileSchema *iceberg.Sc
 	return nil, false, nil
 }
 
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema). Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited (synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is kept.
+// rowOffset is the 0-based row index within the current file and is updated so _row_id stays
+// correct across multiple batches from the same file (first_row_id + row_position).
+func synthesizeRowLineageColumns(
+	ctx context.Context,
+	rowOffset *int64,
+	task FileScanTask,
+	batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+	alloc := compute.GetAllocator(ctx)
+	schema := batch.Schema()
+	nrows := batch.NumRows()
+
+	// Start from the existing columns; we'll replace the row lineage columns in-place
+	// when we need to synthesize values.
+	newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+	// Resolve column indices by name; -1 if not present.
+	rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+	seqNumIndices := schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+	rowIDColIdx := -1
+	if len(rowIDIndices) > 0 {
+		rowIDColIdx = rowIDIndices[0]
+	}
+	seqNumColIdx := -1
+	if len(seqNumIndices) > 0 {
+		seqNumColIdx = seqNumIndices[0]
+	}
+
+	var toRelease []arrow.Array
+
+	// _row_id: inherit first_row_id + row_position when null; else keep value from file.
+	if rowIDColIdx >= 0 && task.FirstRowID != nil {
+		if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+			bldr := array.NewInt64Builder(alloc)
+			defer bldr.Release()
+
+			bldr.Reserve(int(nrows))
+			first := *task.FirstRowID
+			for k := int64(0); k < nrows; k++ {
+				if col.IsNull(int(k)) {
+					bldr.Append(first + *rowOffset + k)
+				} else {
+					bldr.Append(col.Value(int(k)))
+				}
+			}
+
+			arr := bldr.NewArray()
+			newCols[rowIDColIdx] = arr
+			toRelease = append(toRelease, arr)
+		}
+	}
+
+	// _last_updated_sequence_number: inherit file's data_sequence_number when null; else keep value from file.
+	if seqNumColIdx >= 0 && task.DataSequenceNumber != nil {
+		if col, ok := newCols[seqNumColIdx].(*array.Int64); ok {
+			bldr := array.NewInt64Builder(alloc)
+			defer bldr.Release()
+
+			bldr.Reserve(int(nrows))
+			seq := *task.DataSequenceNumber
+			for k := int64(0); k < nrows; k++ {
+				if col.IsNull(int(k)) {
+					bldr.Append(seq)
+				} else {
+					bldr.Append(col.Value(int(k)))
+				}
+			}
+
+			arr := bldr.NewArray()
+			newCols[seqNumColIdx] = arr
+			toRelease = append(toRelease, arr)
+		}
+	}
+
+	// Advance so the next batch from this file uses the correct row position for _row_id.
+	*rowOffset += nrows
+
+	rec := array.NewRecordBatch(schema, newCols, nrows)
+	for _, c := range toRelease {
+		c.Release()
+	}
+
+	return rec, nil
+}
+
 func (as *arrowScan) processRecords(
 	ctx context.Context,
 	task internal.Enumerated[FileScanTask],
@@ -513,6 +603,22 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerat
 		return ToRequestedSchema(ctx, as.projectedSchema, iceSchema, r, false, false, as.useLargeTypes)
 	})
 
+	// Row lineage: optionally fill _row_id and _last_updated_sequence_number from task
+	// constants when in projection.
+	rowLineageEnabled, err := strconv.ParseBool(as.options.Get(ScanOptionRowLineageEnabled, "true"))
+	if err != nil {
+		rowLineageEnabled = true
+	}
+	if rowLineageEnabled && (task.Value.FirstRowID != nil || task.Value.DataSequenceNumber != nil) {
+		var rowOffset int64
+		taskVal := task.Value
+		pipeline = append(pipeline, func(r arrow.RecordBatch) (arrow.RecordBatch, error) {
+			defer r.Release()
+
+			return synthesizeRowLineageColumns(ctx, &rowOffset, taskVal, r)
+		})
+	}
+
 	err = as.processRecords(ctx, task, iceSchema, rdr, colIndices, pipeline, out)
 
 	return err
diff --git a/table/metadata_schema_compatibility.go b/table/metadata_schema_compatibility.go
index 7d58a29dc..1d3f76ee9 100644
--- a/table/metadata_schema_compatibility.go
+++ b/table/metadata_schema_compatibility.go
@@ -36,10 +36,10 @@ func (e ErrIncompatibleSchema) Error() string {
 	var problems strings.Builder
 	for _, f := range e.fields {
 		if f.UnsupportedType != nil {
-			problems.WriteString(fmt.Sprintf("\n- invalid type for %s: %s is not supported until v%d", f.ColName, f.Field.Type, f.UnsupportedType.MinFormatVersion))
+			fmt.Fprintf(&problems, "\n- invalid type for %s: %s is not supported until v%d", f.ColName, f.Field.Type, f.UnsupportedType.MinFormatVersion)
 		}
 		if f.InvalidDefault != nil {
-			problems.WriteString(fmt.Sprintf("\n- invalid initial default for %s: non-null default (%v) is not supported until v%d", f.ColName, f.Field.InitialDefault, f.InvalidDefault.MinFormatVersion))
+			fmt.Fprintf(&problems, "\n- invalid initial default for %s: non-null default (%v) is not supported until v%d", f.ColName, f.Field.InitialDefault, f.InvalidDefault.MinFormatVersion)
 		}
 	}
 
diff --git a/table/scanner.go b/table/scanner.go
index bc3c09027..365a5777e 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -474,12 +474,19 @@ func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
 		if err != nil {
 			return nil, err
 		}
-		results = append(results, FileScanTask{
+		task := FileScanTask{
 			File:        e.DataFile(),
 			DeleteFiles: deleteFiles,
 			Start:       0,
 			Length:      e.DataFile().FileSizeBytes(),
-		})
+		}
+		// Row lineage constants: readers use these to synthesize _row_id and
+		// _last_updated_sequence_number when requested.
+		task.FirstRowID = e.DataFile().FirstRowID()
+		if fseq := e.FileSequenceNum(); fseq != nil {
+			task.DataSequenceNumber = fseq
+		}
+		results = append(results, task)
 	}
 
 	return results, nil
@@ -489,6 +496,12 @@ type FileScanTask struct {
 	File          iceberg.DataFile
 	DeleteFiles   []iceberg.DataFile
 	Start, Length int64
+
+	// Row lineage (v3): constants used when reading to synthesize _row_id and _last_updated_sequence_number.
+	// FirstRowID is the effective first_row_id for this file (from manifest entry, after inheritance).
+	// DataSequenceNumber is set by PlanFiles from the manifest entry's FileSequenceNum(); nil when that is unset.
+	FirstRowID         *int64
+	DataSequenceNumber *int64
 }
 
 // ToArrowRecords returns the arrow schema of the expected records and an interator
diff --git a/table/scanner_internal_test.go b/table/scanner_internal_test.go
index 07ca97000..d5808728f 100644
--- a/table/scanner_internal_test.go
+++ b/table/scanner_internal_test.go
@@ -23,6 +23,10 @@ import (
 	"sync/atomic"
 	"testing"
 
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/arrow/compute"
+	"github.com/apache/arrow-go/v18/arrow/memory"
 	"github.com/apache/iceberg-go"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -231,3 +235,63 @@ func TestBuildPartitionEvaluatorWithInvalidSpecID(t *testing.T) {
 	assert.ErrorIs(t, err, ErrPartitionSpecNotFound)
 	assert.ErrorContains(t, err, "id 999")
 }
+
+// TestSynthesizeRowLineageColumns verifies that _row_id and _last_updated_sequence_number
+// are filled from task constants when those columns are present and null.
+func TestSynthesizeRowLineageColumns(t *testing.T) {
+	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+	ctx := compute.WithAllocator(t.Context(), mem)
+	defer mem.AssertSize(t, 0)
+	firstRowID := int64(1000)
+	dataSeqNum := int64(5)
+	task := FileScanTask{FirstRowID: &firstRowID, DataSequenceNumber: &dataSeqNum}
+	rowOffset := int64(0)
+
+	// Build a batch with a data column plus _row_id and _last_updated_sequence_number (all nulls).
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "x", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+			{Name: iceberg.RowIDColumnName, Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+			{Name: iceberg.LastUpdatedSequenceNumberColumnName, Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+		},
+		nil,
+	)
+	const nrows = 3
+	xBldr := array.NewInt64Builder(mem)
+	defer xBldr.Release()
+	xBldr.AppendValues([]int64{1, 2, 3}, nil)
+	rowIDBldr := array.NewInt64Builder(mem)
+	defer rowIDBldr.Release()
+	rowIDBldr.AppendNulls(nrows)
+	seqBldr := array.NewInt64Builder(mem)
+	defer seqBldr.Release()
+	seqBldr.AppendNulls(nrows)
+
+	xArr := xBldr.NewArray()
+	rowIDArr := rowIDBldr.NewArray()
+	seqArr := seqBldr.NewArray()
+	batch := array.NewRecordBatch(schema, []arrow.Array{xArr, rowIDArr, seqArr}, nrows)
+	xArr.Release()
+	rowIDArr.Release()
+	seqArr.Release()
+	defer batch.Release()
+
+	out, err := synthesizeRowLineageColumns(ctx, &rowOffset, task, batch)
+	require.NoError(t, err)
+	defer out.Release()
+
+	// _row_id should be 1000, 1001, 1002
+	rowIDCol := out.Column(1).(*array.Int64)
+	require.Equal(t, nrows, rowIDCol.Len())
+	for i := 0; i < nrows; i++ {
+		assert.False(t, rowIDCol.IsNull(i), "row %d", i)
+		assert.EqualValues(t, 1000+int64(i), rowIDCol.Value(i), "row %d", i)
+	}
+	// _last_updated_sequence_number should be 5 for all
+	seqCol := out.Column(2).(*array.Int64)
+	for i := 0; i < nrows; i++ {
+		assert.False(t, seqCol.IsNull(i), "row %d", i)
+		assert.EqualValues(t, 5, seqCol.Value(i), "row %d", i)
+	}
+	assert.EqualValues(t, 3, rowOffset)
+}
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index 88e981ed4..308b98292 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -323,7 +323,7 @@ func manifestFirstRowIDForSnapshot(t *testing.T, manifests []iceberg.ManifestFil
 
 	type manifestRowLineage struct {
 		AddedSnapshotID int64  `json:"AddedSnapshotID"`
-		FirstRowID      *int64 `json:"FirstRowId"`
+		FirstRowID      *int64 `json:"FirstRowIDValue"`
 	}
 
 	for _, manifest := range manifests {