From 4ed736c5707ec8acb36c08aed3a0cbc79ed85484 Mon Sep 17 00:00:00 2001 From: Andrei Tserakhau Date: Sun, 15 Feb 2026 23:23:36 +0100 Subject: [PATCH 1/8] feat: Wire V3 snapshot producer to row-lineage state --- table/metadata.go | 5 +++++ table/snapshot_producers.go | 18 ++++++++++++++++-- table/snapshot_producers_test.go | 22 ++++++++++++++++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/table/metadata.go b/table/metadata.go index 92b976cae..4df78e32a 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -480,6 +480,11 @@ func (b *MetadataBuilder) currentNextRowID() int64 { return initialRowID } +// NextRowID returns the next available row ID (for v3 row lineage). For v1/v2 returns 0. +func (b *MetadataBuilder) NextRowID() int64 { + return b.currentNextRowID() +} + func (b *MetadataBuilder) RemoveSnapshots(snapshotIds []int64) error { if b.currentSnapshotID != nil && slices.Contains(snapshotIds, *b.currentSnapshotID) { return errors.New("current snapshot cannot be removed") diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index 8022f5a48..2467b3be4 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -703,15 +703,25 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) { parentSnapshot = &sp.parentSnapshotID } + firstRowID := int64(0) + var addedRows int64 + if sp.txn.meta.formatVersion == 3 { + firstRowID = sp.txn.meta.NextRowID() + for _, m := range newManifests { + if m.ManifestContent() == iceberg.ManifestContentData { + addedRows += m.AddedRows() + m.ExistingRows() + } + } + } + out, err := sp.io.Create(manifestListFilePath) if err != nil { return nil, nil, err } defer internal.CheckedClose(out, &err) - // TODO: Implement v3 here err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out, - sp.snapshotID, parentSnapshot, &nextSequence, 0, newManifests) + sp.snapshotID, parentSnapshot, &nextSequence, firstRowID, newManifests) if err != nil { return nil, nil, err } @@ -725,6 +735,10 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) { SchemaID: &sp.txn.meta.currentSchemaID, TimestampMs: time.Now().UnixMilli(), } + if sp.txn.meta.formatVersion == 3 { + snapshot.FirstRowID = &firstRowID + snapshot.AddedRows = &addedRows + } return []Update{ NewAddSnapshotUpdate(&snapshot), diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go index 1c6922f14..f98517afc 100644 --- a/table/snapshot_producers_test.go +++ b/table/snapshot_producers_test.go @@ -161,6 +161,28 @@ func createTestTransaction(t *testing.T, io iceio.IO, spec iceberg.PartitionSpec return tbl.NewTransaction() } +// TestCommitV3RowLineage ensures v3 snapshot commits set FirstRowID and AddedRows +// on the snapshot for row lineage. +func TestCommitV3RowLineage(t *testing.T) { + trackIO := newTrackingIO() + spec := iceberg.NewPartitionSpec() + txn := createTestTransaction(t, trackIO, spec) + txn.meta.formatVersion = 3 + + sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil) + df := newTestDataFile(t, spec, "file://data.parquet", nil) + sp.appendDataFile(df) + + updates, _, err := sp.commit() + require.NoError(t, err, "commit should succeed") + require.Len(t, updates, 2, "expected AddSnapshot and SetSnapshotRef updates") + addSnap, ok := updates[0].(*addSnapshotUpdate) + require.True(t, ok, "first update must be AddSnapshot") + require.NotNil(t, addSnap.Snapshot.FirstRowID, "v3 snapshot must have first-row-id") + require.NotNil(t, addSnap.Snapshot.AddedRows, "v3 snapshot must have added-rows") + require.GreaterOrEqual(t, *addSnap.Snapshot.AddedRows, int64(0), "added-rows must be non-negative") +} + func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) { spec := partitionedSpec() schema := simpleSchema() From 89dd5c6a4ebc1511724631474b88d30e11a55f60 Mon Sep 17 00:00:00 2001 From: Andrei Tserakhau Date: Sun, 15 Feb 2026 23:37:59 +0100 Subject: [PATCH 2/8] only added rows --- table/snapshot_producers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index 2467b3be4..d618b525e 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -708,8 +708,8 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) { if sp.txn.meta.formatVersion == 3 { firstRowID = sp.txn.meta.NextRowID() for _, m := range newManifests { - if m.ManifestContent() == iceberg.ManifestContentData { - addedRows += m.AddedRows() + m.ExistingRows() + if m.ManifestContent() == iceberg.ManifestContentData && m.SnapshotID() == sp.snapshotID { + addedRows += m.AddedRows() } } } From dc531f9bfd480862bc36b6827f89a4f62eb07270 Mon Sep 17 00:00:00 2001 From: Andrei Tserakhau Date: Sun, 15 Feb 2026 23:41:06 +0100 Subject: [PATCH 3/8] make test more complex --- table/snapshot_producers_test.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go index f98517afc..75436534b 100644 --- a/table/snapshot_producers_test.go +++ b/table/snapshot_producers_test.go @@ -162,25 +162,37 @@ func createTestTransaction(t *testing.T, io iceio.IO, spec iceberg.PartitionSpec } // TestCommitV3RowLineage ensures v3 snapshot commits set FirstRowID and AddedRows -// on the snapshot for row lineage. +// on the snapshot for row lineage, and that applying updates advances next-row-id correctly. func TestCommitV3RowLineage(t *testing.T) { trackIO := newTrackingIO() spec := iceberg.NewPartitionSpec() txn := createTestTransaction(t, trackIO, spec) txn.meta.formatVersion = 3 + // Single data file with record count 1 (newTestDataFile uses 1, 1 for record count and file size). + const expectedAddedRows = 1 sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil) df := newTestDataFile(t, spec, "file://data.parquet", nil) sp.appendDataFile(df) - updates, _, err := sp.commit() + updates, reqs, err := sp.commit() require.NoError(t, err, "commit should succeed") require.Len(t, updates, 2, "expected AddSnapshot and SetSnapshotRef updates") addSnap, ok := updates[0].(*addSnapshotUpdate) require.True(t, ok, "first update must be AddSnapshot") + + // Exact snapshot lineage: first-row-id 0 for new table, added-rows matches appended file(s). require.NotNil(t, addSnap.Snapshot.FirstRowID, "v3 snapshot must have first-row-id") require.NotNil(t, addSnap.Snapshot.AddedRows, "v3 snapshot must have added-rows") - require.GreaterOrEqual(t, *addSnap.Snapshot.AddedRows, int64(0), "added-rows must be non-negative") + require.Equal(t, int64(0), *addSnap.Snapshot.FirstRowID, "first-row-id should be table next-row-id at commit") + require.Equal(t, int64(expectedAddedRows), *addSnap.Snapshot.AddedRows, "added-rows should match appended data file record count") + + // Apply updates and verify metadata next-row-id advances monotonically. + err = txn.apply(updates, reqs) + require.NoError(t, err, "apply should succeed") + meta, err := txn.meta.Build() + require.NoError(t, err, "build metadata") + require.Equal(t, int64(expectedAddedRows), meta.NextRowID(), "next-row-id should equal first-row-id + added-rows") } func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) { From 7feb941fa275acd8324050db0a463fa209078400 Mon Sep 17 00:00:00 2001 From: Andrei Tserakhau Date: Sun, 15 Feb 2026 23:50:37 +0100 Subject: [PATCH 4/8] move added rows to manifet and calculate them in producer --- manifest.go | 10 ++++++++++ manifest_test.go | 23 +++++++++++++++++++++++ table/snapshot_producers.go | 29 +++++++++++++++++------------ 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/manifest.go b/manifest.go index 48bae7a62..8fb453899 100644 --- a/manifest.go +++ b/manifest.go @@ -1277,6 +1277,7 @@ type ManifestListWriter struct { sequenceNumber int64 writer *ocf.Encoder nextRowID *int64 + addedRows int64 // v3: rows added by this snapshot (data manifests with AddedSnapshotID == commitSnapshotID) } func NewManifestListWriterV1(out io.Writer, snapshotID int64, parentSnapshot *int64) (*ManifestListWriter, error) { @@ -1374,6 +1375,12 @@ func (m *ManifestListWriter) NextRowID() *int64 { return m.nextRowID } +// AddedRows returns the number of rows added by this snapshot (v3 only). +// Only data manifests with AddedSnapshotID == commitSnapshotID are counted. +func (m *ManifestListWriter) AddedRows() int64 { + return m.addedRows +} + func (m *ManifestListWriter) AddManifests(files []ManifestFile) error { if len(files) == 0 { return nil @@ -1410,6 +1417,9 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error { *m.nextRowID += wrapped.ExistingRowsCount + wrapped.AddedRowsCount } } + if wrapped.Content == ManifestContentData && wrapped.AddedSnapshotID == m.commitSnapshotID { + m.addedRows += wrapped.AddedRowsCount + } } if wrapped.SeqNumber == -1 { // if the sequence number is being assigned here, diff --git a/manifest_test.go b/manifest_test.go index 1192474ee..467654524 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -1401,10 +1401,33 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterRowIDTracking() { // Expected: 5000 + 1500 + 2300 = 8800 expectedNextRowID := firstRowID + 1500 + 2300 m.EqualValues(expectedNextRowID, *writer.NextRowID()) + // AddedRows counts only added rows for manifests from this snapshot: 1000 + 2000 = 3000 + m.EqualValues(3000, writer.AddedRows()) err = writer.Close() m.Require().NoError(err) } +func (m *ManifestTestSuite) TestV3ManifestListWriterAddedRowsExcludesOtherSnapshots() { + // AddedRows must count only manifests with AddedSnapshotID == commitSnapshotID. + var buf bytes.Buffer + commitSnapID := int64(100) + otherSnapID := int64(99) + firstRowID := int64(0) + sequenceNum := int64(1) + writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum, firstRowID, nil) + m.Require().NoError(err) + manifests := []ManifestFile{ + NewManifestFile(3, "current.avro", 100, 1, commitSnapID).AddedRows(10).ExistingRows(5).Build(), + NewManifestFile(3, "carried.avro", 200, 1, otherSnapID).SequenceNum(0, 0).AddedRows(100).ExistingRows(50).Build(), + NewManifestFile(3, "current2.avro", 300, 1, commitSnapID).AddedRows(20).Build(), + } + err = writer.AddManifests(manifests) + m.Require().NoError(err) + // Only current snapshot's added rows: 10 + 20 = 30 (not 100 from carried) + m.EqualValues(30, writer.AddedRows()) + m.Require().NoError(writer.Close()) +} + func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() { // Test v3writerImpl.prepareEntry sequence number validation logic v3Writer := v3writerImpl{} diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index d618b525e..c75f6b003 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -705,14 +705,6 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) { firstRowID := int64(0) var addedRows int64 - if sp.txn.meta.formatVersion == 3 { - firstRowID = sp.txn.meta.NextRowID() - for _, m := range newManifests { - if m.ManifestContent() == iceberg.ManifestContentData && m.SnapshotID() == sp.snapshotID { - addedRows += m.AddedRows() - } - } - } out, err := sp.io.Create(manifestListFilePath) if err != nil { @@ -720,10 +712,23 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) { } defer internal.CheckedClose(out, &err) - err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out, - sp.snapshotID, parentSnapshot, &nextSequence, firstRowID, newManifests) - if err != nil { - return nil, nil, err + if sp.txn.meta.formatVersion == 3 { + firstRowID = sp.txn.meta.NextRowID() + writer, err := iceberg.NewManifestListWriterV3(out, sp.snapshotID, nextSequence, firstRowID, parentSnapshot) + if err != nil { + return nil, nil, err + } + defer internal.CheckedClose(writer, &err) + if err = writer.AddManifests(newManifests); err != nil { + return nil, nil, err + } + addedRows = writer.AddedRows() + } else { + err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out, + sp.snapshotID, parentSnapshot, &nextSequence, firstRowID, newManifests) + if err != nil { + return nil, nil, err + } } snapshot := Snapshot{ From ec129b89e309dd4d86f075da87a078a5170a0375 Mon Sep 17 00:00:00 2001 From: Andrei Tserakhau Date: Sun, 15 Feb 2026 23:57:54 +0100 Subject: [PATCH 5/8] make implementation closer to iceberg-java --- manifest.go | 10 ---------- manifest_test.go | 12 ++++++------ table/snapshot_producers.go | 4 +++- 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/manifest.go b/manifest.go index 8fb453899..48bae7a62 100644 --- a/manifest.go +++ b/manifest.go @@ -1277,7 +1277,6 @@ type ManifestListWriter struct { sequenceNumber int64 writer *ocf.Encoder nextRowID *int64 - addedRows int64 // v3: rows added by this snapshot (data manifests with AddedSnapshotID == commitSnapshotID) } func NewManifestListWriterV1(out io.Writer, snapshotID int64, parentSnapshot *int64) (*ManifestListWriter, error) { @@ -1375,12 +1374,6 @@ func (m *ManifestListWriter) NextRowID() *int64 { return m.nextRowID } -// AddedRows returns the number of rows added by this snapshot (v3 only). -// Only data manifests with AddedSnapshotID == commitSnapshotID are counted. -func (m *ManifestListWriter) AddedRows() int64 { - return m.addedRows -} - func (m *ManifestListWriter) AddManifests(files []ManifestFile) error { if len(files) == 0 { return nil @@ -1417,9 +1410,6 @@ func (m *ManifestListWriter) AddManifests(files []ManifestFile) error { *m.nextRowID += wrapped.ExistingRowsCount + wrapped.AddedRowsCount } } - if wrapped.Content == ManifestContentData && wrapped.AddedSnapshotID == m.commitSnapshotID { - m.addedRows += wrapped.AddedRowsCount - } } if wrapped.SeqNumber == -1 { // if the sequence number is being assigned here, diff --git a/manifest_test.go b/manifest_test.go index 467654524..b1c58a5cd 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -1401,14 +1401,14 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterRowIDTracking() { // Expected: 5000 + 1500 + 2300 = 8800 expectedNextRowID := firstRowID + 1500 + 2300 m.EqualValues(expectedNextRowID, *writer.NextRowID()) - // AddedRows counts only added rows for manifests from this snapshot: 1000 + 2000 = 3000 - m.EqualValues(3000, writer.AddedRows()) + // Assigned row-id delta (for snapshot.added-rows) = 1500 + 2300 = 3800 + m.EqualValues(int64(3800), *writer.NextRowID()-firstRowID) err = writer.Close() m.Require().NoError(err) } -func (m *ManifestTestSuite) TestV3ManifestListWriterAddedRowsExcludesOtherSnapshots() { - // AddedRows must count only manifests with AddedSnapshotID == commitSnapshotID. +func (m *ManifestTestSuite) TestV3ManifestListWriterAssignedRowIDDelta() { + // Assigned row-id delta = sum of (existing+added) for all data manifests in list. var buf bytes.Buffer commitSnapID := int64(100) otherSnapID := int64(99) @@ -1423,8 +1423,8 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterAddedRowsExcludesOtherSnapsh } err = writer.AddManifests(manifests) m.Require().NoError(err) - // Only current snapshot's added rows: 10 + 20 = 30 (not 100 from carried) - m.EqualValues(30, writer.AddedRows()) + // Delta = 15 + 150 + 20 = 185 (all data manifests get row-id range) + m.EqualValues(185, *writer.NextRowID()-firstRowID) m.Require().NoError(writer.Close()) } diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index c75f6b003..c310499f8 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -722,7 +722,9 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) { if err = writer.AddManifests(newManifests); err != nil { return nil, nil, err } - addedRows = writer.AddedRows() + if writer.NextRowID() != nil { + addedRows = *writer.NextRowID() - firstRowID + } } else { err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out, sp.snapshotID, parentSnapshot, &nextSequence, firstRowID, newManifests) From 49735802dce823920ffc68bef05d669772289361 Mon Sep 17 00:00:00 2001 From: Andrei Tserakhau Date: Mon, 16 Feb 2026 00:07:20 +0100 Subject: [PATCH 6/8] add more tricky tests --- table/snapshot_producers_test.go | 103 +++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go index 75436534b..133fc05a8 100644 --- a/table/snapshot_producers_test.go +++ b/table/snapshot_producers_test.go @@ -97,6 +97,21 @@ func (m *memIO) Remove(name string) error { return nil } +// createTestTransactionWithMemIO creates a transaction using the io package's mem blob FS +// so that Create() output is persisted and can be read back (e.g. for sequential commits). +func createTestTransactionWithMemIO(t *testing.T, spec iceberg.PartitionSpec) (*Transaction, iceio.WriteFileIO) { + t.Helper() + ctx := context.Background() + fs, err := iceio.LoadFS(ctx, nil, "mem://default/table-location") + require.NoError(t, err, "LoadFS mem") + wfs := fs.(iceio.WriteFileIO) + schema := simpleSchema() + meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, "mem://default/table-location", nil) + require.NoError(t, err, "new metadata") + tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json", func(context.Context) (iceio.IO, error) { return fs, nil }, nil) + return tbl.NewTransaction(), wfs +} + func manifestHeaderSize(t *testing.T, version int, spec iceberg.PartitionSpec, schema *iceberg.Schema) int { t.Helper() @@ -195,6 +210,94 @@ func TestCommitV3RowLineage(t *testing.T) { require.Equal(t, int64(expectedAddedRows), meta.NextRowID(), "next-row-id should equal first-row-id + added-rows") } +// TestCommitV3RowLineageTwoSequentialCommits runs two commits and asserts monotonic, +// gap-free first-row-id / next-row-id progression. +func TestCommitV3RowLineageTwoSequentialCommits(t *testing.T) { + spec := iceberg.NewPartitionSpec() + ident := Identifier{"db", "tbl"} + txn, memIO := createTestTransactionWithMemIO(t, spec) + txn.meta.formatVersion = 3 + + // First commit: new table, append one file (1 row). + sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil) + sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil)) + updates1, reqs1, err := sp1.commit() + require.NoError(t, err, "first commit should succeed") + addSnap1, ok := updates1[0].(*addSnapshotUpdate) + require.True(t, ok) + require.Equal(t, int64(0), *addSnap1.Snapshot.FirstRowID, "first snapshot first-row-id") + require.Equal(t, int64(1), *addSnap1.Snapshot.AddedRows, "first snapshot added-rows") + err = txn.apply(updates1, reqs1) + require.NoError(t, err, "first apply should succeed") + meta1, err := txn.meta.Build() + require.NoError(t, err) + require.Equal(t, int64(1), meta1.NextRowID(), "next-row-id after first commit") + + // Second commit: fast append one more file. Carried manifest already has first_row_id, so only new manifest gets row IDs; delta = 1. + tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil) + txn2 := tbl2.NewTransaction() + txn2.meta.formatVersion = 3 + sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil) + sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil)) + updates2, reqs2, err := sp2.commit() + require.NoError(t, err, "second commit should succeed") + addSnap2, ok := updates2[0].(*addSnapshotUpdate) + require.True(t, ok) + require.Equal(t, int64(1), *addSnap2.Snapshot.FirstRowID, "second snapshot first-row-id continues from first next-row-id") + require.Equal(t, int64(1), *addSnap2.Snapshot.AddedRows, "only new manifest gets row IDs assigned") + + err = txn2.apply(updates2, reqs2) + require.NoError(t, err, "second apply should succeed") + meta2, err := txn2.meta.Build() + require.NoError(t, err) + require.Equal(t, int64(2), meta2.NextRowID(), "next-row-id = 1 + 1 (gap-free)") +} + +// TestCommitV3RowLineageDeltaIncludesExistingRows uses merge append so one manifest +// has both existing and added rows; verifies assigned delta includes ExistingRowsCount +// and metadata next-row-id matches. +func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) { + spec := iceberg.NewPartitionSpec() + ident := Identifier{"db", "tbl"} + txn, memIO := createTestTransactionWithMemIO(t, spec) + txn.meta.formatVersion = 3 + + // First commit: one file (1 row). + sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil) + sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil)) + updates1, reqs1, err := sp1.commit() + require.NoError(t, err, "first commit should succeed") + err = txn.apply(updates1, reqs1) + require.NoError(t, err) + meta1, err := txn.meta.Build() + require.NoError(t, err) + require.Equal(t, int64(1), meta1.NextRowID()) + + // Second commit: merge append so the two data manifests (existing + new) are merged into one with 1 existing + 1 added row. + tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil) + txn2 := tbl2.NewTransaction() + txn2.meta.formatVersion = 3 + if txn2.meta.props == nil { + txn2.meta.props = make(iceberg.Properties) + } + txn2.meta.props[ManifestMergeEnabledKey] = "true" + txn2.meta.props[ManifestMinMergeCountKey] = "2" + sp2 := newMergeAppendFilesProducer(OpAppend, txn2, memIO, nil, nil) + sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil)) + updates2, reqs2, err := sp2.commit() + require.NoError(t, err, "second commit (merge) should succeed") + addSnap2, ok := updates2[0].(*addSnapshotUpdate) + require.True(t, ok) + require.Equal(t, int64(1), *addSnap2.Snapshot.FirstRowID, "first-row-id continues from first commit") + require.Equal(t, int64(2), *addSnap2.Snapshot.AddedRows, "assigned delta = existing (1) + added (1) in merged manifest") + + err = txn2.apply(updates2, reqs2) + require.NoError(t, err) + meta2, err := txn2.meta.Build() + require.NoError(t, err) + require.Equal(t, int64(3), meta2.NextRowID(), "next-row-id = first-row-id + assigned delta (1+2)") +} + func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) { spec := partitionedSpec() schema := simpleSchema() From 8010073384e9637f62a8d87d9080c832c67d587c Mon Sep 17 00:00:00 2001 From: Andrei Tserakhau Date: Mon, 16 Feb 2026 00:12:29 +0100 Subject: [PATCH 7/8] test for non data manifests --- manifest_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/manifest_test.go b/manifest_test.go index b1c58a5cd..87dc4efc0 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -1428,6 +1428,26 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterAssignedRowIDDelta() { m.Require().NoError(writer.Close()) } +func (m *ManifestTestSuite) TestV3ManifestListWriterDeltaIgnoresNonDataManifests() { + // Only data manifests get row-id assignment; delete manifests must not affect delta. + var buf bytes.Buffer + commitSnapID := int64(1) + firstRowID := int64(100) + sequenceNum := int64(1) + writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum, firstRowID, nil) + m.Require().NoError(err) + manifests := []ManifestFile{ + NewManifestFile(3, "data.avro", 100, 1, commitSnapID).AddedRows(10).ExistingRows(5).Build(), + NewManifestFile(3, "deletes.avro", 200, 1, commitSnapID).Content(ManifestContentDeletes).AddedRows(100).Build(), + NewManifestFile(3, "data2.avro", 300, 1, commitSnapID).AddedRows(20).Build(), + } + err = writer.AddManifests(manifests) + m.Require().NoError(err) + // Delta = 15 + 20 = 35 (only data manifests; delete manifest ignored) + m.EqualValues(35, *writer.NextRowID()-firstRowID) + m.Require().NoError(writer.Close()) +} + func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() { // Test v3writerImpl.prepareEntry sequence number validation logic v3Writer := v3writerImpl{} From d73e6312d4296bc66876d2660d493e859f0e463b Mon Sep 17 00:00:00 2001 From: Andrei Tserakhau Date: Mon, 16 Feb 2026 00:18:53 +0100 Subject: [PATCH 8/8] fix linter --- table/snapshot_producers_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go index 133fc05a8..f2e55631d 100644 --- a/table/snapshot_producers_test.go +++ b/table/snapshot_producers_test.go @@ -109,6 +109,7 @@ func createTestTransactionWithMemIO(t *testing.T, spec iceberg.PartitionSpec) (* meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, "mem://default/table-location", nil) require.NoError(t, err, "new metadata") tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json", func(context.Context) (iceio.IO, error) { return fs, nil }, nil) + return tbl.NewTransaction(), wfs }