Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1406,7 +1406,8 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error {
// Ref: https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168
if wrapped.Content == ManifestContentData && wrapped.FirstRowId == nil {
if m.nextRowID != nil {
wrapped.FirstRowId = m.nextRowID
firstRowID := *m.nextRowID
wrapped.FirstRowId = &firstRowID
*m.nextRowID += wrapped.ExistingRowsCount + wrapped.AddedRowsCount
}
}
Expand Down
33 changes: 33 additions & 0 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1448,6 +1448,39 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterDeltaIgnoresNonDataManifests
m.Require().NoError(writer.Close())
}

func (m *ManifestTestSuite) TestV3ManifestListWriterPersistsPerManifestFirstRowIDStart() {
// Persisted first_row_id per manifest must be the start of each assigned row-id range.
var buf bytes.Buffer
commitSnapID := int64(100)
firstRowID := int64(5000)
sequenceNum := int64(1)

writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum, firstRowID, nil)
m.Require().NoError(err)

manifests := []ManifestFile{
NewManifestFile(3, "m1.avro", 10, 1, commitSnapID).AddedRows(10).ExistingRows(5).Build(), // delta = 15
NewManifestFile(3, "m2.avro", 10, 1, commitSnapID).AddedRows(7).Build(), // delta = 7
}
m.Require().NoError(writer.AddManifests(manifests))
m.Require().NoError(writer.Close())

list, err := ReadManifestList(bytes.NewReader(buf.Bytes()))
m.Require().NoError(err)
m.Require().Len(list, 2)

firstManifest, ok := list[0].(*manifestFile)
m.Require().True(ok, "expected v3 manifest file type")
secondManifest, ok := list[1].(*manifestFile)
m.Require().True(ok, "expected v3 manifest file type")
m.Require().NotNil(firstManifest.FirstRowId)
m.Require().NotNil(secondManifest.FirstRowId)

m.EqualValues(5000, *firstManifest.FirstRowId) // start of first range
m.EqualValues(5015, *secondManifest.FirstRowId)
m.EqualValues(5022, *writer.NextRowID())
}

func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() {
// Test v3writerImpl.prepareEntry sequence number validation logic
v3Writer := v3writerImpl{}
Expand Down
94 changes: 92 additions & 2 deletions table/snapshot_producers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package table
import (
"bytes"
"context"
"encoding/json"
"errors"
"io"
"io/fs"
Expand Down Expand Up @@ -135,6 +136,10 @@ func manifestSize(t *testing.T, version int, spec iceberg.PartitionSpec, schema
}

func newTestDataFile(t *testing.T, spec iceberg.PartitionSpec, path string, partition map[int]any) iceberg.DataFile {
return newTestDataFileWithCount(t, spec, path, partition, 1)
}

func newTestDataFileWithCount(t *testing.T, spec iceberg.PartitionSpec, path string, partition map[int]any, count int64) iceberg.DataFile {
t.Helper()

builder, err := iceberg.NewDataFileBuilder(
Expand All @@ -145,8 +150,8 @@ func newTestDataFile(t *testing.T, spec iceberg.PartitionSpec, path string, part
partition,
nil,
nil,
1,
1,
count,
count,
)
require.NoError(t, err, "new data file builder")

Expand Down Expand Up @@ -299,6 +304,91 @@ func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) {
require.Equal(t, int64(3), meta2.NextRowID(), "next-row-id = first-row-id + assigned delta (1+2)")
}

func readManifestListFromPath(t *testing.T, fs iceio.IO, path string) []iceberg.ManifestFile {
t.Helper()

f, err := fs.Open(path)
require.NoError(t, err, "open manifest list: %s", path)
defer f.Close()

list, err := iceberg.ReadManifestList(f)
require.NoError(t, err, "read manifest list: %s", path)

return list
}

func manifestFirstRowIDForSnapshot(t *testing.T, manifests []iceberg.ManifestFile, snapshotID int64) int64 {
t.Helper()

type manifestRowLineage struct {
AddedSnapshotID int64 `json:"AddedSnapshotID"`
FirstRowID *int64 `json:"FirstRowId"`
}

for _, manifest := range manifests {
raw, err := json.Marshal(manifest)
require.NoError(t, err, "marshal manifest")

var decoded manifestRowLineage
require.NoError(t, json.Unmarshal(raw, &decoded), "unmarshal manifest row-lineage fields")

if decoded.AddedSnapshotID == snapshotID {
require.NotNil(t, decoded.FirstRowID, "first_row_id must be persisted for v3 data manifests")

return *decoded.FirstRowID
}
}

require.Failf(t, "missing manifest for snapshot", "snapshot-id=%d", snapshotID)

return 0
}

// TestCommitV3RowLineagePersistsManifestFirstRowID verifies that snapshot producer
// writes first_row_id to manifest list entries using the snapshot's start row-id.
func TestCommitV3RowLineagePersistsManifestFirstRowID(t *testing.T) {
spec := iceberg.NewPartitionSpec()
ident := Identifier{"db", "tbl"}
txn, memIO := createTestTransactionWithMemIO(t, spec)
txn.meta.formatVersion = 3

// Use multi-row files to make row-range starts obvious.
sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
sp1.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-1.parquet", nil, 3))
updates1, reqs1, err := sp1.commit()
require.NoError(t, err, "first commit should succeed")
addSnap1, ok := updates1[0].(*addSnapshotUpdate)
require.True(t, ok, "first update must be AddSnapshot")
require.Equal(t, int64(0), *addSnap1.Snapshot.FirstRowID, "snapshot first-row-id for commit 1")

manifests1 := readManifestListFromPath(t, memIO, addSnap1.Snapshot.ManifestList)
currentManifestFirstRowID1 := manifestFirstRowIDForSnapshot(t, manifests1, addSnap1.Snapshot.SnapshotID)
require.Equal(t, *addSnap1.Snapshot.FirstRowID, currentManifestFirstRowID1,
"persisted manifest first_row_id must match snapshot first-row-id for current commit")

err = txn.apply(updates1, reqs1)
require.NoError(t, err, "first apply should succeed")
meta1, err := txn.meta.Build()
require.NoError(t, err)
require.Equal(t, int64(3), meta1.NextRowID())

tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil)
txn2 := tbl2.NewTransaction()
txn2.meta.formatVersion = 3
sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
sp2.appendDataFile(newTestDataFileWithCount(t, spec, "file://data-2.parquet", nil, 5))
updates2, _, err := sp2.commit()
require.NoError(t, err, "second commit should succeed")
addSnap2, ok := updates2[0].(*addSnapshotUpdate)
require.True(t, ok, "first update must be AddSnapshot")
require.Equal(t, int64(3), *addSnap2.Snapshot.FirstRowID, "snapshot first-row-id for commit 2")

manifests2 := readManifestListFromPath(t, memIO, addSnap2.Snapshot.ManifestList)
currentManifestFirstRowID2 := manifestFirstRowIDForSnapshot(t, manifests2, addSnap2.Snapshot.SnapshotID)
require.Equal(t, *addSnap2.Snapshot.FirstRowID, currentManifestFirstRowID2,
"persisted manifest first_row_id must match snapshot first-row-id for current commit")
}

func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) {
spec := partitionedSpec()
schema := simpleSchema()
Expand Down
Loading