Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1401,10 +1401,53 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterRowIDTracking() {
// Expected: 5000 + 1500 + 2300 = 8800
expectedNextRowID := firstRowID + 1500 + 2300
m.EqualValues(expectedNextRowID, *writer.NextRowID())
// Assigned row-id delta (for snapshot.added-rows) = 1500 + 2300 = 3800
m.EqualValues(int64(3800), *writer.NextRowID()-firstRowID)
err = writer.Close()
m.Require().NoError(err)
}

func (m *ManifestTestSuite) TestV3ManifestListWriterAssignedRowIDDelta() {
// Assigned row-id delta = sum of (existing+added) for all data manifests in list.
var buf bytes.Buffer
commitSnapID := int64(100)
otherSnapID := int64(99)
firstRowID := int64(0)
sequenceNum := int64(1)
writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum, firstRowID, nil)
m.Require().NoError(err)
manifests := []ManifestFile{
NewManifestFile(3, "current.avro", 100, 1, commitSnapID).AddedRows(10).ExistingRows(5).Build(),
NewManifestFile(3, "carried.avro", 200, 1, otherSnapID).SequenceNum(0, 0).AddedRows(100).ExistingRows(50).Build(),
NewManifestFile(3, "current2.avro", 300, 1, commitSnapID).AddedRows(20).Build(),
}
err = writer.AddManifests(manifests)
m.Require().NoError(err)
// Delta = 15 + 150 + 20 = 185 (all data manifests get row-id range)
m.EqualValues(185, *writer.NextRowID()-firstRowID)
m.Require().NoError(writer.Close())
}

func (m *ManifestTestSuite) TestV3ManifestListWriterDeltaIgnoresNonDataManifests() {
// Only data manifests get row-id assignment; delete manifests must not affect delta.
var buf bytes.Buffer
commitSnapID := int64(1)
firstRowID := int64(100)
sequenceNum := int64(1)
writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum, firstRowID, nil)
m.Require().NoError(err)
manifests := []ManifestFile{
NewManifestFile(3, "data.avro", 100, 1, commitSnapID).AddedRows(10).ExistingRows(5).Build(),
NewManifestFile(3, "deletes.avro", 200, 1, commitSnapID).Content(ManifestContentDeletes).AddedRows(100).Build(),
NewManifestFile(3, "data2.avro", 300, 1, commitSnapID).AddedRows(20).Build(),
}
err = writer.AddManifests(manifests)
m.Require().NoError(err)
// Delta = 15 + 20 = 35 (only data manifests; delete manifest ignored)
m.EqualValues(35, *writer.NextRowID()-firstRowID)
m.Require().NoError(writer.Close())
}

func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() {
// Test v3writerImpl.prepareEntry sequence number validation logic
v3Writer := v3writerImpl{}
Expand Down
5 changes: 5 additions & 0 deletions table/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,11 @@ func (b *MetadataBuilder) currentNextRowID() int64 {
return initialRowID
}

// NextRowID returns the next available row ID (for v3 row lineage). For v1/v2 returns 0.
func (b *MetadataBuilder) NextRowID() int64 {
return b.currentNextRowID()
}

func (b *MetadataBuilder) RemoveSnapshots(snapshotIds []int64) error {
if b.currentSnapshotID != nil && slices.Contains(snapshotIds, *b.currentSnapshotID) {
return errors.New("current snapshot cannot be removed")
Expand Down
31 changes: 26 additions & 5 deletions table/snapshot_producers.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,17 +703,34 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) {
parentSnapshot = &sp.parentSnapshotID
}

firstRowID := int64(0)
var addedRows int64

out, err := sp.io.Create(manifestListFilePath)
if err != nil {
return nil, nil, err
}
defer internal.CheckedClose(out, &err)

// TODO: Implement v3 here
err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
sp.snapshotID, parentSnapshot, &nextSequence, 0, newManifests)
if err != nil {
return nil, nil, err
if sp.txn.meta.formatVersion == 3 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should result to use the single API WriteManifestList for all 3 version instead of branching out like this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would be aligned with what other drivers do (rust / java / python)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now let's keep things aligned with the other drivers and then we can do another pass for improving it later on

firstRowID = sp.txn.meta.NextRowID()
writer, err := iceberg.NewManifestListWriterV3(out, sp.snapshotID, nextSequence, firstRowID, parentSnapshot)
if err != nil {
return nil, nil, err
}
defer internal.CheckedClose(writer, &err)
if err = writer.AddManifests(newManifests); err != nil {
return nil, nil, err
}
if writer.NextRowID() != nil {
addedRows = *writer.NextRowID() - firstRowID
}
} else {
err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
sp.snapshotID, parentSnapshot, &nextSequence, firstRowID, newManifests)
if err != nil {
return nil, nil, err
}
}

snapshot := Snapshot{
Expand All @@ -725,6 +742,10 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) {
SchemaID: &sp.txn.meta.currentSchemaID,
TimestampMs: time.Now().UnixMilli(),
}
if sp.txn.meta.formatVersion == 3 {
snapshot.FirstRowID = &firstRowID
snapshot.AddedRows = &addedRows
}

return []Update{
NewAddSnapshotUpdate(&snapshot),
Expand Down
138 changes: 138 additions & 0 deletions table/snapshot_producers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ func (m *memIO) Remove(name string) error {
return nil
}

// createTestTransactionWithMemIO creates a transaction using the io package's mem blob FS
// so that Create() output is persisted and can be read back (e.g. for sequential commits).
func createTestTransactionWithMemIO(t *testing.T, spec iceberg.PartitionSpec) (*Transaction, iceio.WriteFileIO) {
t.Helper()
ctx := context.Background()
fs, err := iceio.LoadFS(ctx, nil, "mem://default/table-location")
require.NoError(t, err, "LoadFS mem")
wfs := fs.(iceio.WriteFileIO)
schema := simpleSchema()
meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, "mem://default/table-location", nil)
require.NoError(t, err, "new metadata")
tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json", func(context.Context) (iceio.IO, error) { return fs, nil }, nil)

return tbl.NewTransaction(), wfs
}

func manifestHeaderSize(t *testing.T, version int, spec iceberg.PartitionSpec, schema *iceberg.Schema) int {
t.Helper()

Expand Down Expand Up @@ -161,6 +177,128 @@ func createTestTransaction(t *testing.T, io iceio.IO, spec iceberg.PartitionSpec
return tbl.NewTransaction()
}

// TestCommitV3RowLineage ensures v3 snapshot commits set FirstRowID and AddedRows
// on the snapshot for row lineage, and that applying updates advances next-row-id correctly.
func TestCommitV3RowLineage(t *testing.T) {
trackIO := newTrackingIO()
spec := iceberg.NewPartitionSpec()
txn := createTestTransaction(t, trackIO, spec)
txn.meta.formatVersion = 3

// Single data file with record count 1 (newTestDataFile uses 1, 1 for record count and file size).
const expectedAddedRows = 1
sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil)
df := newTestDataFile(t, spec, "file://data.parquet", nil)
sp.appendDataFile(df)

updates, reqs, err := sp.commit()
require.NoError(t, err, "commit should succeed")
require.Len(t, updates, 2, "expected AddSnapshot and SetSnapshotRef updates")
addSnap, ok := updates[0].(*addSnapshotUpdate)
require.True(t, ok, "first update must be AddSnapshot")

// Exact snapshot lineage: first-row-id 0 for new table, added-rows matches appended file(s).
require.NotNil(t, addSnap.Snapshot.FirstRowID, "v3 snapshot must have first-row-id")
require.NotNil(t, addSnap.Snapshot.AddedRows, "v3 snapshot must have added-rows")
require.Equal(t, int64(0), *addSnap.Snapshot.FirstRowID, "first-row-id should be table next-row-id at commit")
require.Equal(t, int64(expectedAddedRows), *addSnap.Snapshot.AddedRows, "added-rows should match appended data file record count")

// Apply updates and verify metadata next-row-id advances monotonically.
err = txn.apply(updates, reqs)
require.NoError(t, err, "apply should succeed")
meta, err := txn.meta.Build()
require.NoError(t, err, "build metadata")
require.Equal(t, int64(expectedAddedRows), meta.NextRowID(), "next-row-id should equal first-row-id + added-rows")
}

// TestCommitV3RowLineageTwoSequentialCommits runs two commits and asserts monotonic,
// gap-free first-row-id / next-row-id progression.
func TestCommitV3RowLineageTwoSequentialCommits(t *testing.T) {
spec := iceberg.NewPartitionSpec()
ident := Identifier{"db", "tbl"}
txn, memIO := createTestTransactionWithMemIO(t, spec)
txn.meta.formatVersion = 3

// First commit: new table, append one file (1 row).
sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil))
updates1, reqs1, err := sp1.commit()
require.NoError(t, err, "first commit should succeed")
addSnap1, ok := updates1[0].(*addSnapshotUpdate)
require.True(t, ok)
require.Equal(t, int64(0), *addSnap1.Snapshot.FirstRowID, "first snapshot first-row-id")
require.Equal(t, int64(1), *addSnap1.Snapshot.AddedRows, "first snapshot added-rows")
err = txn.apply(updates1, reqs1)
require.NoError(t, err, "first apply should succeed")
meta1, err := txn.meta.Build()
require.NoError(t, err)
require.Equal(t, int64(1), meta1.NextRowID(), "next-row-id after first commit")

// Second commit: fast append one more file. Carried manifest already has first_row_id, so only new manifest gets row IDs; delta = 1.
tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil)
txn2 := tbl2.NewTransaction()
txn2.meta.formatVersion = 3
sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil))
updates2, reqs2, err := sp2.commit()
require.NoError(t, err, "second commit should succeed")
addSnap2, ok := updates2[0].(*addSnapshotUpdate)
require.True(t, ok)
require.Equal(t, int64(1), *addSnap2.Snapshot.FirstRowID, "second snapshot first-row-id continues from first next-row-id")
require.Equal(t, int64(1), *addSnap2.Snapshot.AddedRows, "only new manifest gets row IDs assigned")

err = txn2.apply(updates2, reqs2)
require.NoError(t, err, "second apply should succeed")
meta2, err := txn2.meta.Build()
require.NoError(t, err)
require.Equal(t, int64(2), meta2.NextRowID(), "next-row-id = 1 + 1 (gap-free)")
}

// TestCommitV3RowLineageDeltaIncludesExistingRows uses merge append so one manifest
// has both existing and added rows; verifies assigned delta includes ExistingRowsCount
// and metadata next-row-id matches.
func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) {
spec := iceberg.NewPartitionSpec()
ident := Identifier{"db", "tbl"}
txn, memIO := createTestTransactionWithMemIO(t, spec)
txn.meta.formatVersion = 3

// First commit: one file (1 row).
sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil))
updates1, reqs1, err := sp1.commit()
require.NoError(t, err, "first commit should succeed")
err = txn.apply(updates1, reqs1)
require.NoError(t, err)
meta1, err := txn.meta.Build()
require.NoError(t, err)
require.Equal(t, int64(1), meta1.NextRowID())

// Second commit: merge append so the two data manifests (existing + new) are merged into one with 1 existing + 1 added row.
tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil)
txn2 := tbl2.NewTransaction()
txn2.meta.formatVersion = 3
if txn2.meta.props == nil {
txn2.meta.props = make(iceberg.Properties)
}
txn2.meta.props[ManifestMergeEnabledKey] = "true"
txn2.meta.props[ManifestMinMergeCountKey] = "2"
sp2 := newMergeAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil))
updates2, reqs2, err := sp2.commit()
require.NoError(t, err, "second commit (merge) should succeed")
addSnap2, ok := updates2[0].(*addSnapshotUpdate)
require.True(t, ok)
require.Equal(t, int64(1), *addSnap2.Snapshot.FirstRowID, "first-row-id continues from first commit")
require.Equal(t, int64(2), *addSnap2.Snapshot.AddedRows, "assigned delta = existing (1) + added (1) in merged manifest")

err = txn2.apply(updates2, reqs2)
require.NoError(t, err)
meta2, err := txn2.meta.Build()
require.NoError(t, err)
require.Equal(t, int64(3), meta2.NextRowID(), "next-row-id = first-row-id + assigned delta (1+2)")
}

func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) {
spec := partitionedSpec()
schema := simpleSchema()
Expand Down
Loading