Commit 233f456

*: recover goroutines when they panic (#488)

1 parent 9f2db8e

15 files changed: +111 −86 lines

lib/util/waitgroup/waitgroup.go

Lines changed: 14 additions & 2 deletions
@@ -5,6 +5,8 @@ package waitgroup
 
 import (
 	"sync"
+
+	"go.uber.org/zap"
 )
 
 // WaitGroup is a wrapper for sync.WaitGroup
@@ -27,15 +29,25 @@ func (w *WaitGroup) Run(exec func()) {
 // and call done when function return. it will dump current goroutine stack into log if catch any recover result.
 // exec is that execute logic function. recoverFn is that handler will be called after recover and before dump stack,
 // passing `nil` means noop.
-func (w *WaitGroup) RunWithRecover(exec func(), recoverFn func(r interface{})) {
+func (w *WaitGroup) RunWithRecover(exec func(), recoverFn func(r interface{}), logger *zap.Logger) {
 	w.Add(1)
 	go func() {
 		defer func() {
 			r := recover()
+			defer func() {
+				// If it panics again in recovery, quit ASAP.
+				_ = recover()
+			}()
+			if r != nil && logger != nil {
+				logger.Error("panic in the recoverable goroutine",
+					zap.Reflect("r", r),
+					zap.Stack("stack trace"))
+			}
+			// Call Done() before recoverFn because recoverFn normally calls `Close()`, which may call `wg.Wait()`.
+			w.Done()
			if r != nil && recoverFn != nil {
 				recoverFn(r)
 			}
-			w.Done()
 		}()
 		exec()
 	}()
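For reference, here is a minimal, self-contained sketch of the pattern this function now implements. The helper name runWithRecover, the main function, and the log wording are illustrative, not part of the commit; only go.uber.org/zap is assumed as a dependency, as in the patch.

package main

import (
	"sync"

	"go.uber.org/zap"
)

// runWithRecover mirrors the patched WaitGroup.RunWithRecover: it recovers a
// panic in the goroutine, logs it with a stack trace, marks the goroutine done,
// and only then hands the panic value to an optional cleanup callback.
func runWithRecover(wg *sync.WaitGroup, exec func(), recoverFn func(r interface{}), logger *zap.Logger) {
	wg.Add(1)
	go func() {
		defer func() {
			r := recover()
			defer func() {
				// A second panic inside recoverFn must not crash the process.
				_ = recover()
			}()
			if r != nil && logger != nil {
				logger.Error("panic in the recoverable goroutine",
					zap.Reflect("r", r),
					zap.Stack("stack trace"))
			}
			// Done() runs before recoverFn: the callback may call Close(), which may call wg.Wait().
			wg.Done()
			if r != nil && recoverFn != nil {
				recoverFn(r)
			}
		}()
		exec()
	}()
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer func() { _ = logger.Sync() }()

	cleaned := make(chan struct{})
	var wg sync.WaitGroup
	runWithRecover(&wg, func() {
		panic("boom")
	}, func(r interface{}) {
		logger.Info("cleaning up after panic", zap.Reflect("r", r))
		close(cleaned)
	}, logger)

	wg.Wait() // returns as soon as Done() runs, possibly before the cleanup callback finishes
	<-cleaned // wait for the cleanup separately
}

Running this prints an error entry carrying the panic value and a stack trace, then the cleanup log line, and the process exits normally instead of crashing.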
Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+// Copyright 2024 PingCAP, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+package waitgroup
+
+import (
+	"testing"
+
+	"github.com/pingcap/tiproxy/lib/util/logger"
+	"github.com/stretchr/testify/require"
+)
+
+func TestWithRecoveryLog(t *testing.T) {
+	lg, text := logger.CreateLoggerForTest(t)
+	var wg WaitGroup
+	wg.RunWithRecover(func() {
+		panic("mock panic")
+	}, nil, lg)
+	wg.Wait()
+	require.Contains(t, text.String(), "mock panic")
+}
+
+func TestWithRecoveryPanic(t *testing.T) {
+	var wg WaitGroup
+	wg.RunWithRecover(func() {
+		panic("mock panic1")
+	}, func(r interface{}) {
+		panic("mock panic2")
+	}, nil)
+	wg.Wait()
+}
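The two tests cover both halves of the change: the first asserts that a panic in the goroutine ends up in the logger output, and the second asserts that a panic raised inside recoverFn itself, with a nil logger, is swallowed by the nested recover instead of crashing the test binary.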

pkg/manager/cert/manager.go

Lines changed: 1 addition & 0 deletions
@@ -98,6 +98,7 @@ func (cm *CertManager) SQLTLS() *tls.Config {
 // The proxy periodically reloads certs. If it fails, we will retry in the next round.
 // If configuration changes, it only affects new connections by returning new *tls.Config.
 func (cm *CertManager) reloadLoop(ctx context.Context, cfgch <-chan *config.Config) {
+	// Failing to reload certs may cause even more serious problems than TiProxy reboot, so we don't recover panics.
 	cm.wg.Run(func() {
 		for {
 			select {

pkg/manager/config/manager.go

Lines changed: 2 additions & 2 deletions
@@ -82,7 +82,7 @@ func (e *ConfigManager) Init(ctx context.Context, logger *zap.Logger, configFile
 		return err
 	}
 
-	e.wg.Run(func() {
+	e.wg.RunWithRecover(func() {
 		// Read the file periodically and reload the config if it changes.
 		//
 		// We tried other ways to watch file:
@@ -100,7 +100,7 @@ func (e *ConfigManager) Init(ctx context.Context, logger *zap.Logger, configFile
 					}
 				}
 			}
-		})
+		}, nil, e.logger)
 	} else {
 		if err := e.SetTOMLConfig(nil); err != nil {
 			return err

pkg/manager/infosync/info.go

Lines changed: 2 additions & 2 deletions
@@ -109,9 +109,9 @@ func (is *InfoSyncer) Init(ctx context.Context, cfg *config.Config, certMgr *cer
 
 	childCtx, cancelFunc := context.WithCancel(ctx)
 	is.cancelFunc = cancelFunc
-	is.wg.Run(func() {
+	is.wg.RunWithRecover(func() {
 		is.updateTopologyLivenessLoop(childCtx, topologyInfo)
-	})
+	}, nil, is.lg)
 	return nil
 }

pkg/manager/logger/manager.go

Lines changed: 2 additions & 2 deletions
@@ -47,9 +47,9 @@ func (lm *LoggerManager) Init(cfgch <-chan *config.Config) {
 	ctx, cancel := context.WithCancel(context.Background())
 	lm.cancel = cancel
 
-	lm.wg.Run(func() {
+	lm.wg.RunWithRecover(func() {
 		lm.watchCfg(ctx, cfgch)
-	})
+	}, nil, lm.logger)
 }
 
 func (lm *LoggerManager) watchCfg(ctx context.Context, cfgch <-chan *config.Config) {

pkg/manager/router/backend_observer.go

Lines changed: 1 addition & 0 deletions
@@ -123,6 +123,7 @@ func NewBackendObserver(logger *zap.Logger, eventReceiver BackendEventReceiver,
 func (bo *BackendObserver) Start() {
 	childCtx, cancelFunc := context.WithCancel(context.Background())
 	bo.cancelFunc = cancelFunc
+	// Failing to observe backends may cause even more serious problems than TiProxy reboot, so we don't recover panics.
 	bo.wg.Run(func() {
 		bo.observe(childCtx)
 	})

pkg/manager/router/router_score.go

Lines changed: 1 addition & 0 deletions
@@ -50,6 +50,7 @@ func (r *ScoreBasedRouter) Init(fetcher BackendFetcher, hc HealthCheck, cfg *con
 	r.observer = observer
 	childCtx, cancelFunc := context.WithCancel(context.Background())
 	r.cancelFunc = cancelFunc
+	// Failing to rebalance backends may cause even more serious problems than TiProxy reboot, so we don't recover panics.
 	r.wg.Run(func() {
 		r.rebalanceLoop(childCtx)
 	})

pkg/metrics/metrics.go

Lines changed: 2 additions & 2 deletions
@@ -79,9 +79,9 @@ func (mm *MetricsManager) setupMonitor(ctx context.Context) {
 			KeepAliveCounter.Inc()
 		}
 	}
-	mm.wg.Run(func() {
+	mm.wg.RunWithRecover(func() {
 		systimemon.StartMonitor(ctx, mm.logger, time.Now, systimeErrHandler, successCallBack)
-	})
+	}, nil, mm.logger)
 }
 
 // registerProxyMetrics registers metrics.

pkg/proxy/backend/backend_conn_mgr.go

Lines changed: 23 additions & 15 deletions
@@ -203,9 +203,13 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, clientIO *pnet.Packe
 	childCtx, cancelFunc := context.WithCancel(ctx)
 	mgr.cancelFunc = cancelFunc
 	mgr.lastActiveTime = endTime
-	mgr.wg.Run(func() {
+	mgr.wg.RunWithRecover(func() {
 		mgr.processSignals(childCtx)
-	})
+	}, func(_ any) {
+		// If we do not clean up, the router may retain the connection forever and the TiDB won't be released in the
+		// Serverless Tier.
+		_ = mgr.Close()
+	}, mgr.logger)
 	return nil
 }
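Here the recover callback calls mgr.Close(), which is exactly the situation the waitgroup change anticipates: Done() runs before the callback, so even if Close() ends up waiting on the same WaitGroup, the panicking goroutine has already been accounted for and no deadlock occurs.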

@@ -422,22 +426,26 @@ func (mgr *BackendConnManager) processSignals(ctx context.Context) {
 	for {
 		select {
 		case s := <-mgr.signalReceived:
-			// Redirect the session immediately just in case the session is finishedTxn.
-			mgr.processLock.Lock()
-			switch s {
-			case signalTypeGracefulClose:
-				mgr.tryGracefulClose(ctx)
-			case signalTypeRedirect:
-				mgr.tryRedirect(ctx)
-			}
-			mgr.processLock.Unlock()
+			func() {
+				// Redirect the session immediately just in case the session is finishedTxn.
+				mgr.processLock.Lock()
+				defer mgr.processLock.Unlock()
+				switch s {
+				case signalTypeGracefulClose:
+					mgr.tryGracefulClose(ctx)
+				case signalTypeRedirect:
+					mgr.tryRedirect(ctx)
+				}
+			}()
 		case rs := <-mgr.redirectResCh:
 			mgr.notifyRedirectResult(ctx, rs)
 		case <-checkBackendTicker.C:
-			mgr.checkBackendActive()
-			mgr.processLock.Lock()
-			mgr.setKeepAlive()
-			mgr.processLock.Unlock()
+			func() {
+				mgr.checkBackendActive()
+				mgr.processLock.Lock()
+				defer mgr.processLock.Unlock()
+				mgr.setKeepAlive()
+			}()
 		case <-ctx.Done():
 			checkBackendTicker.Stop()
 			return
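The refactor above wraps each lock-protected block in an immediately invoked function with a deferred Unlock, presumably so that if the body panics, the mutex is released while the panic unwinds to the goroutine-level recover installed by RunWithRecover. A minimal, self-contained sketch of that pattern (withLock and mu are illustrative names, not from the commit):

package main

import (
	"fmt"
	"sync"
)

var mu sync.Mutex

// withLock runs body inside its own function so the deferred Unlock fires even
// if body panics; the panic then keeps unwinding to whatever recover handler
// the goroutine installed (RunWithRecover in this commit).
func withLock(body func()) {
	mu.Lock()
	defer mu.Unlock()
	body()
}

func main() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r)
		}
		// The deferred Unlock already released the mutex, so this does not deadlock.
		mu.Lock()
		fmt.Println("mutex is still usable after the panic")
		mu.Unlock()
	}()

	withLock(func() {
		panic("panic inside the critical section")
	})
}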
