Skip to content

Commit beca2e6

Browse files
authored
fix(manifest): correct v3 manifest-list first row id assigment for row-lineage (#741)
Pointer is aliased and then mutated before encode. Start becomes end of range. With wiring snapshot lineage from writer delta (#728), this can cause overlaps/gaps between commits. Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>
1 parent a576506 commit beca2e6

File tree

3 files changed

+127
-3
lines changed

3 files changed

+127
-3
lines changed

manifest.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1406,7 +1406,8 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
14061406
// Ref: https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168
14071407
if wrapped.Content == ManifestContentData && wrapped.FirstRowId == nil {
14081408
if m.nextRowID != nil {
1409-
wrapped.FirstRowId = m.nextRowID
1409+
firstRowID := *m.nextRowID
1410+
wrapped.FirstRowId = &firstRowID
14101411
*m.nextRowID += wrapped.ExistingRowsCount + wrapped.AddedRowsCount
14111412
}
14121413
}

manifest_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1448,6 +1448,39 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterDeltaIgnoresNonDataManifests
14481448
m.Require().NoError(writer.Close())
14491449
}
14501450

1451+
func (m *ManifestTestSuite) TestV3ManifestListWriterPersistsPerManifestFirstRowIDStart() {
1452+
// Persisted first_row_id per manifest must be the start of each assigned row-id range.
1453+
var buf bytes.Buffer
1454+
commitSnapID := int64(100)
1455+
firstRowID := int64(5000)
1456+
sequenceNum := int64(1)
1457+
1458+
writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum, firstRowID, nil)
1459+
m.Require().NoError(err)
1460+
1461+
manifests := []ManifestFile{
1462+
NewManifestFile(3, "m1.avro", 10, 1, commitSnapID).AddedRows(10).ExistingRows(5).Build(), // delta = 15
1463+
NewManifestFile(3, "m2.avro", 10, 1, commitSnapID).AddedRows(7).Build(), // delta = 7
1464+
}
1465+
m.Require().NoError(writer.AddManifests(manifests))
1466+
m.Require().NoError(writer.Close())
1467+
1468+
list, err := ReadManifestList(bytes.NewReader(buf.Bytes()))
1469+
m.Require().NoError(err)
1470+
m.Require().Len(list, 2)
1471+
1472+
firstManifest, ok := list[0].(*manifestFile)
1473+
m.Require().True(ok, "expected v3 manifest file type")
1474+
secondManifest, ok := list[1].(*manifestFile)
1475+
m.Require().True(ok, "expected v3 manifest file type")
1476+
m.Require().NotNil(firstManifest.FirstRowId)
1477+
m.Require().NotNil(secondManifest.FirstRowId)
1478+
1479+
m.EqualValues(5000, *firstManifest.FirstRowId) // start of first range
1480+
m.EqualValues(5015, *secondManifest.FirstRowId)
1481+
m.EqualValues(5022, *writer.NextRowID())
1482+
}
1483+
14511484
func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() {
14521485
// Test v3writerImpl.prepareEntry sequence number validation logic
14531486
v3Writer := v3writerImpl{}

table/snapshot_producers_test.go

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package table
2020
import (
2121
"bytes"
2222
"context"
23+
"encoding/json"
2324
"errors"
2425
"io"
2526
"io/fs"
@@ -135,6 +136,10 @@ func manifestSize(t *testing.T, version int, spec iceberg.PartitionSpec, schema
135136
}
136137

137138
func newTestDataFile(t *testing.T, spec iceberg.PartitionSpec, path string, partition map[int]any) iceberg.DataFile {
139+
return newTestDataFileWithCount(t, spec, path, partition, 1)
140+
}
141+
142+
func newTestDataFileWithCount(t *testing.T, spec iceberg.PartitionSpec, path string, partition map[int]any, count int64) iceberg.DataFile {
138143
t.Helper()
139144

140145
builder, err := iceberg.NewDataFileBuilder(
@@ -145,8 +150,8 @@ func newTestDataFile(t *testing.T, spec iceberg.PartitionSpec, path string, part
145150
partition,
146151
nil,
147152
nil,
148-
1,
149-
1,
153+
count,
154+
count,
150155
)
151156
require.NoError(t, err, "new data file builder")
152157

@@ -299,6 +304,91 @@ func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) {
299304
require.Equal(t, int64(3), meta2.NextRowID(), "next-row-id = first-row-id + assigned delta (1+2)")
300305
}
301306

307+
func readManifestListFromPath(t *testing.T, fs iceio.IO, path string) []iceberg.ManifestFile {
308+
t.Helper()
309+
310+
f, err := fs.Open(path)
311+
require.NoError(t, err, "open manifest list: %s", path)
312+
defer f.Close()
313+
314+
list, err := iceberg.ReadManifestList(f)
315+
require.NoError(t, err, "read manifest list: %s", path)
316+
317+
return list
318+
}
319+
320+
func manifestFirstRowIDForSnapshot(t *testing.T, manifests []iceberg.ManifestFile, snapshotID int64) int64 {
321+
t.Helper()
322+
323+
type manifestRowLineage struct {
324+
AddedSnapshotID int64 `json:"AddedSnapshotID"`
325+
FirstRowID *int64 `json:"FirstRowId"`
326+
}
327+
328+
for _, manifest := range manifests {
329+
raw, err := json.Marshal(manifest)
330+
require.NoError(t, err, "marshal manifest")
331+
332+
var decoded manifestRowLineage
333+
require.NoError(t, json.Unmarshal(raw, &decoded), "unmarshal manifest row-lineage fields")
334+
335+
if decoded.AddedSnapshotID == snapshotID {
336+
require.NotNil(t, decoded.FirstRowID, "first_row_id must be persisted for v3 data manifests")
337+
338+
return *decoded.FirstRowID
339+
}
340+
}
341+
342+
require.Failf(t, "missing manifest for snapshot", "snapshot-id=%d", snapshotID)
343+
344+
return 0
345+
}
346+
347+
// TestCommitV3RowLineagePersistsManifestFirstRowID verifies that snapshot producer
348+
// writes first_row_id to manifest list entries using the snapshot's start row-id.
349+
func TestCommitV3RowLineagePersistsManifestFirstRowID(t *testing.T) {
350+
spec := iceberg.NewPartitionSpec()
351+
ident := Identifier{"db", "tbl"}
352+
txn, memIO := createTestTransactionWithMemIO(t, spec)
353+
txn.meta.formatVersion = 3
354+
355+
// Use multi-row files to make row-range starts obvious.
356+
sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
357+
sp1.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-1.parquet", nil, 3))
358+
updates1, reqs1, err := sp1.commit()
359+
require.NoError(t, err, "first commit should succeed")
360+
addSnap1, ok := updates1[0].(*addSnapshotUpdate)
361+
require.True(t, ok, "first update must be AddSnapshot")
362+
require.Equal(t, int64(0), *addSnap1.Snapshot.FirstRowID, "snapshot first-row-id for commit 1")
363+
364+
manifests1 := readManifestListFromPath(t, memIO, addSnap1.Snapshot.ManifestList)
365+
currentManifestFirstRowID1 := manifestFirstRowIDForSnapshot(t, manifests1, addSnap1.Snapshot.SnapshotID)
366+
require.Equal(t, *addSnap1.Snapshot.FirstRowID, currentManifestFirstRowID1,
367+
"persisted manifest first_row_id must match snapshot first-row-id for current commit")
368+
369+
err = txn.apply(updates1, reqs1)
370+
require.NoError(t, err, "first apply should succeed")
371+
meta1, err := txn.meta.Build()
372+
require.NoError(t, err)
373+
require.Equal(t, int64(3), meta1.NextRowID())
374+
375+
tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil)
376+
txn2 := tbl2.NewTransaction()
377+
txn2.meta.formatVersion = 3
378+
sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
379+
sp2.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-2.parquet", nil, 5))
380+
updates2, _, err := sp2.commit()
381+
require.NoError(t, err, "second commit should succeed")
382+
addSnap2, ok := updates2[0].(*addSnapshotUpdate)
383+
require.True(t, ok, "first update must be AddSnapshot")
384+
require.Equal(t, int64(3), *addSnap2.Snapshot.FirstRowID, "snapshot first-row-id for commit 2")
385+
386+
manifests2 := readManifestListFromPath(t, memIO, addSnap2.Snapshot.ManifestList)
387+
currentManifestFirstRowID2 := manifestFirstRowIDForSnapshot(t, manifests2, addSnap2.Snapshot.SnapshotID)
388+
require.Equal(t, *addSnap2.Snapshot.FirstRowID, currentManifestFirstRowID2,
389+
"persisted manifest first_row_id must match snapshot first-row-id for current commit")
390+
}
391+
302392
func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) {
303393
spec := partitionedSpec()
304394
schema := simpleSchema()

0 commit comments

Comments
 (0)