Skip to content

Commit a012a17

Browse files
authored
fix(table): ensure partition path is deterministic (#767)
partition record order is expected to match partition spec but maps.Values can change it and cause cross-partition writer reuse, delete expected files, etc. related to #721 Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>
1 parent cc8a6ff commit a012a17

File tree

2 files changed

+94
-3
lines changed

2 files changed

+94
-3
lines changed

table/pos_delete_partitioned_fanout_writer.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import (
2121
"context"
2222
"fmt"
2323
"iter"
24-
"maps"
25-
"slices"
2624

2725
"github.com/apache/arrow-go/v18/arrow"
2826
"github.com/apache/arrow-go/v18/arrow/array"
@@ -132,12 +130,20 @@ func (p *positionDeletePartitionedFanoutWriter) processBatch(ctx context.Context
132130
}
133131

134132
func (p *positionDeletePartitionedFanoutWriter) partitionPath(partitionContext partitionContext) (string, error) {
135-
data := partitionRecord(slices.Collect(maps.Values(partitionContext.partitionData)))
136133
spec := p.metadata.PartitionSpecByID(int(partitionContext.specID))
137134
if spec == nil {
138135
return "", fmt.Errorf("unexpected missing partition spec in metadata for spec id %d", partitionContext.specID)
139136
}
140137

138+
data := make(partitionRecord, spec.NumFields())
139+
for i, field := range spec.Fields() {
140+
val, ok := partitionContext.partitionData[field.FieldID]
141+
if !ok {
142+
return "", fmt.Errorf("unexpected missing partition value for field id %d in spec id %d", field.FieldID, partitionContext.specID)
143+
}
144+
data[i] = val
145+
}
146+
141147
return spec.PartitionToPath(data, p.schema), nil
142148
}
143149

table/pos_delete_partitioned_fanout_writer_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"errors"
2323
"fmt"
2424
"maps"
25+
"slices"
2526
"strings"
2627
"testing"
2728
"time"
@@ -160,6 +161,90 @@ func TestPositionDeletePartitionedFanoutWriterProcessBatch(t *testing.T) {
160161
}
161162
}
162163

164+
func TestPositionDeletePartitionedFanoutWriterPartitionPathIsDeterministic(t *testing.T) {
165+
t.Parallel()
166+
167+
partitionSpec := iceberg.NewPartitionSpec(
168+
iceberg.PartitionField{
169+
FieldID: 1000,
170+
SourceID: 2147483546, // file_path
171+
Name: "file_path",
172+
Transform: iceberg.IdentityTransform{},
173+
},
174+
iceberg.PartitionField{
175+
FieldID: 1001,
176+
SourceID: 2147483545, // pos
177+
Name: "pos",
178+
Transform: iceberg.IdentityTransform{},
179+
},
180+
iceberg.PartitionField{
181+
FieldID: 1002,
182+
SourceID: 2147483545, // pos
183+
Name: "pos_bucket",
184+
Transform: iceberg.BucketTransform{
185+
NumBuckets: 128,
186+
},
187+
},
188+
)
189+
190+
metadataBuilder, err := NewMetadataBuilder(2)
191+
require.NoError(t, err)
192+
err = metadataBuilder.AddSchema(iceberg.PositionalDeleteSchema)
193+
require.NoError(t, err)
194+
err = metadataBuilder.SetCurrentSchemaID(0)
195+
require.NoError(t, err)
196+
err = metadataBuilder.AddPartitionSpec(&partitionSpec, true)
197+
require.NoError(t, err)
198+
err = metadataBuilder.SetDefaultSpecID(0)
199+
require.NoError(t, err)
200+
sortOrder, err := NewSortOrder(1, []SortField{{
201+
SourceID: 2147483546,
202+
Direction: SortASC,
203+
Transform: iceberg.IdentityTransform{},
204+
NullOrder: NullsFirst,
205+
}})
206+
require.NoError(t, err)
207+
err = metadataBuilder.AddSortOrder(&sortOrder)
208+
require.NoError(t, err)
209+
err = metadataBuilder.SetDefaultSortOrderID(1)
210+
require.NoError(t, err)
211+
212+
latestMeta, err := metadataBuilder.Build()
213+
require.NoError(t, err)
214+
215+
writer := &positionDeletePartitionedFanoutWriter{
216+
metadata: latestMeta,
217+
schema: iceberg.PositionalDeleteSchema,
218+
}
219+
220+
ctx := partitionContext{
221+
specID: 0,
222+
partitionData: map[int]any{
223+
1000: "file://ns/data-file.parquet",
224+
1001: int64(42),
225+
1002: int32(7),
226+
},
227+
}
228+
229+
expectedPath := partitionSpec.PartitionToPath(partitionRecord{
230+
ctx.partitionData[1000],
231+
ctx.partitionData[1001],
232+
ctx.partitionData[1002],
233+
}, iceberg.PositionalDeleteSchema)
234+
235+
// run multiple times to ensure it consistently
236+
// produces the same output for the same input context
237+
seen := make(map[string]struct{})
238+
for range 1024 {
239+
path, err := writer.partitionPath(ctx)
240+
require.NoError(t, err)
241+
seen[path] = struct{}{}
242+
}
243+
244+
require.Lenf(t, seen, 1, "partition path must be stable for the same input map, got paths: %v", slices.Collect(maps.Keys(seen)))
245+
require.Contains(t, seen, expectedPath)
246+
}
247+
163248
func onlyContext(ctx context.Context, _ func()) context.Context {
164249
return ctx
165250
}

0 commit comments

Comments
 (0)