Skip to content

Commit 3fba696

Browse files
authored
balance, backend: support traffic replay when backend signing cert is unavailable (#734)
1 parent c834132 commit 3fba696

File tree

12 files changed

+205
-52
lines changed

12 files changed

+205
-52
lines changed

pkg/balance/observer/backend_health.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package observer
55

66
import (
77
"fmt"
8+
"time"
89

910
"github.com/pingcap/tiproxy/lib/config"
1011
)
@@ -16,6 +17,10 @@ type BackendHealth struct {
1617
PingErr error
1718
// The backend version that returned to the client during handshake.
1819
ServerVersion string
20+
// The last time checking the signing cert.
21+
lastCheckSigningCertTime time.Time
22+
// Whether the backend has set the signing cert. If not, the connection redirection will be disabled.
23+
SupportRedirection bool
1924
// Whether the backend in the same zone with TiProxy. If TiProxy location is undefined, take all backends as local.
2025
Local bool
2126
}
@@ -37,10 +42,6 @@ func (bh *BackendHealth) setLocal(cfg *config.Config) {
3742
bh.Local = false
3843
}
3944

40-
func (bh *BackendHealth) Equals(health BackendHealth) bool {
41-
return bh.Healthy == health.Healthy && bh.ServerVersion == health.ServerVersion && bh.Local == health.Local
42-
}
43-
4445
func (bh *BackendHealth) String() string {
4546
str := "down"
4647
if bh.Healthy {
@@ -49,6 +50,9 @@ func (bh *BackendHealth) String() string {
4950
if bh.PingErr != nil {
5051
str += fmt.Sprintf(", err: %s", bh.PingErr.Error())
5152
}
53+
if !bh.SupportRedirection {
54+
str += ", support redirection: false"
55+
}
5256
return str
5357
}
5458

pkg/balance/observer/backend_observer.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,9 @@ func (bo *DefaultBackendObserver) checkHealth(ctx context.Context, backends map[
123123
if !bo.healthCheckConfig.Enable {
124124
for addr, backend := range backends {
125125
curBackendHealth[addr] = &BackendHealth{
126-
BackendInfo: *backend,
127-
Healthy: true,
126+
BackendInfo: *backend,
127+
SupportRedirection: true,
128+
Healthy: true,
128129
}
129130
}
130131
return curBackendHealth
@@ -139,7 +140,8 @@ func (bo *DefaultBackendObserver) checkHealth(ctx context.Context, backends map[
139140
if ctx.Err() != nil {
140141
return
141142
}
142-
health := bo.hc.Check(ctx, addr, info)
143+
lastHealth := bo.curBackends[addr]
144+
health := bo.hc.Check(ctx, addr, info, lastHealth)
143145
health.setLocal(cfg)
144146
lock.Lock()
145147
curBackendHealth[addr] = health

pkg/balance/observer/backend_observer_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,9 @@ func (ts *observerTestSuite) addBackend() (string, BackendInfo) {
284284
}
285285
ts.fetcher.setBackend(addr, info)
286286
ts.hc.setBackend(addr, &BackendHealth{
287-
BackendInfo: *info,
288-
Healthy: true,
287+
BackendInfo: *info,
288+
SupportRedirection: true,
289+
Healthy: true,
289290
})
290291
return addr, *info
291292
}

pkg/balance/observer/health_check.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@ import (
2121

2222
// HealthCheck is used to check the backends of one backend. One can pass a customized health check function to the observer.
2323
type HealthCheck interface {
24-
Check(ctx context.Context, addr string, info *BackendInfo) *BackendHealth
24+
Check(ctx context.Context, addr string, info *BackendInfo, lastHealth *BackendHealth) *BackendHealth
2525
}
2626

2727
const (
2828
statusPathSuffix = "/status"
29+
configPathSuffix = "/config"
30+
31+
checkSigningCertInterval = time.Minute
2932
)
3033

3134
type backendHttpStatusRespBody struct {
@@ -34,6 +37,14 @@ type backendHttpStatusRespBody struct {
3437
GitHash string `json:"git_hash"`
3538
}
3639

40+
type backendHttpConfigRespBody struct {
41+
Security security `json:"security"`
42+
}
43+
44+
type security struct {
45+
SessionTokenSigningCert string `json:"session-token-signing-cert"`
46+
}
47+
3748
type DefaultHealthCheck struct {
3849
cfg *config.HealthCheck
3950
logger *zap.Logger
@@ -51,7 +62,7 @@ func NewDefaultHealthCheck(httpCli *http.Client, cfg *config.HealthCheck, logger
5162
}
5263
}
5364

54-
func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *BackendInfo) *BackendHealth {
65+
func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *BackendInfo, lastBh *BackendHealth) *BackendHealth {
5566
bh := &BackendHealth{
5667
BackendInfo: *info,
5768
Healthy: true,
@@ -64,6 +75,10 @@ func (dhc *DefaultHealthCheck) Check(ctx context.Context, addr string, info *Bac
6475
return bh
6576
}
6677
dhc.checkSqlPort(ctx, addr, bh)
78+
if !bh.Healthy {
79+
return bh
80+
}
81+
dhc.queryConfig(ctx, info, bh, lastBh)
6782
return bh
6883
}
6984

@@ -123,3 +138,48 @@ func (dhc *DefaultHealthCheck) checkStatusPort(ctx context.Context, info *Backen
123138
bh.PingErr = errors.Wrapf(err, "connect status port failed")
124139
}
125140
}
141+
142+
func (dhc *DefaultHealthCheck) queryConfig(ctx context.Context, info *BackendInfo, bh *BackendHealth, lastBh *BackendHealth) {
143+
if ctx.Err() != nil {
144+
return
145+
}
146+
// Using static backends, no status port.
147+
if info == nil || len(info.IP) == 0 {
148+
return
149+
}
150+
151+
now := time.Now()
152+
if lastBh != nil {
153+
bh.SupportRedirection = lastBh.SupportRedirection
154+
if lastBh.lastCheckSigningCertTime.Add(checkSigningCertInterval).After(now) {
155+
bh.lastCheckSigningCertTime = lastBh.lastCheckSigningCertTime
156+
return
157+
}
158+
} else {
159+
// Assume it has the signing cert if reading config fails.
160+
bh.SupportRedirection = true
161+
}
162+
bh.lastCheckSigningCertTime = now
163+
164+
var err error
165+
defer func() {
166+
if lastBh == nil || lastBh.SupportRedirection != bh.SupportRedirection {
167+
dhc.logger.Info("backend has updated signing cert", zap.Bool("support_redirection", bh.SupportRedirection), zap.Error(err))
168+
}
169+
}()
170+
171+
addr := net.JoinHostPort(info.IP, strconv.Itoa(int(info.StatusPort)))
172+
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(dhc.cfg.RetryInterval), uint64(dhc.cfg.MaxRetries)), ctx)
173+
var resp []byte
174+
if resp, err = dhc.httpCli.Get(addr, configPathSuffix, b, dhc.cfg.DialTimeout); err != nil {
175+
return
176+
}
177+
var respBody backendHttpConfigRespBody
178+
if err = json.Unmarshal(resp, &respBody); err != nil {
179+
dhc.logger.Error("unmarshal body in healthy check failed", zap.String("addr", addr), zap.String("resp body", string(resp)), zap.Error(err))
180+
return
181+
}
182+
if len(respBody.Security.SessionTokenSigningCert) == 0 {
183+
bh.SupportRedirection = false
184+
}
185+
}

pkg/balance/observer/health_check_test.go

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,19 @@ func TestReadServerVersion(t *testing.T) {
2525
hc := NewDefaultHealthCheck(nil, newHealthCheckConfigForTest(), lg)
2626
backend, info := newBackendServer(t)
2727
backend.setServerVersion("1.0")
28-
health := hc.Check(context.Background(), backend.sqlAddr, info)
28+
health := hc.Check(context.Background(), backend.sqlAddr, info, nil)
2929
require.Equal(t, "1.0", health.ServerVersion)
3030
backend.stopSQLServer()
3131
backend.setServerVersion("2.0")
3232
backend.startSQLServer()
33-
health = hc.Check(context.Background(), backend.sqlAddr, info)
33+
health = hc.Check(context.Background(), backend.sqlAddr, info, nil)
3434
require.Equal(t, "2.0", health.ServerVersion)
3535
backend.stopSQLServer()
3636

3737
//test for respBody not ok
38-
backend.mockHttpHandler.setHTTPRespBody("")
38+
backend.mockHttpHandler.setStatusRespBody("")
3939
backend.startSQLServer()
40-
health = hc.Check(context.Background(), backend.sqlAddr, info)
40+
health = hc.Check(context.Background(), backend.sqlAddr, info, nil)
4141
require.False(t, health.Healthy)
4242
require.NotNil(t, health.PingErr)
4343
require.Equal(t, true, strings.Contains(health.PingErr.Error(), "unexpected end of JSON input"))
@@ -52,37 +52,47 @@ func TestHealthCheck(t *testing.T) {
5252
hc := NewDefaultHealthCheck(nil, cfg, lg)
5353
backend, info := newBackendServer(t)
5454
backend.setServerVersion("1.0")
55-
health := hc.Check(context.Background(), backend.sqlAddr, info)
55+
backend.setHasSigningCert(true)
56+
health := hc.Check(context.Background(), backend.sqlAddr, info, nil)
5657
require.True(t, health.Healthy)
5758

5859
backend.stopSQLServer()
59-
health = hc.Check(context.Background(), backend.sqlAddr, info)
60+
health = hc.Check(context.Background(), backend.sqlAddr, info, nil)
6061
require.False(t, health.Healthy)
6162
backend.startSQLServer()
62-
health = hc.Check(context.Background(), backend.sqlAddr, info)
63+
health = hc.Check(context.Background(), backend.sqlAddr, info, nil)
6364
require.True(t, health.Healthy)
64-
6565
backend.setHTTPResp(false)
66-
health = hc.Check(context.Background(), backend.sqlAddr, info)
66+
67+
health = hc.Check(context.Background(), backend.sqlAddr, info, nil)
6768
require.False(t, health.Healthy)
6869
require.NotContains(t, text.String(), "unmarshal body")
6970
backend.setHTTPResp(true)
70-
health = hc.Check(context.Background(), backend.sqlAddr, info)
71+
health = hc.Check(context.Background(), backend.sqlAddr, info, nil)
7172
require.True(t, health.Healthy)
7273

7374
backend.setHTTPWait(time.Second + cfg.DialTimeout)
74-
health = hc.Check(context.Background(), backend.sqlAddr, info)
75+
health = hc.Check(context.Background(), backend.sqlAddr, info, nil)
7576
require.False(t, health.Healthy)
7677
backend.setHTTPWait(time.Duration(0))
77-
health = hc.Check(context.Background(), backend.sqlAddr, info)
78+
health = hc.Check(context.Background(), backend.sqlAddr, info, nil)
7879
require.True(t, health.Healthy)
7980

8081
backend.setSqlResp(false)
81-
health = hc.Check(context.Background(), backend.sqlAddr, info)
82+
health = hc.Check(context.Background(), backend.sqlAddr, info, nil)
8283
require.False(t, health.Healthy)
8384
backend.setSqlResp(true)
84-
health = hc.Check(context.Background(), backend.sqlAddr, info)
85+
health = hc.Check(context.Background(), backend.sqlAddr, info, nil)
8586
require.True(t, health.Healthy)
87+
88+
require.True(t, health.SupportRedirection)
89+
health.lastCheckSigningCertTime = time.Time{}
90+
health = hc.Check(context.Background(), backend.sqlAddr, info, health)
91+
require.True(t, health.SupportRedirection)
92+
backend.setHasSigningCert(false)
93+
health.lastCheckSigningCertTime = time.Time{}
94+
health = hc.Check(context.Background(), backend.sqlAddr, info, health)
95+
require.False(t, health.SupportRedirection)
8696
backend.close()
8797
}
8898

@@ -105,7 +115,7 @@ func newBackendServer(t *testing.T) (*backendServer, *BackendInfo) {
105115
}
106116
backend.startHTTPServer()
107117
backend.setHTTPResp(true)
108-
backend.setHTTPRespBody("")
118+
backend.setStatusRespBody("")
109119
backend.setSqlResp(true)
110120
backend.startSQLServer()
111121
return backend, &BackendInfo{
@@ -120,7 +130,7 @@ func (srv *backendServer) setServerVersion(version string) {
120130
GitHash: "",
121131
}
122132
body, _ := json.Marshal(resp)
123-
srv.mockHttpHandler.setHTTPRespBody(string(body))
133+
srv.mockHttpHandler.setStatusRespBody(string(body))
124134
}
125135

126136
func (srv *backendServer) startHTTPServer() {

pkg/balance/observer/mock_test.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package observer
55

66
import (
77
"context"
8+
"encoding/json"
89
"net/http"
910
"sync"
1011
"testing"
@@ -81,7 +82,7 @@ func newMockHealthCheck() *mockHealthCheck {
8182
}
8283
}
8384

84-
func (mhc *mockHealthCheck) Check(_ context.Context, addr string, info *BackendInfo) *BackendHealth {
85+
func (mhc *mockHealthCheck) Check(_ context.Context, addr string, info *BackendInfo, _ *BackendHealth) *BackendHealth {
8586
mhc.Lock()
8687
defer mhc.Unlock()
8788
mhc.backends[addr].BackendInfo = *info
@@ -109,18 +110,28 @@ func (mhc *mockHealthCheck) removeBackend(addr string) {
109110
}
110111

111112
type mockHttpHandler struct {
112-
t *testing.T
113-
httpOK atomic.Bool
114-
respBody atomic.String
115-
wait atomic.Int64
113+
t *testing.T
114+
httpOK atomic.Bool
115+
status atomic.String
116+
config atomic.String
117+
wait atomic.Int64
116118
}
117119

118120
func (handler *mockHttpHandler) setHTTPResp(succeed bool) {
119121
handler.httpOK.Store(succeed)
120122
}
121123

122-
func (handler *mockHttpHandler) setHTTPRespBody(body string) {
123-
handler.respBody.Store(body)
124+
func (handler *mockHttpHandler) setStatusRespBody(body string) {
125+
handler.status.Store(body)
126+
}
127+
128+
func (handler *mockHttpHandler) setHasSigningCert(hasSigningCert bool) {
129+
var resp backendHttpConfigRespBody
130+
if hasSigningCert {
131+
resp.Security.SessionTokenSigningCert = "/tmp"
132+
}
133+
b, _ := json.Marshal(resp)
134+
handler.config.Store(string(b))
124135
}
125136

126137
func (handler *mockHttpHandler) setHTTPWait(wait time.Duration) {
@@ -134,7 +145,11 @@ func (handler *mockHttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request
134145
}
135146
if handler.httpOK.Load() {
136147
w.WriteHeader(http.StatusOK)
137-
_, _ = w.Write([]byte(handler.respBody.Load()))
148+
if r.RequestURI == statusPathSuffix {
149+
_, _ = w.Write([]byte(handler.status.Load()))
150+
} else {
151+
_, _ = w.Write([]byte(handler.config.Load()))
152+
}
138153
} else {
139154
w.WriteHeader(http.StatusInternalServerError)
140155
}

pkg/balance/router/router.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,11 @@ func (b *backendWrapper) GetBackendInfo() observer.BackendInfo {
146146
return info
147147
}
148148

149-
func (b *backendWrapper) Equals(health observer.BackendHealth) bool {
149+
func (b *backendWrapper) SupportRedirection() bool {
150150
b.mu.RLock()
151-
equal := b.mu.BackendHealth.Equals(health)
151+
supportRedirection := b.mu.SupportRedirection
152152
b.mu.RUnlock()
153-
return equal
153+
return supportRedirection
154154
}
155155

156156
func (b *backendWrapper) String() string {

0 commit comments

Comments
 (0)