Skip to content

Commit 6c38ecf

Browse files
authored
Add worker versioning sample (#426)
1 parent 7cfe0d8 commit 6c38ecf

File tree

8 files changed

+534
-0
lines changed

8 files changed

+534
-0
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,9 @@ resource waiting for its successful completion
252252
- [**Early-Return**](./early-return):
253253
Demonstrates how to receive a response mid-workflow, while the workflow continues to run to completion.
254254

255+
- [**Worker Versioning**](./worker-versioning):
256+
Demonstrates how to use worker versioning to manage workflow code changes.
257+
255258
### Pending examples
256259

257260
Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/

worker-versioning/README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
## Worker Versioning
2+
3+
This sample demonstrates how to use Temporal's Worker Versioning feature to safely deploy updates to workflow and activity code. It shows the difference between auto-upgrading and pinned workflows, and how to manage worker deployments with different build IDs.
4+
5+
The sample creates multiple worker versions (1.0, 1.1, and 2.0) within one deployment and demonstrates:
6+
- **Auto-upgrading workflows**: Automatically and controllably migrate to newer worker versions
7+
- **Pinned workflows**: Stay on the original worker version throughout their lifecycle
8+
- **Compatible vs incompatible changes**: How to make safe updates using `workflow.GetVersion`
9+
10+
### Steps to run this sample:
11+
12+
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
13+
14+
2) Start the main application (this will guide you through the sample):
15+
```bash
16+
go run worker-versioning/app/main.go
17+
```
18+
19+
3) Follow the prompts to start workers in separate terminals:
20+
- When prompted, run: `go run worker-versioning/workerv1/main.go`
21+
- When prompted, run: `go run worker-versioning/workerv1.1/main.go`
22+
- When prompted, run: `go run worker-versioning/workerv2/main.go`
23+
24+
The sample will show how auto-upgrading workflows migrate to newer workers while pinned workflows remain on their original version.

worker-versioning/app/main.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
8+
"github.com/pborman/uuid"
9+
10+
worker_versioning "github.com/temporalio/samples-go/worker-versioning"
11+
12+
"go.temporal.io/sdk/client"
13+
"go.temporal.io/sdk/worker"
14+
)
15+
16+
func main() {
17+
ctx := context.Background()
18+
19+
// The client is a heavyweight object that should be created once per process.
20+
c, err := client.Dial(client.Options{
21+
HostPort: client.DefaultHostPort,
22+
})
23+
if err != nil {
24+
log.Fatalln("Unable to create client", err)
25+
}
26+
defer c.Close()
27+
28+
// First, we wait for the v1 worker to appear, and we will set it as the current version for
29+
// the deployment.
30+
log.Println("Waiting for v1 worker to appear. Run `go run worker-versioning/workerv1/main.go` in another terminal")
31+
err = waitForWorkerAndMakeCurrent(c, "1.0")
32+
if err != nil {
33+
log.Fatalln("Unable to set current deployment version", err)
34+
}
35+
36+
// Next we'll start two workflows, one which uses the `AutoUpgrade` behavior, and one which uses
37+
// `Pinned`. Importantly, note that when we start the workflows, we are using a workflow type
38+
// name which does *not* include the version number. We defined them with versioned names so
39+
// we could show changes to the code, but here when the client invokes them, we're demonstrating
40+
// that the client remains version-agnostic.
41+
autoUpgradeWorkflowId := "worker-versioning-versioning-autoupgrade_" + uuid.New()
42+
autoUpgradeWorkflowOptions := client.StartWorkflowOptions{
43+
ID: autoUpgradeWorkflowId,
44+
TaskQueue: worker_versioning.TaskQueue,
45+
}
46+
autoUpgradeExecution, err := c.ExecuteWorkflow(ctx, autoUpgradeWorkflowOptions, "AutoUpgradingWorkflow")
47+
if err != nil {
48+
log.Fatalln("Unable to start workflow", err)
49+
}
50+
log.Println("Started auto-upgrading workflow",
51+
"WorkflowID", autoUpgradeExecution.GetID(), "RunID", autoUpgradeExecution.GetRunID())
52+
53+
pinnedWorkflowId := "worker-versioning-versioning-pinned_" + uuid.New()
54+
pinnedWorkflowOptions := client.StartWorkflowOptions{
55+
ID: pinnedWorkflowId,
56+
TaskQueue: worker_versioning.TaskQueue,
57+
}
58+
pinnedExecution, err := c.ExecuteWorkflow(ctx, pinnedWorkflowOptions, "PinnedWorkflow")
59+
if err != nil {
60+
log.Fatalln("Unable to start workflow", err)
61+
}
62+
log.Println("Started pinned workflow",
63+
"WorkflowID", pinnedExecution.GetID(), "RunID", pinnedExecution.GetRunID())
64+
65+
// Signal both workflows a few times to drive them
66+
err = advanceWorkflows(ctx, c, autoUpgradeExecution, pinnedExecution)
67+
if err != nil {
68+
log.Fatalln("Unable to signal workflow", err)
69+
}
70+
71+
// Now wait for the v1.1 worker to appear and become current
72+
log.Println("Waiting for v1.1 worker to appear. Run `go run worker-versioning/workerv1.1/main.go` in another terminal")
73+
err = waitForWorkerAndMakeCurrent(c, "1.1")
74+
if err != nil {
75+
log.Fatalln("Unable to set current deployment version", err)
76+
}
77+
78+
// Once it has, we will continue to advance the workflows.
79+
// The auto-upgrade workflow will now make progress on the new worker, while the pinned one will
80+
// keep progressing on the old worker.
81+
err = advanceWorkflows(ctx, c, autoUpgradeExecution, pinnedExecution)
82+
if err != nil {
83+
log.Fatalln("Unable to signal workflow", err)
84+
}
85+
86+
// Finally we'll start the v2 worker, and again it'll become the new current version
87+
log.Println("Waiting for v2 worker to appear. Run `go run worker-versioning/workerv2/main.go` in another terminal")
88+
err = waitForWorkerAndMakeCurrent(c, "2.0")
89+
if err != nil {
90+
log.Fatalln("Unable to set current deployment version", err)
91+
}
92+
93+
// Once it has we'll start one more new workflow, another pinned one, to demonstrate that new
94+
// pinned worklfows start on the current version.
95+
pinnedWorkflow2Id := "worker-versioning-versioning-pinned-2_" + uuid.New()
96+
pinnedWorkflow2Options := client.StartWorkflowOptions{
97+
ID: pinnedWorkflow2Id,
98+
TaskQueue: worker_versioning.TaskQueue,
99+
}
100+
pinnedExecution2, err := c.ExecuteWorkflow(ctx, pinnedWorkflow2Options, "PinnedWorkflow")
101+
if err != nil {
102+
log.Fatalln("Unable to start workflow", err)
103+
}
104+
log.Println("Started pinned workflow v2",
105+
"WorkflowID", pinnedExecution2.GetID(), "RunID", pinnedExecution2.GetRunID())
106+
107+
// Now we'll conclude all workflows. You should be able to see in your server UI that the pinned
108+
// workflow always stayed on 1.0, while the auto-upgrading workflow migrated.
109+
for _, handle := range []client.WorkflowRun{autoUpgradeExecution, pinnedExecution, pinnedExecution2} {
110+
err = c.SignalWorkflow(ctx, handle.GetID(), handle.GetRunID(),
111+
"do-next-signal", "conclude")
112+
if err != nil {
113+
log.Fatalln("Unable to signal workflow", err)
114+
return
115+
}
116+
err = handle.Get(ctx, nil)
117+
if err != nil {
118+
log.Fatalln("Unable to get workflow result", err)
119+
}
120+
}
121+
}
122+
123+
func advanceWorkflows(ctx context.Context, c client.Client, autoUpgradeExecution client.WorkflowRun, pinnedExecution client.WorkflowRun) error {
124+
var err error
125+
for i := 0; i < 3; i++ {
126+
err = c.SignalWorkflow(ctx, autoUpgradeExecution.GetID(), autoUpgradeExecution.GetRunID(),
127+
"do-next-signal", "do-activity")
128+
if err != nil {
129+
return err
130+
}
131+
err = c.SignalWorkflow(ctx, pinnedExecution.GetID(), pinnedExecution.GetRunID(),
132+
"do-next-signal", "some-signal")
133+
if err != nil {
134+
return err
135+
}
136+
}
137+
return nil
138+
}
139+
140+
func waitForWorkerAndMakeCurrent(c client.Client, buildID string) error {
141+
ctx := context.Background()
142+
deploymentHandle := c.WorkerDeploymentClient().GetHandle(worker_versioning.DeploymentName)
143+
version := worker.WorkerDeploymentVersion{
144+
DeploymentName: worker_versioning.DeploymentName,
145+
BuildID: buildID,
146+
}
147+
148+
Outer:
149+
for {
150+
d, err := deploymentHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
151+
if err == nil {
152+
for _, v := range d.Info.VersionSummaries {
153+
if v.Version == version {
154+
break Outer
155+
}
156+
}
157+
}
158+
time.Sleep(time.Second)
159+
}
160+
161+
// Once it has, we will mark this version as the "current" version for the deployment.
162+
_, err := c.WorkerDeploymentClient().GetHandle(worker_versioning.DeploymentName).SetCurrentVersion(ctx,
163+
client.WorkerDeploymentSetCurrentVersionOptions{
164+
BuildID: buildID,
165+
})
166+
return err
167+
}

worker-versioning/constants.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package worker_versioning
2+
3+
const (
4+
TaskQueue = "worker-versioning-sample"
5+
DeploymentName = "my-worker-deployment"
6+
)
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
worker_versioning "github.com/temporalio/samples-go/worker-versioning"
7+
8+
"go.temporal.io/sdk/client"
9+
"go.temporal.io/sdk/worker"
10+
"go.temporal.io/sdk/workflow"
11+
)
12+
13+
func main() {
14+
c, err := client.Dial(client.Options{
15+
HostPort: client.DefaultHostPort,
16+
})
17+
if err != nil {
18+
log.Fatalln("Unable to create client", err)
19+
}
20+
defer c.Close()
21+
22+
w := worker.New(c, worker_versioning.TaskQueue, worker.Options{
23+
DeploymentOptions: worker.DeploymentOptions{
24+
UseVersioning: true,
25+
Version: worker.WorkerDeploymentVersion{
26+
DeploymentName: worker_versioning.DeploymentName,
27+
BuildID: "1.1",
28+
},
29+
},
30+
})
31+
// It's important that we register all the different implementations of the workflow using
32+
// the same name. This allows us to demonstrate what would happen if you were making changes
33+
// to this workflow code over time while keeping the same workflow name/type.
34+
//
35+
// Note that in this 2.0 worker, we use version 2 of the pinned workflow.
36+
w.RegisterWorkflowWithOptions(worker_versioning.AutoUpgradingWorkflowV1b, workflow.RegisterOptions{
37+
Name: "AutoUpgradingWorkflow",
38+
VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
39+
})
40+
w.RegisterWorkflowWithOptions(worker_versioning.PinnedWorkflowV2, workflow.RegisterOptions{
41+
Name: "PinnedWorkflow",
42+
VersioningBehavior: workflow.VersioningBehaviorPinned,
43+
})
44+
w.RegisterActivity(worker_versioning.SomeActivity)
45+
w.RegisterActivity(worker_versioning.SomeIncompatibleActivity)
46+
47+
err = w.Run(worker.InterruptCh())
48+
if err != nil {
49+
log.Fatalf("Unable to start worker: %v", err)
50+
}
51+
}

worker-versioning/workerv1/main.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
worker_versioning "github.com/temporalio/samples-go/worker-versioning"
7+
8+
"go.temporal.io/sdk/client"
9+
"go.temporal.io/sdk/worker"
10+
"go.temporal.io/sdk/workflow"
11+
)
12+
13+
func main() {
14+
c, err := client.Dial(client.Options{
15+
HostPort: client.DefaultHostPort,
16+
})
17+
if err != nil {
18+
log.Fatalln("Unable to create client", err)
19+
}
20+
defer c.Close()
21+
22+
w := worker.New(c, worker_versioning.TaskQueue, worker.Options{
23+
DeploymentOptions: worker.DeploymentOptions{
24+
UseVersioning: true,
25+
Version: worker.WorkerDeploymentVersion{
26+
DeploymentName: worker_versioning.DeploymentName,
27+
BuildID: "1.0",
28+
},
29+
},
30+
})
31+
// It's important that we register all the different implementations of the workflow using
32+
// the same name. This allows us to demonstrate what would happen if you were making changes
33+
// to this workflow code over time while keeping the same workflow name/type.
34+
w.RegisterWorkflowWithOptions(worker_versioning.AutoUpgradingWorkflowV1, workflow.RegisterOptions{
35+
Name: "AutoUpgradingWorkflow",
36+
VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
37+
})
38+
w.RegisterWorkflowWithOptions(worker_versioning.PinnedWorkflowV1, workflow.RegisterOptions{
39+
Name: "PinnedWorkflow",
40+
VersioningBehavior: workflow.VersioningBehaviorPinned,
41+
})
42+
w.RegisterActivity(worker_versioning.SomeActivity)
43+
w.RegisterActivity(worker_versioning.SomeIncompatibleActivity)
44+
45+
err = w.Run(worker.InterruptCh())
46+
if err != nil {
47+
log.Fatalf("Unable to start worker: %v", err)
48+
}
49+
}

worker-versioning/workerv2/main.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
worker_versioning "github.com/temporalio/samples-go/worker-versioning"
7+
8+
"go.temporal.io/sdk/client"
9+
"go.temporal.io/sdk/worker"
10+
"go.temporal.io/sdk/workflow"
11+
)
12+
13+
func main() {
14+
c, err := client.Dial(client.Options{
15+
HostPort: client.DefaultHostPort,
16+
})
17+
if err != nil {
18+
log.Fatalln("Unable to create client", err)
19+
}
20+
defer c.Close()
21+
22+
w := worker.New(c, worker_versioning.TaskQueue, worker.Options{
23+
DeploymentOptions: worker.DeploymentOptions{
24+
UseVersioning: true,
25+
Version: worker.WorkerDeploymentVersion{
26+
DeploymentName: worker_versioning.DeploymentName,
27+
BuildID: "2.0",
28+
},
29+
},
30+
})
31+
// It's important that we register all the different implementations of the workflow using
32+
// the same name. This allows us to demonstrate what would happen if you were making changes
33+
// to this workflow code over time while keeping the same workflow name/type.
34+
//
35+
// Note that in this 1.1 worker, we use version 1b instead of 1 of the auto-upgrading workflow.
36+
w.RegisterWorkflowWithOptions(worker_versioning.AutoUpgradingWorkflowV1b, workflow.RegisterOptions{
37+
Name: "AutoUpgradingWorkflow",
38+
VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
39+
})
40+
w.RegisterWorkflowWithOptions(worker_versioning.PinnedWorkflowV2, workflow.RegisterOptions{
41+
Name: "PinnedWorkflow",
42+
VersioningBehavior: workflow.VersioningBehaviorPinned,
43+
})
44+
w.RegisterActivity(worker_versioning.SomeActivity)
45+
w.RegisterActivity(worker_versioning.SomeIncompatibleActivity)
46+
47+
err = w.Run(worker.InterruptCh())
48+
if err != nil {
49+
log.Fatalf("Unable to start worker: %v", err)
50+
}
51+
}

0 commit comments

Comments
 (0)