Skip to content

Commit d7d264f

Browse files
Fix sourceNodeClusterInfo.MigratingSlot maybe nil when tryUpdateMigrationStatus (#357)
1 parent 686b338 commit d7d264f

File tree

1 file changed

+18
-9
lines changed

1 file changed

+18
-9
lines changed

controller/cluster.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -338,48 +338,57 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
338338
).Error("Failed to get the cluster info from the source node", zap.Error(err))
339339
continue
340340
}
341-
if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
341+
342+
// If there is no migration information on the source node or the source node migration slot is not equal to the shard,
343+
// you need to clear the migration information on the controller.
344+
if sourceNodeClusterInfo.MigratingSlot == nil || (sourceNodeClusterInfo.MigratingSlot != nil &&
345+
!sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange)) {
342346
log.Error("Mismatch migrating slot",
343347
zap.Int("shard_index", i),
344-
zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()),
345348
zap.String("migrating_slot", shard.MigratingSlot.String()),
346349
)
350+
clonedCluster.Shards[i].ClearMigrateState()
351+
if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
352+
log.Error("Failed to update the migrate state by UpdateCluster method", zap.Error(err))
353+
return
354+
}
355+
c.updateCluster(clonedCluster)
347356
continue
348357
}
358+
349359
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) {
350360
log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex))
351361
return
352362
}
353363

364+
migratingSlot := shard.MigratingSlot.String()
354365
switch sourceNodeClusterInfo.MigratingState {
355366
case "none", "start":
356367
continue
357368
case "fail":
358-
migratingSlot := shard.MigratingSlot
359369
clonedCluster.Shards[i].ClearMigrateState()
360-
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
370+
if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
361371
log.Error("Failed to update the cluster", zap.Error(err))
362372
return
363373
}
364374
c.updateCluster(clonedCluster)
365-
log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String()))
375+
log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot))
366376
case "success":
367377
clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot.SlotRange)
368378
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
369379
clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot.SlotRange,
370380
)
371-
migratedSlot := shard.MigratingSlot
372381
clonedCluster.Shards[i].ClearMigrateState()
373-
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
382+
if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
374383
log.Error("Failed to update the cluster", zap.Error(err))
375384
return
376385
} else {
377-
log.Info("Migrate the slot successfully", zap.String("slot", migratedSlot.String()))
386+
log.Info("Migrate the slot successfully", zap.String("slot", migratingSlot))
378387
}
379388
c.updateCluster(clonedCluster)
380389
default:
381390
clonedCluster.Shards[i].ClearMigrateState()
382-
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
391+
if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
383392
log.Error("Failed to update the cluster", zap.Error(err))
384393
return
385394
}

0 commit comments

Comments
 (0)