Skip to content

Commit 3b64de9

Browse files
authored
elect, infosync: create Election to elect owner (#588)
1 parent e4b7832 commit 3b64de9

File tree

7 files changed

+589
-27
lines changed

7 files changed

+589
-27
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
github.com/spf13/cobra v1.6.1
2525
github.com/stretchr/testify v1.8.4
2626
github.com/tidwall/btree v1.5.2
27+
go.etcd.io/etcd/api/v3 v3.5.6
2728
go.etcd.io/etcd/client/pkg/v3 v3.5.6
2829
go.etcd.io/etcd/client/v3 v3.5.6
2930
go.etcd.io/etcd/server/v3 v3.5.6
@@ -92,7 +93,6 @@ require (
9293
github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 // indirect
9394
github.com/yusufpapurcu/wmi v1.2.3 // indirect
9495
go.etcd.io/bbolt v1.3.6 // indirect
95-
go.etcd.io/etcd/api/v3 v3.5.6 // indirect
9696
go.etcd.io/etcd/client/v2 v2.305.6 // indirect
9797
go.etcd.io/etcd/pkg/v3 v3.5.6 // indirect
9898
go.etcd.io/etcd/raft/v3 v3.5.6 // indirect

pkg/manager/elect/election.go

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package elect
5+
6+
import (
7+
"context"
8+
"sync/atomic"
9+
"time"
10+
11+
"github.com/pingcap/tiproxy/lib/util/errors"
12+
"github.com/pingcap/tiproxy/lib/util/retry"
13+
"github.com/pingcap/tiproxy/lib/util/waitgroup"
14+
"go.etcd.io/etcd/api/v3/mvccpb"
15+
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
16+
clientv3 "go.etcd.io/etcd/client/v3"
17+
"go.etcd.io/etcd/client/v3/concurrency"
18+
"go.uber.org/zap"
19+
)
20+
21+
// logInterval is handed to retry.RetryNotify as its last argument when the
// election session is created.
// NOTE(review): presumably the notify callback is invoked once every
// logInterval retries — confirm against the retry package.
const logInterval = 10
24+
25+
// Member receives notifications about ownership changes from an election.
type Member interface {
	// OnElected is invoked when this member has won the campaign.
	OnElected()
	// OnRetired is invoked when this member stops being the owner.
	OnRetired()
}
29+
30+
// Election campaigns for the owner role and exposes the owner information.
type Election interface {
	// Start starts campaigning for the owner.
	Start(context.Context)
	// IsOwner reports whether this member is currently the owner.
	IsOwner() bool
	// GetOwnerID returns the ID of the current owner.
	GetOwnerID(ctx context.Context) (string, error)
	// Close stops campaigning for the owner.
	Close()
}
41+
42+
// electionConfig holds the tunables of an election.
type electionConfig struct {
	// timeout bounds each single etcd Get issued by GetOwnerID.
	timeout time.Duration
	// retryIntvl is the interval between retries, both when creating the
	// session and when reading the owner.
	retryIntvl time.Duration
	// retryCnt is the retry count used when reading the owner.
	retryCnt uint64
	// sessionTTL is the TTL of the etcd session, in seconds.
	sessionTTL int
}
48+
49+
var _ Election = (*election)(nil)
50+
51+
// election is used for electing owner.
52+
type election struct {
53+
cfg electionConfig
54+
// id is typically the instance address
55+
id string
56+
key string
57+
lg *zap.Logger
58+
etcdCli *clientv3.Client
59+
elec atomic.Pointer[concurrency.Election]
60+
wg waitgroup.WaitGroup
61+
cancel context.CancelFunc
62+
member Member
63+
}
64+
65+
// NewElection creates an Election.
66+
func NewElection(lg *zap.Logger, etcdCli *clientv3.Client, cfg electionConfig, id, key string, member Member) *election {
67+
lg = lg.With(zap.String("key", key), zap.String("id", id))
68+
return &election{
69+
lg: lg,
70+
etcdCli: etcdCli,
71+
cfg: cfg,
72+
id: id,
73+
key: key,
74+
member: member,
75+
}
76+
}
77+
78+
func (m *election) Start(ctx context.Context) {
79+
clientCtx, cancelFunc := context.WithCancel(ctx)
80+
m.cancel = cancelFunc
81+
// Don't recover because we don't know what will happen after recovery.
82+
m.wg.Run(func() {
83+
m.campaignLoop(clientCtx)
84+
})
85+
}
86+
87+
func (m *election) initSession(ctx context.Context) (*concurrency.Session, error) {
88+
var session *concurrency.Session
89+
// If the network breaks for sometime, the session will fail but it still needs to compaign after recovery.
90+
// So retry it infinitely.
91+
err := retry.RetryNotify(func() error {
92+
var err error
93+
// Do not use context.WithTimeout, otherwise the session will be cancelled after timeout, even if it is created successfully.
94+
session, err = concurrency.NewSession(m.etcdCli, concurrency.WithTTL(m.cfg.sessionTTL), concurrency.WithContext(ctx))
95+
return err
96+
}, ctx, m.cfg.retryIntvl, retry.InfiniteCnt,
97+
func(err error, duration time.Duration) {
98+
m.lg.Warn("failed to init election session, retrying", zap.Error(err))
99+
}, logInterval)
100+
if err == nil {
101+
m.lg.Info("election session is initialized")
102+
} else {
103+
m.lg.Error("failed to init election session, quit", zap.Error(err))
104+
}
105+
return session, err
106+
}
107+
108+
func (m *election) IsOwner() bool {
109+
return m.elec.Load() != nil
110+
}
111+
112+
func (m *election) campaignLoop(ctx context.Context) {
113+
session, err := m.initSession(ctx)
114+
if err != nil {
115+
return
116+
}
117+
for {
118+
select {
119+
case <-session.Done():
120+
m.lg.Info("etcd session is done, creates a new one")
121+
leaseID := session.Lease()
122+
if session, err = m.initSession(ctx); err != nil {
123+
m.lg.Error("new session failed, break campaign loop", zap.Error(err))
124+
m.revokeLease(leaseID)
125+
return
126+
}
127+
case <-ctx.Done():
128+
m.revokeLease(session.Lease())
129+
return
130+
default:
131+
}
132+
// If the etcd server turns clocks forward, the following case may occur.
133+
// The etcd server deletes this session's lease ID, but etcd session doesn't find it.
134+
// In this case if we do the campaign operation, the etcd server will return ErrLeaseNotFound.
135+
if errors.Is(err, rpctypes.ErrLeaseNotFound) {
136+
if session != nil {
137+
err = session.Close()
138+
m.lg.Warn("etcd session encounters ErrLeaseNotFound, close it", zap.Error(err))
139+
}
140+
continue
141+
}
142+
143+
elec := concurrency.NewElection(session, m.key)
144+
if err = elec.Campaign(ctx, m.id); err != nil {
145+
m.lg.Info("failed to campaign", zap.Error(err))
146+
continue
147+
}
148+
149+
ownerID, err := m.GetOwnerID(ctx)
150+
if err != nil || ownerID != m.id {
151+
continue
152+
}
153+
154+
m.onElected(elec)
155+
// NOTICE: watchOwner won't revoke the lease.
156+
m.watchOwner(ctx, session, ownerID)
157+
m.onRetired()
158+
}
159+
}
160+
161+
func (m *election) onElected(elec *concurrency.Election) {
162+
m.member.OnElected()
163+
m.elec.Store(elec)
164+
m.lg.Info("elected as the owner")
165+
}
166+
167+
func (m *election) onRetired() {
168+
m.member.OnRetired()
169+
m.elec.Store(nil)
170+
m.lg.Info("the owner retires")
171+
}
172+
173+
// revokeLease revokes the session lease so that other members can compaign immediately.
174+
func (m *election) revokeLease(leaseID clientv3.LeaseID) {
175+
// If revoke takes longer than the ttl, lease is expired anyway.
176+
// Don't use the context of the caller because it may be already done.
177+
cancelCtx, cancel := context.WithTimeout(context.Background(), time.Duration(m.cfg.sessionTTL)*time.Second)
178+
if _, err := m.etcdCli.Revoke(cancelCtx, leaseID); err != nil {
179+
m.lg.Warn("revoke session failed", zap.Error(errors.WithStack(err)))
180+
}
181+
cancel()
182+
}
183+
184+
// GetOwnerID is similar to concurrency.Election.Leader() but it doesn't need an concurrency.Election.
185+
func (m *election) GetOwnerID(ctx context.Context) (string, error) {
186+
var resp *clientv3.GetResponse
187+
err := retry.Retry(func() error {
188+
childCtx, cancel := context.WithTimeout(ctx, m.cfg.timeout)
189+
var err error
190+
resp, err = m.etcdCli.Get(childCtx, m.key, clientv3.WithFirstCreate()...)
191+
cancel()
192+
return errors.WithStack(err)
193+
}, ctx, m.cfg.retryIntvl, m.cfg.retryCnt)
194+
195+
if err != nil {
196+
m.lg.Error("failed to get owner info, quit", zap.Error(err))
197+
return "", err
198+
}
199+
if len(resp.Kvs) == 0 {
200+
return "", concurrency.ErrElectionNoLeader
201+
}
202+
return string(resp.Kvs[0].Value), nil
203+
}
204+
205+
func (m *election) watchOwner(ctx context.Context, session *concurrency.Session, key string) {
206+
watchCh := m.etcdCli.Watch(ctx, key)
207+
for {
208+
select {
209+
case resp, ok := <-watchCh:
210+
if !ok {
211+
m.lg.Info("watcher is closed, no owner")
212+
return
213+
}
214+
if resp.Canceled {
215+
m.lg.Info("watch canceled, no owner")
216+
return
217+
}
218+
219+
for _, ev := range resp.Events {
220+
if ev.Type == mvccpb.DELETE {
221+
m.lg.Info("watch failed, owner is deleted")
222+
return
223+
}
224+
}
225+
case <-session.Done():
226+
return
227+
case <-ctx.Done():
228+
return
229+
}
230+
}
231+
}
232+
233+
// Close is called before the instance is going to shutdown.
234+
// It should hand over the owner to someone else.
235+
func (m *election) Close() {
236+
m.cancel()
237+
m.wg.Wait()
238+
}

pkg/manager/elect/election_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package elect
5+
6+
import (
7+
"context"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestElectOwner(t *testing.T) {
15+
ts := newEtcdTestSuite(t, electionConfigForTest(1), "key")
16+
t.Cleanup(ts.close)
17+
18+
// 2 nodes start and 1 node is the owner
19+
{
20+
elec1 := ts.newElection("1")
21+
elec2 := ts.newElection("2")
22+
elec1.Start(context.Background())
23+
elec2.Start(context.Background())
24+
ownerID := ts.getOwnerID()
25+
ts.expectEvent(ownerID, eventTypeElected)
26+
}
27+
// stop the owner and the other becomes the owner
28+
{
29+
ownerID := ts.getOwnerID()
30+
elec := ts.getElection(ownerID)
31+
elec.Close()
32+
ts.expectEvent(ownerID, eventTypeRetired)
33+
ownerID2 := ts.getOwnerID()
34+
require.NotEqual(t, ownerID, ownerID2)
35+
ts.expectEvent(ownerID2, eventTypeElected)
36+
}
37+
// start a new node and the owner doesn't change
38+
{
39+
ownerID := ts.getOwnerID()
40+
elec := ts.newElection("3")
41+
elec.Start(context.Background())
42+
time.Sleep(300 * time.Millisecond)
43+
ownerID2 := ts.getOwnerID()
44+
require.Equal(t, ownerID, ownerID2)
45+
}
46+
// stop all the nodes and there's no owner
47+
{
48+
elec := ts.getElection("3")
49+
elec.Close()
50+
ownerID := ts.getOwnerID()
51+
elec = ts.getElection(ownerID)
52+
elec.Close()
53+
ts.expectEvent(ownerID, eventTypeRetired)
54+
_, err := elec.GetOwnerID(context.Background())
55+
require.Error(t, err)
56+
}
57+
}
58+
59+
func TestEtcdServerDown(t *testing.T) {
60+
ts := newEtcdTestSuite(t, electionConfigForTest(1), "key")
61+
t.Cleanup(ts.close)
62+
63+
elec1 := ts.newElection("1")
64+
elec1.Start(context.Background())
65+
ts.expectEvent("1", eventTypeElected)
66+
67+
// server is down
68+
addr := ts.shutdownServer()
69+
_, err := elec1.GetOwnerID(context.Background())
70+
require.Error(t, err)
71+
ts.startServer(addr)
72+
// the previous owner only retires when the new one is elected
73+
ts.expectEvent("1", eventTypeRetired, eventTypeElected)
74+
ownerID := ts.getOwnerID()
75+
require.Equal(t, "1", ownerID)
76+
77+
// server is down and start another member
78+
addr = ts.shutdownServer()
79+
_, err = elec1.GetOwnerID(context.Background())
80+
require.Error(t, err)
81+
elec2 := ts.newElection("2")
82+
elec2.Start(context.Background())
83+
84+
// start the server again and the elections recover
85+
ts.startServer(addr)
86+
ownerID = ts.getOwnerID()
87+
ts.expectEvent("1", eventTypeRetired)
88+
ts.expectEvent(ownerID, eventTypeElected)
89+
}
90+
91+
func TestOwnerHang(t *testing.T) {
92+
ts := newEtcdTestSuite(t, electionConfigForTest(1), "key")
93+
t.Cleanup(ts.close)
94+
95+
// make the owner hang at loop
96+
elec1 := ts.newElection("1")
97+
ts.hang("1", true)
98+
defer ts.hang("1", false)
99+
elec1.Start(context.Background())
100+
ownerID := ts.getOwnerID()
101+
require.Equal(t, "1", ownerID)
102+
103+
// start another member
104+
elec2 := ts.newElection("2")
105+
elec2.Start(context.Background())
106+
// even if the owner hangs, it's keeping alive at background
107+
time.Sleep(time.Second)
108+
ownerID = ts.getOwnerID()
109+
require.Equal(t, "1", ownerID)
110+
}

0 commit comments

Comments
 (0)