Skip to content

Commit 492810d

Browse files
committed
fix(table): AddFiles: close file after usage and parallelize
1 parent 55bdfbf commit 492810d

File tree

2 files changed

+102
-71
lines changed

2 files changed

+102
-71
lines changed

table/arrow_utils.go

Lines changed: 62 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
tblutils "github.com/apache/iceberg-go/table/internal"
4343
"github.com/google/uuid"
4444
"github.com/pterm/pterm"
45+
"golang.org/x/sync/errgroup"
4546
)
4647

4748
// constants to look for as Keys in Arrow field metadata
@@ -1343,74 +1344,83 @@ func computeStatsPlan(sc *iceberg.Schema, props iceberg.Properties) (map[int]tbl
13431344
return result, nil
13441345
}
13451346

1346-
func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilder, paths iter.Seq[string]) iter.Seq2[iceberg.DataFile, error] {
1347-
return func(yield func(iceberg.DataFile, error) bool) {
1348-
defer func() {
1349-
if r := recover(); r != nil {
1350-
switch e := r.(type) {
1351-
case string:
1352-
yield(nil, fmt.Errorf("error encountered during file conversion: %s", e))
1353-
case error:
1354-
yield(nil, fmt.Errorf("error encountered during file conversion: %w", e))
1355-
}
1356-
}
1357-
}()
1358-
1359-
partitionSpec, err := meta.CurrentSpec()
1360-
if err != nil || partitionSpec == nil {
1361-
yield(nil, fmt.Errorf("%w: cannot add files without a current spec", err))
1362-
1363-
return
1364-
}
1347+
func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilder, filePaths []string, concurrency int) (_ []iceberg.DataFile, err error) {
1348+
partitionSpec, err := meta.CurrentSpec()
1349+
if err != nil || partitionSpec == nil {
1350+
return nil, fmt.Errorf("%w: cannot add files without a current spec", err)
1351+
}
13651352

1366-
currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec
1353+
currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec
13671354

1368-
for filePath := range paths {
1369-
format := tblutils.FormatFromFileName(filePath)
1370-
rdr := must(format.Open(ctx, fileIO, filePath))
1371-
// TODO: take a look at this defer Close() and consider refactoring
1372-
defer rdr.Close()
1355+
dataFiles := make([]iceberg.DataFile, len(filePaths))
1356+
eg, ctx := errgroup.WithContext(ctx)
1357+
eg.SetLimit(concurrency)
1358+
for i, filePath := range filePaths {
1359+
eg.Go(func() (err error) {
1360+
defer func() {
1361+
if r := recover(); r != nil {
1362+
switch e := r.(type) {
1363+
case string:
1364+
err = fmt.Errorf("error encountered during file conversion: %s", e)
1365+
case error:
1366+
err = fmt.Errorf("error encountered during file conversion: %w", e)
1367+
}
1368+
}
1369+
}()
13731370

1374-
arrSchema := must(rdr.Schema())
1371+
dataFile, err := fileToDataFile(ctx, fileIO, currentSchema, currentSpec, meta.props, filePath)
1372+
if err != nil {
1373+
return err
1374+
}
1375+
dataFiles[i] = dataFile
13751376

1376-
if hasIDs := must(VisitArrowSchema(arrSchema, hasIDs{})); hasIDs {
1377-
yield(nil, fmt.Errorf("%w: cannot add file %s because it has field-ids. add-files only supports the addition of files without field_ids",
1378-
iceberg.ErrNotImplemented, filePath))
1377+
return nil
1378+
})
1379+
}
13791380

1380-
return
1381-
}
1381+
if err := eg.Wait(); err != nil {
1382+
return nil, err
1383+
}
13821384

1383-
if err := checkArrowSchemaCompat(currentSchema, arrSchema, false); err != nil {
1384-
yield(nil, err)
1385+
return dataFiles, nil
1386+
}
13851387

1386-
return
1387-
}
1388+
func fileToDataFile(ctx context.Context, fileIO iceio.IO, currentSchema *iceberg.Schema, currentSpec iceberg.PartitionSpec, props iceberg.Properties, filePath string) (iceberg.DataFile, error) {
1389+
format := tblutils.FormatFromFileName(filePath)
1390+
rdr := must(format.Open(ctx, fileIO, filePath))
1391+
defer rdr.Close()
13881392

1389-
statistics := format.DataFileStatsFromMeta(rdr.Metadata(), must(computeStatsPlan(currentSchema, meta.props)),
1390-
must(format.PathToIDMapping(currentSchema)))
1393+
arrSchema := must(rdr.Schema())
1394+
if hasIDs := must(VisitArrowSchema(arrSchema, hasIDs{})); hasIDs {
1395+
return nil, fmt.Errorf("%w: cannot add file %s because it has field-ids. add-files only supports the addition of files without field_ids",
1396+
iceberg.ErrNotImplemented, filePath)
1397+
}
13911398

1392-
partitionValues := make(map[int]any)
1393-
if !currentSpec.Equals(*iceberg.UnpartitionedSpec) {
1394-
for _, field := range currentSpec.Fields() {
1395-
if !field.Transform.PreservesOrder() {
1396-
yield(nil, fmt.Errorf("cannot infer partition value from parquet metadata for a non-linear partition field: %s with transform %s", field.Name, field.Transform))
1399+
if err := checkArrowSchemaCompat(currentSchema, arrSchema, false); err != nil {
1400+
return nil, err
1401+
}
13971402

1398-
return
1399-
}
1403+
statsCols := must(computeStatsPlan(currentSchema, props))
1404+
colMapping := must(format.PathToIDMapping(currentSchema))
1405+
statistics := format.DataFileStatsFromMeta(rdr.Metadata(), statsCols, colMapping)
14001406

1401-
partitionVal := statistics.PartitionValue(field, currentSchema)
1402-
if partitionVal != nil {
1403-
partitionValues[field.FieldID] = partitionVal
1404-
}
1405-
}
1407+
partitionValues := make(map[int]any)
1408+
if !currentSpec.Equals(*iceberg.UnpartitionedSpec) {
1409+
for _, field := range currentSpec.Fields() {
1410+
if !field.Transform.PreservesOrder() {
1411+
return nil, fmt.Errorf("cannot infer partition value from parquet metadata for a non-linear partition field: %s with transform %s", field.Name, field.Transform)
14061412
}
14071413

1408-
df := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), partitionValues)
1409-
if !yield(df, nil) {
1410-
return
1414+
partitionVal := statistics.PartitionValue(field, currentSchema)
1415+
if partitionVal != nil {
1416+
partitionValues[field.FieldID] = partitionVal
14111417
}
14121418
}
14131419
}
1420+
1421+
dataFile := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), partitionValues)
1422+
1423+
return dataFile, nil
14141424
}
14151425

14161426
func recordNBytes(rec arrow.RecordBatch) (total int64) {

table/transaction.go

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"fmt"
2626
"iter"
2727
"runtime"
28-
"slices"
2928
"sync"
3029
"time"
3130

@@ -421,12 +420,12 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, files
421420
updater.deleteDataFile(df)
422421
}
423422

424-
dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(filesToAdd))
425-
for df, err := range dataFiles {
426-
if err != nil {
427-
return err
428-
}
429-
updater.appendDataFile(df)
423+
dataFiles, err := filesToDataFiles(ctx, fs, t.meta, filesToAdd, 1)
424+
if err != nil {
425+
return err
426+
}
427+
for _, dataFile := range dataFiles {
428+
updater.appendDataFile(dataFile)
430429
}
431430

432431
updates, reqs, err := updater.commit()
@@ -685,14 +684,36 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx context.Context, filesTo
685684
return t.apply(updates, reqs)
686685
}
687686

688-
func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProps iceberg.Properties, ignoreDuplicates bool) error {
689-
set := make(map[string]string)
690-
for _, f := range files {
691-
set[f] = f
687+
type AddFilesOption func(addFilesOp *addFilesOperation)
688+
689+
type addFilesOperation struct {
690+
concurrency int
691+
}
692+
693+
// WithAddFilesConcurrency overwrites the default concurrency for add files operation.
694+
// Default: runtime.GOMAXPROCS(0)
695+
func WithAddFilesConcurrency(concurrency int) AddFilesOption {
696+
return func(op *addFilesOperation) {
697+
if concurrency > 0 {
698+
op.concurrency = concurrency
699+
}
692700
}
701+
}
693702

694-
if len(set) != len(files) {
695-
return errors.New("file paths must be unique for AddFiles")
703+
func (t *Transaction) AddFiles(ctx context.Context, filePaths []string, snapshotProps iceberg.Properties, ignoreDuplicates bool, opts ...AddFilesOption) error {
704+
addFilesOp := addFilesOperation{
705+
concurrency: runtime.GOMAXPROCS(0),
706+
}
707+
for _, apply := range opts {
708+
apply(&addFilesOp)
709+
}
710+
711+
set := make(map[string]struct{}, len(filePaths))
712+
for _, filePath := range filePaths {
713+
if _, ok := set[filePath]; ok {
714+
return errors.New("file paths must be unique for AddFiles")
715+
}
716+
set[filePath] = struct{}{}
696717
}
697718

698719
if !ignoreDuplicates {
@@ -736,12 +757,12 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp
736757

737758
updater := t.updateSnapshot(fs, snapshotProps, OpAppend).fastAppend()
738759

739-
dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(files))
740-
for df, err := range dataFiles {
741-
if err != nil {
742-
return err
743-
}
744-
updater.appendDataFile(df)
760+
dataFiles, err := filesToDataFiles(ctx, fs, t.meta, filePaths, addFilesOp.concurrency)
761+
if err != nil {
762+
return err
763+
}
764+
for _, dataFile := range dataFiles {
765+
updater.appendDataFile(dataFile)
745766
}
746767

747768
updates, reqs, err := updater.commit()

0 commit comments

Comments
 (0)