Skip to content

Commit c4b067a

Browse files
authored
Check versioned and unversioned pollers in certain cases (#132)
<!--- Note to EXTERNAL Contributors --> <!-- Thanks for opening a PR! If it is a significant code change, please **make sure there is an open issue** for this. We work best with you when we have accepted the idea first before you code. --> <!--- For ALL Contributors 👇 --> ## What was changed 1. Check unversioned pollers for the target version if the current version is nil and strategy is progressive 2. Check versioned pollers for Drained versions if the controller is not running a k8s Deployment for that version ## Why? 1. So that we can still respect Progressive rollout steps if Current Version is nil and we confirm unversioned pollers for all Target Version task queues 2. So that we can only block rollout of a new version if we have a pathological # of ineligible for delete versions, meaning in most cases, users can rely on the server to delete their old versions. ## Checklist <!--- add/delete as needed ---> 1. Closes <!-- add issue number here --> 3. How was this tested: - Unit test for maxVersionsIneligibleForDeletion - Integration test where Progressive strategy is respected when unversioned pollers exist - Integration test where deployment is at max versions ineligible and controller doesn't roll out new deployment - Integration test where deployment has max versions but they are eligible for deletion so controller does roll out new deployment 4. Any docs updates needed? <!--- update README if applicable or point out where to update docs.temporal.io -->
1 parent 73e8378 commit c4b067a

22 files changed

+536
-163
lines changed

api/v1alpha1/temporalworker_webhook.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,6 @@ func (s *TemporalWorkerDeploymentSpec) Default(ctx context.Context) error {
5757
s.SunsetStrategy.DeleteDelay = &v1.Duration{Duration: defaults.DeleteDelay}
5858
}
5959

60-
if s.MaxVersions == nil {
61-
maxVersions := int32(defaults.MaxVersions)
62-
s.MaxVersions = &maxVersions
63-
}
64-
6560
return nil
6661
}
6762

api/v1alpha1/temporalworker_webhook_test.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -159,24 +159,6 @@ func TestTemporalWorkerDeployment_Default(t *testing.T) {
159159
obj runtime.Object
160160
expected func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment)
161161
}{
162-
"sets default maxVersions": {
163-
obj: testhelpers.MakeTWDWithName("default-max-versions", ""),
164-
expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) {
165-
require.NotNil(t, obj.Spec.MaxVersions)
166-
assert.Equal(t, int32(75), *obj.Spec.MaxVersions)
167-
},
168-
},
169-
"preserves existing maxVersions": {
170-
obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("preserve-max-versions", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment {
171-
maxVersions := int32(100)
172-
obj.Spec.MaxVersions = &maxVersions
173-
return obj
174-
}),
175-
expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) {
176-
require.NotNil(t, obj.Spec.MaxVersions)
177-
assert.Equal(t, int32(100), *obj.Spec.MaxVersions)
178-
},
179-
},
180162
"sets default sunset strategy delays": {
181163
obj: testhelpers.MakeTWDWithName("default-sunset-delays", ""),
182164
expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) {

api/v1alpha1/worker_types.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -60,18 +60,6 @@ type TemporalWorkerDeploymentSpec struct {
6060

6161
// TODO(jlegrone): add godoc
6262
WorkerOptions WorkerOptions `json:"workerOptions"`
63-
64-
// MaxVersions defines the maximum number of worker deployment versions allowed.
65-
// This helps prevent hitting Temporal's default limit of 100 versions per deployment.
66-
// Defaults to 75. Users can override this by explicitly setting a higher value in
67-
// the CRD, but should exercise caution: once the server's version limit is reached,
68-
// Temporal attempts to delete an eligible version. If no version is eligible for deletion,
69-
// new deployments get blocked which prevents the controller from making progress.
70-
// This limit can be adjusted server-side by setting `matching.maxVersionsInDeployment`
71-
// in dynamicconfig.
72-
// +optional
73-
// +kubebuilder:validation:Minimum=1
74-
MaxVersions *int32 `json:"maxVersions,omitempty"`
7563
}
7664

7765
// VersionStatus indicates the status of a version.
@@ -240,6 +228,12 @@ type DeprecatedWorkerDeploymentVersion struct {
240228
// Only set when Status is VersionStatusDrained.
241229
// +optional
242230
DrainedSince *metav1.Time `json:"drainedSince"`
231+
232+
// A Version is eligible for deletion if it is drained and has no pollers on any task queue.
233+
// After pollers stop polling, the server will still consider them present until `matching.PollerHistoryTTL`
234+
// has passed.
235+
// +optional
236+
EligibleForDeletion bool `json:"eligibleForDeletion,omitempty"`
243237
}
244238

245239
// DefaultVersionUpdateStrategy describes how to cut over new workflow executions
@@ -248,10 +242,20 @@ type DeprecatedWorkerDeploymentVersion struct {
248242
type DefaultVersionUpdateStrategy string
249243

250244
const (
245+
// UpdateManual scales worker resources up or down, but does not update the current or ramping worker deployment version.
251246
UpdateManual DefaultVersionUpdateStrategy = "Manual"
252247

248+
// UpdateAllAtOnce starts 100% of new workflow executions on the new worker deployment version as soon as it's healthy.
253249
UpdateAllAtOnce DefaultVersionUpdateStrategy = "AllAtOnce"
254250

251+
// UpdateProgressive ramps up the percentage of new workflow executions targeting the new worker deployment version over time.
252+
//
253+
// Note: If the Current Version of a Worker Deployment is nil and the controller cannot confirm that all Task Queues
254+
// in the Target Version have at least one unversioned poller, the controller will immediately set the new worker
255+
// deployment version to be Current and ignore the Progressive rollout steps.
256+
// Sending a percentage of traffic to a "nil" version means that traffic will be sent to unversioned workers. If
257+
// there are no unversioned workers, those tasks will get stuck. This behavior ensures that all traffic on the task
258+
// queues in this worker deployment can be handled by an active poller.
255259
UpdateProgressive DefaultVersionUpdateStrategy = "Progressive"
256260
)
257261

@@ -263,15 +267,9 @@ type GateWorkflowConfig struct {
263267
type RolloutStrategy struct {
264268
// Specifies how to treat concurrent executions of a Job.
265269
// Valid values are:
266-
// - "Manual": scale worker resources up or down, but do not update the current or ramping worker deployment version;
267-
// - "AllAtOnce": start 100% of new workflow executions on the new worker deployment version as soon as it's healthy;
268-
// - "Progressive": ramp up the percentage of new workflow executions targeting the new worker deployment version over time;
269-
//
270-
// Note: If the Current Version of a Worker Deployment is nil, the controller will ignore any Progressive Rollout
271-
// Steps and immediately set the new worker deployment version to be Current.
272-
// Sending a percentage of traffic to a "nil" version means that traffic will be sent to unversioned workers. If
273-
// there are no unversioned workers, those tasks will get stuck. This behavior ensures that all traffic on the task
274-
// queues in this worker deployment can be handled by an active poller.
270+
// - "Manual"
271+
// - "AllAtOnce"
272+
// - "Progressive"
275273
Strategy DefaultVersionUpdateStrategy `json:"strategy"`
276274

277275
// Gate specifies a workflow type that must run once to completion on the new worker deployment version before

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 0 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func main() {
9191
}))),
9292
mgr.GetClient(),
9393
),
94+
MaxDeploymentVersionsIneligibleForDeletion: controller.GetControllerMaxDeploymentVersionsIneligibleForDeletion(),
9495
}).SetupWithManager(mgr); err != nil {
9596
setupLog.Error(err, "unable to create controller", "controller", "TemporalWorkerDeployment")
9697
os.Exit(1)

helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,6 @@ spec:
4646
type: object
4747
spec:
4848
properties:
49-
maxVersions:
50-
format: int32
51-
minimum: 1
52-
type: integer
5349
minReadySeconds:
5450
format: int32
5551
type: integer
@@ -3876,6 +3872,8 @@ spec:
38763872
drainedSince:
38773873
format: date-time
38783874
type: string
3875+
eligibleForDeletion:
3876+
type: boolean
38793877
healthySince:
38803878
format: date-time
38813879
type: string

internal/controller/execplan.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
119119
},
120120
MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{
121121
UpsertEntries: map[string]interface{}{
122-
controllerIdentityKey: getControllerIdentity(),
123-
controllerVersionKey: getControllerVersion(),
122+
controllerIdentityMetadataKey: getControllerIdentity(),
123+
controllerVersionMetadataKey: getControllerVersion(),
124124
},
125125
},
126126
}); err != nil { // would be cool to do this atomically with the update

internal/controller/genplan.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
100100
connection,
101101
plannerConfig,
102102
workerDeploymentName,
103+
r.MaxDeploymentVersionsIneligibleForDeletion,
103104
)
104105
if err != nil {
105106
return nil, fmt.Errorf("error generating plan: %w", err)

internal/controller/genstatus.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package controller
66

77
import (
88
"context"
9-
"fmt"
109

1110
"github.com/go-logr/logr"
1211
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
@@ -23,22 +22,11 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
2322
req ctrl.Request,
2423
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
2524
temporalState *temporal.TemporalWorkerState,
25+
k8sState *k8s.DeploymentState,
2626
) (*temporaliov1alpha1.TemporalWorkerDeploymentStatus, error) {
2727
workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy)
2828
targetBuildID := k8s.ComputeBuildID(workerDeploy)
2929

30-
// Fetch Kubernetes deployment state
31-
k8sState, err := k8s.GetDeploymentState(
32-
ctx,
33-
r.Client,
34-
req.Namespace,
35-
req.Name,
36-
workerDeploymentName,
37-
)
38-
if err != nil {
39-
return nil, fmt.Errorf("unable to get Kubernetes deployment state: %w", err)
40-
}
41-
4230
// Fetch test workflow status for the desired version
4331
if targetBuildID != temporalState.CurrentBuildID {
4432
testWorkflows, err := temporal.GetTestWorkflowStatus(

internal/controller/state_mapper.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,17 @@ func (m *stateMapper) mapDeprecatedWorkerDeploymentVersionByBuildID(buildID stri
160160
return nil
161161
}
162162

163+
eligibleForDeletion := false
164+
if vInfo, exists := m.temporalState.Versions[buildID]; exists {
165+
eligibleForDeletion = vInfo.Status == v1alpha1.VersionStatusDrained && vInfo.NoTaskQueuesHaveVersionedPoller
166+
}
167+
163168
version := &v1alpha1.DeprecatedWorkerDeploymentVersion{
164169
BaseWorkerDeploymentVersion: v1alpha1.BaseWorkerDeploymentVersion{
165170
BuildID: buildID,
166171
Status: v1alpha1.VersionStatusNotRegistered,
167172
},
173+
EligibleForDeletion: eligibleForDeletion,
168174
}
169175

170176
// Set deployment reference if it exists

0 commit comments

Comments
 (0)