Skip to content

Commit 51d025a

Browse files
carlydfjlegrone
andauthored
LastModifier-based ownership transfer with docs (#131)
<!--- Note to EXTERNAL Contributors --> <!-- Thanks for opening a PR! If it is a significant code change, please **make sure there is an open issue** for this. We work best with you when we have accepted the idea first before you code. --> <!--- For ALL Contributors 👇 --> ## What was changed Add temporary version-metadata method for unblocking worker controller after user modification ## Why? To enable ownership transfer until `OwnerIdentity` management is available per temporalio/api#632 ## Checklist <!--- add/delete as needed ---> 1. Closes #74 2. How was this tested: Unit tested, but still needs more tests (ie. test that controller correctly unsets the value) 3. Any docs updates needed? Yes, see docs/ownership.md --------- Co-authored-by: Jacob LeGrone <[email protected]>
1 parent 071789e commit 51d025a

File tree

16 files changed

+382
-17
lines changed

16 files changed

+382
-17
lines changed

docs/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ This documentation structure is designed to support various types of technical d
1414
### [Limits](limits.md)
1515
Technical constraints and limitations of the Temporal Worker Controller system, including maximum field lengths and other operational boundaries.
1616

17+
### [Ownership](ownership.md)
18+
How the controller gets permission to manage a Worker Deployment, how a human client can take or give back control.
19+
1720
---
1821

1922
*Note: This documentation structure is designed to grow with the project.*

docs/ownership.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Ownership Transfer in the Worker Controller
2+
3+
## Problem
4+
5+
If a worker controller is managing a Worker Deployment (ie. the controller is updating the RoutingConfig of the Worker
6+
Deployment), but the user changes something via the CLI (ie. rolls back to the previous current version, or stops the
7+
new target version from ramping because an issue was detected), the controller should not clobber what the human did.
8+
9+
At some point, after this human has handled their urgent rollback, they will want to let the controller know that it is
10+
authorized to resume making changes to the Routing Config of the Worker Deployment.
11+
12+
## Solution
13+
14+
_Once it is available in OSS v1.29, the controller will be able to coordinate with other users via the `OwnerIdentity`
15+
field of a Worker Deployment. This runbook will be updated when that is available and implemented by the controller._
16+
17+
In the meantime, the controller will watch the `LastModifierIdentity` field of a Worker Deployment to detect whether
18+
another user has made a change. If another user made a change to the Worker Deployment, the controller will not make
19+
any more changes to ensure a human's change is not clobbered.
20+
21+
Once you are done making your own changes to the Worker Deployment's current and ramping versions, and you are ready
22+
for the Worker Controller to take over, you can update the metadata to indicate that.
23+
24+
There is no Temporal server support for Worker Deployment Version-level metadata, so you'll have to set this value on
25+
the Current Version of your Worker Deployment.
26+
27+
```bash
28+
temporal worker deployment update-metadata-version \
29+
--deployment-name $MY_DEPLOYMENT \
30+
--build-id $CURRENT_VERSION_BUILD_ID
31+
--metadata 'temporal.io/ignore-last-modifier=true'`
32+
```
33+
In the rare case that you have a nil Current Version when you are passing back ownership, you should set it on your Ramping Version
34+
```bash
35+
temporal worker deployment update-metadata-version \
36+
--deployment-name $MY_DEPLOYMENT \
37+
--build-id $RAMPING_VERSION_BUILD_ID
38+
--metadata 'temporal.io/ignore-last-modifier=true'`
39+
```
40+
41+
In the even rarer case that you have nil Current Version and nil Ramping Version, you'll need to use the CLI or SDK to
42+
set a Current or Ramping Version and then do as instructed above.

internal/controller/execplan.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"time"
1111

1212
"github.com/go-logr/logr"
13+
"github.com/temporalio/temporal-worker-controller/internal/temporal"
1314
enumspb "go.temporal.io/api/enums/v1"
1415
sdkclient "go.temporal.io/sdk/client"
1516
"go.temporal.io/sdk/worker"
@@ -91,7 +92,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
9192
if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
9293
BuildID: vcfg.BuildID,
9394
ConflictToken: vcfg.ConflictToken,
94-
Identity: ControllerIdentity,
95+
Identity: getControllerIdentity(),
9596
}); err != nil {
9697
return fmt.Errorf("unable to set current deployment version: %w", err)
9798
}
@@ -106,7 +107,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
106107
BuildID: vcfg.BuildID,
107108
Percentage: vcfg.RampPercentage,
108109
ConflictToken: vcfg.ConflictToken,
109-
Identity: ControllerIdentity,
110+
Identity: getControllerIdentity(),
110111
}); err != nil {
111112
return fmt.Errorf("unable to set ramping deployment version: %w", err)
112113
}
@@ -127,5 +128,19 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
127128
}
128129
}
129130

131+
for _, buildId := range p.RemoveIgnoreLastModifierBuilds {
132+
if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{
133+
Version: worker.WorkerDeploymentVersion{
134+
DeploymentName: p.WorkerDeploymentName,
135+
BuildId: buildId,
136+
},
137+
MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{
138+
RemoveEntries: []string{temporal.IgnoreLastModifierKey},
139+
},
140+
}); err != nil {
141+
return fmt.Errorf("unable to update metadata to remove %s deployment: %w", temporal.IgnoreLastModifierKey, err)
142+
}
143+
}
144+
130145
return nil
131146
}

internal/controller/genplan.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ type plan struct {
3333

3434
// Start a workflow
3535
startTestWorkflows []startWorkflowConfig
36+
37+
// Build IDs of versions from which the controller should
38+
// remove IgnoreLastModifierKey from the version metadata
39+
RemoveIgnoreLastModifierBuilds []string
3640
}
3741

3842
// startWorkflowConfig defines a workflow to be started
@@ -75,8 +79,10 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
7579

7680
// Check if we need to force manual strategy due to external modification
7781
rolloutStrategy := w.Spec.RolloutStrategy
78-
if w.Status.LastModifierIdentity != ControllerIdentity && w.Status.LastModifierIdentity != "" {
79-
l.Info("Forcing manual rollout strategy since deployment was modified externally")
82+
if w.Status.LastModifierIdentity != getControllerIdentity() &&
83+
w.Status.LastModifierIdentity != "" &&
84+
!temporalState.IgnoreLastModifier {
85+
l.Info(fmt.Sprintf("Forcing Manual rollout strategy since Worker Deployment was modified by a user with a different identity '%s'; to allow controller to make changes again, set 'temporal.io/ignore-last-modifier=true' in the metadata of your Current or Ramping Version; see ownership runbook at docs/ownership.md for more details.", w.Status.LastModifierIdentity))
8086
rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual
8187
}
8288

@@ -107,6 +113,8 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
107113
// Convert version config
108114
plan.UpdateVersionConfig = planResult.VersionConfig
109115

116+
plan.RemoveIgnoreLastModifierBuilds = planResult.RemoveIgnoreLastModifierBuilds
117+
110118
// Convert test workflows
111119
for _, wf := range planResult.TestWorkflows {
112120
plan.startTestWorkflows = append(plan.startTestWorkflows, startWorkflowConfig{

internal/controller/genstatus.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ import (
1616
ctrl "sigs.k8s.io/controller-runtime"
1717
)
1818

19-
// ControllerIdentity is the identity the controller passes to all write calls.
20-
const ControllerIdentity = "temporal-worker-controller"
21-
2219
func (r *TemporalWorkerDeploymentReconciler) generateStatus(
2320
ctx context.Context,
2421
l logr.Logger,

internal/controller/state_mapper.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ func (m *stateMapper) mapToStatus(targetBuildID string) *v1alpha1.TemporalWorker
3333
VersionConflictToken: m.temporalState.VersionConflictToken,
3434
}
3535

36+
status.LastModifierIdentity = m.temporalState.LastModifierIdentity
37+
3638
// Get build IDs directly from temporal state
3739
currentBuildID := m.temporalState.CurrentBuildID
3840
rampingBuildID := m.temporalState.RampingBuildID

internal/controller/util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
const (
1212
controllerIdentityKey = "temporal.io/controller"
1313
controllerVersionKey = "temporal.io/controller-version"
14-
defaultControllerIdentity = "temporal-worker-controller"
14+
DefaultControllerIdentity = "temporal-worker-controller"
1515
)
1616

1717
// Version is set by goreleaser via ldflags at build time
@@ -35,5 +35,5 @@ func getControllerIdentity() string {
3535
if identity := os.Getenv("CONTROLLER_IDENTITY"); identity != "" {
3636
return identity
3737
}
38-
return defaultControllerIdentity
38+
return DefaultControllerIdentity
3939
}

internal/controller/worker_controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
137137
temporalClient,
138138
workerDeploymentName,
139139
workerDeploy.Spec.WorkerOptions.TemporalNamespace,
140+
getControllerIdentity(),
140141
)
141142
if err != nil {
142143
return ctrl.Result{}, fmt.Errorf("unable to get Temporal worker deployment state: %w", err)

internal/planner/planner.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ type Plan struct {
2525
ShouldCreateDeployment bool
2626
VersionConfig *VersionConfig
2727
TestWorkflows []WorkflowConfig
28+
// Build IDs of versions from which the controller should
29+
// remove IgnoreLastModifierKey from the version metadata
30+
RemoveIgnoreLastModifierBuilds []string
2831
}
2932

3033
// VersionConfig defines version configuration for Temporal
@@ -83,6 +86,17 @@ func GeneratePlan(
8386
// Determine version config changes
8487
plan.VersionConfig = getVersionConfigDiff(l, status, temporalState, config, workerDeploymentName)
8588

89+
// Only remove the IgnoreLastModifier metadata after it's been used to make a version config change, which will
90+
// make the controller the LastModifier again
91+
if temporalState != nil && temporalState.IgnoreLastModifier && plan.VersionConfig != nil {
92+
if temporalState.RampingBuildID != "" {
93+
plan.RemoveIgnoreLastModifierBuilds = append(plan.RemoveIgnoreLastModifierBuilds, temporalState.RampingBuildID)
94+
}
95+
if temporalState.CurrentBuildID != "" {
96+
plan.RemoveIgnoreLastModifierBuilds = append(plan.RemoveIgnoreLastModifierBuilds, temporalState.CurrentBuildID)
97+
}
98+
}
99+
86100
// TODO(jlegrone): generate warnings/events on the TemporalWorkerDeployment resource when buildIDs are reachable
87101
// but have no corresponding Deployment.
88102

internal/temporal/worker_deployment.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,14 @@ import (
1515
enumspb "go.temporal.io/api/enums/v1"
1616
"go.temporal.io/api/serviceerror"
1717
temporalClient "go.temporal.io/sdk/client"
18+
"go.temporal.io/sdk/converter"
1819
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1920
)
2021

22+
const (
23+
IgnoreLastModifierKey = "temporal.io/ignore-last-modifier"
24+
)
25+
2126
// VersionInfo contains information about a specific version
2227
type VersionInfo struct {
2328
DeploymentName string
@@ -40,6 +45,7 @@ type TemporalWorkerState struct {
4045
// Versions indexed by build ID
4146
Versions map[string]*VersionInfo
4247
LastModifierIdentity string
48+
IgnoreLastModifier bool
4349
}
4450

4551
// GetWorkerDeploymentState queries Temporal to get the state of a worker deployment
@@ -48,6 +54,7 @@ func GetWorkerDeploymentState(
4854
client temporalClient.Client,
4955
workerDeploymentName string,
5056
namespace string,
57+
controllerIdentity string,
5158
) (*TemporalWorkerState, error) {
5259
state := &TemporalWorkerState{
5360
Versions: make(map[string]*VersionInfo),
@@ -81,6 +88,14 @@ func GetWorkerDeploymentState(
8188
state.LastModifierIdentity = workerDeploymentInfo.LastModifierIdentity
8289
state.VersionConflictToken = resp.ConflictToken
8390

91+
// Decide whether to ignore LastModifierIdentity
92+
if state.LastModifierIdentity != controllerIdentity && state.LastModifierIdentity != "" {
93+
state.IgnoreLastModifier, err = DeploymentShouldIgnoreLastModifier(ctx, deploymentHandler, routingConfig)
94+
if err != nil {
95+
return nil, err
96+
}
97+
}
98+
8499
// TODO(jlegrone): Re-enable stats once available in versioning v3.
85100

86101
// Set ramping since time if applicable
@@ -232,3 +247,45 @@ func mapWorkflowStatus(status enumspb.WorkflowExecutionStatus) temporaliov1alpha
232247
func GetTestWorkflowID(deploymentName, buildID, taskQueue string) string {
233248
return fmt.Sprintf("test-%s:%s-%s", deploymentName, buildID, taskQueue)
234249
}
250+
251+
func DeploymentShouldIgnoreLastModifier(
252+
ctx context.Context,
253+
deploymentHandler temporalClient.WorkerDeploymentHandle,
254+
routingConfig temporalClient.WorkerDeploymentRoutingConfig,
255+
) (shouldIgnore bool, err error) {
256+
if routingConfig.CurrentVersion != nil {
257+
shouldIgnore, err = getShouldIgnoreLastModifier(ctx, deploymentHandler, routingConfig.CurrentVersion.BuildId)
258+
if err != nil {
259+
return false, err
260+
}
261+
}
262+
if !shouldIgnore && // if someone has a non-nil Current Version, but only set the metadata in their Ramping Version, also count that
263+
routingConfig.RampingVersion != nil {
264+
return getShouldIgnoreLastModifier(ctx, deploymentHandler, routingConfig.CurrentVersion.BuildId)
265+
}
266+
return shouldIgnore, nil
267+
}
268+
269+
func getShouldIgnoreLastModifier(
270+
ctx context.Context,
271+
deploymentHandler temporalClient.WorkerDeploymentHandle,
272+
buildId string,
273+
) (bool, error) {
274+
desc, err := deploymentHandler.DescribeVersion(ctx, temporalClient.WorkerDeploymentDescribeVersionOptions{
275+
BuildID: buildId,
276+
})
277+
if err != nil {
278+
return false, fmt.Errorf("unable to describe version: %w", err)
279+
}
280+
for k, v := range desc.Info.Metadata {
281+
if k == IgnoreLastModifierKey {
282+
var s string
283+
err = converter.GetDefaultDataConverter().FromPayload(v, &s)
284+
if err != nil {
285+
return false, fmt.Errorf("unable to decode metadata payload for key %s: %w", IgnoreLastModifierKey, err)
286+
}
287+
return s == "true", nil
288+
}
289+
}
290+
return false, nil
291+
}

0 commit comments

Comments
 (0)