Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ func (m *manifestFileV1) Partitions() []FieldSummary {
return *m.PartitionList
}

// FirstRowID always reports nil for a v1 manifest file.
func (*manifestFileV1) FirstRowID() *int64 {
	return nil
}

func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
return fetchManifestEntries(m, fs, discardDeleted)
}
Expand All @@ -323,7 +325,7 @@ type manifestFile struct {
DeletedRowsCount int64 `avro:"deleted_rows_count"`
PartitionList *[]FieldSummary `avro:"partitions"`
Key []byte `avro:"key_metadata"`
FirstRowId *int64 `avro:"first_row_id"`
FirstRowIDValue *int64 `avro:"first_row_id"`

version int `avro:"-"`
}
Expand Down Expand Up @@ -400,6 +402,8 @@ func (m *manifestFile) Partitions() []FieldSummary {
return *m.PartitionList
}

// FirstRowID returns the pointer held in FirstRowIDValue (the first_row_id
// avro field); the result is nil when that field is unset.
func (m *manifestFile) FirstRowID() *int64 {
	return m.FirstRowIDValue
}

// HasAddedFiles reports whether the manifest's added-files count is non-zero.
func (m *manifestFile) HasAddedFiles() bool {
	return m.AddedFilesCount != 0
}
// HasExistingFiles reports whether the manifest's existing-files count is
// non-zero.
func (m *manifestFile) HasExistingFiles() bool {
	return m.ExistingFilesCount != 0
}
func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
Expand Down Expand Up @@ -520,6 +524,9 @@ type ManifestFile interface {
// field in the spec. Each field in the list corresponds to a field in
// the manifest file's partition spec.
Partitions() []FieldSummary
// FirstRowID returns the first _row_id assigned to rows in this manifest (v3+ data manifests only).
// Returns nil for v1/v2 or for delete manifests.
FirstRowID() *int64

// HasAddedFiles returns true if AddedDataFiles > 0 or if it was null.
HasAddedFiles() bool
Expand Down Expand Up @@ -595,6 +602,14 @@ type ManifestReader struct {
schemaLoaded bool
partitionSpec PartitionSpec
partitionSpecLoaded bool

// inheritRowIDs controls whether this reader should apply First Row ID inheritance
// for v3 data manifests (spec: First Row ID Inheritance).
inheritRowIDs bool
// nextFirstRowID tracks the next first_row_id to assign when reading v3 data
// manifests; used for First Row ID inheritance (null data file first_row_id
// gets manifest's first_row_id + sum of preceding files' record_count).
nextFirstRowID int64
}

// NewManifestReader returns a value that can read the contents of an avro manifest
Expand Down Expand Up @@ -654,15 +669,25 @@ func NewManifestReader(file ManifestFile, in io.Reader) (*ManifestReader, error)
}
fieldNameToID, fieldIDToType, fieldIDToSize := getFieldIDMap(sc)

inheritRowIDs := formatVersion >= 3 &&
content == ManifestContentData &&
file.FirstRowID() != nil
var nextFirstRowID int64
if inheritRowIDs {
nextFirstRowID = *file.FirstRowID()
}

return &ManifestReader{
dec: dec,
file: file,
formatVersion: formatVersion,
isFallback: isFallback,
content: content,
fieldNameToID: fieldNameToID,
fieldIDToType: fieldIDToType,
fieldIDToSize: fieldIDToSize,
dec: dec,
file: file,
formatVersion: formatVersion,
isFallback: isFallback,
content: content,
fieldNameToID: fieldNameToID,
fieldIDToType: fieldIDToType,
fieldIDToSize: fieldIDToSize,
inheritRowIDs: inheritRowIDs,
nextFirstRowID: nextFirstRowID,
}, nil
}

Expand Down Expand Up @@ -764,6 +789,17 @@ func (c *ManifestReader) ReadEntry() (ManifestEntry, error) {
tmp = tmp.(*fallbackManifestEntry).toEntry()
}
tmp.inherit(c.file)
// Apply first_row_id inheritance for v3 data manifests (spec: First Row ID Inheritance).
if c.inheritRowIDs {
if df, ok := tmp.DataFile().(*dataFile); ok {
if df.FirstRowIDField == nil {
id := c.nextFirstRowID
df.FirstRowIDField = &id
}
// Advance for every data file, null or explicit, to match Java semantics.
c.nextFirstRowID += df.Count()
}
}
if fieldToIDMap, ok := tmp.DataFile().(hasFieldToIDMap); ok {
fieldToIDMap.setFieldNameToIDMap(c.fieldNameToID)
fieldToIDMap.setFieldIDToLogicalTypeMap(c.fieldIDToType)
Expand Down Expand Up @@ -1441,10 +1477,10 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
wrapped := *(file.(*manifestFile))
if m.version == 3 {
// Ref: https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168
if wrapped.Content == ManifestContentData && wrapped.FirstRowId == nil {
if wrapped.Content == ManifestContentData && wrapped.FirstRowIDValue == nil {
if m.nextRowID != nil {
firstRowID := *m.nextRowID
wrapped.FirstRowId = &firstRowID
wrapped.FirstRowIDValue = &firstRowID
*m.nextRowID += wrapped.ExistingRowsCount + wrapped.AddedRowsCount
}
}
Expand Down
79 changes: 72 additions & 7 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,15 @@ type ManifestTestSuite struct {
}

// writeManifestList serializes the v1, v2 and v3 manifest-list fixtures into
// the suite's v1ManifestList, v2ManifestList and v3ManifestList fields.
//
// Each list is written exactly once; writing a list a second time into the
// same destination would append a duplicate copy after the first one.
func (m *ManifestTestSuite) writeManifestList() {
	// v1: no sequence number and no first row ID.
	err := WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, 0, manifestFileRecordsV1)
	m.Require().NoError(err)

	// v2: written with the "unassigned" sequence number (-1).
	unassignedSequenceNum := int64(-1)
	err = WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, 0, manifestFileRecordsV2)
	m.Require().NoError(err)

	// v3: written with an explicit sequence number and a starting first row ID
	// of 1000, which the v3 read test expects on the first data manifest.
	v3SequenceNum := int64(5)
	firstRowID := int64(1000)
	err = WriteManifestList(3, &m.v3ManifestList, snapshotID, nil, &v3SequenceNum, firstRowID, manifestFileRecordsV3)
	m.Require().NoError(err)
}

func (m *ManifestTestSuite) writeManifestEntries() {
Expand Down Expand Up @@ -760,13 +763,75 @@ func (m *ManifestTestSuite) TestReadManifestListV3() {
m.Nil(list[0].KeyMetadata())
m.Zero(list[0].PartitionSpecID())

// V3 manifest list assigns first_row_id to data manifests
m.Require().NotNil(list[0].FirstRowID(), "v3 data manifest should have first_row_id")
m.EqualValues(1000, *list[0].FirstRowID())

part := list[0].Partitions()[0]
m.True(part.ContainsNull)
m.False(*part.ContainsNaN)
m.Equal([]byte{0x01, 0x00, 0x00, 0x00}, *part.LowerBound)
m.Equal([]byte{0x02, 0x00, 0x00, 0x00}, *part.UpperBound)
}

// TestV3DataManifestFirstRowIDInheritance writes a v3 data manifest whose two
// entries have a null first_row_id, reads it back through a manifest file that
// carries first_row_id = 1000, and checks that the reader fills in the
// inherited values: 1000 for the first data file and 1000 plus the first
// file's record count for the second.
func (m *ManifestTestSuite) TestV3DataManifestFirstRowIDInheritance() {
	const manifestPath = "/manifest.avro"

	firstCount, secondCount := int64(10), int64(20)

	// Both data files leave FirstRowIDField nil so the reader must inherit it.
	files := []*dataFile{
		{
			Content:          EntryContentData,
			Path:             "/data/file1.parquet",
			Format:           ParquetFile,
			PartitionData:    map[string]any{"x": int(1)},
			RecordCount:      firstCount,
			FileSize:         1000,
			BlockSizeInBytes: 64 * 1024,
			FirstRowIDField:  nil,
		},
		{
			Content:          EntryContentData,
			Path:             "/data/file2.parquet",
			Format:           ParquetFile,
			PartitionData:    map[string]any{"x": int(2)},
			RecordCount:      secondCount,
			FileSize:         2000,
			BlockSizeInBytes: 64 * 1024,
			FirstRowIDField:  nil,
		},
	}

	// Wrap every data file in an ADDED entry for the shared test snapshot.
	toWrite := make([]ManifestEntry, 0, len(files))
	for _, f := range files {
		toWrite = append(toWrite, &manifestEntry{
			EntryStatus: EntryStatusADDED,
			Snapshot:    &entrySnapshotID,
			Data:        f,
		})
	}

	spec := NewPartitionSpecID(1,
		PartitionField{FieldID: 1000, SourceID: 1, Name: "x", Transform: IdentityTransform{}})

	var buf bytes.Buffer
	_, err := WriteManifest(manifestPath, &buf, 3, spec, testSchema, entrySnapshotID, toWrite)
	m.Require().NoError(err)

	// The manifest-list record supplies the starting row ID for inheritance.
	manifestFirstRowID := int64(1000)
	source := &manifestFile{
		version:         3,
		Path:            manifestPath,
		Content:         ManifestContentData,
		FirstRowIDValue: &manifestFirstRowID,
	}

	got, err := ReadManifest(source, bytes.NewReader(buf.Bytes()), false)
	m.Require().NoError(err)
	m.Require().Len(got, 2)

	// The first entry takes the manifest's first_row_id unchanged.
	first := got[0].DataFile().FirstRowID()
	m.Require().NotNil(first)
	m.EqualValues(manifestFirstRowID, *first)

	// The second entry is offset by the record count of the file before it.
	second := got[1].DataFile().FirstRowID()
	m.Require().NotNil(second)
	m.EqualValues(manifestFirstRowID+firstCount, *second)
}

func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() {
// This prevents a regression that could be caused by using a schema cache
// across multiple read/write operations of an avro file. While it may sound
Expand Down Expand Up @@ -1529,11 +1594,11 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterPersistsPerManifestFirstRowI
m.Require().True(ok, "expected v3 manifest file type")
secondManifest, ok := list[1].(*manifestFile)
m.Require().True(ok, "expected v3 manifest file type")
m.Require().NotNil(firstManifest.FirstRowId)
m.Require().NotNil(secondManifest.FirstRowId)
m.Require().NotNil(firstManifest.FirstRowID(), "first manifest should have first_row_id")
m.Require().NotNil(secondManifest.FirstRowID(), "second manifest should have first_row_id")

m.EqualValues(5000, *firstManifest.FirstRowId) // start of first range
m.EqualValues(5015, *secondManifest.FirstRowId)
m.EqualValues(5000, *firstManifest.FirstRowID()) // start of first range
m.EqualValues(5015, *secondManifest.FirstRowID())
m.EqualValues(5022, *writer.NextRowID())
}

Expand Down
61 changes: 61 additions & 0 deletions metadata_columns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use it except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package iceberg

// Reserved field IDs of the row lineage metadata columns (v3+). They are
// expressed as offsets from the largest 32-bit signed integer
// (1<<31 - 1 = 2147483647, Java's Integer.MAX_VALUE), following the Iceberg
// spec (Metadata Columns / Row Lineage).
const (
	// RowIDFieldID identifies _row_id (optional long), a unique long
	// identifier for every row. Value: Integer.MAX_VALUE - 107 = 2147483540.
	RowIDFieldID = 1<<31 - 1 - 107

	// LastUpdatedSequenceNumberFieldID identifies
	// _last_updated_sequence_number (optional long), the sequence number of
	// the commit that last updated the row.
	// Value: Integer.MAX_VALUE - 108 = 2147483539.
	LastUpdatedSequenceNumberFieldID = 1<<31 - 1 - 108
)

// Names of the row lineage metadata columns (v3+).
const (
	// LastUpdatedSequenceNumberColumnName is the column name paired with
	// LastUpdatedSequenceNumberFieldID.
	LastUpdatedSequenceNumberColumnName = "_last_updated_sequence_number"

	// RowIDColumnName is the column name paired with RowIDFieldID.
	RowIDColumnName = "_row_id"
)

// RowID builds the NestedField describing the _row_id row lineage column: an
// optional (Required == false) Int64Type field identified by RowIDFieldID.
// Use it in schemas that include row lineage.
func RowID() NestedField {
	field := NestedField{
		ID:   RowIDFieldID,
		Name: RowIDColumnName,
		Type: Int64Type{},
		Doc:  "Implicit row ID that is automatically assigned",
	}
	// Stated explicitly even though false is the zero value: the column is optional.
	field.Required = false

	return field
}

// LastUpdatedSequenceNumber builds the NestedField describing the
// _last_updated_sequence_number row lineage column: an optional
// (Required == false) Int64Type field identified by
// LastUpdatedSequenceNumberFieldID.
func LastUpdatedSequenceNumber() NestedField {
	field := NestedField{
		ID:   LastUpdatedSequenceNumberFieldID,
		Name: LastUpdatedSequenceNumberColumnName,
		Type: Int64Type{},
		Doc:  "Sequence number when the row was last updated",
	}
	// Stated explicitly even though false is the zero value: the column is optional.
	field.Required = false

	return field
}

// IsMetadataColumn reports whether fieldID is one of the reserved row lineage
// metadata column IDs: RowIDFieldID or LastUpdatedSequenceNumberFieldID. Any
// other ID yields false.
func IsMetadataColumn(fieldID int) bool {
	switch fieldID {
	case RowIDFieldID, LastUpdatedSequenceNumberFieldID:
		return true
	default:
		return false
	}
}
Loading
Loading