Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions internal/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@ func GeneratePlan(
ScaleDeployments: make(map[*corev1.ObjectReference]uint32),
}

// If Deployment was not found in temporal, which always happens on the first worker deployment version
// and sometimes happens transiently thereafter, the versions list will be empty. If the deployment
// exists and was found, there will always be at least one version in the list.
foundDeploymentInTemporal := temporalState != nil && len(temporalState.Versions) > 0

// Add delete/scale operations based on version status
plan.DeleteDeployments = getDeleteDeployments(k8sState, status, spec)
plan.DeleteDeployments = getDeleteDeployments(k8sState, status, spec, foundDeploymentInTemporal)
plan.ScaleDeployments = getScaleDeployments(k8sState, status, spec)
plan.ShouldCreateDeployment = shouldCreateDeployment(status, maxVersionsIneligibleForDeletion)
plan.UpdateDeployments = getUpdateDeployments(k8sState, status, connection)
Expand Down Expand Up @@ -191,6 +196,7 @@ func getDeleteDeployments(
k8sState *k8s.DeploymentState,
status *temporaliov1alpha1.TemporalWorkerDeploymentStatus,
spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec,
foundDeploymentInTemporal bool,
) []*appsv1.Deployment {
var deleteDeployments []*appsv1.Deployment

Expand All @@ -215,9 +221,11 @@ func getDeleteDeployments(
deleteDeployments = append(deleteDeployments, d)
}
case temporaliov1alpha1.VersionStatusNotRegistered:
// NotRegistered versions are versions that the server doesn't know about.
// Only delete if it's not the target version.
if status.TargetVersion.BuildID != version.BuildID {
// Only delete Deployments of NotRegistered versions if temporalState was not empty
if foundDeploymentInTemporal &&
// NotRegistered versions are versions that the server doesn't know about.
// Only delete if it's not the target version.
status.TargetVersion.BuildID != version.BuildID {
deleteDeployments = append(deleteDeployments, d)
}
}
Expand Down
56 changes: 46 additions & 10 deletions internal/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,13 @@ func TestGeneratePlan(t *testing.T) {

func TestGetDeleteDeployments(t *testing.T) {
testCases := []struct {
name string
k8sState *k8s.DeploymentState
status *temporaliov1alpha1.TemporalWorkerDeploymentStatus
spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec
config *Config
expectDeletes int
name string
k8sState *k8s.DeploymentState
status *temporaliov1alpha1.TemporalWorkerDeploymentStatus
spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec
config *Config
expectDeletes int
foundDeploymentInTemporal bool
}{
{
name: "drained version should be deleted",
Expand Down Expand Up @@ -463,7 +464,8 @@ func TestGetDeleteDeployments(t *testing.T) {
config: &Config{
RolloutStrategy: temporaliov1alpha1.RolloutStrategy{},
},
expectDeletes: 1,
expectDeletes: 1,
foundDeploymentInTemporal: true,
},
{
name: "not yet drained long enough",
Expand Down Expand Up @@ -492,7 +494,8 @@ func TestGetDeleteDeployments(t *testing.T) {
},
Replicas: func() *int32 { r := int32(1); return &r }(),
},
expectDeletes: 0,
expectDeletes: 0,
foundDeploymentInTemporal: true,
},
{
name: "not registered version should be deleted",
Expand Down Expand Up @@ -523,15 +526,48 @@ func TestGetDeleteDeployments(t *testing.T) {
spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{
Replicas: func() *int32 { r := int32(1); return &r }(),
},
expectDeletes: 1,
expectDeletes: 1,
foundDeploymentInTemporal: true,
},
{
name: "not registered version should NOT be deleted, deployment not found in temporal",
k8sState: &k8s.DeploymentState{
Deployments: map[string]*appsv1.Deployment{
"123": createDeploymentWithDefaultConnectionSpecHash(1),
"456": createDeploymentWithDefaultConnectionSpecHash(1),
},
},
status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{
TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{
BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{
BuildID: "123",
Status: temporaliov1alpha1.VersionStatusCurrent,
Deployment: &corev1.ObjectReference{Name: "test-123"},
},
},
DeprecatedVersions: []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{
{
BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{
BuildID: "456",
Status: temporaliov1alpha1.VersionStatusNotRegistered,
Deployment: &corev1.ObjectReference{Name: "test-456"},
},
},
},
},
spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{
Replicas: func() *int32 { r := int32(1); return &r }(),
},
expectDeletes: 0,
foundDeploymentInTemporal: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.spec.Default(context.Background())
require.NoError(t, err)
deletes := getDeleteDeployments(tc.k8sState, tc.status, tc.spec)
deletes := getDeleteDeployments(tc.k8sState, tc.status, tc.spec, tc.foundDeploymentInTemporal)
assert.Equal(t, tc.expectDeletes, len(deletes), "unexpected number of deletes")
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/temporal/worker_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func GetWorkerDeploymentState(
if err != nil {
var notFound *serviceerror.NotFound
if errors.As(err, &notFound) {
// If deployment not found, return empty state
// If deployment not found, return empty state. Need to scale up workers in order to create Deployment Temporal-side
return state, nil
}
return nil, fmt.Errorf("unable to describe worker deployment %s: %w", workerDeploymentName, err)
Expand Down
Loading