Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 1 addition & 18 deletions internal/controller/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName)

for _, wf := range p.startTestWorkflows {
err := awaitVersionRegistration(ctx, l, deploymentHandler, p.TemporalNamespace, wf.versionID)
if err != nil {
return fmt.Errorf("error waiting for version to register, did your pollers start successfully?: %w", err)
}
if _, err = temporalClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
if _, err := temporalClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
ID: wf.workflowID,
TaskQueue: wf.taskQueue,
WorkflowExecutionTimeout: time.Hour,
Expand All @@ -80,13 +76,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
// Register current version or ramps
if vcfg := p.UpdateVersionConfig; vcfg != nil {
if vcfg.SetCurrent {
err := awaitVersionRegistration(ctx, l, deploymentHandler, p.TemporalNamespace, vcfg.VersionID)
if err != nil {
return fmt.Errorf("error waiting for version to register, did your pollers start successfully?: %w", err)
}

l.Info("registering new current version", "version", vcfg.VersionID)

if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
Version: vcfg.VersionID,
ConflictToken: vcfg.ConflictToken,
Expand All @@ -95,13 +85,6 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
return fmt.Errorf("unable to set current deployment version: %w", err)
}
} else {
if vcfg.VersionID != "" {
err := awaitVersionRegistration(ctx, l, deploymentHandler, p.TemporalNamespace, vcfg.VersionID)
if err != nil {
return fmt.Errorf("error waiting for version to register, did your pollers start successfully?: %w", err)
}
}

if vcfg.RampPercentage > 0 {
l.Info("applying ramp", "version", vcfg.VersionID, "percentage", vcfg.RampPercentage)
} else {
Expand Down
14 changes: 1 addition & 13 deletions internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/temporalio/temporal-worker-controller/internal/temporal"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
)

// plan holds the actions to execute during reconciliation
Expand Down Expand Up @@ -133,16 +132,5 @@ func (r *TemporalWorkerDeploymentReconciler) newDeployment(
buildID string,
connection temporaliov1alpha1.TemporalConnectionSpec,
) (*appsv1.Deployment, error) {
d := k8s.NewDeploymentWithOwnerRef(
&w.TypeMeta,
&w.ObjectMeta,
&w.Spec,
k8s.ComputeWorkerDeploymentName(w),
buildID,
connection,
)
if err := ctrl.SetControllerReference(w, d, r.Scheme); err != nil {
return nil, err
}
return d, nil
return k8s.NewDeploymentWithControllerRef(w, buildID, connection, r.Scheme)
}
74 changes: 0 additions & 74 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,7 @@
package controller

import (
"context"
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"go.temporal.io/api/serviceerror"
sdkclient "go.temporal.io/sdk/client"
)

const (
Expand All @@ -24,70 +14,6 @@ const (
defaultControllerIdentity = "temporal-worker-controller"
)

// awaitVersionRegistration polls DescribeVersion once per second until versionID is found, and then
// confirms via awaitVersionRegistrationInDeployment that the version is also listed in the worker deployment.
//
// It should be called after a poller starts polling with config of this version, since that is
// what will register the version with the server. SetRamp and SetCurrent will fail if the version does not exist.
//
// A NotFound error from DescribeVersion is treated as "not registered yet" and retried on the next tick;
// any other error is wrapped and returned. If ctx is done before the version appears, ctx.Err() is returned.
func awaitVersionRegistration(
	ctx context.Context,
	l logr.Logger,
	deploymentHandler sdkclient.WorkerDeploymentHandle,
	namespace, versionID string) error {
	ticker := time.NewTicker(1 * time.Second)
	// Stop the ticker on every exit path so it does not outlive this call.
	defer ticker.Stop()
	for {
		l.Info(fmt.Sprintf("checking if version %s exists", versionID))
		select {
		case <-ctx.Done():
			// Report the real reason the context ended (canceled vs. deadline exceeded).
			return ctx.Err()
		case <-ticker.C:
			// TODO(carlydf): Cache describe success for versions that already exist
			_, err := deploymentHandler.DescribeVersion(ctx, sdkclient.WorkerDeploymentDescribeVersionOptions{
				Version: versionID,
			})
			if err != nil {
				var notFoundErr *serviceerror.NotFound
				if errors.As(err, &notFoundErr) {
					// Not registered yet; try again on the next tick.
					continue
				}
				return fmt.Errorf("unable to describe worker deployment version %s: %w", versionID, err)
			}
			// After the version exists, confirm that it also exists in the worker deployment
			// TODO(carlydf): Remove this check after next Temporal Cloud version which solves this inconsistency
			return awaitVersionRegistrationInDeployment(ctx, l, deploymentHandler, namespace, versionID)
		}
	}
}

// awaitVersionRegistrationInDeployment polls Describe on the worker deployment once per second until
// versionID appears among the deployment's version summaries.
//
// A NotFound error from Describe is retried on the next tick; any other error is wrapped and returned.
// If ctx is done before the version appears, ctx.Err() is returned. The namespace parameter is not
// used by this function.
func awaitVersionRegistrationInDeployment(
	ctx context.Context,
	l logr.Logger,
	deploymentHandler sdkclient.WorkerDeploymentHandle,
	namespace, versionID string) error {
	// The deployment name is the part of the version ID before the separator; it is only used in error text.
	deploymentName, _, _ := strings.Cut(versionID, k8s.VersionIDSeparator)
	ticker := time.NewTicker(1 * time.Second)
	// Stop the ticker on every exit path so it does not outlive this call.
	defer ticker.Stop()
	for {
		l.Info(fmt.Sprintf("checking if version %s exists in worker deployment", versionID))
		select {
		case <-ctx.Done():
			// Report the real reason the context ended (canceled vs. deadline exceeded).
			return ctx.Err()
		case <-ticker.C:
			resp, err := deploymentHandler.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{})
			if err != nil {
				var notFoundErr *serviceerror.NotFound
				if errors.As(err, &notFoundErr) {
					// Deployment not visible yet; try again on the next tick.
					continue
				}
				return fmt.Errorf("unable to describe worker deployment %s: %w", deploymentName, err)
			}
			for _, vs := range resp.Info.VersionSummaries {
				if vs.Version == versionID {
					return nil
				}
			}
			// Version not listed yet; keep polling.
		}
	}
}

// getControllerVersion returns the version from environment variable (set by Helm from image.tag)
func getControllerVersion() string {
if version := os.Getenv("CONTROLLER_VERSION"); version != "" {
Expand Down
22 changes: 22 additions & 0 deletions internal/k8s/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -290,3 +292,23 @@ func NewDeploymentWithOwnerRef(
},
}
}

// NewDeploymentWithControllerRef builds the Deployment for the given build ID via
// NewDeploymentWithOwnerRef and then sets w as its controller reference using reconcilerScheme.
// It returns a nil Deployment and the error if setting the controller reference fails.
func NewDeploymentWithControllerRef(
	w *temporaliov1alpha1.TemporalWorkerDeployment,
	buildID string,
	connection temporaliov1alpha1.TemporalConnectionSpec,
	reconcilerScheme *runtime.Scheme,
) (*appsv1.Deployment, error) {
	deploymentName := ComputeWorkerDeploymentName(w)
	deployment := NewDeploymentWithOwnerRef(&w.TypeMeta, &w.ObjectMeta, &w.Spec, deploymentName, buildID, connection)

	err := ctrl.SetControllerReference(w, deployment, reconcilerScheme)
	if err != nil {
		return nil, err
	}
	return deployment, nil
}
11 changes: 7 additions & 4 deletions internal/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,12 @@ func getTestWorkflows(
) []WorkflowConfig {
var testWorkflows []WorkflowConfig

// Skip if there's no gate workflow defined or if the target version is already the current
// Skip if there's no gate workflow defined, if the target version is already the current, or if the target
// version is not yet registered in temporal
if config.RolloutStrategy.Gate == nil ||
status.CurrentVersion == nil ||
status.CurrentVersion.VersionID == status.TargetVersion.VersionID {
status.CurrentVersion.VersionID == status.TargetVersion.VersionID ||
status.TargetVersion.Status == temporaliov1alpha1.VersionStatusNotRegistered {
return nil
}

Expand Down Expand Up @@ -258,8 +260,9 @@ func getVersionConfigDiff(
strategy := config.RolloutStrategy
conflictToken := status.VersionConflictToken

// Do nothing if target version's deployment is not healthy yet
if status.TargetVersion.HealthySince == nil {
// Do nothing if target version's deployment is not healthy yet, or if the version is not yet registered in temporal
if status.TargetVersion.HealthySince == nil ||
status.TargetVersion.Status == temporaliov1alpha1.VersionStatusNotRegistered {
return nil
}

Expand Down
36 changes: 31 additions & 5 deletions internal/testhelpers/test_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ func (b *TemporalWorkerDeploymentBuilder) WithProgressiveStrategy(steps ...tempo
return b
}

// WithGate configures the rollout strategy with a gate workflow. When expectSuccess is true the gate
// uses successTestWorkflowType; otherwise it uses failTestWorkflowType.
func (b *TemporalWorkerDeploymentBuilder) WithGate(expectSuccess bool) *TemporalWorkerDeploymentBuilder {
	// Start from the failing gate and switch the workflow type when success is expected.
	gate := &temporaliov1alpha1.GateWorkflowConfig{WorkflowType: failTestWorkflowType}
	if expectSuccess {
		gate.WorkflowType = successTestWorkflowType
	}
	b.twd.Spec.RolloutStrategy.Gate = gate
	return b
}

// WithReplicas sets the number of replicas
func (b *TemporalWorkerDeploymentBuilder) WithReplicas(replicas int32) *TemporalWorkerDeploymentBuilder {
b.twd.Spec.Replicas = &replicas
Expand Down Expand Up @@ -157,17 +167,18 @@ func (sb *StatusBuilder) WithNamespace(k8sNamespace string) *StatusBuilder {
}

// WithCurrentVersion sets the current version in the status
func (sb *StatusBuilder) WithCurrentVersion(imageName string, hasDeployment, createPoller bool) *StatusBuilder {
func (sb *StatusBuilder) WithCurrentVersion(imageName string, healthy, createDeployment bool) *StatusBuilder {
sb.currentVersionBuilder = func(name string, namespace string) *temporaliov1alpha1.CurrentWorkerDeploymentVersion {
return MakeCurrentVersion(namespace, name, imageName, hasDeployment, createPoller)
return MakeCurrentVersion(namespace, name, imageName, healthy, createDeployment)
}
return sb
}

// WithTargetVersion sets the target version in the status
func (sb *StatusBuilder) WithTargetVersion(imageName string, rampPercentage float32, hasDeployment bool, createPoller bool) *StatusBuilder {
// WithTargetVersion sets the target version in the status.
// Target Version is required.
func (sb *StatusBuilder) WithTargetVersion(imageName string, rampPercentage float32, healthy bool, createDeployment bool) *StatusBuilder {
sb.targetVersionBuilder = func(name string, namespace string) temporaliov1alpha1.TargetWorkerDeploymentVersion {
return MakeTargetVersion(namespace, name, imageName, rampPercentage, hasDeployment, createPoller)
return MakeTargetVersion(namespace, name, imageName, rampPercentage, healthy, createDeployment)
}
return sb
}
Expand Down Expand Up @@ -198,6 +209,8 @@ type TestCase struct {
deprecatedBuildReplicas map[string]int32
deprecatedBuildImages map[string]string
expectedStatus *temporaliov1alpha1.TemporalWorkerDeploymentStatus
// Time to delay before checking expected status
waitTime *time.Duration
}

func (tc *TestCase) GetTWD() *temporaliov1alpha1.TemporalWorkerDeployment {
Expand All @@ -216,6 +229,10 @@ func (tc *TestCase) GetExpectedStatus() *temporaliov1alpha1.TemporalWorkerDeploy
return tc.expectedStatus
}

// GetWaitTime returns the test case's waitTime field (the time to delay before checking the
// expected status). The returned pointer is whatever was stored on the test case and may be nil.
func (tc *TestCase) GetWaitTime() *time.Duration {
return tc.waitTime
}

// TestCaseBuilder provides a fluent interface for building test cases
type TestCaseBuilder struct {
name string
Expand All @@ -225,6 +242,7 @@ type TestCaseBuilder struct {
twdBuilder *TemporalWorkerDeploymentBuilder
expectedStatusBuilder *StatusBuilder
deprecatedVersionInfos []DeprecatedVersionInfo
waitTime *time.Duration
}

// NewTestCase creates a new test case builder
Expand Down Expand Up @@ -255,6 +273,13 @@ func (tcb *TestCaseBuilder) WithInput(twdBuilder *TemporalWorkerDeploymentBuilde
return tcb
}

// WithWaitTime records how long to wait before the expected status is checked. It is meant for cases
// where no change to the initial status is expected and the test should confirm that, after some time,
// the status is still unchanged.
func (tcb *TestCaseBuilder) WithWaitTime(waitTime time.Duration) *TestCaseBuilder {
	delay := waitTime
	tcb.waitTime = &delay
	return tcb
}

// DeprecatedVersionInfo defines the necessary information about a deprecated worker version, so that
// tests can recreate state that is not visible in the TemporalWorkerDeployment status
type DeprecatedVersionInfo struct {
Expand Down Expand Up @@ -284,6 +309,7 @@ func (tcb *TestCaseBuilder) WithExpectedStatus(statusBuilder *StatusBuilder) *Te
// Build returns the constructed test case
func (tcb *TestCaseBuilder) Build() TestCase {
ret := TestCase{
waitTime: tcb.waitTime,
twd: tcb.twdBuilder.
WithName(tcb.name).
WithNamespace(tcb.k8sNamespace).
Expand Down
Loading
Loading