Skip to content

Commit 9b2091f

Browse files
authored
*: retire after a new owner is elected (#632)
1 parent 21a813a commit 9b2091f

21 files changed

+242
-210
lines changed

pkg/balance/factor/mock_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ func (mmr *mockMetricsReader) GetBackendMetrics() []byte {
120120
return nil
121121
}
122122

123+
func (mmr *mockMetricsReader) PreClose() {
124+
}
125+
123126
func (mmr *mockMetricsReader) Close() {
124127
}
125128

pkg/balance/metricsreader/backend_reader.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,14 @@ func (br *BackendReader) getBackendAddrs(ctx context.Context, excludeZones []str
548548
return addrs, nil
549549
}
550550

551+
func (br *BackendReader) PreClose() {
552+
if br.election != nil {
553+
br.election.Close()
554+
}
555+
// pretend to be not owner
556+
br.isOwner.Store(false)
557+
}
558+
551559
func (br *BackendReader) Close() {
552560
if br.election != nil {
553561
br.election.Close()

pkg/balance/metricsreader/metrics_reader.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type MetricsReader interface {
3838
RemoveQueryExpr(key string)
3939
GetQueryResult(key string) QueryResult
4040
GetBackendMetrics() []byte
41+
PreClose()
4142
Close()
4243
}
4344

@@ -145,9 +146,21 @@ func (dmr *DefaultMetricsReader) GetBackendMetrics() []byte {
145146
return dmr.backendReader.GetBackendMetrics()
146147
}
147148

149+
func (dmr *DefaultMetricsReader) PreClose() {
150+
// No need to update results in the graceful shutdown.
151+
// Stop the loop before pre-closing the backend reader to avoid data race.
152+
if dmr.cancel != nil {
153+
dmr.cancel()
154+
dmr.cancel = nil
155+
}
156+
dmr.wg.Wait()
157+
dmr.backendReader.PreClose()
158+
}
159+
148160
func (dmr *DefaultMetricsReader) Close() {
149161
if dmr.cancel != nil {
150162
dmr.cancel()
163+
dmr.cancel = nil
151164
}
152165
dmr.wg.Wait()
153166
dmr.backendReader.Close()

pkg/balance/metricsreader/metrics_reader_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,6 @@ func TestFallback(t *testing.T) {
8282
require.False(t, qr.Empty())
8383
require.Equal(t, model.SampleValue(80.0), qr.Value.(model.Vector)[0].Value)
8484
require.GreaterOrEqual(t, qr.UpdateTime, ts)
85+
86+
mr.PreClose()
8587
}

pkg/manager/elect/election.go

Lines changed: 72 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"time"
1010

1111
"github.com/pingcap/tiproxy/lib/util/errors"
12-
"github.com/pingcap/tiproxy/lib/util/retry"
1312
"github.com/pingcap/tiproxy/lib/util/waitgroup"
1413
"github.com/pingcap/tiproxy/pkg/metrics"
1514
"github.com/pingcap/tiproxy/pkg/util/etcd"
@@ -22,7 +21,6 @@ import (
2221
)
2322

2423
const (
25-
logInterval = 10
2624
ownerKeyPrefix = "/tiproxy/"
2725
ownerKeySuffix = "/owner"
2826
)
@@ -40,23 +38,27 @@ type Election interface {
4038
ID() string
4139
// GetOwnerID gets the owner ID.
4240
GetOwnerID(ctx context.Context) (string, error)
43-
// Close stops compaining the owner.
41+
// Close resigns and but doesn't retire.
4442
Close()
4543
}
4644

4745
type ElectionConfig struct {
48-
Timeout time.Duration
49-
RetryIntvl time.Duration
50-
RetryCnt uint64
51-
SessionTTL int
46+
Timeout time.Duration
47+
RetryIntvl time.Duration
48+
QueryIntvl time.Duration
49+
WaitBeforeRetire time.Duration
50+
RetryCnt uint64
51+
SessionTTL int
5252
}
5353

5454
func DefaultElectionConfig(sessionTTL int) ElectionConfig {
5555
return ElectionConfig{
56-
Timeout: 2 * time.Second,
57-
RetryIntvl: 500 * time.Millisecond,
58-
RetryCnt: 3,
59-
SessionTTL: sessionTTL,
56+
Timeout: 2 * time.Second,
57+
RetryIntvl: 500 * time.Millisecond,
58+
QueryIntvl: 1 * time.Second,
59+
WaitBeforeRetire: 3 * time.Second,
60+
RetryCnt: 3,
61+
SessionTTL: sessionTTL,
6062
}
6163
}
6264

@@ -75,6 +77,7 @@ type election struct {
7577
wg waitgroup.WaitGroup
7678
cancel context.CancelFunc
7779
member Member
80+
isOwner bool
7881
}
7982

8083
// NewElection creates an Election.
@@ -108,45 +111,20 @@ func (m *election) ID() string {
108111
return m.id
109112
}
110113

111-
func (m *election) initSession(ctx context.Context) (*concurrency.Session, error) {
112-
var session *concurrency.Session
113-
// If the network breaks for sometime, the session will fail but it still needs to compaign after recovery.
114-
// So retry it infinitely.
115-
err := retry.RetryNotify(func() error {
116-
var err error
117-
// Do not use context.WithTimeout, otherwise the session will be cancelled after timeout, even if it is created successfully.
118-
session, err = concurrency.NewSession(m.etcdCli, concurrency.WithTTL(m.cfg.SessionTTL), concurrency.WithContext(ctx))
119-
return err
120-
}, ctx, m.cfg.RetryIntvl, retry.InfiniteCnt,
121-
func(err error, duration time.Duration) {
122-
m.lg.Warn("failed to init election session, retrying", zap.Error(err))
123-
}, logInterval)
124-
if err == nil {
125-
m.lg.Info("election session is initialized")
126-
} else {
127-
m.lg.Error("failed to init election session, quit", zap.Error(err))
128-
}
129-
return session, err
130-
}
131-
132114
func (m *election) campaignLoop(ctx context.Context) {
133-
session, err := m.initSession(ctx)
115+
session, err := concurrency.NewSession(m.etcdCli, concurrency.WithTTL(m.cfg.SessionTTL), concurrency.WithContext(ctx))
134116
if err != nil {
117+
m.lg.Error("new session failed, break campaign loop", zap.Error(errors.WithStack(err)))
135118
return
136119
}
137-
isOwner := false
138-
defer func() {
139-
if isOwner {
140-
m.onRetired()
141-
}
142-
}()
143120
for {
144121
select {
145122
case <-session.Done():
146123
m.lg.Info("etcd session is done, creates a new one")
147124
leaseID := session.Lease()
148-
if session, err = m.initSession(ctx); err != nil {
149-
m.lg.Error("new session failed, break campaign loop", zap.Error(err))
125+
session, err = concurrency.NewSession(m.etcdCli, concurrency.WithTTL(m.cfg.SessionTTL), concurrency.WithContext(ctx))
126+
if err != nil {
127+
m.lg.Error("new session failed, break campaign loop", zap.Error(errors.WithStack(err)))
150128
m.revokeLease(leaseID)
151129
return
152130
}
@@ -166,17 +144,21 @@ func (m *election) campaignLoop(ctx context.Context) {
166144
continue
167145
}
168146

169-
// Retire after the etcd server can be connected so that there will always be an owner.
170-
// It's allowed if multiple members act as the owner but it's not allowed if no member acts as the owner.
171-
// E.g. at least one member needs to bind the VIP.
172-
if isOwner {
173-
m.onRetired()
174-
isOwner = false
147+
var wg waitgroup.WaitGroup
148+
childCtx, cancel := context.WithCancel(ctx)
149+
if m.isOwner {
150+
// Check if another member becomes the new owner during campaign.
151+
wg.RunWithRecover(func() {
152+
m.waitRetire(childCtx)
153+
}, nil, m.lg)
175154
}
176155

177156
elec := concurrency.NewElection(session, m.key)
178-
if err = elec.Campaign(ctx, m.id); err != nil {
179-
m.lg.Info("failed to campaign", zap.Error(err))
157+
err = elec.Campaign(ctx, m.id)
158+
cancel()
159+
wg.Wait()
160+
if err != nil {
161+
m.lg.Info("failed to campaign", zap.Error(errors.WithStack(err)))
180162
continue
181163
}
182164

@@ -186,27 +168,61 @@ func (m *election) campaignLoop(ctx context.Context) {
186168
continue
187169
}
188170
if hack.String(kv.Value) != m.id {
189-
m.lg.Warn("owner id mismatches", zap.String("owner", hack.String(kv.Value)))
171+
// Campaign may finish without errors when the session is done.
172+
m.lg.Info("owner id mismatches", zap.String("owner", hack.String(kv.Value)))
173+
if m.isOwner {
174+
m.onRetired()
175+
}
190176
continue
191177
}
192178

193-
m.onElected()
194-
isOwner = true
179+
if !m.isOwner {
180+
m.onElected()
181+
} else {
182+
// It was the owner before the etcd failure and now is still the owner.
183+
m.lg.Info("still the owner")
184+
}
195185
m.watchOwner(ctx, session, hack.String(kv.Key))
196186
}
197187
}
198188

199189
func (m *election) onElected() {
190+
m.lg.Info("elected as the owner")
200191
m.member.OnElected()
192+
m.isOwner = true
201193
metrics.OwnerGauge.WithLabelValues(m.trimedKey).Set(1)
202-
m.lg.Info("elected as the owner")
203194
}
204195

205196
func (m *election) onRetired() {
197+
m.lg.Info("the owner retires")
206198
m.member.OnRetired()
199+
m.isOwner = false
207200
// Delete the metric so that it doesn't show on Grafana.
208201
metrics.OwnerGauge.MetricVec.DeletePartialMatch(map[string]string{metrics.LblType: m.trimedKey})
209-
m.lg.Info("the owner retires")
202+
}
203+
204+
// waitRetire retires after another member becomes the owner so that there will always be an owner.
205+
// It's allowed if multiple members act as the owner for some time but it's not allowed if no member acts as the owner.
206+
// E.g. at least one member needs to bind the VIP even if the etcd server leader is down.
207+
func (m *election) waitRetire(ctx context.Context) {
208+
ticker := time.NewTicker(m.cfg.QueryIntvl)
209+
defer ticker.Stop()
210+
for ctx.Err() == nil {
211+
select {
212+
case <-ticker.C:
213+
id, err := m.GetOwnerID(ctx)
214+
if err != nil {
215+
continue
216+
}
217+
// Another member becomes the owner, retire.
218+
if id != m.id {
219+
m.onRetired()
220+
return
221+
}
222+
case <-ctx.Done():
223+
return
224+
}
225+
}
210226
}
211227

212228
// revokeLease revokes the session lease so that other members can compaign immediately.
@@ -271,8 +287,8 @@ func (m *election) watchOwner(ctx context.Context, session *concurrency.Session,
271287
}
272288
}
273289

274-
// Close is called before the instance is going to shutdown.
275-
// It should hand over the owner to someone else.
290+
// Close is typically called before graceful shutdown. It resigns but doesn't retire or wait for the new owner.
291+
// The caller has to decide if it should retire after graceful wait.
276292
func (m *election) Close() {
277293
if m.cancel != nil {
278294
m.cancel()

pkg/manager/elect/election_test.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
)
1515

1616
func TestElectOwner(t *testing.T) {
17-
ts := newEtcdTestSuite(t, electionConfigForTest(1), "key")
17+
ts := newEtcdTestSuite(t, "key")
1818
t.Cleanup(ts.close)
1919

2020
// 2 nodes start and 1 node is the owner
@@ -23,6 +23,8 @@ func TestElectOwner(t *testing.T) {
2323
elec2 := ts.newElection("2")
2424
elec1.Start(context.Background())
2525
elec2.Start(context.Background())
26+
require.Equal(t, "1", elec1.ID())
27+
require.Equal(t, "2", elec2.ID())
2628
ownerID := ts.getOwnerID()
2729
ts.expectEvent(ownerID, eventTypeElected)
2830
}
@@ -31,7 +33,7 @@ func TestElectOwner(t *testing.T) {
3133
ownerID := ts.getOwnerID()
3234
elec := ts.getElection(ownerID)
3335
elec.Close()
34-
ts.expectEvent(ownerID, eventTypeRetired)
36+
ts.expectNoEvent(ownerID)
3537
ownerID2 := ts.getOwnerID()
3638
require.NotEqual(t, ownerID, ownerID2)
3739
ts.expectEvent(ownerID2, eventTypeElected)
@@ -49,10 +51,11 @@ func TestElectOwner(t *testing.T) {
4951
{
5052
elec := ts.getElection("3")
5153
elec.Close()
54+
ts.expectNoEvent("3")
5255
ownerID := ts.getOwnerID()
5356
elec = ts.getElection(ownerID)
5457
elec.Close()
55-
ts.expectEvent(ownerID, eventTypeRetired)
58+
ts.expectNoEvent(ownerID)
5659
_, err := elec.GetOwnerID(context.Background())
5760
require.Error(t, err)
5861
}
@@ -64,7 +67,7 @@ func TestElectOwner(t *testing.T) {
6467
}
6568

6669
func TestEtcdServerDown(t *testing.T) {
67-
ts := newEtcdTestSuite(t, electionConfigForTest(1), "key")
70+
ts := newEtcdTestSuite(t, "key")
6871
t.Cleanup(ts.close)
6972

7073
elec1 := ts.newElection("1")
@@ -78,8 +81,8 @@ func TestEtcdServerDown(t *testing.T) {
7881
// the owner should not retire before the server is up again
7982
ts.expectNoEvent("1")
8083
ts.startServer(addr)
81-
// the previous owner only retires when the new one is elected
82-
ts.expectEvent("1", eventTypeRetired, eventTypeElected)
84+
// the owner should not retire because there's no other member
85+
ts.expectNoEvent("1")
8386
ownerID := ts.getOwnerID()
8487
require.Equal(t, "1", ownerID)
8588

@@ -89,16 +92,22 @@ func TestEtcdServerDown(t *testing.T) {
8992
require.Error(t, err)
9093
elec2 := ts.newElection("2")
9194
elec2.Start(context.Background())
95+
// the owner should not retire before the server is up again
96+
ts.expectNoEvent("1")
9297

9398
// start the server again and the elections recover
9499
ts.startServer(addr)
95100
ownerID = ts.getOwnerID()
96-
ts.expectEvent("1", eventTypeRetired)
97-
ts.expectEvent(ownerID, eventTypeElected)
101+
if ownerID == "1" {
102+
ts.expectNoEvent("1")
103+
} else {
104+
ts.expectEvent("1", eventTypeRetired)
105+
ts.expectEvent(ownerID, eventTypeElected)
106+
}
98107
}
99108

100109
func TestOwnerHang(t *testing.T) {
101-
ts := newEtcdTestSuite(t, electionConfigForTest(1), "key")
110+
ts := newEtcdTestSuite(t, "key")
102111
t.Cleanup(ts.close)
103112

104113
// make the owner hang at loop

0 commit comments

Comments
 (0)