Commit 990dfdc

sql: retry all DistSQL runner dial errors
This commit marks the error that DistSQL runners produce when dialing remote
nodes in a special way that is now always retried-as-local. In particular,
this allows us to fix two problematic scenarios that could occur when using
secondary tenants:
- when attempting to start a pod with stale instance information
- the port is in use by an RPC server for the same tenant, but with a new
  instance id.

This commit includes the test from Jeff that exposed the gap in the
retry-as-local mechanism.

Release note: None
1 parent d819b9c commit 990dfdc
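For readers skimming the diff: the fix relies on the marker-error pattern from the github.com/cockroachdb/errors package. The dial failure is wrapped in a dedicated error type, and the planner later checks for that type with errors.HasType, which sees through intermediate wrapping. Below is a minimal, self-contained sketch of that pattern; the dialErr/isDialErr names mirror the commit, but the program itself is illustrative rather than actual CockroachDB code.

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// dialErr marks an error that came from dialing a remote SQL instance.
// It mirrors the runnerDialErr type added by this commit.
type dialErr struct {
	err error
}

func (e *dialErr) Error() string { return e.err.Error() }

// Cause/Unwrap let errors.HasType see through the marker to the original error.
func (e *dialErr) Cause() error  { return e.err }
func (e *dialErr) Unwrap() error { return e.err }

func isDialErr(err error) bool {
	return errors.HasType(err, (*dialErr)(nil))
}

func main() {
	base := errors.New("connection refused")
	marked := &dialErr{err: base}
	// Further wrapping does not hide the marker from errors.HasType.
	wrapped := errors.Wrap(marked, "running distributed flow")

	fmt.Println(isDialErr(wrapped)) // true
	fmt.Println(isDialErr(base))    // false
}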

File tree

3 files changed: +81 -1 lines changed

- pkg/ccl/serverccl/BUILD.bazel
- pkg/ccl/serverccl/server_sql_test.go
- pkg/sql/distsql_running.go

pkg/ccl/serverccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -87,6 +87,7 @@ go_test(
         "//pkg/util/log",
         "//pkg/util/protoutil",
         "//pkg/util/randutil",
+        "//pkg/util/stop",
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_datadriven//:datadriven",
         "@com_github_cockroachdb_errors//:errors",

pkg/ccl/serverccl/server_sql_test.go

Lines changed: 58 additions & 0 deletions
@@ -12,6 +12,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"net"
 	"net/http"
 	"strings"
 	"testing"
@@ -35,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/errors"
 	"github.com/lib/pq"
 	"github.com/stretchr/testify/require"
@@ -413,3 +415,59 @@ func TestSystemConfigWatcherCache(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	systemconfigwatchertest.TestSystemConfigWatcher(t, false /* skipSecondary */)
 }
+
+// TestStartTenantWithStaleInstance covers the following scenario:
+// - a sql server starts up and is assigned port 'a'
+// - the sql server shuts down and releases port 'a'
+// - something else starts up and claims port 'a'. In the test that is the
+// listener. This is important because the listener causes connections to 'a' to
+// hang instead of responding with a RESET packet.
+// - a different server with stale instance information schedules a distsql
+// flow and attempts to dial port 'a'.
+func TestStartTenantWithStaleInstance(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	s := serverutils.StartServerOnly(t, base.TestServerArgs{
+		DefaultTestTenant: base.TestControlsTenantsExplicitly,
+	})
+	defer s.Stopper().Stop(ctx)
+
+	var listener net.Listener
+	// In rare cases under stress net.Listen call can result in an error that
+	// the address is already in use (because the stopped tenant hasn't released
+	// the socket); thus, we allow for some retries to go around that issue.
+	testutils.SucceedsSoon(t, func() error {
+		rpcAddr := func() string {
+			tenantStopper := stop.NewStopper()
+			defer tenantStopper.Stop(ctx)
+			server, db := serverutils.StartTenant(t, s, base.TestTenantArgs{
+				Stopper:  tenantStopper,
+				TenantID: serverutils.TestTenantID(),
+			},
+			)
+			defer db.Close()
+			return server.RPCAddr()
+		}()
+
+		var err error
+		listener, err = net.Listen("tcp", rpcAddr)
+		return err
+	})
+	defer func() {
+		_ = listener.Close()
+	}()
+
+	_, db := serverutils.StartTenant(t, s, base.TestTenantArgs{
+		TenantID: serverutils.TestTenantID(),
+	})
+	defer func() {
+		_ = db.Close()
+	}()
+
+	// Query a table to make sure the tenant is healthy, doesn't really matter
+	// which table.
+	_, err := db.Exec("SELECT count(*) FROM system.sqlliveness")
+	require.NoError(t, err)
+}
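A note on the listener trick used by the test above: a plain net.Listener that never speaks the RPC protocol lets the TCP connect complete at the kernel level, so a dialer expecting a handshake hangs instead of failing fast with a RESET. The standalone sketch below (not part of the commit, names are illustrative) demonstrates that behavior with nothing but the standard library.

package main

import (
	"fmt"
	"net"
	"time"
)

// A bare listener accepts TCP connections into the kernel backlog but never
// writes anything back, so a client waiting for a server handshake blocks
// instead of getting a quick "connection refused".
func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		panic(err) // the connect itself succeeds
	}
	defer conn.Close()

	// Any read waiting for a server hello hangs; bound it with a deadline.
	_ = conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
	buf := make([]byte, 1)
	_, err = conn.Read(buf)
	fmt.Println(err) // i/o timeout rather than an immediate RST
}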

pkg/sql/distsql_running.go

Lines changed: 22 additions & 1 deletion
@@ -100,6 +100,22 @@ type runnerResult struct {
 	err error
 }
 
+type runnerDialErr struct {
+	err error
+}
+
+func (e *runnerDialErr) Error() string {
+	return e.err.Error()
+}
+
+func (e *runnerDialErr) Cause() error {
+	return e.err
+}
+
+func isDialErr(err error) bool {
+	return errors.HasType(err, (*runnerDialErr)(nil))
+}
+
 // run executes the request. An error, if encountered, is both sent on the
 // result channel and returned.
 func (req runnerRequest) run() error {
@@ -111,6 +127,9 @@ func (req runnerRequest) run() error {
 
 	conn, err := req.podNodeDialer.Dial(req.ctx, roachpb.NodeID(req.sqlInstanceID), rpc.DefaultClass)
 	if err != nil {
+		// Mark this error as special runnerDialErr so that we could retry this
+		// distributed query as local.
+		err = &runnerDialErr{err: err}
 		res.err = err
 		return err
 	}
@@ -1979,7 +1998,9 @@ func (dsp *DistSQLPlanner) PlanAndRun(
 			// cancellation has already occurred.
 			return
 		}
-		if !pgerror.IsSQLRetryableError(distributedErr) && !flowinfra.IsFlowRetryableError(distributedErr) {
+		if !pgerror.IsSQLRetryableError(distributedErr) &&
+			!flowinfra.IsFlowRetryableError(distributedErr) &&
+			!isDialErr(distributedErr) {
 			// Only re-run the query if we think there is a high chance of a
 			// successful local execution.
 			return
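The change to PlanAndRun above only widens the set of errors considered retryable; the existing machinery then re-plans and re-runs the query locally. A much-simplified sketch of that control flow follows; it uses a stdlib sentinel error with errors.Is instead of the runnerDialErr type check, and the helper names are hypothetical rather than the actual DistSQLPlanner API.

package main

import (
	"errors"
	"fmt"
)

// errDial stands in for the runnerDialErr marker from the commit.
var errDial = errors.New("dial error (marker)")

// runWithLocalFallback mimics the shape of the retry: run the distributed
// plan and, if the failure is one that local execution can avoid (here, a
// dial error), run the whole query locally instead.
func runWithLocalFallback(distributed, local func() error) error {
	err := distributed()
	if err == nil {
		return nil
	}
	if !errors.Is(err, errDial) {
		// Only re-run the query if we think there is a high chance of a
		// successful local execution.
		return err
	}
	return local()
}

func main() {
	err := runWithLocalFallback(
		func() error { return fmt.Errorf("scheduling remote flow: %w", errDial) },
		func() error { fmt.Println("retried as local"); return nil },
	)
	fmt.Println("final error:", err)
}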
