Skip to content

Commit 5d34813

Browse files
committed
Update Temporal Go SDK to v1.35.0 and index versions by build ID
- Upgrade go.temporal.io/sdk from v1.34.0 to v1.35.0 - Refactor VersionInfo to separate DeploymentName and BuildID fields - Index version collections by build ID instead of version ID - Update all Temporal API calls to use new WorkerDeploymentVersion structure - Fix PinnedVersioningOverride to use pointer and new Version field - Update tests to use new VersionInfo structure Fixes #27: Version collections now properly indexed by build ID with deployment name and build ID maintained separately.
1 parent 5d9fc97 commit 5d34813

File tree

8 files changed

+490
-81
lines changed

8 files changed

+490
-81
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ require (
1414
github.com/pborman/uuid v1.2.1
1515
github.com/stretchr/testify v1.10.0
1616
go.temporal.io/api v1.50.0
17-
go.temporal.io/sdk v1.34.0
17+
go.temporal.io/sdk v1.35.0
1818
k8s.io/api v0.33.2
1919
k8s.io/apimachinery v0.33.2
2020
k8s.io/client-go v0.33.2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
185185
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
186186
go.temporal.io/api v1.50.0 h1:7s8Cn+fKfNx9G0v2Ge9We6X2WiCA3JvJ9JryeNbx1Bc=
187187
go.temporal.io/api v1.50.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
188-
go.temporal.io/sdk v1.34.0 h1:VLg/h6ny7GvLFVoQPqz2NcC93V9yXboQwblkRvZ1cZE=
189-
go.temporal.io/sdk v1.34.0/go.mod h1:iE4U5vFrH3asOhqpBBphpj9zNtw8btp8+MSaf5A0D3w=
188+
go.temporal.io/sdk v1.35.0 h1:lRNAQ5As9rLgYa7HBvnmKyzxLcdElTuoFJ0FXM/AsLQ=
189+
go.temporal.io/sdk v1.35.0/go.mod h1:1q5MuLc2MEJ4lneZTHJzpVebW2oZnyxoIOWX3oFVebw=
190190
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
191191
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
192192
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=

go.work.sum

Lines changed: 345 additions & 2 deletions
Large diffs are not rendered by default.

internal/controller/execplan.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ import (
1212
"github.com/go-logr/logr"
1313
enumspb "go.temporal.io/api/enums/v1"
1414
sdkclient "go.temporal.io/sdk/client"
15-
"go.temporal.io/sdk/workflow"
15+
"go.temporal.io/sdk/worker"
1616
appsv1 "k8s.io/api/apps/v1"
1717
autoscalingv1 "k8s.io/api/autoscaling/v1"
1818
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1919
"sigs.k8s.io/controller-runtime/pkg/client"
20+
21+
"github.com/temporalio/temporal-worker-controller/internal/k8s"
2022
)
2123

2224
func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l logr.Logger, temporalClient sdkclient.Client, p *plan) error {
@@ -67,15 +69,23 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
6769
deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName)
6870

6971
for _, wf := range p.startTestWorkflows {
72+
// Extract deployment name and build ID from version ID
73+
deploymentName, buildID, err := k8s.SplitVersionID(wf.versionID)
74+
if err != nil {
75+
return fmt.Errorf("unable to split version ID %s: %w", wf.versionID, err)
76+
}
77+
7078
if _, err := temporalClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
7179
ID: wf.workflowID,
7280
TaskQueue: wf.taskQueue,
7381
WorkflowExecutionTimeout: time.Hour,
7482
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
7583
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
76-
VersioningOverride: sdkclient.VersioningOverride{
77-
Behavior: workflow.VersioningBehaviorPinned,
78-
PinnedVersion: wf.versionID,
84+
VersioningOverride: &sdkclient.PinnedVersioningOverride{
85+
Version: worker.WorkerDeploymentVersion{
86+
DeploymentName: deploymentName,
87+
BuildId: buildID,
88+
},
7989
},
8090
}, wf.workflowType); err != nil {
8191
return fmt.Errorf("unable to start test workflow execution: %w", err)
@@ -85,32 +95,50 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
8595
// Register current version or ramps
8696
if vcfg := p.UpdateVersionConfig; vcfg != nil {
8797
if vcfg.SetCurrent {
88-
l.Info("registering new current version", "version", vcfg.VersionID)
98+
// Extract build ID from version ID
99+
_, buildID, err := k8s.SplitVersionID(vcfg.VersionID)
100+
if err != nil {
101+
return fmt.Errorf("unable to split version ID %s: %w", vcfg.VersionID, err)
102+
}
103+
l.Info("registering new current version", "version", vcfg.VersionID, "buildID", buildID)
89104
if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
90-
Version: vcfg.VersionID,
105+
BuildID: buildID,
91106
ConflictToken: vcfg.ConflictToken,
92107
Identity: ControllerIdentity,
93108
}); err != nil {
94109
return fmt.Errorf("unable to set current deployment version: %w", err)
95110
}
96111
} else {
112+
// Extract build ID from version ID
113+
_, buildID, err := k8s.SplitVersionID(vcfg.VersionID)
114+
if err != nil {
115+
return fmt.Errorf("unable to split version ID %s: %w", vcfg.VersionID, err)
116+
}
97117
if vcfg.RampPercentage > 0 {
98-
l.Info("applying ramp", "version", vcfg.VersionID, "percentage", vcfg.RampPercentage)
118+
l.Info("applying ramp", "version", vcfg.VersionID, "buildID", buildID, "percentage", vcfg.RampPercentage)
99119
} else {
100-
l.Info("deleting ramp")
120+
l.Info("deleting ramp", "buildID", buildID)
101121
}
102122

103123
if _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{
104-
Version: vcfg.VersionID,
124+
BuildID: buildID,
105125
Percentage: vcfg.RampPercentage,
106126
ConflictToken: vcfg.ConflictToken,
107127
Identity: ControllerIdentity,
108128
}); err != nil {
109129
return fmt.Errorf("unable to set ramping deployment: %w", err)
110130
}
111131
}
132+
// Extract deployment name and build ID for metadata update
133+
deploymentName, buildID, err := k8s.SplitVersionID(vcfg.VersionID)
134+
if err != nil {
135+
return fmt.Errorf("unable to split version ID %s: %w", vcfg.VersionID, err)
136+
}
112137
if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{
113-
Version: vcfg.VersionID,
138+
Version: worker.WorkerDeploymentVersion{
139+
DeploymentName: deploymentName,
140+
BuildId: buildID,
141+
},
114142
MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{
115143
UpsertEntries: map[string]interface{}{
116144
controllerIdentityKey: getControllerIdentity(),

internal/controller/state_mapper.go

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,15 @@ func (m *stateMapper) mapCurrentWorkerDeploymentVersion(versionID string) *v1alp
9191
}
9292
}
9393

94-
// Set version status from temporal state
95-
if temporalVersion, exists := m.temporalState.Versions[versionID]; exists {
96-
version.Status = temporalVersion.Status
97-
98-
// Set task queues
99-
version.TaskQueues = append(version.TaskQueues, temporalVersion.TaskQueues...)
94+
// Set version status from temporal state by extracting build ID from version ID
95+
_, buildID, err := k8s.SplitVersionID(versionID)
96+
if err == nil {
97+
if temporalVersion, exists := m.temporalState.Versions[buildID]; exists {
98+
version.Status = temporalVersion.Status
99+
100+
// Set task queues
101+
version.TaskQueues = append(version.TaskQueues, temporalVersion.TaskQueues...)
102+
}
100103
}
101104

102105
return version
@@ -122,22 +125,25 @@ func (m *stateMapper) mapTargetWorkerDeploymentVersion(versionID string) v1alpha
122125
}
123126
}
124127

125-
// Set version status from temporal state
126-
if temporalVersion, exists := m.temporalState.Versions[versionID]; exists {
127-
version.Status = temporalVersion.Status
128+
// Set version status from temporal state by extracting build ID from version ID
129+
_, buildID, err := k8s.SplitVersionID(versionID)
130+
if err == nil {
131+
if temporalVersion, exists := m.temporalState.Versions[buildID]; exists {
132+
version.Status = temporalVersion.Status
128133

129-
// Set ramp percentage if this is a ramping version
130-
// TODO(carlydf): Support setting any ramp in [0,100]
131-
// NOTE(rob): We are now setting any ramp > 0, is that correct?
132-
if temporalVersion.Status == v1alpha1.VersionStatusRamping && m.temporalState.RampPercentage > 0 {
133-
version.RampPercentage = &m.temporalState.RampPercentage
134-
}
134+
// Set ramp percentage if this is a ramping version
135+
// TODO(carlydf): Support setting any ramp in [0,100]
136+
// NOTE(rob): We are now setting any ramp > 0, is that correct?
137+
if temporalVersion.Status == v1alpha1.VersionStatusRamping && m.temporalState.RampPercentage > 0 {
138+
version.RampPercentage = &m.temporalState.RampPercentage
139+
}
135140

136-
// Set task queues
137-
version.TaskQueues = append(version.TaskQueues, temporalVersion.TaskQueues...)
141+
// Set task queues
142+
version.TaskQueues = append(version.TaskQueues, temporalVersion.TaskQueues...)
138143

139-
// Set test workflows
140-
version.TestWorkflows = append(version.TestWorkflows, temporalVersion.TestWorkflows...)
144+
// Set test workflows
145+
version.TestWorkflows = append(version.TestWorkflows, temporalVersion.TestWorkflows...)
146+
}
141147
}
142148

143149
return version
@@ -167,18 +173,21 @@ func (m *stateMapper) mapDeprecatedWorkerDeploymentVersion(versionID string) *v1
167173
}
168174
}
169175

170-
// Set version status from temporal state
171-
if temporalVersion, exists := m.temporalState.Versions[versionID]; exists {
172-
version.Status = temporalVersion.Status
176+
// Set version status from temporal state by extracting build ID from version ID
177+
_, buildID, err := k8s.SplitVersionID(versionID)
178+
if err == nil {
179+
if temporalVersion, exists := m.temporalState.Versions[buildID]; exists {
180+
version.Status = temporalVersion.Status
173181

174-
// Set drained since if available
175-
if temporalVersion.DrainedSince != nil {
176-
drainedSince := metav1.NewTime(*temporalVersion.DrainedSince)
177-
version.DrainedSince = &drainedSince
178-
}
182+
// Set drained since if available
183+
if temporalVersion.DrainedSince != nil {
184+
drainedSince := metav1.NewTime(*temporalVersion.DrainedSince)
185+
version.DrainedSince = &drainedSince
186+
}
179187

180-
// Set task queues
181-
version.TaskQueues = append(version.TaskQueues, temporalVersion.TaskQueues...)
188+
// Set task queues
189+
version.TaskQueues = append(version.TaskQueues, temporalVersion.TaskQueues...)
190+
}
182191
}
183192

184193
return version

internal/controller/state_mapper_test.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,16 +98,18 @@ func TestMapToStatus(t *testing.T) {
9898
RampingSince: &rampingSince,
9999
VersionConflictToken: []byte("test-token"),
100100
Versions: map[string]*temporal.VersionInfo{
101-
"worker.v1": {
102-
VersionID: "worker.v1",
103-
Status: temporaliov1alpha1.VersionStatusCurrent,
101+
"v1": {
102+
DeploymentName: "worker",
103+
BuildID: "v1",
104+
Status: temporaliov1alpha1.VersionStatusCurrent,
104105
TaskQueues: []temporaliov1alpha1.TaskQueue{
105106
{Name: "queue1"},
106107
},
107108
},
108-
"worker.v2": {
109-
VersionID: "worker.v2",
110-
Status: temporaliov1alpha1.VersionStatusRamping,
109+
"v2": {
110+
DeploymentName: "worker",
111+
BuildID: "v2",
112+
Status: temporaliov1alpha1.VersionStatusRamping,
111113
TaskQueues: []temporaliov1alpha1.TaskQueue{
112114
{Name: "queue1"},
113115
},
@@ -120,10 +122,11 @@ func TestMapToStatus(t *testing.T) {
120122
},
121123
},
122124
},
123-
"worker.v3": {
124-
VersionID: "worker.v3",
125-
Status: temporaliov1alpha1.VersionStatusDrained,
126-
DrainedSince: &drainedSince,
125+
"v3": {
126+
DeploymentName: "worker",
127+
BuildID: "v3",
128+
Status: temporaliov1alpha1.VersionStatusDrained,
129+
DrainedSince: &drainedSince,
127130
},
128131
},
129132
}
@@ -216,10 +219,11 @@ func TestMapWorkerDeploymentVersion(t *testing.T) {
216219

217220
temporalState := &temporal.TemporalWorkerState{
218221
Versions: map[string]*temporal.VersionInfo{
219-
"worker.v1": {
220-
VersionID: "worker.v1",
221-
Status: temporaliov1alpha1.VersionStatusCurrent,
222-
DrainedSince: &drainedSince,
222+
"v1": {
223+
DeploymentName: "worker",
224+
BuildID: "v1",
225+
Status: temporaliov1alpha1.VersionStatusCurrent,
226+
DrainedSince: &drainedSince,
223227
},
224228
},
225229
}

0 commit comments

Comments
 (0)