Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 64 additions & 51 deletions table/arrow_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
tblutils "github.com/apache/iceberg-go/table/internal"
"github.com/google/uuid"
"github.com/pterm/pterm"
"golang.org/x/sync/errgroup"
)

// constants to look for as Keys in Arrow field metadata
Expand Down Expand Up @@ -1343,72 +1344,84 @@ func computeStatsPlan(sc *iceberg.Schema, props iceberg.Properties) (map[int]tbl
return result, nil
}

func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilder, paths iter.Seq[string]) iter.Seq2[iceberg.DataFile, error] {
return func(yield func(iceberg.DataFile, error) bool) {
defer func() {
if r := recover(); r != nil {
switch e := r.(type) {
case string:
yield(nil, fmt.Errorf("error encountered during file conversion: %s", e))
case error:
yield(nil, fmt.Errorf("error encountered during file conversion: %w", e))
}
}
}()

partitionSpec, err := meta.CurrentSpec()
if err != nil || partitionSpec == nil {
yield(nil, fmt.Errorf("%w: cannot add files without a current spec", err))

return
}
// filesToDataFiles builds one iceberg.DataFile per entry of filePaths by
// calling fileToDataFile from an errgroup whose limit is set to concurrency.
// Each result is stored at the index of its path, so the returned slice
// follows the order of filePaths. It returns an error when meta yields no
// current partition spec or when eg.Wait reports a failed conversion.
//
// NOTE(review): the lines below that reference paths, yield, rdr or arrSchema
// are not declared by this signature; they appear to be leftovers of the
// earlier iterator-based version shown interleaved here — confirm against
// the actual file.
func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilder, filePaths []string, concurrency int) (_ []iceberg.DataFile, err error) {
partitionSpec, err := meta.CurrentSpec()
// NOTE(review): when err is nil but partitionSpec is nil, %w is handed a nil
// error — confirm the resulting message is acceptable.
if err != nil || partitionSpec == nil {
return nil, fmt.Errorf("%w: cannot add files without a current spec", err)
}

currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec
currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec

for filePath := range paths {
format := tblutils.FormatFromFileName(filePath)
rdr := must(format.Open(ctx, fileIO, filePath))
// TODO: take a look at this defer Close() and consider refactoring
defer rdr.Close()
// Pre-size the result so every goroutine can write to its own slot.
dataFiles := make([]iceberg.DataFile, len(filePaths))
eg, ctx := errgroup.WithContext(ctx)
// NOTE(review): presumably concurrency is always positive here — confirm
// against the callers.
eg.SetLimit(concurrency)
for i, filePath := range filePaths {
eg.Go(func() (err error) {
// Turn a panic raised during conversion into this goroutine's returned error.
// NOTE(review): a panic value that is neither a string nor an error leaves
// err nil, so dataFiles[i] would stay unset — confirm whether a default
// case is wanted.
defer func() {
if r := recover(); r != nil {
switch e := r.(type) {
case string:
err = fmt.Errorf("error encountered during file conversion: %s", e)
case error:
err = fmt.Errorf("error encountered during file conversion: %w", e)
}
}
}()

arrSchema := must(rdr.Schema())
dataFile, err := fileToDataFile(ctx, fileIO, filePath, currentSchema, currentSpec, meta.props)
if err != nil {
return err
}
// Only index i is written by this goroutine.
dataFiles[i] = dataFile

if err := checkArrowSchemaCompat(currentSchema, arrSchema, false); err != nil {
yield(nil, err)
return nil
})
}

return
}
// Wait for every conversion; any reported error discards the results.
if err := eg.Wait(); err != nil {
return nil, err
}

pathToIDSchema := currentSchema
if fileHasIDs := must(VisitArrowSchema(arrSchema, hasIDs{})); fileHasIDs {
pathToIDSchema = must(ArrowSchemaToIceberg(arrSchema, false, nil))
}
return dataFiles, nil
}

statistics := format.DataFileStatsFromMeta(rdr.Metadata(), must(computeStatsPlan(currentSchema, meta.props)),
must(format.PathToIDMapping(pathToIDSchema)))
// fileToDataFile opens the file at filePath, checks its Arrow schema against
// currentSchema, derives statistics from the reader's metadata, collects a
// partition value for each field of currentSpec, and returns the DataFile
// built from those statistics. It returns an error when the schema check
// fails or when a partition field's transform does not preserve order.
//
// NOTE(review): the must(...) helper is not visible here; presumably it
// panics on error, in which case callers need to recover — confirm.
// NOTE(review): the lines below that call yield or use a bare return appear
// to be leftovers of the earlier iterator-based version shown interleaved
// here — confirm against the actual file.
func fileToDataFile(ctx context.Context, fileIO iceio.IO, filePath string, currentSchema *iceberg.Schema, currentSpec iceberg.PartitionSpec, props iceberg.Properties) (iceberg.DataFile, error) {
format := tblutils.FormatFromFileName(filePath)
rdr := must(format.Open(ctx, fileIO, filePath))
defer rdr.Close()

partitionValues := make(map[int]any)
if !currentSpec.Equals(*iceberg.UnpartitionedSpec) {
for _, field := range currentSpec.Fields() {
if !field.Transform.PreservesOrder() {
yield(nil, fmt.Errorf("cannot infer partition value from parquet metadata for a non-linear partition field: %s with transform %s", field.Name, field.Transform))
arrSchema := must(rdr.Schema())
if err := checkArrowSchemaCompat(currentSchema, arrSchema, false); err != nil {
return nil, err
}

return
}
// Use the file's own schema for the path-to-ID mapping when it carries IDs;
// otherwise fall back to the table's current schema.
pathToIDSchema := currentSchema
if fileHasIDs := must(VisitArrowSchema(arrSchema, hasIDs{})); fileHasIDs {
pathToIDSchema = must(ArrowSchemaToIceberg(arrSchema, false, nil))
}
statistics := format.DataFileStatsFromMeta(
rdr.Metadata(),
must(computeStatsPlan(currentSchema, props)),
must(format.PathToIDMapping(pathToIDSchema)),
)

partitionVal := statistics.PartitionValue(field, currentSchema)
if partitionVal != nil {
partitionValues[field.FieldID] = partitionVal
}
}
// Partition values are keyed by field ID; the map stays empty for an
// unpartitioned spec.
partitionValues := make(map[int]any)
if !currentSpec.Equals(*iceberg.UnpartitionedSpec) {
for _, field := range currentSpec.Fields() {
if !field.Transform.PreservesOrder() {
return nil, fmt.Errorf("cannot infer partition value from parquet metadata for a non-linear partition field: %s with transform %s", field.Name, field.Transform)
}

df := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), partitionValues)
if !yield(df, nil) {
return
partitionVal := statistics.PartitionValue(field, currentSchema)
if partitionVal != nil {
partitionValues[field.FieldID] = partitionVal
}
}
}

// NOTE(review): the file format passed here is always iceberg.ParquetFile even
// though format is derived from the file name above — confirm this is intended.
dataFile := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(), partitionValues)

return dataFile, nil
}

func recordNBytes(rec arrow.RecordBatch) (total int64) {
Expand Down
59 changes: 40 additions & 19 deletions table/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"fmt"
"iter"
"runtime"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -421,12 +420,12 @@ func (t *Transaction) ReplaceDataFiles(ctx context.Context, filesToDelete, files
updater.deleteDataFile(df)
}

dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(filesToAdd))
for df, err := range dataFiles {
if err != nil {
return err
}
updater.appendDataFile(df)
dataFiles, err := filesToDataFiles(ctx, fs, t.meta, filesToAdd, 1)
if err != nil {
return err
}
for _, dataFile := range dataFiles {
updater.appendDataFile(dataFile)
}

updates, reqs, err := updater.commit()
Expand Down Expand Up @@ -728,14 +727,36 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx context.Context, filesTo
return t.apply(updates, reqs)
}

func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProps iceberg.Properties, ignoreDuplicates bool) error {
set := make(map[string]string)
for _, f := range files {
set[f] = f
// AddFilesOption is a functional option that adjusts the settings of an
// add-files operation by mutating the addFilesOperation it is given.
type AddFilesOption func(addFilesOp *addFilesOperation)

// addFilesOperation holds the settings that AddFilesOption values modify.
type addFilesOperation struct {
// concurrency is the value assigned by WithAddFilesConcurrency.
concurrency int
}

// WithAddFilesConcurrency returns an AddFilesOption that replaces the
// concurrency setting of an add files operation with the given value.
// A value of zero or less is ignored and leaves the existing setting as is.
// Default: runtime.GOMAXPROCS(0)
func WithAddFilesConcurrency(concurrency int) AddFilesOption {
	return func(target *addFilesOperation) {
		// Non-positive requests keep whatever value is already configured.
		if concurrency <= 0 {
			return
		}
		target.concurrency = concurrency
	}
}

if len(set) != len(files) {
return errors.New("file paths must be unique for AddFiles")
func (t *Transaction) AddFiles(ctx context.Context, filePaths []string, snapshotProps iceberg.Properties, ignoreDuplicates bool, opts ...AddFilesOption) error {
addFilesOp := addFilesOperation{
concurrency: runtime.GOMAXPROCS(0),
}
for _, apply := range opts {
apply(&addFilesOp)
}

set := make(map[string]struct{}, len(filePaths))
for _, filePath := range filePaths {
if _, ok := set[filePath]; ok {
return errors.New("file paths must be unique for AddFiles")
}
set[filePath] = struct{}{}
}

if !ignoreDuplicates {
Expand Down Expand Up @@ -779,12 +800,12 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp

updater := t.updateSnapshot(fs, snapshotProps, OpAppend).fastAppend()

dataFiles := filesToDataFiles(ctx, fs, t.meta, slices.Values(files))
for df, err := range dataFiles {
if err != nil {
return err
}
updater.appendDataFile(df)
dataFiles, err := filesToDataFiles(ctx, fs, t.meta, filePaths, addFilesOp.concurrency)
if err != nil {
return err
}
for _, dataFile := range dataFiles {
updater.appendDataFile(dataFile)
}

updates, reqs, err := updater.commit()
Expand Down
Loading