Skip to content

Commit d226b92

Browse files
committed
sql: harden hookFnNode
`hookFnNode` is a special planNode implementation that runs the given function in a new goroutine. That goroutine is started in `startExec` and finishes when either the context is canceled or the function completes. Previously, the main goroutine of `hookFnNode` didn't wait for the worker goroutine to exit; this is now fixed. Concretely, this could have led to undefined behavior (e.g. we saw a panic in the backup schedule execution that looks like the planner was reset _before_ the worker goroutine had a chance to perform its job). I briefly tried reproducing that behavior in a test but didn't succeed. Still, it makes sense for the main goroutine to wait until the worker goroutine exits, which is now done in `hookFnNode.Close`. Release note: None
1 parent 7ea679f commit d226b92

File tree

1 file changed

+20
-9
lines changed

1 file changed

+20
-9
lines changed

pkg/sql/planhook.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,12 @@ var _ planNode = &hookFnNode{}
159159

160160
// hookFnRun contains the run-time state of hookFnNode during local execution.
161161
type hookFnRun struct {
162+
// resultsCh is used to communicate both the progress of the function and
163+
// its final result (this depends on the implementation). This channel is
164+
// never closed.
162165
resultsCh chan tree.Datums
163-
errCh chan error
166+
// errCh will be closed when the worker goroutine exits.
167+
errCh chan error
164168

165169
row tree.Datums
166170
}
@@ -174,25 +178,27 @@ func newHookFnNode(
174178
func (f *hookFnNode) startExec(params runParams) error {
175179
f.run.resultsCh = make(chan tree.Datums)
176180
f.run.errCh = make(chan error)
177-
// Note that it's ok if the async task is not started due to server shutdown
178-
// because the context should be canceled then too, which would unblock
179-
// calls to Next if they happen.
180-
return f.stopper.RunAsyncTaskEx(
181+
if err := f.stopper.RunAsyncTaskEx(
181182
params.ctx,
182183
stop.TaskOpts{
183184
TaskName: f.name,
184185
SpanOpt: stop.ChildSpan,
185186
},
186187
func(ctx context.Context) {
188+
defer close(f.run.errCh)
187189
err := f.f(ctx, f.run.resultsCh)
188190
select {
189191
case <-ctx.Done():
190192
case f.run.errCh <- err:
191193
}
192-
close(f.run.errCh)
193-
close(f.run.resultsCh)
194194
},
195-
)
195+
); err != nil {
196+
// The async task is not started due to server shutdown, so we need to
197+
// explicitly close the channel ourselves.
198+
close(f.run.errCh)
199+
return err
200+
}
201+
return nil
196202
}
197203

198204
func (f *hookFnNode) Next(params runParams) (bool, error) {
@@ -208,4 +214,9 @@ func (f *hookFnNode) Next(params runParams) (bool, error) {
208214

209215
func (f *hookFnNode) Values() tree.Datums { return f.run.row }
210216

211-
func (f *hookFnNode) Close(ctx context.Context) {}
217+
func (f *hookFnNode) Close(ctx context.Context) {
218+
if f.run.errCh != nil {
219+
// Block until the worker goroutine exits.
220+
<-f.run.errCh
221+
}
222+
}

0 commit comments

Comments
 (0)