diff --git a/manifest_test.go b/manifest_test.go
index 1192474ee..87dc4efc0 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -1401,10 +1401,53 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterRowIDTracking() {
 	// Expected: 5000 + 1500 + 2300 = 8800
 	expectedNextRowID := firstRowID + 1500 + 2300
 	m.EqualValues(expectedNextRowID, *writer.NextRowID())
+	// Assigned row-id delta (for snapshot.added-rows) = 1500 + 2300 = 3800
+	m.EqualValues(int64(3800), *writer.NextRowID()-firstRowID)
 	err = writer.Close()
 	m.Require().NoError(err)
 }
+
+func (m *ManifestTestSuite) TestV3ManifestListWriterAssignedRowIDDelta() {
+	// Assigned row-id delta = sum of (existing+added) for all data manifests in list.
+	var buf bytes.Buffer
+	commitSnapID := int64(100)
+	otherSnapID := int64(99)
+	firstRowID := int64(0)
+	sequenceNum := int64(1)
+	writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum, firstRowID, nil)
+	m.Require().NoError(err)
+	manifests := []ManifestFile{
+		NewManifestFile(3, "current.avro", 100, 1, commitSnapID).AddedRows(10).ExistingRows(5).Build(),
+		NewManifestFile(3, "carried.avro", 200, 1, otherSnapID).SequenceNum(0, 0).AddedRows(100).ExistingRows(50).Build(),
+		NewManifestFile(3, "current2.avro", 300, 1, commitSnapID).AddedRows(20).Build(),
+	}
+	err = writer.AddManifests(manifests)
+	m.Require().NoError(err)
+	// Delta = 15 + 150 + 20 = 185 (all data manifests get row-id range)
+	m.EqualValues(185, *writer.NextRowID()-firstRowID)
+	m.Require().NoError(writer.Close())
+}
+
+func (m *ManifestTestSuite) TestV3ManifestListWriterDeltaIgnoresNonDataManifests() {
+	// Only data manifests get row-id assignment; delete manifests must not affect delta.
+	var buf bytes.Buffer
+	commitSnapID := int64(1)
+	firstRowID := int64(100)
+	sequenceNum := int64(1)
+	writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum, firstRowID, nil)
+	m.Require().NoError(err)
+	manifests := []ManifestFile{
+		NewManifestFile(3, "data.avro", 100, 1, commitSnapID).AddedRows(10).ExistingRows(5).Build(),
+		NewManifestFile(3, "deletes.avro", 200, 1, commitSnapID).Content(ManifestContentDeletes).AddedRows(100).Build(),
+		NewManifestFile(3, "data2.avro", 300, 1, commitSnapID).AddedRows(20).Build(),
+	}
+	err = writer.AddManifests(manifests)
+	m.Require().NoError(err)
+	// Delta = 15 + 20 = 35 (only data manifests; delete manifest ignored)
+	m.EqualValues(35, *writer.NextRowID()-firstRowID)
+	m.Require().NoError(writer.Close())
+}
 
 func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() {
 	// Test v3writerImpl.prepareEntry sequence number validation logic
 	v3Writer := v3writerImpl{}
diff --git a/table/metadata.go b/table/metadata.go
index 92b976cae..4df78e32a 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -480,6 +480,11 @@ func (b *MetadataBuilder) currentNextRowID() int64 {
 	return initialRowID
 }
 
+// NextRowID returns the next available row ID (for v3 row lineage). For v1/v2 returns 0.
+func (b *MetadataBuilder) NextRowID() int64 {
+	return b.currentNextRowID()
+}
+
 func (b *MetadataBuilder) RemoveSnapshots(snapshotIds []int64) error {
 	if b.currentSnapshotID != nil && slices.Contains(snapshotIds, *b.currentSnapshotID) {
 		return errors.New("current snapshot cannot be removed")
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 8022f5a48..c310499f8 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -703,17 +703,34 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) {
 		parentSnapshot = &sp.parentSnapshotID
 	}
 
+	firstRowID := int64(0)
+	var addedRows int64
+
 	out, err := sp.io.Create(manifestListFilePath)
 	if err != nil {
 		return nil, nil, err
 	}
 	defer internal.CheckedClose(out, &err)
 
-	// TODO: Implement v3 here
-	err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
-		sp.snapshotID, parentSnapshot, &nextSequence, 0, newManifests)
-	if err != nil {
-		return nil, nil, err
+	if sp.txn.meta.formatVersion == 3 {
+		firstRowID = sp.txn.meta.NextRowID()
+		writer, werr := iceberg.NewManifestListWriterV3(out, sp.snapshotID, nextSequence, firstRowID, parentSnapshot)
+		if werr != nil {
+			return nil, nil, werr
+		}
+		defer internal.CheckedClose(writer, &err)
+		if err = writer.AddManifests(newManifests); err != nil {
+			return nil, nil, err
+		}
+		if writer.NextRowID() != nil {
+			addedRows = *writer.NextRowID() - firstRowID
+		}
+	} else {
+		err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
+			sp.snapshotID, parentSnapshot, &nextSequence, firstRowID, newManifests)
+		if err != nil {
+			return nil, nil, err
+		}
 	}
 
 	snapshot := Snapshot{
@@ -725,6 +742,10 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) {
 		SchemaID:    &sp.txn.meta.currentSchemaID,
 		TimestampMs: time.Now().UnixMilli(),
 	}
+	if sp.txn.meta.formatVersion == 3 {
+		snapshot.FirstRowID = &firstRowID
+		snapshot.AddedRows = &addedRows
+	}
 
 	return []Update{
 		NewAddSnapshotUpdate(&snapshot),
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index 1c6922f14..f2e55631d 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -97,6 +97,22 @@ func (m *memIO) Remove(name string) error {
 	return nil
 }
 
+// createTestTransactionWithMemIO creates a transaction using the io package's mem blob FS
+// so that Create() output is persisted and can be read back (e.g. for sequential commits).
+func createTestTransactionWithMemIO(t *testing.T, spec iceberg.PartitionSpec) (*Transaction, iceio.WriteFileIO) {
+	t.Helper()
+	ctx := context.Background()
+	fs, err := iceio.LoadFS(ctx, nil, "mem://default/table-location")
+	require.NoError(t, err, "LoadFS mem")
+	wfs := fs.(iceio.WriteFileIO)
+	schema := simpleSchema()
+	meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, "mem://default/table-location", nil)
+	require.NoError(t, err, "new metadata")
+	tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json", func(context.Context) (iceio.IO, error) { return fs, nil }, nil)
+
+	return tbl.NewTransaction(), wfs
+}
+
 func manifestHeaderSize(t *testing.T, version int, spec iceberg.PartitionSpec, schema *iceberg.Schema) int {
 	t.Helper()
 
@@ -161,6 +177,128 @@ func createTestTransaction(t *testing.T, io iceio.IO, spec iceberg.PartitionSpec
 	return tbl.NewTransaction()
 }
 
+// TestCommitV3RowLineage ensures v3 snapshot commits set FirstRowID and AddedRows
+// on the snapshot for row lineage, and that applying updates advances next-row-id correctly.
+func TestCommitV3RowLineage(t *testing.T) {
+	trackIO := newTrackingIO()
+	spec := iceberg.NewPartitionSpec()
+	txn := createTestTransaction(t, trackIO, spec)
+	txn.meta.formatVersion = 3
+
+	// Single data file with record count 1 (newTestDataFile uses 1, 1 for record count and file size).
+	const expectedAddedRows = 1
+	sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil)
+	df := newTestDataFile(t, spec, "file://data.parquet", nil)
+	sp.appendDataFile(df)
+
+	updates, reqs, err := sp.commit()
+	require.NoError(t, err, "commit should succeed")
+	require.Len(t, updates, 2, "expected AddSnapshot and SetSnapshotRef updates")
+	addSnap, ok := updates[0].(*addSnapshotUpdate)
+	require.True(t, ok, "first update must be AddSnapshot")
+
+	// Exact snapshot lineage: first-row-id 0 for new table, added-rows matches appended file(s).
+	require.NotNil(t, addSnap.Snapshot.FirstRowID, "v3 snapshot must have first-row-id")
+	require.NotNil(t, addSnap.Snapshot.AddedRows, "v3 snapshot must have added-rows")
+	require.Equal(t, int64(0), *addSnap.Snapshot.FirstRowID, "first-row-id should be table next-row-id at commit")
+	require.Equal(t, int64(expectedAddedRows), *addSnap.Snapshot.AddedRows, "added-rows should match appended data file record count")
+
+	// Apply updates and verify metadata next-row-id advances monotonically.
+	err = txn.apply(updates, reqs)
+	require.NoError(t, err, "apply should succeed")
+	meta, err := txn.meta.Build()
+	require.NoError(t, err, "build metadata")
+	require.Equal(t, int64(expectedAddedRows), meta.NextRowID(), "next-row-id should equal first-row-id + added-rows")
+}
+
+// TestCommitV3RowLineageTwoSequentialCommits runs two commits and asserts monotonic,
+// gap-free first-row-id / next-row-id progression.
+func TestCommitV3RowLineageTwoSequentialCommits(t *testing.T) {
+	spec := iceberg.NewPartitionSpec()
+	ident := Identifier{"db", "tbl"}
+	txn, memIO := createTestTransactionWithMemIO(t, spec)
+	txn.meta.formatVersion = 3
+
+	// First commit: new table, append one file (1 row).
+	sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
+	sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil))
+	updates1, reqs1, err := sp1.commit()
+	require.NoError(t, err, "first commit should succeed")
+	addSnap1, ok := updates1[0].(*addSnapshotUpdate)
+	require.True(t, ok)
+	require.Equal(t, int64(0), *addSnap1.Snapshot.FirstRowID, "first snapshot first-row-id")
+	require.Equal(t, int64(1), *addSnap1.Snapshot.AddedRows, "first snapshot added-rows")
+	err = txn.apply(updates1, reqs1)
+	require.NoError(t, err, "first apply should succeed")
+	meta1, err := txn.meta.Build()
+	require.NoError(t, err)
+	require.Equal(t, int64(1), meta1.NextRowID(), "next-row-id after first commit")
+
+	// Second commit: fast append one more file. Carried manifest already has first_row_id, so only new manifest gets row IDs; delta = 1.
+	tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil)
+	txn2 := tbl2.NewTransaction()
+	txn2.meta.formatVersion = 3
+	sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
+	sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil))
+	updates2, reqs2, err := sp2.commit()
+	require.NoError(t, err, "second commit should succeed")
+	addSnap2, ok := updates2[0].(*addSnapshotUpdate)
+	require.True(t, ok)
+	require.Equal(t, int64(1), *addSnap2.Snapshot.FirstRowID, "second snapshot first-row-id continues from first next-row-id")
+	require.Equal(t, int64(1), *addSnap2.Snapshot.AddedRows, "only new manifest gets row IDs assigned")
+
+	err = txn2.apply(updates2, reqs2)
+	require.NoError(t, err, "second apply should succeed")
+	meta2, err := txn2.meta.Build()
+	require.NoError(t, err)
+	require.Equal(t, int64(2), meta2.NextRowID(), "next-row-id = 1 + 1 (gap-free)")
+}
+
+// TestCommitV3RowLineageDeltaIncludesExistingRows uses merge append so one manifest
+// has both existing and added rows; verifies assigned delta includes ExistingRowsCount
+// and metadata next-row-id matches.
+func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) {
+	spec := iceberg.NewPartitionSpec()
+	ident := Identifier{"db", "tbl"}
+	txn, memIO := createTestTransactionWithMemIO(t, spec)
+	txn.meta.formatVersion = 3
+
+	// First commit: one file (1 row).
+	sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
+	sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil))
+	updates1, reqs1, err := sp1.commit()
+	require.NoError(t, err, "first commit should succeed")
+	err = txn.apply(updates1, reqs1)
+	require.NoError(t, err)
+	meta1, err := txn.meta.Build()
+	require.NoError(t, err)
+	require.Equal(t, int64(1), meta1.NextRowID())
+
+	// Second commit: merge append so the two data manifests (existing + new) are merged into one with 1 existing + 1 added row.
+	tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil)
+	txn2 := tbl2.NewTransaction()
+	txn2.meta.formatVersion = 3
+	if txn2.meta.props == nil {
+		txn2.meta.props = make(iceberg.Properties)
+	}
+	txn2.meta.props[ManifestMergeEnabledKey] = "true"
+	txn2.meta.props[ManifestMinMergeCountKey] = "2"
+	sp2 := newMergeAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
+	sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil))
+	updates2, reqs2, err := sp2.commit()
+	require.NoError(t, err, "second commit (merge) should succeed")
+	addSnap2, ok := updates2[0].(*addSnapshotUpdate)
+	require.True(t, ok)
+	require.Equal(t, int64(1), *addSnap2.Snapshot.FirstRowID, "first-row-id continues from first commit")
+	require.Equal(t, int64(2), *addSnap2.Snapshot.AddedRows, "assigned delta = existing (1) + added (1) in merged manifest")
+
+	err = txn2.apply(updates2, reqs2)
+	require.NoError(t, err)
+	meta2, err := txn2.meta.Build()
+	require.NoError(t, err)
+	require.Equal(t, int64(3), meta2.NextRowID(), "next-row-id = first-row-id + assigned delta (1+2)")
+}
+
 func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) {
 	spec := partitionedSpec()
 	schema := simpleSchema()