File tree Expand file tree Collapse file tree 2 files changed +36
-34
lines changed Expand file tree Collapse file tree 2 files changed +36
-34
lines changed Original file line number Diff line number Diff line change @@ -237,36 +237,35 @@ func (ie *InternalExecutor) runWithEx(
237
237
ex .close (ctx , closeMode )
238
238
wg .Done ()
239
239
}
240
- if err = ie .s .cfg .Stopper .RunAsyncTaskEx (
241
- ctx ,
242
- stop.TaskOpts {
243
- TaskName : opName .StripMarkers (),
244
- SpanOpt : stop .ChildSpan ,
245
- },
246
- func (ctx context.Context ) {
247
- defer cleanup (ctx )
248
- // TODO(yuzefovich): benchmark whether we should be growing the
249
- // stack size unconditionally.
250
- if growStackSize {
251
- growstack .Grow ()
252
- }
253
- if err := ex .run (
254
- ctx ,
255
- ie .mon ,
256
- & mon.BoundAccount {}, /*reserved*/
257
- nil , /* cancel */
258
- ); err != nil {
259
- sqltelemetry .RecordError (ctx , err , & ex .server .cfg .Settings .SV )
260
- errCallback (err )
261
- }
262
- w .finish ()
263
- },
264
- ); err != nil {
240
+ ctx , hdl , err := ie .s .cfg .Stopper .GetHandle (ctx , stop.TaskOpts {
241
+ TaskName : opName .StripMarkers (),
242
+ SpanOpt : stop .ChildSpan ,
243
+ })
244
+ if err != nil {
265
245
// The goroutine wasn't started, so we need to perform the cleanup
266
246
// ourselves.
267
247
cleanup (ctx )
268
248
return err
269
249
}
250
+ go func () {
251
+ defer hdl .Activate (ctx ).Release (ctx )
252
+ defer cleanup (ctx )
253
+ // TODO(yuzefovich): benchmark whether we should be growing the
254
+ // stack size unconditionally.
255
+ if growStackSize {
256
+ growstack .Grow ()
257
+ }
258
+ if err := ex .run (
259
+ ctx ,
260
+ ie .mon ,
261
+ & mon.BoundAccount {}, /*reserved*/
262
+ nil , /* cancel */
263
+ ); err != nil {
264
+ sqltelemetry .RecordError (ctx , err , & ex .server .cfg .Settings .SV )
265
+ errCallback (err )
266
+ }
267
+ w .finish ()
268
+ }()
270
269
return nil
271
270
}
272
271
Original file line number Diff line number Diff line change @@ -141,13 +141,18 @@ func (db *DB) PollSource(
141
141
// start begins the goroutine for this poller, which will periodically request
142
142
// time series data from the DataSource and store it.
143
143
func (p * poller ) start () (firstDone <- chan struct {}) {
144
- ch := make (chan struct {})
145
- // Poll once immediately and synchronously.
146
- bgCtx := p .AnnotateCtx (context .Background ())
147
- if p .stopper .RunAsyncTask (bgCtx , "ts-poller" , func (ctx context.Context ) {
148
- ch := ch // goroutine-local copy
144
+ ch := make (chan struct {}) // closed on completion of first poll
145
+ ctx , hdl , err := p .stopper .GetHandle (
146
+ p .AnnotateCtx (context .Background ()), stop.TaskOpts {TaskName : "ts-poller" },
147
+ )
148
+ if err != nil {
149
+ close (ch )
150
+ return ch
151
+ }
152
+ go func (ctx context.Context , ch chan struct {}) {
153
+ defer hdl .Activate (ctx ).Release (ctx )
149
154
var ticker timeutil.Timer
150
- ticker .Reset (0 )
155
+ ticker .Reset (0 ) // poll immediately
151
156
defer ticker .Stop ()
152
157
for {
153
158
select {
@@ -162,9 +167,7 @@ func (p *poller) start() (firstDone <-chan struct{}) {
162
167
return
163
168
}
164
169
}
165
- }) != nil {
166
- close (ch )
167
- }
170
+ }(ctx , ch )
168
171
return ch
169
172
}
170
173
You can’t perform that action at this time.
0 commit comments