Skip to content

Commit fce3c2a

Browse files
authored
feat: add support for gating rollouts behind successful workflow executions (#152)
<!--- Note to EXTERNAL Contributors --> <!-- Thanks for opening a PR! If it is a significant code change, please **make sure there is an open issue** for this. We work best with you when we have accepted the idea first before you code. --> This PR adds support for custom inputs to the "gate" workflow, which was originally part of the docs but I discovered was not implemented when I tried to use it 😅. <!--- For ALL Contributors 👇 --> ## What was changed <!-- Describe what has changed in this PR --> This PR includes a few changes: 1. Gated workflow input specification support both in-line and as a ConfigMap 2. Moved CRDs to top level chart directory, which ensures they are installed first per [helm docs](https://helm.sh/docs/chart_best_practices/custom_resource_definitions/#method-1-let-helm-do-it-for-you) 3. Because of 2, TemporalConnection can now be added to the helm templates/values definitions 4. Relevant docs updates ## Why? <!-- Tell your future self why have you made these changes --> Gated workflow success for our use case really depends on being able to specify exactly the workflow inputs. ## Checklist <!--- add/delete as needed ---> ### Closes <!-- add issue number here --> Here is the [link to temporal community slack thread](https://temporalio.slack.com/archives/C07MDJ6S3HP/p1759190414718889) where I discussed with @Shivs11 and @carlydf . ### How was this tested: <!--- Please describe how you tested your changes/how we can test them --> Already deployed in our staging cluster, using configMap with a huge workflow input. This workflow must complete successfully or the new workers will not roll out. ### Any docs updates needed? <!--- update README if applicable or point out where to update docs.temporal.io --> Updated docs accordingly.
1 parent 3192bda commit fce3c2a

File tree

11 files changed

+491
-19
lines changed

11 files changed

+491
-19
lines changed

api/v1alpha1/temporalworker_webhook.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,27 @@ func validateRolloutStrategy(s RolloutStrategy) []*field.Error {
131131
}
132132
}
133133

134+
// Validate gate input fields
135+
if s.Gate != nil {
136+
gate := s.Gate
137+
if gate.Input != nil && gate.InputFrom != nil {
138+
allErrs = append(allErrs,
139+
field.Invalid(field.NewPath("spec.rollout.gate"), "input & inputFrom",
140+
"only one of input or inputFrom may be set"),
141+
)
142+
}
143+
if gate.InputFrom != nil {
144+
cm := gate.InputFrom.ConfigMapKeyRef
145+
sec := gate.InputFrom.SecretKeyRef
146+
if (cm == nil && sec == nil) || (cm != nil && sec != nil) {
147+
allErrs = append(allErrs,
148+
field.Invalid(field.NewPath("spec.rollout.gate.inputFrom"), gate.InputFrom,
149+
"exactly one of configMapKeyRef or secretKeyRef must be set"),
150+
)
151+
}
152+
}
153+
}
154+
134155
return allErrs
135156
}
136157

api/v1alpha1/worker_types.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package v1alpha1
66

77
import (
88
corev1 "k8s.io/api/core/v1"
9+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
910
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1011
)
1112

@@ -273,6 +274,25 @@ const (
273274

274275
type GateWorkflowConfig struct {
275276
WorkflowType string `json:"workflowType"`
277+
// Input is an arbitrary JSON object passed as the first parameter to the gate workflow.
278+
// For inputs with secrets use SecretKeyRef in InputFrom to omit from logs.
279+
// +optional
280+
Input *apiextensionsv1.JSON `json:"input,omitempty"`
281+
// InputFrom references a key in a ConfigMap or Secret whose contents are passed
282+
// as the first parameter to the gate workflow. The referenced value should be a JSON document.
283+
// For inputs with secrets use SecretKeyRef to omit from logs.
284+
// +optional
285+
InputFrom *GateInputSource `json:"inputFrom,omitempty"`
286+
}
287+
288+
// GateInputSource references a value from a ConfigMap or a Secret
289+
type GateInputSource struct {
290+
// Select a key of a ConfigMap in the same namespace
291+
// +optional
292+
ConfigMapKeyRef *corev1.ConfigMapKeySelector `json:"configMapKeyRef,omitempty"`
293+
// Select a key of a Secret in the same namespace
294+
// +optional
295+
SecretKeyRef *corev1.SecretKeySelector `json:"secretKeyRef,omitempty"`
276296
}
277297

278298
// RolloutStrategy defines strategy to apply during next rollout

docs/architecture.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,28 @@ Here is an example of a progressive cut-over strategy gated on the success of th
7171
workflowType: "HelloWorld"
7272
```
7373
74+
### Gate Input Flow
75+
76+
When `spec.rollout.gate` is configured, the controller starts one test workflow per task queue in the Target Version. If gate input is provided, it is passed as the first workflow argument:
77+
78+
```yaml
79+
rollout:
80+
gate:
81+
workflowType: "RolloutGate"
82+
# Inline JSON object
83+
input:
84+
thresholds:
85+
errorRate: 0.01
86+
p95LatencyMs: 250
87+
# Or reference ConfigMap/Secret (JSON document)
88+
# inputFrom:
89+
# configMapKeyRef:
90+
# name: my-gate-input
91+
# key: payload.json
92+
```
93+
94+
Input resolution happens in the controller before invoking the workflow. The payload must be a JSON object and is not logged, except for size and a short preview for debugging.
95+
7496
## Deployment Flow Diagram
7597

7698
```mermaid

docs/concepts.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,15 @@ Configuration that tells the controller how to connect to the same Temporal clus
9595
Defines how new versions are promoted:
9696
- **strategy**: Manual, AllAtOnce, or Progressive
9797
- **steps**: For Progressive strategy, defines ramp percentages and pause durations
98-
- **gate**: Optional workflow that must succeed on all task queues in the target Worker Deployment Version before promotion continues
98+
- **gate**: Optional workflow that must succeed on all task queues in the target Worker Deployment Version before promotion continues. Gate can receive an input payload:
99+
- `workflowType`: The workflow name/type to execute for validation
100+
- `input`: Inline JSON object passed as the first workflow argument
101+
- `inputFrom`: Reference to a `ConfigMap` or `Secret` key whose contents are JSON; passed as the first workflow argument
102+
103+
Notes on gate inputs:
104+
- Exactly one of `input` or `inputFrom` may be set.
105+
- The value must be a JSON object (not a string containing JSON).
106+
- Large/sensitive payloads should use `inputFrom.secretKeyRef` or split into smaller documents.
99107

100108
### Sunset Configuration
101109
Defines how Drained versions are cleaned up:

docs/configuration.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,29 @@ rollout:
184184
pauseDuration: 5m
185185
gate:
186186
workflowType: "HealthCheck"
187+
# Optionally provide input to the gate workflow:
188+
# 1) Inline arbitrary JSON:
189+
# input:
190+
# thresholds:
191+
# errorRate: 0.01
192+
# p95LatencyMs: 250
193+
# 2) Or reference a key from a ConfigMap or Secret containing JSON:
194+
# inputFrom:
195+
# configMapKeyRef:
196+
# name: gate-input
197+
# key: payload.json
198+
# inputFrom:
199+
# secretKeyRef:
200+
# name: gate-input
201+
# key: payload.json
187202
```
188203

204+
Gate workflow input details:
205+
- Exactly one of `input` or `inputFrom` may be set.
206+
- `input` accepts any JSON object and is passed as the first parameter to the gate workflow.
207+
- `inputFrom` reads a JSON document from the specified `ConfigMap` or `Secret` key.
208+
- The target workflow should declare a single argument matching the JSON shape (e.g., a struct or `json.RawMessage`).
209+
189210
## Advanced Configuration
190211

191212
### Environment-Specific Configurations

docs/migration-guide.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,21 @@ spec:
545545
pauseDuration: 30m
546546
gate:
547547
workflowType: "HealthCheck" # Validate new version before proceeding
548+
# Optional gate input examples
549+
# Inline JSON object:
550+
# input:
551+
# thresholds:
552+
# errorRate: 0.01
553+
# p95LatencyMs: 250
554+
# Or load from a ConfigMap/Secret key containing JSON:
555+
# inputFrom:
556+
# configMapKeyRef:
557+
# name: order-processor-gate-input
558+
# key: payload.json
559+
# inputFrom:
560+
# secretKeyRef:
561+
# name: gate-input
562+
# key: payload.json
548563
---
549564
# Staging - Fast rollout for testing
550565
apiVersion: temporal.io/v1alpha1

helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,36 @@ spec:
6363
properties:
6464
gate:
6565
properties:
66+
input:
67+
type: object
68+
x-kubernetes-preserve-unknown-fields: true
69+
inputFrom:
70+
properties:
71+
configMapKeyRef:
72+
properties:
73+
key:
74+
type: string
75+
name:
76+
type: string
77+
optional:
78+
type: boolean
79+
required:
80+
- key
81+
- name
82+
type: object
83+
secretKeyRef:
84+
properties:
85+
key:
86+
type: string
87+
name:
88+
type: string
89+
optional:
90+
type: boolean
91+
required:
92+
- key
93+
- name
94+
type: object
95+
type: object
6696
workflowType:
6797
type: string
6898
required:

internal/controller/execplan.go

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package controller
66

77
import (
88
"context"
9+
"encoding/json"
910
"fmt"
1011
"time"
1112

@@ -68,7 +69,47 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
6869
deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(p.WorkerDeploymentName)
6970

7071
for _, wf := range p.startTestWorkflows {
71-
if _, err := temporalClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
72+
// Log workflow start details
73+
if len(wf.input) > 0 {
74+
if wf.isInputSecret {
75+
// Don't log the actual input if it came from a Secret
76+
l.Info("starting gate workflow",
77+
"workflowType", wf.workflowType,
78+
"taskQueue", wf.taskQueue,
79+
"buildID", wf.buildID,
80+
"inputBytes", len(wf.input),
81+
"inputSource", "SecretRef (contents hidden)",
82+
)
83+
} else {
84+
// For non-secret sources, parse JSON and extract keys
85+
var inputKeys []string
86+
if len(wf.input) > 0 {
87+
var jsonData map[string]interface{}
88+
if err := json.Unmarshal(wf.input, &jsonData); err == nil {
89+
for key := range jsonData {
90+
inputKeys = append(inputKeys, key)
91+
}
92+
}
93+
}
94+
95+
// Log the input keys for non-secret sources (inline or ConfigMap)
96+
l.Info("starting gate workflow",
97+
"workflowType", wf.workflowType,
98+
"taskQueue", wf.taskQueue,
99+
"buildID", wf.buildID,
100+
"inputBytes", len(wf.input),
101+
"inputKeys", inputKeys,
102+
)
103+
}
104+
} else {
105+
l.Info("starting gate workflow",
106+
"workflowType", wf.workflowType,
107+
"taskQueue", wf.taskQueue,
108+
"buildID", wf.buildID,
109+
"inputBytes", 0,
110+
)
111+
}
112+
opts := sdkclient.StartWorkflowOptions{
72113
ID: wf.workflowID,
73114
TaskQueue: wf.taskQueue,
74115
WorkflowExecutionTimeout: time.Hour,
@@ -80,7 +121,14 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
80121
BuildId: wf.buildID,
81122
},
82123
},
83-
}, wf.workflowType); err != nil {
124+
}
125+
var err error
126+
if len(wf.input) > 0 {
127+
_, err = temporalClient.ExecuteWorkflow(ctx, opts, wf.workflowType, json.RawMessage(wf.input))
128+
} else {
129+
_, err = temporalClient.ExecuteWorkflow(ctx, opts, wf.workflowType)
130+
}
131+
if err != nil {
84132
return fmt.Errorf("unable to start test workflow execution: %w", err)
85133
}
86134
}

internal/controller/genplan.go

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/temporalio/temporal-worker-controller/internal/temporal"
1616
appsv1 "k8s.io/api/apps/v1"
1717
corev1 "k8s.io/api/core/v1"
18+
"k8s.io/apimachinery/pkg/types"
1819
)
1920

2021
// plan holds the actions to execute during reconciliation
@@ -41,10 +42,12 @@ type plan struct {
4142

4243
// startWorkflowConfig defines a workflow to be started
4344
type startWorkflowConfig struct {
44-
workflowType string
45-
workflowID string
46-
buildID string
47-
taskQueue string
45+
workflowType string
46+
workflowID string
47+
buildID string
48+
taskQueue string
49+
input []byte
50+
isInputSecret bool // indicates if input should be treated as sensitive
4851
}
4952

5053
// generatePlan creates a plan for the controller to execute
@@ -86,6 +89,39 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
8689
rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual
8790
}
8891

92+
// Resolve gate input if gate is configured
93+
var gateInput []byte
94+
var isGateInputSecret bool
95+
if rolloutStrategy.Gate != nil {
96+
// Fetch ConfigMap or Secret data if needed
97+
var configMapData map[string]string
98+
var configMapBinaryData map[string][]byte
99+
var secretData map[string][]byte
100+
101+
if rolloutStrategy.Gate.InputFrom != nil {
102+
if cmRef := rolloutStrategy.Gate.InputFrom.ConfigMapKeyRef; cmRef != nil {
103+
cm := &corev1.ConfigMap{}
104+
if err := r.Client.Get(ctx, types.NamespacedName{Namespace: w.Namespace, Name: cmRef.Name}, cm); err != nil {
105+
return nil, fmt.Errorf("failed to get ConfigMap %s/%s: %w", w.Namespace, cmRef.Name, err)
106+
}
107+
configMapData = cm.Data
108+
configMapBinaryData = cm.BinaryData
109+
}
110+
if secRef := rolloutStrategy.Gate.InputFrom.SecretKeyRef; secRef != nil {
111+
sec := &corev1.Secret{}
112+
if err := r.Client.Get(ctx, types.NamespacedName{Namespace: w.Namespace, Name: secRef.Name}, sec); err != nil {
113+
return nil, fmt.Errorf("failed to get Secret %s/%s: %w", w.Namespace, secRef.Name, err)
114+
}
115+
secretData = sec.Data
116+
}
117+
}
118+
119+
gateInput, isGateInputSecret, err = planner.ResolveGateInput(rolloutStrategy.Gate, w.Namespace, configMapData, configMapBinaryData, secretData)
120+
if err != nil {
121+
return nil, fmt.Errorf("unable to resolve gate input: %w", err)
122+
}
123+
}
124+
89125
// Generate the plan using the planner package
90126
plannerConfig := &planner.Config{
91127
RolloutStrategy: rolloutStrategy,
@@ -101,6 +137,8 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
101137
plannerConfig,
102138
workerDeploymentName,
103139
r.MaxDeploymentVersionsIneligibleForDeletion,
140+
gateInput,
141+
isGateInputSecret,
104142
)
105143
if err != nil {
106144
return nil, fmt.Errorf("error generating plan: %w", err)
@@ -119,10 +157,12 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
119157
// Convert test workflows
120158
for _, wf := range planResult.TestWorkflows {
121159
plan.startTestWorkflows = append(plan.startTestWorkflows, startWorkflowConfig{
122-
workflowType: wf.WorkflowType,
123-
workflowID: wf.WorkflowID,
124-
buildID: wf.BuildID,
125-
taskQueue: wf.TaskQueue,
160+
workflowType: wf.WorkflowType,
161+
workflowID: wf.WorkflowID,
162+
buildID: wf.BuildID,
163+
taskQueue: wf.TaskQueue,
164+
input: []byte(wf.GateInput),
165+
isInputSecret: wf.IsInputSecret,
126166
})
127167
}
128168

0 commit comments

Comments
 (0)