Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ func (m *manifestFileV1) Partitions() []FieldSummary {
return *m.PartitionList
}

// FirstRowID always returns nil: first_row_id was introduced for v3 manifests,
// so v1 manifest files never carry one (see the ManifestFile interface doc).
func (*manifestFileV1) FirstRowID() *int64 { return nil }

// FetchEntries reads this manifest file's entries through the shared
// fetchManifestEntries helper, forwarding discardDeleted unchanged
// (presumably it filters entries marked deleted — see fetchManifestEntries).
func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
	return fetchManifestEntries(m, fs, discardDeleted)
}
Expand All @@ -323,7 +325,7 @@ type manifestFile struct {
DeletedRowsCount int64 `avro:"deleted_rows_count"`
PartitionList *[]FieldSummary `avro:"partitions"`
Key []byte `avro:"key_metadata"`
FirstRowId *int64 `avro:"first_row_id"`
FirstRowIDValue *int64 `avro:"first_row_id"`

version int `avro:"-"`
}
Expand Down Expand Up @@ -400,6 +402,8 @@ func (m *manifestFile) Partitions() []FieldSummary {
return *m.PartitionList
}

// FirstRowID returns the manifest's first_row_id field, or nil when unset
// (v1/v2 manifests and delete manifests per the interface contract).
func (m *manifestFile) FirstRowID() *int64 { return m.FirstRowIDValue }

// HasAddedFiles reports whether this manifest has any added files.
// NOTE(review): the interface doc says "true if AddedDataFiles > 0 or if it
// was null"; a null avro count decodes to the zero value here, which makes
// this return false — confirm the intended null semantics.
func (m *manifestFile) HasAddedFiles() bool { return m.AddedFilesCount != 0 }
// HasExistingFiles reports whether this manifest has any existing files
// (same null-count caveat as HasAddedFiles).
func (m *manifestFile) HasExistingFiles() bool { return m.ExistingFilesCount != 0 }
func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
Expand Down Expand Up @@ -520,6 +524,9 @@ type ManifestFile interface {
// field in the spec. Each field in the list corresponds to a field in
// the manifest file's partition spec.
Partitions() []FieldSummary
// FirstRowID returns the first _row_id assigned to rows in this manifest (v3+ data manifests only).
// Returns nil for v1/v2 or for delete manifests.
FirstRowID() *int64

// HasAddedFiles returns true if AddedDataFiles > 0 or if it was null.
HasAddedFiles() bool
Expand Down Expand Up @@ -595,6 +602,14 @@ type ManifestReader struct {
schemaLoaded bool
partitionSpec PartitionSpec
partitionSpecLoaded bool

// inheritRowIDs controls whether this reader should apply First Row ID inheritance
// for v3 data manifests (spec: First Row ID Inheritance).
inheritRowIDs bool
// nextFirstRowID tracks the next first_row_id to assign when reading v3 data
// manifests; used for First Row ID inheritance (null data file first_row_id
// gets manifest's first_row_id + sum of preceding files' record_count).
nextFirstRowID int64
}

// NewManifestReader returns a value that can read the contents of an avro manifest
Expand Down Expand Up @@ -654,15 +669,25 @@ func NewManifestReader(file ManifestFile, in io.Reader) (*ManifestReader, error)
}
fieldNameToID, fieldIDToType, fieldIDToSize := getFieldIDMap(sc)

inheritRowIDs := formatVersion >= 3 &&
content == ManifestContentData &&
file.FirstRowID() != nil
var nextFirstRowID int64
if inheritRowIDs {
nextFirstRowID = *file.FirstRowID()
}

return &ManifestReader{
dec: dec,
file: file,
formatVersion: formatVersion,
isFallback: isFallback,
content: content,
fieldNameToID: fieldNameToID,
fieldIDToType: fieldIDToType,
fieldIDToSize: fieldIDToSize,
dec: dec,
file: file,
formatVersion: formatVersion,
isFallback: isFallback,
content: content,
fieldNameToID: fieldNameToID,
fieldIDToType: fieldIDToType,
fieldIDToSize: fieldIDToSize,
inheritRowIDs: inheritRowIDs,
nextFirstRowID: nextFirstRowID,
}, nil
}

Expand Down Expand Up @@ -764,6 +789,17 @@ func (c *ManifestReader) ReadEntry() (ManifestEntry, error) {
tmp = tmp.(*fallbackManifestEntry).toEntry()
}
tmp.inherit(c.file)
// Apply first_row_id inheritance for v3 data manifests (spec: First Row ID Inheritance).
if c.inheritRowIDs {
if df, ok := tmp.DataFile().(*dataFile); ok {
if df.FirstRowIDField == nil {
id := c.nextFirstRowID
df.FirstRowIDField = &id
}
// Advance for every data file, null or explicit, to match Java semantics.
c.nextFirstRowID += df.Count()
}
}
if fieldToIDMap, ok := tmp.DataFile().(hasFieldToIDMap); ok {
fieldToIDMap.setFieldNameToIDMap(c.fieldNameToID)
fieldToIDMap.setFieldIDToLogicalTypeMap(c.fieldIDToType)
Expand Down Expand Up @@ -1441,10 +1477,10 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
wrapped := *(file.(*manifestFile))
if m.version == 3 {
// Ref: https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168
if wrapped.Content == ManifestContentData && wrapped.FirstRowId == nil {
if wrapped.Content == ManifestContentData && wrapped.FirstRowIDValue == nil {
if m.nextRowID != nil {
firstRowID := *m.nextRowID
wrapped.FirstRowId = &firstRowID
wrapped.FirstRowIDValue = &firstRowID
*m.nextRowID += wrapped.ExistingRowsCount + wrapped.AddedRowsCount
}
}
Expand Down
79 changes: 72 additions & 7 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,15 @@ type ManifestTestSuite struct {
}

// writeManifestList populates the suite's v1, v2, and v3 manifest-list buffers.
func (m *ManifestTestSuite) writeManifestList() {
	req := m.Require()

	// v1: no sequence number, no first-row-id.
	req.NoError(WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, 0, manifestFileRecordsV1))

	// v2: sequence number is still unassigned at write time.
	unassignedSeq := int64(-1)
	req.NoError(WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSeq, 0, manifestFileRecordsV2))

	// v3: explicit sequence number plus a starting first-row-id.
	seqV3, startRowID := int64(5), int64(1000)
	req.NoError(WriteManifestList(3, &m.v3ManifestList, snapshotID, nil, &seqV3, startRowID, manifestFileRecordsV3))
}

func (m *ManifestTestSuite) writeManifestEntries() {
Expand Down Expand Up @@ -760,13 +763,75 @@ func (m *ManifestTestSuite) TestReadManifestListV3() {
m.Nil(list[0].KeyMetadata())
m.Zero(list[0].PartitionSpecID())

// V3 manifest list assigns first_row_id to data manifests
m.Require().NotNil(list[0].FirstRowID(), "v3 data manifest should have first_row_id")
m.EqualValues(1000, *list[0].FirstRowID())

part := list[0].Partitions()[0]
m.True(part.ContainsNull)
m.False(*part.ContainsNaN)
m.Equal([]byte{0x01, 0x00, 0x00, 0x00}, *part.LowerBound)
m.Equal([]byte{0x02, 0x00, 0x00, 0x00}, *part.UpperBound)
}

// TestV3DataManifestFirstRowIDInheritance verifies First Row ID Inheritance:
// when a v3 data manifest carries a first_row_id, a data file whose own
// first_row_id is null inherits the manifest's first_row_id plus the record
// counts of all preceding data files in the manifest.
func (m *ManifestTestSuite) TestV3DataManifestFirstRowIDInheritance() {
	// Build a v3 data manifest with two entries that have null first_row_id.
	partitionSpec := NewPartitionSpecID(1,
		PartitionField{FieldID: 1000, SourceID: 1, Name: "x", Transform: IdentityTransform{}})
	firstCount, secondCount := int64(10), int64(20)
	entriesWithNullFirstRowID := []ManifestEntry{
		&manifestEntry{
			EntryStatus: EntryStatusADDED,
			Snapshot:    &entrySnapshotID,
			Data: &dataFile{
				Content:          EntryContentData,
				Path:             "/data/file1.parquet",
				Format:           ParquetFile,
				PartitionData:    map[string]any{"x": int(1)},
				RecordCount:      firstCount,
				FileSize:         1000,
				BlockSizeInBytes: 64 * 1024,
				FirstRowIDField:  nil, // null so reader will inherit
			},
		},
		&manifestEntry{
			EntryStatus: EntryStatusADDED,
			Snapshot:    &entrySnapshotID,
			Data: &dataFile{
				Content:          EntryContentData,
				Path:             "/data/file2.parquet",
				Format:           ParquetFile,
				PartitionData:    map[string]any{"x": int(2)},
				RecordCount:      secondCount,
				FileSize:         2000,
				BlockSizeInBytes: 64 * 1024,
				FirstRowIDField:  nil,
			},
		},
	}
	// Round-trip: write the entries, then read them back through a manifest
	// file whose first_row_id is set so inheritance kicks in.
	var manifestBuf bytes.Buffer
	_, err := WriteManifest("/manifest.avro", &manifestBuf, 3, partitionSpec, testSchema, entrySnapshotID, entriesWithNullFirstRowID)
	m.Require().NoError(err)

	manifestFirstRowID := int64(1000)
	file := &manifestFile{
		version:         3,
		Path:            "/manifest.avro",
		Content:         ManifestContentData,
		FirstRowIDValue: &manifestFirstRowID,
	}
	entries, err := ReadManifest(file, bytes.NewReader(manifestBuf.Bytes()), false)
	m.Require().NoError(err)
	m.Require().Len(entries, 2)

	// First entry gets manifest's first_row_id
	m.Require().NotNil(entries[0].DataFile().FirstRowID())
	m.EqualValues(1000, *entries[0].DataFile().FirstRowID())
	// Second entry gets previous + previous file's record_count
	m.Require().NotNil(entries[1].DataFile().FirstRowID())
	m.EqualValues(1000+firstCount, *entries[1].DataFile().FirstRowID())
}

func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() {
// This prevents a regression that could be caused by using a schema cache
// across multiple read/write operations of an avro file. While it may sound
Expand Down Expand Up @@ -1529,11 +1594,11 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterPersistsPerManifestFirstRowI
m.Require().True(ok, "expected v3 manifest file type")
secondManifest, ok := list[1].(*manifestFile)
m.Require().True(ok, "expected v3 manifest file type")
m.Require().NotNil(firstManifest.FirstRowId)
m.Require().NotNil(secondManifest.FirstRowId)
m.Require().NotNil(firstManifest.FirstRowID(), "first manifest should have first_row_id")
m.Require().NotNil(secondManifest.FirstRowID(), "second manifest should have first_row_id")

m.EqualValues(5000, *firstManifest.FirstRowId) // start of first range
m.EqualValues(5015, *secondManifest.FirstRowId)
m.EqualValues(5000, *firstManifest.FirstRowID()) // start of first range
m.EqualValues(5015, *secondManifest.FirstRowID())
m.EqualValues(5022, *writer.NextRowID())
}

Expand Down
61 changes: 61 additions & 0 deletions metadata_columns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use it except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package iceberg

// Row lineage metadata column field IDs (v3+). Reserved IDs are Integer.MAX_VALUE - 107 and 108
// per the Iceberg spec (Metadata Columns / Row Lineage).
const (
	// RowIDFieldID is the field ID for _row_id (optional long): Integer.MAX_VALUE - 107.
	// A unique long identifier for every row.
	RowIDFieldID = 2147483540
	// LastUpdatedSequenceNumberFieldID is the field ID for _last_updated_sequence_number
	// (optional long): Integer.MAX_VALUE - 108. The sequence number of the commit that
	// last updated the row.
	LastUpdatedSequenceNumberFieldID = 2147483539
)

// Row lineage metadata column names (v3+).
const (
	RowIDColumnName                     = "_row_id"
	LastUpdatedSequenceNumberColumnName = "_last_updated_sequence_number"
)

// RowID builds the optional _row_id long field (a reserved metadata column)
// for use in schemas that include row lineage.
func RowID() NestedField {
	field := NestedField{
		Type:     Int64Type{},
		ID:       RowIDFieldID,
		Name:     RowIDColumnName,
		Doc:      "Implicit row ID that is automatically assigned",
		Required: false,
	}

	return field
}

// LastUpdatedSequenceNumber builds the optional _last_updated_sequence_number
// long field (a reserved metadata column) for row-lineage schemas.
func LastUpdatedSequenceNumber() NestedField {
	field := NestedField{
		Type:     Int64Type{},
		ID:       LastUpdatedSequenceNumberFieldID,
		Name:     LastUpdatedSequenceNumberColumnName,
		Doc:      "Sequence number when the row was last updated",
		Required: false,
	}

	return field
}

// IsMetadataColumn reports whether fieldID is one of the reserved metadata
// column IDs (currently the row lineage columns).
func IsMetadataColumn(fieldID int) bool {
	switch fieldID {
	case RowIDFieldID, LastUpdatedSequenceNumberFieldID:
		return true
	default:
		return false
	}
}
96 changes: 96 additions & 0 deletions table/arrow_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,91 @@ func (as *arrowScan) getRecordFilter(ctx context.Context, fileSchema *iceberg.Sc
return nil, false, nil
}

// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number from task constants
// when those columns are present in the batch (e.g. from ToRequestedSchema). Per the Iceberg v3
// row lineage spec: if the value is null in the file, it is inherited (synthesized) from the file's
// first_row_id and data_sequence_number; otherwise the value from the file is kept.
// rowOffset is the 0-based row index within the current file and is updated so _row_id stays
// correct across multiple batches from the same file (first_row_id + row_position).
func synthesizeRowLineageColumns(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same root cause as #762 — bldr.NewArray() starts at refcount=1, array.NewRecordBatch retains to refcount=2, but the local refs in newCols are never released. Fix needs a release loop after the batch is created:

  rec := array.NewRecordBatch(schema, newCols, nrows)
  for _, c := range newCols {
      c.Release()
  }
  return rec, nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out this . I see that the PR 762 has been approved and waiting to be merged. Let me know once it lands in main so that I can rebase and apply the fix for this PR

ctx context.Context,
rowOffset *int64,
task FileScanTask,
batch arrow.RecordBatch,
) (arrow.RecordBatch, error) {
alloc := compute.GetAllocator(ctx)
schema := batch.Schema()
ncols := int(batch.NumCols())
nrows := batch.NumRows()
newCols := make([]arrow.Array, ncols)

// Resolve column indices by name; -1 if not present.
rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
seqNumIndices := schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
rowIDColIdx := -1
if len(rowIDIndices) > 0 {
rowIDColIdx = rowIDIndices[0]
}
seqNumColIdx := -1
if len(seqNumIndices) > 0 {
seqNumColIdx = seqNumIndices[0]
}

for i := 0; i < ncols; i++ {
if i == rowIDColIdx && task.FirstRowID != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused, if we already know the rowIDColIdx to compare against, why do we need the loop?

// _row_id: inherit first_row_id + row_position when null; else keep value from file.
if col, ok := batch.Column(i).(*array.Int64); ok {
bldr := array.NewInt64Builder(alloc)
first := *task.FirstRowID
for k := int64(0); k < nrows; k++ {
if col.IsNull(int(k)) {
bldr.Append(first + *rowOffset + k)
} else {
bldr.Append(col.Value(int(k)))
}
}
newCols[i] = bldr.NewArray()
bldr.Release()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reuse the builder and use Reserve since we already know the size we're gonna append


continue
}
}

if i == seqNumColIdx && task.DataSequenceNumber != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above, why do we need to loop and find the column if we already know seqNumColIdx to look for?

// _last_updated_sequence_number: inherit file's data_sequence_number when null; else keep value from file.
if col, ok := batch.Column(i).(*array.Int64); ok {
bldr := array.NewInt64Builder(alloc)
seq := *task.DataSequenceNumber
for k := int64(0); k < nrows; k++ {
if col.IsNull(int(k)) {
bldr.Append(seq)
} else {
bldr.Append(col.Value(int(k)))
}
}
newCols[i] = bldr.NewArray()
bldr.Release()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above


continue
}
}

col := batch.Column(i)
col.Retain()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like an extra Retain that isn't necessary, since you're going to add them to the record batch, you can just defer col.Release() the new columns you build instead of needing the release loop below

newCols[i] = col
}

// Advance so the next batch from this file uses the correct row position for _row_id.
*rowOffset += nrows

rec := array.NewRecordBatch(schema, newCols, nrows)
for _, c := range newCols {
c.Release()
}

return rec, nil
}

func (as *arrowScan) processRecords(
ctx context.Context,
task internal.Enumerated[FileScanTask],
Expand Down Expand Up @@ -513,6 +598,17 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerat
return ToRequestedSchema(ctx, as.projectedSchema, iceSchema, r, false, false, as.useLargeTypes)
})

// Row lineage (v3): fill _row_id and _last_updated_sequence_number from task constants when in projection.
if task.Value.FirstRowID != nil || task.Value.DataSequenceNumber != nil {
var rowOffset int64
taskVal := task.Value
pipeline = append(pipeline, func(r arrow.RecordBatch) (arrow.RecordBatch, error) {
defer r.Release()

return synthesizeRowLineageColumns(ctx, &rowOffset, taskVal, r)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't double checked the spec, but should the row lineage columns be toggleable via a setting? i.e. a way to turn them off if you don't want them to show up in the results?

})
}

err = as.processRecords(ctx, task, iceSchema, rdr, colIndices, pipeline, out)

return err
Expand Down
Loading
Loading