Skip to content

Commit be8c1fc

Browse files
fix(drain): cancel stale drains on spec revert
When a scale-down or rolling update is reversed before the drain completes, pods stay stuck with drain annotations indefinitely, causing the Shard to report Progressing even though all pods are within the desired state.

- Add clearDrainAnnotations helper in drain_helpers.go
- Add isDrainStale check in reconcileDrainState before running ExecuteDrainStateMachine — only cancels at the Requested state, where no standby-removal RPC has been sent yet
- Guard against cancelling drains on: Draining/Acknowledged/ReadyForDeletion states, deleting pods, DRAINED pods, extra pods beyond the replica count, and spec-drifted pods
- Add TestIsDrainStale with 7 cases covering all guard paths

Prevents pods from getting stuck in drain limbo when the desired state reverts, allowing the Shard to recover to Healthy.
1 parent 0d2f603 commit be8c1fc

File tree

3 files changed

+199
-9
lines changed

3 files changed

+199
-9
lines changed

pkg/resource-handler/controller/shard/drain_helpers.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,18 @@ func resolvePodRole(shard *multigresv1alpha1.Shard, podName string) string {
3131
return ""
3232
}
3333

34+
// clearDrainAnnotations removes all drain annotations from a pod via merge patch,
35+
// cancelling a drain that is no longer needed (e.g. scale-down reversed).
36+
func clearDrainAnnotations(ctx context.Context, k8sClient client.Client, pod *corev1.Pod) error {
37+
patch := client.MergeFrom(pod.DeepCopy())
38+
delete(pod.Annotations, metadata.AnnotationDrainState)
39+
delete(pod.Annotations, metadata.AnnotationDrainRequestedAt)
40+
if err := k8sClient.Patch(ctx, pod, patch); err != nil {
41+
return fmt.Errorf("failed to clear drain annotations for pod %s: %w", pod.Name, err)
42+
}
43+
return nil
44+
}
45+
3446
// initiateDrain sets the drain-requested annotation on a pod via merge patch,
3547
// starting the drain state machine: the reconciler removes the pod from the
3648
// sync standby list, unregisters it from etcd, then marks it ready-for-deletion.

pkg/resource-handler/controller/shard/reconcile_data_plane.go

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -182,22 +182,88 @@ func (r *ShardReconciler) reconcileDrainState(
182182
requeue := false
183183
for i := range podList.Items {
184184
pod := &podList.Items[i]
185-
if pod.Annotations[metadata.AnnotationDrainState] != "" {
186-
shouldRequeue, derr := drain.ExecuteDrainStateMachine(
187-
ctx, r.Client, r.RPCClient, r.Recorder, store, shard, pod,
188-
)
189-
if derr != nil {
190-
logger.Error(derr, "Failed to execute drain state machine", "pod", pod.Name)
191-
}
192-
if shouldRequeue {
193-
requeue = true
185+
state := pod.Annotations[metadata.AnnotationDrainState]
186+
if state == "" {
187+
continue
188+
}
189+
190+
if r.isDrainStale(shard, pod, state) {
191+
logger.Info("Cancelling stale drain: pod is within desired replicas and spec matches",
192+
"pod", pod.Name, "state", state)
193+
if err := clearDrainAnnotations(ctx, r.Client, pod); err != nil {
194+
logger.Error(err, "Failed to clear drain annotations", "pod", pod.Name)
194195
}
196+
r.Recorder.Eventf(shard, "Normal", "DrainCancelled",
197+
"Cancelled stale drain on pod %s (now within desired state)", pod.Name)
198+
continue
199+
}
200+
201+
shouldRequeue, derr := drain.ExecuteDrainStateMachine(
202+
ctx, r.Client, r.RPCClient, r.Recorder, store, shard, pod,
203+
)
204+
if derr != nil {
205+
logger.Error(derr, "Failed to execute drain state machine", "pod", pod.Name)
206+
}
207+
if shouldRequeue {
208+
requeue = true
195209
}
196210
}
197211

198212
return requeue, nil
199213
}
200214

215+
// isDrainStale returns true when a pod's drain is no longer needed because the
216+
// desired state has changed (e.g. scale-down reversed or rolling-update reverted).
217+
// Only early drain states (Requested/Draining) are cancellable — once Acknowledged,
218+
// etcd unregistration may have started and the drain must complete.
219+
func (r *ShardReconciler) isDrainStale(
220+
shard *multigresv1alpha1.Shard,
221+
pod *corev1.Pod,
222+
state string,
223+
) bool {
224+
// Only cancel Requested — nothing has happened yet at this point.
225+
// Draining means the standby removal RPC already succeeded and the pod
226+
// has been removed from the sync standby list; cancelling there would
227+
// leave an orphaned replica unless multiorch re-registers it.
228+
if state != metadata.DrainStateRequested {
229+
return false
230+
}
231+
232+
// Pods being deleted need the drain to cleanly unregister from etcd.
233+
if !pod.DeletionTimestamp.IsZero() {
234+
return false
235+
}
236+
237+
// DRAINED pods need replacement regardless of replica count or spec match.
238+
if resolvePodRole(shard, pod.Name) == "DRAINED" {
239+
return false
240+
}
241+
242+
poolName := pod.Labels[metadata.LabelMultigresPool]
243+
cellName := pod.Labels[metadata.LabelMultigresCell]
244+
if poolName == "" || cellName == "" {
245+
return false
246+
}
247+
248+
poolSpec, ok := shard.Spec.Pools[multigresv1alpha1.PoolName(poolName)]
249+
if !ok {
250+
return false
251+
}
252+
253+
replicas := DefaultPoolReplicas
254+
if poolSpec.ReplicasPerCell != nil {
255+
replicas = *poolSpec.ReplicasPerCell
256+
}
257+
258+
index := resolvePodIndex(pod.Name)
259+
if index < 0 || index >= int(replicas) {
260+
return false // Pod is still an extra pod for scale-down
261+
}
262+
263+
// Pod is within replica range — check if its spec still matches desired.
264+
return !podNeedsUpdate(pod, shard, poolName, cellName, poolSpec, index, r.Scheme)
265+
}
266+
201267
// getTopoStore returns a topology store, using the custom factory if set, otherwise the default.
202268
func (r *ShardReconciler) getTopoStore(shard *multigresv1alpha1.Shard) (topoclient.Store, error) {
203269
if r.CreateTopoStore != nil {

pkg/resource-handler/controller/shard/shard_controller_internal_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4411,6 +4411,118 @@ func TestUpdatePoolsStatus_TerminatingPodExcluded(t *testing.T) {
44114411
}
44124412
}
44134413

4414+
func TestIsDrainStale(t *testing.T) {
4415+
scheme := runtime.NewScheme()
4416+
_ = multigresv1alpha1.AddToScheme(scheme)
4417+
_ = corev1.AddToScheme(scheme)
4418+
4419+
shard := &multigresv1alpha1.Shard{
4420+
ObjectMeta: metav1.ObjectMeta{
4421+
Name: "test-shard-stale",
4422+
Namespace: "default",
4423+
Labels: map[string]string{
4424+
metadata.LabelMultigresCluster: "test-cluster",
4425+
},
4426+
},
4427+
Spec: multigresv1alpha1.ShardSpec{
4428+
DatabaseName: "db",
4429+
TableGroupName: "tg",
4430+
ShardName: "s1",
4431+
Pools: map[multigresv1alpha1.PoolName]multigresv1alpha1.PoolSpec{
4432+
"main": {
4433+
Cells: []multigresv1alpha1.CellName{"z1"},
4434+
ReplicasPerCell: ptr.To(int32(5)),
4435+
},
4436+
},
4437+
},
4438+
}
4439+
4440+
r := &ShardReconciler{Scheme: scheme}
4441+
4442+
// Build a pod with matching spec-hash for index 4
4443+
matchingPod := func(index int, drainState string) *corev1.Pod {
4444+
desired, err := BuildPoolPod(shard, "main", "z1", shard.Spec.Pools["main"], index, scheme)
4445+
if err != nil {
4446+
t.Fatalf("BuildPoolPod failed: %v", err)
4447+
}
4448+
hash := ComputeSpecHash(desired)
4449+
return &corev1.Pod{
4450+
ObjectMeta: metav1.ObjectMeta{
4451+
Name: BuildPoolPodName(shard, "main", "z1", index),
4452+
Namespace: "default",
4453+
Labels: buildPoolLabelsWithCell(shard, "main", "z1"),
4454+
Annotations: map[string]string{
4455+
metadata.AnnotationSpecHash: hash,
4456+
metadata.AnnotationDrainState: drainState,
4457+
metadata.AnnotationDrainRequestedAt: "2026-03-08T18:00:00Z",
4458+
},
4459+
},
4460+
}
4461+
}
4462+
4463+
t.Run("CancelsStaleScaleDownDrain", func(t *testing.T) {
4464+
pod := matchingPod(4, metadata.DrainStateRequested)
4465+
if !r.isDrainStale(shard, pod, metadata.DrainStateRequested) {
4466+
t.Error("expected drain to be stale (pod within replicas, spec matches)")
4467+
}
4468+
})
4469+
4470+
t.Run("DoesNotCancelDrainingState", func(t *testing.T) {
4471+
pod := matchingPod(4, metadata.DrainStateDraining)
4472+
if r.isDrainStale(shard, pod, metadata.DrainStateDraining) {
4473+
t.Error("expected drain NOT to be stale in Draining state (standby removal already sent)")
4474+
}
4475+
})
4476+
4477+
t.Run("DoesNotCancelExtraPodDrain", func(t *testing.T) {
4478+
// Reduce replicas so pod-4 (index 4) is extra
4479+
smallShard := shard.DeepCopy()
4480+
smallShard.Spec.Pools["main"] = multigresv1alpha1.PoolSpec{
4481+
Cells: []multigresv1alpha1.CellName{"z1"},
4482+
ReplicasPerCell: ptr.To(int32(4)),
4483+
}
4484+
pod := matchingPod(4, metadata.DrainStateRequested)
4485+
if r.isDrainStale(smallShard, pod, metadata.DrainStateRequested) {
4486+
t.Error("expected drain NOT to be stale (pod is extra)")
4487+
}
4488+
})
4489+
4490+
t.Run("DoesNotCancelAcknowledgedDrain", func(t *testing.T) {
4491+
pod := matchingPod(4, metadata.DrainStateAcknowledged)
4492+
if r.isDrainStale(shard, pod, metadata.DrainStateAcknowledged) {
4493+
t.Error("expected drain NOT to be stale (past point of no return)")
4494+
}
4495+
})
4496+
4497+
t.Run("DoesNotCancelDrainedPodDrain", func(t *testing.T) {
4498+
shardWithDrained := shard.DeepCopy()
4499+
pod := matchingPod(0, metadata.DrainStateRequested)
4500+
shardWithDrained.Status.PodRoles = map[string]string{
4501+
pod.Name: "DRAINED",
4502+
}
4503+
if r.isDrainStale(shardWithDrained, pod, metadata.DrainStateRequested) {
4504+
t.Error("expected drain NOT to be stale (pod role is DRAINED)")
4505+
}
4506+
})
4507+
4508+
t.Run("DoesNotCancelDrainOnDeletingPod", func(t *testing.T) {
4509+
pod := matchingPod(4, metadata.DrainStateRequested)
4510+
now := metav1.Now()
4511+
pod.DeletionTimestamp = &now
4512+
if r.isDrainStale(shard, pod, metadata.DrainStateRequested) {
4513+
t.Error("expected drain NOT to be stale (pod is being deleted)")
4514+
}
4515+
})
4516+
4517+
t.Run("DoesNotCancelWhenSpecDrifted", func(t *testing.T) {
4518+
pod := matchingPod(4, metadata.DrainStateRequested)
4519+
pod.Annotations[metadata.AnnotationSpecHash] = "wrong-hash"
4520+
if r.isDrainStale(shard, pod, metadata.DrainStateRequested) {
4521+
t.Error("expected drain NOT to be stale (spec-hash mismatch)")
4522+
}
4523+
})
4524+
}
4525+
44144526
func TestUpdatePoolsStatus_DrainAnnotationExcludedFromReady(t *testing.T) {
44154527
scheme := runtime.NewScheme()
44164528
_ = multigresv1alpha1.AddToScheme(scheme)

0 commit comments

Comments
 (0)