Skip to content

Commit 92e2bcc

Browse files
committed
sql: fix a couple of edge case bugs with CHECK EXTERNAL CONNECTION
This commit fixes a couple of edge case bugs with CHECK EXTERNAL CONNECTION implementation. Namely: - previously, the execution goroutine (that is created in `startExec`) would use the tracing span that is finished, which is not allowed. This is now fixed by deriving a new tracing span. - also previously we would call `MoveToDraining` from an auxiliary goroutine of the cloud check processor when the context cancellation is observed. This would race with `MoveToDraining` call from the main goroutine in `Next`. The implicit contract of this helper method is that it's only called from the main goroutine, and otherwise it can lead to "MoveToDraining called in state × with err" errors. This is now fixed by removing the call from the auxiliary goroutine since it's actually not needed. Additionally, this commit simplifies dealing with the `rows` channel a bit. I decided to omit the release note given we recently merged a fix that contained one. Release note: None
1 parent 76b08e5 commit 92e2bcc

File tree

5 files changed

+61
-10
lines changed

5 files changed

+61
-10
lines changed

pkg/sql/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,7 @@ go_test(
655655
"backfill_test.go",
656656
"builtin_mem_usage_test.go",
657657
"builtin_test.go",
658+
"check_external_connection_test.go",
658659
"check_test.go",
659660
"closed_session_cache_test.go",
660661
"comment_on_column_test.go",

pkg/sql/check_external_connection.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
5050
return err
5151
}
5252

53-
ctx, span := tracing.ChildSpan(params.ctx, "CheckExternalConnection")
53+
ctx, span := tracing.ChildSpan(params.ctx, "CheckExternalConnection-planning")
5454
defer span.Finish()
5555

5656
store, err := params.ExecCfg().DistSQLSrv.ExternalStorageFromURI(ctx, n.loc, params.p.User())
@@ -103,9 +103,14 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
103103
return nil
104104
})
105105

106-
grp := ctxgroup.WithContext(ctx)
106+
grp := ctxgroup.WithContext(params.ctx)
107107
n.execGrp = grp
108108
grp.GoCtx(func(ctx context.Context) error {
109+
// Derive a separate tracing span since the planning one will be
110+
// finished when the main goroutine exits from startExec.
111+
ctx, span := tracing.ChildSpan(ctx, "CheckExternalConnection-execution")
112+
defer span.Finish()
113+
109114
recv := MakeDistSQLReceiver(
110115
ctx,
111116
rowWriter,
@@ -128,9 +133,6 @@ func (n *checkExternalConnectionNode) startExec(params runParams) error {
128133
}
129134

130135
func (n *checkExternalConnectionNode) Next(params runParams) (bool, error) {
131-
if n.rows == nil {
132-
return false, nil
133-
}
134136
select {
135137
case <-params.ctx.Done():
136138
return false, params.ctx.Err()
@@ -149,7 +151,6 @@ func (n *checkExternalConnectionNode) Values() tree.Datums {
149151

150152
func (n *checkExternalConnectionNode) Close(_ context.Context) {
151153
_ = n.execGrp.Wait()
152-
n.rows = nil
153154
}
154155

155156
func (n *checkExternalConnectionNode) parseParams(params runParams) error {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package sql_test
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"testing"
12+
13+
"github.com/cockroachdb/cockroach/pkg/base"
14+
"github.com/cockroachdb/cockroach/pkg/testutils"
15+
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
16+
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
17+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
18+
"github.com/cockroachdb/cockroach/pkg/util/log"
19+
"github.com/cockroachdb/cockroach/pkg/util/randutil"
20+
)
21+
22+
func TestCheckExternalConnection(t *testing.T) {
23+
defer leaktest.AfterTest(t)()
24+
defer log.Scope(t).Close(t)
25+
26+
ctx := context.Background()
27+
rng, _ := randutil.NewTestRand()
28+
dir, dirCleanupFn := testutils.TempDir(t)
29+
defer dirCleanupFn()
30+
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
31+
ExternalIODir: dir,
32+
})
33+
defer s.Stopper().Stop(ctx)
34+
35+
runner := sqlutils.MakeSQLRunner(sqlDB)
36+
runner.Exec(t, "CREATE EXTERNAL CONNECTION foo_conn AS 'nodelocal://1/foo';")
37+
query := "CHECK EXTERNAL CONNECTION 'nodelocal://1/foo';"
38+
// Should execute successfully without a statement timeout.
39+
runner.Exec(t, query)
40+
// Run with a random statement timeout which will likely make the query
41+
// fail. We don't care whether it does nor which error is returned as long
42+
// as the process doesn't crash.
43+
runner.Exec(t, fmt.Sprintf("SET statement_timeout='%dms'", rng.Intn(100)+1))
44+
_, _ = sqlDB.Exec(query)
45+
}

pkg/sql/cloud_check_processor.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,10 @@ func (p *proc) Start(ctx context.Context) {
216216

217217
if err := p.FlowCtx.Stopper().RunAsyncTask(p.Ctx(), "cloudcheck.proc", func(ctx context.Context) {
218218
defer close(p.results)
219-
if err := ctxgroup.GroupWorkers(ctx, concurrency, func(ctx context.Context, _ int) error {
219+
// We're ignoring the context cancellation error (which is the only one
220+
// possible) because the main goroutine will observe it on its own
221+
// anyway in Next.
222+
_ = ctxgroup.GroupWorkers(ctx, concurrency, func(ctx context.Context, _ int) error {
220223
select {
221224
case p.results <- checkURI(
222225
ctx,
@@ -229,9 +232,7 @@ func (p *proc) Start(ctx context.Context) {
229232
case <-ctx.Done():
230233
return ctx.Err()
231234
}
232-
}); err != nil {
233-
p.MoveToDraining(err)
234-
}
235+
})
235236
}); err != nil {
236237
p.MoveToDraining(err)
237238
}

pkg/sql/execinfra/processorsbase.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,9 @@ const (
523523
//
524524
// An error can be optionally passed. It will be the first piece of metadata
525525
// returned by DrainHelper().
526+
//
527+
// MoveToDraining should only be called from the main goroutine of the
528+
// processor.
526529
func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) {
527530
if pb.State != StateRunning {
528531
// Calling MoveToDraining in any state is allowed in order to facilitate the

0 commit comments

Comments
 (0)