@@ -159,8 +159,12 @@ var _ planNode = &hookFnNode{}
159
159
160
160
// hookFnRun contains the run-time state of hookFnNode during local execution.
161
161
type hookFnRun struct {
162
+ // resultsCh is used to communicate both the progress of the function and
163
+ // its final result (this depends on the implementation). This channel is
164
+ // never closed.
162
165
resultsCh chan tree.Datums
163
- errCh chan error
166
+ // errCh will be closed when the worker goroutine exits.
167
+ errCh chan error
164
168
165
169
row tree.Datums
166
170
}
@@ -174,25 +178,27 @@ func newHookFnNode(
174
178
func (f * hookFnNode ) startExec (params runParams ) error {
175
179
f .run .resultsCh = make (chan tree.Datums )
176
180
f .run .errCh = make (chan error )
177
- // Note that it's ok if the async task is not started due to server shutdown
178
- // because the context should be canceled then too, which would unblock
179
- // calls to Next if they happen.
180
- return f .stopper .RunAsyncTaskEx (
181
+ if err := f .stopper .RunAsyncTaskEx (
181
182
params .ctx ,
182
183
stop.TaskOpts {
183
184
TaskName : f .name ,
184
185
SpanOpt : stop .ChildSpan ,
185
186
},
186
187
func (ctx context.Context ) {
188
+ defer close (f .run .errCh )
187
189
err := f .f (ctx , f .run .resultsCh )
188
190
select {
189
191
case <- ctx .Done ():
190
192
case f .run .errCh <- err :
191
193
}
192
- close (f .run .errCh )
193
- close (f .run .resultsCh )
194
194
},
195
- )
195
+ ); err != nil {
196
+ // The async task is not started due to server shutdown, so we need to
197
+ // explicitly close the channel ourselves.
198
+ close (f .run .errCh )
199
+ return err
200
+ }
201
+ return nil
196
202
}
197
203
198
204
func (f * hookFnNode ) Next (params runParams ) (bool , error ) {
@@ -208,4 +214,9 @@ func (f *hookFnNode) Next(params runParams) (bool, error) {
208
214
209
215
func (f * hookFnNode ) Values () tree.Datums { return f .run .row }
210
216
211
- func (f * hookFnNode ) Close (ctx context.Context ) {}
217
+ func (f * hookFnNode ) Close (ctx context.Context ) {
218
+ if f .run .errCh != nil {
219
+ // Block until the worker goroutine exits.
220
+ <- f .run .errCh
221
+ }
222
+ }
0 commit comments