Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
5d34813
Update Temporal Go SDK to v1.35.0 and index versions by build ID
jlegrone Aug 18, 2025
b058d33
Refactor startTestWorkflows to avoid version ID splitting during exec…
jlegrone Aug 18, 2025
dd9cbbb
Remove VersionID from external API and VersionConfig
jlegrone Aug 18, 2025
d02d425
Remove deploymentName from version registration logs
jlegrone Aug 18, 2025
2e94172
Fix integration tests to use BuildID instead of VersionID
jlegrone Aug 18, 2025
69afd6f
Update CRD schema and API annotations to use buildID
jlegrone Aug 18, 2025
19bb1c5
Remove VersionID from WorkflowConfig and eliminate SplitVersionID
jlegrone Aug 18, 2025
b13dab1
Remove deploymentName from startWorkflowConfig
jlegrone Aug 18, 2025
cae766f
Fix import formatting
jlegrone Aug 18, 2025
175c14d
Remove unnecessary comment about build ID indexing
jlegrone Aug 18, 2025
cbc6f54
Remove legacy version ID compatibility
jlegrone Aug 18, 2025
cebef9a
Remove ComputeVersionID usage from genstatus
jlegrone Aug 18, 2025
c0051e7
Update GetTestWorkflowID to use deploymentName and buildID
jlegrone Aug 18, 2025
bf7eb96
Remove redundant DeploymentName field from VersionConfig
jlegrone Aug 19, 2025
454bf73
Remove redundant DeploymentName field from WorkflowConfig
jlegrone Aug 19, 2025
3246090
Address review feedback
jlegrone Aug 19, 2025
6c95c43
Address remaining review comments
jlegrone Aug 19, 2025
4b8e008
Address final review feedback and refactor test helper
jlegrone Aug 20, 2025
54ab149
Address remaining review feedback
jlegrone Aug 20, 2025
726c157
Clean up test comments
jlegrone Aug 20, 2025
71690af
Apply suggestions from code review
jlegrone Aug 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions api/v1alpha1/worker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,8 @@ type TaskQueue struct {

// BaseWorkerDeploymentVersion contains fields common to all worker deployment version types
type BaseWorkerDeploymentVersion struct {
// The string representation of the deployment version.
// Currently, this is always `deployment_name.build_id`.
VersionID string `json:"versionID"`
// BuildID is the unique identifier for this version of the worker deployment.
BuildID string `json:"buildID"`

// Status indicates whether workers in this version may
// be eligible to receive tasks from the Temporal server.
Expand Down Expand Up @@ -329,8 +328,8 @@ type QueueStatistics struct {
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
// +kubebuilder:resource:shortName=twd;twdeployment;tworkerdeployment
//+kubebuilder:printcolumn:name="Current",type="string",JSONPath=".status.currentVersion.versionID",description="Current Version for new workflows"
//+kubebuilder:printcolumn:name="Target",type="string",JSONPath=".status.targetVersion.versionID",description="Version of the current worker template"
//+kubebuilder:printcolumn:name="Current",type="string",JSONPath=".status.currentVersion.buildID",description="Current Version Build ID"
//+kubebuilder:printcolumn:name="Target",type="string",JSONPath=".status.targetVersion.buildID",description="Build ID of the target worker (based on the pod template)"
//+kubebuilder:printcolumn:name="Target-Ramp",type="number",JSONPath=".status.targetVersion.rampPercentage",description="Percentage of new workflows starting on Target Version"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/pborman/uuid v1.2.1
github.com/stretchr/testify v1.10.0
go.temporal.io/api v1.50.0
go.temporal.io/sdk v1.34.0
go.temporal.io/sdk v1.35.0
k8s.io/api v0.33.2
k8s.io/apimachinery v0.33.2
k8s.io/client-go v0.33.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.temporal.io/api v1.50.0 h1:7s8Cn+fKfNx9G0v2Ge9We6X2WiCA3JvJ9JryeNbx1Bc=
go.temporal.io/api v1.50.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/sdk v1.34.0 h1:VLg/h6ny7GvLFVoQPqz2NcC93V9yXboQwblkRvZ1cZE=
go.temporal.io/sdk v1.34.0/go.mod h1:iE4U5vFrH3asOhqpBBphpj9zNtw8btp8+MSaf5A0D3w=
go.temporal.io/sdk v1.35.0 h1:lRNAQ5As9rLgYa7HBvnmKyzxLcdElTuoFJ0FXM/AsLQ=
go.temporal.io/sdk v1.35.0/go.mod h1:1q5MuLc2MEJ4lneZTHJzpVebW2oZnyxoIOWX3oFVebw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand Down
347 changes: 345 additions & 2 deletions go.work.sum

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ spec:
versions:
- additionalPrinterColumns:
- description: Current Version for new workflows
jsonPath: .status.currentVersion.versionID
jsonPath: .status.currentVersion.buildID
name: Current
type: string
- description: Version of the current worker template
jsonPath: .status.targetVersion.versionID
jsonPath: .status.targetVersion.buildID
name: Target
type: string
- description: Percentage of new workflows starting on Target Version
Expand Down Expand Up @@ -3810,6 +3810,8 @@ spec:
properties:
currentVersion:
properties:
buildID:
type: string
deployment:
properties:
apiVersion:
Expand Down Expand Up @@ -3844,15 +3846,15 @@ spec:
- name
type: object
type: array
versionID:
type: string
required:
- buildID
- status
- versionID
type: object
deprecatedVersions:
items:
properties:
buildID:
type: string
deployment:
properties:
apiVersion:
Expand Down Expand Up @@ -3890,17 +3892,17 @@ spec:
- name
type: object
type: array
versionID:
type: string
required:
- buildID
- status
- versionID
type: object
type: array
lastModifierIdentity:
type: string
targetVersion:
properties:
buildID:
type: string
deployment:
properties:
apiVersion:
Expand Down Expand Up @@ -3961,11 +3963,9 @@ spec:
- workflowID
type: object
type: array
versionID:
type: string
required:
- buildID
- status
- versionID
type: object
versionConflictToken:
format: byte
Expand Down
27 changes: 16 additions & 11 deletions internal/controller/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/go-logr/logr"
enumspb "go.temporal.io/api/enums/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
"go.temporal.io/sdk/worker"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -73,9 +73,11 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
WorkflowExecutionTimeout: time.Hour,
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
VersioningOverride: sdkclient.VersioningOverride{
Behavior: workflow.VersioningBehaviorPinned,
PinnedVersion: wf.versionID,
VersioningOverride: &sdkclient.PinnedVersioningOverride{
Version: worker.WorkerDeploymentVersion{
DeploymentName: p.WorkerDeploymentName,
BuildId: wf.buildID,
},
},
}, wf.workflowType); err != nil {
return fmt.Errorf("unable to start test workflow execution: %w", err)
Expand All @@ -85,32 +87,35 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
// Register current version or ramps
if vcfg := p.UpdateVersionConfig; vcfg != nil {
if vcfg.SetCurrent {
l.Info("registering new current version", "version", vcfg.VersionID)
l.Info("registering new current version", "buildID", vcfg.BuildID)
if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
Version: vcfg.VersionID,
BuildID: vcfg.BuildID,
ConflictToken: vcfg.ConflictToken,
Identity: ControllerIdentity,
}); err != nil {
return fmt.Errorf("unable to set current deployment version: %w", err)
}
} else {
if vcfg.RampPercentage > 0 {
l.Info("applying ramp", "version", vcfg.VersionID, "percentage", vcfg.RampPercentage)
l.Info("applying ramp", "buildID", vcfg.BuildID, "percentage", vcfg.RampPercentage)
} else {
l.Info("deleting ramp")
l.Info("deleting ramp", "buildID", vcfg.BuildID)
}

if _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{
Version: vcfg.VersionID,
BuildID: vcfg.BuildID,
Percentage: vcfg.RampPercentage,
ConflictToken: vcfg.ConflictToken,
Identity: ControllerIdentity,
}); err != nil {
return fmt.Errorf("unable to set ramping deployment: %w", err)
return fmt.Errorf("unable to set ramping deployment version: %w", err)
}
}
if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{
Version: vcfg.VersionID,
Version: worker.WorkerDeploymentVersion{
DeploymentName: p.WorkerDeploymentName,
BuildId: vcfg.BuildID,
},
MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{
UpsertEntries: map[string]interface{}{
controllerIdentityKey: getControllerIdentity(),
Expand Down
10 changes: 5 additions & 5 deletions internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type plan struct {
type startWorkflowConfig struct {
workflowType string
workflowID string
versionID string
buildID string
taskQueue string
}

Expand All @@ -52,7 +52,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
temporalState *temporal.TemporalWorkerState,
) (*plan, error) {
workerDeploymentName := k8s.ComputeWorkerDeploymentName(w)
targetVersionID := k8s.ComputeVersionID(w)
targetBuildID := k8s.ComputeBuildID(w)

// Fetch Kubernetes deployment state
k8sState, err := k8s.GetDeploymentState(
Expand Down Expand Up @@ -93,6 +93,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
temporalState,
connection,
plannerConfig,
workerDeploymentName,
)
if err != nil {
return nil, fmt.Errorf("error generating plan: %w", err)
Expand All @@ -111,15 +112,14 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
plan.startTestWorkflows = append(plan.startTestWorkflows, startWorkflowConfig{
workflowType: wf.WorkflowType,
workflowID: wf.WorkflowID,
versionID: wf.VersionID,
buildID: wf.BuildID,
taskQueue: wf.TaskQueue,
})
}

// Handle deployment creation if needed
if planResult.ShouldCreateDeployment {
_, buildID, _ := k8s.SplitVersionID(targetVersionID)
d, err := r.newDeployment(w, buildID, connection)
d, err := r.newDeployment(w, targetBuildID, connection)
if err != nil {
return nil, err
}
Expand Down
14 changes: 8 additions & 6 deletions internal/controller/genstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
temporalState *temporal.TemporalWorkerState,
) (*temporaliov1alpha1.TemporalWorkerDeploymentStatus, error) {
workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy)
targetVersionID := k8s.ComputeVersionID(workerDeploy)
targetBuildID := k8s.ComputeBuildID(workerDeploy)

// Fetch Kubernetes deployment state
k8sState, err := k8s.GetDeploymentState(
Expand All @@ -43,12 +43,12 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
}

// Fetch test workflow status for the desired version
if targetVersionID != temporalState.CurrentVersionID {
if targetBuildID != temporalState.CurrentBuildID {
testWorkflows, err := temporal.GetTestWorkflowStatus(
ctx,
temporalClient,
workerDeploymentName,
targetVersionID,
targetBuildID,
workerDeploy,
temporalState,
)
Expand All @@ -58,14 +58,16 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
}

// Add test workflow status to version info if it doesn't exist
if versionInfo, exists := temporalState.Versions[targetVersionID]; exists {
if versionInfo, exists := temporalState.Versions[targetBuildID]; exists {
versionInfo.TestWorkflows = append(versionInfo.TestWorkflows, testWorkflows...)
}
}

// Target build ID already computed above

// Use the state mapper to convert state objects to CRD status
stateMapper := newStateMapper(k8sState, temporalState)
status := stateMapper.mapToStatus(targetVersionID)
stateMapper := newStateMapper(k8sState, temporalState, workerDeploymentName)
status := stateMapper.mapToStatus(targetBuildID)

return status, nil
}
Loading
Loading