@@ -2,7 +2,7 @@ package core
22
33import (
44 "fmt"
5- "strconv "
5+ "sync "
66 "time"
77
88 "github.com/hdt3213/godis/aof"
@@ -26,6 +26,58 @@ func init() {
2626 RegisterCmd (startMigrationCommand , execStartMigration )
2727}
2828
29+ // slotsManager 负责管理当前 node 上的 slot
30+ type slotsManager struct {
31+ mu * sync.RWMutex
32+ slots map [uint32 ]* slotStatus // 记录当前node上的 slot
33+ importingTask * raft.MigratingTask
34+ }
35+
36+ const (
37+ slotStateHosting = iota
38+ slotStateImporting
39+ slotStateExporting
40+ )
41+
42+ type slotStatus struct {
43+ mu * sync.RWMutex
44+ state int
45+ keys * set.Set // 记录当前 slot 上的 key
46+
47+ exportSnapshot * set.Set // 开始传输时拷贝 slot 中的 key, 避免并发
48+ dirtyKeys * set.Set // 传输开始后被修改的key, 在传输结束阶段需要重传一遍
49+ }
50+
51+ func newSlotsManager () * slotsManager {
52+ return & slotsManager {
53+ mu : & sync.RWMutex {},
54+ slots : map [uint32 ]* slotStatus {},
55+ }
56+ }
57+
58+ func (ssm * slotsManager ) getSlot (index uint32 ) * slotStatus {
59+ ssm .mu .RLock ()
60+ slot := ssm .slots [index ]
61+ ssm .mu .RUnlock ()
62+ if slot != nil {
63+ return slot
64+ }
65+ ssm .mu .Lock ()
66+ defer ssm .mu .Unlock ()
67+ // check-lock-check
68+ slot = ssm .slots [index ]
69+ if slot != nil {
70+ return slot
71+ }
72+ slot = & slotStatus {
73+ state : slotStateHosting ,
74+ keys : set .Make (),
75+ mu : & sync.RWMutex {},
76+ }
77+ ssm .slots [index ] = slot
78+ return slot
79+ }
80+
2981func (sm * slotStatus ) startExporting () protocol.ErrorReply {
3082 sm .mu .Lock ()
3183 defer sm .mu .Unlock ()
@@ -44,6 +96,25 @@ func (sm *slotStatus) finishExportingWithinLock() {
4496 sm .exportSnapshot = nil
4597}
4698
99+ func (cluster * Cluster ) dropSlot (index uint32 ) {
100+ cluster .slotsManager .mu .RLock ()
101+ slot := cluster .slotsManager .slots [index ]
102+ cluster .slotsManager .mu .RUnlock ()
103+ if slot == nil {
104+ return
105+ }
106+ slot .mu .Lock ()
107+ defer slot .mu .Unlock ()
108+ c := connection .NewFakeConn ()
109+ slot .keys .ForEach (func (key string ) bool {
110+ cluster .LocalExec (c , utils .ToCmdLine ("del" , key ))
111+ return true
112+ })
113+ cluster .slotsManager .mu .Lock ()
114+ delete (cluster .slotsManager .slots , index )
115+ cluster .slotsManager .mu .Unlock ()
116+ }
117+
47118func (cluster * Cluster ) injectInsertCallback () {
48119 cb := func (dbIndex int , key string , entity * database.DataEntity ) {
49120 slotIndex := cluster .GetSlot (key )
@@ -187,32 +258,40 @@ func execFinishExport(cluster *Cluster, c redis.Connection, cmdLine CmdLine) red
187258 }
188259 logger .Infof ("finishing migration task %s, route changed" , taskId )
189260
261+ // clean migrated slots
262+ go func () {
263+ defer func () {
264+ if e := recover (); e != nil {
265+ logger .Errorf ("panic %v" , e )
266+ }
267+ }()
268+ for _ , index := range task .Slots {
269+ cluster .dropSlot (index )
270+ }
271+ }()
190272 c .Write (protocol .MakeOkReply ().ToBytes ())
191273 return & protocol.NoReply {}
192274}
193275
194276// execStartMigration receives startMigrationCommand from leader and start migration job at background
195- // command line: startMigrationCommand taskId srcNode slotId1 [slotId2]...
277+ // command line: startMigrationCommand taskId
196278func execStartMigration (cluster * Cluster , c redis.Connection , cmdLine CmdLine ) redis.Reply {
197- if len (cmdLine ) < 4 {
279+ if len (cmdLine ) != 2 {
198280 return protocol .MakeArgNumErrReply (startMigrationCommand )
199281 }
200282 taskId := string (cmdLine [1 ])
201- srcNode := string ( cmdLine [ 2 ])
202- var slotIds [] uint32
203- for _ , slotIdStr := range cmdLine [ 3 :] {
204- slotId , err := strconv . Atoi ( string ( slotIdStr ) )
205- if err ! = nil {
206- return protocol . MakeErrReply ( "illegal slot id: " + string ( slotIdStr ) )
283+
284+ var task * raft. MigratingTask
285+ for i := 0 ; i < 50 ; i ++ {
286+ task = cluster . raftNode . FSM . GetMigratingTask ( taskId )
287+ if task = = nil {
288+ time . Sleep ( time . Millisecond * 100 )
207289 }
208- slotIds = append (slotIds , uint32 (slotId ))
209290 }
210- task := & raft.MigratingTask {
211- ID : taskId ,
212- SrcNode : srcNode ,
213- TargetNode : cluster .SelfID (),
214- Slots : slotIds ,
291+ if task == nil {
292+ return protocol .MakeErrReply ("ERR get migrating task timeout" )
215293 }
294+
216295 cluster .slotsManager .mu .Lock ()
217296 cluster .slotsManager .importingTask = task
218297 cluster .slotsManager .mu .Unlock ()
0 commit comments