Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions api/v1alpha1/temporalworker_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"github.com/temporalio/temporal-worker-controller/internal/defaults"
)

const (
defaultScaledownDelay = 1 * time.Hour
defaultDeleteDelay = 24 * time.Hour
maxTemporalWorkerDeploymentNameLen = 63
)

Expand All @@ -42,13 +42,27 @@ func (r *TemporalWorkerDeployment) Default(ctx context.Context, obj runtime.Obje
return apierrors.NewBadRequest("expected a TemporalWorkerDeployment")
}

if dep.Spec.SunsetStrategy.ScaledownDelay == nil {
dep.Spec.SunsetStrategy.ScaledownDelay = &v1.Duration{Duration: defaultScaledownDelay}
if err := dep.Spec.Default(ctx); err != nil {
return err
}

if dep.Spec.SunsetStrategy.DeleteDelay == nil {
dep.Spec.SunsetStrategy.DeleteDelay = &v1.Duration{Duration: defaultDeleteDelay}
return nil
}

func (s *TemporalWorkerDeploymentSpec) Default(ctx context.Context) error {
if s.SunsetStrategy.ScaledownDelay == nil {
s.SunsetStrategy.ScaledownDelay = &v1.Duration{Duration: defaults.ScaledownDelay}
}

if s.SunsetStrategy.DeleteDelay == nil {
s.SunsetStrategy.DeleteDelay = &v1.Duration{Duration: defaults.DeleteDelay}
}

if s.MaxVersions == nil {
maxVersions := int32(defaults.MaxVersions)
s.MaxVersions = &maxVersions
}

return nil
}

Expand Down
50 changes: 50 additions & 0 deletions api/v1alpha1/temporalworker_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,53 @@ func TestTemporalWorkerDeployment_ValidateDelete(t *testing.T) {
assert.NoError(t, err)
assert.Nil(t, warnings)
}

func TestTemporalWorkerDeployment_Default(t *testing.T) {
tests := map[string]struct {
obj runtime.Object
expected func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment)
}{
"sets default maxVersions": {
obj: testhelpers.MakeTWDWithName("default-max-versions"),
expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) {
require.NotNil(t, obj.Spec.MaxVersions)
assert.Equal(t, int32(75), *obj.Spec.MaxVersions)
},
},
"preserves existing maxVersions": {
obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("preserve-max-versions"), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment {
maxVersions := int32(100)
obj.Spec.MaxVersions = &maxVersions
return obj
}),
expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) {
require.NotNil(t, obj.Spec.MaxVersions)
assert.Equal(t, int32(100), *obj.Spec.MaxVersions)
},
},
"sets default sunset strategy delays": {
obj: testhelpers.MakeTWDWithName("default-sunset-delays"),
expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) {
require.NotNil(t, obj.Spec.SunsetStrategy.ScaledownDelay)
assert.Equal(t, time.Hour, obj.Spec.SunsetStrategy.ScaledownDelay.Duration)
require.NotNil(t, obj.Spec.SunsetStrategy.DeleteDelay)
assert.Equal(t, 24*time.Hour, obj.Spec.SunsetStrategy.DeleteDelay.Duration)
},
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
ctx := context.Background()
webhook := &temporaliov1alpha1.TemporalWorkerDeployment{}

err := webhook.Default(ctx, tc.obj)
require.NoError(t, err)

obj, ok := tc.obj.(*temporaliov1alpha1.TemporalWorkerDeployment)
require.True(t, ok)

tc.expected(t, obj)
})
}
}
19 changes: 17 additions & 2 deletions api/v1alpha1/worker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ type TemporalWorkerDeploymentSpec struct {

// TODO(jlegrone): add godoc
WorkerOptions WorkerOptions `json:"workerOptions"`

// MaxVersions defines the maximum number of worker deployment versions allowed.
// This helps prevent hitting Temporal's default limit of 100 versions per deployment.
// Defaults to 75. Users can override this by explicitly setting a higher value in
// the CRD, but should exercise caution: once the server's version limit is reached,
// Temporal attempts to delete an eligible version. If no version is eligible for deletion,
// new deployments get blocked which prevents the controller from making progress.
// This limit can be adjusted server-side by setting `matching.maxVersionsInDeployment`
// in dynamicconfig.
// +optional
// +kubebuilder:validation:Minimum=1
MaxVersions *int32 `json:"maxVersions,omitempty"`
}

// VersionStatus indicates the status of a version.
Expand Down Expand Up @@ -124,6 +136,11 @@ type TemporalWorkerDeploymentStatus struct {
// LastModifierIdentity is the identity of the client that most recently modified the worker deployment.
// +optional
LastModifierIdentity string `json:"lastModifierIdentity,omitempty"`

// VersionCount is the total number of versions currently known by the worker deployment.
// This includes current, target, ramping, and deprecated versions.
// +optional
VersionCount int32 `json:"versionCount,omitempty"`
}

// WorkflowExecutionStatus describes the current state of a workflow.
Expand Down Expand Up @@ -312,8 +329,6 @@ type QueueStatistics struct {
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// TemporalWorkerDeployment is the Schema for the temporalworkerdeployments API
//
// TODO(jlegrone): Implement default/validate interface https://book.kubebuilder.io/cronjob-tutorial/webhook-implementation.html
type TemporalWorkerDeployment struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions internal/controller/state_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (m *stateMapper) mapToStatus(targetVersionID string) *v1alpha1.TemporalWork
}
status.DeprecatedVersions = deprecatedVersions

// Set version count from temporal state (directly from VersionSummaries via Versions map)
status.VersionCount = int32(len(m.temporalState.Versions))

return status
}

Expand Down
4 changes: 4 additions & 0 deletions internal/controller/state_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ func TestMapToStatus(t *testing.T) {

assert.NotNil(t, status.DeprecatedVersions[0].DrainedSince)
assert.Equal(t, drainedSince.Unix(), status.DeprecatedVersions[0].DrainedSince.Time.Unix())

// Verify version count is set correctly from VersionSummaries (via Versions map)
// Should count: worker.v1, worker.v2, worker.v3 (3 versions total)
assert.Equal(t, int32(3), status.VersionCount)
}

func TestMapWorkerDeploymentVersion(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions internal/defaults/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License.
//
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc.
package defaults

import "time"

// Default values for TemporalWorkerDeploymentSpec fields
const (
ScaledownDelay = 1 * time.Hour
DeleteDelay = 24 * time.Hour
ServerMaxVersions = 100
MaxVersions = int32(ServerMaxVersions * 0.75)
)
40 changes: 20 additions & 20 deletions internal/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,6 @@ type Config struct {
RolloutStrategy temporaliov1alpha1.RolloutStrategy
}

// ScaledownDelay returns the scaledown delay from the sunset strategy
func getScaledownDelay(spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec) time.Duration {
if spec.SunsetStrategy.ScaledownDelay == nil {
return 0
}
return spec.SunsetStrategy.ScaledownDelay.Duration
}

// DeleteDelay returns the delete delay from the sunset strategy
func getDeleteDelay(spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec) time.Duration {
if spec.SunsetStrategy.DeleteDelay == nil {
return 0
}
return spec.SunsetStrategy.DeleteDelay.Duration
}

// GeneratePlan creates a plan for updating the worker deployment
func GeneratePlan(
l logr.Logger,
Expand All @@ -89,7 +73,7 @@ func GeneratePlan(
// Add delete/scale operations based on version status
plan.DeleteDeployments = getDeleteDeployments(k8sState, status, spec)
plan.ScaleDeployments = getScaleDeployments(k8sState, status, spec)
plan.ShouldCreateDeployment = shouldCreateDeployment(status)
plan.ShouldCreateDeployment = shouldCreateDeployment(status, spec)

// Determine if we need to start any test workflows
plan.TestWorkflows = getTestWorkflows(status, config)
Expand Down Expand Up @@ -127,7 +111,7 @@ func getDeleteDeployments(
// Deleting a deployment is only possible when:
// 1. The deployment has been drained for deleteDelay + scaledownDelay.
// 2. The deployment is scaled to 0 replicas.
if (time.Since(version.DrainedSince.Time) > getDeleteDelay(spec)+getScaledownDelay(spec)) &&
if (time.Since(version.DrainedSince.Time) > spec.SunsetStrategy.DeleteDelay.Duration+spec.SunsetStrategy.ScaledownDelay.Duration) &&
*d.Spec.Replicas == 0 {
deleteDeployments = append(deleteDeployments, d)
}
Expand Down Expand Up @@ -193,7 +177,7 @@ func getScaleDeployments(
scaleDeployments[version.Deployment] = uint32(replicas)
}
case temporaliov1alpha1.VersionStatusDrained:
if time.Since(version.DrainedSince.Time) > getScaledownDelay(spec) {
if time.Since(version.DrainedSince.Time) > spec.SunsetStrategy.ScaledownDelay.Duration {
// TODO(jlegrone): Compute scale based on load? Or percentage of replicas?
// Scale down drained deployments after delay
if d.Spec.Replicas != nil && *d.Spec.Replicas != 0 {
Expand All @@ -209,8 +193,24 @@ func getScaleDeployments(
// shouldCreateDeployment determines if a new deployment needs to be created
func shouldCreateDeployment(
status *temporaliov1alpha1.TemporalWorkerDeploymentStatus,
spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec,
) bool {
return status.TargetVersion.Deployment == nil
// Check if target version already has a deployment
if status.TargetVersion.Deployment != nil {
return false
}

// Check if we're at the version limit
maxVersions := int32(75) // Default from defaults.MaxVersions
if spec.MaxVersions != nil {
maxVersions = *spec.MaxVersions
}

if status.VersionCount >= maxVersions {
return false
}

return true
}

// getTestWorkflows determines which test workflows should be started
Expand Down
Loading