Skip to content

Commit cc8a6ff

Browse files
authored
feat(table): roll parquet files based on actual compressed size (#759)
This change refactors datafile writing to use the actual written (compressed) file size — as iceberg-java and iceberg-rust do — instead of the in-memory size.
1 parent e3b130b commit cc8a6ff

10 files changed

+690
-141
lines changed

manifest.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"reflect"
2828
"slices"
2929
"strconv"
30+
"strings"
3031
"sync"
3132
"time"
3233

@@ -1584,6 +1585,20 @@ const (
15841585
ParquetFile FileFormat = "PARQUET"
15851586
)
15861587

1588+
// FileFormatFromString parses a file format string (case-insensitive).
1589+
func FileFormatFromString(s string) (FileFormat, error) {
1590+
switch strings.ToUpper(s) {
1591+
case string(ParquetFile):
1592+
return ParquetFile, nil
1593+
case string(OrcFile):
1594+
return OrcFile, nil
1595+
case string(AvroFile):
1596+
return AvroFile, nil
1597+
default:
1598+
return "", fmt.Errorf("unknown file format: %s", s)
1599+
}
1600+
}
1601+
15871602
type colMap[K, V any] struct {
15881603
Key K `avro:"key"`
15891604
Value V `avro:"value"`

table/arrow_utils.go

Lines changed: 66 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,54 +1332,77 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata
13321332
yield(nil, err)
13331333
}
13341334
}
1335-
currentSpec, err := meta.CurrentSpec()
1336-
if err != nil {
1337-
return func(yield func(iceberg.DataFile, error) bool) {
1338-
yield(nil, err)
1339-
}
1340-
}
1341-
if currentSpec == nil {
1342-
return func(yield func(iceberg.DataFile, error) bool) {
1343-
yield(nil, fmt.Errorf("cannot write files without a current spec: %w", err))
1344-
}
1345-
}
13461335

13471336
cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
13481337
return newDataFileWriter(rootLocation, fs, meta, props, opts...)
13491338
})
1350-
nextCount, stopCount := iter.Pull(args.counter)
1351-
if currentSpec.IsUnpartitioned() {
1352-
tasks := func(yield func(WriteTask) bool) {
1353-
defer stopCount()
13541339

1355-
fileCount := 0
1356-
for batch := range binPackRecords(args.itr, defaultBinPackLookback, targetFileSize) {
1357-
cnt, _ := nextCount()
1358-
fileCount++
1359-
t := WriteTask{
1360-
Uuid: *args.writeUUID,
1361-
ID: cnt,
1362-
PartitionID: iceberg.UnpartitionedSpec.ID(),
1363-
FileCount: fileCount,
1364-
Schema: taskSchema,
1365-
Batches: batch,
1366-
}
1367-
if !yield(t) {
1368-
return
1369-
}
1370-
}
1371-
}
1340+
factory, err := newWriterFactory(rootLocation, args, meta, taskSchema, targetFileSize)
1341+
if err != nil {
1342+
panic(err)
1343+
}
13721344

1373-
return cw.writeFiles(ctx, rootLocation, args.fs, meta, meta.props, nil, tasks)
1345+
if factory.currentSpec.IsUnpartitioned() {
1346+
return unpartitionedWrite(ctx, factory, args.itr)
13741347
}
13751348

1376-
factory := NewWriterFactory(rootLocation, args, meta, taskSchema, targetFileSize)
1377-
partitionWriter := newPartitionedFanoutWriter(*currentSpec, cw, meta.CurrentSchema(), args.itr, &factory)
1349+
partitionWriter := newPartitionedFanoutWriter(factory.currentSpec, cw, meta.CurrentSchema(), args.itr, factory)
13781350
workers := config.EnvConfig.MaxWorkers
13791351

13801352
return partitionWriter.Write(ctx, workers)
13811353
}
13821354

1355+
func unpartitionedWrite(ctx context.Context, factory *writerFactory, records iter.Seq2[arrow.RecordBatch, error]) iter.Seq2[iceberg.DataFile, error] {
1356+
outputCh := make(chan iceberg.DataFile, 1)
1357+
errCh := make(chan error, 1)
1358+
1359+
go func() {
1360+
defer close(outputCh)
1361+
defer factory.stopCount()
1362+
1363+
writer := factory.newRollingDataWriter(ctx, nil, "", nil, outputCh)
1364+
for rec, err := range records {
1365+
if err != nil {
1366+
errCh <- err
1367+
close(errCh)
1368+
writer.close()
1369+
writer.wg.Wait()
1370+
1371+
return
1372+
}
1373+
if err := writer.Add(rec); err != nil {
1374+
errCh <- err
1375+
close(errCh)
1376+
writer.close()
1377+
writer.wg.Wait()
1378+
1379+
return
1380+
}
1381+
}
1382+
close(writer.recordCh)
1383+
writer.wg.Wait()
1384+
if err := <-writer.errorCh; err != nil {
1385+
errCh <- err
1386+
}
1387+
close(errCh)
1388+
}()
1389+
1390+
return func(yield func(iceberg.DataFile, error) bool) {
1391+
defer func() {
1392+
for range outputCh {
1393+
}
1394+
}()
1395+
for df := range outputCh {
1396+
if !yield(df, nil) {
1397+
return
1398+
}
1399+
}
1400+
if err := <-errCh; err != nil {
1401+
yield(nil, err)
1402+
}
1403+
}
1404+
}
1405+
13831406
type partitionContext struct {
13841407
partitionData map[int]any
13851408
specID int32
@@ -1448,8 +1471,14 @@ func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string,
14481471

14491472
return cw.writeFiles(ctx, rootLocation, args.fs, meta, meta.props, nil, tasks)
14501473
}
1451-
writerFactory := NewWriterFactory(rootLocation, args, meta, iceberg.PositionalDeleteSchema, targetFileSize)
1452-
partitionWriter := newPositionDeletePartitionedFanoutWriter(latestMetadata, cw, partitionContextByFilePath, args.itr, &writerFactory)
1474+
factory, err := newWriterFactory(rootLocation, args, meta, iceberg.PositionalDeleteSchema, targetFileSize,
1475+
withContentType(iceberg.EntryContentPosDeletes),
1476+
withFactoryFileSchema(iceberg.PositionalDeleteSchema))
1477+
if err != nil {
1478+
panic(err)
1479+
}
1480+
1481+
partitionWriter := newPositionDeletePartitionedFanoutWriter(latestMetadata, cw, partitionContextByFilePath, args.itr, factory)
14531482
workers := config.EnvConfig.MaxWorkers
14541483

14551484
return partitionWriter.Write(ctx, workers)

table/internal/interfaces.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,22 @@ type FileReader interface {
7373
ReadTable(context.Context) (arrow.Table, error)
7474
}
7575

76+
// FileWriter is an incremental single-file writer with open/write/close
// lifecycle. It writes Arrow record batches and tracks bytes written for
// rolling file decisions.
type FileWriter interface {
	// Write appends a record batch to the open file.
	Write(arrow.RecordBatch) error
	// BytesWritten reports how many bytes have been written to the output
	// so far, used for rolling file decisions.
	BytesWritten() int64
	// Close finalizes the file and returns the resulting DataFile metadata.
	Close() (iceberg.DataFile, error)
}
84+
7685
type FileFormat interface {
7786
Open(context.Context, iceio.IO, string) (FileReader, error)
7887
PathToIDMapping(*iceberg.Schema) (map[string]int, error)
7988
DataFileStatsFromMeta(rdr Metadata, statsCols map[int]StatisticsCollector, colMapping map[string]int) *DataFileStatistics
8089
GetWriteProperties(iceberg.Properties) any
8190
WriteDataFile(ctx context.Context, fs iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches []arrow.RecordBatch) (iceberg.DataFile, error)
91+
NewFileWriter(ctx context.Context, fs iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, arrowSchema *arrow.Schema) (FileWriter, error)
8292
}
8393

8494
func GetFileFormat(format iceberg.FileFormat) FileFormat {

table/internal/parquet_files.go

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24+
"io"
2425
"maps"
2526
"slices"
2627
"strconv"
@@ -243,46 +244,103 @@ func (parquetFormat) GetWriteProperties(props iceberg.Properties) any {
243244
parquet.WithCompressionLevel(compressionLevel))
244245
}
245246

246-
func (p parquetFormat) WriteDataFile(ctx context.Context, fs iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches []arrow.RecordBatch) (_ iceberg.DataFile, err error) {
247+
func (p parquetFormat) WriteDataFile(ctx context.Context, fs iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches []arrow.RecordBatch) (iceberg.DataFile, error) {
248+
w, err := p.NewFileWriter(ctx, fs, partitionValues, info, batches[0].Schema())
249+
if err != nil {
250+
return nil, err
251+
}
252+
253+
for _, batch := range batches {
254+
if err := w.Write(batch); err != nil {
255+
w.Close()
256+
257+
return nil, err
258+
}
259+
}
260+
261+
return w.Close()
262+
}
263+
264+
// ParquetFileWriter is an incremental single-file writer with open/write/close
265+
// lifecycle. It writes Arrow record batches to a Parquet file and tracks bytes
266+
// written for rolling file decisions.
267+
type ParquetFileWriter struct {
268+
pqWriter *pqarrow.FileWriter
269+
counter *internal.CountingWriter
270+
fileCloser io.Closer
271+
format parquetFormat
272+
info WriteFileInfo
273+
partition map[int]any
274+
colMapping map[string]int
275+
}
276+
277+
// NewFileWriter creates a ParquetFileWriter that writes batches to a single
278+
// Parquet file. Call Write to append batches, BytesWritten to check actual
279+
// compressed file size, and Close to finalize and get the resulting DataFile.
280+
func (p parquetFormat) NewFileWriter(ctx context.Context, fs iceio.WriteFileIO,
281+
partitionValues map[int]any, info WriteFileInfo, arrowSchema *arrow.Schema,
282+
) (FileWriter, error) {
247283
fw, err := fs.Create(info.FileName)
248284
if err != nil {
249285
return nil, err
250286
}
251287

252-
defer internal.CheckedClose(fw, &err)
288+
colMapping, err := p.PathToIDMapping(info.FileSchema)
289+
if err != nil {
290+
fw.Close()
253291

254-
cntWriter := internal.CountingWriter{W: fw}
292+
return nil, err
293+
}
294+
295+
counter := &internal.CountingWriter{W: fw}
255296
mem := compute.GetAllocator(ctx)
256297
writerProps := parquet.NewWriterProperties(info.WriteProps.([]parquet.WriterProperty)...)
257298
arrProps := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem), pqarrow.WithStoreSchema())
258299

259-
writer, err := pqarrow.NewFileWriter(batches[0].Schema(), &cntWriter, writerProps, arrProps)
300+
writer, err := pqarrow.NewFileWriter(arrowSchema, counter, writerProps, arrProps)
260301
if err != nil {
302+
fw.Close()
303+
261304
return nil, err
262305
}
263306

264-
for _, batch := range batches {
265-
if err := writer.WriteBuffered(batch); err != nil {
266-
return nil, err
267-
}
268-
}
307+
return &ParquetFileWriter{
308+
pqWriter: writer,
309+
counter: counter,
310+
fileCloser: fw,
311+
format: p,
312+
info: info,
313+
partition: partitionValues,
314+
colMapping: colMapping,
315+
}, nil
316+
}
269317

270-
if err := writer.Close(); err != nil {
271-
return nil, err
272-
}
318+
// Write appends a record batch to the Parquet file.
319+
func (w *ParquetFileWriter) Write(batch arrow.RecordBatch) error {
320+
return w.pqWriter.WriteBuffered(batch)
321+
}
273322

274-
filemeta, err := writer.FileMetadata()
275-
if err != nil {
323+
// BytesWritten returns the number of bytes flushed to the output so far.
324+
func (w *ParquetFileWriter) BytesWritten() int64 {
325+
return w.counter.Count
326+
}
327+
328+
// Close finalizes the Parquet file and returns the resulting DataFile with
329+
// accurate file statistics and size.
330+
func (w *ParquetFileWriter) Close() (_ iceberg.DataFile, err error) {
331+
defer internal.CheckedClose(w.fileCloser, &err)
332+
333+
if err = w.pqWriter.Close(); err != nil {
276334
return nil, err
277335
}
278336

279-
colMapping, err := p.PathToIDMapping(info.FileSchema)
337+
filemeta, err := w.pqWriter.FileMetadata()
280338
if err != nil {
281339
return nil, err
282340
}
283341

284-
return p.DataFileStatsFromMeta(filemeta, info.StatsCols, colMapping).
285-
ToDataFile(info.FileSchema, info.Spec, info.FileName, iceberg.ParquetFile, info.Content, cntWriter.Count, partitionValues), nil
342+
return w.format.DataFileStatsFromMeta(filemeta, w.info.StatsCols, w.colMapping).
343+
ToDataFile(w.info.FileSchema, w.info.Spec, w.info.FileName, iceberg.ParquetFile, w.info.Content, w.counter.Count, w.partition), nil
286344
}
287345

288346
type decAsIntAgg[T int32 | int64] struct {

table/partitioned_fanout_writer_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,13 @@ func (s *FanoutWriterTestSuite) testTransformPartition(transform iceberg.Transfo
130130
},
131131
}
132132

133-
nameMapping := icebergSchema.NameMapping()
134-
taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping)
135-
s.Require().NoError(err)
136-
137133
cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
138134
return newDataFileWriter(rootLocation, fs, meta, props, opts...)
139135
})
140-
writerFactory := NewWriterFactory(loc, args, metaBuilder, icebergSchema, 1024*1024)
141-
partitionWriter := newPartitionedFanoutWriter(spec, cw, taskSchema, args.itr, &writerFactory)
136+
rollingDataWriters, err := newWriterFactory(loc, args, metaBuilder, icebergSchema, 1024*1024)
137+
s.Require().NoError(err)
138+
139+
partitionWriter := newPartitionedFanoutWriter(spec, cw, icebergSchema, args.itr, rollingDataWriters)
142140
workers := config.EnvConfig.MaxWorkers
143141

144142
dataFiles := partitionWriter.Write(s.ctx, workers)

table/pos_delete_partitioned_fanout_writer_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,16 @@ func TestPositionDeletePartitionedFanoutWriterProcessBatch(t *testing.T) {
129129
cw := newConcurrentDataFileWriter(func(rootLocation string, fs io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
130130
return newPositionDeleteWriter(rootLocation, fs, meta, props, opts...)
131131
})
132-
writerFactory := NewWriterFactory(t.TempDir(), recordWritingArgs{
132+
factory, err := newWriterFactory(t.TempDir(), recordWritingArgs{
133133
fs: &io.LocalFS{},
134134
sc: PositionalDeleteArrowSchema,
135135
writeUUID: &writeUUID,
136136
counter: internal.Counter(0),
137-
}, metadataBuilder, iceberg.PositionalDeleteSchema, 1024*1024)
138-
writer := newPositionDeletePartitionedFanoutWriter(latestMeta, cw, tc.pathToPartitionContext, nil, &writerFactory)
137+
}, metadataBuilder, iceberg.PositionalDeleteSchema, 1024*1024,
138+
withContentType(iceberg.EntryContentPosDeletes),
139+
withFactoryFileSchema(iceberg.PositionalDeleteSchema))
139140
require.NoError(t, err)
141+
writer := newPositionDeletePartitionedFanoutWriter(latestMeta, cw, tc.pathToPartitionContext, nil, factory)
140142

141143
dataFileCh := make(chan iceberg.DataFile, 10)
142144
err = writer.processBatch(ctx, tc.input, dataFileCh)
@@ -147,7 +149,7 @@ func TestPositionDeletePartitionedFanoutWriterProcessBatch(t *testing.T) {
147149
}
148150
require.NoError(t, err)
149151

150-
err = writerFactory.closeAll()
152+
err = factory.closeAll()
151153
require.NoError(t, err)
152154

153155
close(dataFileCh)

table/properties.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ const (
7979
MetadataCompressionKey = "write.metadata.compression-codec"
8080
MetadataCompressionDefault = "none"
8181

82+
WriteFormatDefaultKey = "write.format.default"
83+
WriteFormatDefaultDefault = "parquet"
84+
8285
WriteTargetFileSizeBytesKey = "write.target-file-size-bytes"
8386
WriteTargetFileSizeBytesDefault = 512 * 1024 * 1024 // 512 MB
8487

0 commit comments

Comments
 (0)