Skip to content

Commit 4da5bf5

Browse files
committed
Add support for row lineage in v3
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
1 parent 7b2ea9b commit 4da5bf5

File tree

7 files changed

+339
-18
lines changed

7 files changed

+339
-18
lines changed

manifest.go

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,8 @@ func (m *manifestFileV1) Partitions() []FieldSummary {
303303
return *m.PartitionList
304304
}
305305

306+
// FirstRowId always returns nil for a v1 manifest file; the ManifestFile
// contract specifies a nil first_row_id for v1 manifests.
func (*manifestFileV1) FirstRowId() *int64 {
	return nil
}
307+
306308
// FetchEntries delegates to fetchManifestEntries, passing the receiver along
// with fs and discardDeleted, and returns that call's result unchanged.
func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
307309
return fetchManifestEntries(m, fs, discardDeleted)
308310
}
@@ -323,7 +325,7 @@ type manifestFile struct {
323325
DeletedRowsCount int64 `avro:"deleted_rows_count"`
324326
PartitionList *[]FieldSummary `avro:"partitions"`
325327
Key []byte `avro:"key_metadata"`
326-
FirstRowId *int64 `avro:"first_row_id"`
328+
FirstRowID *int64 `avro:"first_row_id"`
327329

328330
version int `avro:"-"`
329331
}
@@ -400,6 +402,8 @@ func (m *manifestFile) Partitions() []FieldSummary {
400402
return *m.PartitionList
401403
}
402404

405+
// FirstRowId returns the manifest's FirstRowID field (Avro "first_row_id").
// The pointer is returned as stored, so it is nil when no value has been set.
func (m *manifestFile) FirstRowId() *int64 {
	return m.FirstRowID
}
406+
403407
// HasAddedFiles reports whether AddedFilesCount is non-zero.
func (m *manifestFile) HasAddedFiles() bool {
	return m.AddedFilesCount != 0
}
404408
// HasExistingFiles reports whether ExistingFilesCount is non-zero.
func (m *manifestFile) HasExistingFiles() bool {
	return m.ExistingFilesCount != 0
}
405409
func (m *manifestFile) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
@@ -520,6 +524,9 @@ type ManifestFile interface {
520524
// field in the spec. Each field in the list corresponds to a field in
521525
// the manifest file's partition spec.
522526
Partitions() []FieldSummary
527+
// FirstRowId returns the first _row_id assigned to rows in this manifest (v3+ data manifests only).
528+
// Returns nil for v1/v2 or for delete manifests.
529+
FirstRowId() *int64
523530

524531
// HasAddedFiles returns true if AddedDataFiles > 0 or if it was null.
525532
HasAddedFiles() bool
@@ -595,6 +602,11 @@ type ManifestReader struct {
595602
schemaLoaded bool
596603
partitionSpec PartitionSpec
597604
partitionSpecLoaded bool
605+
606+
// nextFirstRowID tracks the next first_row_id to assign when reading v3 data
607+
// manifests; used for First Row ID inheritance (null data file first_row_id
608+
// gets manifest's first_row_id + sum of preceding null-first_row_id files' record_count).
609+
nextFirstRowID *int64
598610
}
599611

600612
// NewManifestReader returns a value that can read the contents of an avro manifest
@@ -759,6 +771,19 @@ func (c *ManifestReader) ReadEntry() (ManifestEntry, error) {
759771
tmp = tmp.(*fallbackManifestEntry).toEntry()
760772
}
761773
tmp.inherit(c.file)
774+
// Apply first_row_id inheritance for v3 data manifests (spec: First Row ID Inheritance).
775+
if c.content == ManifestContentData && c.file.FirstRowId() != nil {
776+
if df, ok := tmp.DataFile().(*dataFile); ok && df.FirstRowIDField == nil {
777+
if c.nextFirstRowID == nil {
778+
c.nextFirstRowID = new(int64)
779+
*c.nextFirstRowID = *c.file.FirstRowId()
780+
}
781+
id := new(int64)
782+
*id = *c.nextFirstRowID
783+
df.FirstRowIDField = id
784+
*c.nextFirstRowID += df.Count()
785+
}
786+
}
762787
if fieldToIDMap, ok := tmp.DataFile().(hasFieldToIDMap); ok {
763788
fieldToIDMap.setFieldNameToIDMap(c.fieldNameToID)
764789
fieldToIDMap.setFieldIDToLogicalTypeMap(c.fieldIDToType)
@@ -1404,10 +1429,10 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
14041429
wrapped := *(file.(*manifestFile))
14051430
if m.version == 3 {
14061431
// Ref: https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168
1407-
if wrapped.Content == ManifestContentData && wrapped.FirstRowId == nil {
1432+
if wrapped.Content == ManifestContentData && wrapped.FirstRowID == nil {
14081433
if m.nextRowID != nil {
14091434
firstRowID := *m.nextRowID
1410-
wrapped.FirstRowId = &firstRowID
1435+
wrapped.FirstRowID = &firstRowID
14111436
*m.nextRowID += wrapped.ExistingRowsCount + wrapped.AddedRowsCount
14121437
}
14131438
}
@@ -1445,32 +1470,40 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
14451470
}
14461471

14471472
// WriteManifestList writes a list of manifest files to an avro file.
1448-
func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnapshotID, sequenceNumber *int64, firstRowId int64, files []ManifestFile) (err error) {
1473+
// For v3 tables, it returns the next row ID after assigning first_row_id to data manifests; otherwise 0.
1474+
func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnapshotID, sequenceNumber *int64, firstRowId int64, files []ManifestFile) (nextRowIDAfter int64, err error) {
14491475
var writer *ManifestListWriter
14501476

14511477
switch version {
14521478
case 1:
14531479
writer, err = NewManifestListWriterV1(out, snapshotID, parentSnapshotID)
14541480
case 2:
14551481
if sequenceNumber == nil {
1456-
return errors.New("sequence number is required for V2 tables")
1482+
return 0, errors.New("sequence number is required for V2 tables")
14571483
}
14581484
writer, err = NewManifestListWriterV2(out, snapshotID, *sequenceNumber, parentSnapshotID)
14591485
case 3:
14601486
if sequenceNumber == nil {
1461-
return errors.New("sequence number is required for V3 tables")
1487+
return 0, errors.New("sequence number is required for V3 tables")
14621488
}
14631489
writer, err = NewManifestListWriterV3(out, snapshotID, *sequenceNumber, firstRowId, parentSnapshotID)
14641490
default:
1465-
return fmt.Errorf("unsupported manifest version: %d", version)
1491+
return 0, fmt.Errorf("unsupported manifest version: %d", version)
14661492
}
14671493

14681494
if err != nil {
1469-
return err
1495+
return 0, err
14701496
}
14711497
defer internal.CheckedClose(writer, &err)
14721498

1473-
return writer.AddManifests(files)
1499+
if err = writer.AddManifests(files); err != nil {
1500+
return 0, err
1501+
}
1502+
if version == 3 && writer.NextRowID() != nil {
1503+
nextRowIDAfter = *writer.NextRowID()
1504+
}
1505+
1506+
return nextRowIDAfter, nil
14741507
}
14751508

14761509
func WriteManifest(

manifest_test.go

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -481,12 +481,15 @@ type ManifestTestSuite struct {
481481
}
482482

483483
func (m *ManifestTestSuite) writeManifestList() {
484-
m.Require().NoError(WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, 0, manifestFileRecordsV1))
484+
_, err := WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, 0, manifestFileRecordsV1)
485+
m.Require().NoError(err)
485486
unassignedSequenceNum := int64(-1)
486-
m.Require().NoError(WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, 0, manifestFileRecordsV2))
487+
_, err = WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, 0, manifestFileRecordsV2)
488+
m.Require().NoError(err)
487489
v3SequenceNum := int64(5)
488490
firstRowID := int64(1000)
489-
m.Require().NoError(WriteManifestList(3, &m.v3ManifestList, snapshotID, nil, &v3SequenceNum, firstRowID, manifestFileRecordsV3))
491+
_, err = WriteManifestList(3, &m.v3ManifestList, snapshotID, nil, &v3SequenceNum, firstRowID, manifestFileRecordsV3)
492+
m.Require().NoError(err)
490493
}
491494

492495
func (m *ManifestTestSuite) writeManifestEntries() {
@@ -760,13 +763,75 @@ func (m *ManifestTestSuite) TestReadManifestListV3() {
760763
m.Nil(list[0].KeyMetadata())
761764
m.Zero(list[0].PartitionSpecID())
762765

766+
// V3 manifest list assigns first_row_id to data manifests
767+
m.Require().NotNil(list[0].FirstRowId(), "v3 data manifest should have first_row_id")
768+
m.EqualValues(1000, *list[0].FirstRowId())
769+
763770
part := list[0].Partitions()[0]
764771
m.True(part.ContainsNull)
765772
m.False(*part.ContainsNaN)
766773
m.Equal([]byte{0x01, 0x00, 0x00, 0x00}, *part.LowerBound)
767774
m.Equal([]byte{0x02, 0x00, 0x00, 0x00}, *part.UpperBound)
768775
}
769776

777+
// TestV3DataManifestFirstRowIDInheritance writes a v3 manifest containing two
// ADDED data files whose first_row_id is nil, reads it back through a
// manifestFile whose FirstRowID is 1000, and asserts that the entries come back
// with first_row_id 1000 and 1000+firstCount respectively.
func (m *ManifestTestSuite) TestV3DataManifestFirstRowIDInheritance() {
778+
// Build a v3 data manifest with two entries that have null first_row_id.
779+
partitionSpec := NewPartitionSpecID(1,
780+
PartitionField{FieldID: 1000, SourceID: 1, Name: "x", Transform: IdentityTransform{}})
781+
// Record counts of the two files; only firstCount feeds the assertions below.
firstCount, secondCount := int64(10), int64(20)
782+
entriesWithNullFirstRowID := []ManifestEntry{
783+
&manifestEntry{
784+
EntryStatus: EntryStatusADDED,
785+
Snapshot: &entrySnapshotID,
786+
Data: &dataFile{
787+
Content: EntryContentData,
788+
Path: "/data/file1.parquet",
789+
Format: ParquetFile,
790+
PartitionData: map[string]any{"x": int(1)},
791+
RecordCount: firstCount,
792+
FileSize: 1000,
793+
BlockSizeInBytes: 64 * 1024,
794+
FirstRowIDField: nil, // null so reader will inherit
795+
},
796+
},
797+
&manifestEntry{
798+
EntryStatus: EntryStatusADDED,
799+
Snapshot: &entrySnapshotID,
800+
Data: &dataFile{
801+
Content: EntryContentData,
802+
Path: "/data/file2.parquet",
803+
Format: ParquetFile,
804+
PartitionData: map[string]any{"x": int(2)},
805+
RecordCount: secondCount,
806+
FileSize: 2000,
807+
BlockSizeInBytes: 64 * 1024,
808+
FirstRowIDField: nil,
809+
},
810+
},
811+
}
812+
// Serialize the entries into an in-memory v3 manifest.
var manifestBuf bytes.Buffer
813+
_, err := WriteManifest("/manifest.avro", &manifestBuf, 3, partitionSpec, testSchema, entrySnapshotID, entriesWithNullFirstRowID)
814+
m.Require().NoError(err)
815+
816+
// Describe the manifest as a v3 data manifest with first_row_id 1000 so the
// reader has a base value to hand out to entries that lack their own.
manifestFirstRowID := int64(1000)
817+
file := &manifestFile{
818+
version: 3,
819+
Path: "/manifest.avro",
820+
Content: ManifestContentData,
821+
FirstRowID: &manifestFirstRowID,
822+
}
823+
entries, err := ReadManifest(file, bytes.NewReader(manifestBuf.Bytes()), false)
824+
m.Require().NoError(err)
825+
m.Require().Len(entries, 2)
826+
827+
// First entry gets manifest's first_row_id
828+
m.Require().NotNil(entries[0].DataFile().FirstRowID())
829+
m.EqualValues(1000, *entries[0].DataFile().FirstRowID())
830+
// Second entry gets the first entry's first_row_id plus the first file's record_count
831+
m.Require().NotNil(entries[1].DataFile().FirstRowID())
832+
m.EqualValues(1000+firstCount, *entries[1].DataFile().FirstRowID())
833+
}
834+
770835
func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() {
771836
// This prevents a regression that could be caused by using a schema cache
772837
// across multiple read/write operations of an avro file. While it may sound
@@ -779,7 +844,7 @@ func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() {
779844
// any cache. (Note: if working correctly, this will have no such side effect.)
780845
var buf bytes.Buffer
781846
seqNum := int64(9876)
782-
err := WriteManifestList(2, &buf, 1234, nil, &seqNum, 0, []ManifestFile{
847+
_, err := WriteManifestList(2, &buf, 1234, nil, &seqNum, 0, []ManifestFile{
783848
NewManifestFile(2, "s3://bucket/namespace/table/metadata/abcd-0123.avro", 99, 0, 1234).Build(),
784849
})
785850
m.NoError(err)
@@ -1550,7 +1615,7 @@ func (m *ManifestTestSuite) TestWriteManifestListClosesWriterOnError() {
15501615
m.Require().NoError(writer.Close())
15511616

15521617
out := &limitedWriter{limit: header.Len(), err: errLimitedWrite}
1553-
err = WriteManifestList(2, out, snapshotID, nil, &seqNum, 0, []ManifestFile{
1618+
_, err = WriteManifestList(2, out, snapshotID, nil, &seqNum, 0, []ManifestFile{
15541619
manifestFileRecordsV2[0],
15551620
manifestFileRecordsV1[0],
15561621
})

metadata_columns.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use it except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package iceberg
19+
20+
// Row lineage metadata column field IDs (v3+). The reserved IDs are Integer.MAX_VALUE - 107
21+
// and Integer.MAX_VALUE - 108, per the Iceberg spec (Metadata Columns / Row Lineage).
22+
const (
23+
// RowIDFieldID is the field ID for _row_id (optional long). A unique long identifier for every row.
// Its value is Integer.MAX_VALUE - 107 (2147483647 - 107).
24+
RowIDFieldID = 2147483540
25+
// LastUpdatedSequenceNumberFieldID is the field ID for _last_updated_sequence_number (optional long).
26+
// The sequence number of the commit that last updated the row.
// Its value is Integer.MAX_VALUE - 108 (2147483647 - 108).
27+
LastUpdatedSequenceNumberFieldID = 2147483539
28+
)
29+
30+
// Row lineage metadata column names (v3+).
31+
const (
32+
// RowIDColumnName is the column name used for the _row_id metadata column.
RowIDColumnName = "_row_id"
33+
// LastUpdatedSequenceNumberColumnName is the column name used for the
// _last_updated_sequence_number metadata column.
LastUpdatedSequenceNumberColumnName = "_last_updated_sequence_number"
34+
)
35+
36+
// RowID returns a NestedField for _row_id, for use in schemas that include row lineage.
// The field is optional (Required is false), has type Int64Type, and uses
// RowIDFieldID and RowIDColumnName as its ID and name. A new NestedField value
// is constructed on every call.
37+
func RowID() NestedField {
38+
return NestedField{
39+
ID: RowIDFieldID,
40+
Name: RowIDColumnName,
41+
Required: false,
42+
Doc: "Implicit row ID that is automatically assigned",
43+
Type: Int64Type{},
44+
}
45+
}
46+
47+
// LastUpdatedSequenceNumber returns a NestedField for _last_updated_sequence_number.
// The field is optional (Required is false), has type Int64Type, and uses
// LastUpdatedSequenceNumberFieldID and LastUpdatedSequenceNumberColumnName as
// its ID and name. A new NestedField value is constructed on every call.
48+
func LastUpdatedSequenceNumber() NestedField {
49+
return NestedField{
50+
ID: LastUpdatedSequenceNumberFieldID,
51+
Name: LastUpdatedSequenceNumberColumnName,
52+
Required: false,
53+
Doc: "Sequence number when the row was last updated",
54+
Type: Int64Type{},
55+
}
56+
}
57+
58+
// IsMetadataColumn reports whether fieldID is one of the row lineage metadata
// column IDs: RowIDFieldID or LastUpdatedSequenceNumberFieldID.
// NOTE(review): only these two IDs are checked; any other reserved metadata
// column ID yields false — confirm callers need only the row lineage columns.
59+
func IsMetadataColumn(fieldID int) bool {
60+
return fieldID == RowIDFieldID || fieldID == LastUpdatedSequenceNumberFieldID
61+
}

0 commit comments

Comments
 (0)