Skip to content

Commit 4488532

Browse files
authored
dm: fix leader did not retire after delete the key (#11604) (#11626)
close #11602
1 parent e34b472 commit 4488532

File tree

2 files changed

+91
-4
lines changed

2 files changed

+91
-4
lines changed

dm/pkg/election/election.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,12 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
264264
e.l.Debug("begin to campaign", zap.Stringer("current member", e.info))
265265

266266
err2 := elec.Campaign(ctx2, e.infoStr)
267+
failpoint.Inject("mockCapaignSucceedButReturnErr", func() {
268+
if err2 == nil {
269+
err2 = errors.New("mock campaign succeed but return error")
270+
time.Sleep(time.Second)
271+
}
272+
})
267273
if err2 != nil {
268274
// because inner commit may return undetermined error, we try to delete the election key manually
269275
deleted, err3 := e.ClearSessionIfNeeded(ctx, e.ID())
@@ -282,6 +288,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
282288
var (
283289
leaderKey string
284290
leaderInfo *CampaignerInfo
291+
revision int64
285292
)
286293
eleObserveCh := elec.Observe(ctx2)
287294

@@ -300,6 +307,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
300307
e.l.Info("get response from election observe", zap.String("key", string(resp.Kvs[0].Key)), zap.String("value", string(resp.Kvs[0].Value)))
301308
leaderKey = string(resp.Kvs[0].Key)
302309
leaderInfo, err = getCampaignerInfo(resp.Kvs[0].Value)
310+
revision = resp.Header.Revision
303311
if err != nil {
304312
// this should never happen
305313
e.l.Error("fail to get leader information", zap.String("value", string(resp.Kvs[0].Value)), zap.Error(err))
@@ -330,7 +338,7 @@ func (e *Election) campaignLoop(ctx context.Context, session *concurrency.Sessio
330338

331339
e.l.Info("become leader", zap.Stringer("current member", e.info))
332340
e.notifyLeader(ctx, leaderInfo) // become the leader now
333-
e.watchLeader(ctx, session, leaderKey, elec)
341+
e.watchLeader(ctx, session, leaderKey, elec, revision)
334342
e.l.Info("retire from leader", zap.Stringer("current member", e.info))
335343
e.notifyLeader(ctx, nil) // need to re-campaign
336344
oldLeaderID = ""
@@ -359,8 +367,8 @@ func (e *Election) notifyLeader(ctx context.Context, leaderInfo *CampaignerInfo)
359367
}
360368
}
361369

362-
func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election) {
363-
e.l.Debug("watch leader key", zap.String("key", key))
370+
func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session, key string, elec *concurrency.Election, revision int64) {
371+
e.l.Debug("watch leader key", zap.String("key", key), zap.Int64("revision", revision), zap.Stringer("current member", e.info))
364372

365373
e.campaignMu.Lock()
366374
e.resignCh = make(chan struct{})
@@ -374,7 +382,7 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session
374382

375383
wCtx, cancel := context.WithCancel(ctx)
376384
defer cancel()
377-
wch := e.cli.Watch(wCtx, key)
385+
wch := e.cli.Watch(wCtx, key, clientv3.WithRev(revision+1))
378386

379387
for {
380388
if e.evictLeader.Load() {

dm/pkg/election/election_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,3 +412,82 @@ func (t *testElectionSuite) TestElectionDeleteKey(c *C) {
412412
c.Assert(err, IsNil)
413413
wg.Wait()
414414
}
415+
416+
// TestElectionSucceedButReturnError covers the case where Campaign succeeds
// but an error is still reported to the caller. That situation is simulated
// with the mockCapaignSucceedButReturnErr failpoint, which replaces a nil
// Campaign error with a mock error.
//
// Flow: e1 becomes leader, e2 joins and observes e1 as leader, the failpoint
// is enabled, and e1 is closed. e2 is then expected to announce itself as
// leader first and afterwards announce a nil leader (retire) without sending
// anything on ErrorNotify.
func (t *testElectionSuite) TestElectionSucceedButReturnError(c *C) {
417+
var (
418+
// timeout bounds every wait on a notification channel below.
timeout = 5 * time.Second
419+
sessionTTL = 60
420+
key = "unit-test/election-succeed-but-return-error"
421+
ID1 = "member1"
422+
ID2 = "member2"
423+
addr1 = "127.0.0.1:1"
424+
addr2 = "127.0.0.1:2"
425+
)
426+
cli, err := etcdutil.CreateClient([]string{t.endPoint}, nil)
427+
c.Assert(err, IsNil)
428+
defer cli.Close()
429+
430+
ctx1, cancel1 := context.WithCancel(context.Background())
431+
defer cancel1()
432+
433+
e1, err := NewElection(ctx1, cli, sessionTTL, key, ID1, addr1, t.notifyBlockTime)
434+
c.Assert(err, IsNil)
435+
defer e1.Close()
436+
437+
// e1 should become the leader
438+
select {
439+
case leader := <-e1.LeaderNotify():
440+
c.Assert(leader.ID, Equals, ID1)
441+
case <-time.After(timeout):
442+
c.Fatal("leader campaign timeout")
443+
}
444+
c.Assert(e1.IsLeader(), IsTrue)
445+
_, leaderID, leaderAddr, err := e1.LeaderInfo(ctx1)
446+
c.Assert(err, IsNil)
447+
c.Assert(leaderID, Equals, e1.ID())
448+
c.Assert(leaderAddr, Equals, addr1)
449+
450+
// start e2
451+
ctx2, cancel2 := context.WithCancel(context.Background())
452+
defer cancel2()
453+
e2, err := NewElection(ctx2, cli, sessionTTL, key, ID2, addr2, t.notifyBlockTime)
454+
c.Assert(err, IsNil)
455+
defer e2.Close()
456+
select {
457+
// Reads the unexported channel directly instead of going through LeaderNotify().
// NOTE(review): presumably this is the same channel LeaderNotify() returns — confirm.
case leader := <-e2.leaderCh:
458+
c.Assert(leader.ID, Equals, ID1)
459+
case <-time.After(timeout):
460+
c.Fatal("leader campaign timeout")
461+
}
462+
// but the leader should still be e1
463+
_, leaderID, leaderAddr, err = e2.LeaderInfo(ctx2)
464+
c.Assert(err, IsNil)
465+
c.Assert(leaderID, Equals, e1.ID())
466+
c.Assert(leaderAddr, Equals, addr1)
467+
c.Assert(e2.IsLeader(), IsFalse)
468+
469+
// Enable the failpoint only now, after e1 has already won, and before e1 is
// closed below, so it is active for the rest of the test.
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr", `return()`), IsNil)
470+
//nolint:errcheck
471+
defer failpoint.Disable("github.com/pingcap/tiflow/dm/pkg/election/mockCapaignSucceedButReturnErr")
472+
473+
// Close is also deferred for e1 above, so it runs a second time when the test returns.
// NOTE(review): assumes Close is safe to call more than once — confirm.
e1.Close() // stop the campaign for e1
474+
c.Assert(e1.IsLeader(), IsFalse)
475+
476+
// e2 should become the leader
477+
select {
478+
case leader := <-e2.LeaderNotify():
479+
c.Assert(leader.ID, Equals, ID2)
480+
case <-time.After(timeout):
481+
c.Fatal("leader campaign timeout")
482+
}
483+
484+
// the leader should retire (notify a nil leader) after its key is deleted,
// and must not report an error while doing so
485+
select {
486+
case err2 := <-e2.ErrorNotify():
487+
c.Fatalf("delete the leader key should not get an error, %v", err2)
488+
case leader := <-e2.LeaderNotify():
489+
c.Assert(leader, IsNil)
490+
case <-time.After(timeout):
491+
c.Fatal("leader retire timeout")
492+
}
493+
}

0 commit comments

Comments
 (0)