From 4d9cde362472e03e6b4231bf5faed93bc19b3e04 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 24 Sep 2025 16:09:52 -0700 Subject: [PATCH 1/3] throw error and retry Reconcile if Deployment is NotFound --- internal/temporal/worker_deployment.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/temporal/worker_deployment.go b/internal/temporal/worker_deployment.go index 858b8a6c..ddf57cdb 100644 --- a/internal/temporal/worker_deployment.go +++ b/internal/temporal/worker_deployment.go @@ -86,11 +86,6 @@ func GetWorkerDeploymentState( // Describe the worker deployment resp, err := deploymentHandler.Describe(ctx, temporalClient.WorkerDeploymentDescribeOptions{}) if err != nil { - var notFound *serviceerror.NotFound - if errors.As(err, ¬Found) { - // If deployment not found, return empty state - return state, nil - } return nil, fmt.Errorf("unable to describe worker deployment %s: %w", workerDeploymentName, err) } From a428e545d97e467637accdae8d09f334254767fa Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 24 Sep 2025 16:55:07 -0700 Subject: [PATCH 2/3] continue Reconcile loop if Deployment Not Found, and handle empty TemporalState later --- internal/planner/planner.go | 11 +++++++---- internal/temporal/worker_deployment.go | 5 +++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/internal/planner/planner.go b/internal/planner/planner.go index fa601eb3..6b3f1f0a 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -76,7 +76,7 @@ func GeneratePlan( } // Add delete/scale operations based on version status - plan.DeleteDeployments = getDeleteDeployments(k8sState, status, spec) + plan.DeleteDeployments = getDeleteDeployments(k8sState, status, spec, temporalState) plan.ScaleDeployments = getScaleDeployments(k8sState, status, spec) plan.ShouldCreateDeployment = shouldCreateDeployment(status, maxVersionsIneligibleForDeletion) plan.UpdateDeployments = getUpdateDeployments(k8sState, status, connection) @@ -191,6 +191,7 @@ func getDeleteDeployments( k8sState *k8s.DeploymentState, status *temporaliov1alpha1.TemporalWorkerDeploymentStatus, spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec, + temporalState *temporal.TemporalWorkerState, ) []*appsv1.Deployment { var deleteDeployments []*appsv1.Deployment @@ -215,9 +216,11 @@ func getDeleteDeployments( deleteDeployments = append(deleteDeployments, d) } case temporaliov1alpha1.VersionStatusNotRegistered: - // NotRegistered versions are versions that the server doesn't know about. - // Only delete if it's not the target version. - if status.TargetVersion.BuildID != version.BuildID { + // Only delete Deployments of NotRegistered versions if temporalState was not empty + if len(temporalState.Versions) > 0 && + // NotRegistered versions are versions that the server doesn't know about. + // Only delete if it's not the target version. + status.TargetVersion.BuildID != version.BuildID { deleteDeployments = append(deleteDeployments, d) } } diff --git a/internal/temporal/worker_deployment.go b/internal/temporal/worker_deployment.go index ddf57cdb..7c681fe9 100644 --- a/internal/temporal/worker_deployment.go +++ b/internal/temporal/worker_deployment.go @@ -86,6 +86,11 @@ func GetWorkerDeploymentState( // Describe the worker deployment resp, err := deploymentHandler.Describe(ctx, temporalClient.WorkerDeploymentDescribeOptions{}) if err != nil { + var notFound *serviceerror.NotFound + if errors.As(err, ¬Found) { + // If deployment not found, return empty state. Need to scale up workers in order to create Deployment Temporal-side + return state, nil + } return nil, fmt.Errorf("unable to describe worker deployment %s: %w", workerDeploymentName, err) } From 8759669895f7ba1696f4d8118e90946c3ebd2a1c Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Wed, 24 Sep 2025 17:12:13 -0700 Subject: [PATCH 3/3] unit test and fix --- internal/planner/planner.go | 11 +++++-- internal/planner/planner_test.go | 56 ++++++++++++++++++++++++++------ 2 files changed, 54 insertions(+), 13 deletions(-) diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 6b3f1f0a..19dfd6e0 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -75,8 +75,13 @@ func GeneratePlan( ScaleDeployments: make(map[*corev1.ObjectReference]uint32), } + // If Deployment was not found in temporal, which always happens on the first worker deployment version + // and sometimes happens transiently thereafter, the versions list will be empty. If the deployment + // exists and was found, there will always be at least one version in the list. + foundDeploymentInTemporal := temporalState != nil && len(temporalState.Versions) > 0 + // Add delete/scale operations based on version status - plan.DeleteDeployments = getDeleteDeployments(k8sState, status, spec, temporalState) + plan.DeleteDeployments = getDeleteDeployments(k8sState, status, spec, foundDeploymentInTemporal) plan.ScaleDeployments = getScaleDeployments(k8sState, status, spec) plan.ShouldCreateDeployment = shouldCreateDeployment(status, maxVersionsIneligibleForDeletion) plan.UpdateDeployments = getUpdateDeployments(k8sState, status, connection) @@ -191,7 +196,7 @@ func getDeleteDeployments( k8sState *k8s.DeploymentState, status *temporaliov1alpha1.TemporalWorkerDeploymentStatus, spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec, - temporalState *temporal.TemporalWorkerState, + foundDeploymentInTemporal bool, ) []*appsv1.Deployment { var deleteDeployments []*appsv1.Deployment @@ -217,7 +222,7 @@ func getDeleteDeployments( } case temporaliov1alpha1.VersionStatusNotRegistered: // Only delete Deployments of NotRegistered versions if temporalState was not empty - if len(temporalState.Versions) > 0 && + if foundDeploymentInTemporal && // NotRegistered versions are versions that the server doesn't know about. // Only delete if it's not the target version. status.TargetVersion.BuildID != version.BuildID { diff --git a/internal/planner/planner_test.go b/internal/planner/planner_test.go index c86f60d4..ccc62469 100644 --- a/internal/planner/planner_test.go +++ b/internal/planner/planner_test.go @@ -426,12 +426,13 @@ func TestGeneratePlan(t *testing.T) { func TestGetDeleteDeployments(t *testing.T) { testCases := []struct { - name string - k8sState *k8s.DeploymentState - status *temporaliov1alpha1.TemporalWorkerDeploymentStatus - spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec - config *Config - expectDeletes int + name string + k8sState *k8s.DeploymentState + status *temporaliov1alpha1.TemporalWorkerDeploymentStatus + spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec + config *Config + expectDeletes int + foundDeploymentInTemporal bool }{ { name: "drained version should be deleted", @@ -463,7 +464,8 @@ func TestGetDeleteDeployments(t *testing.T) { config: &Config{ RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, }, - expectDeletes: 1, + expectDeletes: 1, + foundDeploymentInTemporal: true, }, { name: "not yet drained long enough", @@ -492,7 +494,8 @@ func TestGetDeleteDeployments(t *testing.T) { }, Replicas: func() *int32 { r := int32(1); return &r }(), }, - expectDeletes: 0, + expectDeletes: 0, + foundDeploymentInTemporal: true, }, { name: "not registered version should be deleted", @@ -523,7 +526,40 @@ func TestGetDeleteDeployments(t *testing.T) { spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ Replicas: func() *int32 { r := int32(1); return &r }(), }, - expectDeletes: 1, + expectDeletes: 1, + foundDeploymentInTemporal: true, + }, + { + name: "not registered version should NOT be deleted, deployment not found in temporal", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "123": createDeploymentWithDefaultConnectionSpecHash(1), + "456": createDeploymentWithDefaultConnectionSpecHash(1), + }, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "123", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &corev1.ObjectReference{Name: "test-123"}, + }, + }, + DeprecatedVersions: []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{ + { + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "456", + Status: temporaliov1alpha1.VersionStatusNotRegistered, + Deployment: &corev1.ObjectReference{Name: "test-456"}, + }, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(1); return &r }(), + }, + expectDeletes: 0, + foundDeploymentInTemporal: false, }, } @@ -531,7 +567,7 @@ func TestGetDeleteDeployments(t *testing.T) { t.Run(tc.name, func(t *testing.T) { err := tc.spec.Default(context.Background()) require.NoError(t, err) - deletes := getDeleteDeployments(tc.k8sState, tc.status, tc.spec) + deletes := getDeleteDeployments(tc.k8sState, tc.status, tc.spec, tc.foundDeploymentInTemporal) assert.Equal(t, tc.expectDeletes, len(deletes), "unexpected number of deletes") }) }