Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
---
# ClusterRole for the controller manager. This file is generated from
# kubebuilder RBAC markers — regenerate rather than editing by hand.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: manager-role
rules:
# Read-only access to Secrets — presumably the mTLS credentials referenced
# by TemporalConnection.Spec.MutualTLSSecret; verify against the controller.
- apiGroups:
  - ""
  resources:
  - secrets
  verbs:
  - get
  - list
  - watch
# Full lifecycle management of the worker Deployments the controller owns.
- apiGroups:
  - apps
  resources:
  - deployments
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - update
  - watch
# Scale subresource, used when resizing existing worker Deployments.
- apiGroups:
  - apps
  resources:
  - deployments/scale
  verbs:
  - update
# Read-only access to TemporalConnection resources referenced by workers.
- apiGroups:
  - temporal.io
  resources:
  - temporalconnections
  verbs:
  - get
  - list
  - watch
# Full lifecycle management of the TemporalWorkerDeployment primary resource.
- apiGroups:
  - temporal.io
  resources:
  - temporalworkerdeployments
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - update
  - watch
# Finalizer updates on TemporalWorkerDeployment (ownerReference cleanup).
- apiGroups:
  - temporal.io
  resources:
  - temporalworkerdeployments/finalizers
  verbs:
  - update
# Status subresource updates on TemporalWorkerDeployment.
- apiGroups:
  - temporal.io
  resources:
  - temporalworkerdeployments/status
  verbs:
  - get
  - patch
  - update
26 changes: 26 additions & 0 deletions config/webhook/manifests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
# MutatingWebhookConfiguration generated from kubebuilder webhook markers.
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: mutating-webhook-configuration
webhooks:
- admissionReviewVersions:
  - v1
  clientConfig:
    service:
      name: webhook-service
      namespace: system
      # NOTE(review): the path embeds group "temporal-io-temporal-io"
      # (i.e. "temporal.io.temporal.io") — this looks like a duplicated
      # group in the webhook marker; the RBAC rules in this repo use group
      # "temporal.io". Confirm the marker and regenerate.
      path: /mutate-temporal-io-temporal-io-v1alpha1-temporalworkerdeployment
  failurePolicy: Fail
  name: mtemporalworker.kb.io
  rules:
  - apiGroups:
    # NOTE(review): "temporal.io.temporal.io" is likely wrong — the CRDs in
    # config/rbac use group "temporal.io". As written this webhook would
    # never match the real resources. TODO confirm and fix the marker.
    - temporal.io.temporal.io
    apiVersions:
    - v1alpha1
    operations:
    - CREATE
    - UPDATE
    resources:
    # NOTE(review): plural "temporalworkers" does not match the CRD plural
    # "temporalworkerdeployments" used elsewhere — verify against the CRD.
    - temporalworkers
  sideEffects: None
10 changes: 6 additions & 4 deletions internal/controller/clientpool/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (
)

type ClientPoolKey struct {
HostPort string
Namespace string
HostPort string
Namespace string
MutualTLSSecret string // Include secret name in key to invalidate cache when the secret name itself changes
}

type ClientInfo struct {
Expand Down Expand Up @@ -142,8 +143,9 @@ func (cp *ClientPool) UpsertClient(ctx context.Context, opts NewClientOptions) (
defer cp.mux.Unlock()

key := ClientPoolKey{
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
MutualTLSSecret: opts.Spec.MutualTLSSecret,
}
cp.clients[key] = ClientInfo{
client: c,
Expand Down
9 changes: 9 additions & 0 deletions internal/controller/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
}
}

// Update deployments
for _, d := range p.UpdateDeployments {
l.Info("updating deployment", "deployment", d.Name, "namespace", d.Namespace)
if err := r.Update(ctx, d); err != nil {
l.Error(err, "unable to update deployment", "deployment", d)
return fmt.Errorf("unable to update deployment: %w", err)
}
}

// Get deployment handler
deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName)

Expand Down
3 changes: 3 additions & 0 deletions internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type plan struct {
DeleteDeployments []*appsv1.Deployment
CreateDeployment *appsv1.Deployment
ScaleDeployments map[*corev1.ObjectReference]uint32
UpdateDeployments []*appsv1.Deployment
// Register new versions as current or with ramp
UpdateVersionConfig *planner.VersionConfig

Expand Down Expand Up @@ -90,6 +91,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
&w.Status,
&w.Spec,
temporalState,
connection,
plannerConfig,
)
if err != nil {
Expand All @@ -99,6 +101,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
// Convert planner result to controller plan
plan.DeleteDeployments = planResult.DeleteDeployments
plan.ScaleDeployments = planResult.ScaleDeployments
plan.UpdateDeployments = planResult.UpdateDeployments

// Convert version config
plan.UpdateVersionConfig = planResult.VersionConfig
Expand Down
34 changes: 31 additions & 3 deletions internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var (
Expand Down Expand Up @@ -58,7 +60,6 @@ type TemporalWorkerDeploymentReconciler struct {
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
// TODO(carlydf): Add watching of temporal connection custom resource (may have issue)
func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// TODO(Shivam): Monitor if the time taken for a successful reconciliation loop is closing in on 5 minutes. If so, we
// may need to increase the timeout value.
Expand Down Expand Up @@ -112,8 +113,9 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req

// Get or update temporal client for connection
temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{
HostPort: temporalConnection.Spec.HostPort,
Namespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
HostPort: temporalConnection.Spec.HostPort,
Namespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
MutualTLSSecret: temporalConnection.Spec.MutualTLSSecret,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jlegrone - curious to hear your thoughts on this:

I added the name of the MutualTLSSecret as part of the key here because a change to the secret name, for the same connectionSpec object, should trigger our worker controller to refresh the client it is using. This was required because I realized there could be a case where the old secret is not expired but has simply been replaced by a secret with a different name.

However, one area where this breaks is if someone just replaces the contents of an existing secret without changing the name. In this way, the controller does not really get a new client.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you solve the problem of content changing without name changing by storing a hash of the name + contents instead of just the name?

Or, if that's inefficient because it means you have to read the secret contents all the time just to generate the map key, we could do a different solution for the contents changing, where if an error occurs when calling the temporal APIs that suggests "wrong credentials", we could try reloading them once just in case it's changed. Then, do a namespace describe to make sure they work, and if they work, restart all the workers.

I don't think this needs to be solved in this PR though, but if you don't solve the "changed content, not expired, same name" thing, do just create an issue about it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for responding to this;

yes, I did think about the idea of storing a hash of the name + contents, but I decided against it because in one of my earlier PRs we discussed that reading the contents of a secret on every reconciliation loop could be expensive.

having said that, I like the APIs that suggests "wrong credentials", we could try reloading them once just in case it's changed idea but don't think it should come in this PR. I can make a separate issue for this!

}, temporalConnection.Spec.MutualTLSSecret != "")
if !ok {
c, err := r.TemporalClientPool.UpsertClient(ctx, clientpool.NewClientOptions{
Expand Down Expand Up @@ -212,9 +214,35 @@ func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager)
return ctrl.NewControllerManagedBy(mgr).
For(&temporaliov1alpha1.TemporalWorkerDeployment{}).
Owns(&appsv1.Deployment{}).
Watches(&temporaliov1alpha1.TemporalConnection{}, handler.EnqueueRequestsFromMapFunc(r.findTWDsUsingConnection)).
WithOptions(controller.Options{
MaxConcurrentReconciles: 100,
RecoverPanic: &recoverPanic,
}).
Complete(r)
}

// findTWDsUsingConnection maps a TemporalConnection event to reconcile
// requests for every TemporalWorkerDeployment in the same namespace that
// references that connection by name. It backs the Watches() clause in
// SetupWithManager so that connection changes re-trigger reconciliation of
// the workers using them.
func (r *TemporalWorkerDeploymentReconciler) findTWDsUsingConnection(ctx context.Context, tc client.Object) []reconcile.Request {
	// List all TemporalWorkerDeployments in the connection's namespace.
	var workers temporaliov1alpha1.TemporalWorkerDeploymentList
	if err := r.List(ctx, &workers, client.InNamespace(tc.GetNamespace())); err != nil {
		// Don't silently swallow the error: a failed List here means
		// affected workers will NOT be reconciled for this event, so make
		// that observable in the controller logs.
		log.FromContext(ctx).Error(err, "unable to list TemporalWorkerDeployments for connection",
			"connection", tc.GetName(), "namespace", tc.GetNamespace())
		return nil
	}

	// Keep only the workers that reference this connection by name.
	var requests []reconcile.Request
	for _, worker := range workers.Items {
		if worker.Spec.WorkerOptions.TemporalConnection != tc.GetName() {
			continue
		}
		requests = append(requests, reconcile.Request{
			NamespacedName: types.NamespacedName{
				Name:      worker.Name,
				Namespace: worker.Namespace,
			},
		})
	}

	return requests
}
3 changes: 3 additions & 0 deletions internal/demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ minikube kubectl -- get pods -n temporal-worker-controller -w
# Describe the controller pod's status
minikube kubectl -- describe pod <pod-name> -n temporal-worker-controller

# Output the controller pod's logs
minikube kubectl -- logs -n temporal-worker-controller -f pod/<pod-name>

# View TemporalWorkerDeployment status
kubectl get twd
```
Expand Down
6 changes: 3 additions & 3 deletions internal/demo/helloworld/temporal_worker_deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ spec:
steps:
# Increase traffic from 1% to 10% over 15 seconds
- rampPercentage: 1
pauseDuration: 5s
pauseDuration: 30s
- rampPercentage: 5
pauseDuration: 5s
pauseDuration: 30s
- rampPercentage: 10
pauseDuration: 5s
pauseDuration: 30s
# Increase traffic to 50% and wait 1 minute
- rampPercentage: 50
pauseDuration: 1m
Expand Down
36 changes: 30 additions & 6 deletions internal/k8s/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package k8s

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"regexp"
"sort"
Expand All @@ -25,11 +27,12 @@ import (
const (
DeployOwnerKey = ".metadata.controller"
// BuildIDLabel is the label that identifies the build ID for a deployment
BuildIDLabel = "temporal.io/build-id"
DeploymentNameSeparator = "/" // TODO(carlydf): change this to "." once the server accepts `.` in deployment names
VersionIDSeparator = "." // TODO(carlydf): change this to ":"
K8sResourceNameSeparator = "-"
MaxBuildIdLen = 63
BuildIDLabel = "temporal.io/build-id"
DeploymentNameSeparator = "/" // TODO(carlydf): change this to "." once the server accepts `.` in deployment names
VersionIDSeparator = "." // TODO(carlydf): change this to ":"
K8sResourceNameSeparator = "-"
MaxBuildIdLen = 63
ConnectionSpecHashAnnotation = "temporal.io/connection-spec-hash"
)

// DeploymentState represents the Kubernetes state of all deployments for a temporal worker deployment
Expand Down Expand Up @@ -256,6 +259,12 @@ func NewDeploymentWithOwnerRef(
})
}

// Build pod annotations
podAnnotations := make(map[string]string)
for k, v := range spec.Template.Annotations {
podAnnotations[k] = v
}
podAnnotations[ConnectionSpecHashAnnotation] = ComputeConnectionSpecHash(connection)
blockOwnerDeletion := true

return &appsv1.Deployment{
Expand Down Expand Up @@ -284,7 +293,7 @@ func NewDeploymentWithOwnerRef(
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
Annotations: spec.Template.Annotations,
Annotations: podAnnotations,
},
Spec: *podSpec,
},
Expand All @@ -293,6 +302,21 @@ func NewDeploymentWithOwnerRef(
}
}

// ComputeConnectionSpecHash returns a deterministic hex-encoded SHA-256
// hash of the connection spec's identifying fields. The value is stored in
// the pod-template annotation ConnectionSpecHashAnnotation so a change to
// the connection spec rolls the worker deployment. Returns "" when HostPort
// is unset.
//
// NOTE: only the secret *name* is hashed, not its contents — rotating the
// contents of a secret under the same name does not change the hash.
func ComputeConnectionSpecHash(connection temporaliov1alpha1.TemporalConnectionSpec) string {
	// HostPort is required, but MutualTLSSecret can be empty for non-mTLS connections
	if connection.HostPort == "" {
		return ""
	}

	hasher := sha256.New()

	// Hash the fields in a fixed order, separated by a NUL byte that cannot
	// appear in a host:port or a Kubernetes secret name. Without the
	// separator, distinct field pairs such as ("ab","c") and ("a","bc")
	// would concatenate to the same byte stream and collide.
	_, _ = hasher.Write([]byte(connection.HostPort))
	_, _ = hasher.Write([]byte{0})
	_, _ = hasher.Write([]byte(connection.MutualTLSSecret))

	return hex.EncodeToString(hasher.Sum(nil))
}

func NewDeploymentWithControllerRef(
w *temporaliov1alpha1.TemporalWorkerDeployment,
buildID string,
Expand Down
Loading
Loading