Skip to content

Commit 11bad76

Browse files
committed
Add activities and operations sample folders with self-contained workflows
Signed-off-by: Diana Zawadzki <[email protected]>
1 parent 89adef8 commit 11bad76

File tree

8 files changed

+602
-0
lines changed

8 files changed

+602
-0
lines changed

new_samples/activities/README.md

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# Activity Samples
2+
3+
This folder contains samples demonstrating different activity patterns in Cadence.
4+
5+
## Samples Included
6+
7+
### 1. Dynamic Workflow
8+
Demonstrates calling activities by string name instead of function reference.
9+
10+
### 2. Parallel Pick First Workflow
11+
Demonstrates running multiple activities in parallel and using the first result.
12+
13+
## Prerequisites
14+
15+
1. Run the Cadence server (see [main README](../../README.md))
16+
2. Register the `cadence-samples` domain:
17+
18+
```bash
19+
cadence --env development --domain cadence-samples domain register
20+
```
21+
22+
## Steps to Run
23+
24+
### Start the Worker
25+
26+
```bash
27+
go run .
28+
```
29+
30+
This worker handles both workflow types.
31+
32+
---
33+
34+
## Dynamic Workflow
35+
36+
Invoke activities by string name for plugin systems or configuration-driven workflows.
37+
38+
### Start the Workflow
39+
40+
```bash
41+
cadence --env development \
42+
--domain cadence-samples \
43+
workflow start \
44+
--workflow_type cadence_samples.DynamicWorkflow \
45+
--tl cadence-samples-worker \
46+
--et 60 \
47+
--input '{"message":"Cadence"}'
48+
```
49+
50+
### Key Concept
51+
52+
```go
53+
// Instead of: workflow.ExecuteActivity(ctx, MyActivity, input)
54+
// Use: workflow.ExecuteActivity(ctx, "activity.name.string", input)
55+
```
56+
57+
---
58+
59+
## Parallel Pick First Workflow
60+
61+
Run multiple activities in parallel, use the first result, cancel the rest.
62+
63+
### Start the Workflow
64+
65+
```bash
66+
cadence --env development \
67+
--domain cadence-samples \
68+
workflow start \
69+
--workflow_type cadence_samples.ParallelBranchPickFirstWorkflow \
70+
--tl cadence-samples-worker \
71+
--et 60 \
72+
--input '{}'
73+
```
74+
75+
### Key Concept
76+
77+
```go
78+
selector := workflow.NewSelector(ctx)
79+
selector.AddFuture(f1, handler1)
80+
selector.AddFuture(f2, handler2)
81+
selector.Select(ctx) // Blocks until one completes
82+
cancelHandler() // Cancel the rest
83+
```
84+
85+
---
86+
87+
## View Your Workflows
88+
89+
Open [localhost:8088](http://localhost:8088) and click on `cadence-samples` domain.
90+
91+
## References
92+
93+
- [Cadence Documentation](https://cadenceworkflow.io)
94+
- [Activity Basics](https://cadenceworkflow.io/docs/concepts/activities/)
95+

new_samples/activities/main.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"os/signal"
7+
"syscall"
8+
)
9+
10+
func main() {
11+
StartWorker()
12+
13+
done := make(chan os.Signal, 1)
14+
signal.Notify(done, syscall.SIGINT)
15+
fmt.Println("Cadence worker started, press ctrl+c to terminate...")
16+
<-done
17+
}
18+

new_samples/activities/worker.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package main
2+
3+
import (
4+
"github.com/uber-go/tally"
5+
apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
6+
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
7+
"go.uber.org/cadence/activity"
8+
"go.uber.org/cadence/compatibility"
9+
"go.uber.org/cadence/worker"
10+
"go.uber.org/cadence/workflow"
11+
"go.uber.org/yarpc"
12+
"go.uber.org/yarpc/peer"
13+
yarpchostport "go.uber.org/yarpc/peer/hostport"
14+
"go.uber.org/yarpc/transport/grpc"
15+
"go.uber.org/zap"
16+
"go.uber.org/zap/zapcore"
17+
)
18+
19+
const (
20+
HostPort = "127.0.0.1:7833"
21+
Domain = "cadence-samples"
22+
TaskListName = "cadence-samples-worker"
23+
ClientName = "cadence-samples-worker"
24+
CadenceService = "cadence-frontend"
25+
)
26+
27+
func StartWorker() {
28+
logger, cadenceClient := BuildLogger(), BuildCadenceClient()
29+
workerOptions := worker.Options{
30+
Logger: logger,
31+
MetricsScope: tally.NewTestScope(TaskListName, nil),
32+
}
33+
34+
w := worker.New(
35+
cadenceClient,
36+
Domain,
37+
TaskListName,
38+
workerOptions)
39+
40+
// Dynamic workflow registration
41+
w.RegisterWorkflowWithOptions(DynamicWorkflow, workflow.RegisterOptions{Name: "cadence_samples.DynamicWorkflow"})
42+
w.RegisterActivityWithOptions(DynamicGreetingActivity, activity.RegisterOptions{Name: DynamicGreetingActivityName})
43+
44+
// Parallel pick first workflow registration
45+
w.RegisterWorkflowWithOptions(ParallelBranchPickFirstWorkflow, workflow.RegisterOptions{Name: "cadence_samples.ParallelBranchPickFirstWorkflow"})
46+
w.RegisterActivityWithOptions(ParallelActivity, activity.RegisterOptions{Name: "cadence_samples.ParallelActivity"})
47+
48+
err := w.Start()
49+
if err != nil {
50+
panic("Failed to start worker: " + err.Error())
51+
}
52+
logger.Info("Started Worker.", zap.String("worker", TaskListName))
53+
}
54+
55+
func BuildCadenceClient(dialOptions ...grpc.DialOption) workflowserviceclient.Interface {
56+
grpcTransport := grpc.NewTransport()
57+
myChooser := peer.NewSingle(
58+
yarpchostport.Identify(HostPort),
59+
grpcTransport.NewDialer(dialOptions...),
60+
)
61+
outbound := grpcTransport.NewOutbound(myChooser)
62+
63+
dispatcher := yarpc.NewDispatcher(yarpc.Config{
64+
Name: ClientName,
65+
Outbounds: yarpc.Outbounds{
66+
CadenceService: {Unary: outbound},
67+
},
68+
})
69+
if err := dispatcher.Start(); err != nil {
70+
panic("Failed to start dispatcher: " + err.Error())
71+
}
72+
73+
clientConfig := dispatcher.ClientConfig(CadenceService)
74+
75+
return compatibility.NewThrift2ProtoAdapter(
76+
apiv1.NewDomainAPIYARPCClient(clientConfig),
77+
apiv1.NewWorkflowAPIYARPCClient(clientConfig),
78+
apiv1.NewWorkerAPIYARPCClient(clientConfig),
79+
apiv1.NewVisibilityAPIYARPCClient(clientConfig),
80+
)
81+
}
82+
83+
func BuildLogger() *zap.Logger {
84+
config := zap.NewDevelopmentConfig()
85+
config.Level.SetLevel(zapcore.InfoLevel)
86+
87+
var err error
88+
logger, err := config.Build()
89+
if err != nil {
90+
panic("Failed to setup logger: " + err.Error())
91+
}
92+
93+
return logger
94+
}
95+

new_samples/activities/workflow.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"go.uber.org/cadence/activity"
8+
"go.uber.org/cadence/workflow"
9+
"go.uber.org/zap"
10+
)
11+
12+
// =============================================================================
13+
// Dynamic Workflow
14+
// =============================================================================
15+
16+
// DynamicGreetingActivityName is the registered name for the activity.
17+
// This demonstrates how to invoke activities by string name rather than function reference.
18+
const DynamicGreetingActivityName = "cadence_samples.DynamicGreetingActivity"
19+
20+
type dynamicWorkflowInput struct {
21+
Message string `json:"message"`
22+
}
23+
24+
// DynamicWorkflow demonstrates calling activities using string names for dynamic behavior.
25+
// Instead of passing the function directly to ExecuteActivity, we pass the activity name.
26+
// This is useful for plugin systems or configuration-driven workflows.
27+
func DynamicWorkflow(ctx workflow.Context, input dynamicWorkflowInput) (string, error) {
28+
ao := workflow.ActivityOptions{
29+
ScheduleToStartTimeout: time.Minute,
30+
StartToCloseTimeout: time.Minute,
31+
}
32+
ctx = workflow.WithActivityOptions(ctx, ao)
33+
34+
logger := workflow.GetLogger(ctx)
35+
logger.Info("DynamicWorkflow started")
36+
37+
var greetingMsg string
38+
// Note: We pass the activity NAME (string) instead of the function reference
39+
err := workflow.ExecuteActivity(ctx, DynamicGreetingActivityName, input.Message).Get(ctx, &greetingMsg)
40+
if err != nil {
41+
logger.Error("DynamicGreetingActivity failed", zap.Error(err))
42+
return "", err
43+
}
44+
45+
logger.Info("Workflow result", zap.String("greeting", greetingMsg))
46+
return greetingMsg, nil
47+
}
48+
49+
// DynamicGreetingActivity is a simple activity that returns a greeting message.
50+
func DynamicGreetingActivity(ctx context.Context, message string) (string, error) {
51+
logger := activity.GetLogger(ctx)
52+
logger.Info("DynamicGreetingActivity started.")
53+
return "Hello, " + message, nil
54+
}
55+
56+
// =============================================================================
57+
// Parallel Pick First Workflow
58+
// =============================================================================
59+
60+
type parallelBranchInput struct {
61+
Message string `json:"message"`
62+
}
63+
64+
// ParallelBranchPickFirstWorkflow demonstrates running multiple activities in parallel
65+
// and using the result from whichever completes first.
66+
func ParallelBranchPickFirstWorkflow(ctx workflow.Context) (string, error) {
67+
logger := workflow.GetLogger(ctx)
68+
logger.Info("ParallelBranchPickFirstWorkflow started")
69+
70+
selector := workflow.NewSelector(ctx)
71+
var firstResp string
72+
73+
// Use a cancel handler to cancel all remaining activities once one completes
74+
childCtx, cancelHandler := workflow.WithCancel(ctx)
75+
ao := workflow.ActivityOptions{
76+
ScheduleToStartTimeout: time.Minute,
77+
StartToCloseTimeout: time.Minute,
78+
HeartbeatTimeout: time.Second * 20,
79+
WaitForCancellation: true,
80+
}
81+
childCtx = workflow.WithActivityOptions(childCtx, ao)
82+
83+
// Run two activities in parallel with different delays
84+
f1 := workflow.ExecuteActivity(childCtx, ParallelActivity, parallelBranchInput{Message: "first activity"}, time.Second*10)
85+
f2 := workflow.ExecuteActivity(childCtx, ParallelActivity, parallelBranchInput{Message: "second activity"}, time.Second*2)
86+
pendingFutures := []workflow.Future{f1, f2}
87+
88+
selector.AddFuture(f1, func(f workflow.Future) {
89+
f.Get(ctx, &firstResp)
90+
}).AddFuture(f2, func(f workflow.Future) {
91+
f.Get(ctx, &firstResp)
92+
})
93+
94+
// Wait for any of the futures to complete
95+
selector.Select(ctx)
96+
97+
// Cancel all other pending activities
98+
cancelHandler()
99+
100+
// Wait for pending activities to finish cancellation
101+
for _, f := range pendingFutures {
102+
err := f.Get(ctx, &firstResp)
103+
if err != nil {
104+
return "", err
105+
}
106+
}
107+
108+
logger.Info("ParallelBranchPickFirstWorkflow completed")
109+
return firstResp, nil
110+
}
111+
112+
// ParallelActivity is a simple activity that returns a greeting.
113+
func ParallelActivity(ctx context.Context, input parallelBranchInput) (string, error) {
114+
logger := activity.GetLogger(ctx)
115+
logger.Info("ParallelActivity started")
116+
return "Hello " + input.Message, nil
117+
}
118+

0 commit comments

Comments
 (0)