Skip to content

Commit 61787dd

Browse files
committed
Fixes from codereview
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
1 parent 9e227d2 commit 61787dd

File tree

6 files changed

+53
-52
lines changed

6 files changed

+53
-52
lines changed

manifest_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -481,14 +481,14 @@ type ManifestTestSuite struct {
481481
}
482482

483483
func (m *ManifestTestSuite) writeManifestList() {
484-
_, err := WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, 0, manifestFileRecordsV1)
484+
err := WriteManifestList(1, &m.v1ManifestList, snapshotID, nil, nil, 0, manifestFileRecordsV1)
485485
m.Require().NoError(err)
486486
unassignedSequenceNum := int64(-1)
487-
_, err = WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, 0, manifestFileRecordsV2)
487+
err = WriteManifestList(2, &m.v2ManifestList, snapshotID, nil, &unassignedSequenceNum, 0, manifestFileRecordsV2)
488488
m.Require().NoError(err)
489489
v3SequenceNum := int64(5)
490490
firstRowID := int64(1000)
491-
_, err = WriteManifestList(3, &m.v3ManifestList, snapshotID, nil, &v3SequenceNum, firstRowID, manifestFileRecordsV3)
491+
err = WriteManifestList(3, &m.v3ManifestList, snapshotID, nil, &v3SequenceNum, firstRowID, manifestFileRecordsV3)
492492
m.Require().NoError(err)
493493
}
494494

@@ -764,8 +764,8 @@ func (m *ManifestTestSuite) TestReadManifestListV3() {
764764
m.Zero(list[0].PartitionSpecID())
765765

766766
// V3 manifest list assigns first_row_id to data manifests
767-
m.Require().NotNil(list[0].FirstRowId(), "v3 data manifest should have first_row_id")
768-
m.EqualValues(1000, *list[0].FirstRowId())
767+
m.Require().NotNil(list[0].FirstRowID(), "v3 data manifest should have first_row_id")
768+
m.EqualValues(1000, *list[0].FirstRowID())
769769

770770
part := list[0].Partitions()[0]
771771
m.True(part.ContainsNull)
@@ -818,7 +818,7 @@ func (m *ManifestTestSuite) TestV3DataManifestFirstRowIDInheritance() {
818818
version: 3,
819819
Path: "/manifest.avro",
820820
Content: ManifestContentData,
821-
FirstRowID: &manifestFirstRowID,
821+
FirstRowIDValue: &manifestFirstRowID,
822822
}
823823
entries, err := ReadManifest(file, bytes.NewReader(manifestBuf.Bytes()), false)
824824
m.Require().NoError(err)
@@ -844,7 +844,7 @@ func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() {
844844
// any cache. (Note: if working correctly, this will have no such side effect.)
845845
var buf bytes.Buffer
846846
seqNum := int64(9876)
847-
_, err := WriteManifestList(2, &buf, 1234, nil, &seqNum, 0, []ManifestFile{
847+
err := WriteManifestList(2, &buf, 1234, nil, &seqNum, 0, []ManifestFile{
848848
NewManifestFile(2, "s3://bucket/namespace/table/metadata/abcd-0123.avro", 99, 0, 1234).Build(),
849849
})
850850
m.NoError(err)
@@ -1538,11 +1538,11 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterPersistsPerManifestFirstRowI
15381538
m.Require().True(ok, "expected v3 manifest file type")
15391539
secondManifest, ok := list[1].(*manifestFile)
15401540
m.Require().True(ok, "expected v3 manifest file type")
1541-
m.Require().NotNil(firstManifest.FirstRowId(), "first manifest should have first_row_id")
1542-
m.Require().NotNil(secondManifest.FirstRowId(), "second manifest should have first_row_id")
1541+
m.Require().NotNil(firstManifest.FirstRowID(), "first manifest should have first_row_id")
1542+
m.Require().NotNil(secondManifest.FirstRowID(), "second manifest should have first_row_id")
15431543

1544-
m.EqualValues(5000, *firstManifest.FirstRowId()) // start of first range
1545-
m.EqualValues(5015, *secondManifest.FirstRowId())
1544+
m.EqualValues(5000, *firstManifest.FirstRowID()) // start of first range
1545+
m.EqualValues(5015, *secondManifest.FirstRowID())
15461546
m.EqualValues(5022, *writer.NextRowID())
15471547
}
15481548

@@ -1615,7 +1615,7 @@ func (m *ManifestTestSuite) TestWriteManifestListClosesWriterOnError() {
16151615
m.Require().NoError(writer.Close())
16161616

16171617
out := &limitedWriter{limit: header.Len(), err: errLimitedWrite}
1618-
_, err = WriteManifestList(2, out, snapshotID, nil, &seqNum, 0, []ManifestFile{
1618+
err = WriteManifestList(2, out, snapshotID, nil, &seqNum, 0, []ManifestFile{
16191619
manifestFileRecordsV2[0],
16201620
manifestFileRecordsV1[0],
16211621
})

table/arrow_scanner.go

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -401,8 +401,6 @@ func synthesizeRowLineageColumns(
401401
rowOffset *int64,
402402
task FileScanTask,
403403
batch arrow.RecordBatch,
404-
_ *iceberg.Schema,
405-
_ bool,
406404
) (arrow.RecordBatch, error) {
407405
alloc := compute.GetAllocator(ctx)
408406
schema := batch.Schema()
@@ -425,37 +423,43 @@ func synthesizeRowLineageColumns(
425423
for i := 0; i < ncols; i++ {
426424
if i == rowIDColIdx && task.FirstRowID != nil {
427425
// _row_id: inherit first_row_id + row_position when null; else keep value from file.
428-
col := batch.Column(i).(*array.Int64)
429-
bldr := array.NewInt64Builder(alloc)
430-
first := *task.FirstRowID
431-
for k := int64(0); k < nrows; k++ {
432-
if col.IsNull(int(k)) {
433-
bldr.Append(first + *rowOffset + k)
434-
} else {
435-
bldr.Append(col.Value(int(k)))
426+
if col, ok := batch.Column(i).(*array.Int64); ok {
427+
bldr := array.NewInt64Builder(alloc)
428+
first := *task.FirstRowID
429+
for k := int64(0); k < nrows; k++ {
430+
if col.IsNull(int(k)) {
431+
bldr.Append(first + *rowOffset + k)
432+
} else {
433+
bldr.Append(col.Value(int(k)))
434+
}
436435
}
436+
newCols[i] = bldr.NewArray()
437+
bldr.Release()
438+
continue
437439
}
438-
newCols[i] = bldr.NewArray()
439-
bldr.Release()
440-
} else if i == seqNumColIdx && task.DataSequenceNumber != nil {
440+
}
441+
442+
if i == seqNumColIdx && task.DataSequenceNumber != nil {
441443
// _last_updated_sequence_number: inherit file's data_sequence_number when null; else keep value from file.
442-
col := batch.Column(i).(*array.Int64)
443-
bldr := array.NewInt64Builder(alloc)
444-
seq := *task.DataSequenceNumber
445-
for k := int64(0); k < nrows; k++ {
446-
if col.IsNull(int(k)) {
447-
bldr.Append(seq)
448-
} else {
449-
bldr.Append(col.Value(int(k)))
444+
if col, ok := batch.Column(i).(*array.Int64); ok {
445+
bldr := array.NewInt64Builder(alloc)
446+
seq := *task.DataSequenceNumber
447+
for k := int64(0); k < nrows; k++ {
448+
if col.IsNull(int(k)) {
449+
bldr.Append(seq)
450+
} else {
451+
bldr.Append(col.Value(int(k)))
452+
}
450453
}
454+
newCols[i] = bldr.NewArray()
455+
bldr.Release()
456+
continue
451457
}
452-
newCols[i] = bldr.NewArray()
453-
bldr.Release()
454-
} else {
455-
col := batch.Column(i)
456-
col.Retain()
457-
newCols[i] = col
458458
}
459+
460+
col := batch.Column(i)
461+
col.Retain()
462+
newCols[i] = col
459463
}
460464

461465
// Advance so the next batch from this file uses the correct row position for _row_id.
@@ -596,12 +600,10 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerat
596600
if task.Value.FirstRowID != nil || task.Value.DataSequenceNumber != nil {
597601
var rowOffset int64
598602
taskVal := task.Value
599-
projSchema := as.projectedSchema
600-
useLarge := as.useLargeTypes
601603
pipeline = append(pipeline, func(r arrow.RecordBatch) (arrow.RecordBatch, error) {
602604
defer r.Release()
603605

604-
return synthesizeRowLineageColumns(ctx, &rowOffset, taskVal, r, projSchema, useLarge)
606+
return synthesizeRowLineageColumns(ctx, &rowOffset, taskVal, r)
605607
})
606608
}
607609

table/scanner.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -463,13 +463,13 @@ func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
463463
return nil, err
464464
}
465465

466-
// Step 3: Sort positional deletes and match them to data files.
466+
// Step 3: Sort positional deletes and match them to data files.
467467
slices.SortFunc(entries.positionalDeleteEntries, func(a, b iceberg.ManifestEntry) int {
468468
return cmp.Compare(a.SequenceNum(), b.SequenceNum())
469469
})
470470

471471
results := make([]FileScanTask, 0, len(entries.dataEntries))
472-
for _, e := range entries.dataEntries {
472+
for _, e := range entries.dataEntries {
473473
deleteFiles, err := matchDeletesToData(e, entries.positionalDeleteEntries)
474474
if err != nil {
475475
return nil, err
@@ -483,9 +483,8 @@ func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
483483
// Row lineage constants for v3: readers use these to synthesize _row_id and _last_updated_sequence_number.
484484
if scan.metadata.Version() >= 3 {
485485
task.FirstRowID = e.DataFile().FirstRowID()
486-
seq := e.SequenceNum()
487-
if seq >= 0 {
488-
task.DataSequenceNumber = &seq
486+
if fseq := e.FileSequenceNum(); fseq != nil {
487+
task.DataSequenceNumber = fseq
489488
}
490489
}
491490
results = append(results, task)

table/scanner_internal_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ func TestSynthesizeRowLineageColumns(t *testing.T) {
272272
}, nrows)
273273
defer batch.Release()
274274

275-
out, err := synthesizeRowLineageColumns(ctx, &rowOffset, task, batch, nil, false)
275+
out, err := synthesizeRowLineageColumns(ctx, &rowOffset, task, batch)
276276
require.NoError(t, err)
277277
defer out.Release()
278278

table/snapshot_producers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -750,7 +750,7 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) {
750750
addedRows = *writer.NextRowID() - firstRowID
751751
}
752752
} else {
753-
_, err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
753+
err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
754754
sp.snapshotID, parentSnapshot, &nextSequence, firstRowID, newManifests)
755755
if err != nil {
756756
return nil, nil, err

table/snapshot_producers_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ func manifestFirstRowIDForSnapshot(t *testing.T, manifests []iceberg.ManifestFil
323323

324324
type manifestRowLineage struct {
325325
AddedSnapshotID int64 `json:"AddedSnapshotID"`
326-
FirstRowID *int64 `json:"FirstRowId"`
326+
FirstRowID *int64 `json:"FirstRowIDValue"`
327327
}
328328

329329
for _, manifest := range manifests {
@@ -470,7 +470,7 @@ func TestOverwriteFilesExistingManifestsClosesWriterOnError(t *testing.T) {
470470
require.NoError(t, mem.WriteFile(manifestPath, manifestBuf.Bytes()))
471471

472472
var listBuf bytes.Buffer
473-
_, err = iceberg.WriteManifestList(2, &listBuf, snapshotID, nil, &seqNum, 0, []iceberg.ManifestFile{manifestFile})
473+
err = iceberg.WriteManifestList(2, &listBuf, snapshotID, nil, &seqNum, 0, []iceberg.ManifestFile{manifestFile})
474474
require.NoError(t, err, "write manifest list")
475475
require.NoError(t, mem.WriteFile(manifestListPath, listBuf.Bytes()))
476476

@@ -668,7 +668,7 @@ func TestOverwriteExistingManifestsClosesUnderlyingFile(t *testing.T) {
668668
require.NoError(t, trackIO.WriteFile(manifestPath, manifestBuf.Bytes()))
669669

670670
var listBuf bytes.Buffer
671-
_, err = iceberg.WriteManifestList(2, &listBuf, snapshotID, nil, &seqNum, 0, []iceberg.ManifestFile{manifestFile})
671+
err = iceberg.WriteManifestList(2, &listBuf, snapshotID, nil, &seqNum, 0, []iceberg.ManifestFile{manifestFile})
672672
require.NoError(t, err, "write manifest list")
673673
require.NoError(t, trackIO.WriteFile(manifestListPath, listBuf.Bytes()))
674674

0 commit comments

Comments
 (0)