Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions table/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,16 @@ func (b *MetadataBuilder) RemoveSnapshots(snapshotIds []int64) error {
return nil
}

func (b *MetadataBuilder) isSnapshotReferenced(snapshotID int64) bool {
for _, ref := range b.refs {
if ref.SnapshotID == snapshotID {
return true
}
}

return false
}

func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder) error {
curSchema := b.CurrentSchema()
if curSchema == nil {
Expand Down
136 changes: 136 additions & 0 deletions table/metadata_builder_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1697,3 +1697,139 @@ func TestUnknownTypeValidation(t *testing.T) {
require.ErrorContains(t, err, "must be optional")
})
}

// TestRemoveSnapshotsSkipIfReferenced verifies the SkipIfReferenced flag
// on removeSnapshotsUpdate. When set, Apply() should skip any snapshot
// that is still referenced by a branch or tag, rather than removing it
// and silently cleaning up the ref. This guards against a race where a
// concurrent client links a snapshot to a new ref between
// ExpireSnapshots candidate selection and CommitTable.
func TestRemoveSnapshotsSkipIfReferenced(t *testing.T) {
schemaID := 0

makeSnapshot := func(id int64, seqNum int64, parentID *int64, baseTs int64) Snapshot {
return Snapshot{
SnapshotID: id,
ParentSnapshotID: parentID,
SequenceNumber: seqNum,
TimestampMs: baseTs + id,
ManifestList: fmt.Sprintf("/snap-%d.avro", id),
Summary: &Summary{Operation: OpAppend},
SchemaID: &schemaID,
}
}

// Build base metadata with three snapshots:
// snap-1 (oldest) <- snap-2 <- snap-3 (newest, main)
// and a branch "feature" pointing to snap-2.
setup := func() Metadata {
builder := builderWithoutChanges(2)
baseTs := builder.base.LastUpdatedMillis()

snap1ID := int64(1)
snap1 := makeSnapshot(1, 1, nil, baseTs)
snap2 := makeSnapshot(2, 2, &snap1ID, baseTs)
snap2ID := int64(2)
snap3 := makeSnapshot(3, 3, &snap2ID, baseTs)

require.NoError(t, builder.AddSnapshot(&snap1))
require.NoError(t, builder.AddSnapshot(&snap2))
require.NoError(t, builder.AddSnapshot(&snap3))
require.NoError(t, builder.SetSnapshotRef("main", 3, BranchRef))
require.NoError(t, builder.SetSnapshotRef("feature", 2, BranchRef))

meta, err := builder.Build()
require.NoError(t, err)

return meta
}

t.Run("SkipsReferencedSnapshot", func(t *testing.T) {
meta := setup()

// Try to remove snap-2 (referenced by "feature") with SkipIfReferenced.
update := NewRemoveSnapshotsUpdate([]int64{2}).SetSkipIfReferenced()
builder, err := MetadataBuilderFromBase(meta, "")
require.NoError(t, err)
require.NoError(t, update.Apply(builder))

result, err := builder.Build()
require.NoError(t, err)

// snap-2 should still be present
require.NotNil(t, result.SnapshotByID(2))
// "feature" ref should still exist
refs := result.(*metadataV2).SnapshotRefs
ref, ok := refs["feature"]
require.True(t, ok)
require.Equal(t, int64(2), ref.SnapshotID)
})

t.Run("RemovesUnreferencedSnapshot", func(t *testing.T) {
meta := setup()

// Try to remove snap-1 (unreferenced) with SkipIfReferenced.
update := NewRemoveSnapshotsUpdate([]int64{1}).SetSkipIfReferenced()
builder, err := MetadataBuilderFromBase(meta, "")
require.NoError(t, err)
require.NoError(t, update.Apply(builder))

result, err := builder.Build()
require.NoError(t, err)

// snap-1 should be gone
require.Nil(t, result.SnapshotByID(1))
// snap-2 and snap-3 still present
require.NotNil(t, result.SnapshotByID(2))
require.NotNil(t, result.SnapshotByID(3))
})

t.Run("MixedReferencedAndUnreferenced", func(t *testing.T) {
meta := setup()

// Try to remove snap-1 (unreferenced) and snap-2 (referenced)
// with SkipIfReferenced. Only snap-1 should be removed.
update := NewRemoveSnapshotsUpdate([]int64{1, 2}).SetSkipIfReferenced()
builder, err := MetadataBuilderFromBase(meta, "")
require.NoError(t, err)
require.NoError(t, update.Apply(builder))

result, err := builder.Build()
require.NoError(t, err)

// snap-1 gone, snap-2 and snap-3 remain
require.Nil(t, result.SnapshotByID(1))
require.NotNil(t, result.SnapshotByID(2))
require.NotNil(t, result.SnapshotByID(3))
// "feature" ref still intact
refs := result.(*metadataV2).SnapshotRefs
ref, ok := refs["feature"]
require.True(t, ok)
require.Equal(t, int64(2), ref.SnapshotID)
})

t.Run("DefaultBehaviorRemovesReferencedSnapshot", func(t *testing.T) {
meta := setup()

// Without SkipIfReferenced, removing snap-2 should succeed
// and silently clean up the "feature" ref.
update := NewRemoveSnapshotsUpdate([]int64{2})
builder, err := MetadataBuilderFromBase(meta, "")
require.NoError(t, err)
require.NoError(t, update.Apply(builder))

result, err := builder.Build()
require.NoError(t, err)

// snap-2 removed
require.Nil(t, result.SnapshotByID(2))
// "feature" ref cleaned up
refs := result.(*metadataV2).SnapshotRefs
_, ok := refs["feature"]
require.False(t, ok)
// "main" ref still present
ref, ok := refs["main"]
require.True(t, ok)
require.Equal(t, int64(3), ref.SnapshotID)
})
}
2 changes: 1 addition & 1 deletion table/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (t *Transaction) ExpireSnapshots(opts ...ExpireSnapshotsOpt) error {

// Only add the update if there are actually snapshots to delete
if len(snapsToDelete) > 0 {
update := NewRemoveSnapshotsUpdate(snapsToDelete)
update := NewRemoveSnapshotsUpdate(snapsToDelete).SetSkipIfReferenced()
update.postCommit = cfg.postCommit
updates = append(updates, update)
}
Expand Down
21 changes: 19 additions & 2 deletions table/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"errors"
"fmt"
"slices"

"github.com/apache/iceberg-go"
"github.com/google/uuid"
Expand Down Expand Up @@ -411,8 +412,9 @@ func (u *removePropertiesUpdate) Apply(builder *MetadataBuilder) error {

type removeSnapshotsUpdate struct {
baseUpdate
SnapshotIDs []int64 `json:"snapshot-ids"`
postCommit bool
SnapshotIDs []int64 `json:"snapshot-ids"`
SkipIfReferenced bool `json:"skip-if-referenced,omitempty"`
postCommit bool
}

// NewRemoveSnapshotsUpdate creates a new update that removes all snapshots from
Expand All @@ -424,7 +426,22 @@ func NewRemoveSnapshotsUpdate(ids []int64) *removeSnapshotsUpdate {
}
}

func (u *removeSnapshotsUpdate) SetSkipIfReferenced() *removeSnapshotsUpdate {
u.SkipIfReferenced = true

return u
}

func (u *removeSnapshotsUpdate) Apply(builder *MetadataBuilder) error {
if u.SkipIfReferenced {
u.SnapshotIDs = slices.DeleteFunc(u.SnapshotIDs, func(id int64) bool {
return builder.isSnapshotReferenced(id)
})
if len(u.SnapshotIDs) == 0 {
return nil
}
}

return builder.RemoveSnapshots(u.SnapshotIDs)
}

Expand Down
Loading