Skip to content

Commit 4d67a7d

Browse files
feat(table): add RowDelta API for atomic row-level mutations
Add Transaction.NewRowDelta() for committing data files and delete files (position or equality) in a single atomic snapshot. Includes format version validation, equality field ID validation, and full round-trip integration test.
1 parent a012a17 commit 4d67a7d

File tree

2 files changed

+742
-0
lines changed

2 files changed

+742
-0
lines changed

table/row_delta.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package table
19+
20+
import (
21+
"context"
22+
"errors"
23+
"fmt"
24+
25+
"github.com/apache/iceberg-go"
26+
iceio "github.com/apache/iceberg-go/io"
27+
)
28+
29+
// RowDelta encodes a set of row-level changes to a table: new data files
30+
// (inserts) and delete files (equality or position deletes). All changes
31+
// are committed atomically in a single snapshot.
32+
//
33+
// The operation type of the produced snapshot is determined automatically:
34+
// - Data files only → OpAppend
35+
// - Delete files only → OpDelete
36+
// - Both data and delete files → OpOverwrite
37+
//
38+
// This matches the semantics of Java's BaseRowDelta. It is the primary
39+
// API for CDC/streaming workloads where INSERTs, UPDATEs, and DELETEs
40+
// must be committed together.
41+
//
42+
// Note: conflict detection for concurrent writers is not yet implemented.
43+
// Concurrent RowDelta commits against the same table may produce incorrect
44+
// results if delete files miss newly appended data. For single-writer
45+
// workloads this is safe.
46+
//
47+
// Usage:
48+
//
49+
// rd := tx.NewRowDelta(snapshotProps)
50+
// rd.AddRows(dataFile1, dataFile2)
51+
// rd.AddDeletes(equalityDeleteFile1)
52+
// err := rd.Commit(ctx)
53+
type RowDelta struct {
54+
txn *Transaction
55+
dataFiles []iceberg.DataFile
56+
delFiles []iceberg.DataFile
57+
props iceberg.Properties
58+
}
59+
60+
// NewRowDelta creates a new RowDelta for committing row-level changes
61+
// within this transaction. The provided properties are included in the
62+
// snapshot summary.
63+
func (t *Transaction) NewRowDelta(snapshotProps iceberg.Properties) *RowDelta {
64+
return &RowDelta{
65+
txn: t,
66+
props: snapshotProps,
67+
}
68+
}
69+
70+
// AddRows adds data files containing new rows (inserts) to this RowDelta.
71+
func (rd *RowDelta) AddRows(files ...iceberg.DataFile) *RowDelta {
72+
rd.dataFiles = append(rd.dataFiles, files...)
73+
74+
return rd
75+
}
76+
77+
// AddDeletes adds delete files (equality or position) to this RowDelta.
78+
// Equality delete files must have ContentType == EntryContentEqDeletes
79+
// and non-empty EqualityFieldIDs referencing valid schema columns.
80+
// Position delete files must have ContentType == EntryContentPosDeletes.
81+
func (rd *RowDelta) AddDeletes(files ...iceberg.DataFile) *RowDelta {
82+
rd.delFiles = append(rd.delFiles, files...)
83+
84+
return rd
85+
}
86+
87+
// Commit validates and commits all accumulated row-level changes as a
88+
// single atomic snapshot. Returns an error if there are no files to
89+
// commit, if any file has an unexpected content type, or if the table
90+
// format version does not support delete files.
91+
func (rd *RowDelta) Commit(ctx context.Context) error {
92+
if len(rd.dataFiles) == 0 && len(rd.delFiles) == 0 {
93+
return errors.New("row delta must have at least one data file or delete file")
94+
}
95+
96+
// Delete files require format version >= 2.
97+
if len(rd.delFiles) > 0 && rd.txn.meta.formatVersion < 2 {
98+
return fmt.Errorf("delete files require table format version >= 2, got v%d",
99+
rd.txn.meta.formatVersion)
100+
}
101+
102+
for _, f := range rd.dataFiles {
103+
if f.ContentType() != iceberg.EntryContentData {
104+
return fmt.Errorf("expected data file, got content type %s: %s",
105+
f.ContentType(), f.FilePath())
106+
}
107+
}
108+
109+
schema := rd.txn.meta.CurrentSchema()
110+
for _, f := range rd.delFiles {
111+
ct := f.ContentType()
112+
if ct != iceberg.EntryContentPosDeletes && ct != iceberg.EntryContentEqDeletes {
113+
return fmt.Errorf("expected delete file, got content type %s: %s",
114+
ct, f.FilePath())
115+
}
116+
117+
// Equality delete files must declare which columns form the delete key,
118+
// and those columns must exist in the current schema.
119+
if ct == iceberg.EntryContentEqDeletes {
120+
eqIDs := f.EqualityFieldIDs()
121+
if len(eqIDs) == 0 {
122+
return fmt.Errorf("equality delete file must have non-empty EqualityFieldIDs: %s",
123+
f.FilePath())
124+
}
125+
126+
for _, id := range eqIDs {
127+
if _, ok := schema.FindFieldByID(id); !ok {
128+
return fmt.Errorf("equality field ID %d not found in table schema: %s",
129+
id, f.FilePath())
130+
}
131+
}
132+
}
133+
}
134+
135+
fs, err := rd.txn.tbl.fsF(ctx)
136+
if err != nil {
137+
return err
138+
}
139+
140+
wfs, ok := fs.(iceio.WriteFileIO)
141+
if !ok {
142+
return fmt.Errorf("filesystem does not support writing")
143+
}
144+
145+
op := rd.Operation()
146+
producer := newFastAppendFilesProducer(op, rd.txn, wfs, nil, rd.props)
147+
148+
for _, f := range rd.dataFiles {
149+
producer.appendDataFile(f)
150+
}
151+
152+
// Both equality and position delete files are added via
153+
// appendPositionDeleteFile — they share ManifestContentDeletes
154+
// in manifests. The DataFile's ContentType() and EqualityFieldIDs()
155+
// distinguish them at the manifest entry level.
156+
for _, f := range rd.delFiles {
157+
producer.appendPositionDeleteFile(f)
158+
}
159+
160+
updates, reqs, err := producer.commit()
161+
if err != nil {
162+
return err
163+
}
164+
165+
return rd.txn.apply(updates, reqs)
166+
}
167+
168+
// Operation returns the snapshot operation type that will be used when
169+
// this RowDelta is committed:
170+
// - data only → OpAppend
171+
// - deletes only → OpDelete
172+
// - both → OpOverwrite
173+
func (rd *RowDelta) Operation() Operation {
174+
hasData := len(rd.dataFiles) > 0
175+
hasDeletes := len(rd.delFiles) > 0
176+
177+
switch {
178+
case hasData && hasDeletes:
179+
return OpOverwrite
180+
case hasDeletes:
181+
return OpDelete
182+
default:
183+
return OpAppend
184+
}
185+
}

0 commit comments

Comments
 (0)