Skip to content

Commit 4f4744a

Browse files
authored
elect: retire the owner after the etcd server recovers (#627)
1 parent f99c3c0 commit 4f4744a

File tree

6 files changed

+59
-45
lines changed

6 files changed

+59
-45
lines changed

pkg/balance/metricsreader/backend_reader.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,8 @@ func (br *BackendReader) initElection(ctx context.Context, cfg *config.Config) e
119119
} else {
120120
key = fmt.Sprintf("%s/%s", readerOwnerKeyPrefix, readerOwnerKeySuffix)
121121
}
122-
election := elect.NewElection(br.lg, br.etcdCli, br.electionCfg, id, key, br)
123-
br.election = election
124-
election.Start(ctx)
122+
br.election = elect.NewElection(br.lg.Named("elect"), br.etcdCli, br.electionCfg, id, key, br)
123+
br.election.Start(ctx)
125124
return nil
126125
}
127126

pkg/manager/elect/election.go

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package elect
66
import (
77
"context"
88
"strings"
9-
"sync/atomic"
109
"time"
1110

1211
"github.com/pingcap/tiproxy/lib/util/errors"
@@ -39,8 +38,6 @@ type Election interface {
3938
Start(context.Context)
4039
// ID returns the member ID.
4140
ID() string
42-
// IsOwner returns whether the member is the owner.
43-
IsOwner() bool
4441
// GetOwnerID gets the owner ID.
4542
GetOwnerID(ctx context.Context) (string, error)
4643
// Close stops compaining the owner.
@@ -75,7 +72,6 @@ type election struct {
7572
trimedKey string
7673
lg *zap.Logger
7774
etcdCli *clientv3.Client
78-
elec atomic.Pointer[concurrency.Election]
7975
wg waitgroup.WaitGroup
8076
cancel context.CancelFunc
8177
member Member
@@ -133,19 +129,17 @@ func (m *election) initSession(ctx context.Context) (*concurrency.Session, error
133129
return session, err
134130
}
135131

136-
func (m *election) IsOwner() bool {
137-
ownerID, err := m.GetOwnerID(context.Background())
138-
if err != nil {
139-
return false
140-
}
141-
return ownerID == m.id
142-
}
143-
144132
func (m *election) campaignLoop(ctx context.Context) {
145133
session, err := m.initSession(ctx)
146134
if err != nil {
147135
return
148136
}
137+
isOwner := false
138+
defer func() {
139+
if isOwner {
140+
m.onRetired()
141+
}
142+
}()
149143
for {
150144
select {
151145
case <-session.Done():
@@ -172,34 +166,44 @@ func (m *election) campaignLoop(ctx context.Context) {
172166
continue
173167
}
174168

169+
// Retire after the etcd server can be connected so that there will always be an owner.
170+
// It's allowed if multiple members act as the owner but it's not allowed if no member acts as the owner.
171+
// E.g. at least one member needs to bind the VIP.
172+
if isOwner {
173+
m.onRetired()
174+
isOwner = false
175+
}
176+
175177
elec := concurrency.NewElection(session, m.key)
176178
if err = elec.Campaign(ctx, m.id); err != nil {
177179
m.lg.Info("failed to campaign", zap.Error(err))
178180
continue
179181
}
180182

181-
ownerID, err := m.GetOwnerID(ctx)
182-
if err != nil || ownerID != m.id {
183+
kv, err := m.getOwnerInfo(ctx)
184+
if err != nil {
185+
m.lg.Warn("failed to get owner info", zap.Error(err))
186+
continue
187+
}
188+
if hack.String(kv.Value) != m.id {
189+
m.lg.Warn("owner id mismatches", zap.String("owner", hack.String(kv.Value)))
183190
continue
184191
}
185192

186-
m.onElected(elec)
187-
// NOTICE: watchOwner won't revoke the lease.
188-
m.watchOwner(ctx, session, ownerID)
189-
m.onRetired()
193+
m.onElected()
194+
isOwner = true
195+
m.watchOwner(ctx, session, hack.String(kv.Key))
190196
}
191197
}
192198

193-
func (m *election) onElected(elec *concurrency.Election) {
199+
func (m *election) onElected() {
194200
m.member.OnElected()
195-
m.elec.Store(elec)
196201
metrics.OwnerGauge.WithLabelValues(m.trimedKey).Set(1)
197202
m.lg.Info("elected as the owner")
198203
}
199204

200205
func (m *election) onRetired() {
201206
m.member.OnRetired()
202-
m.elec.Store(nil)
203207
// Delete the metric so that it doesn't show on Grafana.
204208
metrics.OwnerGauge.MetricVec.DeletePartialMatch(map[string]string{metrics.LblType: m.trimedKey})
205209
m.lg.Info("the owner retires")
@@ -218,17 +222,25 @@ func (m *election) revokeLease(leaseID clientv3.LeaseID) {
218222

219223
// GetOwnerID is similar to concurrency.Election.Leader() but it doesn't need an concurrency.Election.
220224
func (m *election) GetOwnerID(ctx context.Context) (string, error) {
225+
kv, err := m.getOwnerInfo(ctx)
226+
if err != nil {
227+
return "", err
228+
}
229+
return hack.String(kv.Value), nil
230+
}
231+
232+
func (m *election) getOwnerInfo(ctx context.Context) (*mvccpb.KeyValue, error) {
221233
if m.etcdCli == nil {
222-
return "", concurrency.ErrElectionNoLeader
234+
return nil, concurrency.ErrElectionNoLeader
223235
}
224236
kvs, err := etcd.GetKVs(ctx, m.etcdCli, m.key, clientv3.WithFirstCreate(), m.cfg.Timeout, m.cfg.RetryIntvl, m.cfg.RetryCnt)
225237
if err != nil {
226-
return "", err
238+
return nil, err
227239
}
228240
if len(kvs) == 0 {
229-
return "", concurrency.ErrElectionNoLeader
241+
return nil, concurrency.ErrElectionNoLeader
230242
}
231-
return hack.String(kvs[0].Value), nil
243+
return kvs[0], nil
232244
}
233245

234246
func (m *election) watchOwner(ctx context.Context, session *concurrency.Session, key string) {

pkg/manager/elect/election_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ func TestEtcdServerDown(t *testing.T) {
7575
addr := ts.shutdownServer()
7676
_, err := elec1.GetOwnerID(context.Background())
7777
require.Error(t, err)
78+
// the owner should not retire before the server is up again
79+
ts.expectNoEvent("1")
7880
ts.startServer(addr)
7981
// the previous owner only retires when the new one is elected
8082
ts.expectEvent("1", eventTypeRetired, eventTypeElected)
@@ -133,15 +135,15 @@ func TestOwnerMetric(t *testing.T) {
133135
}
134136

135137
elec1 := NewElection(lg, nil, electionConfigForTest(1), "1", ownerKeyPrefix+"key"+ownerKeySuffix, newMockMember())
136-
elec1.onElected(nil)
138+
elec1.onElected()
137139
checkMetric("key", true)
138140

139141
elec2 := NewElection(lg, nil, electionConfigForTest(1), "1", "key2/1", newMockMember())
140-
elec2.onElected(nil)
142+
elec2.onElected()
141143
checkMetric("key2/1", true)
142144

143145
elec3 := NewElection(lg, nil, electionConfigForTest(1), "1", ownerKeyPrefix+"key3/1", newMockMember())
144-
elec3.onElected(nil)
146+
elec3.onElected()
145147
checkMetric("key3/1", true)
146148

147149
elec1.onRetired()

pkg/manager/elect/mock_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,15 @@ func (mo *mockMember) expectEvent(t *testing.T, expected ...int) {
5151
}
5252
}
5353

54+
func (mo *mockMember) expectNoEvent(t *testing.T) {
55+
select {
56+
case <-time.After(100 * time.Millisecond):
57+
return
58+
case event := <-mo.ch:
59+
require.Fail(t, "unexpected event", event)
60+
}
61+
}
62+
5463
func (mo *mockMember) hang(hang bool) {
5564
contn := true
5665
for contn {
@@ -144,7 +153,6 @@ func (ts *etcdTestSuite) getOwnerID() string {
144153
} else {
145154
require.Equal(ts.t, ownerID, id)
146155
}
147-
require.Equal(ts.t, elec.id == ownerID, elec.IsOwner())
148156
}
149157
return ownerID
150158
}
@@ -154,6 +162,11 @@ func (ts *etcdTestSuite) expectEvent(id string, event ...int) {
154162
elec.member.(*mockMember).expectEvent(ts.t, event...)
155163
}
156164

165+
func (ts *etcdTestSuite) expectNoEvent(id string) {
166+
elec := ts.getElection(id)
167+
elec.member.(*mockMember).expectNoEvent(ts.t)
168+
}
169+
157170
func (ts *etcdTestSuite) hang(id string, hang bool) {
158171
elec := ts.getElection(id)
159172
elec.member.(*mockMember).hang(hang)

pkg/manager/vip/manager.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,7 @@ func (vm *vipManager) Start(ctx context.Context, etcdCli *clientv3.Client) error
7171

7272
id := net.JoinHostPort(ip, port)
7373
electionCfg := elect.DefaultElectionConfig(sessionTTL)
74-
election := elect.NewElection(vm.lg, etcdCli, electionCfg, id, vipKey, vm)
75-
vm.election = election
76-
// Check the ownership at startup just in case the node is just down and restarted.
77-
// Before it was down, it may be either the owner or not.
78-
if election.IsOwner() {
79-
vm.OnElected()
80-
} else {
81-
vm.OnRetired()
82-
}
74+
vm.election = elect.NewElection(vm.lg.Named("elect"), etcdCli, electionCfg, id, vipKey, vm)
8375
vm.election.Start(ctx)
8476
return nil
8577
}

pkg/manager/vip/mock_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,6 @@ func (me *mockElection) GetOwnerID(ctx context.Context) (string, error) {
5757
return "", nil
5858
}
5959

60-
func (me *mockElection) IsOwner() bool {
61-
return true
62-
}
63-
6460
func (me *mockElection) Start(ctx context.Context) {
6561
me.wg.Run(func() {
6662
for {

0 commit comments

Comments
 (0)