Description
Apache Iceberg version
main (development)
Please describe the bug
Goroutine leak in MapExec when all worker goroutines error: no context cancellation
Summary
MapExec in table/internal/utils.go uses a plain errgroup.Group without context cancellation. When all worker goroutines return errors while the feeder still has items to produce, the feeder goroutine blocks forever on a channel send, preventing the output channel from ever closing. This deadlocks the consumer (the caller's goroutine) and leaks the feeder goroutine.
MapExec also does not accept a context.Context, so parent cancellation cannot propagate to in-flight operations.
Affected Code
| Location | Role |
|---|---|
| `table/internal/utils.go:523` | `MapExec` (the buggy function) |
| `table/writer.go:131` | `writeFiles` calls `MapExec` for unpartitioned writes |
| `table/arrow_utils.go:1348` | `recordsToDataFiles` calls `writeFiles` |
| `table/snapshot_producers.go:235` | `overwriteFiles.existingManifestEntries` also calls `MapExec` |
Reproduction Scenario
- Begin an unpartitioned table write that produces many `WriteTask` items via `binPackRecords` (more than `2 * nWorkers` items).
- The write goes through `recordsToDataFiles` → `writeFiles` → `MapExec` with N workers.
- All N workers encounter errors (e.g., filesystem full, permission denied, network failure) before the feeder has drained its input iterator.
- Each worker returns its error and stops consuming from the input channel `ch`.
- The feeder goroutine fills the `ch` buffer (capacity N), then blocks on `ch <- v` because no goroutine is reading from `ch`. `defer close(out)` in the feeder never executes.
- The consumer blocks on `for v := range out` (waiting for `out` to close), or the drain loop `for range out {}` blocks if early exit was triggered.
Result: The feeder goroutine leaks. The consumer (caller's goroutine) hangs indefinitely.
Root Cause Analysis
```go
// table/internal/utils.go:523-574
func MapExec[T, S any](nWorkers int, slice iter.Seq[T], fn func(T) (S, error)) iter.Seq2[S, error] {
	var g errgroup.Group // ← (1) no context: first worker error does NOT signal others
	ch := make(chan T, nWorkers)
	out := make(chan S, nWorkers)
	for range nWorkers {
		g.Go(func() error {
			for v := range ch {
				result, err := fn(v)
				if err != nil {
					return err // ← (2) worker exits; stops reading from ch
				}
				out <- result
			}
			return nil
		})
	}
	var err error
	go func() {
		defer close(out) // ← (4) never reached if feeder is stuck
		for v := range slice {
			ch <- v // ← (3) blocks when ch buffer is full and all workers exited
		}
		close(ch)
		err = g.Wait()
	}()
	return func(yield func(S, error) bool) {
		defer func() {
			for range out { // ← (5) drain also blocks forever (out never closes)
			}
		}()
		for v := range out { // ← (5) blocks forever (out never closes)
			if !yield(v, nil) {
				return
			}
		}
		if err != nil {
			var z S
			yield(z, err)
		}
	}
}
```

Step-by-step for `nWorkers=2`, items `[A, B, C, D, E]`:
- Feeder sends A, B to `ch` (fills buffer). Workers 1 and 2 each pick one item.
- Feeder sends C to `ch` (slot freed by worker pick-up). Worker 1 errors on A, exits.
- Feeder sends D to `ch` (slot freed by worker 2). Worker 2 errors on B, exits. Both workers have returned. `ch` contains `[C, D]`: full buffer, no readers.
- Feeder tries `ch <- E`. Blocks forever. `close(out)` never fires, so the consumer blocks on `for v := range out` forever.
Contrast with Correct Pattern
readAllDeleteFiles in table/arrow_scanner.go:73 correctly uses context-aware errgroup:
```go
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(concurrency)
```

When one goroutine fails, the derived context is cancelled, and other goroutines can observe the cancellation and exit promptly. Any code that selects on `ctx.Done()` can unblock.
Suggested Fix
- Add a `context.Context` parameter to `MapExec`.
- Use `errgroup.WithContext(ctx)` so that worker errors cancel the derived context.
- Use `select` with `ctx.Done()` in both the feeder and workers to unblock on cancellation.
- Update all callers (`writeFiles`, `existingManifestEntries`) to pass their context.
```go
func MapExec[T, S any](ctx context.Context, nWorkers int, slice iter.Seq[T], fn func(T) (S, error)) iter.Seq2[S, error] {
	if nWorkers <= 0 {
		nWorkers = runtime.GOMAXPROCS(0)
	}
	g, ctx := errgroup.WithContext(ctx)
	ch := make(chan T, nWorkers)
	out := make(chan S, nWorkers)
	for range nWorkers {
		g.Go(func() error {
			for v := range ch {
				result, err := fn(v)
				if err != nil {
					return err
				}
				select {
				case out <- result:
				case <-ctx.Done():
					return context.Cause(ctx)
				}
			}
			return nil
		})
	}
	var err error
	go func() {
		defer close(out)
		for v := range slice {
			select {
			case ch <- v:
			case <-ctx.Done():
				// A worker errored or the parent context was cancelled:
				// stop feeding, close ch so idle workers exit their range
				// loops, and collect the group error before returning.
				// Returning from this loop body stops the range-over-func
				// iteration of slice.
				close(ch)
				err = g.Wait()
				return
			}
		}
		close(ch)
		err = g.Wait()
	}()
	return func(yield func(S, error) bool) {
		defer func() {
			for range out {
			}
		}()
		for v := range out {
			if !yield(v, nil) {
				return
			}
		}
		if err != nil {
			var z S
			yield(z, err)
		}
	}
}
```

Key point: in the feeder goroutine, we cannot use a bare `break` inside the `select` to exit the `for` loop, because `break` only exits the `select` statement. Use `return` instead (the feeder is an anonymous goroutine, so returning exits it), performing the `close(ch)` / `g.Wait()` cleanup first.
Secondary Issue: AllManifests errgroup without context
Table.AllManifests in table/table.go:141 also uses a plain errgroup.Group{}. However, unlike MapExec, this does not cause a goroutine leak or deadlock:
- A separate goroutine calls `g.Wait()` and then `close(ch)` (via defer), which causes `MakeSequencedChan` to close its output, and the consumer's `select` loop exits.
The real issue here is data correctness: when a goroutine for snapshot index i errors without sending to ch, MakeSequencedChan never sees index i. Any successfully-processed items with index > i are buffered in the internal priority queue but never emitted (because isNext requires consecutive indices). Those results are silently dropped when the source channel closes. The consumer may return only a partial result set without indicating which snapshots were skipped.
Adding `errgroup.WithContext` would still be an improvement here: it would short-circuit remaining goroutines on the first failure rather than letting them all run to completion.
Existing Test Coverage
TestMapExecFinish in table/internal/utils_test.go:75 only tests the happy path (all items succeed). There is no test for the error path. A test that makes all workers return errors with a large input set would reproduce the deadlock (the test would hang).
Environment
- iceberg-go at current `main` branch
- Go 1.23+ (`iter` package / range-over-func)