Skip to content

Commit 7ea679f

Browse files
authored
Merge pull request #153796 from yuzefovich/blathers/backport-release-25.2-153783
release-25.2: sql: fix a few edge case bugs with CHECK EXTERNAL CONNECTION
2 parents c5a5b37 + b475fc6 commit 7ea679f

File tree

5 files changed

+83
-18
lines changed

5 files changed

+83
-18
lines changed

pkg/sql/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,7 @@ go_test(
638638
"backfill_test.go",
639639
"builtin_mem_usage_test.go",
640640
"builtin_test.go",
641+
"check_external_connection_test.go",
641642
"check_test.go",
642643
"closed_session_cache_test.go",
643644
"comment_on_column_test.go",

pkg/sql/check_external_connection.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
5050
return err
5151
}
5252

53-
ctx, span := tracing.ChildSpan(params.ctx, "CheckExternalConnection")
53+
ctx, span := tracing.ChildSpan(params.ctx, "CheckExternalConnection-planning")
5454
defer span.Finish()
5555

5656
store, err := params.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, n.loc, params.p.User())
@@ -90,7 +90,7 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
9090
time.Duration(tree.MustBeDInt(nanos)),
9191
))
9292
}
93-
n.rows = make(chan tree.Datums, int64(len(sqlInstanceIDs))*n.params.Concurrency)
93+
n.rows = make(chan tree.Datums, len(sqlInstanceIDs)*getCloudCheckConcurrency(n.params))
9494
rowWriter := NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
9595
// collapse the two pairs of bytes+time to a single string rate each.
9696
res := make(tree.Datums, len(row)-1)
@@ -103,9 +103,16 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
103103
return nil
104104
})
105105

106-
grp := ctxgroup.WithContext(ctx)
107-
n.execGrp = grp
108-
grp.GoCtx(func(ctx context.Context) error {
106+
workerStarted := make(chan struct{})
107+
n.execGrp = ctxgroup.WithContext(params.ctx)
108+
n.execGrp.GoCtx(func(ctx context.Context) error {
109+
// Derive a separate tracing span since the planning one will be
110+
// finished when the main goroutine exits from startExec.
111+
ctx, span := tracing.ChildSpan(ctx, "CheckExternalConnection-execution")
112+
defer span.Finish()
113+
// Unblock the main goroutine after having created the tracing span.
114+
close(workerStarted)
115+
109116
recv := MakeDistSQLReceiver(
110117
ctx,
111118
rowWriter,
@@ -124,13 +131,18 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
124131
return nil
125132
})
126133

134+
// Block until the worker goroutine has started. This allows us to guarantee
135+
// that params.ctx contains a tracing span that hasn't been finished.
136+
// TODO(yuzefovich): this is a bit hacky. The issue is that
137+
// planNodeToRowSource has already created a new tracing span for this
138+
// checkExternalConnectionNode and has updated params.ctx accordingly; then,
139+
// if the query is canceled before the worker goroutine starts, the tracing
140+
// span is finished, yet it will have already been captured by the ctxgroup.
141+
<-workerStarted
127142
return nil
128143
}
129144

130145
func (n *checkExternalConnectionNode) Next(params runParams) (bool, error) {
131-
if n.rows == nil {
132-
return false, nil
133-
}
134146
select {
135147
case <-params.ctx.Done():
136148
return false, params.ctx.Err()
@@ -149,7 +161,6 @@ func (n *checkExternalConnectionNode) Values() tree.Datums {
149161

150162
func (n *checkExternalConnectionNode) Close(_ context.Context) {
151163
_ = n.execGrp.Wait()
152-
n.rows = nil
153164
}
154165

155166
func (n *checkExternalConnectionNode) parseParams(params runParams) error {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package sql_test
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"testing"
12+
13+
"github.com/cockroachdb/cockroach/pkg/base"
14+
"github.com/cockroachdb/cockroach/pkg/testutils"
15+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
16+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
17+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
18+
"github.com/cockroachdb/cockroach/pkg/util/log"
19+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
20+
)
21+
22+
func TestCheckExternalConnection(t *testing.T) {
23+
defer leaktest.AfterTest(t)()
24+
defer log.Scope(t).Close(t)
25+
26+
ctx := context.Background()
27+
rng, _ := randutil.NewTestRand()
28+
dir, dirCleanupFn := testutils.TempDir(t)
29+
defer dirCleanupFn()
30+
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
31+
ExternalIODir: dir,
32+
})
33+
defer s.Stopper().Stop(ctx)
34+
35+
runner := sqlutils.MakeSQLRunner(sqlDB)
36+
runner.Exec(t, "CREATE EXTERNAL CONNECTION foo_conn AS 'nodelocal://1/foo';")
37+
query := "CHECK EXTERNAL CONNECTION 'nodelocal://1/foo';"
38+
// Should execute successfully without a statement timeout.
39+
runner.Exec(t, query)
40+
// Run with a random statement timeout which will likely make the query
41+
// fail. We don't care whether it does nor which error is returned as long
42+
// as the process doesn't crash.
43+
runner.Exec(t, fmt.Sprintf("SET statement_timeout='%dms'", rng.Intn(100)+1))
44+
_, _ = sqlDB.Exec(query)
45+
}

pkg/sql/cloud_check_processor.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -203,20 +203,27 @@ func newCloudCheckProcessor(
203203
return p, nil
204204
}
205205

206-
// Start is part of the RowSource interface.
207-
func (p *proc) Start(ctx context.Context) {
208-
p.StartInternal(ctx, "cloudcheck.proc")
209-
210-
concurrency := int(p.spec.Params.Concurrency)
206+
func getCloudCheckConcurrency(params CloudCheckParams) int {
207+
concurrency := int(params.Concurrency)
211208
if concurrency < 1 {
212209
concurrency = 1
213210
}
211+
return concurrency
212+
}
214213

214+
// Start is part of the RowSource interface.
215+
func (p *proc) Start(ctx context.Context) {
216+
p.StartInternal(ctx, "cloudcheck.proc")
217+
218+
concurrency := getCloudCheckConcurrency(p.spec.Params)
215219
p.results = make(chan result, concurrency)
216220

217221
if err := p.FlowCtx.Stopper().RunAsyncTask(p.Ctx(), "cloudcheck.proc", func(ctx context.Context) {
218222
defer close(p.results)
219-
if err := ctxgroup.GroupWorkers(ctx, concurrency, func(ctx context.Context, _ int) error {
223+
// We're ignoring the context cancellation error (which is the only one
224+
// possible) because the main goroutine will observe it on its own
225+
// anyway in Next.
226+
_ = ctxgroup.GroupWorkers(ctx, concurrency, func(ctx context.Context, _ int) error {
220227
select {
221228
case p.results <- checkURI(
222229
ctx,
@@ -229,9 +236,7 @@ func (p *proc) Start(ctx context.Context) {
229236
case <-ctx.Done():
230237
return ctx.Err()
231238
}
232-
}); err != nil {
233-
p.MoveToDraining(err)
234-
}
239+
})
235240
}); err != nil {
236241
p.MoveToDraining(err)
237242
}

pkg/sql/execinfra/processorsbase.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,9 @@ const (
521521
//
522522
// An error can be optionally passed. It will be the first piece of metadata
523523
// returned by DrainHelper().
524+
//
525+
// MoveToDraining should only be called from the main goroutine of the
526+
// processor.
524527
func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) {
525528
if pb.State != StateRunning {
526529
// Calling MoveToDraining in any state is allowed in order to facilitate the

0 commit comments

Comments
 (0)