From 10e5b060bdf145f4012e58271564a08270ad95b6 Mon Sep 17 00:00:00 2001 From: starpact Date: Wed, 18 Mar 2026 15:18:43 +0800 Subject: [PATCH] fix(table): AddFiles: close file after usage and parallelize --- table/arrow_utils.go | 109 +++++++++++++++++++++++-------------------- table/table_test.go | 2 +- table/transaction.go | 59 +++++++++++++++-------- 3 files changed, 99 insertions(+), 71 deletions(-) diff --git a/table/arrow_utils.go b/table/arrow_utils.go index 16bbc0082..3215512d4 100644 --- a/table/arrow_utils.go +++ b/table/arrow_utils.go @@ -42,6 +42,7 @@ import ( tblutils "github.com/apache/iceberg-go/table/internal" "github.com/google/uuid" "github.com/pterm/pterm" + "golang.org/x/sync/errgroup" ) // constants to look for as Keys in Arrow field metadata @@ -1343,72 +1344,78 @@ func computeStatsPlan(sc *iceberg.Schema, props iceberg.Properties) (map[int]tbl return result, nil } -func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilder, paths iter.Seq[string]) iter.Seq2[iceberg.DataFile, error] { - return func(yield func(iceberg.DataFile, error) bool) { - defer func() { - if r := recover(); r != nil { - switch e := r.(type) { - case string: - yield(nil, fmt.Errorf("error encountered during file conversion: %s", e)) - case error: - yield(nil, fmt.Errorf("error encountered during file conversion: %w", e)) - } - } - }() - - partitionSpec, err := meta.CurrentSpec() - if err != nil || partitionSpec == nil { - yield(nil, fmt.Errorf("%w: cannot add files without a current spec", err)) - - return - } +func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilder, filePaths []string, concurrency int) (_ []iceberg.DataFile, err error) { + partitionSpec, err := meta.CurrentSpec() + if err != nil || partitionSpec == nil { + return nil, fmt.Errorf("%w: cannot add files without a current spec", err) + } - currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec + currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec - for filePath := range paths { - format := tblutils.FormatFromFileName(filePath) - rdr := must(format.Open(ctx, fileIO, filePath)) - // TODO: take a look at this defer Close() and consider refactoring - defer rdr.Close() + dataFiles := make([]iceberg.DataFile, len(filePaths)) + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(concurrency) + for i, filePath := range filePaths { + eg.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + switch e := r.(type) { + case error: + err = fmt.Errorf("error encountered during file conversion: %w", e) + default: + err = fmt.Errorf("error encountered during file conversion: %v", e) + } + } + }() - arrSchema := must(rdr.Schema()) + dataFiles[i] = fileToDataFile(ctx, fileIO, filePath, currentSchema, currentSpec, meta.props) - if err := checkArrowSchemaCompat(currentSchema, arrSchema, false); err != nil { - yield(nil, err) + return nil + }) + } - return - } + if err := eg.Wait(); err != nil { + return nil, err + } - pathToIDSchema := currentSchema - if fileHasIDs := must(VisitArrowSchema(arrSchema, hasIDs{})); fileHasIDs { - pathToIDSchema = must(ArrowSchemaToIceberg(arrSchema, false, nil)) - } + return dataFiles, nil +} - statistics := format.DataFileStatsFromMeta(rdr.Metadata(), must(computeStatsPlan(currentSchema, meta.props)), - must(format.PathToIDMapping(pathToIDSchema))) +func fileToDataFile(ctx context.Context, fileIO iceio.IO, filePath string, currentSchema *iceberg.Schema, currentSpec iceberg.PartitionSpec, props iceberg.Properties) iceberg.DataFile { + format := tblutils.FormatFromFileName(filePath) + rdr := must(format.Open(ctx, fileIO, filePath)) + defer rdr.Close() - partitionValues := make(map[int]any) - if !currentSpec.Equals(*iceberg.UnpartitionedSpec) { - for _, field := range currentSpec.Fields() { - if !field.Transform.PreservesOrder() { - yield(nil, fmt.Errorf("cannot infer partition value from parquet metadata for a non-linear partition field: %s with transform %s", field.Name, field.Transform)) + arrSchema := must(rdr.Schema()) + if err := checkArrowSchemaCompat(currentSchema, arrSchema, false); err != nil { + panic(err) + } - return - } + pathToIDSchema := currentSchema + if fileHasIDs := must(VisitArrowSchema(arrSchema, hasIDs{})); fileHasIDs { + pathToIDSchema = must(ArrowSchemaToIceberg(arrSchema, false, nil)) + } + statistics := format.DataFileStatsFromMeta( + rdr.Metadata(), + must(computeStatsPlan(currentSchema, props)), + must(format.PathToIDMapping(pathToIDSchema)), + ) - partitionVal := statistics.PartitionValue(field, currentSchema) - if partitionVal != nil { - partitionValues[field.FieldID] = partitionVal - } - } + partitionValues := make(map[int]any) + if !currentSpec.Equals(*iceberg.UnpartitionedSpec) { + for _, field := range currentSpec.Fields() { + if !field.Transform.PreservesOrder() { + panic(fmt.Errorf("cannot infer partition value from parquet metadata for a non-linear partition field: %s with transform %s", field.Name, field.Transform)) } - df := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), partitionValues) - if !yield(df, nil) { - return + partitionVal := statistics.PartitionValue(field, currentSchema) + if partitionVal != nil { + partitionValues[field.FieldID] = partitionVal } } } + + return statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), partitionValues) } func recordNBytes(rec arrow.RecordBatch) (total int64) { diff --git a/table/table_test.go b/table/table_test.go index 59927d1fd..9f5733433 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -596,7 +596,7 @@ func (t *TableWritingTestSuite) TestAddFilesFailsSchemaMismatch() { tx := tbl.NewTransaction() err = tx.AddFiles(t.ctx, files, nil, false) t.Error(err) - t.EqualError(err, `invalid schema: mismatch in fields: + t.EqualError(err, `error encountered during file conversion: invalid schema: mismatch in fields: | Table Field | Requested Field ✅ | 1: foo: optional boolean | 1: foo: optional boolean ✅ | 2: bar: optional string | 2: bar: optional string diff --git a/table/transaction.go b/table/transaction.go index a28457a31..655ee7487 100644 --- a/table/transaction.go +++ b/table/transaction.go @@ -25,7 +25,6 @@ import ( "fmt" "iter" "runtime" - "slices" "sync" "time" @@ -421,12 +420,12 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, files updater.deleteDataFile(df) } - dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(filesToAdd)) - for df, err := range dataFiles { - if err != nil { - return err - } - updater.appendDataFile(df) + dataFiles, err := filesToDataFiles(ctx, fs, t.meta, filesToAdd, 1) + if err != nil { + return err + } + for _, dataFile := range dataFiles { + updater.appendDataFile(dataFile) } updates, reqs, err := updater.commit() @@ -728,14 +727,36 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx context.Context, filesTo return t.apply(updates, reqs) } -func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProps iceberg.Properties, ignoreDuplicates bool) error { - set := make(map[string]string) - for _, f := range files { - set[f] = f +type AddFilesOption func(addFilesOp *addFilesOperation) + +type addFilesOperation struct { + concurrency int +} + +// WithAddFilesConcurrency overwrites the default concurrency for add files operation. +// Default: runtime.GOMAXPROCS(0) +func WithAddFilesConcurrency(concurrency int) AddFilesOption { + return func(op *addFilesOperation) { + if concurrency > 0 { + op.concurrency = concurrency + } } +} - if len(set) != len(files) { - return errors.New("file paths must be unique for AddFiles") +func (t *Transaction) AddFiles(ctx context.Context, filePaths []string, snapshotProps iceberg.Properties, ignoreDuplicates bool, opts ...AddFilesOption) error { + addFilesOp := addFilesOperation{ + concurrency: runtime.GOMAXPROCS(0), + } + for _, apply := range opts { + apply(&addFilesOp) + } + + set := make(map[string]struct{}, len(filePaths)) + for _, filePath := range filePaths { + if _, ok := set[filePath]; ok { + return errors.New("file paths must be unique for AddFiles") + } + set[filePath] = struct{}{} } if !ignoreDuplicates { @@ -779,12 +800,12 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp updater := t.updateSnapshot(fs, snapshotProps, OpAppend).fastAppend() - dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(files)) - for df, err := range dataFiles { - if err != nil { - return err - } - updater.appendDataFile(df) + dataFiles, err := filesToDataFiles(ctx, fs, t.meta, filePaths, addFilesOp.concurrency) + if err != nil { + return err + } + for _, dataFile := range dataFiles { + updater.appendDataFile(dataFile) } updates, reqs, err := updater.commit()