diff --git a/api/v1alpha1/temporalworker_webhook.go b/api/v1alpha1/temporalworker_webhook.go index 27d3f6bf..5037eda0 100644 --- a/api/v1alpha1/temporalworker_webhook.go +++ b/api/v1alpha1/temporalworker_webhook.go @@ -7,15 +7,15 @@ package v1alpha1 import ( "context" "fmt" + "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/validation/field" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/webhook" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" - "time" ) const ( @@ -73,21 +73,59 @@ func (r *TemporalWorkerDeployment) validateForUpdateOrCreate(ctx context.Context return nil, apierrors.NewBadRequest("expected a TemporalWorkerDeployment") } + return validateForUpdateOrCreate(nil, dep) +} + +func validateForUpdateOrCreate(old, new *TemporalWorkerDeployment) (admission.Warnings, error) { var allErrs field.ErrorList - if len(dep.Name) > maxTemporalWorkerDeploymentNameLen { + if len(new.GetName()) > maxTemporalWorkerDeploymentNameLen { allErrs = append(allErrs, - field.Invalid(field.NewPath("name"), dep.Name, fmt.Sprintf("cannot be more than %d characters", maxTemporalWorkerDeploymentNameLen)), + field.Invalid(field.NewPath("metadata.name"), new.GetName(), fmt.Sprintf("cannot be more than %d characters", maxTemporalWorkerDeploymentNameLen)), ) } + allErrs = append(allErrs, validateRolloutStrategy(new.Spec.RolloutStrategy)...) 
+ if len(allErrs) > 0 { - return nil, apierrors.NewInvalid( - schema.GroupKind{Group: "your.group", Kind: "TemporalWorkerDeployment"}, - dep.Name, - allErrs, - ) + return nil, newInvalidErr(new, allErrs) } return nil, nil } + +func validateRolloutStrategy(s RolloutStrategy) []*field.Error { + var allErrs []*field.Error + + if s.Strategy == UpdateProgressive { + rolloutSteps := s.Steps + if len(rolloutSteps) == 0 { + allErrs = append(allErrs, + field.Invalid(field.NewPath("spec.cutover.steps"), rolloutSteps, "steps are required for Progressive cutover"), + ) + } + var lastRamp float32 + for i, s := range rolloutSteps { + // Check duration >= 30s + if s.PauseDuration.Duration < 30*time.Second { + allErrs = append(allErrs, + field.Invalid(field.NewPath(fmt.Sprintf("spec.cutover.steps[%d].pauseDuration", i)), s.PauseDuration.Duration.String(), "pause duration must be at least 30s"), + ) + } + + // Check ramp value greater than last + if s.RampPercentage <= lastRamp { + allErrs = append(allErrs, + field.Invalid(field.NewPath(fmt.Sprintf("spec.cutover.steps[%d].rampPercentage", i)), s.RampPercentage, "rampPercentage must increase between each step"), + ) + } + lastRamp = s.RampPercentage + } + } + + return allErrs +} + +func newInvalidErr(dep *TemporalWorkerDeployment, errs field.ErrorList) *apierrors.StatusError { + return apierrors.NewInvalid(dep.GroupVersionKind().GroupKind(), dep.GetName(), errs) +} diff --git a/api/v1alpha1/temporalworker_webhook_test.go b/api/v1alpha1/temporalworker_webhook_test.go index acaf37f2..f9d6ef8a 100644 --- a/api/v1alpha1/temporalworker_webhook_test.go +++ b/api/v1alpha1/temporalworker_webhook_test.go @@ -2,102 +2,132 @@ // // This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc. 
-package v1alpha1 +package v1alpha1_test import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "github.com/temporalio/temporal-worker-controller/internal/testhelpers" ) func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) { - tests := []struct { - name string - obj runtime.Object - expectError bool - errorMsg string + tests := map[string]struct { + obj runtime.Object + errorMsg string }{ - { - name: "valid temporal worker deployment", - obj: MakeTWDWithName("valid-worker"), - expectError: false, + "valid temporal worker deployment": { + obj: testhelpers.MakeTWDWithName("valid-worker"), }, - { - name: "temporal worker deployment with name too long", - obj: MakeTWDWithName("this-is-a-very-long-temporal-worker-deployment-name-that-exceeds-the-maximum-allowed-length-of-sixty-three-characters"), - expectError: true, - errorMsg: "cannot be more than 63 characters", + "temporal worker deployment with name too long": { + obj: testhelpers.MakeTWDWithName("this-is-a-very-long-temporal-worker-deployment-name-that-exceeds-the-maximum-allowed-length-of-sixty-three-characters"), + errorMsg: "cannot be more than 63 characters", }, - { - name: "invalid object type", + "invalid object type": { obj: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test", }, }, - expectError: true, - errorMsg: "expected a TemporalWorkerDeployment", + errorMsg: "expected a TemporalWorkerDeployment", + }, + "missing rollout steps": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-missing-steps"), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RolloutStrategy.Strategy = 
temporaliov1alpha1.UpdateProgressive + obj.Spec.RolloutStrategy.Steps = nil + return obj + }), + errorMsg: "spec.cutover.steps: Invalid value: []v1alpha1.RolloutStep(nil): steps are required for Progressive cutover", + }, + "ramp value for step <= previous step": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-decreasing-ramps"), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RolloutStrategy.Strategy = temporaliov1alpha1.UpdateProgressive + obj.Spec.RolloutStrategy.Steps = []temporaliov1alpha1.RolloutStep{ + {5, metav1.Duration{Duration: time.Minute}}, + {10, metav1.Duration{Duration: time.Minute}}, + {9, metav1.Duration{Duration: time.Minute}}, + {50, metav1.Duration{Duration: time.Minute}}, + {50, metav1.Duration{Duration: time.Minute}}, + {75, metav1.Duration{Duration: time.Minute}}, + } + return obj + }), + errorMsg: "[spec.cutover.steps[2].rampPercentage: Invalid value: 9: rampPercentage must increase between each step, spec.cutover.steps[4].rampPercentage: Invalid value: 50: rampPercentage must increase between each step]", + }, + "pause duration < 30s": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-decreasing-ramps"), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RolloutStrategy.Strategy = temporaliov1alpha1.UpdateProgressive + obj.Spec.RolloutStrategy.Steps = []temporaliov1alpha1.RolloutStep{ + {10, metav1.Duration{Duration: time.Minute}}, + {25, metav1.Duration{Duration: 10 * time.Second}}, + {50, metav1.Duration{Duration: time.Minute}}, + } + return obj + }), + errorMsg: `spec.cutover.steps[1].pauseDuration: Invalid value: "10s": pause duration must be at least 30s`, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + for name, tc := range tests { + t.Run(name, func(t *testing.T) { ctx := context.Background() - webhook := &TemporalWorkerDeployment{} - 
- warnings, err := webhook.ValidateCreate(ctx, tt.obj) - - if tt.expectError { - require.Error(t, err) - assert.Contains(t, err.Error(), tt.errorMsg) - } else { - require.NoError(t, err) + webhook := &temporaliov1alpha1.TemporalWorkerDeployment{} + + assertAdmission := func(warnings admission.Warnings, err error) { + if tc.errorMsg != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.errorMsg) + } else { + require.NoError(t, err) + } + + // Warnings should always be nil for this implementation + assert.Nil(t, warnings) } - // Warnings should always be nil for this implementation - assert.Nil(t, warnings) + // Verify that create and update enforce the same rules + assertAdmission(webhook.ValidateCreate(ctx, tc.obj)) + assertAdmission(webhook.ValidateUpdate(ctx, nil, tc.obj)) }) } } func TestTemporalWorkerDeployment_ValidateUpdate(t *testing.T) { - tests := []struct { - name string - oldObj runtime.Object - newObj runtime.Object - expectError bool - errorMsg string + tests := map[string]struct { + oldObj runtime.Object + newObj runtime.Object + errorMsg string }{ - { - name: "valid update", - oldObj: nil, - newObj: MakeTWDWithName("valid-worker"), - expectError: false, + "valid update": { + oldObj: nil, + newObj: testhelpers.MakeTWDWithName("valid-worker"), }, - { - name: "update with name too long", - oldObj: nil, - newObj: MakeTWDWithName("this-is-a-very-long-temporal-worker-deployment-name-that-exceeds-the-maximum-allowed-length-of-sixty-three-characters"), - expectError: true, - errorMsg: "cannot be more than 63 characters", + "update with name too long": { + oldObj: nil, + newObj: testhelpers.MakeTWDWithName("this-is-a-very-long-temporal-worker-deployment-name-that-exceeds-the-maximum-allowed-length-of-sixty-three-characters"), + errorMsg: "cannot be more than 63 characters", }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + for name, tc := range tests { + t.Run(name, func(t *testing.T) { ctx := context.Background() - 
webhook := &TemporalWorkerDeployment{} + webhook := &temporaliov1alpha1.TemporalWorkerDeployment{} - warnings, err := webhook.ValidateUpdate(ctx, tt.oldObj, tt.newObj) + warnings, err := webhook.ValidateUpdate(ctx, tc.oldObj, tc.newObj) - if tt.expectError { + if tc.errorMsg != "" { require.Error(t, err) - assert.Contains(t, err.Error(), tt.errorMsg) + assert.Contains(t, err.Error(), tc.errorMsg) } else { require.NoError(t, err) } @@ -110,9 +140,9 @@ func TestTemporalWorkerDeployment_ValidateUpdate(t *testing.T) { func TestTemporalWorkerDeployment_ValidateDelete(t *testing.T) { ctx := context.Background() - webhook := &TemporalWorkerDeployment{} + webhook := &temporaliov1alpha1.TemporalWorkerDeployment{} - obj := &TemporalWorkerDeployment{ + obj := &temporaliov1alpha1.TemporalWorkerDeployment{ ObjectMeta: metav1.ObjectMeta{ Name: "worker", }, diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index 21c096f0..d8cd4111 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -215,6 +215,10 @@ type TargetWorkerDeploymentVersion struct { // Only set when Status is VersionStatusRamping. // +optional RampingSince *metav1.Time `json:"rampingSince"` + + // RampLastModifiedAt is the time when the ramp percentage was last changed for the target version. 
+ // +optional + RampLastModifiedAt *metav1.Time `json:"rampLastModifiedAt,omitempty"` } // DeprecatedWorkerDeploymentVersion represents a worker deployment version that is no longer diff --git a/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml b/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml index 48e7f0ee..07d8d1c8 100644 --- a/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml +++ b/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml @@ -3879,6 +3879,9 @@ spec: type: string managedBy: type: string + rampLastModifiedAt: + format: date-time + type: string rampPercentage: type: number rampingSince: @@ -3944,6 +3947,9 @@ spec: type: string managedBy: type: string + rampLastModifiedAt: + format: date-time + type: string rampPercentage: type: number rampingSince: diff --git a/internal/controller/state_mapper.go b/internal/controller/state_mapper.go index f45c001a..96b8dfa4 100644 --- a/internal/controller/state_mapper.go +++ b/internal/controller/state_mapper.go @@ -41,6 +41,7 @@ func (m *stateMapper) mapToStatus(targetVersionID string) *v1alpha1.TemporalWork status.TargetVersion = m.mapTargetWorkerDeploymentVersion(targetVersionID) if status.TargetVersion != nil && m.temporalState.RampingVersionID == targetVersionID { status.TargetVersion.RampingSince = m.temporalState.RampingSince + status.TargetVersion.RampLastModifiedAt = m.temporalState.RampLastModifiedAt rampPercentage := m.temporalState.RampPercentage status.TargetVersion.RampPercentage = &rampPercentage } diff --git a/internal/k8s/deployments_test.go b/internal/k8s/deployments_test.go index 174028d6..f03b6c0f 100644 --- a/internal/k8s/deployments_test.go +++ b/internal/k8s/deployments_test.go @@ -21,6 +21,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + 
"github.com/temporalio/temporal-worker-controller/internal/testhelpers" ) func TestIsDeploymentHealthy(t *testing.T) { @@ -218,11 +219,11 @@ func TestGenerateBuildID(t *testing.T) { name: "same image different pod specs", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { img := "my.test_image" - pod1 := temporaliov1alpha1.MakePodSpec([]v1.Container{{Image: img}}, map[string]string{"pod": "1"}) - pod2 := temporaliov1alpha1.MakePodSpec([]v1.Container{{Image: img}}, map[string]string{"pod": "2"}) + pod1 := testhelpers.MakePodSpec([]v1.Container{{Image: img}}, map[string]string{"pod": "1"}) + pod2 := testhelpers.MakePodSpec([]v1.Container{{Image: img}}, map[string]string{"pod": "2"}) - twd1 := temporaliov1alpha1.MakeTWD(1, pod1, nil, nil, nil) - twd2 := temporaliov1alpha1.MakeTWD(1, pod2, nil, nil, nil) + twd1 := testhelpers.MakeTWD(1, pod1, nil, nil, nil) + twd2 := testhelpers.MakeTWD(1, pod2, nil, nil, nil) return twd1, twd2 }, expectedPrefix: "my-test-image", @@ -233,10 +234,10 @@ func TestGenerateBuildID(t *testing.T) { name: "same pod specs different TWD spec", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { img := "my.test_image" - pod := temporaliov1alpha1.MakePodSpec([]v1.Container{{Image: img}}, nil) + pod := testhelpers.MakePodSpec([]v1.Container{{Image: img}}, nil) - twd1 := temporaliov1alpha1.MakeTWD(1, pod, nil, nil, nil) - twd2 := temporaliov1alpha1.MakeTWD(2, pod, nil, nil, nil) + twd1 := testhelpers.MakeTWD(1, pod, nil, nil, nil) + twd2 := testhelpers.MakeTWD(2, pod, nil, nil, nil) return twd1, twd2 }, expectedPrefix: "my-test-image", @@ -246,7 +247,7 @@ func TestGenerateBuildID(t *testing.T) { { name: "no containers", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { - twd := temporaliov1alpha1.MakeTWD(1, temporaliov1alpha1.MakePodSpec(nil, nil), 
nil, nil, nil) + twd := testhelpers.MakeTWD(1, testhelpers.MakePodSpec(nil, nil), nil, nil, nil) return twd, nil // only check 1 result, no need to compare }, expectedPrefix: "", @@ -256,7 +257,7 @@ func TestGenerateBuildID(t *testing.T) { { name: "empty image", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { - twd := temporaliov1alpha1.MakeTWDWithImage("") + twd := testhelpers.MakeTWDWithImage("") return twd, nil // only check 1 result, no need to compare }, expectedPrefix: "", @@ -267,7 +268,7 @@ func TestGenerateBuildID(t *testing.T) { name: "tagged digest image", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { taggedDigestImg := "docker.io/library/busybox:latest@sha256:" + digest - twd := temporaliov1alpha1.MakeTWDWithImage(taggedDigestImg) + twd := testhelpers.MakeTWDWithImage(taggedDigestImg) return twd, nil // only check 1 result, no need to compare }, expectedPrefix: "latest", @@ -278,7 +279,7 @@ func TestGenerateBuildID(t *testing.T) { name: "tagged named image", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { taggedNamedImg := "docker.io/library/busybox:latest" - twd := temporaliov1alpha1.MakeTWDWithImage(taggedNamedImg) + twd := testhelpers.MakeTWDWithImage(taggedNamedImg) return twd, nil // only check 1 result, no need to compare }, expectedPrefix: "latest", @@ -289,7 +290,7 @@ func TestGenerateBuildID(t *testing.T) { name: "digested image", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { digestedImg := "docker.io@sha256:" + digest - twd := temporaliov1alpha1.MakeTWDWithImage(digestedImg) + twd := testhelpers.MakeTWDWithImage(digestedImg) return twd, nil // only check 1 result, no need to compare }, expectedPrefix: digest[:maxBuildIdLen-5], @@ -300,7 +301,7 @@ func 
TestGenerateBuildID(t *testing.T) { name: "digested named image", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { digestedNamedImg := "docker.io/library/busybo@sha256:" + digest - twd := temporaliov1alpha1.MakeTWDWithImage(digestedNamedImg) + twd := testhelpers.MakeTWDWithImage(digestedNamedImg) return twd, nil // only check 1 result, no need to compare }, expectedPrefix: digest[:maxBuildIdLen-5], @@ -311,7 +312,7 @@ func TestGenerateBuildID(t *testing.T) { name: "named image", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { namedImg := "docker.io/library/busybox" - twd := temporaliov1alpha1.MakeTWDWithImage(namedImg) + twd := testhelpers.MakeTWDWithImage(namedImg) return twd, nil // only check 1 result, no need to compare }, expectedPrefix: "library-busybox", @@ -322,7 +323,7 @@ func TestGenerateBuildID(t *testing.T) { name: "illegal chars image", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { illegalCharsImg := "this.is.my_weird/image" - twd := temporaliov1alpha1.MakeTWDWithImage(illegalCharsImg) + twd := testhelpers.MakeTWDWithImage(illegalCharsImg) return twd, nil // only check 1 result, no need to compare }, expectedPrefix: "this-is-my-weird-image", @@ -333,7 +334,7 @@ func TestGenerateBuildID(t *testing.T) { name: "long image", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { longImg := "ThisIsAVeryLongHumanReadableImage_ThisIsAVeryLongHumanReadableImage_ThisIsAVeryLongHumanReadableImage" // 101 chars - twd := temporaliov1alpha1.MakeTWDWithImage(longImg) + twd := testhelpers.MakeTWDWithImage(longImg) return twd, nil // only check 1 result, no need to compare }, expectedPrefix: 
cleanAndTruncateString("ThisIsAVeryLongHumanReadableImage_ThisIsAVeryLongHumanReadableImage_ThisIsAVeryLongHumanReadableImage"[:maxBuildIdLen-5], -1), diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 491b8cab..044eaec5 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -8,6 +8,7 @@ import ( "time" "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -338,39 +339,77 @@ func getVersionConfigDiff( vcfg.SetCurrent = true return vcfg case temporaliov1alpha1.UpdateProgressive: - // Determine the correct percentage ramp - var ( - healthyDuration time.Duration - currentRamp float32 - totalPauseDuration = healthyDuration - ) - if status.TargetVersion.RampingSince != nil { - healthyDuration = time.Since(status.TargetVersion.RampingSince.Time) - // TODO(carlydf): Is it important that the version spends x time at each step % ? - // Currently, if 1% ramp is set, and then multiple reconcile loops error so the next steps aren't set, - // the version could skip straight from 1% to current if the error-ing period > totalPauseDuration - } - for _, s := range strategy.Steps { - if s.RampPercentage != 0 { // TODO(carlydf): Support setting any ramp in [0,100] - currentRamp = s.RampPercentage - } - totalPauseDuration += s.PauseDuration.Duration - if healthyDuration < totalPauseDuration { - // We're still in this step's pause duration + return handleProgressiveRollout(strategy.Steps, time.Now(), status.TargetVersion.RampLastModifiedAt, status.TargetVersion.RampPercentage, vcfg) + } - // If this step's ramp percentage is the same as the target version's ramp percentage, do nothing - if status.TargetVersion.RampPercentage != nil && currentRamp == *status.TargetVersion.RampPercentage { - return nil - } + return nil +} - vcfg.RampPercentage = currentRamp - return vcfg - } - } - // We've progressed through all steps; it should now be safe to update the default 
version - vcfg.SetCurrent = true +// handleProgressiveRollout handles the progressive rollout strategy logic +func handleProgressiveRollout( + steps []temporaliov1alpha1.RolloutStep, + currentTime time.Time, // avoid calling time.Now() inside function to make it easier to test + rampLastModifiedAt *metav1.Time, + targetRampPercentage *float32, + vcfg *VersionConfig, +) *VersionConfig { + // Protect against modifying the current version right away if there are no steps. + // + // The validating admission webhook _should_ prevent creating rollouts with 0 steps, + // but just in case validation is skipped we should go with the more conservative + // behavior of not updating the current version from the controller. + if len(steps) == 0 { + return nil + } + + // Get the currently active step + i := getCurrentStepIndex(steps, targetRampPercentage) + currentStep := steps[i] + + // If this is the first step and there is no ramp percentage set, set the ramp percentage + // to the step's ramp percentage. + if targetRampPercentage == nil { + vcfg.RampPercentage = currentStep.RampPercentage + return vcfg + } + + // If the target ramp percentage doesn't match the current step's defined ramp, the ramp + // is reset immediately. This might be considered overly conservative, but it guarantees that + // rollouts resume from the earliest possible step, and that at least the last step is always + // respected (both % and duration). 
+ if *targetRampPercentage != currentStep.RampPercentage { + vcfg.RampPercentage = currentStep.RampPercentage return vcfg } + // Move to the next step if it has been long enough since the last update + if rampLastModifiedAt.Add(currentStep.PauseDuration.Duration).Before(currentTime) { + if i < len(steps)-1 { + vcfg.RampPercentage = steps[i+1].RampPercentage + return vcfg + } else { + vcfg.SetCurrent = true + return vcfg + } + } + + // In all other cases, do nothing return nil } + +func getCurrentStepIndex(steps []temporaliov1alpha1.RolloutStep, targetRampPercentage *float32) int { + if targetRampPercentage == nil { + return 0 + } + + var result int + for i, s := range steps { + // Break if ramp percentage is greater than current (use last index) + if s.RampPercentage > *targetRampPercentage { + break + } + result = i + } + + return result +} diff --git a/internal/planner/planner_test.go b/internal/planner/planner_test.go index cb0a4dec..2f46ed0b 100644 --- a/internal/planner/planner_test.go +++ b/internal/planner/planner_test.go @@ -11,13 +11,13 @@ import ( "github.com/go-logr/logr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" "github.com/temporalio/temporal-worker-controller/internal/k8s" + "github.com/temporalio/temporal-worker-controller/internal/testhelpers/testlogr" ) func TestGeneratePlan(t *testing.T) { @@ -852,7 +852,10 @@ func TestGetVersionConfigDiff(t *testing.T) { Time: time.Now().Add(-30 * time.Minute), }, }, - RampPercentage: func() *float32 { f := float32(0); return &f }(), + RampPercentage: nil, + RampLastModifiedAt: &metav1.Time{ + Time: time.Now().Add(-30 * time.Minute), + }, }, CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ @@ -861,9 +864,10 
@@ func TestGetVersionConfigDiff(t *testing.T) { }, }, }, - conflictToken: []byte("token"), - expectConfig: true, - expectSetCurrent: false, + conflictToken: []byte("token"), + expectConfig: true, + expectRampPercent: float32Ptr(25), + expectSetCurrent: false, }, { name: "rollback scenario - target equals current but different version is ramping", @@ -953,16 +957,14 @@ func TestGetVersionConfigDiff(t *testing.T) { } func TestGetVersionConfig_ProgressiveRolloutEdgeCases(t *testing.T) { - testCases := []struct { - name string + testCases := map[string]struct { strategy temporaliov1alpha1.RolloutStrategy status *temporaliov1alpha1.TemporalWorkerDeploymentStatus expectConfig bool expectSetCurrent bool expectRampPercent float32 }{ - { - name: "progressive rollout completes all steps", + "progressive rollout completes last step": { strategy: temporaliov1alpha1.RolloutStrategy{ Strategy: temporaliov1alpha1.UpdateProgressive, Steps: []temporaliov1alpha1.RolloutStep{ @@ -989,13 +991,16 @@ func TestGetVersionConfig_ProgressiveRolloutEdgeCases(t *testing.T) { RampingSince: &metav1.Time{ Time: time.Now().Add(-4 * time.Hour), // Past all steps }, + RampPercentage: float32Ptr(75), + RampLastModifiedAt: &metav1.Time{ + Time: time.Now().Add(-4 * time.Hour), // Past all steps + }, }, }, expectConfig: true, expectSetCurrent: true, // Should become current after all steps }, - { - name: "progressive rollout with nil RampingSince", + "progressive rollout with nil RampingSince": { strategy: temporaliov1alpha1.RolloutStrategy{ Strategy: temporaliov1alpha1.UpdateProgressive, Steps: []temporaliov1alpha1.RolloutStep{ @@ -1015,15 +1020,15 @@ func TestGetVersionConfig_ProgressiveRolloutEdgeCases(t *testing.T) { Status: temporaliov1alpha1.VersionStatusInactive, HealthySince: &metav1.Time{Time: time.Now()}, }, - RampingSince: nil, // Not ramping yet + RampingSince: nil, // Not ramping yet + RampLastModifiedAt: nil, }, }, expectConfig: true, expectRampPercent: 25, // First step 
expectSetCurrent: false, }, - { - name: "progressive rollout at exact step boundary", + "progressive rollout at exact step boundary": { strategy: temporaliov1alpha1.RolloutStrategy{ Strategy: temporaliov1alpha1.UpdateProgressive, Steps: []temporaliov1alpha1.RolloutStep{ @@ -1045,16 +1050,19 @@ func TestGetVersionConfig_ProgressiveRolloutEdgeCases(t *testing.T) { HealthySince: &metav1.Time{Time: time.Now()}, }, RampingSince: &metav1.Time{ - Time: time.Now().Add(-2 * time.Hour), // Exactly at step boundary + Time: time.Now().Add(-4 * time.Hour), + }, + RampPercentage: float32Ptr(25), + RampLastModifiedAt: &metav1.Time{ + Time: time.Now().Add(-1 * time.Hour), // Exactly at step boundary }, }, }, expectConfig: true, - expectRampPercent: 0, // When set as current, ramp is 0 - expectSetCurrent: true, // At exactly 2 hours, it sets as current + expectRampPercent: 50, // Step pause has elapsed, so the ramp moves to the next step + expectSetCurrent: false, // A later step remains, so the version is not promoted yet }, - { - name: "progressive rollout with zero ramp percentage step", + "progressive rollout with zero ramp percentage step": { strategy: temporaliov1alpha1.RolloutStrategy{ Strategy: temporaliov1alpha1.UpdateProgressive, Steps: []temporaliov1alpha1.RolloutStep{ @@ -1079,14 +1087,16 @@ func TestGetVersionConfig_ProgressiveRolloutEdgeCases(t *testing.T) { RampingSince: &metav1.Time{ Time: time.Now().Add(-45 * time.Minute), // In second step }, + RampLastModifiedAt: &metav1.Time{ + Time: time.Now().Add(-45 * time.Minute), // In second step + }, }, }, expectConfig: true, expectRampPercent: 25, // Should maintain previous ramp value expectSetCurrent: false, }, - { - name: "progressive rollout just past exact boundary", + "progressive rollout just past exact boundary": { strategy: temporaliov1alpha1.RolloutStrategy{ Strategy: temporaliov1alpha1.UpdateProgressive, Steps: []temporaliov1alpha1.RolloutStep{ @@ -1110,6 +1120,10 @@ func TestGetVersionConfig_ProgressiveRolloutEdgeCases(t *testing.T) { RampingSince: 
&metav1.Time{ Time: time.Now().Add(-2*time.Hour - 1*time.Second), // Just past boundary }, + RampPercentage: float32Ptr(50), + RampLastModifiedAt: &metav1.Time{ + Time: time.Now().Add(-2*time.Hour - 1*time.Second), // Just past boundary + }, }, }, expectConfig: true, @@ -1118,9 +1132,9 @@ func TestGetVersionConfig_ProgressiveRolloutEdgeCases(t *testing.T) { }, } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - config := getVersionConfigDiff(logr.Discard(), tc.strategy, tc.status, []byte("token")) + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + config := getVersionConfigDiff(testlogr.New(t), tc.strategy, tc.status, []byte("token")) assert.Equal(t, tc.expectConfig, config != nil, "unexpected version config presence") if tc.expectConfig { assert.Equal(t, tc.expectSetCurrent, config.SetCurrent, "unexpected set default value") @@ -1132,6 +1146,127 @@ func TestGetVersionConfig_ProgressiveRolloutEdgeCases(t *testing.T) { } } +func TestGetVersionConfig_ProgressiveRolloutOverTime(t *testing.T) { + testCases := map[string]struct { + steps []temporaliov1alpha1.RolloutStep + reconcileFreq time.Duration + initialRamp *float32 + expectRamps []float32 + expectRolloutDuration time.Duration + }{ + "no steps, no ramps": { + steps: []temporaliov1alpha1.RolloutStep{}, + reconcileFreq: time.Second, + expectRamps: nil, + expectRolloutDuration: 5*time.Minute - time.Second, // should reach test timeout with zero ramps + }, + "controller keeping up": { + steps: []temporaliov1alpha1.RolloutStep{ + rolloutStep(25, 5*time.Second), + rolloutStep(50, 10*time.Second), + rolloutStep(75, 30*time.Second), + }, + reconcileFreq: time.Second, + expectRamps: []float32{25, 50, 75, 100}, + expectRolloutDuration: 5*time.Second + 10*time.Second + 30*time.Second + 3*time.Second, + }, + "controller reconciles slower than step durations": { + steps: []temporaliov1alpha1.RolloutStep{ + rolloutStep(25, 5*time.Second), + rolloutStep(50, 10*time.Second), + 
rolloutStep(75, 30*time.Second), + }, + reconcileFreq: time.Minute, + expectRamps: []float32{25, 50, 75, 100}, + expectRolloutDuration: 3 * time.Minute, + }, + "pick up ramping from last step that is <= current ramp": { + steps: []temporaliov1alpha1.RolloutStep{ + rolloutStep(5, 10*time.Second), + rolloutStep(10, 10*time.Second), + rolloutStep(25, 10*time.Second), + rolloutStep(50, 10*time.Second), + }, + reconcileFreq: time.Second, + // Simulate a ramp value set manually via Temporal CLI + initialRamp: float32Ptr(20), + expectRamps: []float32{10, 25, 50, 100}, + expectRolloutDuration: 30*time.Second + 3*time.Second, + }, + "pick up ramping from first step if current ramp less than all steps": { + steps: []temporaliov1alpha1.RolloutStep{ + rolloutStep(10, 10*time.Second), + rolloutStep(25, 10*time.Second), + rolloutStep(50, 10*time.Second), + }, + reconcileFreq: time.Second, + // Simulate a ramp value set manually via Temporal CLI + initialRamp: float32Ptr(1), + expectRamps: []float32{10, 25, 50, 100}, + expectRolloutDuration: 30*time.Second + 3*time.Second, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + var ( + diffs []float32 + testStart = time.Now() + testTimeout = testStart.Add(5 * time.Minute) + currentRampPercentage *float32 + now time.Time + rampLastModified *metav1.Time + ) + + // Simulate an initial ramping value that is different from existing steps in the rollout. + // This could happen if the progressive rollout spec was updated, or if the user modified + // the current ramp value externally, e.g. via the Temporal CLI/UI. + if tc.initialRamp != nil { + currentRampPercentage = tc.initialRamp + // Imitate the "worst case" of ramp having just been updated. 
+ rampLastModified = &metav1.Time{Time: now} + } + + for ct := testStart; ct.Before(testTimeout); ct = ct.Add(tc.reconcileFreq) { + now = ct + currentTime := metav1.NewTime(ct) + + config := handleProgressiveRollout( + tc.steps, + currentTime.Time, + rampLastModified, + currentRampPercentage, + &VersionConfig{}, + ) + if config == nil { + continue + } + + // Keep track of the diff + if config.SetCurrent { + diffs = append(diffs, 100) + } else { + diffs = append(diffs, config.RampPercentage) + } + + // Set ramp value and last modified time if it was updated (simulates what Temporal server would return on next reconcile loop) + if config.RampPercentage != 0 { + rampLastModified = &currentTime + currentRampPercentage = &config.RampPercentage + } + + // Exit early if ramping is complete + if config.SetCurrent { + break + } + } + + assert.Equal(t, tc.expectRamps, diffs) + assert.Equal(t, tc.expectRolloutDuration, now.Sub(testStart)) + }) + } +} + func TestGetVersionConfig_GateWorkflowValidation(t *testing.T) { testCases := []struct { name string @@ -1553,3 +1688,18 @@ func createDeploymentWithReplicas(replicas int32) *appsv1.Deployment { }, } } + +func float32Ptr(v float32) *float32 { + return &v +} + +func metav1Duration(d time.Duration) metav1.Duration { + return metav1.Duration{Duration: d} +} + +func rolloutStep(ramp float32, d time.Duration) temporaliov1alpha1.RolloutStep { + return temporaliov1alpha1.RolloutStep{ + RampPercentage: ramp, + PauseDuration: metav1Duration(d), + } +} diff --git a/internal/temporal/worker_deployment.go b/internal/temporal/worker_deployment.go index 17489109..247c6296 100644 --- a/internal/temporal/worker_deployment.go +++ b/internal/temporal/worker_deployment.go @@ -36,6 +36,7 @@ type TemporalWorkerState struct { RampPercentage float32 // RampingSince is the time when the current ramping version was set. 
RampingSince *metav1.Time + RampLastModifiedAt *metav1.Time Versions map[string]*VersionInfo LastModifierIdentity string } @@ -79,8 +80,12 @@ func GetWorkerDeploymentState( // Set ramping since time if applicable if routingConfig.RampingVersion != "" { - rt := metav1.NewTime(routingConfig.RampingVersionChangedTime) - state.RampingSince = &rt + var ( + rampingSinceTime = metav1.NewTime(routingConfig.RampingVersionChangedTime) + lastRampUpdateTime = metav1.NewTime(workerDeploymentInfo.RoutingConfig.RampingVersionPercentageChangedTime) + ) + state.RampingSince = &rampingSinceTime + state.RampLastModifiedAt = &lastRampUpdateTime } // Process each version diff --git a/api/v1alpha1/make.go b/internal/testhelpers/make.go similarity index 61% rename from api/v1alpha1/make.go rename to internal/testhelpers/make.go index 38c5ffdb..e8bbd363 100644 --- a/api/v1alpha1/make.go +++ b/internal/testhelpers/make.go @@ -1,21 +1,23 @@ -package v1alpha1 +package testhelpers import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" ) func MakeTWD( replicas int32, podSpec v1.PodTemplateSpec, - rolloutStrategy *RolloutStrategy, - sunsetStrategy *SunsetStrategy, - workerOpts *WorkerOptions, -) *TemporalWorkerDeployment { - r := RolloutStrategy{} - s := SunsetStrategy{} - w := WorkerOptions{} + rolloutStrategy *temporaliov1alpha1.RolloutStrategy, + sunsetStrategy *temporaliov1alpha1.SunsetStrategy, + workerOpts *temporaliov1alpha1.WorkerOptions, +) *temporaliov1alpha1.TemporalWorkerDeployment { + r := temporaliov1alpha1.RolloutStrategy{} + s := temporaliov1alpha1.SunsetStrategy{} + w := temporaliov1alpha1.WorkerOptions{} if rolloutStrategy != nil { r = *rolloutStrategy } @@ -26,7 +28,7 @@ func MakeTWD( w = *workerOpts } - twd := &TemporalWorkerDeployment{ + twd := &temporaliov1alpha1.TemporalWorkerDeployment{ TypeMeta: metav1.TypeMeta{ APIVersion: 
"temporal.io/v1alpha1", Kind: "TemporalWorkerDeployment", @@ -36,7 +38,7 @@ func MakeTWD( Namespace: "default", UID: types.UID("test-owner-uid"), }, - Spec: TemporalWorkerDeploymentSpec{ + Spec: temporaliov1alpha1.TemporalWorkerDeploymentSpec{ Replicas: &replicas, Template: podSpec, RolloutStrategy: r, @@ -60,13 +62,17 @@ func MakePodSpec(containers []v1.Container, labels map[string]string) v1.PodTemp } } -func MakeTWDWithImage(imageName string) *TemporalWorkerDeployment { +func MakeTWDWithImage(imageName string) *temporaliov1alpha1.TemporalWorkerDeployment { return MakeTWD(1, MakePodSpec([]v1.Container{{Image: imageName}}, nil), nil, nil, nil) } -func MakeTWDWithName(name string) *TemporalWorkerDeployment { +func MakeTWDWithName(name string) *temporaliov1alpha1.TemporalWorkerDeployment { twd := MakeTWD(1, MakePodSpec(nil, nil), nil, nil, nil) twd.ObjectMeta.Name = name twd.Name = name return twd } + +func ModifyObj[T any](obj T, callback func(obj T) T) T { + return callback(obj) +} diff --git a/internal/testhelpers/testlogr/testlogr.go b/internal/testhelpers/testlogr/testlogr.go new file mode 100644 index 00000000..8b011f46 --- /dev/null +++ b/internal/testhelpers/testlogr/testlogr.go @@ -0,0 +1,58 @@ +package testlogr + +import ( + "testing" + + "github.com/go-logr/logr" +) + +// New creates a logger that writes to test output. +func New(t testing.TB) logr.Logger { + t.Helper() + return logr.New(&sink{t, nil}) +} + +// TODO(jlegrone): Refactor this after https://github.com/golang/go/issues/59928 is released. +type sink struct { + t testing.TB + keysAndValues []any +} + +func (s *sink) Init(info logr.RuntimeInfo) { + // pass +} + +func (s *sink) Enabled(level int) bool { + // log everything + return true +} + +func (s *sink) Info(level int, msg string, keysAndValues ...any) { + s.t.Helper() + s.log("INFO", msg, keysAndValues...) 
+} + +func (s *sink) Error(err error, msg string, keysAndValues ...any) { + s.t.Helper() + s.log("ERROR", msg+" "+err.Error(), keysAndValues...) +} + +func (s *sink) WithValues(keysAndValues ...any) logr.LogSink { + s.t.Helper() + return &sink{ + t: s.t, + keysAndValues: append(s.keysAndValues, keysAndValues...), + } +} + +func (s *sink) WithName(name string) logr.LogSink { + s.t.Helper() + return s +} + +func (s *sink) log(level string, msg string, keysAndValues ...any) { + s.t.Helper() + args := append([]any{level, msg}, s.keysAndValues...) + args = append(args, keysAndValues...) + s.t.Log(args...) +}