Skip to content

Commit 4f58950

Browse files
authored
runtime/vam/op: simplify Combine.Pull (#6320)
Simplify by replacing goroutines sending to unbuffered channels with buffered channels.
1 parent 7d6b176 commit 4f58950

File tree

1 file changed

+4
-11
lines changed

1 file changed

+4
-11
lines changed

runtime/vam/op/combine.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"sync"
66

77
"github.com/brimdata/super/vector"
8-
"golang.org/x/sync/errgroup"
98
)
109

1110
type Combine struct {
@@ -25,7 +24,7 @@ func NewCombine(ctx context.Context, parents []vector.Puller) *Combine {
2524
ctx: ctx,
2625
parent: p,
2726
resultCh: resultCh,
28-
doneCh: make(chan struct{}),
27+
doneCh: make(chan struct{}, 1),
2928
resumeCh: make(chan struct{}),
3029
})
3130
}
@@ -46,17 +45,11 @@ func (c *Combine) Pull(done bool) (vector.Any, error) {
4645
// Send done upstream. Parents waiting on resumeCh will ignore
4746
// this. All other parents will transition to waiting on
4847
// resumeCh.
49-
var group errgroup.Group
5048
for _, p := range c.parents {
51-
// We use a goroutine here because sending to parents[i].doneCh
49+
// doneCh must be buffered because sending to parents[i].doneCh
5250
// can block until we've sent to parents[i+1].doneCh, as with
53-
// "fork (=> count() => pass) | head".
54-
group.Go(func() error {
55-
return c.signal(p.doneCh)
56-
})
57-
}
58-
if err := group.Wait(); err != nil {
59-
return nil, err
51+
// "fork (count()) (pass) | head".
52+
p.doneCh <- struct{}{}
6053
}
6154
return nil, nil
6255
}

0 commit comments

Comments
 (0)