Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/v1alpha1/temporalworker_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
defaultScaledownDelay = 1 * time.Hour
defaultDeleteDelay = 24 * time.Hour
maxTemporalWorkerDeploymentNameLen = 63
ConnectionSpecHashAnnotation = "temporal.io/connection-spec-hash"
)

func (r *TemporalWorkerDeployment) SetupWebhookWithManager(mgr ctrl.Manager) error {
Expand Down
1 change: 1 addition & 0 deletions internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
l,
k8sState,
plannerConfig,
connection,
)
if err != nil {
return nil, fmt.Errorf("error generating plan: %w", err)
Expand Down
60 changes: 45 additions & 15 deletions internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
Expand Down Expand Up @@ -74,21 +76,6 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
return ctrl.Result{}, client.IgnoreNotFound(err)
}

// TODO(jlegrone): Set defaults via webhook rather than manually
if err := workerDeploy.Default(ctx, &workerDeploy); err != nil {
l.Error(err, "TemporalWorkerDeployment defaulter failed")
return ctrl.Result{}, err
}

// TODO(carlydf): Handle warnings once we have some, handle ValidateUpdate once it is different from ValidateCreate
if _, err := workerDeploy.ValidateCreate(ctx, &workerDeploy); err != nil {
l.Error(err, "invalid TemporalWorkerDeployment")
return ctrl.Result{
Requeue: true,
RequeueAfter: 5 * time.Minute, // user needs time to fix this, if it changes, it will be re-queued immediately
}, nil
}

// Verify that a connection is configured
if workerDeploy.Spec.WorkerOptions.TemporalConnection == "" {
err := fmt.Errorf("TemporalConnection must be set")
Expand All @@ -106,6 +93,23 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
return ctrl.Result{}, err
}

// TODO (Shivam): Do we validate TemporalConnection here as well?

// TODO(jlegrone): Set defaults via webhook rather than manually
if err := workerDeploy.Default(ctx, &workerDeploy); err != nil {
l.Error(err, "TemporalWorkerDeployment defaulter failed")
return ctrl.Result{}, err
}

// TODO(carlydf): Handle warnings once we have some, handle ValidateUpdate once it is different from ValidateCreate
if _, err := workerDeploy.ValidateCreate(ctx, &workerDeploy); err != nil {
l.Error(err, "invalid TemporalWorkerDeployment")
return ctrl.Result{
Requeue: true,
RequeueAfter: 5 * time.Minute, // user needs time to fix this, if it changes, it will be re-queued immediately
}, nil
}

// Get or update temporal client for connection
temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{
HostPort: temporalConnection.Spec.HostPort,
Expand Down Expand Up @@ -195,8 +199,34 @@ func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager)
return ctrl.NewControllerManagedBy(mgr).
For(&temporaliov1alpha1.TemporalWorkerDeployment{}).
Owns(&appsv1.Deployment{}).
Watches(&temporaliov1alpha1.TemporalConnection{}, handler.EnqueueRequestsFromMapFunc(r.findTWDsUsingConnection)).
WithOptions(controller.Options{
MaxConcurrentReconciles: 100,
}).
Complete(r)
}

func (r *TemporalWorkerDeploymentReconciler) findTWDsUsingConnection(ctx context.Context, tc client.Object) []reconcile.Request {
var requests []reconcile.Request

// Find all TWDs in same namespace that reference this TC
var workers temporaliov1alpha1.TemporalWorkerDeploymentList
if err := r.List(ctx, &workers, client.InNamespace(tc.GetNamespace())); err != nil {
return requests
}

// Filter to ones using this connection
for _, worker := range workers.Items {
if worker.Spec.WorkerOptions.TemporalConnection == tc.GetName() {
// Add the TWD object as a reconcile request
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: worker.Name,
Namespace: worker.Namespace,
},
})
}
}

return requests
}
33 changes: 29 additions & 4 deletions internal/k8s/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ package k8s

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"regexp"
"sort"
"strings"

"github.com/distribution/reference"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"regexp"
"sigs.k8s.io/controller-runtime/pkg/client"
"sort"
"strings"

temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/controller/k8s.io/utils"
Expand Down Expand Up @@ -254,6 +257,13 @@ func NewDeploymentWithOwnerRef(
})
}

// Build pod annotations
podAnnotations := make(map[string]string)
for k, v := range spec.Template.Annotations {
podAnnotations[k] = v
}
podAnnotations[temporaliov1alpha1.ConnectionSpecHashAnnotation] = ComputeConnectionSpecHash(connection)

blockOwnerDeletion := true

return &appsv1.Deployment{
Expand Down Expand Up @@ -282,11 +292,26 @@ func NewDeploymentWithOwnerRef(
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
Annotations: spec.Template.Annotations,
Annotations: podAnnotations,
},
Spec: *podSpec,
},
MinReadySeconds: spec.MinReadySeconds,
},
}
}

func ComputeConnectionSpecHash(connection temporaliov1alpha1.TemporalConnectionSpec) string {
// should not happen
if connection.MutualTLSSecret == "" || connection.HostPort == "" {
return ""
}

hasher := sha256.New()

// Hash connection spec fields in deterministic order
hasher.Write([]byte(connection.HostPort))
hasher.Write([]byte(connection.MutualTLSSecret))

return hex.EncodeToString(hasher.Sum(nil))
}
12 changes: 8 additions & 4 deletions internal/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func GeneratePlan(
l logr.Logger,
k8sState *k8s.DeploymentState,
config *Config,
connection temporaliov1alpha1.TemporalConnectionSpec,
) (*Plan, error) {
plan := &Plan{
ScaleDeployments: make(map[*v1.ObjectReference]uint32),
Expand All @@ -95,7 +96,7 @@ func GeneratePlan(
// Add delete/scale operations based on version status
plan.DeleteDeployments = getDeleteDeployments(k8sState, config)
plan.ScaleDeployments = getScaleDeployments(k8sState, config)
plan.ShouldCreateDeployment = shouldCreateDeployment(k8sState, config)
plan.ShouldCreateDeployment = shouldCreateOrUpdateDeployment(k8sState, config, connection)

// Determine if we need to start any test workflows
plan.TestWorkflows = getTestWorkflows(config)
Expand Down Expand Up @@ -218,9 +219,10 @@ func getScaleDeployments(
}

// shouldCreateDeployment determines if a new deployment needs to be created
func shouldCreateDeployment(
func shouldCreateOrUpdateDeployment(
k8sState *k8s.DeploymentState,
config *Config,
connection temporaliov1alpha1.TemporalConnectionSpec,
) bool {
if config.Status.TargetVersion == nil {
return true
Expand All @@ -232,8 +234,10 @@ func shouldCreateDeployment(

// If the target version already has a deployment, we don't need to create another one
if config.Status.TargetVersion.VersionID == config.TargetVersionID {
if _, exists := k8sState.Deployments[config.TargetVersionID]; exists {
return false
if d, exists := k8sState.Deployments[config.TargetVersionID]; exists {
// If the deployment already exists, we need to check if the secret hash has changed
connectionSpecHash := k8s.ComputeConnectionSpecHash(connection)
return connectionSpecHash != d.Spec.Template.Annotations[temporaliov1alpha1.ConnectionSpecHashAnnotation]
}
}

Expand Down
Loading