
Commit 5d9fc97

TemporalConnection update propagation (#102)
## What was changed
- Handle updates to a TemporalConnection for the TemporalWorkerDeployments (TWDs) associated with it, so that these updates are propagated to their deployments.
- This was achieved by adding a hash of the TemporalConnection's contents as a pod annotation. If a TemporalConnection is updated, new deployments are rolled out with the updated annotation, and the updated secret is mounted so that workers start with the latest secrets.

## Why?
- Core functionality.

## Checklist
1. Closes #10
2. How was this tested:
   - Added unit tests and also tested this locally. The only thing I was not able to do was add an integration test, since I could not configure our temporaltest server to use mTLS. Here's how I tested this locally (see the command sketch below):
     - I ran a worker pod that used the `temporal-cloud-mtls` secret to connect to Temporal.
     - I updated the namespace the worker was connected to by changing the mTLS secret. This resulted in the controller and the worker pods throwing connection errors (expected).
     - I updated the local secret in the local k8s cluster (and also changed its name to `temporal-cloud-mtls-1`, since we want our controller to also refresh the client it's using).
     - I observed that a rolling deployment was conducted and everything stabilized. Proof of a rolling update:

   <img width="1056" height="58" alt="Screenshot 2025-08-07 at 5 41 34 PM" src="https://github.com/user-attachments/assets/40720118-0bb5-4f85-aa80-2c7c257b65f2" />
3. Any docs updates needed?
   - None
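A minimal sketch of how the propagation described above can be observed in a cluster. The annotation key `temporal.io/connection-spec-hash` is the one added in this PR; the deployment and namespace names are placeholders for your own setup:

```sh
# Read the connection-spec hash stamped on the worker's pod template
# (dots in the annotation key are escaped for kubectl's JSONPath).
kubectl get deployment <worker-deployment-name> -n <namespace> \
  -o jsonpath='{.spec.template.metadata.annotations.temporal\.io/connection-spec-hash}'

# After editing the TemporalConnection (e.g. pointing MutualTLSSecret at a new
# secret name), the hash changes and a rolling update begins; watch it settle:
kubectl rollout status deployment/<worker-deployment-name> -n <namespace>
```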
1 parent 40011b7 commit 5d9fc97

File tree

10 files changed: +678 −47 lines


internal/controller/clientpool/clientpool.go

Lines changed: 6 additions & 4 deletions
@@ -22,8 +22,9 @@ import (
 )
 
 type ClientPoolKey struct {
-    HostPort  string
-    Namespace string
+    HostPort        string
+    Namespace       string
+    MutualTLSSecret string // Include secret name in key to invalidate cache when the secret name changes
 }
 
 type ClientInfo struct {
@@ -142,8 +143,9 @@ func (cp *ClientPool) UpsertClient(ctx context.Context, opts NewClientOptions) (
     defer cp.mux.Unlock()
 
     key := ClientPoolKey{
-        HostPort:  opts.Spec.HostPort,
-        Namespace: opts.TemporalNamespace,
+        HostPort:        opts.Spec.HostPort,
+        Namespace:       opts.TemporalNamespace,
+        MutualTLSSecret: opts.Spec.MutualTLSSecret,
     }
     cp.clients[key] = ClientInfo{
         client: c,

internal/controller/execplan.go

Lines changed: 9 additions & 0 deletions
@@ -54,6 +54,15 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
         }
     }
 
+    // Update deployments
+    for _, d := range p.UpdateDeployments {
+        l.Info("updating deployment", "deployment", d.Name, "namespace", d.Namespace)
+        if err := r.Update(ctx, d); err != nil {
+            l.Error(err, "unable to update deployment", "deployment", d)
+            return fmt.Errorf("unable to update deployment: %w", err)
+        }
+    }
+
     // Get deployment handler
     deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName)

internal/controller/genplan.go

Lines changed: 3 additions & 0 deletions
@@ -27,6 +27,7 @@ type plan struct {
     DeleteDeployments []*appsv1.Deployment
     CreateDeployment  *appsv1.Deployment
     ScaleDeployments  map[*corev1.ObjectReference]uint32
+    UpdateDeployments []*appsv1.Deployment
     // Register new versions as current or with ramp
     UpdateVersionConfig *planner.VersionConfig
@@ -90,6 +91,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
         &w.Status,
         &w.Spec,
         temporalState,
+        connection,
         plannerConfig,
     )
     if err != nil {
@@ -99,6 +101,7 @@
     // Convert planner result to controller plan
     plan.DeleteDeployments = planResult.DeleteDeployments
     plan.ScaleDeployments = planResult.ScaleDeployments
+    plan.UpdateDeployments = planResult.UpdateDeployments
 
     // Convert version config
     plan.UpdateVersionConfig = planResult.VersionConfig

internal/controller/worker_controller.go

Lines changed: 31 additions & 3 deletions
@@ -21,7 +21,9 @@ import (
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
     "sigs.k8s.io/controller-runtime/pkg/controller"
+    "sigs.k8s.io/controller-runtime/pkg/handler"
     "sigs.k8s.io/controller-runtime/pkg/log"
+    "sigs.k8s.io/controller-runtime/pkg/reconcile"
 )
 
 var (
@@ -58,7 +60,6 @@ type TemporalWorkerDeploymentReconciler struct {
 //
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
-// TODO(carlydf): Add watching of temporal connection custom resource (may have issue)
 func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
     // TODO(Shivam): Monitor if the time taken for a successful reconciliation loop is closing in on 5 minutes. If so, we
     // may need to increase the timeout value.
@@ -112,8 +113,9 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
 
     // Get or update temporal client for connection
     temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{
-        HostPort:  temporalConnection.Spec.HostPort,
-        Namespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
+        HostPort:        temporalConnection.Spec.HostPort,
+        Namespace:       workerDeploy.Spec.WorkerOptions.TemporalNamespace,
+        MutualTLSSecret: temporalConnection.Spec.MutualTLSSecret,
     }, temporalConnection.Spec.MutualTLSSecret != "")
     if !ok {
         c, err := r.TemporalClientPool.UpsertClient(ctx, clientpool.NewClientOptions{
@@ -212,9 +214,35 @@ func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager)
     return ctrl.NewControllerManagedBy(mgr).
         For(&temporaliov1alpha1.TemporalWorkerDeployment{}).
         Owns(&appsv1.Deployment{}).
+        Watches(&temporaliov1alpha1.TemporalConnection{}, handler.EnqueueRequestsFromMapFunc(r.findTWDsUsingConnection)).
         WithOptions(controller.Options{
             MaxConcurrentReconciles: 100,
             RecoverPanic:            &recoverPanic,
         }).
         Complete(r)
 }
+
+func (r *TemporalWorkerDeploymentReconciler) findTWDsUsingConnection(ctx context.Context, tc client.Object) []reconcile.Request {
+    var requests []reconcile.Request
+
+    // Find all TWDs in same namespace that reference this TC
+    var twds temporaliov1alpha1.TemporalWorkerDeploymentList
+    if err := r.List(ctx, &twds, client.InNamespace(tc.GetNamespace())); err != nil {
+        return requests
+    }
+
+    // Filter to ones using this connection
+    for _, twd := range twds.Items {
+        if twd.Spec.WorkerOptions.TemporalConnection == tc.GetName() {
+            // Enqueue a reconcile request for this TWD
+            requests = append(requests, reconcile.Request{
+                NamespacedName: types.NamespacedName{
+                    Name:      twd.Name,
+                    Namespace: twd.Namespace,
+                },
+            })
+        }
+    }
+
+    return requests
+}

internal/demo/README.md

Lines changed: 3 additions & 0 deletions
@@ -79,6 +79,9 @@ minikube kubectl -- get pods -n temporal-worker-controller -w
 # Describe the controller pod's status
 minikube kubectl -- describe pod <pod-name> -n temporal-worker-controller
 
+# Output the controller pod's logs
+minikube kubectl -- logs -n temporal-system -f pod/<pod-name>
+
 # View TemporalWorkerDeployment status
 kubectl get twd
 ```

internal/demo/helloworld/temporal_worker_deployment.yaml

Lines changed: 3 additions & 3 deletions
@@ -18,11 +18,11 @@ spec:
   steps:
     # Increase traffic from 1% to 10% over 15 seconds
     - rampPercentage: 1
-      pauseDuration: 5s
+      pauseDuration: 30s
    - rampPercentage: 5
-      pauseDuration: 5s
+      pauseDuration: 30s
    - rampPercentage: 10
-      pauseDuration: 5s
+      pauseDuration: 30s
    # Increase traffic to 50% and wait 1 minute
    - rampPercentage: 50
      pauseDuration: 1m

internal/k8s/deployments.go

Lines changed: 30 additions & 6 deletions
@@ -6,6 +6,8 @@ package k8s
 
 import (
     "context"
+    "crypto/sha256"
+    "encoding/hex"
     "fmt"
     "regexp"
     "sort"
@@ -25,11 +27,12 @@ import (
 const (
     DeployOwnerKey = ".metadata.controller"
     // BuildIDLabel is the label that identifies the build ID for a deployment
-    BuildIDLabel             = "temporal.io/build-id"
-    DeploymentNameSeparator  = "/" // TODO(carlydf): change this to "." once the server accepts `.` in deployment names
-    VersionIDSeparator       = "." // TODO(carlydf): change this to ":"
-    K8sResourceNameSeparator = "-"
-    MaxBuildIdLen            = 63
+    BuildIDLabel                 = "temporal.io/build-id"
+    DeploymentNameSeparator      = "/" // TODO(carlydf): change this to "." once the server accepts `.` in deployment names
+    VersionIDSeparator           = "." // TODO(carlydf): change this to ":"
+    K8sResourceNameSeparator     = "-"
+    MaxBuildIdLen                = 63
+    ConnectionSpecHashAnnotation = "temporal.io/connection-spec-hash"
 )
 
 // DeploymentState represents the Kubernetes state of all deployments for a temporal worker deployment
@@ -256,6 +259,12 @@ func NewDeploymentWithOwnerRef(
         })
     }
 
+    // Build pod annotations
+    podAnnotations := make(map[string]string)
+    for k, v := range spec.Template.Annotations {
+        podAnnotations[k] = v
+    }
+    podAnnotations[ConnectionSpecHashAnnotation] = ComputeConnectionSpecHash(connection)
     blockOwnerDeletion := true
 
     return &appsv1.Deployment{
@@ -284,7 +293,7 @@
             Template: corev1.PodTemplateSpec{
                 ObjectMeta: metav1.ObjectMeta{
                     Labels:      podLabels,
-                    Annotations: spec.Template.Annotations,
+                    Annotations: podAnnotations,
                 },
                 Spec: *podSpec,
             },
@@ -293,6 +302,21 @@
     }
 }
 
+func ComputeConnectionSpecHash(connection temporaliov1alpha1.TemporalConnectionSpec) string {
+    // HostPort is required, but MutualTLSSecret can be empty for non-mTLS connections
+    if connection.HostPort == "" {
+        return ""
+    }
+
+    hasher := sha256.New()
+
+    // Hash connection spec fields in deterministic order
+    _, _ = hasher.Write([]byte(connection.HostPort))
+    _, _ = hasher.Write([]byte(connection.MutualTLSSecret))
+
+    return hex.EncodeToString(hasher.Sum(nil))
+}
+
 func NewDeploymentWithControllerRef(
     w *temporaliov1alpha1.TemporalWorkerDeployment,
     buildID string,

internal/k8s/deployments_test.go

Lines changed: 106 additions & 0 deletions
@@ -451,3 +451,109 @@ func TestComputeWorkerDeploymentName_Integration_WithVersionedName(t *testing.T)
     assert.Equal(t, expectedVersionID, versionID)
     assert.Equal(t, "hello-world"+k8s.DeploymentNameSeparator+"demo"+k8s.VersionIDSeparator+"v1-0-0-dd84", versionID)
 }
+
+// TestNewDeploymentWithPodAnnotations tests that every new pod created has a connection spec hash annotation
+func TestNewDeploymentWithPodAnnotations(t *testing.T) {
+    connection := temporaliov1alpha1.TemporalConnectionSpec{
+        HostPort:        "localhost:7233",
+        MutualTLSSecret: "my-secret",
+    }
+
+    deployment := k8s.NewDeploymentWithOwnerRef(
+        &metav1.TypeMeta{},
+        &metav1.ObjectMeta{Name: "test", Namespace: "default"},
+        &temporaliov1alpha1.TemporalWorkerDeploymentSpec{},
+        "test-deployment",
+        "build123",
+        connection,
+    )
+
+    expectedHash := k8s.ComputeConnectionSpecHash(connection)
+    actualHash := deployment.Spec.Template.Annotations[k8s.ConnectionSpecHashAnnotation]
+
+    assert.Equal(t, expectedHash, actualHash, "Deployment should have correct connection spec hash annotation")
+}
+
+func TestComputeConnectionSpecHash(t *testing.T) {
+    t.Run("generates non-empty hash for valid connection spec", func(t *testing.T) {
+        spec := temporaliov1alpha1.TemporalConnectionSpec{
+            HostPort:        "localhost:7233",
+            MutualTLSSecret: "my-tls-secret",
+        }
+
+        result := k8s.ComputeConnectionSpecHash(spec)
+        assert.NotEmpty(t, result, "Hash should not be empty for valid spec")
+        assert.Len(t, result, 64, "SHA256 hash should be 64 characters") // hex encoded SHA256
+    })
+
+    t.Run("returns empty hash when hostport is empty", func(t *testing.T) {
+        spec := temporaliov1alpha1.TemporalConnectionSpec{
+            HostPort:        "",
+            MutualTLSSecret: "secret",
+        }
+
+        result := k8s.ComputeConnectionSpecHash(spec)
+        assert.Empty(t, result, "Hash should be empty when hostport is empty")
+    })
+
+    t.Run("is deterministic - same input produces same hash", func(t *testing.T) {
+        spec := temporaliov1alpha1.TemporalConnectionSpec{
+            HostPort:        "localhost:7233",
+            MutualTLSSecret: "my-secret",
+        }
+
+        hash1 := k8s.ComputeConnectionSpecHash(spec)
+        hash2 := k8s.ComputeConnectionSpecHash(spec)
+
+        assert.Equal(t, hash1, hash2, "Same input should produce identical hashes")
+    })
+
+    t.Run("different hostports produce different hashes", func(t *testing.T) {
+        spec1 := temporaliov1alpha1.TemporalConnectionSpec{
+            HostPort:        "localhost:7233",
+            MutualTLSSecret: "same-secret",
+        }
+        spec2 := temporaliov1alpha1.TemporalConnectionSpec{
+            HostPort:        "different-host:7233",
+            MutualTLSSecret: "same-secret",
+        }
+
+        hash1 := k8s.ComputeConnectionSpecHash(spec1)
+        hash2 := k8s.ComputeConnectionSpecHash(spec2)
+
+        assert.NotEqual(t, hash1, hash2, "Different hostports should produce different hashes")
+    })
+
+    t.Run("different mTLS secrets produce different hashes", func(t *testing.T) {
+        spec1 := temporaliov1alpha1.TemporalConnectionSpec{
+            HostPort:        "localhost:7233",
+            MutualTLSSecret: "secret1",
+        }
+        spec2 := temporaliov1alpha1.TemporalConnectionSpec{
+            HostPort:        "localhost:7233",
+            MutualTLSSecret: "secret2",
+        }
+
+        hash1 := k8s.ComputeConnectionSpecHash(spec1)
+        hash2 := k8s.ComputeConnectionSpecHash(spec2)
+
+        assert.NotEqual(t, hash1, hash2, "Different mTLS secrets should produce different hashes")
+    })
+
+    t.Run("empty mTLS secret vs non-empty produce different hashes", func(t *testing.T) {
+        spec1 := temporaliov1alpha1.TemporalConnectionSpec{
+            HostPort:        "localhost:7233",
+            MutualTLSSecret: "",
+        }
+        spec2 := temporaliov1alpha1.TemporalConnectionSpec{
+            HostPort:        "localhost:7233",
+            MutualTLSSecret: "some-secret",
+        }
+
+        hash1 := k8s.ComputeConnectionSpecHash(spec1)
+        hash2 := k8s.ComputeConnectionSpecHash(spec2)
+
+        assert.NotEqual(t, hash1, hash2, "Empty vs non-empty mTLS secret should produce different hashes")
+        assert.NotEmpty(t, hash1, "Hash should still be generated even with empty mTLS secret")
+    })
+}
