Skip to content

Commit 762aba8

Browse files
committed
Fix nil pointer dereference in tryUpdateMigrationStatus when sourceNodeClusterInfo.MigratingSlot is nil
1 parent 686b338 commit 762aba8

File tree

1 file changed

+34
-10
lines changed

1 file changed

+34
-10
lines changed

controller/cluster.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -338,48 +338,72 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
338338
).Error("Failed to get the cluster info from the source node", zap.Error(err))
339339
continue
340340
}
341-
if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
342-
log.Error("Mismatch migrating slot",
341+
342+
// If the master node reports no migration information, clear the migration information kept on the controller.
343+
if sourceNodeClusterInfo.MigratingSlot == nil {
344+
log.Error("Mismatch migrating slot, no migrating info on node",
343345
zap.Int("shard_index", i),
344-
zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()),
346+
zap.String("source_node", sourceNode.Addr()),
345347
zap.String("migrating_slot", shard.MigratingSlot.String()),
346348
)
349+
clonedCluster.Shards[i].ClearMigrateState()
350+
if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
351+
log.Error("Failed to update the migrate state by UpdateCluster method", zap.Error(err))
352+
return
353+
}
354+
c.updateCluster(clonedCluster)
355+
continue
356+
}
357+
// If the migration information on the master node is inconsistent with the controller's, clear the migration information kept on the controller.
358+
if sourceNodeClusterInfo.MigratingSlot != nil &&
359+
!sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
360+
log.Error("Mismatch migrating slot",
361+
zap.Int("shard_index", i),
362+
zap.String("source node cluster info migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()),
363+
zap.String("controller migrating_slot", shard.MigratingSlot.String()),
364+
)
365+
clonedCluster.Shards[i].ClearMigrateState()
366+
if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
367+
log.Error("Failed to update the migrate state by UpdateCluster method", zap.Error(err))
368+
return
369+
}
370+
c.updateCluster(clonedCluster)
347371
continue
348372
}
373+
349374
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) {
350375
log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex))
351376
return
352377
}
353378

379+
migratingSlot := shard.MigratingSlot.String()
354380
switch sourceNodeClusterInfo.MigratingState {
355381
case "none", "start":
356382
continue
357383
case "fail":
358-
migratingSlot := shard.MigratingSlot
359384
clonedCluster.Shards[i].ClearMigrateState()
360-
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
385+
if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
361386
log.Error("Failed to update the cluster", zap.Error(err))
362387
return
363388
}
364389
c.updateCluster(clonedCluster)
365-
log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String()))
390+
log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot))
366391
case "success":
367392
clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot.SlotRange)
368393
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
369394
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot.SlotRange,
370395
)
371-
migratedSlot := shard.MigratingSlot
372396
clonedCluster.Shards[i].ClearMigrateState()
373-
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
397+
if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
374398
log.Error("Failed to update the cluster", zap.Error(err))
375399
return
376400
} else {
377-
log.Info("Migrate the slot successfully", zap.String("slot", migratedSlot.String()))
401+
log.Info("Migrate the slot successfully", zap.String("slot", migratingSlot))
378402
}
379403
c.updateCluster(clonedCluster)
380404
default:
381405
clonedCluster.Shards[i].ClearMigrateState()
382-
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
406+
if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
383407
log.Error("Failed to update the cluster", zap.Error(err))
384408
return
385409
}

0 commit comments

Comments
 (0)