Skip to content

Commit f0acd80

Browse files
authored
vip, elect: bind VIP to the active node (#590)
1 parent 3b64de9 commit f0acd80

File tree

11 files changed

+585
-57
lines changed

11 files changed

+585
-57
lines changed

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/go-mysql-org/go-mysql v1.6.0
1212
github.com/go-sql-driver/mysql v1.7.0
1313
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
14+
github.com/j-keck/arping v1.0.3
1415
github.com/klauspost/compress v1.16.6
1516
github.com/pingcap/kvproto v0.0.0-20231018065736-c0689aded40c
1617
github.com/pingcap/sysutil v1.0.0
@@ -24,6 +25,7 @@ require (
2425
github.com/spf13/cobra v1.6.1
2526
github.com/stretchr/testify v1.8.4
2627
github.com/tidwall/btree v1.5.2
28+
github.com/vishvananda/netlink v1.1.0
2729
go.etcd.io/etcd/api/v3 v3.5.6
2830
go.etcd.io/etcd/client/pkg/v3 v3.5.6
2931
go.etcd.io/etcd/client/v3 v3.5.6
@@ -90,6 +92,7 @@ require (
9092
github.com/tklauser/numcpus v0.6.1 // indirect
9193
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect
9294
github.com/ugorji/go/codec v1.2.7 // indirect
95+
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect
9396
github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 // indirect
9497
github.com/yusufpapurcu/wmi v1.2.3 // indirect
9598
go.etcd.io/bbolt v1.3.6 // indirect

go.sum

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,8 @@ github.com/iris-contrib/blackfriday v2.0.0+incompatible/go.mod h1:UzZ2bDEoaSGPbk
342342
github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0=
343343
github.com/iris-contrib/i18n v0.0.0-20171121225848-987a633949d0/go.mod h1:pMCz62A0xJL6I+umB2YTlFRwWXaDFA0jy+5HzGiJjqI=
344344
github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw=
345+
github.com/j-keck/arping v1.0.3 h1:aeVk5WnsK6xPaRsFt5wV6W2x5l/n5XBNp0MMr/FEv2k=
346+
github.com/j-keck/arping v1.0.3/go.mod h1:aJbELhR92bSk7tp79AWM/ftfc90EfEi2bQJrbBFOsPw=
345347
github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
346348
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
347349
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
@@ -610,6 +612,10 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
610612
github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w=
611613
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
612614
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
615+
github.com/vishvananda/netlink v1.1.0 h1:1iyaYNBLmP6L0220aDnYQpo1QEV4t4hJ+xEEhhJH8j0=
616+
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
617+
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df h1:OviZH7qLw/7ZovXvuNyL3XQl8UFofeikI1NW1Gypu7k=
618+
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
613619
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
614620
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
615621
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
@@ -834,6 +840,7 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w
834840
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
835841
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
836842
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
843+
golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
837844
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
838845
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
839846
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

lib/config/proxy.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Config struct {
2727
Log Log `yaml:"log,omitempty" toml:"log,omitempty" json:"log,omitempty"`
2828
Balance Balance `yaml:"balance,omitempty" toml:"balance,omitempty" json:"balance,omitempty"`
2929
Labels map[string]string `yaml:"labels,omitempty" toml:"labels,omitempty" json:"labels,omitempty"`
30+
HA HA `yaml:"ha,omitempty" toml:"ha,omitempty" json:"ha,omitempty"`
3031
}
3132

3233
type KeepAlive struct {
@@ -116,6 +117,11 @@ type Security struct {
116117
RequireBackendTLS bool `yaml:"require-backend-tls,omitempty" toml:"require-backend-tls,omitempty" json:"require-backend-tls,omitempty"`
117118
}
118119

120+
// HA is the high-availability section of the proxy configuration, stored
// under the "ha" key in YAML/TOML/JSON. It carries the two settings used for
// binding a virtual IP to a node.
type HA struct {
121+
// VirtualIP is the virtual IP address to bind, e.g. "10.10.10.10/24"
// (presumably CIDR notation; confirm against the network operation parser).
VirtualIP string `yaml:"virtual-ip,omitempty" toml:"virtual-ip,omitempty" json:"virtual-ip,omitempty"`
122+
// Interface is the name of the network interface (link) the virtual IP is
// bound to, e.g. "eth0".
Interface string `yaml:"interface,omitempty" toml:"interface,omitempty" json:"interface,omitempty"`
123+
}
124+
119125
func DefaultKeepAlive() (frontend, backendHealthy, backendUnhealthy KeepAlive) {
120126
frontend.Enabled = true
121127
backendHealthy.Enabled = true

pkg/manager/elect/election.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ type electionConfig struct {
4646
sessionTTL int
4747
}
4848

49+
func DefaultElectionConfig(sessionTTL int) electionConfig {
50+
return electionConfig{
51+
timeout: 2 * time.Second,
52+
retryIntvl: 500 * time.Millisecond,
53+
retryCnt: 3,
54+
sessionTTL: sessionTTL,
55+
}
56+
}
57+
4958
var _ Election = (*election)(nil)
5059

5160
// election is used for electing owner.
@@ -106,7 +115,11 @@ func (m *election) initSession(ctx context.Context) (*concurrency.Session, error
106115
}
107116

108117
// IsOwner reports whether this member is the current owner. It looks up the
// current owner ID with GetOwnerID and compares it with this member's own ID.
func (m *election) IsOwner() bool {
// Diff note: the line marked "109-" is the implementation removed by this
// commit (it only tested m.elec.Load() for nil); the lines marked "+"
// replace it.
109-
return m.elec.Load() != nil
118+
ownerID, err := m.GetOwnerID(context.Background())
119+
if err != nil {
120+
// A failed owner lookup is reported as "not the owner".
return false
121+
}
122+
return ownerID == m.id
110123
}
111124

112125
func (m *election) campaignLoop(ctx context.Context) {

pkg/manager/elect/mock_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ func (ts *etcdTestSuite) getOwnerID() string {
145145
} else {
146146
require.Equal(ts.t, ownerID, id)
147147
}
148+
require.Equal(ts.t, elec.id == ownerID, elec.IsOwner())
148149
}
149150
return ownerID
150151
}

pkg/manager/vip/manager.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package vip
5+
6+
import (
	"context"
	"fmt"
	"os"

	"github.com/pingcap/tiproxy/lib/config"
	"github.com/pingcap/tiproxy/pkg/manager/elect"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
)
14+
15+
// Fixed parameters of the VIP owner election.
const (
16+
// vipKey is the key in etcd for VIP election.
// It is passed to elect.NewElection as the election key.
17+
vipKey = "vip"
18+
// sessionTTL is the session's TTL in seconds for VIP election.
// It is passed to elect.DefaultElectionConfig.
19+
sessionTTL = 5
20+
)
21+
22+
// VIPManager manages the binding of a virtual IP to this node based on the
// result of an owner election.
type VIPManager interface {
23+
// Start begins participating in the VIP election using the given etcd
// client. It returns an error if the manager cannot be started.
Start(context.Context, *clientv3.Client) error
24+
// OnElected is the callback for this node becoming the owner; the
// vipManager implementation binds the virtual IP.
OnElected()
25+
// OnRetired is the callback for this node losing ownership; the
// vipManager implementation removes the virtual IP.
OnRetired()
26+
// Close stops the manager's election, if one was started.
Close()
27+
}
28+
29+
var _ VIPManager = (*vipManager)(nil)
30+
31+
// vipManager is the VIPManager implementation. It reacts to election results
// by adding or removing the virtual IP through a NetworkOperation.
type vipManager struct {
32+
// operation performs the network actions (HasIP, AddIP, DeleteIP, SendARP).
operation NetworkOperation
33+
// cfgGetter supplies the configuration, including the HA section.
cfgGetter config.ConfigGetter
34+
// election is created in Start; it is nil until then.
election elect.Election
35+
// lg is the logger, tagged with the VIP address and link name.
lg *zap.Logger
36+
}
37+
38+
func NewVIPManager(lg *zap.Logger, cfgGetter config.ConfigGetter) (*vipManager, error) {
39+
cfg := cfgGetter.GetConfig()
40+
if len(cfg.HA.VirtualIP) == 0 && len(cfg.HA.Interface) == 0 {
41+
return nil, nil
42+
}
43+
vm := &vipManager{
44+
cfgGetter: cfgGetter,
45+
lg: lg.With(zap.String("address", cfg.HA.VirtualIP), zap.String("link", cfg.HA.Interface)),
46+
}
47+
if len(cfg.HA.VirtualIP) == 0 || len(cfg.HA.Interface) == 0 {
48+
vm.lg.Warn("Both address and link must be specified to enable VIP. VIP is disabled")
49+
return nil, nil
50+
}
51+
operation, err := NewNetworkOperation(cfg.HA.VirtualIP, cfg.HA.Interface)
52+
if err != nil {
53+
vm.lg.Error("init network operation failed", zap.Error(err))
54+
return nil, err
55+
}
56+
vm.operation = operation
57+
return vm, nil
58+
}
59+
60+
func getID(cfg *config.Config) (string, error) {
61+
return cfg.HA.VirtualIP, nil
62+
}
63+
64+
func (vm *vipManager) Start(ctx context.Context, etcdCli *clientv3.Client) error {
65+
cfg := vm.cfgGetter.GetConfig()
66+
addr, err := getID(cfg)
67+
if err != nil {
68+
return err
69+
}
70+
71+
electionCfg := elect.DefaultElectionConfig(sessionTTL)
72+
election := elect.NewElection(vm.lg, etcdCli, electionCfg, addr, vipKey, vm)
73+
vm.election = election
74+
// Check the ownership at startup just in case the node is just down and restarted.
75+
// Before it was down, it may be either the owner or not.
76+
if election.IsOwner() {
77+
vm.OnElected()
78+
} else {
79+
vm.OnRetired()
80+
}
81+
vm.election.Start(ctx)
82+
return nil
83+
}
84+
85+
func (vm *vipManager) OnElected() {
86+
hasIP, err := vm.operation.HasIP()
87+
if err != nil {
88+
vm.lg.Error("checking addresses failed", zap.Error(err))
89+
return
90+
}
91+
if hasIP {
92+
vm.lg.Info("already has VIP, do nothing")
93+
return
94+
}
95+
if err := vm.operation.AddIP(); err != nil {
96+
vm.lg.Error("adding address failed", zap.Error(err))
97+
return
98+
}
99+
if err := vm.operation.SendARP(); err != nil {
100+
vm.lg.Error("broadcast ARP failed", zap.Error(err))
101+
return
102+
}
103+
vm.lg.Info("adding VIP success")
104+
}
105+
106+
func (vm *vipManager) OnRetired() {
107+
hasIP, err := vm.operation.HasIP()
108+
if err != nil {
109+
vm.lg.Error("checking addresses failed", zap.Error(err))
110+
return
111+
}
112+
if !hasIP {
113+
vm.lg.Info("does not have VIP, do nothing")
114+
return
115+
}
116+
if err := vm.operation.DeleteIP(); err != nil {
117+
vm.lg.Error("deleting address failed", zap.Error(err))
118+
return
119+
}
120+
vm.lg.Info("deleting VIP success")
121+
}
122+
123+
func (vm *vipManager) Close() {
124+
// The OnRetired() will be called when the election is closed.
125+
if vm.election != nil {
126+
vm.election.Close()
127+
}
128+
}

pkg/manager/vip/manager_test.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package vip
5+
6+
import (
7+
"context"
8+
"strings"
9+
"testing"
10+
"time"
11+
12+
"github.com/pingcap/tiproxy/lib/config"
13+
"github.com/pingcap/tiproxy/lib/util/logger"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestVIPCfgError(t *testing.T) {
18+
tests := []struct {
19+
cfg config.HA
20+
hasErr bool
21+
}{
22+
{
23+
cfg: config.HA{},
24+
hasErr: false,
25+
},
26+
{
27+
cfg: config.HA{
28+
VirtualIP: "",
29+
Interface: "eth0",
30+
},
31+
hasErr: false,
32+
},
33+
{
34+
cfg: config.HA{
35+
VirtualIP: "10.10.10.10/24",
36+
Interface: "",
37+
},
38+
hasErr: false,
39+
},
40+
{
41+
cfg: config.HA{
42+
VirtualIP: "0.0.0.0/24",
43+
Interface: "unknown",
44+
},
45+
// OS error for non-linux platform and parse error for linux platform.
46+
hasErr: true,
47+
},
48+
}
49+
50+
lg, _ := logger.CreateLoggerForTest(t)
51+
for i, test := range tests {
52+
cfgGetter := newMockConfigGetter(test.cfg)
53+
vm, err := NewVIPManager(lg, cfgGetter)
54+
if test.hasErr {
55+
require.Error(t, err, "case %d", i)
56+
} else {
57+
require.NoError(t, err, "case %d", i)
58+
}
59+
require.Nil(t, vm)
60+
}
61+
}
62+
63+
func TestNetworkOperation(t *testing.T) {
64+
tests := []struct {
65+
eventType int
66+
hasIP bool
67+
hasIPErr bool
68+
addIPErr bool
69+
delIPErr bool
70+
sendArpErr bool
71+
expectedLog string
72+
}{
73+
{
74+
eventType: eventTypeElected,
75+
hasIP: false,
76+
expectedLog: "adding VIP success",
77+
},
78+
{
79+
eventType: eventTypeRetired,
80+
hasIP: true,
81+
expectedLog: "deleting VIP success",
82+
},
83+
{
84+
eventType: eventTypeElected,
85+
hasIP: true,
86+
expectedLog: "do nothing",
87+
},
88+
{
89+
eventType: eventTypeRetired,
90+
hasIP: false,
91+
expectedLog: "do nothing",
92+
},
93+
{
94+
eventType: eventTypeElected,
95+
hasIPErr: true,
96+
expectedLog: "checking addresses failed",
97+
},
98+
{
99+
eventType: eventTypeElected,
100+
hasIP: false,
101+
addIPErr: true,
102+
expectedLog: "adding address failed",
103+
},
104+
{
105+
eventType: eventTypeElected,
106+
hasIP: false,
107+
sendArpErr: true,
108+
expectedLog: "broadcast ARP failed",
109+
},
110+
{
111+
eventType: eventTypeRetired,
112+
hasIP: true,
113+
delIPErr: true,
114+
expectedLog: "deleting address failed",
115+
},
116+
}
117+
118+
lg, text := logger.CreateLoggerForTest(t)
119+
ch := make(chan int)
120+
operation := newMockNetworkOperation()
121+
vm := &vipManager{
122+
lg: lg,
123+
cfgGetter: newMockConfigGetter(config.HA{VirtualIP: "10.10.10.10/24", Interface: "eth0"}),
124+
operation: operation,
125+
}
126+
vm.election = newMockElection(ch, vm)
127+
childCtx, cancel := context.WithCancel(context.Background())
128+
vm.election.Start(childCtx)
129+
logIdx := 0
130+
for i, test := range tests {
131+
operation.hasIP.Store(test.hasIP)
132+
operation.hasIPErr.Store(test.hasIPErr)
133+
operation.addIPErr.Store(test.addIPErr)
134+
operation.delIPErr.Store(test.delIPErr)
135+
operation.sendArpErr.Store(test.sendArpErr)
136+
ch <- test.eventType
137+
require.Eventually(t, func() bool {
138+
return strings.Contains(text.String()[logIdx:], test.expectedLog)
139+
}, 3*time.Second, 10*time.Millisecond, "case %d", i)
140+
logIdx = len(text.String())
141+
}
142+
cancel()
143+
vm.Close()
144+
}

0 commit comments

Comments
 (0)