Skip to content

Commit 7b2ea9b

Browse files
authored
chore: pass writerFactory to newPartitionedFanoutWriter (#743)
Just a small cleanup to improve ergonomics, the only two call sites of newPartitionedFanoutWriter were setting the writers field immediately after instantiation, this way it's harder to write incorrect code which forgets to set them.
1 parent 1114e38 commit 7b2ea9b

File tree

6 files changed

+18
-20
lines changed

6 files changed

+18
-20
lines changed

table/arrow_utils.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,9 +1360,8 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata
13601360

13611361
return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
13621362
} else {
1363-
partitionWriter := newPartitionedFanoutWriter(*currentSpec, meta.CurrentSchema(), args.itr)
13641363
rollingDataWriters := NewWriterFactory(rootLocation, args, meta, taskSchema, targetFileSize)
1365-
partitionWriter.writers = &rollingDataWriters
1364+
partitionWriter := newPartitionedFanoutWriter(*currentSpec, meta.CurrentSchema(), args.itr, &rollingDataWriters)
13661365
workers := config.EnvConfig.MaxWorkers
13671366

13681367
return partitionWriter.Write(ctx, workers)

table/metadata.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ type Metadata interface {
7272
// table is created. Implementations must throw an exception if a table's
7373
// UUID does not match the expected UUID after refreshing metadata.
7474
TableUUID() uuid.UUID
75-
// Location is the table's base location. This is used by writers to determine
75+
// Location is the table's base location. This is used by writers to determine
7676
// where to store data files, manifest files, and table metadata files.
7777
Location() string
7878
// LastUpdatedMillis is the timestamp in milliseconds from the unix epoch when
@@ -95,7 +95,7 @@ type Metadata interface {
9595
// PartitionSpecByID returns the partition spec with the given ID. Returns
9696
// nil if the ID is not found in the list of partition specs.
9797
PartitionSpecByID(int) *iceberg.PartitionSpec
98-
// DefaultPartitionSpec is the ID of the current spec that writers should
98+
// DefaultPartitionSpec is the ID of the current spec that writers should
9999
// use by default.
100100
DefaultPartitionSpec() int
101101
// LastPartitionSpecID is the highest assigned partition field ID across
@@ -126,7 +126,7 @@ type Metadata interface {
126126
SortOrder() SortOrder
127127
// SortOrders returns the list of sort orders in the table.
128128
SortOrders() []SortOrder
129-
// DefaultSortOrder returns the ID of the current sort order that writers
129+
// DefaultSortOrder returns the ID of the current sort order that writers
130130
// should use by default.
131131
DefaultSortOrder() int
132132
// Properties is a string to string map of table properties. This is used

table/partitioned_fanout_writer.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type partitionedFanoutWriter struct {
3939
partitionSpec iceberg.PartitionSpec
4040
schema *iceberg.Schema
4141
itr iter.Seq2[arrow.RecordBatch, error]
42-
writers *writerFactory
42+
writerFactory *writerFactory
4343
}
4444

4545
// PartitionInfo holds the row indices and partition values for a specific partition,
@@ -51,12 +51,13 @@ type partitionInfo struct {
5151
}
5252

5353
// newPartitionedFanoutWriter creates a new partitionedFanoutWriter with the specified
54-
// partition specification, schema, and record iterator.
55-
func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.RecordBatch, error]) *partitionedFanoutWriter {
54+
// partition specification, schema, record iterator, and writerFactory.
55+
func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.RecordBatch, error], writerFactory *writerFactory) *partitionedFanoutWriter {
5656
return &partitionedFanoutWriter{
5757
partitionSpec: partitionSpec,
5858
schema: schema,
5959
itr: itr,
60+
writerFactory: writerFactory,
6061
}
6162
}
6263

@@ -136,7 +137,7 @@ func (p *partitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-c
136137
}
137138

138139
partitionPath := p.partitionPath(val.partitionRec)
139-
rollingDataWriter, err := p.writers.getOrCreateRollingDataWriter(ctx, partitionPath, val.partitionValues, dataFilesChannel)
140+
rollingDataWriter, err := p.writerFactory.getOrCreateRollingDataWriter(ctx, partitionPath, val.partitionValues, dataFilesChannel)
140141
if err != nil {
141142
return err
142143
}
@@ -157,7 +158,7 @@ func (p *partitionedFanoutWriter) yieldDataFiles(fanoutWorkers *errgroup.Group,
157158
go func() {
158159
defer close(outputDataFilesCh)
159160
err := fanoutWorkers.Wait()
160-
err = errors.Join(err, p.writers.closeAll())
161+
err = errors.Join(err, p.writerFactory.closeAll())
161162
errCh <- err
162163
close(errCh)
163164
}()

table/partitioned_fanout_writer_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,8 @@ func (s *FanoutWriterTestSuite) testTransformPartition(transform iceberg.Transfo
134134
taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping)
135135
s.Require().NoError(err)
136136

137-
partitionWriter := newPartitionedFanoutWriter(spec, taskSchema, args.itr)
138137
rollingDataWriters := NewWriterFactory(loc, args, metaBuilder, icebergSchema, 1024*1024)
139-
140-
partitionWriter.writers = &rollingDataWriters
138+
partitionWriter := newPartitionedFanoutWriter(spec, taskSchema, args.itr, &rollingDataWriters)
141139
workers := config.EnvConfig.MaxWorkers
142140

143141
dataFiles := partitionWriter.Write(s.ctx, workers)

table/rolling_data_writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type writerFactory struct {
4646
}
4747

4848
// NewWriterFactory creates a new WriterFactory with the specified configuration
49-
// for managing rolling data writers across partitions.
49+
// for managing rolling data writers across partitions.
5050
func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) writerFactory {
5151
nextCount, stopCount := iter.Pull(args.counter)
5252

table/snapshot_producers_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ func TestManifestWriterClosesUnderlyingFile(t *testing.T) {
606606
require.Len(t, manifests, 1, "should have one manifest")
607607

608608
unclosed := trackIO.GetUnclosedWriters()
609-
require.Empty(t, unclosed, "all file writers should be closed, but these are still open: %v", unclosed)
609+
require.Empty(t, unclosed, "all file writers should be closed, but these are still open: %v", unclosed)
610610
}
611611

612612
// TestCreateManifestClosesUnderlyingFile tests that createManifest properly
@@ -636,7 +636,7 @@ func TestCreateManifestClosesUnderlyingFile(t *testing.T) {
636636
require.NoError(t, err, "createManifest should succeed")
637637

638638
unclosed := trackIO.GetUnclosedWriters()
639-
require.Empty(t, unclosed, "all file writers should be closed after createManifest, but these are still open: %v", unclosed)
639+
require.Empty(t, unclosed, "all file writers should be closed after createManifest, but these are still open: %v", unclosed)
640640
}
641641

642642
// TestOverwriteExistingManifestsClosesUnderlyingFile tests that existingManifests
@@ -688,11 +688,11 @@ func TestOverwriteExistingManifestsClosesUnderlyingFile(t *testing.T) {
688688
require.NoError(t, err, "existingManifests should succeed")
689689

690690
unclosed := trackIO.GetUnclosedWriters()
691-
require.Empty(t, unclosed, "all file writers should be closed after existingManifests, but these are still open: %v", unclosed)
691+
require.Empty(t, unclosed, "all file writers should be closed after existingManifests, but these are still open: %v", unclosed)
692692
}
693693

694694
// errorOnDeletedEntries is a producerImpl that returns an error from deletedEntries()
695-
// to test that file writers are properly closed even when deletedEntries fails.
695+
// to test that file writers are properly closed even when deletedEntries fails.
696696
type errorOnDeletedEntries struct {
697697
base *snapshotProducer
698698
err error
@@ -743,7 +743,7 @@ func (b *blockingTrackingIO) Create(name string) (iceio.FileWriter, error) {
743743
return writer, err
744744
}
745745

746-
// This test verifies that NO writers are created when deletedEntries() fails,
746+
// This test verifies that NO writers are created when deletedEntries() fails,
747747
// because the error should be returned before any goroutines start.
748748
func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) {
749749
ctx, cancel := context.WithCancel(context.Background())
@@ -781,6 +781,6 @@ func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) {
781781

782782
case <-time.After(100 * time.Millisecond):
783783
writerCount := blockingIO.GetWriterCount()
784-
require.Zero(t, writerCount, "expected no writers to be created when deletedEntries is called first")
784+
require.Zero(t, writerCount, "expected no writers to be created when deletedEntries is called first")
785785
}
786786
}

0 commit comments

Comments
 (0)