Skip to content

Commit a14601d

Browse files
Add tikv raft feature
1 parent aa71268 commit a14601d

File tree

10 files changed

+322
-52
lines changed

10 files changed

+322
-52
lines changed

functional/tester/cluster_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ func Test_read(t *testing.T) {
6464
InitialCorruptCheck: true,
6565
Logger: "zap",
6666
LogOutputs: []string{"/tmp/etcd-functional-1/etcd.log"},
67-
Debug: true,
6867
},
6968
ClientCertData: "",
7069
ClientCertPath: "",
@@ -117,7 +116,6 @@ func Test_read(t *testing.T) {
117116
InitialCorruptCheck: true,
118117
Logger: "zap",
119118
LogOutputs: []string{"/tmp/etcd-functional-2/etcd.log"},
120-
Debug: true,
121119
},
122120
ClientCertData: "",
123121
ClientCertPath: "",
@@ -170,7 +168,6 @@ func Test_read(t *testing.T) {
170168
InitialCorruptCheck: true,
171169
Logger: "zap",
172170
LogOutputs: []string{"/tmp/etcd-functional-3/etcd.log"},
173-
Debug: true,
174171
},
175172
ClientCertData: "",
176173
ClientCertPath: "",

raft/confchange/confchange.go

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ type Changer struct {
4646
// (Section 4.3) corresponds to `C_{new,old}`.
4747
//
4848
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
49-
func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
50-
cfg, prs, err := c.checkAndCopy()
49+
func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, *tracker.Changes, error) {
50+
cfg, prs, changes, err := c.checkAndCopy()
5151
if err != nil {
5252
return c.err(err)
5353
}
@@ -68,11 +68,11 @@ func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker
6868
outgoing(cfg.Voters)[id] = struct{}{}
6969
}
7070

71-
if err := c.apply(&cfg, prs, ccs...); err != nil {
71+
if err := c.apply(&cfg, prs, changes, ccs...); err != nil {
7272
return c.err(err)
7373
}
7474
cfg.AutoLeave = autoLeave
75-
return checkAndReturn(cfg, prs)
75+
return checkAndReturn(cfg, prs, changes)
7676
}
7777

7878
// LeaveJoint transitions out of a joint configuration. It is an error to call
@@ -89,8 +89,8 @@ func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker
8989
// inserted into Learners.
9090
//
9191
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
92-
func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
93-
cfg, prs, err := c.checkAndCopy()
92+
func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, *tracker.Changes, error) {
93+
cfg, prs, changes, err := c.checkAndCopy()
9494
if err != nil {
9595
return c.err(err)
9696
}
@@ -114,45 +114,46 @@ func (c Changer) LeaveJoint() (tracker.Config, tracker.ProgressMap, error) {
114114

115115
if !isVoter && !isLearner {
116116
delete(prs, id)
117+
changes.Removed = append(changes.Removed, id)
117118
}
118119
}
119120
*outgoingPtr(&cfg.Voters) = nil
120121
cfg.AutoLeave = false
121122

122-
return checkAndReturn(cfg, prs)
123+
return checkAndReturn(cfg, prs, changes)
123124
}
124125

125126
// Simple carries out a series of configuration changes that (in aggregate)
126127
// mutates the incoming majority config Voters[0] by at most one. This method
127128
// will return an error if that is not the case, if the resulting quorum is
128129
// zero, or if the configuration is in a joint state (i.e. if there is an
129130
// outgoing configuration).
130-
func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {
131-
cfg, prs, err := c.checkAndCopy()
131+
func (c Changer) Simple(ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, *tracker.Changes, error) {
132+
cfg, prs, changes, err := c.checkAndCopy()
132133
if err != nil {
133134
return c.err(err)
134135
}
135136
if joint(cfg) {
136137
err := errors.New("can't apply simple config change in joint config")
137138
return c.err(err)
138139
}
139-
if err := c.apply(&cfg, prs, ccs...); err != nil {
140+
if err := c.apply(&cfg, prs, changes, ccs...); err != nil {
140141
return c.err(err)
141142
}
142143
if n := symdiff(incoming(c.Tracker.Voters), incoming(cfg.Voters)); n > 1 {
143-
return tracker.Config{}, nil, errors.New("more than one voter changed without entering joint config")
144+
return tracker.Config{}, nil, nil, errors.New("more than one voter changed without entering joint config")
144145
}
145146
if err := checkInvariants(cfg, prs); err != nil {
146-
return tracker.Config{}, tracker.ProgressMap{}, nil
147+
return tracker.Config{}, tracker.ProgressMap{}, nil, nil
147148
}
148149

149-
return checkAndReturn(cfg, prs)
150+
return checkAndReturn(cfg, prs, changes)
150151
}
151152

152153
// apply a change to the configuration. By convention, changes to voters are
153154
// always made to the incoming majority config Voters[0]. Voters[1] is either
154155
// empty or preserves the outgoing majority configuration while in a joint state.
155-
func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChangeSingle) error {
156+
func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, changes *tracker.Changes, ccs ...pb.ConfChangeSingle) error {
156157
for _, cc := range ccs {
157158
if cc.NodeID == 0 {
158159
// etcd replaces the NodeID with zero if it decides (downstream of
@@ -162,11 +163,11 @@ func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.C
162163
}
163164
switch cc.Type {
164165
case pb.ConfChangeAddNode:
165-
c.makeVoter(cfg, prs, cc.NodeID)
166+
c.makeVoter(cfg, prs, changes, cc.NodeID)
166167
case pb.ConfChangeAddLearnerNode:
167-
c.makeLearner(cfg, prs, cc.NodeID)
168+
c.makeLearner(cfg, prs, changes, cc.NodeID)
168169
case pb.ConfChangeRemoveNode:
169-
c.remove(cfg, prs, cc.NodeID)
170+
c.remove(cfg, prs, changes, cc.NodeID)
170171
case pb.ConfChangeUpdateNode:
171172
default:
172173
return fmt.Errorf("unexpected conf type %d", cc.Type)
@@ -180,18 +181,17 @@ func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.C
180181

181182
// makeVoter adds or promotes the given ID to be a voter in the incoming
182183
// majority config.
183-
func (c Changer) makeVoter(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
184+
func (c Changer) makeVoter(cfg *tracker.Config, prs tracker.ProgressMap, changes *tracker.Changes, id uint64) {
184185
pr := prs[id]
185186
if pr == nil {
186-
c.initProgress(cfg, prs, id, false /* isLearner */)
187+
c.initProgress(cfg, prs, changes, id, false /* isLearner */)
187188
return
188189
}
189190

190191
pr.IsLearner = false
191192
nilAwareDelete(&cfg.Learners, id)
192193
nilAwareDelete(&cfg.LearnersNext, id)
193194
incoming(cfg.Voters)[id] = struct{}{}
194-
return
195195
}
196196

197197
// makeLearner makes the given ID a learner or stages it to be a learner once
@@ -207,17 +207,17 @@ func (c Changer) makeVoter(cfg *tracker.Config, prs tracker.ProgressMap, id uint
207207
// simultaneously. Instead, we add the learner to LearnersNext, so that it will
208208
// be added to Learners the moment the outgoing config is removed by
209209
// LeaveJoint().
210-
func (c Changer) makeLearner(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
210+
func (c Changer) makeLearner(cfg *tracker.Config, prs tracker.ProgressMap, changes *tracker.Changes, id uint64) {
211211
pr := prs[id]
212212
if pr == nil {
213-
c.initProgress(cfg, prs, id, true /* isLearner */)
213+
c.initProgress(cfg, prs, changes, id, true /* isLearner */)
214214
return
215215
}
216216
if pr.IsLearner {
217217
return
218218
}
219219
// Remove any existing voter in the incoming config...
220-
c.remove(cfg, prs, id)
220+
c.remove(cfg, prs, changes, id)
221221
// ... but save the Progress.
222222
prs[id] = pr
223223
// Use LearnersNext if we can't add the learner to Learners directly, i.e.
@@ -234,7 +234,7 @@ func (c Changer) makeLearner(cfg *tracker.Config, prs tracker.ProgressMap, id ui
234234
}
235235

236236
// remove this peer as a voter or learner from the incoming config.
237-
func (c Changer) remove(cfg *tracker.Config, prs tracker.ProgressMap, id uint64) {
237+
func (c Changer) remove(cfg *tracker.Config, prs tracker.ProgressMap, changes *tracker.Changes, id uint64) {
238238
if _, ok := prs[id]; !ok {
239239
return
240240
}
@@ -246,11 +246,12 @@ func (c Changer) remove(cfg *tracker.Config, prs tracker.ProgressMap, id uint64)
246246
// If the peer is still a voter in the outgoing config, keep the Progress.
247247
if _, onRight := outgoing(cfg.Voters)[id]; !onRight {
248248
delete(prs, id)
249+
changes.Removed = append(changes.Removed, id)
249250
}
250251
}
251252

252253
// initProgress initializes a new progress for the given node or learner.
253-
func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id uint64, isLearner bool) {
254+
func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, changes *tracker.Changes, id uint64, isLearner bool) {
254255
if !isLearner {
255256
incoming(cfg.Voters)[id] = struct{}{}
256257
} else {
@@ -274,6 +275,7 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u
274275
// before the added node has had a chance to communicate with us.
275276
RecentActive: true,
276277
}
278+
changes.Added = append(changes.Added, id)
277279
}
278280

279281
// checkInvariants makes sure that the config and progress are compatible with
@@ -340,30 +342,31 @@ func checkInvariants(cfg tracker.Config, prs tracker.ProgressMap) error {
340342
// checkAndCopy copies the tracker's config and progress map (deeply enough for
341343
// the purposes of the Changer) and returns those copies. It returns an error
342344
// if checkInvariants does.
343-
func (c Changer) checkAndCopy() (tracker.Config, tracker.ProgressMap, error) {
345+
func (c Changer) checkAndCopy() (tracker.Config, tracker.ProgressMap, *tracker.Changes, error) {
344346
cfg := c.Tracker.Config.Clone()
345347
prs := tracker.ProgressMap{}
348+
changes := &tracker.Changes{}
346349

347350
for id, pr := range c.Tracker.Progress {
348351
// A shallow copy is enough because we only mutate the Learner field.
349352
ppr := *pr
350353
prs[id] = &ppr
351354
}
352-
return checkAndReturn(cfg, prs)
355+
return checkAndReturn(cfg, prs, changes)
353356
}
354357

355358
// checkAndReturn calls checkInvariants on the input and returns either the
356359
// resulting error or the input.
357-
func checkAndReturn(cfg tracker.Config, prs tracker.ProgressMap) (tracker.Config, tracker.ProgressMap, error) {
360+
func checkAndReturn(cfg tracker.Config, prs tracker.ProgressMap, changes *tracker.Changes) (tracker.Config, tracker.ProgressMap, *tracker.Changes, error) {
358361
if err := checkInvariants(cfg, prs); err != nil {
359-
return tracker.Config{}, tracker.ProgressMap{}, err
362+
return tracker.Config{}, tracker.ProgressMap{}, nil, err
360363
}
361-
return cfg, prs, nil
364+
return cfg, prs, changes, nil
362365
}
363366

364367
// err returns zero values and an error.
365-
func (c Changer) err(err error) (tracker.Config, tracker.ProgressMap, error) {
366-
return tracker.Config{}, nil, err
368+
func (c Changer) err(err error) (tracker.Config, tracker.ProgressMap, *tracker.Changes, error) {
369+
return tracker.Config{}, nil, nil, err
367370
}
368371

369372
// nilAwareAdd populates a map entry, creating the map if necessary.

raft/confchange/datadriven_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,18 +83,18 @@ func TestConfChangeDataDriven(t *testing.T) {
8383
var err error
8484
switch d.Cmd {
8585
case "simple":
86-
cfg, prs, err = c.Simple(ccs...)
86+
cfg, prs, _, err = c.Simple(ccs...)
8787
case "enter-joint":
8888
var autoLeave bool
8989
if len(d.CmdArgs) > 0 {
9090
d.ScanArgs(t, "autoleave", &autoLeave)
9191
}
92-
cfg, prs, err = c.EnterJoint(autoLeave, ccs...)
92+
cfg, prs, _, err = c.EnterJoint(autoLeave, ccs...)
9393
case "leave-joint":
9494
if len(ccs) > 0 {
9595
err = errors.New("this command takes no input")
9696
} else {
97-
cfg, prs, err = c.LeaveJoint()
97+
cfg, prs, _, err = c.LeaveJoint()
9898
}
9999
default:
100100
return "unknown command"

raft/confchange/quick_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ func TestConfChangeQuick(t *testing.T) {
3737
const infoCount = 5
3838

3939
runWithJoint := func(c *Changer, ccs []pb.ConfChangeSingle) error {
40-
cfg, prs, err := c.EnterJoint(false /* autoLeave */, ccs...)
40+
cfg, prs, _, err := c.EnterJoint(false /* autoLeave */, ccs...)
4141
if err != nil {
4242
return err
4343
}
4444
// Also do this with autoLeave on, just to check that we'd get the same
4545
// result.
46-
cfg2a, prs2a, err := c.EnterJoint(true /* autoLeave */, ccs...)
46+
cfg2a, prs2a, _, err := c.EnterJoint(true /* autoLeave */, ccs...)
4747
if err != nil {
4848
return err
4949
}
@@ -54,14 +54,14 @@ func TestConfChangeQuick(t *testing.T) {
5454
}
5555
c.Tracker.Config = cfg
5656
c.Tracker.Progress = prs
57-
cfg2b, prs2b, err := c.LeaveJoint()
57+
cfg2b, prs2b, _, err := c.LeaveJoint()
5858
if err != nil {
5959
return err
6060
}
6161
// Reset back to the main branch with autoLeave=false.
6262
c.Tracker.Config = cfg
6363
c.Tracker.Progress = prs
64-
cfg, prs, err = c.LeaveJoint()
64+
cfg, prs, _, err = c.LeaveJoint()
6565
if err != nil {
6666
return err
6767
}
@@ -76,7 +76,7 @@ func TestConfChangeQuick(t *testing.T) {
7676

7777
runWithSimple := func(c *Changer, ccs []pb.ConfChangeSingle) error {
7878
for _, cc := range ccs {
79-
cfg, prs, err := c.Simple(cc)
79+
cfg, prs, _, err := c.Simple(cc)
8080
if err != nil {
8181
return err
8282
}

raft/confchange/restore.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ func Restore(chg Changer, cs pb.ConfState) (tracker.Config, tracker.ProgressMap,
126126
for _, cc := range incoming {
127127
cc := cc // loop-local copy
128128
ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) {
129-
return chg.Simple(cc)
129+
c, t, _, err := chg.Simple(cc)
130+
return c, t, err
130131
})
131132
}
132133
} else {
@@ -138,7 +139,8 @@ func Restore(chg Changer, cs pb.ConfState) (tracker.Config, tracker.ProgressMap,
138139
for _, cc := range outgoing {
139140
cc := cc // loop-local copy
140141
ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) {
141-
return chg.Simple(cc)
142+
c, t, _, err := chg.Simple(cc)
143+
return c, t, err
142144
})
143145
}
144146
// Now enter the joint state, which rotates the above additions into the
@@ -147,7 +149,8 @@ func Restore(chg Changer, cs pb.ConfState) (tracker.Config, tracker.ProgressMap,
147149
// would be removing 2,3,4 and then adding in 1,2,3 while transitioning
148150
// into a joint state.
149151
ops = append(ops, func(chg Changer) (tracker.Config, tracker.ProgressMap, error) {
150-
return chg.EnterJoint(cs.AutoLeave, incoming...)
152+
c, t, _, err := chg.EnterJoint(cs.AutoLeave, incoming...)
153+
return c, t, err
151154
})
152155
}
153156

0 commit comments

Comments
 (0)