diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go
index 257ae3c1..044abff1 100644
--- a/internal/controller/execplan.go
+++ b/internal/controller/execplan.go
@@ -58,11 +58,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
 	deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName)
 
 	for _, wf := range p.startTestWorkflows {
-		err := awaitVersionRegistration(ctx, l, deploymentHandler, p.TemporalNamespace, wf.versionID)
-		if err != nil {
-			return fmt.Errorf("error waiting for version to register, did your pollers start successfully?: %w", err)
-		}
-		if _, err = temporalClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
+		if _, err := temporalClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
 			ID:                       wf.workflowID,
 			TaskQueue:                wf.taskQueue,
 			WorkflowExecutionTimeout: time.Hour,
@@ -80,13 +76,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
 	// Register current version or ramps
 	if vcfg := p.UpdateVersionConfig; vcfg != nil {
 		if vcfg.SetCurrent {
-			err := awaitVersionRegistration(ctx, l, deploymentHandler, p.TemporalNamespace, vcfg.VersionID)
-			if err != nil {
-				return fmt.Errorf("error waiting for version to register, did your pollers start successfully?: %w", err)
-			}
-
 			l.Info("registering new current version", "version", vcfg.VersionID)
-
 			if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
 				Version:       vcfg.VersionID,
 				ConflictToken: vcfg.ConflictToken,
@@ -95,13 +85,6 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
 				return fmt.Errorf("unable to set current deployment version: %w", err)
 			}
 		} else {
-			if vcfg.VersionID != "" {
-				err := awaitVersionRegistration(ctx, l, deploymentHandler, p.TemporalNamespace, vcfg.VersionID)
-				if err != nil {
-					return fmt.Errorf("error waiting for version to register, did your pollers start successfully?: %w", err)
-				}
-			}
-
 			if vcfg.RampPercentage > 0 {
 				l.Info("applying ramp", "version", vcfg.VersionID, "percentage", vcfg.RampPercentage)
 			} else {
diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go
index 28c34c4c..d6586ebc 100644
--- a/internal/controller/genplan.go
+++ b/internal/controller/genplan.go
@@ -15,7 +15,6 @@ import (
 	"github.com/temporalio/temporal-worker-controller/internal/temporal"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
-	ctrl "sigs.k8s.io/controller-runtime"
 )
 
 // plan holds the actions to execute during reconciliation
@@ -133,16 +132,5 @@ func (r *TemporalWorkerDeploymentReconciler) newDeployment(
 	buildID string,
 	connection temporaliov1alpha1.TemporalConnectionSpec,
 ) (*appsv1.Deployment, error) {
-	d := k8s.NewDeploymentWithOwnerRef(
-		&w.TypeMeta,
-		&w.ObjectMeta,
-		&w.Spec,
-		k8s.ComputeWorkerDeploymentName(w),
-		buildID,
-		connection,
-	)
-	if err := ctrl.SetControllerReference(w, d, r.Scheme); err != nil {
-		return nil, err
-	}
-	return d, nil
+	return k8s.NewDeploymentWithControllerRef(w, buildID, connection, r.Scheme)
 }
diff --git a/internal/controller/util.go b/internal/controller/util.go
index c1561148..3c42ef76 100644
--- a/internal/controller/util.go
+++ b/internal/controller/util.go
@@ -5,17 +5,7 @@
 package controller
 
 import (
-	"context"
-	"errors"
-	"fmt"
 	"os"
-	"strings"
-	"time"
-
-	"github.com/go-logr/logr"
-	"github.com/temporalio/temporal-worker-controller/internal/k8s"
-	"go.temporal.io/api/serviceerror"
-	sdkclient "go.temporal.io/sdk/client"
 )
 
 const (
@@ -24,70 +14,6 @@ const (
 	defaultControllerIdentity = "temporal-worker-controller"
 )
 
-// TODO(carlydf): Cache describe success for versions that already exist
-// awaitVersionRegistration should be called after a poller starts polling with config of this version, since that is
-// what will register the version with the server. SetRamp and SetCurrent will fail if the version does not exist.
-func awaitVersionRegistration(
-	ctx context.Context,
-	l logr.Logger,
-	deploymentHandler sdkclient.WorkerDeploymentHandle,
-	namespace, versionID string) error {
-	ticker := time.NewTicker(1 * time.Second)
-	for {
-		l.Info(fmt.Sprintf("checking if version %s exists", versionID))
-		select {
-		case <-ctx.Done():
-			return context.Canceled
-		case <-ticker.C:
-			_, err := deploymentHandler.DescribeVersion(ctx, sdkclient.WorkerDeploymentDescribeVersionOptions{
-				Version: versionID,
-			})
-			var notFoundErr *serviceerror.NotFound
-			if err != nil {
-				if errors.As(err, &notFoundErr) {
-					continue
-				} else {
-					return fmt.Errorf("unable to describe worker deployment version %s: %w", versionID, err)
-				}
-			}
-			// After the version exists, confirm that it also exists in the worker deployment
-			// TODO(carlydf): Remove this check after next Temporal Cloud version which solves this inconsistency
-			return awaitVersionRegistrationInDeployment(ctx, l, deploymentHandler, namespace, versionID)
-		}
-	}
-}
-
-func awaitVersionRegistrationInDeployment(
-	ctx context.Context,
-	l logr.Logger,
-	deploymentHandler sdkclient.WorkerDeploymentHandle,
-	namespace, versionID string) error {
-	deploymentName, _, _ := strings.Cut(versionID, k8s.VersionIDSeparator)
-	ticker := time.NewTicker(1 * time.Second)
-	for {
-		l.Info(fmt.Sprintf("checking if version %s exists in worker deployment", versionID))
-		select {
-		case <-ctx.Done():
-			return context.Canceled
-		case <-ticker.C:
-			resp, err := deploymentHandler.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{})
-			var notFoundErr *serviceerror.NotFound
-			if err != nil {
-				if errors.As(err, &notFoundErr) {
-					continue
-				} else {
-					return fmt.Errorf("unable to describe worker deployment %s: %w", deploymentName, err)
-				}
-			}
-			for _, vs := range resp.Info.VersionSummaries {
-				if vs.Version == versionID {
-					return nil
-				}
-			}
-		}
-	}
-}
-
 // getControllerVersion returns the version from environment variable (set by Helm from image.tag)
 func getControllerVersion() string {
 	if version := os.Getenv("CONTROLLER_VERSION"); version != "" {
diff --git a/internal/k8s/deployments.go b/internal/k8s/deployments.go
index bcaf78af..6ab4bd4d 100644
--- a/internal/k8s/deployments.go
+++ b/internal/k8s/deployments.go
@@ -17,6 +17,8 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
@@ -290,3 +292,24 @@ func NewDeploymentWithOwnerRef(
 		},
 	}
 }
+
+// NewDeploymentWithControllerRef builds the versioned Deployment for w and sets w as its controller owner reference.
+func NewDeploymentWithControllerRef(
+	w *temporaliov1alpha1.TemporalWorkerDeployment,
+	buildID string,
+	connection temporaliov1alpha1.TemporalConnectionSpec,
+	reconcilerScheme *runtime.Scheme,
+) (*appsv1.Deployment, error) {
+	d := NewDeploymentWithOwnerRef(
+		&w.TypeMeta,
+		&w.ObjectMeta,
+		&w.Spec,
+		ComputeWorkerDeploymentName(w),
+		buildID,
+		connection,
+	)
+	if err := ctrl.SetControllerReference(w, d, reconcilerScheme); err != nil {
+		return nil, err
+	}
+	return d, nil
+}
diff --git a/internal/planner/planner.go b/internal/planner/planner.go
index 6f1e7172..3e7d4e8f 100644
--- a/internal/planner/planner.go
+++ b/internal/planner/planner.go
@@ -218,10 +218,12 @@ func getTestWorkflows(
 ) []WorkflowConfig {
 	var testWorkflows []WorkflowConfig
 
-	// Skip if there's no gate workflow defined or if the target version is already the current
+	// Skip if there's no gate workflow defined, if the target version is already the current, or if the target
+	// version is not yet registered in Temporal
 	if config.RolloutStrategy.Gate == nil ||
 		status.CurrentVersion == nil ||
-		status.CurrentVersion.VersionID == status.TargetVersion.VersionID {
+		status.CurrentVersion.VersionID == status.TargetVersion.VersionID ||
+		status.TargetVersion.Status == temporaliov1alpha1.VersionStatusNotRegistered {
 		return nil
 	}
 
@@ -258,8 +260,9 @@ func getVersionConfigDiff(
 	strategy := config.RolloutStrategy
 	conflictToken := status.VersionConflictToken
 
-	// Do nothing if target version's deployment is not healthy yet
-	if status.TargetVersion.HealthySince == nil {
+	// Do nothing if the target version's deployment is not healthy yet, or if the version is not yet registered in Temporal
+	if status.TargetVersion.HealthySince == nil ||
+		status.TargetVersion.Status == temporaliov1alpha1.VersionStatusNotRegistered {
 		return nil
 	}
 
diff --git a/internal/testhelpers/test_builder.go b/internal/testhelpers/test_builder.go
index 6c783879..75f7477e 100644
--- a/internal/testhelpers/test_builder.go
+++ b/internal/testhelpers/test_builder.go
@@ -47,6 +47,16 @@ func (b *TemporalWorkerDeploymentBuilder) WithProgressiveStrategy(steps ...tempo
 	return b
 }
 
+// WithGate sets the rollout strategy to have a gate workflow
+func (b *TemporalWorkerDeploymentBuilder) WithGate(expectSuccess bool) *TemporalWorkerDeploymentBuilder {
+	if expectSuccess {
+		b.twd.Spec.RolloutStrategy.Gate = &temporaliov1alpha1.GateWorkflowConfig{WorkflowType: successTestWorkflowType}
+	} else {
+		b.twd.Spec.RolloutStrategy.Gate = &temporaliov1alpha1.GateWorkflowConfig{WorkflowType: failTestWorkflowType}
+	}
+	return b
+}
+
 // WithReplicas sets the number of replicas
 func (b *TemporalWorkerDeploymentBuilder) WithReplicas(replicas int32) *TemporalWorkerDeploymentBuilder {
 	b.twd.Spec.Replicas = &replicas
@@ -157,17 +167,18 @@ func (sb *StatusBuilder) WithNamespace(k8sNamespace string) *StatusBuilder {
 }
 
 // WithCurrentVersion sets the current version in the status
-func (sb *StatusBuilder) WithCurrentVersion(imageName string, hasDeployment, createPoller bool) *StatusBuilder {
+func (sb *StatusBuilder) WithCurrentVersion(imageName string, healthy, createDeployment bool) *StatusBuilder {
 	sb.currentVersionBuilder = func(name string, namespace string) *temporaliov1alpha1.CurrentWorkerDeploymentVersion {
-		return MakeCurrentVersion(namespace, name, imageName, hasDeployment, createPoller)
+		return MakeCurrentVersion(namespace, name, imageName, healthy, createDeployment)
 	}
 	return sb
 }
 
-// WithTargetVersion sets the target version in the status
-func (sb *StatusBuilder) WithTargetVersion(imageName string, rampPercentage float32, hasDeployment bool, createPoller bool) *StatusBuilder {
+// WithTargetVersion sets the target version in the status.
+// The target version is required.
+func (sb *StatusBuilder) WithTargetVersion(imageName string, rampPercentage float32, healthy bool, createDeployment bool) *StatusBuilder {
 	sb.targetVersionBuilder = func(name string, namespace string) temporaliov1alpha1.TargetWorkerDeploymentVersion {
-		return MakeTargetVersion(namespace, name, imageName, rampPercentage, hasDeployment, createPoller)
+		return MakeTargetVersion(namespace, name, imageName, rampPercentage, healthy, createDeployment)
 	}
 	return sb
 }
@@ -198,6 +209,8 @@ type TestCase struct {
 	deprecatedBuildReplicas map[string]int32
 	deprecatedBuildImages   map[string]string
 	expectedStatus          *temporaliov1alpha1.TemporalWorkerDeploymentStatus
+	// Time to delay before checking expected status
+	waitTime *time.Duration
 }
 
 func (tc *TestCase) GetTWD() *temporaliov1alpha1.TemporalWorkerDeployment {
@@ -216,6 +229,10 @@ func (tc *TestCase) GetExpectedStatus() *temporaliov1alpha1.TemporalWorkerDeploy
 	return tc.expectedStatus
 }
 
+func (tc *TestCase) GetWaitTime() *time.Duration {
+	return tc.waitTime
+}
+
 // TestCaseBuilder provides a fluent interface for building test cases
 type TestCaseBuilder struct {
 	name                   string
@@ -225,6 +242,7 @@ type TestCaseBuilder struct {
 	twdBuilder             *TemporalWorkerDeploymentBuilder
 	expectedStatusBuilder  *StatusBuilder
 	deprecatedVersionInfos []DeprecatedVersionInfo
+	waitTime               *time.Duration
 }
 
 // NewTestCase creates a new test case builder
@@ -255,6 +273,13 @@ func (tcb *TestCaseBuilder) WithInput(twdBuilder *TemporalWorkerDeploymentBuilde
 	return tcb
 }
 
+// WithWaitTime sets the wait time. Use this if you are expecting no change to the initial status and want to ensure
+// that after some time, there is still no change.
+func (tcb *TestCaseBuilder) WithWaitTime(waitTime time.Duration) *TestCaseBuilder {
+	tcb.waitTime = &waitTime
+	return tcb
+}
+
 // DeprecatedVersionInfo defines the necessary information about a deprecated worker version, so that
 // tests can recreate state that is not visible in the TemporalWorkerDeployment status
 type DeprecatedVersionInfo struct {
@@ -284,6 +309,7 @@ func (tcb *TestCaseBuilder) WithExpectedStatus(statusBuilder *StatusBuilder) *Te
 // Build returns the constructed test case
 func (tcb *TestCaseBuilder) Build() TestCase {
 	ret := TestCase{
+		waitTime: tcb.waitTime,
 		twd: tcb.twdBuilder.
 			WithName(tcb.name).
 			WithNamespace(tcb.k8sNamespace).
diff --git a/internal/tests/internal/workers.go b/internal/testhelpers/workers.go
similarity index 61%
rename from internal/tests/internal/workers.go
rename to internal/testhelpers/workers.go
index 60a60175..dc9f2a0d 100644
--- a/internal/tests/internal/workers.go
+++ b/internal/testhelpers/workers.go
@@ -1,7 +1,8 @@
-package internal
+package testhelpers
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
@@ -13,6 +14,11 @@ import (
 	corev1 "k8s.io/api/core/v1"
 )
 
+const (
+	successTestWorkflowType = "successTestWorkflow"
+	failTestWorkflowType    = "failTestWorkflow"
+)
+
 func getEnv(podTemplateSpec corev1.PodTemplateSpec, key string) (string, error) {
 	for _, e := range podTemplateSpec.Spec.Containers[0].Env {
 		if e.Name == key {
@@ -90,8 +96,8 @@ func newClient(ctx context.Context, hostPort, namespace string) (client.Client,
 	return c, nil
 }
 
-// callback is a function that can be called multiple times.
-func runHelloWorldWorker(ctx context.Context, podTemplateSpec corev1.PodTemplateSpec, callback func(stopFunc func(), err error)) {
+// RunHelloWorldWorker runs a single worker for the given pod template spec; callers start one per replica. callback is a function that can be called multiple times.
+func RunHelloWorldWorker(ctx context.Context, podTemplateSpec corev1.PodTemplateSpec, callback func(stopFunc func(), err error)) {
 	w, stopFunc, err := newVersionedWorker(ctx, podTemplateSpec)
 	defer func() {
 		callback(stopFunc, err)
@@ -100,38 +106,11 @@ func RunHelloWorldWorker(ctx context.Context, podTemplateSpec corev1.PodTemplate
 		return
 	}
 
-	getSubject := func(ctx context.Context) (string, error) {
-		return "World10", nil
-	}
-
-	sleep := func(ctx context.Context, seconds uint) error {
-		time.Sleep(time.Duration(seconds) * time.Second)
-		return nil
-	}
-
-	helloWorld := func(ctx workflow.Context) (string, error) {
-		workflow.GetLogger(ctx).Info("HelloWorld workflow started")
-		ctx = setActivityTimeout(ctx, 5*time.Minute)
-
-		// Compute a subject
-		var subject string
-		if err := workflow.ExecuteActivity(ctx, getSubject).Get(ctx, &subject); err != nil {
-			return "", err
-		}
-
-		// Sleep for a while
-		if err := workflow.ExecuteActivity(ctx, sleep, 60).Get(ctx, nil); err != nil {
-			return "", err
-		}
-
-		// Return the greeting
-		return fmt.Sprintf("Hello %s", subject), nil
-	}
-
 	// Register activities and workflows
-	w.RegisterWorkflow(helloWorld)
-	w.RegisterActivity(getSubject)
-	w.RegisterActivity(sleep)
+	w.RegisterWorkflowWithOptions(successTestWorkflow, workflow.RegisterOptions{Name: successTestWorkflowType})
+	w.RegisterWorkflowWithOptions(failTestWorkflow, workflow.RegisterOptions{Name: failTestWorkflowType})
+	w.RegisterActivity(getSubjectTestActivity)
+	w.RegisterActivity(sleepTestActivity)
 
 	// Start the worker in a separate goroutine so that the stopFunc can be passed back to the caller via callback
 	go func() {
@@ -147,3 +126,50 @@ func setActivityTimeout(ctx workflow.Context, d time.Duration) workflow.Context
 		ScheduleToCloseTimeout: d,
 	})
 }
+
+func successTestWorkflow(ctx workflow.Context) (string, error) {
+	workflow.GetLogger(ctx).Info("HelloWorld(success) workflow started")
+	ctx = setActivityTimeout(ctx, 5*time.Minute)
+
+	// Compute a subject
+	var subject string
+	if err := workflow.ExecuteActivity(ctx, getSubjectTestActivity).Get(ctx, &subject); err != nil {
+		return "", err
+	}
+
+	// Sleep for a while
+	if err := workflow.ExecuteActivity(ctx, sleepTestActivity, 5).Get(ctx, nil); err != nil {
+		return "", err
+	}
+
+	// Return the greeting
+	return fmt.Sprintf("Hello %s", subject), nil
+}
+
+func failTestWorkflow(ctx workflow.Context) (string, error) {
+	workflow.GetLogger(ctx).Info("HelloWorld(fail) workflow started")
+	ctx = setActivityTimeout(ctx, 5*time.Minute)
+
+	// Compute a subject
+	var subject string
+	if err := workflow.ExecuteActivity(ctx, getSubjectTestActivity).Get(ctx, &subject); err != nil {
+		return "", err
+	}
+
+	// Sleep for a while
+	if err := workflow.ExecuteActivity(ctx, sleepTestActivity, 5).Get(ctx, nil); err != nil {
+		return "", err
+	}
+
+	// Return an error so the gate workflow fails
+	return "", errors.New("this is a manufactured error to make the test fail")
+}
+
+func sleepTestActivity(ctx context.Context, seconds uint) error {
+	time.Sleep(time.Duration(seconds) * time.Second)
+	return nil
+}
+
+func getSubjectTestActivity(ctx context.Context) (string, error) {
+	return "World10", nil
+}
diff --git a/internal/tests/internal/deployment_controller.go b/internal/tests/internal/deployment_controller.go
new file mode 100644
index 00000000..66e17282
--- /dev/null
+++ b/internal/tests/internal/deployment_controller.go
@@ -0,0 +1,222 @@
+package internal
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
+	"github.com/temporalio/temporal-worker-controller/internal/k8s"
+	"github.com/temporalio/temporal-worker-controller/internal/testhelpers"
+	"go.temporal.io/server/api/deployment/v1"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+	c := make(chan struct{})
+	go func() {
+		defer close(c)
+		wg.Wait()
+	}()
+	select {
+	case <-c:
+		return false // completed normally
+	case <-time.After(timeout):
+		return true // timed out
+	}
+}
+
+func applyDeployment(t *testing.T, ctx context.Context, k8sClient client.Client, deploymentName, namespace string) []func() {
+	var deployment appsv1.Deployment
+	if err := k8sClient.Get(ctx, types.NamespacedName{
+		Name:      deploymentName,
+		Namespace: namespace,
+	}, &deployment); err != nil {
+		t.Fatalf("failed to get deployment: %v", err)
+	}
+
+	var wg sync.WaitGroup
+	stopFuncs := make([]func(), *(deployment.Spec.Replicas))
+	workerErrors := make([]error, *(deployment.Spec.Replicas))
+	workerCallback := func(i int32) func(func(), error) {
+		return func(stopFunc func(), err error) {
+			if err == nil {
+				stopFuncs[i] = stopFunc
+				wg.Done()
+			} else {
+				workerErrors[i] = err
+			}
+		}
+	}
+
+	for i := int32(0); i < *(deployment.Spec.Replicas); i++ {
+		wg.Add(1)
+		go testhelpers.RunHelloWorldWorker(ctx, deployment.Spec.Template, workerCallback(i))
+	}
+
+	// wait 10s for all expected workers to be healthy
+	timedOut := waitTimeout(&wg, 10*time.Second)
+
+	if timedOut {
+		t.Fatalf("could not start workers, errors were: %+v", workerErrors)
+	} else {
+		setHealthyDeploymentStatus(t, ctx, k8sClient, deployment)
+	}
+
+	return stopFuncs
+}
+
+// Set deployment status to `DeploymentAvailable` to simulate a healthy deployment
+// This is necessary because envtest doesn't actually start pods
+func setHealthyDeploymentStatus(t *testing.T, ctx context.Context, k8sClient client.Client, deployment appsv1.Deployment) {
+	now := metav1.Now()
+	deployment.Status = appsv1.DeploymentStatus{
+		Replicas:            *deployment.Spec.Replicas,
+		UpdatedReplicas:     *deployment.Spec.Replicas,
+		ReadyReplicas:       *deployment.Spec.Replicas,
+		AvailableReplicas:   *deployment.Spec.Replicas,
+		UnavailableReplicas: 0,
+		Conditions: []appsv1.DeploymentCondition{
+			{
+				Type:               appsv1.DeploymentAvailable,
+				Status:             corev1.ConditionTrue,
+				LastUpdateTime:     now,
+				LastTransitionTime: now,
+				Reason:             "MinimumReplicasAvailable",
+				Message:            "Deployment has minimum availability.",
+			},
+			{
+				Type:               appsv1.DeploymentProgressing,
+				Status:             corev1.ConditionTrue,
+				LastUpdateTime:     now,
+				LastTransitionTime: now,
+				Reason:             "NewReplicaSetAvailable",
+				Message:            "ReplicaSet is available.",
+			},
+		},
+	}
+	t.Logf("started %d healthy workers, updating deployment status", *deployment.Spec.Replicas)
+	if err := k8sClient.Status().Update(ctx, &deployment); err != nil {
+		t.Fatalf("failed to update deployment status: %v", err)
+	}
+}
+
+// makePreliminaryStatusTrue uses input.Status + deprecatedBuildReplicas to create (and maybe kill) pollers for deprecated versions in Temporal.
+// It also gets the routing config of the deployment into the starting state before running the test.
+// Does not set Status.VersionConflictToken, since that is only set internally by the server.
+func makePreliminaryStatusTrue(
+	ctx context.Context,
+	t *testing.T,
+	env testEnv,
+	twd *temporaliov1alpha1.TemporalWorkerDeployment,
+) {
+	t.Logf("Creating starting test env based on input.Status")
+
+	// Make a separate list of deferred functions, because calling defer in a for loop is not allowed.
+	loopDefers := make([]func(), 0)
+	defer handleStopFuncs(loopDefers)
+	for _, dv := range twd.Status.DeprecatedVersions {
+		t.Logf("Setting up deprecated version %v with status %v", dv.VersionID, dv.Status)
+		workerStopFuncs := createStatus(ctx, t, env, twd, dv.BaseWorkerDeploymentVersion, nil)
+		loopDefers = append(loopDefers, func() { handleStopFuncs(workerStopFuncs) })
+	}
+
+	if tv := twd.Status.TargetVersion; tv.VersionID != "" {
+		t.Logf("Setting up target version %v with status %v", tv.VersionID, tv.Status)
+		workerStopFuncs := createStatus(ctx, t, env, twd, tv.BaseWorkerDeploymentVersion, tv.RampPercentage)
+		defer handleStopFuncs(workerStopFuncs)
+	}
+}
+
+func handleStopFuncs(funcs []func()) {
+	for _, f := range funcs {
+		if f != nil {
+			f()
+		}
+	}
+}
+
+// createStatus creates k8s deployment, pollers, and routing config state as needed.
+func createStatus(
+	ctx context.Context,
+	t *testing.T,
+	env testEnv,
+	newTWD *temporaliov1alpha1.TemporalWorkerDeployment,
+	prevVersion temporaliov1alpha1.BaseWorkerDeploymentVersion,
+	rampPercentage *float32,
+) (workerStopFuncs []func()) {
+	if prevVersion.Deployment != nil && prevVersion.Deployment.FieldPath == "create" {
+		v := getVersion(t, prevVersion.VersionID)
+		prevTWD := recreateTWD(newTWD, env.images[v.BuildId], env.replicas[v.BuildId])
+		createWorkerDeployment(ctx, t, env, prevTWD, v.BuildId)
+		expectedDeploymentName := k8s.ComputeVersionedDeploymentName(prevTWD.Name, k8s.ComputeBuildID(prevTWD))
+		waitForDeployment(t, env.k8sClient, expectedDeploymentName, prevTWD.Namespace, 30*time.Second)
+		if prevVersion.Status != temporaliov1alpha1.VersionStatusNotRegistered {
+			workerStopFuncs = applyDeployment(t, ctx, env.k8sClient, expectedDeploymentName, prevTWD.Namespace)
+		}
+
+		switch prevVersion.Status {
+		case temporaliov1alpha1.VersionStatusInactive, temporaliov1alpha1.VersionStatusNotRegistered:
+			// no-op
+		case temporaliov1alpha1.VersionStatusRamping:
+			setRampingVersion(t, ctx, env.ts, v, *rampPercentage) // rampPercentage won't be nil if the version is ramping
+		case temporaliov1alpha1.VersionStatusCurrent:
+			setCurrentVersion(t, ctx, env.ts, v)
+		case temporaliov1alpha1.VersionStatusDraining:
+			setRampingVersion(t, ctx, env.ts, v, 1)
+			// TODO(carlydf): start a workflow on v that does not complete -> will never drain
+			setRampingVersion(t, ctx, env.ts, nil, 0)
+		case temporaliov1alpha1.VersionStatusDrained:
+			setRampingVersion(t, ctx, env.ts, v, 1)
+			setRampingVersion(t, ctx, env.ts, nil, 0)
+		}
+	}
+
+	return workerStopFuncs
+}
+
+// getVersion is a helper to handle the unlikely error caused by an invalid string split.
+func getVersion(t *testing.T, versionId string) *deployment.WorkerDeploymentVersion {
+	deploymentName, buildId, err := k8s.SplitVersionID(versionId)
+	if err != nil {
+		t.Error(err)
+	}
+	return &deployment.WorkerDeploymentVersion{
+		DeploymentName: deploymentName,
+		BuildId:        buildId,
+	}
+}
+
+// recreateTWD returns a copy of the given TWD, but replaces the build-id-generating image name with the given one,
+// and the Spec.Replicas with the given replica count.
+// Panics if the twd spec is nil, or if it has no containers, but that should never be true for these integration tests.
+func recreateTWD(twd *temporaliov1alpha1.TemporalWorkerDeployment, imageName string, replicas int32) *temporaliov1alpha1.TemporalWorkerDeployment {
+	ret := twd.DeepCopy()
+	ret.Spec.Template.Spec.Containers[0].Image = imageName
+	ret.Spec.Replicas = &replicas
+	return ret
+}
+
+func createWorkerDeployment(
+	ctx context.Context,
+	t *testing.T,
+	env testEnv,
+	twd *temporaliov1alpha1.TemporalWorkerDeployment,
+	buildId string,
+) {
+	dep, err := k8s.NewDeploymentWithControllerRef(twd, buildId, env.connection.Spec, env.mgr.GetScheme())
+	if err != nil {
+		t.Fatalf("error creating Deployment spec: %v", err.Error())
+	}
+
+	t.Logf("Creating Deployment %s in namespace %s", dep.Name, dep.Namespace)
+
+	if err := env.k8sClient.Create(ctx, dep); err != nil {
+		t.Fatalf("failed to create Deployment: %v", err)
+	}
+}
diff --git a/internal/tests/internal/env_helpers.go b/internal/tests/internal/env_helpers.go
index 97ce6ea8..2b02d8fa 100644
--- a/internal/tests/internal/env_helpers.go
+++ b/internal/tests/internal/env_helpers.go
@@ -9,7 +9,6 @@ import (
 	"path/filepath"
 	"runtime"
 	"strings"
-	"sync"
 	"testing"
 	"time"
 
@@ -17,10 +16,9 @@ import (
 	"github.com/temporalio/temporal-worker-controller/internal/controller"
 	"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
 	"go.temporal.io/sdk/log"
-	appsv1 "k8s.io/api/apps/v1"
+	"go.temporal.io/server/temporaltest"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -31,6 +29,15 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 )
 
+type testEnv struct {
+	k8sClient  client.Client
+	mgr        manager.Manager
+	ts         *temporaltest.TestServer
+	connection *temporaliov1alpha1.TemporalConnection
+	replicas   map[string]int32
+	images     map[string]string
+}
+
 // setupKubebuilderAssets sets up the KUBEBUILDER_ASSETS environment variable if not already set
 func setupKubebuilderAssets() error {
 	if os.Getenv("KUBEBUILDER_ASSETS") != "" {
@@ -164,95 +171,6 @@ func setupTestEnvironment(t *testing.T) (*rest.Config, client.Client, manager.Ma
 	return cfg, k8sClient, mgr, clientPool, cleanup
 }
 
-func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
-	c := make(chan struct{})
-	go func() {
-		defer close(c)
-		wg.Wait()
-	}()
-	select {
-	case <-c:
-		return false // completed normally
-	case <-time.After(timeout):
-		return true // timed out
-	}
-}
-
-func applyDeployment(t *testing.T, ctx context.Context, k8sClient client.Client, deploymentName, namespace string) []func() {
-	var deployment appsv1.Deployment
-	if err := k8sClient.Get(ctx, types.NamespacedName{
-		Name:      deploymentName,
-		Namespace: namespace,
-	}, &deployment); err != nil {
-		t.Fatalf("failed to get deployment: %v", err)
-	}
-
-	var wg sync.WaitGroup
-	stopFuncs := make([]func(), *(deployment.Spec.Replicas))
-	workerErrors := make([]error, *(deployment.Spec.Replicas))
-	workerCallback := func(i int32) func(func(), error) {
-		return func(stopFunc func(), err error) {
-			if err == nil {
-				stopFuncs[i] = stopFunc
-				wg.Done()
-			} else {
-				workerErrors[i] = err
-			}
-		}
-	}
-
-	for i := int32(0); i < *(deployment.Spec.Replicas); i++ {
-		wg.Add(1)
-		go runHelloWorldWorker(ctx, deployment.Spec.Template, workerCallback(i))
-	}
-
-	// wait 10s for all expected workers to be healthy
-	timedOut := waitTimeout(&wg, 10*time.Second)
-
-	if timedOut {
-		t.Fatalf("could not start workers, errors were: %+v", workerErrors)
-	} else {
-		setHealthyDeploymentStatus(t, ctx, k8sClient, deployment)
-	}
-
-	return stopFuncs
-}
-
-// Set deployment status to `DeploymentAvailable` to simulate a healthy deployment
-// This is necessary because envtest doesn't actually start pods
-func setHealthyDeploymentStatus(t *testing.T, ctx context.Context, k8sClient client.Client, deployment appsv1.Deployment) {
-	now := metav1.Now()
-	deployment.Status = appsv1.DeploymentStatus{
-		Replicas:            *deployment.Spec.Replicas,
-		UpdatedReplicas:     *deployment.Spec.Replicas,
-		ReadyReplicas:       *deployment.Spec.Replicas,
-		AvailableReplicas:   *deployment.Spec.Replicas,
-		UnavailableReplicas: 0,
-		Conditions: []appsv1.DeploymentCondition{
-			{
-				Type:               appsv1.DeploymentAvailable,
-				Status:             corev1.ConditionTrue,
-				LastUpdateTime:     now,
-				LastTransitionTime: now,
-				Reason:             "MinimumReplicasAvailable",
-				Message:            "Deployment has minimum availability.",
-			},
-			{
-				Type:               appsv1.DeploymentProgressing,
-				Status:             corev1.ConditionTrue,
-				LastUpdateTime:     now,
-				LastTransitionTime: now,
-				Reason:             "NewReplicaSetAvailable",
-				Message:            "ReplicaSet is available.",
-			},
-		},
-	}
-	t.Logf("started %d healthy workers, updating deployment status", *deployment.Spec.Replicas)
-	if err := k8sClient.Status().Update(ctx, &deployment); err != nil {
-		t.Fatalf("failed to update deployment status: %v", err)
-	}
-}
-
 // createTestNamespace creates a test namespace
 func createTestNamespace(t *testing.T, k8sClient client.Client) *corev1.Namespace {
 	testNamespace := &corev1.Namespace{
diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go
index 726356cf..0d842c09 100644
--- a/internal/tests/internal/integration_test.go
+++ b/internal/tests/internal/integration_test.go
@@ -8,12 +8,12 @@ import (
 	temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
 	"github.com/temporalio/temporal-worker-controller/internal/k8s"
 	"github.com/temporalio/temporal-worker-controller/internal/testhelpers"
-	"go.temporal.io/server/api/deployment/v1"
 	"go.temporal.io/server/common/dynamicconfig"
 	"go.temporal.io/server/temporal"
 	"go.temporal.io/server/temporaltest"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
 )
 
 const (
@@ -22,18 +22,10 @@ const (
 	testDrainageRefreshInterval = time.Second
 )
 
-type testEnv struct {
-	k8sClient  client.Client
-	ts         *temporaltest.TestServer
-	connection *temporaliov1alpha1.TemporalConnection
-	replicas   map[string]int32
-	images     map[string]string
-}
-
 // TestIntegration runs integration tests for the Temporal Worker Controller
 func TestIntegration(t *testing.T) {
 	// Set up test environment
-	cfg, k8sClient, _, _, cleanup := setupTestEnvironment(t)
+	cfg, k8sClient, mgr, _, cleanup := setupTestEnvironment(t)
 	defer cleanup()
 
 	// Create test namespace
@@ -80,141 +72,58 @@ func TestIntegration(t *testing.T) {
 			testhelpers.NewStatusBuilder().
 				WithTargetVersion("v1", 5, true, true),
 		),
+		"progressive-rollout-with-success-gate": testhelpers.NewTestCase().
+			WithInput(
+				testhelpers.NewTemporalWorkerDeploymentBuilder().
+					WithProgressiveStrategy(testhelpers.ProgressiveStep(5, time.Hour)).
+					WithGate(true).
+					WithVersion("v1").
+					WithTargetVersionStatus("v0", -1, true, true).
+					WithCurrentVersionStatus("v0", true, true),
+			).
+			WithDeprecatedBuilds(
+				testhelpers.NewDeprecatedVersionInfo("v0", 1),
+			).
+			WithExpectedStatus(
+				testhelpers.NewStatusBuilder().
+					WithTargetVersion("v1", 5, true, true),
+			),
+		"progressive-rollout-with-failed-gate": testhelpers.NewTestCase().
+			WithInput(
+				testhelpers.NewTemporalWorkerDeploymentBuilder().
+					WithProgressiveStrategy(testhelpers.ProgressiveStep(5, time.Hour)).
+					WithGate(false).
+					WithVersion("v1").
+					WithTargetVersionStatus("v0", -1, true, true).
+					WithCurrentVersionStatus("v0", true, true),
+			).
+			WithDeprecatedBuilds(
+				testhelpers.NewDeprecatedVersionInfo("v0", 1),
+			).
+			WithWaitTime(5 * time.Second).
+			WithExpectedStatus(
+				testhelpers.NewStatusBuilder().
+					WithTargetVersion("v1", -1, true, false).
+					WithCurrentVersion("v0", true, true),
+			),
 	}
 
 	for testName, tc := range tests {
 		t.Run(testName, func(t *testing.T) {
 			ctx := context.Background()
-			testTemporalWorkerDeploymentCreation(ctx, t, k8sClient, ts, tc.BuildWithValues(testName, testNamespace.Name, ts.GetDefaultNamespace()))
+			testTemporalWorkerDeploymentCreation(ctx, t, k8sClient, mgr, ts, tc.BuildWithValues(testName, testNamespace.Name, ts.GetDefaultNamespace()))
 		})
 	}
 }
 
-// Uses input.Status + deprecatedBuildReplicas to create (and maybe kill) pollers for deprecated versions in temporal
-// also gets routing config of the deployment into the starting state before running the test.
-// Does not set Status.VersionConflictToken, since that is only set internally by the server.
-func makePreliminaryStatusTrue(
-	ctx context.Context,
-	t *testing.T,
-	env testEnv,
-	twd *temporaliov1alpha1.TemporalWorkerDeployment,
-) {
-	t.Logf("Creating starting test env based on input.Status")
-
-	// Make a separate list of deferred functions, because calling defer in a for loop is not allowed.
-	loopDefers := make([]func(), 0)
-	defer handleStopFuncs(loopDefers)
-	for _, dv := range twd.Status.DeprecatedVersions {
-		t.Logf("Setting up deprecated version %v with status %v", dv.VersionID, dv.Status)
-		workerStopFuncs := createStatus(ctx, t, env, twd, dv.BaseWorkerDeploymentVersion, nil)
-		loopDefers = append(loopDefers, func() { handleStopFuncs(workerStopFuncs) })
-	}
-
-	if tv := twd.Status.TargetVersion; tv.VersionID != "" {
-		t.Logf("Setting up target version %v with status %v", tv.VersionID, tv.Status)
-		workerStopFuncs := createStatus(ctx, t, env, twd, tv.BaseWorkerDeploymentVersion, tv.RampPercentage)
-		defer handleStopFuncs(workerStopFuncs)
-	}
-}
-
-func handleStopFuncs(funcs []func()) {
-	for _, f := range funcs {
-		if f != nil {
-			f()
-		}
-	}
-}
-
-// creates k8s deployment, pollers, and routing config state as needed.
-func createStatus(
-	ctx context.Context,
-	t *testing.T,
-	env testEnv,
-	newTWD *temporaliov1alpha1.TemporalWorkerDeployment,
-	prevVersion temporaliov1alpha1.BaseWorkerDeploymentVersion,
-	rampPercentage *float32,
-) (workerStopFuncs []func()) {
-	if prevVersion.Deployment != nil && prevVersion.Deployment.FieldPath == "create" {
-		v := getVersion(t, prevVersion.VersionID)
-		prevTWD := recreateTWD(newTWD, env.images[v.BuildId], env.replicas[v.BuildId])
-		createWorkerDeployment(ctx, t, env, prevTWD, v.BuildId)
-		expectedDeploymentName := k8s.ComputeVersionedDeploymentName(prevTWD.Name, k8s.ComputeBuildID(prevTWD))
-		waitForDeployment(t, env.k8sClient, expectedDeploymentName, prevTWD.Namespace, 30*time.Second)
-		if prevVersion.Status != temporaliov1alpha1.VersionStatusNotRegistered {
-			workerStopFuncs = applyDeployment(t, ctx, env.k8sClient, expectedDeploymentName, prevTWD.Namespace)
-		}
-
-		switch prevVersion.Status {
-		case temporaliov1alpha1.VersionStatusInactive, temporaliov1alpha1.VersionStatusNotRegistered:
-			// no-op
-		case temporaliov1alpha1.VersionStatusRamping:
-			setRampingVersion(t, ctx, env.ts, v, *rampPercentage) // rampPercentage won't be nil if the version is ramping
-		case temporaliov1alpha1.VersionStatusCurrent:
-			setCurrentVersion(t, ctx, env.ts, v)
-		case temporaliov1alpha1.VersionStatusDraining:
-			setRampingVersion(t, ctx, env.ts, v, 1)
-			// TODO(carlydf): start a workflow on v that does not complete -> will never drain
-			setRampingVersion(t, ctx, env.ts, nil, 0)
-		case temporaliov1alpha1.VersionStatusDrained:
-			setRampingVersion(t, ctx, env.ts, v, 1)
-			setRampingVersion(t, ctx, env.ts, nil, 0)
-		}
-	}
-
-	return workerStopFuncs
-}
-
-// Helper to handle unlikely error caused by invalid string split.
-func getVersion(t *testing.T, versionId string) *deployment.WorkerDeploymentVersion {
-	deploymentName, buildId, err := k8s.SplitVersionID(versionId)
-	if err != nil {
-		t.Error(err)
-	}
-	return &deployment.WorkerDeploymentVersion{
-		DeploymentName: deploymentName,
-		BuildId:        buildId,
-	}
-}
-
-// recreateTWD returns a copy of the given TWD, but replaces the build-id-generating image name with the given one,
-// and the Spec.Replicas with the given replica count.
-// Panics if the twd spec is nil, or if it has no containers, but that should never be true for these integration tests.
-func recreateTWD(twd *temporaliov1alpha1.TemporalWorkerDeployment, imageName string, replicas int32) *temporaliov1alpha1.TemporalWorkerDeployment {
-	ret := twd.DeepCopy()
-	ret.Spec.Template.Spec.Containers[0].Image = imageName
-	ret.Spec.Replicas = &replicas
-	return ret
-}
-
-func createWorkerDeployment(
-	ctx context.Context,
-	t *testing.T,
-	env testEnv,
-	twd *temporaliov1alpha1.TemporalWorkerDeployment,
-	buildId string,
-) {
-	dep := k8s.NewDeploymentWithOwnerRef(
-		&twd.TypeMeta,
-		&twd.ObjectMeta,
-		&twd.Spec,
-		k8s.ComputeWorkerDeploymentName(twd),
-		buildId,
-		env.connection.Spec,
-	)
-	t.Logf("Creating Deployment %s in namespace %s", dep.Name, dep.Namespace)
-
-	if err := env.k8sClient.Create(ctx, dep); err != nil {
-		t.Fatalf("failed to create Deployment: %v", err)
-	}
-}
-
 // testTemporalWorkerDeploymentCreation tests the creation of a TemporalWorkerDeployment and waits for the expected status
 func testTemporalWorkerDeploymentCreation(
 	ctx context.Context,
 	t *testing.T,
 	k8sClient client.Client,
+	mgr manager.Manager,
 	ts *temporaltest.TestServer,
 	tc testhelpers.TestCase,
 ) {
@@ -237,6 +146,7 @@ func testTemporalWorkerDeploymentCreation(
 
 	env := testEnv{
 		k8sClient:  k8sClient,
+		mgr:        mgr,
 		ts:         ts,
 		connection: temporalConnection,
 		replicas:   tc.GetDeprecatedBuildReplicas(),
@@ -256,5 +166,8 @@ func testTemporalWorkerDeploymentCreation(
 	workerStopFuncs := applyDeployment(t, ctx, k8sClient, expectedDeploymentName, twd.Namespace)
 	defer handleStopFuncs(workerStopFuncs)
 
-	verifyTemporalWorkerDeploymentStatusEventually(t, ctx, k8sClient, twd.Name, twd.Namespace, expectedStatus, 60*time.Second, 10*time.Second)
+	if wait := tc.GetWaitTime(); wait != nil {
+		time.Sleep(*wait)
+	}
+	verifyTemporalWorkerDeploymentStatusEventually(t, ctx, k8sClient, twd.Name, twd.Namespace, expectedStatus, 30*time.Second, 5*time.Second)
 }
diff --git a/internal/tests/internal/test_helpers.go b/internal/tests/internal/validation_helpers.go
similarity index 97%
rename from internal/tests/internal/test_helpers.go
rename to internal/tests/internal/validation_helpers.go
index 18ff1bab..7052cda8 100644
--- a/internal/tests/internal/test_helpers.go
+++ b/internal/tests/internal/validation_helpers.go
@@ -121,6 +121,9 @@ func verifyTemporalWorkerDeploymentStatusEventually(
 	timeout time.Duration,
 	interval time.Duration,
 ) {
+	if expectedDeploymentStatus == nil {
+		t.Fatalf("expected deployment status cannot be nil")
+	}
 	eventually(t, timeout, interval, func() error {
 		var twd temporaliov1alpha1.TemporalWorkerDeployment
 		if err := k8sClient.Get(ctx, types.NamespacedName{
@@ -133,14 +136,14 @@ func verifyTemporalWorkerDeploymentStatusEventually(
 		if twd.Status.CurrentVersion == nil {
 			return fmt.Errorf("expected CurrentVersion to be set")
 		}
-		if twd.Status.CurrentVersion.Deployment == nil {
-			return fmt.Errorf("expected CurrentVersion.Deployment to be set")
-		}
 		if twd.Status.CurrentVersion.VersionID != expectedDeploymentStatus.CurrentVersion.VersionID {
 			return fmt.Errorf("expected current version id to be '%s', got '%s'",
 				expectedDeploymentStatus.CurrentVersion.VersionID,
 				twd.Status.CurrentVersion.VersionID)
 		}
+		if twd.Status.CurrentVersion.Deployment == nil {
+			return fmt.Errorf("expected CurrentVersion.Deployment to be set")
+		}
 		if twd.Status.CurrentVersion.Deployment.Name != expectedDeploymentStatus.CurrentVersion.Deployment.Name {
 			return fmt.Errorf("expected deployment name to be '%s', got '%s'",
 				expectedDeploymentStatus.CurrentVersion.Deployment.Name,
@@ -152,7 +155,7 @@ func verifyTemporalWorkerDeploymentStatusEventually(
 		if twd.Status.TargetVersion == nil {
 			return fmt.Errorf("expected TargetVersion to be set")
 		}
 		if twd.Status.TargetVersion.VersionID != expectedDeploymentStatus.TargetVersion.VersionID {
-			return fmt.Errorf("expected ramping version id to be '%s', got '%s'",
+			return fmt.Errorf("expected target version id to be '%s', got '%s'",
 				expectedDeploymentStatus.TargetVersion.VersionID,
 				twd.Status.TargetVersion.VersionID)
 		}
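Note for reviewers: both call sites above (newDeployment in genplan.go and createWorkerDeployment in the test helpers) now build versioned Deployments through the new k8s.NewDeploymentWithControllerRef helper, so the owner-reference wiring lives in one place. A minimal sketch of the reconciler-side call shape, using only names that appear in this diff (not a verbatim excerpt):

	// Build the versioned Deployment and stamp the TemporalWorkerDeployment
	// as its controller owner in a single call.
	d, err := k8s.NewDeploymentWithControllerRef(w, buildID, connection, r.Scheme)
	if err != nil {
		// ctrl.SetControllerReference fails if, for example, the scheme
		// does not recognize w's GroupVersionKind.
		return nil, err
	}
	return d, nil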
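For reviewers new to the test-builder API, the sketch below shows how the pieces added in this diff compose into a gate test case; it mirrors the "progressive-rollout-with-failed-gate" entry above rather than introducing anything new. WithGate(false) wires failTestWorkflowType in as the gate workflow, and WithWaitTime keeps the assertion window open so the test can verify that a failed gate leaves the routing config unchanged:

	// Sketch: a failed gate should leave v0 current and v1 unramped.
	tc := testhelpers.NewTestCase().
		WithInput(
			testhelpers.NewTemporalWorkerDeploymentBuilder().
				WithProgressiveStrategy(testhelpers.ProgressiveStep(5, time.Hour)).
				WithGate(false). // gate runs failTestWorkflow, so the rollout must not advance
				WithVersion("v1").
				WithTargetVersionStatus("v0", -1, true, true).
				WithCurrentVersionStatus("v0", true, true),
		).
		WithDeprecatedBuilds(testhelpers.NewDeprecatedVersionInfo("v0", 1)).
		WithWaitTime(5 * time.Second). // expect no status change, even after waiting
		WithExpectedStatus(
			testhelpers.NewStatusBuilder().
				WithTargetVersion("v1", -1, true, false).
				WithCurrentVersion("v0", true, true),
		)
	// testName, k8sNamespace, and temporalNamespace stand in for the harness values
	// (testNamespace.Name and ts.GetDefaultNamespace() in TestIntegration above).
	builtCase := tc.BuildWithValues(testName, k8sNamespace, temporalNamespace)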