Skip to content

Commit 65d0b22

Browse files
committed
sqlproxyccl: pass in ctx when transferring connections
Previously, the span used when transferring connections was reused from the forwarder. However, transferring connections has async components, so it's possible that the forwarder calls `Finish` on the span before the connection transfer is done using the span. This leads to a panic: `panic: use of Span after Finish`. To address this, this patch passes a context in when transferring a connection, instead of using the forwarder's context. Fixes: #153569 Release note: None
1 parent 3f285af commit 65d0b22

File tree

5 files changed

+21
-21
lines changed

5 files changed

+21
-21
lines changed

pkg/ccl/sqlproxyccl/balancer/balancer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ func (b *Balancer) processQueue(ctx context.Context) {
371371

372372
// Each request is retried up to maxTransferAttempts.
373373
for i := 0; i < maxTransferAttempts && ctx.Err() == nil; i++ {
374-
err := req.conn.TransferConnection()
374+
err := req.conn.TransferConnection(ctx)
375375
if err == nil || errors.Is(err, context.Canceled) {
376376
break
377377
}

pkg/ccl/sqlproxyccl/balancer/conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type ConnectionHandle interface {
1919
// TransferConnection performs a connection migration on the connection
2020
// handle. Invoking this blocks until the connection migration process has
2121
// been completed.
22-
TransferConnection() error
22+
TransferConnection(context.Context) error
2323

2424
// IsIdle returns true if the connection is idle, and false otherwise.
2525
IsIdle() bool

pkg/ccl/sqlproxyccl/balancer/conn_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (h *testConnHandle) Close() {
4444
}
4545

4646
// TransferConnection implements the ConnectionHandle interface.
47-
func (h *testConnHandle) TransferConnection() error {
47+
func (h *testConnHandle) TransferConnection(ctx context.Context) error {
4848
h.mu.Lock()
4949
defer h.mu.Unlock()
5050
h.mu.onTransferConnectionCount++

pkg/ccl/sqlproxyccl/conn_migration.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ var errTransferCannotStart = errors.New("transfer cannot be started")
127127
// error).
128128
//
129129
// TransferConnection implements the balancer.ConnectionHandle interface.
130-
func (f *forwarder) TransferConnection() (retErr error) {
130+
func (f *forwarder) TransferConnection(ctx context.Context) (retErr error) {
131131
// A previous non-recoverable transfer would have closed the forwarder, so
132132
// return right away.
133133
if f.ctx.Err() != nil {
@@ -146,18 +146,18 @@ func (f *forwarder) TransferConnection() (retErr error) {
146146
// whenever the context expires. We have to close the forwarder because
147147
// the transfer may be blocked on I/O, and the only way for now is to close
148148
// the connections. This then allow TransferConnection to return and cleanup.
149-
ctx, cancel := newTransferContext(f.ctx)
149+
transferCtx, cancel := newTransferContext(ctx)
150150
defer cancel()
151151

152152
// Use a separate handler for timeouts. This is the only way to handle
153153
// blocked I/Os as described above.
154154
go func() {
155-
<-ctx.Done()
155+
<-transferCtx.Done()
156156
// This Close call here in addition to the one in the defer callback
157157
// below is on purpose. This would help unblock situations where we're
158158
// blocked on sending/reading messages from connections that couldn't
159159
// be handled with context.Context.
160-
if !ctx.isRecoverable() {
160+
if !transferCtx.isRecoverable() {
161161
f.Close()
162162
}
163163
}()
@@ -179,7 +179,7 @@ func (f *forwarder) TransferConnection() (retErr error) {
179179

180180
// When TransferConnection returns, it's either the forwarder has been
181181
// closed, or the procesors have been resumed.
182-
if !ctx.isRecoverable() {
182+
if !transferCtx.isRecoverable() {
183183
log.Dev.Infof(logCtx, "transfer failed: connection closed, latency=%v, err=%v", latencyDur, retErr)
184184
f.metrics.ConnMigrationErrorFatalCount.Inc(1)
185185
f.Close()
@@ -201,16 +201,16 @@ func (f *forwarder) TransferConnection() (retErr error) {
201201

202202
// Suspend both processors before starting the transfer.
203203
request, response := f.getProcessors()
204-
if err := request.suspend(ctx); err != nil {
204+
if err := request.suspend(transferCtx); err != nil {
205205
return errors.Wrap(err, "suspending request processor")
206206
}
207-
if err := response.suspend(ctx); err != nil {
207+
if err := response.suspend(transferCtx); err != nil {
208208
return errors.Wrap(err, "suspending response processor")
209209
}
210210

211211
// Transfer the connection.
212212
clientConn, serverConn := f.getConns()
213-
newServerConn, err := transferConnection(ctx, f, f.connector, f.metrics, clientConn, serverConn)
213+
newServerConn, err := transferConnection(transferCtx, f, f.connector, f.metrics, clientConn, serverConn)
214214
if err != nil {
215215
return errors.Wrap(err, "transferring connection")
216216
}

pkg/ccl/sqlproxyccl/proxy_handler_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2018,12 +2018,12 @@ func TestConnectionMigration(t *testing.T) {
20182018
require.Equal(t, totalAttempts, count)
20192019
}
20202020

2021-
transferConnWithRetries := func(t *testing.T, f *forwarder) error {
2021+
transferConnWithRetries := func(t *testing.T, ctx context.Context, f *forwarder) error {
20222022
t.Helper()
20232023

20242024
var nonRetriableErrSeen bool
20252025
err := testutils.SucceedsSoonError(func() error {
2026-
err := f.TransferConnection()
2026+
err := f.TransferConnection(ctx)
20272027
if err == nil {
20282028
return nil
20292029
}
@@ -2091,7 +2091,7 @@ func TestConnectionMigration(t *testing.T) {
20912091
require.NoError(t, err)
20922092

20932093
// Show that we get alternating SQL pods when we transfer.
2094-
require.NoError(t, transferConnWithRetries(t, f))
2094+
require.NoError(t, transferConnWithRetries(t, tCtx, f))
20952095
require.Equal(t, int64(1), f.metrics.ConnMigrationSuccessCount.Count())
20962096
require.Equal(t, tenant2.SQLAddr(), queryAddr(tCtx, t, db))
20972097

@@ -2102,7 +2102,7 @@ func TestConnectionMigration(t *testing.T) {
21022102
_, err = db.Exec("SET application_name = 'bar'")
21032103
require.NoError(t, err)
21042104

2105-
require.NoError(t, transferConnWithRetries(t, f))
2105+
require.NoError(t, transferConnWithRetries(t, tCtx, f))
21062106
require.Equal(t, int64(2), f.metrics.ConnMigrationSuccessCount.Count())
21072107
require.Equal(t, tenant1.SQLAddr(), queryAddr(tCtx, t, db))
21082108

@@ -2120,14 +2120,14 @@ func TestConnectionMigration(t *testing.T) {
21202120
go func() {
21212121
defer wg.Done()
21222122
for subCtx.Err() == nil {
2123-
_ = f.TransferConnection()
2123+
_ = f.TransferConnection(tCtx)
21242124
time.Sleep(100 * time.Millisecond)
21252125
}
21262126
}()
21272127

21282128
// This loop will run approximately 5 seconds.
21292129
var tenant1Addr, tenant2Addr int
2130-
for i := 0; i < 100; i++ {
2130+
for range 100 {
21312131
addr := queryAddr(tCtx, t, db)
21322132
if addr == tenant1.SQLAddr() {
21332133
tenant1Addr++
@@ -2167,7 +2167,7 @@ func TestConnectionMigration(t *testing.T) {
21672167
err = crdb.ExecuteTx(tCtx, db, nil /* txopts */, func(tx *gosql.Tx) error {
21682168
// Run multiple times to ensure that connection isn't closed.
21692169
for i := 0; i < 5; {
2170-
err := f.TransferConnection()
2170+
err := f.TransferConnection(tCtx)
21712171
if err == nil {
21722172
return errors.New("no error")
21732173
}
@@ -2201,7 +2201,7 @@ func TestConnectionMigration(t *testing.T) {
22012201
require.Equal(t, int64(0), f.metrics.ConnMigrationErrorFatalCount.Count())
22022202

22032203
// Once the transaction is closed, transfers should work.
2204-
require.NoError(t, transferConnWithRetries(t, f))
2204+
require.NoError(t, transferConnWithRetries(t, tCtx, f))
22052205
require.NotEqual(t, initAddr, queryAddr(tCtx, t, db))
22062206
require.Nil(t, f.ctx.Err())
22072207
require.Equal(t, initSuccessCount+1, f.metrics.ConnMigrationSuccessCount.Count())
@@ -2223,7 +2223,7 @@ func TestConnectionMigration(t *testing.T) {
22232223
lookupAddrDelayDuration = 10 * time.Second
22242224
defer testutils.TestingHook(&defaultTransferTimeout, 3*time.Second)()
22252225

2226-
err := f.TransferConnection()
2226+
err := f.TransferConnection(tCtx)
22272227
require.Error(t, err)
22282228
require.Regexp(t, "injected delays", err.Error())
22292229
require.Equal(t, initAddr, queryAddr(tCtx, t, db))
@@ -2319,7 +2319,7 @@ func TestConnectionMigration(t *testing.T) {
23192319
time.Sleep(2 * time.Second)
23202320
// This should be an error because the transfer timed out. Connection
23212321
// should automatically be closed.
2322-
require.Error(t, f.TransferConnection())
2322+
require.Error(t, f.TransferConnection(tCtx))
23232323

23242324
select {
23252325
case <-time.After(10 * time.Second):

0 commit comments

Comments
 (0)