Skip to content

Commit 55bdfbf

Browse files
authored
feat(table): export StructLike and GetPartitionRecord (#791)
GetPartitionRecord is the only way to go from a DataFile to something PartitionToPath accepts, and StructLike is the interface PartitionToPath takes as its input. Keeping them unexported forces external consumers to reimplement the same conversion, so this change exports both. It also deduplicates the manual record construction in the pos delete writer.
1 parent 3c01a04 commit 55bdfbf

File tree

8 files changed

+30
-33
lines changed

8 files changed

+30
-33
lines changed

exprs.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -338,8 +338,8 @@ type BoundTerm interface {
338338
Ref() BoundReference
339339
Type() Type
340340

341-
evalToLiteral(structLike) Optional[Literal]
342-
evalIsNull(structLike) bool
341+
evalToLiteral(StructLike) Optional[Literal]
342+
evalIsNull(StructLike) bool
343343
}
344344

345345
// unbound is a generic interface representing something that is not yet bound
@@ -485,7 +485,7 @@ func (b *boundRef[T]) Ref() BoundReference { return b }
485485
func (b *boundRef[T]) Field() NestedField { return b.field }
486486
func (b *boundRef[T]) Type() Type { return b.field.Type }
487487

488-
func (b *boundRef[T]) eval(st structLike) Optional[T] {
488+
func (b *boundRef[T]) eval(st StructLike) Optional[T] {
489489
switch v := b.acc.Get(st).(type) {
490490
case nil:
491491
return Optional[T]{}
@@ -506,7 +506,7 @@ func (b *boundRef[T]) eval(st structLike) Optional[T] {
506506
}
507507
}
508508

509-
func (b *boundRef[T]) evalToLiteral(st structLike) Optional[Literal] {
509+
func (b *boundRef[T]) evalToLiteral(st StructLike) Optional[Literal] {
510510
v := b.eval(st)
511511
if !v.Valid {
512512
return Optional[Literal]{}
@@ -520,7 +520,7 @@ func (b *boundRef[T]) evalToLiteral(st structLike) Optional[Literal] {
520520
return Optional[Literal]{Val: lit, Valid: true}
521521
}
522522

523-
func (b *boundRef[T]) evalIsNull(st structLike) bool {
523+
func (b *boundRef[T]) evalIsNull(st StructLike) bool {
524524
v := b.eval(st)
525525

526526
return !v.Valid
@@ -605,7 +605,7 @@ type BoundUnaryPredicate interface {
605605
type bound[T LiteralType] interface {
606606
BoundTerm
607607

608-
eval(structLike) Optional[T]
608+
eval(StructLike) Optional[T]
609609
}
610610

611611
func newBoundUnaryPred[T LiteralType](op Operation, term BoundTerm) BoundUnaryPredicate {
@@ -1048,10 +1048,10 @@ func (b *BoundTransform) Equals(other BoundTerm) bool {
10481048
return b.transform.Equals(rhs.transform) && b.term.Equals(rhs.term)
10491049
}
10501050

1051-
func (b *BoundTransform) evalToLiteral(st structLike) Optional[Literal] {
1051+
func (b *BoundTransform) evalToLiteral(st StructLike) Optional[Literal] {
10521052
return b.transform.Apply(b.term.evalToLiteral(st))
10531053
}
10541054

1055-
func (b *BoundTransform) evalIsNull(st structLike) bool {
1055+
func (b *BoundTransform) evalIsNull(st StructLike) bool {
10561056
return !b.evalToLiteral(st).Valid
10571057
}

partitions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ func (ps *PartitionSpec) PartitionType(schema *Schema) *StructType {
479479
//
480480
// This does not apply the transforms to the data, it is assumed the provided data
481481
// has already been transformed appropriately.
482-
func (ps *PartitionSpec) PartitionToPath(data structLike, sc *Schema) string {
482+
func (ps *PartitionSpec) PartitionToPath(data StructLike, sc *Schema) string {
483483
partType := ps.PartitionType(sc)
484484

485485
if len(partType.FieldList) == 0 {

table/partitioned_fanout_writer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (s *FanoutWriterTestSuite) testTransformPartition(transform iceberg.Transfo
151151
fileCount++
152152
totalRecords += dataFile.Count()
153153

154-
partitionRec := getPartitionRecord(dataFile, spec.PartitionType(icebergSchema))
154+
partitionRec := GetPartitionRecord(dataFile, spec.PartitionType(icebergSchema))
155155
partitionPath := spec.PartitionToPath(partitionRec, icebergSchema)
156156
partitionPaths[partitionPath] += dataFile.Count()
157157
}

table/pos_delete_partitioned_fanout_writer.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,7 @@ func (p *positionDeletePartitionedFanoutWriter) partitionPath(partitionContext p
135135
return "", fmt.Errorf("unexpected missing partition spec in metadata for spec id %d", partitionContext.specID)
136136
}
137137

138-
data := make(partitionRecord, spec.NumFields())
139-
for i, field := range spec.Fields() {
140-
val, ok := partitionContext.partitionData[field.FieldID]
141-
if !ok {
142-
return "", fmt.Errorf("unexpected missing partition value for field id %d in spec id %d", field.FieldID, partitionContext.specID)
143-
}
144-
data[i] = val
145-
}
138+
data := newPartitionRecord(partitionContext.partitionData, spec.PartitionType(p.schema))
146139

147140
return spec.PartitionToPath(data, p.schema), nil
148141
}

table/scanner.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,7 @@ func (m *manifestEntries) addPositionalDeleteEntry(e iceberg.ManifestEntry) {
119119
m.positionalDeleteEntries = append(m.positionalDeleteEntries, e)
120120
}
121121

122-
func getPartitionRecord(dataFile iceberg.DataFile, partitionType *iceberg.StructType) partitionRecord {
123-
partitionData := dataFile.Partition()
124-
122+
func newPartitionRecord(partitionData map[int]any, partitionType *iceberg.StructType) partitionRecord {
125123
out := make(partitionRecord, len(partitionType.FieldList))
126124
for i, f := range partitionType.FieldList {
127125
out[i] = partitionData[f.ID]
@@ -130,6 +128,12 @@ func getPartitionRecord(dataFile iceberg.DataFile, partitionType *iceberg.Struct
130128
return out
131129
}
132130

131+
// GetPartitionRecord converts a DataFile's partition map into a positional
132+
// record ordered by the fields of the given partition struct type.
133+
func GetPartitionRecord(dataFile iceberg.DataFile, partitionType *iceberg.StructType) iceberg.StructLike {
134+
return newPartitionRecord(dataFile.Partition(), partitionType)
135+
}
136+
133137
func openManifest(io io.IO, manifest iceberg.ManifestFile,
134138
partitionFilter, metricsEval func(iceberg.DataFile) (bool, error),
135139
) ([]iceberg.ManifestEntry, error) {
@@ -284,7 +288,7 @@ func (scan *Scan) buildPartitionEvaluator(specID int) (func(iceberg.DataFile) (b
284288
}
285289

286290
return func(d iceberg.DataFile) (bool, error) {
287-
return fn(getPartitionRecord(d, partType))
291+
return fn(GetPartitionRecord(d, partType))
288292
}, nil
289293
}
290294

table/snapshots.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ func (s *SnapshotSummaryCollector) addFile(df iceberg.DataFile, sc *iceberg.Sche
413413

414414
if len(df.Partition()) > 0 {
415415
partitionPath := spec.PartitionToPath(
416-
getPartitionRecord(df, spec.PartitionType(sc)), sc)
416+
GetPartitionRecord(df, spec.PartitionType(sc)), sc)
417417

418418
return s.updatePartitionMetrics(partitionPath, df, true)
419419
}
@@ -428,7 +428,7 @@ func (s *SnapshotSummaryCollector) removeFile(df iceberg.DataFile, sc *iceberg.S
428428

429429
if len(df.Partition()) > 0 {
430430
partitionPath := spec.PartitionToPath(
431-
getPartitionRecord(df, spec.PartitionType(sc)), sc)
431+
GetPartitionRecord(df, spec.PartitionType(sc)), sc)
432432

433433
return s.updatePartitionMetrics(partitionPath, df, false)
434434
}

utils.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ type Optional[T any] struct {
4343
Valid bool
4444
}
4545

46-
// represents a single row in a record
47-
type structLike interface {
46+
// StructLike represents a single row in a record.
47+
type StructLike interface {
4848
// Size returns the number of columns in this row
4949
Size() int
5050
// Get returns the value in the requested column,
@@ -64,11 +64,11 @@ func (a *accessor) String() string {
6464
return fmt.Sprintf("Accessor(position=%d, inner=%s)", a.pos, a.inner)
6565
}
6666

67-
func (a *accessor) Get(s structLike) any {
67+
func (a *accessor) Get(s StructLike) any {
6868
val, inner := s.Get(a.pos), a
6969
for val != nil && inner.inner != nil {
7070
inner = inner.inner
71-
val = val.(structLike).Get(inner.pos)
71+
val = val.(StructLike).Get(inner.pos)
7272
}
7373

7474
return val

visitors.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func (*bindVisitor) VisitBound(pred BoundPredicate) BooleanExpression {
186186
// ExpressionEvaluator returns a function which can be used to evaluate a given expression
187187
// as long as a structlike value is passed which operates like and matches the passed in
188188
// schema.
189-
func ExpressionEvaluator(s *Schema, unbound BooleanExpression, caseSensitive bool) (func(structLike) (bool, error), error) {
189+
func ExpressionEvaluator(s *Schema, unbound BooleanExpression, caseSensitive bool) (func(StructLike) (bool, error), error) {
190190
bound, err := BindExpr(s, unbound, caseSensitive)
191191
if err != nil {
192192
return nil, err
@@ -197,10 +197,10 @@ func ExpressionEvaluator(s *Schema, unbound BooleanExpression, caseSensitive boo
197197

198198
type exprEvaluator struct {
199199
bound BooleanExpression
200-
st structLike
200+
st StructLike
201201
}
202202

203-
func (e *exprEvaluator) Eval(st structLike) (bool, error) {
203+
func (e *exprEvaluator) Eval(st StructLike) (bool, error) {
204204
e.st = st
205205

206206
return VisitExpr(e.bound, e)
@@ -283,7 +283,7 @@ func nullsFirstCmp[T LiteralType](cmp Comparator[T], v1, v2 Optional[T]) int {
283283
return cmp(v1.Val, v2.Val)
284284
}
285285

286-
func typedCmp[T LiteralType](st structLike, term BoundTerm, lit Literal) int {
286+
func typedCmp[T LiteralType](st StructLike, term BoundTerm, lit Literal) int {
287287
v := term.(bound[T]).eval(st)
288288
var l Optional[T]
289289

@@ -296,7 +296,7 @@ func typedCmp[T LiteralType](st structLike, term BoundTerm, lit Literal) int {
296296
return nullsFirstCmp(rhs.Comparator(), v, l)
297297
}
298298

299-
func doCmp(st structLike, term BoundTerm, lit Literal) int {
299+
func doCmp(st StructLike, term BoundTerm, lit Literal) int {
300300
// we already properly casted and converted everything during binding
301301
// so we can type assert based on the term type
302302
switch term.Type().(type) {

0 commit comments

Comments
 (0)