Skip to content

Commit 6bbcb98

Browse files
authored
Update Temporal Go SDK to v1.35.0 and index versions by build ID (#119)
1 parent 5572f07 commit 6bbcb98

File tree

20 files changed

+793
-437
lines changed

20 files changed

+793
-437
lines changed

api/v1alpha1/worker_types.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,8 @@ type TaskQueue struct {
176176

177177
// BaseWorkerDeploymentVersion contains fields common to all worker deployment version types
178178
type BaseWorkerDeploymentVersion struct {
179-
// The string representation of the deployment version.
180-
// Currently, this is always `deployment_name.build_id`.
181-
VersionID string `json:"versionID"`
179+
// BuildID is the unique identifier for this version of the worker deployment.
180+
BuildID string `json:"buildID"`
182181

183182
// Status indicates whether workers in this version may
184183
// be eligible to receive tasks from the Temporal server.
@@ -329,8 +328,8 @@ type QueueStatistics struct {
329328
//+kubebuilder:object:root=true
330329
//+kubebuilder:subresource:status
331330
// +kubebuilder:resource:shortName=twd;twdeployment;tworkerdeployment
332-
//+kubebuilder:printcolumn:name="Current",type="string",JSONPath=".status.currentVersion.versionID",description="Current Version for new workflows"
333-
//+kubebuilder:printcolumn:name="Target",type="string",JSONPath=".status.targetVersion.versionID",description="Version of the current worker template"
331+
//+kubebuilder:printcolumn:name="Current",type="string",JSONPath=".status.currentVersion.buildID",description="Current Version Build ID"
332+
//+kubebuilder:printcolumn:name="Target",type="string",JSONPath=".status.targetVersion.buildID",description="Build ID of the target worker (based on the pod template)"
334333
//+kubebuilder:printcolumn:name="Target-Ramp",type="number",JSONPath=".status.targetVersion.rampPercentage",description="Percentage of new workflows starting on Target Version"
335334
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
336335

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ require (
1414
github.com/pborman/uuid v1.2.1
1515
github.com/stretchr/testify v1.10.0
1616
go.temporal.io/api v1.50.0
17-
go.temporal.io/sdk v1.34.0
17+
go.temporal.io/sdk v1.35.0
1818
k8s.io/api v0.33.2
1919
k8s.io/apimachinery v0.33.2
2020
k8s.io/client-go v0.33.2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
185185
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
186186
go.temporal.io/api v1.50.0 h1:7s8Cn+fKfNx9G0v2Ge9We6X2WiCA3JvJ9JryeNbx1Bc=
187187
go.temporal.io/api v1.50.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
188-
go.temporal.io/sdk v1.34.0 h1:VLg/h6ny7GvLFVoQPqz2NcC93V9yXboQwblkRvZ1cZE=
189-
go.temporal.io/sdk v1.34.0/go.mod h1:iE4U5vFrH3asOhqpBBphpj9zNtw8btp8+MSaf5A0D3w=
188+
go.temporal.io/sdk v1.35.0 h1:lRNAQ5As9rLgYa7HBvnmKyzxLcdElTuoFJ0FXM/AsLQ=
189+
go.temporal.io/sdk v1.35.0/go.mod h1:1q5MuLc2MEJ4lneZTHJzpVebW2oZnyxoIOWX3oFVebw=
190190
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
191191
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
192192
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=

go.work.sum

Lines changed: 345 additions & 2 deletions
Large diffs are not rendered by default.

helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ spec:
2020
versions:
2121
- additionalPrinterColumns:
2222
- description: Current Version for new workflows
23-
jsonPath: .status.currentVersion.versionID
23+
jsonPath: .status.currentVersion.buildID
2424
name: Current
2525
type: string
2626
- description: Version of the current worker template
27-
jsonPath: .status.targetVersion.versionID
27+
jsonPath: .status.targetVersion.buildID
2828
name: Target
2929
type: string
3030
- description: Percentage of new workflows starting on Target Version
@@ -3810,6 +3810,8 @@ spec:
38103810
properties:
38113811
currentVersion:
38123812
properties:
3813+
buildID:
3814+
type: string
38133815
deployment:
38143816
properties:
38153817
apiVersion:
@@ -3844,15 +3846,15 @@ spec:
38443846
- name
38453847
type: object
38463848
type: array
3847-
versionID:
3848-
type: string
38493849
required:
3850+
- buildID
38503851
- status
3851-
- versionID
38523852
type: object
38533853
deprecatedVersions:
38543854
items:
38553855
properties:
3856+
buildID:
3857+
type: string
38563858
deployment:
38573859
properties:
38583860
apiVersion:
@@ -3890,17 +3892,17 @@ spec:
38903892
- name
38913893
type: object
38923894
type: array
3893-
versionID:
3894-
type: string
38953895
required:
3896+
- buildID
38963897
- status
3897-
- versionID
38983898
type: object
38993899
type: array
39003900
lastModifierIdentity:
39013901
type: string
39023902
targetVersion:
39033903
properties:
3904+
buildID:
3905+
type: string
39043906
deployment:
39053907
properties:
39063908
apiVersion:
@@ -3961,11 +3963,9 @@ spec:
39613963
- workflowID
39623964
type: object
39633965
type: array
3964-
versionID:
3965-
type: string
39663966
required:
3967+
- buildID
39673968
- status
3968-
- versionID
39693969
type: object
39703970
versionConflictToken:
39713971
format: byte

internal/controller/execplan.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/go-logr/logr"
1313
enumspb "go.temporal.io/api/enums/v1"
1414
sdkclient "go.temporal.io/sdk/client"
15-
"go.temporal.io/sdk/workflow"
15+
"go.temporal.io/sdk/worker"
1616
appsv1 "k8s.io/api/apps/v1"
1717
autoscalingv1 "k8s.io/api/autoscaling/v1"
1818
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -73,9 +73,11 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
7373
WorkflowExecutionTimeout: time.Hour,
7474
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
7575
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
76-
VersioningOverride: sdkclient.VersioningOverride{
77-
Behavior: workflow.VersioningBehaviorPinned,
78-
PinnedVersion: wf.versionID,
76+
VersioningOverride: &sdkclient.PinnedVersioningOverride{
77+
Version: worker.WorkerDeploymentVersion{
78+
DeploymentName: p.WorkerDeploymentName,
79+
BuildId: wf.buildID,
80+
},
7981
},
8082
}, wf.workflowType); err != nil {
8183
return fmt.Errorf("unable to start test workflow execution: %w", err)
@@ -85,32 +87,35 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
8587
// Register current version or ramps
8688
if vcfg := p.UpdateVersionConfig; vcfg != nil {
8789
if vcfg.SetCurrent {
88-
l.Info("registering new current version", "version", vcfg.VersionID)
90+
l.Info("registering new current version", "buildID", vcfg.BuildID)
8991
if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
90-
Version: vcfg.VersionID,
92+
BuildID: vcfg.BuildID,
9193
ConflictToken: vcfg.ConflictToken,
9294
Identity: ControllerIdentity,
9395
}); err != nil {
9496
return fmt.Errorf("unable to set current deployment version: %w", err)
9597
}
9698
} else {
9799
if vcfg.RampPercentage > 0 {
98-
l.Info("applying ramp", "version", vcfg.VersionID, "percentage", vcfg.RampPercentage)
100+
l.Info("applying ramp", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage)
99101
} else {
100-
l.Info("deleting ramp")
102+
l.Info("deleting ramp", "buildID", vcfg.BuildID)
101103
}
102104

103105
if _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{
104-
Version: vcfg.VersionID,
106+
BuildID: vcfg.BuildID,
105107
Percentage: vcfg.RampPercentage,
106108
ConflictToken: vcfg.ConflictToken,
107109
Identity: ControllerIdentity,
108110
}); err != nil {
109-
return fmt.Errorf("unable to set ramping deployment: %w", err)
111+
return fmt.Errorf("unable to set ramping deployment version: %w", err)
110112
}
111113
}
112114
if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{
113-
Version: vcfg.VersionID,
115+
Version: worker.WorkerDeploymentVersion{
116+
DeploymentName: p.WorkerDeploymentName,
117+
BuildId: vcfg.BuildID,
118+
},
114119
MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{
115120
UpsertEntries: map[string]interface{}{
116121
controllerIdentityKey: getControllerIdentity(),

internal/controller/genplan.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type plan struct {
3939
type startWorkflowConfig struct {
4040
workflowType string
4141
workflowID string
42-
versionID string
42+
buildID string
4343
taskQueue string
4444
}
4545

@@ -52,7 +52,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
5252
temporalState *temporal.TemporalWorkerState,
5353
) (*plan, error) {
5454
workerDeploymentName := k8s.ComputeWorkerDeploymentName(w)
55-
targetVersionID := k8s.ComputeVersionID(w)
55+
targetBuildID := k8s.ComputeBuildID(w)
5656

5757
// Fetch Kubernetes deployment state
5858
k8sState, err := k8s.GetDeploymentState(
@@ -93,6 +93,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
9393
temporalState,
9494
connection,
9595
plannerConfig,
96+
workerDeploymentName,
9697
)
9798
if err != nil {
9899
return nil, fmt.Errorf("error generating plan: %w", err)
@@ -111,15 +112,14 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
111112
plan.startTestWorkflows = append(plan.startTestWorkflows, startWorkflowConfig{
112113
workflowType: wf.WorkflowType,
113114
workflowID: wf.WorkflowID,
114-
versionID: wf.VersionID,
115+
buildID: wf.BuildID,
115116
taskQueue: wf.TaskQueue,
116117
})
117118
}
118119

119120
// Handle deployment creation if needed
120121
if planResult.ShouldCreateDeployment {
121-
_, buildID, _ := k8s.SplitVersionID(targetVersionID)
122-
d, err := r.newDeployment(w, buildID, connection)
122+
d, err := r.newDeployment(w, targetBuildID, connection)
123123
if err != nil {
124124
return nil, err
125125
}

internal/controller/genstatus.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
2828
temporalState *temporal.TemporalWorkerState,
2929
) (*temporaliov1alpha1.TemporalWorkerDeploymentStatus, error) {
3030
workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy)
31-
targetVersionID := k8s.ComputeVersionID(workerDeploy)
31+
targetBuildID := k8s.ComputeBuildID(workerDeploy)
3232

3333
// Fetch Kubernetes deployment state
3434
k8sState, err := k8s.GetDeploymentState(
@@ -43,12 +43,12 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
4343
}
4444

4545
// Fetch test workflow status for the desired version
46-
if targetVersionID != temporalState.CurrentVersionID {
46+
if targetBuildID != temporalState.CurrentBuildID {
4747
testWorkflows, err := temporal.GetTestWorkflowStatus(
4848
ctx,
4949
temporalClient,
5050
workerDeploymentName,
51-
targetVersionID,
51+
targetBuildID,
5252
workerDeploy,
5353
temporalState,
5454
)
@@ -58,14 +58,16 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
5858
}
5959

6060
// Add test workflow status to version info if it doesn't exist
61-
if versionInfo, exists := temporalState.Versions[targetVersionID]; exists {
61+
if versionInfo, exists := temporalState.Versions[targetBuildID]; exists {
6262
versionInfo.TestWorkflows = append(versionInfo.TestWorkflows, testWorkflows...)
6363
}
6464
}
6565

66+
// Target build ID already computed above
67+
6668
// Use the state mapper to convert state objects to CRD status
67-
stateMapper := newStateMapper(k8sState, temporalState)
68-
status := stateMapper.mapToStatus(targetVersionID)
69+
stateMapper := newStateMapper(k8sState, temporalState, workerDeploymentName)
70+
status := stateMapper.mapToStatus(targetBuildID)
6971

7072
return status, nil
7173
}

0 commit comments

Comments
 (0)