internal/controller/clientpool/clientpool.go (6 additions, 4 deletions)

@@ -22,8 +22,9 @@ import (
 )
 
 type ClientPoolKey struct {
-	HostPort  string
-	Namespace string
+	HostPort        string
+	Namespace       string
+	MutualTLSSecret string // Include secret name in key to invalidate cache when the secret name changes
 }
 
 type ClientInfo struct {

@@ -142,8 +143,9 @@ func (cp *ClientPool) UpsertClient(ctx context.Context, opts NewClientOptions) (
 	defer cp.mux.Unlock()
 
 	key := ClientPoolKey{
-		HostPort:  opts.Spec.HostPort,
-		Namespace: opts.TemporalNamespace,
+		HostPort:        opts.Spec.HostPort,
+		Namespace:       opts.TemporalNamespace,
+		MutualTLSSecret: opts.Spec.MutualTLSSecret,
 	}
 	cp.clients[key] = ClientInfo{
 		client: c,
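Because ClientPoolKey is a comparable struct used directly as a Go map key, a renamed mTLS secret now yields a distinct key: the next GetSDKClient lookup misses and the controller falls through to UpsertClient. A minimal standalone sketch of that behavior (simplified stand-in types, not the controller's actual pool):

package main

import "fmt"

// Simplified stand-in for the pool key in this diff.
type ClientPoolKey struct {
	HostPort        string
	Namespace       string
	MutualTLSSecret string
}

func main() {
	clients := map[ClientPoolKey]string{}

	oldKey := ClientPoolKey{HostPort: "temporal:7233", Namespace: "default", MutualTLSSecret: "mtls-v1"}
	clients[oldKey] = "client-A"

	// Rotating the secret *name* changes the key, so this lookup misses and
	// the controller would fall through to UpsertClient for a fresh client.
	newKey := ClientPoolKey{HostPort: "temporal:7233", Namespace: "default", MutualTLSSecret: "mtls-v2"}
	if _, ok := clients[newKey]; !ok {
		fmt.Println("cache miss for secret", newKey.MutualTLSSecret, "- building a new client")
	}
}

This is also why replacing a secret's contents in place defeats the cache: the key is built only from names, a limitation discussed in the review thread further down.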
internal/controller/execplan.go (9 additions)

@@ -54,6 +54,15 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
 		}
 	}
 
+	// Update deployments
+	for _, d := range p.UpdateDeployments {
+		l.Info("updating deployment", "deployment", d.Name, "namespace", d.Namespace)
+		if err := r.Update(ctx, d); err != nil {
+			l.Error(err, "unable to update deployment", "deployment", d)
+			return fmt.Errorf("unable to update deployment: %w", err)
+		}
+	}
+
 	// Get deployment handler
 	deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName)
internal/controller/genplan.go (3 additions)

@@ -27,6 +27,7 @@ type plan struct {
 	DeleteDeployments []*appsv1.Deployment
 	CreateDeployment  *appsv1.Deployment
 	ScaleDeployments  map[*corev1.ObjectReference]uint32
+	UpdateDeployments []*appsv1.Deployment
 	// Register new versions as current or with ramp
 	UpdateVersionConfig *planner.VersionConfig

@@ -90,6 +91,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
 		&w.Status,
 		&w.Spec,
 		temporalState,
+		connection,
 		plannerConfig,
 	)
 	if err != nil {

@@ -99,6 +101,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
 	// Convert planner result to controller plan
 	plan.DeleteDeployments = planResult.DeleteDeployments
 	plan.ScaleDeployments = planResult.ScaleDeployments
+	plan.UpdateDeployments = planResult.UpdateDeployments
 
 	// Convert version config
 	plan.UpdateVersionConfig = planResult.VersionConfig
internal/controller/worker_controller.go (31 additions, 3 deletions)

@@ -21,7 +21,9 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 )
 
 var (

@@ -58,7 +60,6 @@ type TemporalWorkerDeploymentReconciler struct {
 //
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile
-// TODO(carlydf): Add watching of temporal connection custom resource (may have issue)
 func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	// TODO(Shivam): Monitor if the time taken for a successful reconciliation loop is closing in on 5 minutes. If so, we
 	// may need to increase the timeout value.

@@ -112,8 +113,9 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
 
 	// Get or update temporal client for connection
 	temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{
-		HostPort:  temporalConnection.Spec.HostPort,
-		Namespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
+		HostPort:        temporalConnection.Spec.HostPort,
+		Namespace:       workerDeploy.Spec.WorkerOptions.TemporalNamespace,
+		MutualTLSSecret: temporalConnection.Spec.MutualTLSSecret,
Review thread on this change:

[Member, PR author] @jlegrone - curious to hear your thoughts on this: I added the name of the MutualTLSSecret as a key component here, since a change to the secret name for the same connection spec object should trigger our worker controller to refresh the client it is using. This was required since I realized there could be a world where the old secret is not expired but just replaced. However, one area where this breaks is if someone replaces the contents of an existing secret without changing the name — in that case the controller does not actually get a new client.

[Collaborator] Could you solve the problem of the contents changing without the name changing by storing a hash of the name + contents instead of just the name? Or, if that's inefficient because it means you have to read the secret contents all the time just to generate the map key, we could handle changed contents differently: if a call to the Temporal APIs fails with an error that suggests "wrong credentials", try reloading them once in case they've changed, then do a namespace describe to make sure they work, and if they do, restart all the workers. I don't think this needs to be solved in this PR, but if you don't solve the "changed content, not expired, same name" case, do create an issue about it.

[Member, PR author] Thanks for responding to this. Yes, I did think about the storing-a-hash-of-name-plus-contents idea, but decided against it because in one of my earlier PRs we discussed that reading the contents of a secret on every reconciliation loop could be intensive. Having said that, I like the reload-on-"wrong credentials" idea, but don't think it should come in this PR — I can make a separate issue for it!

(A sketch of the name-plus-contents hashing idea appears after this hunk.)

 	}, temporalConnection.Spec.MutualTLSSecret != "")
 	if !ok {
 		c, err := r.TemporalClientPool.UpsertClient(ctx, clientpool.NewClientOptions{
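For reference, here is a minimal sketch of the name-plus-contents hashing idea raised in the thread above. It is hypothetical and not part of this PR — as discussed, it costs a secret read on every reconcile — and the secretHash helper and its use as a pool-key component are illustrative assumptions only:

// Hypothetical sketch, not in this PR: key the client pool by a hash of the
// secret's name and contents so that content rotation also invalidates the
// cached client. The trade-off is a Secret read per reconcile.
package clientpool

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"sort"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func secretHash(ctx context.Context, c client.Client, namespace, name string) (string, error) {
	var s corev1.Secret
	if err := c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &s); err != nil {
		return "", err
	}
	h := sha256.New()
	h.Write([]byte(name))
	// Hash the data keys in sorted order so the digest is deterministic.
	keys := make([]string, 0, len(s.Data))
	for k := range s.Data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write(s.Data[k])
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}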
@@ -212,9 +214,35 @@ func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager)
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&temporaliov1alpha1.TemporalWorkerDeployment{}).
 		Owns(&appsv1.Deployment{}).
+		Watches(&temporaliov1alpha1.TemporalConnection{}, handler.EnqueueRequestsFromMapFunc(r.findTWDsUsingConnection)).
 		WithOptions(controller.Options{
 			MaxConcurrentReconciles: 100,
 			RecoverPanic:            &recoverPanic,
 		}).
 		Complete(r)
 }
 
+func (r *TemporalWorkerDeploymentReconciler) findTWDsUsingConnection(ctx context.Context, tc client.Object) []reconcile.Request {
+	var requests []reconcile.Request
+
+	// Find all TWDs in same namespace that reference this TC
+	var twds temporaliov1alpha1.TemporalWorkerDeploymentList
+	if err := r.List(ctx, &twds, client.InNamespace(tc.GetNamespace())); err != nil {
+		return requests
+	}
+
+	// Filter to ones using this connection
+	for _, twd := range twds.Items {
+		if twd.Spec.WorkerOptions.TemporalConnection == tc.GetName() {
+			// Enqueue a reconcile request for this TWD
+			requests = append(requests, reconcile.Request{
+				NamespacedName: types.NamespacedName{
+					Name:      twd.Name,
+					Namespace: twd.Namespace,
+				},
+			})
+		}
+	}
+
+	return requests
+}
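A hedged sketch of how the new mapper might be exercised with controller-runtime's fake client. None of this is in the PR; the module import path, the WorkerOptions field layout, and the reconciler embedding client.Client are assumptions inferred from identifiers visible in this diff:

package controller

import (
	"context"
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" // assumed import path
)

func TestFindTWDsUsingConnection(t *testing.T) {
	scheme := runtime.NewScheme()
	_ = temporaliov1alpha1.AddToScheme(scheme)

	// One TWD that references the connection, one that does not.
	matching := &temporaliov1alpha1.TemporalWorkerDeployment{
		ObjectMeta: metav1.ObjectMeta{Name: "hello-world", Namespace: "default"},
		Spec: temporaliov1alpha1.TemporalWorkerDeploymentSpec{
			WorkerOptions: temporaliov1alpha1.WorkerOptions{TemporalConnection: "prod-cluster"},
		},
	}
	other := &temporaliov1alpha1.TemporalWorkerDeployment{
		ObjectMeta: metav1.ObjectMeta{Name: "other", Namespace: "default"},
		Spec: temporaliov1alpha1.TemporalWorkerDeploymentSpec{
			WorkerOptions: temporaliov1alpha1.WorkerOptions{TemporalConnection: "staging-cluster"},
		},
	}
	tc := &temporaliov1alpha1.TemporalConnection{
		ObjectMeta: metav1.ObjectMeta{Name: "prod-cluster", Namespace: "default"},
	}

	r := &TemporalWorkerDeploymentReconciler{
		Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(matching, other, tc).Build(),
	}

	reqs := r.findTWDsUsingConnection(context.Background(), tc)
	if len(reqs) != 1 || reqs[0].Name != "hello-world" {
		t.Fatalf("expected a single request for hello-world, got %v", reqs)
	}
}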
internal/demo/README.md (3 additions)

@@ -79,6 +79,9 @@ minikube kubectl -- get pods -n temporal-worker-controller -w
 # Describe the controller pod's status
 minikube kubectl -- describe pod <pod-name> -n temporal-worker-controller
 
+# Output the controller pod's logs
+minikube kubectl -- logs -n temporal-system -f pod/<pod-name>
+
 # View TemporalWorkerDeployment status
 kubectl get twd
 ```
internal/demo/helloworld/temporal_worker_deployment.yaml (3 additions, 3 deletions)

@@ -18,11 +18,11 @@ spec:
     steps:
       # Increase traffic from 1% to 10% over 15 seconds
       - rampPercentage: 1
-        pauseDuration: 5s
+        pauseDuration: 30s
       - rampPercentage: 5
-        pauseDuration: 5s
+        pauseDuration: 30s
      - rampPercentage: 10
-        pauseDuration: 5s
+        pauseDuration: 30s
       # Increase traffic to 50% and wait 1 minute
       - rampPercentage: 50
         pauseDuration: 1m
internal/k8s/deployments.go (30 additions, 6 deletions)

@@ -6,6 +6,8 @@ package k8s
 
 import (
 	"context"
+	"crypto/sha256"
+	"encoding/hex"
 	"fmt"
 	"regexp"
 	"sort"

@@ -25,11 +27,12 @@ import (
 const (
 	DeployOwnerKey = ".metadata.controller"
 	// BuildIDLabel is the label that identifies the build ID for a deployment
-	BuildIDLabel             = "temporal.io/build-id"
-	DeploymentNameSeparator  = "/" // TODO(carlydf): change this to "." once the server accepts `.` in deployment names
-	VersionIDSeparator       = "." // TODO(carlydf): change this to ":"
-	K8sResourceNameSeparator = "-"
-	MaxBuildIdLen            = 63
+	BuildIDLabel                 = "temporal.io/build-id"
+	DeploymentNameSeparator      = "/" // TODO(carlydf): change this to "." once the server accepts `.` in deployment names
+	VersionIDSeparator           = "." // TODO(carlydf): change this to ":"
+	K8sResourceNameSeparator     = "-"
+	MaxBuildIdLen                = 63
+	ConnectionSpecHashAnnotation = "temporal.io/connection-spec-hash"
 )
 
 // DeploymentState represents the Kubernetes state of all deployments for a temporal worker deployment

@@ -256,6 +259,12 @@ func NewDeploymentWithOwnerRef(
 		})
 	}
 
+	// Build pod annotations
+	podAnnotations := make(map[string]string)
+	for k, v := range spec.Template.Annotations {
+		podAnnotations[k] = v
+	}
+	podAnnotations[ConnectionSpecHashAnnotation] = ComputeConnectionSpecHash(connection)
 	blockOwnerDeletion := true
 
 	return &appsv1.Deployment{

@@ -284,7 +293,7 @@ func NewDeploymentWithOwnerRef(
 			Template: corev1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
 					Labels:      podLabels,
-					Annotations: spec.Template.Annotations,
+					Annotations: podAnnotations,
 				},
 				Spec: *podSpec,
 			},

@@ -293,6 +302,21 @@ func NewDeploymentWithOwnerRef(
 	}
 }
 
+func ComputeConnectionSpecHash(connection temporaliov1alpha1.TemporalConnectionSpec) string {
+	// HostPort is required, but MutualTLSSecret can be empty for non-mTLS connections
+	if connection.HostPort == "" {
+		return ""
+	}
+
+	hasher := sha256.New()
+
+	// Hash connection spec fields in deterministic order
+	_, _ = hasher.Write([]byte(connection.HostPort))
+	_, _ = hasher.Write([]byte(connection.MutualTLSSecret))
+
+	return hex.EncodeToString(hasher.Sum(nil))
+}
+
 func NewDeploymentWithControllerRef(
 	w *temporaliov1alpha1.TemporalWorkerDeployment,
 	buildID string,
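One aside on ComputeConnectionSpecHash above: the two fields are hashed back-to-back, so in principle distinct pairs whose concatenation is equal (for example, HostPort "h:1" with secret "s" versus HostPort "h:1s" with an empty secret) produce the same digest. A hypothetical variant, not part of this PR, that writes a separator byte between the fields rules this out; it assumes the same crypto/sha256 and encoding/hex imports added to this file above:

// Hypothetical variant, not in this PR: a NUL separator between the fields
// prevents ("h:1", "s") and ("h:1s", "") from hashing identically.
func computeConnectionSpecHashDelimited(connection temporaliov1alpha1.TemporalConnectionSpec) string {
	if connection.HostPort == "" {
		return ""
	}
	hasher := sha256.New()
	_, _ = hasher.Write([]byte(connection.HostPort))
	_, _ = hasher.Write([]byte{0}) // separator byte cannot appear in either value
	_, _ = hasher.Write([]byte(connection.MutualTLSSecret))
	return hex.EncodeToString(hasher.Sum(nil))
}

In practice collisions are unlikely here, since HostPort always contains a colon and Kubernetes secret names cannot, but the separator makes the property unconditional.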
internal/k8s/deployments_test.go (106 additions)

@@ -451,3 +451,109 @@ func TestComputeWorkerDeploymentName_Integration_WithVersionedName(t *testing.T)
 	assert.Equal(t, expectedVersionID, versionID)
 	assert.Equal(t, "hello-world"+k8s.DeploymentNameSeparator+"demo"+k8s.VersionIDSeparator+"v1-0-0-dd84", versionID)
 }
+
+// TestNewDeploymentWithPodAnnotations tests that every new pod created has a connection spec hash annotation
+func TestNewDeploymentWithPodAnnotations(t *testing.T) {
+	connection := temporaliov1alpha1.TemporalConnectionSpec{
+		HostPort:        "localhost:7233",
+		MutualTLSSecret: "my-secret",
+	}
+
+	deployment := k8s.NewDeploymentWithOwnerRef(
+		&metav1.TypeMeta{},
+		&metav1.ObjectMeta{Name: "test", Namespace: "default"},
+		&temporaliov1alpha1.TemporalWorkerDeploymentSpec{},
+		"test-deployment",
+		"build123",
+		connection,
+	)
+
+	expectedHash := k8s.ComputeConnectionSpecHash(connection)
+	actualHash := deployment.Spec.Template.Annotations[k8s.ConnectionSpecHashAnnotation]
+
+	assert.Equal(t, expectedHash, actualHash, "Deployment should have correct connection spec hash annotation")
+}
+
+func TestComputeConnectionSpecHash(t *testing.T) {
+	t.Run("generates non-empty hash for valid connection spec", func(t *testing.T) {
+		spec := temporaliov1alpha1.TemporalConnectionSpec{
+			HostPort:        "localhost:7233",
+			MutualTLSSecret: "my-tls-secret",
+		}
+
+		result := k8s.ComputeConnectionSpecHash(spec)
+		assert.NotEmpty(t, result, "Hash should not be empty for valid spec")
+		assert.Len(t, result, 64, "SHA256 hash should be 64 characters") // hex encoded SHA256
+	})
+
+	t.Run("returns empty hash when hostport is empty", func(t *testing.T) {
+		spec := temporaliov1alpha1.TemporalConnectionSpec{
+			HostPort:        "",
+			MutualTLSSecret: "secret",
+		}
+
+		result := k8s.ComputeConnectionSpecHash(spec)
+		assert.Empty(t, result, "Hash should be empty when hostport is empty")
+	})
+
+	t.Run("is deterministic - same input produces same hash", func(t *testing.T) {
+		spec := temporaliov1alpha1.TemporalConnectionSpec{
+			HostPort:        "localhost:7233",
+			MutualTLSSecret: "my-secret",
+		}
+
+		hash1 := k8s.ComputeConnectionSpecHash(spec)
+		hash2 := k8s.ComputeConnectionSpecHash(spec)
+
+		assert.Equal(t, hash1, hash2, "Same input should produce identical hashes")
+	})
+
+	t.Run("different hostports produce different hashes", func(t *testing.T) {
+		spec1 := temporaliov1alpha1.TemporalConnectionSpec{
+			HostPort:        "localhost:7233",
+			MutualTLSSecret: "same-secret",
+		}
+		spec2 := temporaliov1alpha1.TemporalConnectionSpec{
+			HostPort:        "different-host:7233",
+			MutualTLSSecret: "same-secret",
+		}
+
+		hash1 := k8s.ComputeConnectionSpecHash(spec1)
+		hash2 := k8s.ComputeConnectionSpecHash(spec2)
+
+		assert.NotEqual(t, hash1, hash2, "Different hostports should produce different hashes")
+	})
+
+	t.Run("different mTLS secrets produce different hashes", func(t *testing.T) {
+		spec1 := temporaliov1alpha1.TemporalConnectionSpec{
+			HostPort:        "localhost:7233",
+			MutualTLSSecret: "secret1",
+		}
+		spec2 := temporaliov1alpha1.TemporalConnectionSpec{
+			HostPort:        "localhost:7233",
+			MutualTLSSecret: "secret2",
+		}
+
+		hash1 := k8s.ComputeConnectionSpecHash(spec1)
+		hash2 := k8s.ComputeConnectionSpecHash(spec2)
+
+		assert.NotEqual(t, hash1, hash2, "Different mTLS secrets should produce different hashes")
+	})
+
+	t.Run("empty mTLS secret vs non-empty produce different hashes", func(t *testing.T) {
+		spec1 := temporaliov1alpha1.TemporalConnectionSpec{
+			HostPort:        "localhost:7233",
+			MutualTLSSecret: "",
+		}
+		spec2 := temporaliov1alpha1.TemporalConnectionSpec{
+			HostPort:        "localhost:7233",
+			MutualTLSSecret: "some-secret",
+		}
+
+		hash1 := k8s.ComputeConnectionSpecHash(spec1)
+		hash2 := k8s.ComputeConnectionSpecHash(spec2)
+
+		assert.NotEqual(t, hash1, hash2, "Empty vs non-empty mTLS secret should produce different hashes")
+		assert.NotEmpty(t, hash1, "Hash should still be generated even with empty mTLS secret")
+	})
+}