Skip to content

Commit b21cf6c

Browse files
committed
improve the canary features
Signed-off-by: Jiaxin Shan <[email protected]>
1 parent 1061839 commit b21cf6c

File tree

2 files changed

+28
-23
lines changed

2 files changed

+28
-23
lines changed

pkg/controller/stormservice/canary.go

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -135,19 +135,20 @@ func (r *StormServiceReconciler) processCanaryUpdate(ctx context.Context, stormS
135135
return r.handleCanaryPause(ctx, stormService)
136136
}
137137

138-
// Check if we're waiting for pause condition removal on a manual pause step
139-
currentStepIndex := canaryStatus.CurrentStep
140-
if currentStepIndex < int32(len(steps)) {
141-
currentStep := steps[currentStepIndex]
142-
// If current step is a manual pause step but no pause condition exists, advance
143-
if currentStep.Pause != nil && currentStep.Pause.IsManualPause() {
144-
if !r.hasPauseCondition(canaryStatus, orchestrationv1alpha1.PauseReasonCanaryPauseStep) {
145-
klog.Infof("Manual pause condition removed for StormService %s/%s, advancing to next step",
146-
stormService.Namespace, stormService.Name)
147-
return r.advanceCanaryStep(ctx, stormService)
148-
}
149-
}
150-
}
138+
// Check if we're waiting for pause condition removal on a manual pause step
139+
currentStepIndex := canaryStatus.CurrentStep
140+
if currentStepIndex < int32(len(steps)) {
141+
currentStep := steps[currentStepIndex]
142+
// Treat as resume ONLY if we've already marked PausedAt (i.e., entered this pause step)
143+
// and the CanaryPauseStep condition has been removed by the user.
144+
if currentStep.Pause != nil && currentStep.Pause.IsManualPause() {
145+
if canaryStatus.PausedAt != nil && !r.hasPauseCondition(canaryStatus, orchestrationv1alpha1.PauseReasonCanaryPauseStep) {
146+
klog.Infof("Manual pause condition removed for StormService %s/%s, advancing to next step",
147+
stormService.Namespace, stormService.Name)
148+
return r.advanceCanaryStep(ctx, stormService)
149+
}
150+
}
151+
}
151152

152153
// Process current step
153154
if currentStepIndex >= int32(len(steps)) {
@@ -211,7 +212,7 @@ func (r *StormServiceReconciler) processCanaryPauseStep(ctx context.Context, sto
211212
addStatusUpdate(func(status *orchestrationv1alpha1.CanaryStatus) {
212213
status.PausedAt = &now
213214
status.Phase = orchestrationv1alpha1.CanaryPhasePaused
214-
215+
215216
// Add pause condition to track why it's paused
216217
pauseCondition := orchestrationv1alpha1.PauseCondition{
217218
Reason: orchestrationv1alpha1.PauseReasonCanaryPauseStep,
@@ -297,8 +298,12 @@ func (r *StormServiceReconciler) processCanaryPauseStep(ctx context.Context, sto
297298
return ctrl.Result{}, fmt.Errorf("failed to add CanaryPauseStep pause condition: %w", err)
298299
}
299300
} else {
300-
r.EventRecorder.Eventf(stormService, "Normal", "CanaryPauseManual",
301-
"Canary paused at manual pause step. Remove CanaryPauseStep pause condition to continue")
301+
// Emit a consistent CanaryUpdate event even if the pause condition already exists
302+
update := newCanaryStatusUpdate().
303+
addEvent("Canary paused at manual pause step. Remove CanaryPauseStep pause condition to continue")
304+
if err := r.applyCanaryStatusUpdate(ctx, stormService, update); err != nil {
305+
return ctrl.Result{}, fmt.Errorf("failed to record manual pause event: %w", err)
306+
}
302307
}
303308

304309
// Requeue periodically to check if pause condition was removed
@@ -342,15 +347,15 @@ func (r *StormServiceReconciler) applyCanaryWeight(ctx context.Context, stormSer
342347

343348
// Check if this is a scaling event during canary (compare with current status replicas)
344349
currentStatusReplicas := stormService.Status.Replicas
345-
350+
346351
// If replicas changed during canary, log and notify but continue with recalculation
347352
if currentStatusReplicas > 0 && currentStatusReplicas != totalReplicas {
348-
klog.Infof("Scaling detected during canary for StormService %s/%s: %d -> %d replicas, recalculating distribution",
353+
klog.Infof("Scaling detected during canary for StormService %s/%s: %d -> %d replicas, recalculating distribution",
349354
stormService.Namespace, stormService.Name, currentStatusReplicas, totalReplicas)
350-
355+
351356
// Log the recalculation event
352357
r.EventRecorder.Eventf(stormService, "Normal", "CanaryScaling",
353-
"Recalculated canary distribution due to scaling: %d%% weight with %d total replicas (%d canary, %d stable)",
358+
"Recalculated canary distribution due to scaling: %d%% weight with %d total replicas (%d canary, %d stable)",
354359
weight, totalReplicas, canaryReplicas, stableReplicas)
355360
}
356361

@@ -401,7 +406,7 @@ func (r *StormServiceReconciler) advanceCanaryStep(ctx context.Context, stormSer
401406
update := newCanaryStatusUpdate().
402407
addStatusUpdate(func(status *orchestrationv1alpha1.CanaryStatus) {
403408
status.CurrentStep = nextStep
404-
status.PausedAt = nil // Clear pause timestamp
409+
status.PausedAt = nil // Clear pause timestamp
405410
status.PauseConditions = nil // Clear pause conditions when resuming
406411
status.Phase = orchestrationv1alpha1.CanaryPhaseProgressing
407412
})

test/e2e/util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,12 @@ func waitForStormServiceReady(ctx context.Context, t *testing.T, k8sClient *kube
181181

182182
// Check if all replicas are ready
183183
if ss.Status.ReadyReplicas > 0 && ss.Status.ReadyReplicas == ss.Status.Replicas {
184-
t.Logf("StormService %s is ready: %d/%d replicas ready",
184+
t.Logf("StormService %s is ready: %d/%d replicas ready",
185185
stormService.Name, ss.Status.ReadyReplicas, ss.Status.Replicas)
186186
return true, nil
187187
}
188188

189-
t.Logf("Waiting for StormService %s to be ready: %d/%d replicas ready",
189+
t.Logf("Waiting for StormService %s to be ready: %d/%d replicas ready",
190190
stormService.Name, ss.Status.ReadyReplicas, ss.Status.Replicas)
191191
return false, nil
192192
})

0 commit comments

Comments
 (0)