Conversation
Force-pushed 6af257b to 4da5bf5
laskoviymishka
left a comment
The read path structure is solid and the Java alignment is largely correct — field IDs, doc strings, manifest list writer semantics, and the Arrow synthesis pipeline all check out.
Three issues need to be addressed before this merges.
First-row-ID inheritance (manifest.go ReadEntry) diverges from the Java implementation. Java's idAssigner unconditionally executes nextRowId += file.recordCount() for every file, null or explicit. The Go implementation only advances nextFirstRowID when FirstRowIDField == nil, so a file with an explicit first_row_id silently resets the baseline for all subsequent null files in the same manifest, producing overlapping row ID ranges. The fix and the *int64 cleanup land together: initialize nextFirstRowID eagerly in NewManifestReader, then advance it unconditionally after the conditional assign.
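The intended semantics can be sketched in a few lines of plain Go. The types below are simplified, hypothetical stand-ins for illustration, not the actual iceberg-go structures:

```go
package main

import "fmt"

// dataFile is a hypothetical, simplified stand-in for a manifest entry's
// data file; names are illustrative, not the iceberg-go API.
type dataFile struct {
	firstRowID  *int64 // nil means "inherit from the reader's baseline"
	recordCount int64
}

// assignFirstRowIDs mirrors the Java idAssigner semantics: a nil
// first_row_id inherits the running baseline, an explicit value is kept,
// and the baseline advances by recordCount for EVERY file, explicit or not.
func assignFirstRowIDs(nextFirstRowID int64, files []*dataFile) {
	for _, f := range files {
		if f.firstRowID == nil {
			v := nextFirstRowID
			f.firstRowID = &v
		}
		// Unconditional advance: an explicit first_row_id contributes its
		// record count but never replaces the baseline.
		nextFirstRowID += f.recordCount
	}
}

func main() {
	explicit := int64(1000)
	files := []*dataFile{
		{firstRowID: nil, recordCount: 10},      // inherits 0
		{firstRowID: &explicit, recordCount: 5}, // keeps 1000
		{firstRowID: nil, recordCount: 3},       // inherits 15, not 1005
	}
	assignFirstRowIDs(0, files)
	for _, f := range files {
		fmt.Println(*f.firstRowID) // prints 0, 1000, 15
	}
}
```

The third file inherits 15 (the running sum of record counts), not 1005: the explicit file's count still advances the baseline, but its value never touches it.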
Wrong sequence number for DataSequenceNumber (scanner.go PlanFiles). e.SequenceNum() is the manifest entry's metadata sequence number; _last_updated_sequence_number per spec requires the data sequence number — entry.dataSequenceNumber() in Java, e.FileSequenceNum() in Go. These are identical for freshly ADDED entries but diverge for EXISTING entries carried across compacted manifests, where the bug silently inflates the reported sequence number.
ManifestFile.FirstRowId() must be FirstRowID() before this public interface is merged. The PR already correctly renames the struct field to FirstRowID; the exported method should follow the same Go acronym convention. Fixing a public interface post-merge requires a breaking change.
table/arrow_scanner.go
Outdated
```go
rowOffset *int64,
task FileScanTask,
batch arrow.RecordBatch,
_ *iceberg.Schema,
```
why is this needed?
Force-pushed 9256510 to 61787dd
laskoviymishka
left a comment
One more thing: there's a memory leak; aside from that, all good.
Same root cause as #762 — NewArray() starts at refcount 1, NewRecordBatch retains to refcount 2, local refs are never dropped so memory is never freed. Two places: the production release loop in synthesizeRowLineageColumns and the test setup in TestSynthesizeRowLineageColumns. The test fix is as important as the production fix — NewCheckedAllocator would have caught this immediately and prevents regressions of the same class.
```go
// first_row_id and data_sequence_number; otherwise the value from the file is kept.
// rowOffset is the 0-based row index within the current file and is updated so _row_id stays
// correct across multiple batches from the same file (first_row_id + row_position).
func synthesizeRowLineageColumns(
```
Same root cause as #762 — bldr.NewArray() starts at refcount=1, array.NewRecordBatch retains to refcount=2, but the local refs in newCols are never released. Fix needs a release loop after the batch is created:
```go
rec := array.NewRecordBatch(schema, newCols, nrows)
for _, c := range newCols {
	c.Release()
}
return rec, nil
```
Thanks for pointing this out. I see that PR #762 has been approved and is waiting to be merged. Let me know once it lands in main so I can rebase and apply the fix in this PR.
table/scanner_internal_test.go
Outdated
```go
defer seqBldr.Release()
seqBldr.AppendNulls(nrows)

batch := array.NewRecordBatch(schema, []arrow.Array{
```
Inline NewArray() calls go directly into NewRecordBatch with no way to release them afterward — same leak pattern. Fix by assigning to locals first and releasing after batch construction. Then add memory.NewCheckedAllocator + defer mem.AssertSize(t, 0) to make the class of leak self-enforcing going forward — same pattern used in #762 to catch exactly this.
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
Force-pushed 3b3c7e2 to b21cd14
@zeroshade could you help to review this PR as well?

@dttung2905 sorry for the delay, i'll give this a review tomorrow or monday
table/arrow_scanner.go
Outdated
```go
bldr := array.NewInt64Builder(alloc)
first := *task.FirstRowID
for k := int64(0); k < nrows; k++ {
	if col.IsNull(int(k)) {
		bldr.Append(first + *rowOffset + k)
	} else {
		bldr.Append(col.Value(int(k)))
	}
}
newCols[i] = bldr.NewArray()
bldr.Release()
```
Reuse the builder and use Reserve since we already know the size we're gonna append
table/arrow_scanner.go
Outdated
```go
bldr := array.NewInt64Builder(alloc)
seq := *task.DataSequenceNumber
for k := int64(0); k < nrows; k++ {
	if col.IsNull(int(k)) {
		bldr.Append(seq)
	} else {
		bldr.Append(col.Value(int(k)))
	}
}
newCols[i] = bldr.NewArray()
bldr.Release()
```
table/arrow_scanner.go
Outdated
```go
}

col := batch.Column(i)
col.Retain()
```
this seems like an extra Retain that isn't necessary, since you're going to add them to the record batch, you can just defer col.Release() the new columns you build instead of needing the release loop below
table/arrow_scanner.go
Outdated
```go
}

for i := 0; i < ncols; i++ {
	if i == rowIDColIdx && task.FirstRowID != nil {
```
I'm confused, if we already know the rowIDColIdx to compare against, why do we need the loop?
table/arrow_scanner.go
Outdated
```go
	}
}

if i == seqNumColIdx && task.DataSequenceNumber != nil {
```
same as above, why do we need to loop and find the column if we already know seqNumColIdx to look for?
```go
pipeline = append(pipeline, func(r arrow.RecordBatch) (arrow.RecordBatch, error) {
	defer r.Release()

	return synthesizeRowLineageColumns(ctx, &rowOffset, taskVal, r)
```
I haven't double checked the spec, but should the row lineage columns be toggleable via a setting? i.e. a way to turn them off if you don't want them to show up in the results?
table/scanner.go
Outdated
```go
if scan.metadata.Version() >= 3 {
	task.FirstRowID = e.DataFile().FirstRowID()
	if fseq := e.FileSequenceNum(); fseq != nil {
		task.DataSequenceNumber = fseq
	}
}
```
any reason not to always populate these? at worst we're just setting it to nil if we're not >= v3
Signed-off-by: dttung2905 <ttdao.2015@accountancy.smu.edu.sg>
This should fully support the read path and partially support the write path.
Unsupported write path:
- `_row_id` and `_last_updated_sequence_number` are not copied into the new files. Row lineage is preserved for appends and for metadata/manifest list; it is not yet preserved when rewriting data files.
- New data files do not include `_row_id`/`_last_updated_sequence_number` as null columns (they are omitted); that is allowed by the spec and is not planned in this PR.