Skip to content

Commit 291bfe6

Browse files
committed
sql: fix another minor bug with CHECK EXTERNAL CONNECTION
Previously, we could create a channel with no buffer if the concurrency parameter wasn't specified. As a result, in an edge case (that was exposed by a recently added test), the reader goroutine might have exited _before_ reading from the unbuffered channel, which would result in a deadlock (because the writer goroutine would block on sending into the channel, and the reader - i.e. the main goroutine - would block waiting on the wait group). This is now fixed by unifying the logic for determining the concurrency. We could've also made sending on the `n.rows` channel check for context cancellation if we were being conservative, but that seems like overkill. Additionally, this commit fixes an extreme edge case where we could hit the "use of Span after Finish" issue in the case where the context is canceled (because of a small statement timeout) before the execution worker goroutine is started. This is addressed by blocking the main goroutine until the worker goroutine is spun off. Release note: None
1 parent e5f821c commit 291bfe6

File tree

2 files changed

+23
-9
lines changed

2 files changed

+23
-9
lines changed

pkg/sql/check_external_connection.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
9090
time.Duration(tree.MustBeDInt(nanos)),
9191
))
9292
}
93-
n.rows = make(chan tree.Datums, int64(len(sqlInstanceIDs))*n.params.Concurrency)
93+
n.rows = make(chan tree.Datums, len(sqlInstanceIDs)*getCloudCheckConcurrency(n.params))
9494
rowWriter := NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
9595
// collapse the two pairs of bytes+time to a single string rate each.
9696
res := make(tree.Datums, len(row)-1)
@@ -103,13 +103,15 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
103103
return nil
104104
})
105105

106-
grp := ctxgroup.WithContext(params.ctx)
107-
n.execGrp = grp
108-
grp.GoCtx(func(ctx context.Context) error {
106+
workerStarted := make(chan struct{})
107+
n.execGrp = ctxgroup.WithContext(params.ctx)
108+
n.execGrp.GoCtx(func(ctx context.Context) error {
109109
// Derive a separate tracing span since the planning one will be
110110
// finished when the main goroutine exits from startExec.
111111
ctx, span := tracing.ChildSpan(ctx, "CheckExternalConnection-execution")
112112
defer span.Finish()
113+
// Unblock the main goroutine after having created the tracing span.
114+
close(workerStarted)
113115

114116
recv := MakeDistSQLReceiver(
115117
ctx,
@@ -129,6 +131,14 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
129131
return nil
130132
})
131133

134+
// Block until the worker goroutine has started. This allows us to guarantee
135+
// that params.ctx contains a tracing span that hasn't been finished.
136+
// TODO(yuzefovich): this is a bit hacky. The issue is that
137+
// planNodeToRowSource has already created a new tracing span for this
138+
// checkExternalConnectionNode and has updated params.ctx accordingly; then,
139+
// if the query is canceled before the worker goroutine starts, the tracing
140+
// span is finished, yet it will have already been captured by the ctxgroup.
141+
<-workerStarted
132142
return nil
133143
}
134144

pkg/sql/cloud_check_processor.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -203,15 +203,19 @@ func newCloudCheckProcessor(
203203
return p, nil
204204
}
205205

206-
// Start is part of the RowSource interface.
207-
func (p *proc) Start(ctx context.Context) {
208-
p.StartInternal(ctx, "cloudcheck.proc")
209-
210-
concurrency := int(p.spec.Params.Concurrency)
206+
func getCloudCheckConcurrency(params CloudCheckParams) int {
207+
concurrency := int(params.Concurrency)
211208
if concurrency < 1 {
212209
concurrency = 1
213210
}
211+
return concurrency
212+
}
213+
214+
// Start is part of the RowSource interface.
215+
func (p *proc) Start(ctx context.Context) {
216+
p.StartInternal(ctx, "cloudcheck.proc")
214217

218+
concurrency := getCloudCheckConcurrency(p.spec.Params)
215219
p.results = make(chan result, concurrency)
216220

217221
if err := p.FlowCtx.Stopper().RunAsyncTask(p.Ctx(), "cloudcheck.proc", func(ctx context.Context) {

0 commit comments

Comments
 (0)