Skip to content

Commit 5b2ad52

Browse files
authored
Remove infinite version existence check (#98)
<!--- Note to EXTERNAL Contributors -->
<!-- Thanks for opening a PR! If it is a significant code change, please **make sure there is an open issue** for this. We work best with you when we have accepted the idea first before you code. -->
<!--- For ALL Contributors 👇 -->

## What was changed
Removed the infinite version existence check (`awaitVersionRegistration`) and replaced it with a `Version Status == NotRegistered` condition in the planning phase. Also reorganized the test helper functions, ensured that Deployments created by test helpers can be found by the controller, and added tests for a failed and a successful Gate workflow.

## Why?
Checking for version existence in execplan.go breaks the abstraction of only reading server status in gen status. We already have the information needed to avoid another describe call, so we should use it. The changes to the test files separate helper functions from test functions so that both are easier to read and work on.

## Checklist
<!--- add/delete as needed --->
1. Closes #57
2. How was this tested: New tests specific to Gate workflows ensure that we are not starting those workflows on a non-existent version, and old tests also show that SetCurrent and SetRamping calls are not erroring due to a nonexistent version.
3. Any docs updates needed? No.
1 parent fff3e8f commit 5b2ad52

File tree

11 files changed

+404
-374
lines changed

11 files changed

+404
-374
lines changed

internal/controller/execplan.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
5858
deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName)
5959

6060
for _, wf := range p.startTestWorkflows {
61-
err := awaitVersionRegistration(ctx, l, deploymentHandler, p.TemporalNamespace, wf.versionID)
62-
if err != nil {
63-
return fmt.Errorf("error waiting for version to register, did your pollers start successfully?: %w", err)
64-
}
65-
if _, err = temporalClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
61+
if _, err := temporalClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
6662
ID: wf.workflowID,
6763
TaskQueue: wf.taskQueue,
6864
WorkflowExecutionTimeout: time.Hour,
@@ -80,13 +76,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
8076
// Register current version or ramps
8177
if vcfg := p.UpdateVersionConfig; vcfg != nil {
8278
if vcfg.SetCurrent {
83-
err := awaitVersionRegistration(ctx, l, deploymentHandler, p.TemporalNamespace, vcfg.VersionID)
84-
if err != nil {
85-
return fmt.Errorf("error waiting for version to register, did your pollers start successfully?: %w", err)
86-
}
87-
8879
l.Info("registering new current version", "version", vcfg.VersionID)
89-
9080
if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
9181
Version: vcfg.VersionID,
9282
ConflictToken: vcfg.ConflictToken,
@@ -95,13 +85,6 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
9585
return fmt.Errorf("unable to set current deployment version: %w", err)
9686
}
9787
} else {
98-
if vcfg.VersionID != "" {
99-
err := awaitVersionRegistration(ctx, l, deploymentHandler, p.TemporalNamespace, vcfg.VersionID)
100-
if err != nil {
101-
return fmt.Errorf("error waiting for version to register, did your pollers start successfully?: %w", err)
102-
}
103-
}
104-
10588
if vcfg.RampPercentage > 0 {
10689
l.Info("applying ramp", "version", vcfg.VersionID, "percentage", vcfg.RampPercentage)
10790
} else {

internal/controller/genplan.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/temporalio/temporal-worker-controller/internal/temporal"
1616
appsv1 "k8s.io/api/apps/v1"
1717
corev1 "k8s.io/api/core/v1"
18-
ctrl "sigs.k8s.io/controller-runtime"
1918
)
2019

2120
// plan holds the actions to execute during reconciliation
@@ -133,16 +132,5 @@ func (r *TemporalWorkerDeploymentReconciler) newDeployment(
133132
buildID string,
134133
connection temporaliov1alpha1.TemporalConnectionSpec,
135134
) (*appsv1.Deployment, error) {
136-
d := k8s.NewDeploymentWithOwnerRef(
137-
&w.TypeMeta,
138-
&w.ObjectMeta,
139-
&w.Spec,
140-
k8s.ComputeWorkerDeploymentName(w),
141-
buildID,
142-
connection,
143-
)
144-
if err := ctrl.SetControllerReference(w, d, r.Scheme); err != nil {
145-
return nil, err
146-
}
147-
return d, nil
135+
return k8s.NewDeploymentWithControllerRef(w, buildID, connection, r.Scheme)
148136
}

internal/controller/util.go

Lines changed: 0 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,7 @@
55
package controller
66

77
import (
8-
"context"
9-
"errors"
10-
"fmt"
118
"os"
12-
"strings"
13-
"time"
14-
15-
"github.com/go-logr/logr"
16-
"github.com/temporalio/temporal-worker-controller/internal/k8s"
17-
"go.temporal.io/api/serviceerror"
18-
sdkclient "go.temporal.io/sdk/client"
199
)
2010

2111
const (
@@ -24,70 +14,6 @@ const (
2414
defaultControllerIdentity = "temporal-worker-controller"
2515
)
2616

27-
// TODO(carlydf): Cache describe success for versions that already exist
28-
// awaitVersionRegistration should be called after a poller starts polling with config of this version, since that is
29-
// what will register the version with the server. SetRamp and SetCurrent will fail if the version does not exist.
30-
func awaitVersionRegistration(
31-
ctx context.Context,
32-
l logr.Logger,
33-
deploymentHandler sdkclient.WorkerDeploymentHandle,
34-
namespace, versionID string) error {
35-
ticker := time.NewTicker(1 * time.Second)
36-
for {
37-
l.Info(fmt.Sprintf("checking if version %s exists", versionID))
38-
select {
39-
case <-ctx.Done():
40-
return context.Canceled
41-
case <-ticker.C:
42-
_, err := deploymentHandler.DescribeVersion(ctx, sdkclient.WorkerDeploymentDescribeVersionOptions{
43-
Version: versionID,
44-
})
45-
var notFoundErr *serviceerror.NotFound
46-
if err != nil {
47-
if errors.As(err, &notFoundErr) {
48-
continue
49-
} else {
50-
return fmt.Errorf("unable to describe worker deployment version %s: %w", versionID, err)
51-
}
52-
}
53-
// After the version exists, confirm that it also exists in the worker deployment
54-
// TODO(carlydf): Remove this check after next Temporal Cloud version which solves this inconsistency
55-
return awaitVersionRegistrationInDeployment(ctx, l, deploymentHandler, namespace, versionID)
56-
}
57-
}
58-
}
59-
60-
func awaitVersionRegistrationInDeployment(
61-
ctx context.Context,
62-
l logr.Logger,
63-
deploymentHandler sdkclient.WorkerDeploymentHandle,
64-
namespace, versionID string) error {
65-
deploymentName, _, _ := strings.Cut(versionID, k8s.VersionIDSeparator)
66-
ticker := time.NewTicker(1 * time.Second)
67-
for {
68-
l.Info(fmt.Sprintf("checking if version %s exists in worker deployment", versionID))
69-
select {
70-
case <-ctx.Done():
71-
return context.Canceled
72-
case <-ticker.C:
73-
resp, err := deploymentHandler.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{})
74-
var notFoundErr *serviceerror.NotFound
75-
if err != nil {
76-
if errors.As(err, &notFoundErr) {
77-
continue
78-
} else {
79-
return fmt.Errorf("unable to describe worker deployment %s: %w", deploymentName, err)
80-
}
81-
}
82-
for _, vs := range resp.Info.VersionSummaries {
83-
if vs.Version == versionID {
84-
return nil
85-
}
86-
}
87-
}
88-
}
89-
}
90-
9117
// getControllerVersion returns the version from environment variable (set by Helm from image.tag)
9218
func getControllerVersion() string {
9319
if version := os.Getenv("CONTROLLER_VERSION"); version != "" {

internal/k8s/deployments.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
appsv1 "k8s.io/api/apps/v1"
1818
corev1 "k8s.io/api/core/v1"
1919
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
"k8s.io/apimachinery/pkg/runtime"
21+
ctrl "sigs.k8s.io/controller-runtime"
2022
"sigs.k8s.io/controller-runtime/pkg/client"
2123
)
2224

@@ -290,3 +292,23 @@ func NewDeploymentWithOwnerRef(
290292
},
291293
}
292294
}
295+
296+
func NewDeploymentWithControllerRef(
297+
w *temporaliov1alpha1.TemporalWorkerDeployment,
298+
buildID string,
299+
connection temporaliov1alpha1.TemporalConnectionSpec,
300+
reconcilerScheme *runtime.Scheme,
301+
) (*appsv1.Deployment, error) {
302+
d := NewDeploymentWithOwnerRef(
303+
&w.TypeMeta,
304+
&w.ObjectMeta,
305+
&w.Spec,
306+
ComputeWorkerDeploymentName(w),
307+
buildID,
308+
connection,
309+
)
310+
if err := ctrl.SetControllerReference(w, d, reconcilerScheme); err != nil {
311+
return nil, err
312+
}
313+
return d, nil
314+
}

internal/planner/planner.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,12 @@ func getTestWorkflows(
218218
) []WorkflowConfig {
219219
var testWorkflows []WorkflowConfig
220220

221-
// Skip if there's no gate workflow defined or if the target version is already the current
221+
// Skip if there's no gate workflow defined, if the target version is already the current, or if the target
222+
// version is not yet registered in temporal
222223
if config.RolloutStrategy.Gate == nil ||
223224
status.CurrentVersion == nil ||
224-
status.CurrentVersion.VersionID == status.TargetVersion.VersionID {
225+
status.CurrentVersion.VersionID == status.TargetVersion.VersionID ||
226+
status.TargetVersion.Status == temporaliov1alpha1.VersionStatusNotRegistered {
225227
return nil
226228
}
227229

@@ -258,8 +260,9 @@ func getVersionConfigDiff(
258260
strategy := config.RolloutStrategy
259261
conflictToken := status.VersionConflictToken
260262

261-
// Do nothing if target version's deployment is not healthy yet
262-
if status.TargetVersion.HealthySince == nil {
263+
// Do nothing if target version's deployment is not healthy yet, or if the version is not yet registered in temporal
264+
if status.TargetVersion.HealthySince == nil ||
265+
status.TargetVersion.Status == temporaliov1alpha1.VersionStatusNotRegistered {
263266
return nil
264267
}
265268

internal/testhelpers/test_builder.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,16 @@ func (b *TemporalWorkerDeploymentBuilder) WithProgressiveStrategy(steps ...tempo
4747
return b
4848
}
4949

50+
// WithGate sets the rollout strategy to have a gate workflow
51+
func (b *TemporalWorkerDeploymentBuilder) WithGate(expectSuccess bool) *TemporalWorkerDeploymentBuilder {
52+
if expectSuccess {
53+
b.twd.Spec.RolloutStrategy.Gate = &temporaliov1alpha1.GateWorkflowConfig{WorkflowType: successTestWorkflowType}
54+
} else {
55+
b.twd.Spec.RolloutStrategy.Gate = &temporaliov1alpha1.GateWorkflowConfig{WorkflowType: failTestWorkflowType}
56+
}
57+
return b
58+
}
59+
5060
// WithReplicas sets the number of replicas
5161
func (b *TemporalWorkerDeploymentBuilder) WithReplicas(replicas int32) *TemporalWorkerDeploymentBuilder {
5262
b.twd.Spec.Replicas = &replicas
@@ -157,17 +167,18 @@ func (sb *StatusBuilder) WithNamespace(k8sNamespace string) *StatusBuilder {
157167
}
158168

159169
// WithCurrentVersion sets the current version in the status
160-
func (sb *StatusBuilder) WithCurrentVersion(imageName string, hasDeployment, createPoller bool) *StatusBuilder {
170+
func (sb *StatusBuilder) WithCurrentVersion(imageName string, healthy, createDeployment bool) *StatusBuilder {
161171
sb.currentVersionBuilder = func(name string, namespace string) *temporaliov1alpha1.CurrentWorkerDeploymentVersion {
162-
return MakeCurrentVersion(namespace, name, imageName, hasDeployment, createPoller)
172+
return MakeCurrentVersion(namespace, name, imageName, healthy, createDeployment)
163173
}
164174
return sb
165175
}
166176

167-
// WithTargetVersion sets the target version in the status
168-
func (sb *StatusBuilder) WithTargetVersion(imageName string, rampPercentage float32, hasDeployment bool, createPoller bool) *StatusBuilder {
177+
// WithTargetVersion sets the target version in the status.
178+
// Target Version is required.
179+
func (sb *StatusBuilder) WithTargetVersion(imageName string, rampPercentage float32, healthy bool, createDeployment bool) *StatusBuilder {
169180
sb.targetVersionBuilder = func(name string, namespace string) temporaliov1alpha1.TargetWorkerDeploymentVersion {
170-
return MakeTargetVersion(namespace, name, imageName, rampPercentage, hasDeployment, createPoller)
181+
return MakeTargetVersion(namespace, name, imageName, rampPercentage, healthy, createDeployment)
171182
}
172183
return sb
173184
}
@@ -198,6 +209,8 @@ type TestCase struct {
198209
deprecatedBuildReplicas map[string]int32
199210
deprecatedBuildImages map[string]string
200211
expectedStatus *temporaliov1alpha1.TemporalWorkerDeploymentStatus
212+
// Time to delay before checking expected status
213+
waitTime *time.Duration
201214
}
202215

203216
func (tc *TestCase) GetTWD() *temporaliov1alpha1.TemporalWorkerDeployment {
@@ -216,6 +229,10 @@ func (tc *TestCase) GetExpectedStatus() *temporaliov1alpha1.TemporalWorkerDeploy
216229
return tc.expectedStatus
217230
}
218231

232+
func (tc *TestCase) GetWaitTime() *time.Duration {
233+
return tc.waitTime
234+
}
235+
219236
// TestCaseBuilder provides a fluent interface for building test cases
220237
type TestCaseBuilder struct {
221238
name string
@@ -225,6 +242,7 @@ type TestCaseBuilder struct {
225242
twdBuilder *TemporalWorkerDeploymentBuilder
226243
expectedStatusBuilder *StatusBuilder
227244
deprecatedVersionInfos []DeprecatedVersionInfo
245+
waitTime *time.Duration
228246
}
229247

230248
// NewTestCase creates a new test case builder
@@ -255,6 +273,13 @@ func (tcb *TestCaseBuilder) WithInput(twdBuilder *TemporalWorkerDeploymentBuilde
255273
return tcb
256274
}
257275

276+
// WithWaitTime sets the wait time. Use this if you are expecting no change to the initial status and want to ensure
277+
// that after some time, there is still no change.
278+
func (tcb *TestCaseBuilder) WithWaitTime(waitTime time.Duration) *TestCaseBuilder {
279+
tcb.waitTime = &waitTime
280+
return tcb
281+
}
282+
258283
// DeprecatedVersionInfo defines the necessary information about a deprecated worker version, so that
259284
// tests can recreate state that is not visible in the TemporalWorkerDeployment status
260285
type DeprecatedVersionInfo struct {
@@ -284,6 +309,7 @@ func (tcb *TestCaseBuilder) WithExpectedStatus(statusBuilder *StatusBuilder) *Te
284309
// Build returns the constructed test case
285310
func (tcb *TestCaseBuilder) Build() TestCase {
286311
ret := TestCase{
312+
waitTime: tcb.waitTime,
287313
twd: tcb.twdBuilder.
288314
WithName(tcb.name).
289315
WithNamespace(tcb.k8sNamespace).

0 commit comments

Comments
 (0)