Skip to content

Commit 8a1899e

Browse files
committed
start work to create preliminary state
1 parent ad183cd commit 8a1899e

File tree

7 files changed

+199
-40
lines changed

7 files changed

+199
-40
lines changed

internal/controller/util.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"context"
99
"errors"
1010
"fmt"
11+
"github.com/temporalio/temporal-worker-controller/internal/k8s"
1112
"os"
1213
"strings"
1314
"time"
@@ -61,7 +62,7 @@ func awaitVersionRegistrationInDeployment(
6162
l logr.Logger,
6263
deploymentHandler sdkclient.WorkerDeploymentHandle,
6364
namespace, versionID string) error {
64-
deploymentName, _, _ := strings.Cut(versionID, ".")
65+
deploymentName, _, _ := strings.Cut(versionID, k8s.VersionIDSeparator)
6566
ticker := time.NewTicker(1 * time.Second)
6667
for {
6768
l.Info(fmt.Sprintf("checking if version %s exists in worker deployment", versionID))

internal/demo/util/worker.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package util
66

77
import (
8+
"github.com/temporalio/temporal-worker-controller/internal/k8s"
89
"net/http"
910
"time"
1011

@@ -28,7 +29,7 @@ func NewVersionedWorker(opts worker.Options) (w worker.Worker, stopFunc func())
2829

2930
opts.DeploymentOptions = worker.DeploymentOptions{
3031
UseVersioning: true,
31-
Version: mustGetEnv("TEMPORAL_DEPLOYMENT_NAME") + "." + mustGetEnv("WORKER_BUILD_ID"),
32+
Version: mustGetEnv("TEMPORAL_DEPLOYMENT_NAME") + k8s.VersionIDSeparator + mustGetEnv("WORKER_BUILD_ID"),
3233
DefaultVersioningBehavior: workflow.VersioningBehaviorPinned,
3334
}
3435

internal/k8s/deployments.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ const (
2525
// BuildIDLabel is the label that identifies the build ID for a deployment
2626
BuildIDLabel = "temporal.io/build-id"
2727
deploymentNameSeparator = "/"
28-
versionIDSeparator = "."
28+
VersionIDSeparator = "." // TODO(carlydf): change this to ":"
2929
K8sResourceNameSeparator = "-"
3030
MaxBuildIdLen = 63
3131
)
@@ -75,7 +75,7 @@ func GetDeploymentState(
7575
for i := range childDeploys.Items {
7676
deploy := &childDeploys.Items[i]
7777
if buildID, ok := deploy.GetLabels()[BuildIDLabel]; ok {
78-
versionID := workerDeploymentName + "." + buildID
78+
versionID := workerDeploymentName + VersionIDSeparator + buildID
7979
state.Deployments[versionID] = deploy
8080
state.DeploymentsByTime = append(state.DeploymentsByTime, deploy)
8181
state.DeploymentRefs[versionID] = NewObjectRef(deploy)
@@ -110,7 +110,7 @@ func NewObjectRef(obj client.Object) *corev1.ObjectReference {
110110

111111
// ComputeVersionID generates a version ID from the worker deployment spec
112112
func ComputeVersionID(w *temporaliov1alpha1.TemporalWorkerDeployment) string {
113-
return ComputeWorkerDeploymentName(w) + versionIDSeparator + ComputeBuildID(w)
113+
return ComputeWorkerDeploymentName(w) + VersionIDSeparator + ComputeBuildID(w)
114114
}
115115

116116
func ComputeBuildID(w *temporaliov1alpha1.TemporalWorkerDeployment) string {
@@ -161,12 +161,12 @@ func CleanAndTruncateString(s string, n int) string {
161161
}
162162
// Keep only letters, numbers, and dashes
163163
re := regexp.MustCompile(`[^a-zA-Z0-9-]+`)
164-
return re.ReplaceAllString(s, "-")
164+
return re.ReplaceAllString(s, K8sResourceNameSeparator)
165165
}
166166

167167
// SplitVersionID splits a version ID into its components
168168
func SplitVersionID(versionID string) (deploymentName, buildID string, err error) {
169-
parts := strings.Split(versionID, ".")
169+
parts := strings.Split(versionID, VersionIDSeparator)
170170
if len(parts) < 2 {
171171
return "", "", fmt.Errorf("invalid version ID format: %s", versionID)
172172
}

internal/testhelpers/make.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ import (
66
corev1 "k8s.io/api/core/v1"
77
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
88
"k8s.io/apimachinery/pkg/types"
9+
"time"
10+
)
11+
12+
const (
13+
testTaskQueue = "hello_world"
914
)
1015

1116
func MakeTWD(
@@ -70,7 +75,7 @@ func MakePodSpec(containers []corev1.Container, labels map[string]string, taskQu
7075
func MakeHelloWorldPodSpec(imageName string) corev1.PodTemplateSpec {
7176
return MakePodSpec([]corev1.Container{{Name: "worker", Image: imageName}},
7277
map[string]string{"app": "test-worker"},
73-
"hello_world")
78+
testTaskQueue)
7479
}
7580

7681
func MakeTWDWithImage(imageName string) *temporaliov1alpha1.TemporalWorkerDeployment {
@@ -121,6 +126,37 @@ func MakeTWDWithName(name string) *temporaliov1alpha1.TemporalWorkerDeployment {
121126
return twd
122127
}
123128

129+
func MakeCurrentVersion(namespace, twdName, imageName string, healthy, createDeployment bool) *temporaliov1alpha1.CurrentWorkerDeploymentVersion {
130+
ret := &temporaliov1alpha1.CurrentWorkerDeploymentVersion{
131+
BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{
132+
VersionID: MakeVersionId(namespace, twdName, imageName),
133+
Status: temporaliov1alpha1.VersionStatusCurrent,
134+
HealthySince: nil,
135+
Deployment: nil,
136+
TaskQueues: []temporaliov1alpha1.TaskQueue{
137+
{Name: testTaskQueue},
138+
},
139+
ManagedBy: "",
140+
},
141+
}
142+
143+
if healthy {
144+
h := metav1.NewTime(time.Now())
145+
ret.HealthySince = &h
146+
}
147+
148+
if createDeployment {
149+
ret.Deployment = &corev1.ObjectReference{
150+
Namespace: namespace,
151+
Name: k8s.ComputeVersionedDeploymentName(
152+
twdName,
153+
MakeBuildId(twdName, imageName, nil),
154+
),
155+
}
156+
}
157+
return ret
158+
}
159+
124160
func ModifyObj[T any](obj T, callback func(obj T) T) T {
125161
return callback(obj)
126162
}

internal/tests/internal/env_helpers.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ package internal
55

66
import (
77
"context"
8+
"fmt"
89
"log/slog"
910
"os"
1011
"os/exec"
1112
"path/filepath"
1213
"runtime"
14+
"strings"
1315
"sync"
1416
"testing"
1517
"time"
@@ -38,20 +40,28 @@ func setupKubebuilderAssets() error {
3840
return nil // Already set
3941
}
4042

41-
// Try to find the assets using setup-envtest
42-
cmd := exec.Command("setup-envtest", "use", "1.28.0", "--bin-dir", "bin")
43+
// Get the repository root to find the setup-envtest binary
44+
_, currentFile, _, ok := runtime.Caller(0)
45+
if !ok {
46+
return fmt.Errorf("failed to get current file path")
47+
}
48+
repoRoot, err := filepath.Abs(filepath.Join(filepath.Dir(currentFile), "../../.."))
49+
if err != nil {
50+
return fmt.Errorf("failed to get repository root: %v", err)
51+
}
52+
53+
// Use the correct version and path that matches the Makefile
54+
setupEnvtestPath := filepath.Join(repoRoot, "bin", "setup-envtest")
55+
binDir := filepath.Join(repoRoot, "bin")
56+
cmd := exec.Command(setupEnvtestPath, "use", "1.27.1", "--bin-dir", binDir, "-p", "path")
4357
output, err := cmd.Output()
4458
if err != nil {
45-
return err
59+
return fmt.Errorf("failed to run setup-envtest: %v", err)
4660
}
4761

48-
// Parse the output to get the path
49-
// The output format is typically: "export KUBEBUILDER_ASSETS=/path/to/assets"
50-
// We need to extract just the path
51-
assetsPath := string(output)
62+
// The output with -p path flag is just the path, no need to parse
63+
assetsPath := strings.TrimSpace(string(output))
5264
if len(assetsPath) > 0 {
53-
// Remove any trailing newlines
54-
assetsPath = assetsPath[:len(assetsPath)-1]
5565
os.Setenv("KUBEBUILDER_ASSETS", assetsPath)
5666
}
5767

internal/tests/internal/integration_test.go

Lines changed: 131 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type testCase struct {
3333
// versions, so for test scenarios that start with existing deprecated version Deployments,
3434
// specify the number of replicas for each deprecated build here.
3535
deprecatedBuildReplicas map[string]int32
36+
deprecatedBuildImages map[string]string
3637
expectedStatus *temporaliov1alpha1.TemporalWorkerDeploymentStatus
3738
}
3839

@@ -87,19 +88,8 @@ func TestIntegration(t *testing.T) {
8788
}),
8889
deprecatedBuildReplicas: nil,
8990
expectedStatus: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{
90-
TargetVersion: nil,
91-
CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{
92-
BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{
93-
VersionID: testhelpers.MakeVersionId(testNamespace.Name, "all-at-once-rollout-2-replicas", "v1"),
94-
Deployment: &corev1.ObjectReference{
95-
Namespace: testNamespace.Name,
96-
Name: k8s.ComputeVersionedDeploymentName(
97-
"all-at-once-rollout-2-replicas",
98-
testhelpers.MakeBuildId("all-at-once-rollout-2-replicas", "v1", nil),
99-
),
100-
},
101-
},
102-
},
91+
TargetVersion: nil,
92+
CurrentVersion: testhelpers.MakeCurrentVersion(testNamespace.Name, "all-at-once-rollout-2-replicas", "v1", true, false),
10393
RampingVersion: nil,
10494
DeprecatedVersions: nil,
10595
VersionConflictToken: nil,
@@ -122,9 +112,18 @@ func TestIntegration(t *testing.T) {
122112
Namespace: testNamespace.Name,
123113
Labels: map[string]string{"app": "test-worker"},
124114
}
115+
obj.Status = temporaliov1alpha1.TemporalWorkerDeploymentStatus{
116+
TargetVersion: nil,
117+
CurrentVersion: testhelpers.MakeCurrentVersion(testNamespace.Name, "progressive-rollout-expect-first-step", "v0", true, true),
118+
RampingVersion: nil,
119+
DeprecatedVersions: nil,
120+
VersionConflictToken: nil,
121+
LastModifierIdentity: "",
122+
}
125123
return obj
126124
}),
127-
deprecatedBuildReplicas: nil,
125+
deprecatedBuildReplicas: map[string]int32{testhelpers.MakeBuildId("progressive-rollout-expect-first-step", "v0", nil): 1},
126+
deprecatedBuildImages: map[string]string{testhelpers.MakeBuildId("progressive-rollout-expect-first-step", "v0", nil): "v0"},
128127
expectedStatus: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{
129128
TargetVersion: &temporaliov1alpha1.TargetWorkerDeploymentVersion{
130129
BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{
@@ -168,20 +167,129 @@ func TestIntegration(t *testing.T) {
168167

169168
for testName, tc := range tests {
170169
t.Run(testName, func(t *testing.T) {
171-
// TODO(carlydf): create starting test env
172-
// - Use input.Status + deprecatedBuildReplicas to create (and maybe kill) pollers for deprecated versions in temporal
173-
// - also get routing config of the deployment into the starting state before running the test
174-
175-
testTemporalWorkerDeploymentCreation(t, k8sClient, ts, tc.input, tc.expectedStatus)
170+
ctx := context.Background()
171+
// TODO(carlydf): populate all fields in tc that are set to testName, so that the user does not need to specify
172+
testTemporalWorkerDeploymentCreation(ctx, t, k8sClient, ts, tc)
176173
})
177174

178175
}
179176

180177
}
181178

179+
// Uses input.Status + deprecatedBuildReplicas to create (and maybe kill) pollers for deprecated versions in temporal
180+
// also gets routing config of the deployment into the starting state before running the test.
181+
// Does not set Status.VersionConflictToken, since that is only set internally by the server.
182+
func makePreliminaryStatusTrue(
183+
ctx context.Context,
184+
t *testing.T,
185+
k8sClient client.Client,
186+
ts *temporaltest.TestServer,
187+
twd *temporaliov1alpha1.TemporalWorkerDeployment,
188+
connection *temporaliov1alpha1.TemporalConnection,
189+
replicas map[string]int32,
190+
images map[string]string,
191+
) {
192+
t.Logf("Creating starting test env based on input.Status")
193+
for _, dv := range twd.Status.DeprecatedVersions {
194+
t.Logf("Handling deprecated version %v", dv.VersionID)
195+
switch dv.Status {
196+
case temporaliov1alpha1.VersionStatusInactive:
197+
// start a poller -- is this included in deprecated versions list?
198+
case temporaliov1alpha1.VersionStatusRamping, temporaliov1alpha1.VersionStatusCurrent:
199+
// these won't be in deprecated versions
200+
case temporaliov1alpha1.VersionStatusDraining:
201+
// TODO(carlydf): start a poller, set ramp, start a wf on that version, then unset
202+
case temporaliov1alpha1.VersionStatusDrained:
203+
// TODO(carlydf): start a poller, set ramp, unset, wait for drainage status visibility grace period
204+
case temporaliov1alpha1.VersionStatusNotRegistered:
205+
// no-op, although I think this won't occur in deprecated versions either
206+
}
207+
}
208+
// TODO(carlydf): handle Status.LastModifierIdentity
209+
if cv := twd.Status.CurrentVersion; cv != nil {
210+
t.Logf("Handling current version %v", cv.VersionID)
211+
if cv.Status != temporaliov1alpha1.VersionStatusCurrent {
212+
t.Errorf("Current Version's status must be Current")
213+
}
214+
if cv.Deployment != nil {
215+
t.Logf("Creating Deployment %s for Current Version", cv.Deployment.Name)
216+
createWorkerDeployment(ctx, t, k8sClient, twd, cv.VersionID, connection.Spec, replicas, images)
217+
expectedDeploymentName := k8s.ComputeVersionedDeploymentName(twd.Name, k8s.ComputeBuildID(twd))
218+
waitForDeployment(t, k8sClient, expectedDeploymentName, twd.Namespace, 30*time.Second)
219+
workerStopFuncs := applyDeployment(t, ctx, k8sClient, expectedDeploymentName, twd.Namespace)
220+
defer func() {
221+
for _, f := range workerStopFuncs {
222+
if f != nil {
223+
f()
224+
}
225+
}
226+
}()
227+
}
228+
}
229+
230+
if rv := twd.Status.RampingVersion; rv != nil {
231+
t.Logf("Handling ramping version %v", rv.VersionID)
232+
if rv.Status != temporaliov1alpha1.VersionStatusRamping {
233+
t.Errorf("Ramping Version's status must be Ramping")
234+
}
235+
if rv.Deployment != nil {
236+
t.Logf("Creating Deployment %s for Ramping Version", rv.Deployment.Name)
237+
}
238+
// TODO(carlydf): do this
239+
}
240+
}
241+
242+
func createWorkerDeployment(
243+
ctx context.Context,
244+
t *testing.T,
245+
k8sClient client.Client,
246+
twd *temporaliov1alpha1.TemporalWorkerDeployment,
247+
versionID string,
248+
connection temporaliov1alpha1.TemporalConnectionSpec,
249+
replicas map[string]int32,
250+
images map[string]string,
251+
) {
252+
t.Log("Creating a Deployment")
253+
_, buildId, err := k8s.SplitVersionID(versionID)
254+
if err != nil {
255+
t.Error(err)
256+
}
257+
258+
prevImageName := twd.Spec.Template.Spec.Containers[0].Image
259+
prevReplicas := twd.Spec.Replicas
260+
// temporarily replace it
261+
twd.Spec.Template.Spec.Containers[0].Image = images[buildId]
262+
newReplicas := replicas[buildId]
263+
twd.Spec.Replicas = &newReplicas
264+
defer func() {
265+
twd.Spec.Template.Spec.Containers[0].Image = prevImageName
266+
twd.Spec.Replicas = prevReplicas
267+
}()
268+
269+
dep := k8s.NewDeploymentWithOwnerRef(
270+
&twd.TypeMeta,
271+
&twd.ObjectMeta,
272+
&twd.Spec,
273+
k8s.ComputeWorkerDeploymentName(twd),
274+
buildId,
275+
connection,
276+
)
277+
278+
if err := k8sClient.Create(ctx, dep); err != nil {
279+
t.Fatalf("failed to create Deployment: %v", err)
280+
}
281+
}
282+
182283
// testTemporalWorkerDeploymentCreation tests the creation of a TemporalWorkerDeployment and waits for the expected status
183-
func testTemporalWorkerDeploymentCreation(t *testing.T, k8sClient client.Client, ts *temporaltest.TestServer, twd *temporaliov1alpha1.TemporalWorkerDeployment, expectedStatus *temporaliov1alpha1.TemporalWorkerDeploymentStatus) {
184-
ctx := context.Background()
284+
func testTemporalWorkerDeploymentCreation(
285+
ctx context.Context,
286+
t *testing.T,
287+
k8sClient client.Client,
288+
ts *temporaltest.TestServer,
289+
tc testCase,
290+
) {
291+
twd := tc.input
292+
expectedStatus := tc.expectedStatus
185293

186294
t.Log("Creating a TemporalConnection")
187295
temporalConnection := &temporaliov1alpha1.TemporalConnection{
@@ -197,6 +305,8 @@ func testTemporalWorkerDeploymentCreation(t *testing.T, k8sClient client.Client,
197305
t.Fatalf("failed to create TemporalConnection: %v", err)
198306
}
199307

308+
makePreliminaryStatusTrue(ctx, t, k8sClient, ts, twd, temporalConnection, tc.deprecatedBuildReplicas, tc.deprecatedBuildImages)
309+
200310
t.Log("Creating a TemporalWorkerDeployment")
201311
if err := k8sClient.Create(ctx, twd); err != nil {
202312
t.Fatalf("failed to create TemporalWorkerDeployment: %v", err)

0 commit comments

Comments
 (0)