Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions api/v1alpha1/worker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,13 @@ type TemporalWorkerDeploymentStatus struct {
// TargetVersion is the desired next version. If TargetVersion.Deployment is nil,
// then the controller should create it. If not nil, the controller should
// wait for it to become healthy and then move it to the CurrentVersion.
TargetVersion *TargetWorkerDeploymentVersion `json:"targetVersion"`
TargetVersion TargetWorkerDeploymentVersion `json:"targetVersion"`

// CurrentVersion is the version that is currently registered with
// Temporal as the current version of its worker deployment. This will be nil
// during initial bootstrap until a version is registered and set as current.
CurrentVersion *CurrentWorkerDeploymentVersion `json:"currentVersion,omitempty"`

// RampingVersion is the version that is currently registered with
// Temporal as the ramping version of its worker deployment. The controller
// should ensure that this is always equal to the TargetVersion, or, if the
// TargetVersion has been promoted to the current version, this should be nil.
RampingVersion *TargetWorkerDeploymentVersion `json:"rampingVersion,omitempty"`

// DeprecatedVersions are deployment versions that are no longer the default. Any
// deployment versions that are unreachable should be deleted by the controller.
DeprecatedVersions []*DeprecatedWorkerDeploymentVersion `json:"deprecatedVersions,omitempty"`
Expand Down
17 changes: 6 additions & 11 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3854,74 +3854,6 @@ spec:
type: array
lastModifierIdentity:
type: string
rampingVersion:
properties:
deployment:
properties:
apiVersion:
type: string
fieldPath:
type: string
kind:
type: string
name:
type: string
namespace:
type: string
resourceVersion:
type: string
uid:
type: string
type: object
x-kubernetes-map-type: atomic
healthySince:
format: date-time
type: string
managedBy:
type: string
rampLastModifiedAt:
format: date-time
type: string
rampPercentage:
type: number
rampingSince:
format: date-time
type: string
status:
type: string
taskQueues:
items:
properties:
name:
type: string
required:
- name
type: object
type: array
testWorkflows:
items:
properties:
runID:
type: string
status:
type: string
taskQueue:
type: string
workflowID:
type: string
required:
- runID
- status
- taskQueue
- workflowID
type: object
type: array
versionID:
type: string
required:
- status
- versionID
type: object
targetVersion:
properties:
deployment:
Expand Down
10 changes: 5 additions & 5 deletions internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"github.com/temporalio/temporal-worker-controller/internal/planner"
"github.com/temporalio/temporal-worker-controller/internal/temporal"
)

// plan holds the actions to execute during reconciliation
Expand Down Expand Up @@ -50,6 +51,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
l logr.Logger,
w *temporaliov1alpha1.TemporalWorkerDeployment,
connection temporaliov1alpha1.TemporalConnectionSpec,
temporalState *temporal.TemporalWorkerState,
) (*plan, error) {
workerDeploymentName := k8s.ComputeWorkerDeploymentName(w)
targetVersionID := k8s.ComputeVersionID(w)
Expand Down Expand Up @@ -82,17 +84,15 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(

// Generate the plan using the planner package
plannerConfig := &planner.Config{
Status: &w.Status,
Spec: &w.Spec,
RolloutStrategy: rolloutStrategy,
TargetVersionID: targetVersionID,
Replicas: *w.Spec.Replicas,
ConflictToken: w.Status.VersionConflictToken,
}

planResult, err := planner.GeneratePlan(
l,
k8sState,
&w.Status,
&w.Spec,
temporalState,
plannerConfig,
)
if err != nil {
Expand Down
12 changes: 1 addition & 11 deletions internal/controller/genstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
temporalClient temporalClient.Client,
req ctrl.Request,
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
temporalState *temporal.TemporalWorkerState,
) (*temporaliov1alpha1.TemporalWorkerDeploymentStatus, error) {
workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy)
targetVersionID := k8s.ComputeVersionID(workerDeploy)
Expand All @@ -41,17 +42,6 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
return nil, fmt.Errorf("unable to get Kubernetes deployment state: %w", err)
}

// Fetch Temporal worker deployment state
temporalState, err := temporal.GetWorkerDeploymentState(
ctx,
temporalClient,
workerDeploymentName,
workerDeploy.Spec.WorkerOptions.TemporalNamespace,
)
if err != nil {
return nil, fmt.Errorf("unable to get Temporal worker deployment state: %w", err)
}

// Fetch test workflow status for the desired version
if targetVersionID != temporalState.CurrentVersionID {
testWorkflows, err := temporal.GetTestWorkflowStatus(
Expand Down
23 changes: 5 additions & 18 deletions internal/controller/state_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,25 @@ func (m *stateMapper) mapToStatus(targetVersionID string) *v1alpha1.TemporalWork

// Set current version
currentVersionID := m.temporalState.CurrentVersionID
if currentVersionID != "" {
status.CurrentVersion = m.mapCurrentWorkerDeploymentVersion(currentVersionID)
}
status.CurrentVersion = m.mapCurrentWorkerDeploymentVersion(currentVersionID)

// Set target version (desired version)
status.TargetVersion = m.mapTargetWorkerDeploymentVersion(targetVersionID)
if status.TargetVersion != nil && m.temporalState.RampingVersionID == targetVersionID {
if m.temporalState.RampingVersionID == targetVersionID {
status.TargetVersion.RampingSince = m.temporalState.RampingSince
status.TargetVersion.RampLastModifiedAt = m.temporalState.RampLastModifiedAt
rampPercentage := m.temporalState.RampPercentage
status.TargetVersion.RampPercentage = &rampPercentage
}

rampingVersionID := m.temporalState.RampingVersionID
// Set ramping version if it exists
if rampingVersionID != "" {
status.RampingVersion = m.mapTargetWorkerDeploymentVersion(rampingVersionID)
}

// Add deprecated versions
var deprecatedVersions []*v1alpha1.DeprecatedWorkerDeploymentVersion
for versionID := range m.k8sState.Deployments {
// Skip current and target versions
if versionID == currentVersionID || versionID == targetVersionID || versionID == rampingVersionID {
if versionID == currentVersionID || versionID == targetVersionID {
continue
}

// TODO(rob): We should never see a version here that has VersionStatusCurrent, but should we check?
versionStatus := m.mapDeprecatedWorkerDeploymentVersion(versionID)
if versionStatus != nil {
deprecatedVersions = append(deprecatedVersions, versionStatus)
Expand Down Expand Up @@ -107,12 +98,8 @@ func (m *stateMapper) mapCurrentWorkerDeploymentVersion(versionID string) *v1alp
}

// mapTargetWorkerDeploymentVersion creates a target version status from the states
func (m *stateMapper) mapTargetWorkerDeploymentVersion(versionID string) *v1alpha1.TargetWorkerDeploymentVersion {
if versionID == "" {
return nil
}

version := &v1alpha1.TargetWorkerDeploymentVersion{
func (m *stateMapper) mapTargetWorkerDeploymentVersion(versionID string) v1alpha1.TargetWorkerDeploymentVersion {
version := v1alpha1.TargetWorkerDeploymentVersion{
BaseWorkerDeploymentVersion: v1alpha1.BaseWorkerDeploymentVersion{
VersionID: versionID,
Status: v1alpha1.VersionStatusNotRegistered,
Expand Down
1 change: 0 additions & 1 deletion internal/controller/state_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ func TestMapWorkerDeploymentVersion(t *testing.T) {

// Test target version mapping
targetVersion := mapper.mapTargetWorkerDeploymentVersion("worker.v1")
assert.NotNil(t, targetVersion)
assert.Equal(t, "worker.v1", targetVersion.VersionID)
assert.Equal(t, temporaliov1alpha1.VersionStatusCurrent, targetVersion.Status)
assert.NotNil(t, targetVersion.HealthySince)
Expand Down
18 changes: 16 additions & 2 deletions internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"github.com/temporalio/temporal-worker-controller/internal/temporal"
)

var (
Expand Down Expand Up @@ -124,8 +126,20 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
temporalClient = c
}

// Fetch Temporal worker deployment state
workerDeploymentName := k8s.ComputeWorkerDeploymentName(&workerDeploy)
temporalState, err := temporal.GetWorkerDeploymentState(
ctx,
temporalClient,
workerDeploymentName,
workerDeploy.Spec.WorkerOptions.TemporalNamespace,
)
if err != nil {
return ctrl.Result{}, fmt.Errorf("unable to get Temporal worker deployment state: %w", err)
}

// Compute a new status from k8s and temporal state
status, err := r.generateStatus(ctx, l, temporalClient, req, &workerDeploy)
status, err := r.generateStatus(ctx, l, temporalClient, req, &workerDeploy, temporalState)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -151,7 +165,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
}

// Generate a plan to get to desired spec from current status
plan, err := r.generatePlan(ctx, l, &workerDeploy, temporalConnection.Spec)
plan, err := r.generatePlan(ctx, l, &workerDeploy, temporalConnection.Spec, temporalState)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
Loading