Skip to content

Commit 3b3c7e2

Browse files
committed
Fixes from codereview
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
1 parent 61787dd commit 3b3c7e2

File tree

5 files changed

+52
-47
lines changed

5 files changed

+52
-47
lines changed

manifest.go

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ func (m *manifestFileV1) Partitions() []FieldSummary {
303303
return *m.PartitionList
304304
}
305305

306-
func (*manifestFileV1) FirstRowId() *int64 { return nil }
306+
func (*manifestFileV1) FirstRowID() *int64 { return nil }
307307

308308
func (m *manifestFileV1) FetchEntries(fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
309309
return fetchManifestEntries(m, fs, discardDeleted)
@@ -325,7 +325,7 @@ type manifestFile struct {
325325
DeletedRowsCount int64 `avro:"deleted_rows_count"`
326326
PartitionList *[]FieldSummary `avro:"partitions"`
327327
Key []byte `avro:"key_metadata"`
328-
FirstRowID *int64 `avro:"first_row_id"`
328+
FirstRowIDValue *int64 `avro:"first_row_id"`
329329

330330
version int `avro:"-"`
331331
}
@@ -402,7 +402,7 @@ func (m *manifestFile) Partitions() []FieldSummary {
402402
return *m.PartitionList
403403
}
404404

405-
func (m *manifestFile) FirstRowId() *int64 { return m.FirstRowID }
405+
func (m *manifestFile) FirstRowID() *int64 { return m.FirstRowIDValue }
406406

407407
func (m *manifestFile) HasAddedFiles() bool { return m.AddedFilesCount != 0 }
408408
func (m *manifestFile) HasExistingFiles() bool { return m.ExistingFilesCount != 0 }
@@ -524,9 +524,9 @@ type ManifestFile interface {
524524
// field in the spec. Each field in the list corresponds to a field in
525525
// the manifest file's partition spec.
526526
Partitions() []FieldSummary
527-
// FirstRowId returns the first _row_id assigned to rows in this manifest (v3+ data manifests only).
527+
// FirstRowID returns the first _row_id assigned to rows in this manifest (v3+ data manifests only).
528528
// Returns nil for v1/v2 or for delete manifests.
529-
FirstRowId() *int64
529+
FirstRowID() *int64
530530

531531
// HasAddedFiles returns true if AddedDataFiles > 0 or if it was null.
532532
HasAddedFiles() bool
@@ -603,10 +603,13 @@ type ManifestReader struct {
603603
partitionSpec PartitionSpec
604604
partitionSpecLoaded bool
605605

606+
// inheritRowIDs controls whether this reader should apply First Row ID inheritance
607+
// for v3 data manifests (spec: First Row ID Inheritance).
608+
inheritRowIDs bool
606609
// nextFirstRowID tracks the next first_row_id to assign when reading v3 data
607610
// manifests; used for First Row ID inheritance (null data file first_row_id
608-
// gets manifest's first_row_id + sum of preceding null-first_row_id files' record_count).
609-
nextFirstRowID *int64
611+
// gets manifest's first_row_id + sum of preceding files' record_count).
612+
nextFirstRowID int64
610613
}
611614

612615
// NewManifestReader returns a value that can read the contents of an avro manifest
@@ -669,15 +672,25 @@ func NewManifestReader(file ManifestFile, in io.Reader) (*ManifestReader, error)
669672
}
670673
fieldNameToID, fieldIDToType, fieldIDToSize := getFieldIDMap(sc)
671674

675+
inheritRowIDs := formatVersion >= 3 &&
676+
content == ManifestContentData &&
677+
file.FirstRowID() != nil
678+
var nextFirstRowID int64
679+
if inheritRowIDs {
680+
nextFirstRowID = *file.FirstRowID()
681+
}
682+
672683
return &ManifestReader{
673-
dec: dec,
674-
file: file,
675-
formatVersion: formatVersion,
676-
isFallback: isFallback,
677-
content: content,
678-
fieldNameToID: fieldNameToID,
679-
fieldIDToType: fieldIDToType,
680-
fieldIDToSize: fieldIDToSize,
684+
dec: dec,
685+
file: file,
686+
formatVersion: formatVersion,
687+
isFallback: isFallback,
688+
content: content,
689+
fieldNameToID: fieldNameToID,
690+
fieldIDToType: fieldIDToType,
691+
fieldIDToSize: fieldIDToSize,
692+
inheritRowIDs: inheritRowIDs,
693+
nextFirstRowID: nextFirstRowID,
681694
}, nil
682695
}
683696

@@ -775,16 +788,14 @@ func (c *ManifestReader) ReadEntry() (ManifestEntry, error) {
775788
}
776789
tmp.inherit(c.file)
777790
// Apply first_row_id inheritance for v3 data manifests (spec: First Row ID Inheritance).
778-
if c.content == ManifestContentData && c.file.FirstRowId() != nil {
779-
if df, ok := tmp.DataFile().(*dataFile); ok && df.FirstRowIDField == nil {
780-
if c.nextFirstRowID == nil {
781-
c.nextFirstRowID = new(int64)
782-
*c.nextFirstRowID = *c.file.FirstRowId()
791+
if c.inheritRowIDs {
792+
if df, ok := tmp.DataFile().(*dataFile); ok {
793+
if df.FirstRowIDField == nil {
794+
id := c.nextFirstRowID
795+
df.FirstRowIDField = &id
783796
}
784-
id := new(int64)
785-
*id = *c.nextFirstRowID
786-
df.FirstRowIDField = id
787-
*c.nextFirstRowID += df.Count()
797+
// Advance for every data file, null or explicit, to match Java semantics.
798+
c.nextFirstRowID += df.Count()
788799
}
789800
}
790801
if fieldToIDMap, ok := tmp.DataFile().(hasFieldToIDMap); ok {
@@ -1460,10 +1471,10 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
14601471
wrapped := *(file.(*manifestFile))
14611472
if m.version == 3 {
14621473
// Ref: https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168
1463-
if wrapped.Content == ManifestContentData && wrapped.FirstRowID == nil {
1474+
if wrapped.Content == ManifestContentData && wrapped.FirstRowIDValue == nil {
14641475
if m.nextRowID != nil {
14651476
firstRowID := *m.nextRowID
1466-
wrapped.FirstRowID = &firstRowID
1477+
wrapped.FirstRowIDValue = &firstRowID
14671478
*m.nextRowID += wrapped.ExistingRowsCount + wrapped.AddedRowsCount
14681479
}
14691480
}
@@ -1501,40 +1512,32 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
15011512
}
15021513

15031514
// WriteManifestList writes a list of manifest files to an avro file.
1504-
// For v3 tables, it returns the next row ID after assigning first_row_id to data manifests; otherwise 0.
1505-
func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnapshotID, sequenceNumber *int64, firstRowId int64, files []ManifestFile) (nextRowIDAfter int64, err error) {
1515+
func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnapshotID, sequenceNumber *int64, firstRowId int64, files []ManifestFile) (err error) {
15061516
var writer *ManifestListWriter
15071517

15081518
switch version {
15091519
case 1:
15101520
writer, err = NewManifestListWriterV1(out, snapshotID, parentSnapshotID)
15111521
case 2:
15121522
if sequenceNumber == nil {
1513-
return 0, errors.New("sequence number is required for V2 tables")
1523+
return errors.New("sequence number is required for V2 tables")
15141524
}
15151525
writer, err = NewManifestListWriterV2(out, snapshotID, *sequenceNumber, parentSnapshotID)
15161526
case 3:
15171527
if sequenceNumber == nil {
1518-
return 0, errors.New("sequence number is required for V3 tables")
1528+
return errors.New("sequence number is required for V3 tables")
15191529
}
15201530
writer, err = NewManifestListWriterV3(out, snapshotID, *sequenceNumber, firstRowId, parentSnapshotID)
15211531
default:
1522-
return 0, fmt.Errorf("unsupported manifest version: %d", version)
1532+
return fmt.Errorf("unsupported manifest version: %d", version)
15231533
}
15241534

15251535
if err != nil {
1526-
return 0, err
1536+
return err
15271537
}
15281538
defer internal.CheckedClose(writer, &err)
15291539

1530-
if err = writer.AddManifests(files); err != nil {
1531-
return 0, err
1532-
}
1533-
if version == 3 && writer.NextRowID() != nil {
1534-
nextRowIDAfter = *writer.NextRowID()
1535-
}
1536-
1537-
return nextRowIDAfter, nil
1540+
return writer.AddManifests(files)
15381541
}
15391542

15401543
func WriteManifest(

manifest_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -815,9 +815,9 @@ func (m *ManifestTestSuite) TestV3DataManifestFirstRowIDInheritance() {
815815

816816
manifestFirstRowID := int64(1000)
817817
file := &manifestFile{
818-
version: 3,
819-
Path: "/manifest.avro",
820-
Content: ManifestContentData,
818+
version: 3,
819+
Path: "/manifest.avro",
820+
Content: ManifestContentData,
821821
FirstRowIDValue: &manifestFirstRowID,
822822
}
823823
entries, err := ReadManifest(file, bytes.NewReader(manifestBuf.Bytes()), false)

table/arrow_scanner.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,7 @@ func synthesizeRowLineageColumns(
435435
}
436436
newCols[i] = bldr.NewArray()
437437
bldr.Release()
438+
438439
continue
439440
}
440441
}
@@ -453,6 +454,7 @@ func synthesizeRowLineageColumns(
453454
}
454455
newCols[i] = bldr.NewArray()
455456
bldr.Release()
457+
456458
continue
457459
}
458460
}

table/metadata_schema_compatibility.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ func (e ErrIncompatibleSchema) Error() string {
3636
var problems strings.Builder
3737
for _, f := range e.fields {
3838
if f.UnsupportedType != nil {
39-
problems.WriteString(fmt.Sprintf("\n- invalid type for %s: %s is not supported until v%d", f.ColName, f.Field.Type, f.UnsupportedType.MinFormatVersion))
39+
fmt.Fprintf(&problems, "\n- invalid type for %s: %s is not supported until v%d", f.ColName, f.Field.Type, f.UnsupportedType.MinFormatVersion)
4040
}
4141
if f.InvalidDefault != nil {
42-
problems.WriteString(fmt.Sprintf("\n- invalid initial default for %s: non-null default (%v) is not supported until v%d", f.ColName, f.Field.InitialDefault, f.InvalidDefault.MinFormatVersion))
42+
fmt.Fprintf(&problems, "\n- invalid initial default for %s: non-null default (%v) is not supported until v%d", f.ColName, f.Field.InitialDefault, f.InvalidDefault.MinFormatVersion)
4343
}
4444
}
4545

table/scanner.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,13 +463,13 @@ func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
463463
return nil, err
464464
}
465465

466-
// Step 3: Sort positional deletes and match them to data files.
466+
// Step 3: Sort positional deletes and match them to data files.
467467
slices.SortFunc(entries.positionalDeleteEntries, func(a, b iceberg.ManifestEntry) int {
468468
return cmp.Compare(a.SequenceNum(), b.SequenceNum())
469469
})
470470

471471
results := make([]FileScanTask, 0, len(entries.dataEntries))
472-
for _, e := range entries.dataEntries {
472+
for _, e := range entries.dataEntries {
473473
deleteFiles, err := matchDeletesToData(e, entries.positionalDeleteEntries)
474474
if err != nil {
475475
return nil, err

0 commit comments

Comments
 (0)