Skip to content

Commit f8c1582

Browse files
authored
Bug Fix: Ignore LastModifierIdentity if server deleted a version for garbage collection (#163)
<!--- Note to EXTERNAL Contributors --> <!-- Thanks for opening a PR! If it is a significant code change, please **make sure there is an open issue** for this. We work best with you when we have accepted the idea first before you code. --> <!--- For ALL Contributors 👇 --> ## What was changed Bug Fix: Ignore LastModifierIdentity if server deleted a version for garbage collection Also, fix issue in describe version that would sometimes cause a deprecated version to be Drained with nil DrainedSince time. ## Why? The server sets the `LastModifierIdentity` to `"try-delete-for-add-version"` when it deletes old versions to do garbage collection upon addition of the a worker version beyond the maximum. We are changing this on the server and will patch v1.29 of the OSS server to handle this more gracefully in v1.29.2 of the server, however, for users who cannot or do not want to use the new patch of the server immediately, this will fix the problem for them on the controller side so that they won't encounter this if they're running against Temporal Server v1.29.1 or v1.29.0. The issue of nil DrainedSince time caused flakiness in tests that involve Deprecated Drained versions. That flakiness appeared a lot in the new test I added to test the bug fix, so I also fixed that. ## Checklist <!--- add/delete as needed ---> 1. Closes <!-- add issue number here --> 2. How was this tested: New functional test 3. Any docs updates needed? <!--- update README if applicable or point out where to update docs.temporal.io -->
1 parent be53db0 commit f8c1582

File tree

5 files changed

+87
-10
lines changed

5 files changed

+87
-10
lines changed

internal/controller/genplan.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
8383
// Check if we need to force manual strategy due to external modification
8484
rolloutStrategy := w.Spec.RolloutStrategy
8585
if w.Status.LastModifierIdentity != getControllerIdentity() &&
86+
w.Status.LastModifierIdentity != serverDeleteVersionIdentity &&
8687
w.Status.LastModifierIdentity != "" &&
8788
!temporalState.IgnoreLastModifier {
8889
l.Info(fmt.Sprintf("Forcing Manual rollout strategy since Worker Deployment was modified by a user with a different identity '%s'; to allow controller to make changes again, set 'temporal.io/ignore-last-modifier=true' in the metadata of your Current or Ramping Version; see ownership runbook at docs/ownership.md for more details.", w.Status.LastModifierIdentity))

internal/controller/util.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ const (
1818
controllerVersionEnvKey = "CONTROLLER_VERSION"
1919
controllerIdentityEnvKey = "CONTROLLER_IDENTITY"
2020
ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey = "CONTROLLER_MAX_DEPLOYMENT_VERSIONS_INELIGIBLE_FOR_DELETION"
21+
22+
serverDeleteVersionIdentity = "try-delete-for-add-version"
2123
)
2224

2325
// Version is set by goreleaser via ldflags at build time

internal/temporal/worker_deployment.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,17 +151,37 @@ func GetWorkerDeploymentState(
151151
versionInfo.Status = temporaliov1alpha1.VersionStatusDrained
152152

153153
// Get drain time information
154-
versionResp, err := deploymentHandler.DescribeVersion(ctx, temporalClient.WorkerDeploymentDescribeVersionOptions{
155-
BuildID: version.Version.BuildId,
156-
})
157-
if err == nil {
158-
drainedSince := versionResp.Info.DrainageInfo.LastChangedTime
154+
var desc temporalClient.WorkerDeploymentVersionDescription
155+
describeVersionUntilDrainTime := func() error {
156+
desc, err = deploymentHandler.DescribeVersion(ctx, temporalClient.WorkerDeploymentDescribeVersionOptions{
157+
BuildID: version.Version.BuildId,
158+
})
159+
if err != nil {
160+
return err
161+
}
162+
if desc.Info.DrainageInfo == nil {
163+
return fmt.Errorf("drainage info nil for build %s", version.Version.BuildId)
164+
}
165+
if desc.Info.DrainageInfo.DrainageStatus != temporalClient.WorkerDeploymentVersionDrainageStatusDrained {
166+
return fmt.Errorf("version info does not say that build %s is drained", version.Version.BuildId)
167+
}
168+
return err
169+
}
170+
// At first, version is found in DeploymentInfo.VersionSummaries but may not have the full drainage info in
171+
// describe version, so we describe with backoff.
172+
// If the version was just deleted by the server, we may never succeed at describing it, and it should
173+
// be treated as NotRegistered, since it no longer exists in Temporal.
174+
var notFound *serviceerror.NotFound
175+
if err = withBackoff(10*time.Second, 1*time.Second, describeVersionUntilDrainTime); err == nil { //revive:disable-line:max-control-nesting
176+
drainedSince := desc.Info.DrainageInfo.LastChangedTime
159177
versionInfo.DrainedSince = &drainedSince
160178
// If the deployment exists and has replicas, we assume there are versioned pollers, no need to check
161179
deployment, ok := k8sDeployments[version.Version.BuildId]
162180
if !ok || deployment.Status.Replicas == 0 { //revive:disable-line:max-control-nesting
163-
versionInfo.NoTaskQueuesHaveVersionedPoller = noTaskQueuesHaveVersionedPollers(ctx, client, versionResp.Info.TaskQueuesInfos)
181+
versionInfo.NoTaskQueuesHaveVersionedPoller = noTaskQueuesHaveVersionedPollers(ctx, client, desc.Info.TaskQueuesInfos)
164182
}
183+
} else if errors.As(err, &notFound) { //revive:disable-line:max-control-nesting
184+
versionInfo.Status = temporaliov1alpha1.VersionStatusNotRegistered
165185
}
166186
} else {
167187
versionInfo.Status = temporaliov1alpha1.VersionStatusInactive

internal/tests/internal/env_helpers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const (
4141
testDrainageVisibilityGracePeriod = time.Second
4242
testDrainageRefreshInterval = time.Second
4343
testMaxVersionsIneligibleForDeletion = 5
44+
testMaxVersionsInDeployment = 6
4445
)
4546

4647
// setupKubebuilderAssets sets up the KUBEBUILDER_ASSETS environment variable if not already set

internal/tests/internal/integration_test.go

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func TestIntegration(t *testing.T) {
3737
// make versions drain faster
3838
dc.OverrideValue("matching.wv.VersionDrainageStatusVisibilityGracePeriod", testDrainageVisibilityGracePeriod)
3939
dc.OverrideValue("matching.wv.VersionDrainageStatusRefreshInterval", testDrainageRefreshInterval)
40+
dc.OverrideValue("matching.maxVersionsInDeployment", testMaxVersionsInDeployment)
4041
ts := temporaltest.NewServer(
4142
temporaltest.WithT(t),
4243
temporaltest.WithBaseServerOptions(temporal.WithDynamicConfigClient(dc)),
@@ -155,7 +156,7 @@ func TestIntegration(t *testing.T) {
155156
),
156157
},
157158
{
158-
name: "manual-rollout-blocked-at-max-replicas",
159+
name: "manual-rollout-blocked-at-max-versions-ineligible-for-deletion",
159160
builder: testhelpers.NewTestCase().
160161
WithInput(
161162
testhelpers.NewTemporalWorkerDeploymentBuilder().
@@ -355,7 +356,7 @@ func TestIntegration(t *testing.T) {
355356
),
356357
},
357358
{
358-
name: "all-at-once-blocked-at-max-replicas",
359+
name: "all-at-once-blocked-at-max-versions-ineligible-for-deletion",
359360
builder: testhelpers.NewTestCase().
360361
WithInput(
361362
testhelpers.NewTemporalWorkerDeploymentBuilder().
@@ -582,7 +583,7 @@ func TestIntegration(t *testing.T) {
582583
),
583584
},
584585
{
585-
name: "progressive-rollout-blocked-at-max-replicas",
586+
name: "progressive-rollout-blocked-at-ctrlr-max-versions",
586587
builder: testhelpers.NewTestCase().
587588
WithInput(
588589
testhelpers.NewTemporalWorkerDeploymentBuilder().
@@ -700,14 +701,15 @@ func TestIntegration(t *testing.T) {
700701
// make versions drain faster
701702
dcShortTTL.OverrideValue("matching.wv.VersionDrainageStatusVisibilityGracePeriod", testDrainageVisibilityGracePeriod)
702703
dcShortTTL.OverrideValue("matching.wv.VersionDrainageStatusRefreshInterval", testDrainageRefreshInterval)
704+
dcShortTTL.OverrideValue("matching.maxVersionsInDeployment", testMaxVersionsInDeployment)
703705
tsShortTTL := temporaltest.NewServer(
704706
temporaltest.WithT(t),
705707
temporaltest.WithBaseServerOptions(temporal.WithDynamicConfigClient(dcShortTTL)),
706708
)
707709
testsShortPollerTTL := []testCase{
708710
// Note: Add tests that require pollers to expire quickly here
709711
{
710-
name: "nth-rollout-unblocked-after-pollers-die",
712+
name: "6th-rollout-unblocked-after-pollers-die-max-ctrlr-versions",
711713
builder: testhelpers.NewTestCase().
712714
WithInput(
713715
testhelpers.NewTemporalWorkerDeploymentBuilder().
@@ -754,6 +756,57 @@ func TestIntegration(t *testing.T) {
754756
testhelpers.NewDeploymentInfo("v5", 1),
755757
),
756758
},
759+
{
760+
name: "7th-rollout-unblocked-after-pollers-die-version-deleted",
761+
builder: testhelpers.NewTestCase().
762+
WithInput(
763+
testhelpers.NewTemporalWorkerDeploymentBuilder().
764+
WithAllAtOnceStrategy().
765+
WithTargetTemplate("v6").
766+
WithStatus(
767+
testhelpers.NewStatusBuilder().
768+
WithTargetVersion("v5", temporaliov1alpha1.VersionStatusCurrent, -1, true, true).
769+
WithCurrentVersion("v5", true, true).
770+
WithDeprecatedVersions( // drained AND has no pollers -> eligible for deletion
771+
testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, true, true),
772+
testhelpers.NewDeprecatedVersionInfo("v1", temporaliov1alpha1.VersionStatusDrained, true, true, true),
773+
testhelpers.NewDeprecatedVersionInfo("v2", temporaliov1alpha1.VersionStatusDrained, true, true, true),
774+
testhelpers.NewDeprecatedVersionInfo("v3", temporaliov1alpha1.VersionStatusDrained, true, true, true),
775+
testhelpers.NewDeprecatedVersionInfo("v4", temporaliov1alpha1.VersionStatusDrained, true, true, true),
776+
),
777+
),
778+
).
779+
WithExistingDeployments(
780+
testhelpers.NewDeploymentInfo("v0", 0), // 0 replicas -> no pollers
781+
testhelpers.NewDeploymentInfo("v1", 1),
782+
testhelpers.NewDeploymentInfo("v2", 1),
783+
testhelpers.NewDeploymentInfo("v3", 1),
784+
testhelpers.NewDeploymentInfo("v4", 1),
785+
testhelpers.NewDeploymentInfo("v5", 1),
786+
).
787+
WithWaitTime(5*time.Second).
788+
WithExpectedStatus(
789+
testhelpers.NewStatusBuilder().
790+
WithTargetVersion("v6", temporaliov1alpha1.VersionStatusCurrent, -1, false, false).
791+
WithCurrentVersion("v6", true, false).
792+
WithDeprecatedVersions( // drained AND has pollers -> eligible for deletion
793+
testhelpers.NewDeprecatedVersionInfo("v1", temporaliov1alpha1.VersionStatusDrained, true, false, true),
794+
testhelpers.NewDeprecatedVersionInfo("v2", temporaliov1alpha1.VersionStatusDrained, true, false, true),
795+
testhelpers.NewDeprecatedVersionInfo("v3", temporaliov1alpha1.VersionStatusDrained, true, false, true),
796+
testhelpers.NewDeprecatedVersionInfo("v4", temporaliov1alpha1.VersionStatusDrained, true, false, true),
797+
testhelpers.NewDeprecatedVersionInfo("v5", temporaliov1alpha1.VersionStatusDrained, true, false, true),
798+
),
799+
).
800+
WithExpectedDeployments(
801+
testhelpers.NewDeploymentInfo("v0", 0), // 0 replicas -> no pollers
802+
testhelpers.NewDeploymentInfo("v1", 1),
803+
testhelpers.NewDeploymentInfo("v2", 1),
804+
testhelpers.NewDeploymentInfo("v3", 1),
805+
testhelpers.NewDeploymentInfo("v4", 1),
806+
testhelpers.NewDeploymentInfo("v5", 1),
807+
testhelpers.NewDeploymentInfo("v6", 1),
808+
),
809+
},
757810
}
758811

759812
for _, tc := range testsShortPollerTTL {

0 commit comments

Comments
 (0)