Skip to content

Commit 0032e54

Browse files
authored
Fix wrongly check if the migrating slot in the source node (#269)
* Fix wrongly check if the migrating slot in the source node. Kvrocks will return `migrating_slot(s)` instead of `migrating_slot` after the slot range migration is supported. From the controller, we need to allow both of them to be compatible with the old behavior. * Fix data race
1 parent fe4d22a commit 0032e54

File tree

3 files changed

+43
-14
lines changed

3 files changed

+43
-14
lines changed

controller/cluster.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,10 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
288288
return
289289
}
290290
if sourceNodeClusterInfo.MigratingSlot != shard.MigratingSlot {
291-
log.Error("Mismatch migrate slot", zap.Int("slot", shard.MigratingSlot))
291+
log.Error("Mismatch migrating slot",
292+
zap.Int("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot),
293+
zap.Int("migrating_slot", shard.MigratingSlot),
294+
)
292295
return
293296
}
294297
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) {
@@ -301,13 +304,14 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
301304
case "none", "start":
302305
continue
303306
case "fail":
307+
migratingSlot := shard.MigratingSlot
304308
clonedCluster.Shards[i].ClearMigrateState()
305309
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
306310
log.Error("Failed to update the cluster", zap.Error(err))
307311
return
308312
}
309313
c.updateCluster(clonedCluster)
310-
log.Warn("Failed to migrate the slot", zap.Int("slot", shard.MigratingSlot))
314+
log.Warn("Failed to migrate the slot", zap.Int("slot", migratingSlot))
311315
case "success":
312316
err := clonedCluster.SetSlot(ctx, shard.MigratingSlot, targetMasterNode.ID())
313317
if err != nil {

server/api/cluster_test.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@ import (
2727
"net/http"
2828
"net/http/httptest"
2929
"testing"
30+
"time"
3031

3132
"github.com/gin-gonic/gin"
3233
"github.com/stretchr/testify/require"
3334

35+
"github.com/apache/kvrocks-controller/config"
3436
"github.com/apache/kvrocks-controller/consts"
37+
"github.com/apache/kvrocks-controller/controller"
3538
"github.com/apache/kvrocks-controller/server/middleware"
3639
"github.com/apache/kvrocks-controller/store"
3740
"github.com/apache/kvrocks-controller/store/engine"
@@ -117,10 +120,12 @@ func TestClusterBasics(t *testing.T) {
117120
})
118121

119122
t.Run("migrate slot only", func(t *testing.T) {
123+
handler := &ClusterHandler{s: store.NewClusterStore(engine.NewMock())}
124+
clusterName := "test-migrate-slot-only-cluster"
120125
recorder := httptest.NewRecorder()
121126
ctx := GetTestContext(recorder)
122127
ctx.Set(consts.ContextKeyStore, handler.s)
123-
ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: "test-cluster"}}
128+
ctx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: clusterName}}
124129
testMigrateReq := &MigrateSlotRequest{
125130
Slot: 3,
126131
SlotOnly: true,
@@ -130,18 +135,19 @@ func TestClusterBasics(t *testing.T) {
130135
require.NoError(t, err)
131136
ctx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
132137

133-
before, err := handler.s.GetCluster(ctx, ns, "test-cluster")
138+
cluster, err := store.NewCluster(clusterName, []string{"127.0.0.1:1111", "127.0.0.1:2222"}, 1)
139+
require.NoError(t, err)
140+
require.NoError(t, handler.s.CreateCluster(ctx, ns, cluster))
141+
142+
before, err := handler.s.GetCluster(ctx, ns, clusterName)
134143
require.NoError(t, err)
135144
require.EqualValues(t, store.SlotRange{Start: 0, Stop: 8191}, before.Shards[0].SlotRanges[0])
136145
require.EqualValues(t, store.SlotRange{Start: 8192, Stop: store.MaxSlotID}, before.Shards[1].SlotRanges[0])
137146

138147
middleware.RequiredCluster(ctx)
139-
if recorder.Code != http.StatusOK {
140-
return
141-
}
142148
handler.MigrateSlot(ctx)
143149
require.Equal(t, http.StatusOK, recorder.Code)
144-
after, err := handler.s.GetCluster(ctx, ns, "test-cluster")
150+
after, err := handler.s.GetCluster(ctx, ns, clusterName)
145151
require.NoError(t, err)
146152

147153
require.EqualValues(t, before.Version.Add(1), after.Version.Load())
@@ -221,11 +227,12 @@ func TestClusterMigrateData(t *testing.T) {
221227
require.NoError(t, node.SyncClusterInfo(ctx, cluster))
222228
}
223229
}
230+
handler.s.CreateCluster(ctx, ns, cluster)
224231

225232
recorder := httptest.NewRecorder()
226233
reqCtx := GetTestContext(recorder)
227234
reqCtx.Set(consts.ContextKeyStore, handler.s)
228-
reqCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: "test-cluster"}}
235+
reqCtx.Params = []gin.Param{{Key: "namespace", Value: ns}, {Key: "cluster", Value: clusterName}}
229236
testMigrateReq := &MigrateSlotRequest{
230237
Slot: 0,
231238
Target: 1,
@@ -234,16 +241,33 @@ func TestClusterMigrateData(t *testing.T) {
234241
require.NoError(t, err)
235242
reqCtx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
236243
middleware.RequiredCluster(reqCtx)
237-
if recorder.Code != http.StatusOK {
238-
return
239-
}
240244
handler.MigrateSlot(reqCtx)
241245
require.Equal(t, http.StatusOK, recorder.Code)
242246

243-
gotCluster, err := handler.s.GetCluster(ctx, ns, "test-cluster")
247+
gotCluster, err := handler.s.GetCluster(ctx, ns, clusterName)
244248
require.NoError(t, err)
245249
require.EqualValues(t, 1, gotCluster.Version.Load())
246250
require.Len(t, gotCluster.Shards[0].SlotRanges, 1)
247251
require.EqualValues(t, 0, gotCluster.Shards[0].MigratingSlot)
248252
require.EqualValues(t, 1, gotCluster.Shards[0].TargetShardIndex)
253+
254+
ctrl, err := controller.New(handler.s.(*store.ClusterStore), &config.ControllerConfig{
255+
FailOver: &config.FailOverConfig{
256+
PingIntervalSeconds: 1,
257+
MaxPingCount: 3,
258+
}})
259+
require.NoError(t, err)
260+
require.NoError(t, ctrl.Start(ctx))
261+
ctrl.WaitForReady()
262+
defer ctrl.Close()
263+
264+
// Migration will fail because the source node cannot connect to the target node;
265+
// we just use it to confirm that the migration loop took effect.
266+
require.Eventually(t, func() bool {
267+
gotCluster, err := handler.s.GetCluster(ctx, ns, "test-cluster")
268+
if err != nil {
269+
return false
270+
}
271+
return gotCluster.Shards[0].MigratingSlot == -1
272+
}, 10*time.Second, 100*time.Millisecond)
249273
}

store/cluster_node.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ func (n *ClusterNode) GetClusterInfo(ctx context.Context) (*ClusterInfo, error)
191191
if err != nil {
192192
return nil, err
193193
}
194-
case "migrating_slot":
194+
case "migrating_slot", "migrating_slot(s)":
195+
// TODO(@git-hulk): handle multiple migrating slots
195196
clusterInfo.MigratingSlot, err = strconv.Atoi(fields[1])
196197
if err != nil {
197198
return nil, err

0 commit comments

Comments
 (0)