Skip to content

Commit b4854e2

Browse files
committed
Add sample for different queues
1 parent fae4358 commit b4854e2

File tree

2 files changed

+153
-0
lines changed

2 files changed

+153
-0
lines changed

samples/queues/queues.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"log/slog"
7+
"time"
8+
9+
"github.com/cschleiden/go-workflows/backend"
10+
"github.com/cschleiden/go-workflows/client"
11+
"github.com/cschleiden/go-workflows/samples"
12+
"github.com/cschleiden/go-workflows/worker"
13+
"github.com/cschleiden/go-workflows/workflow"
14+
15+
"github.com/google/uuid"
16+
)
17+
18+
var CustomActivityQueue = workflow.Queue("custom-activity-queue")
19+
20+
func main() {
21+
ctx, cancel := context.WithCancel(context.Background())
22+
23+
b := samples.GetBackend("queues", backend.WithLogger(slog.Default()))
24+
25+
// Run worker
26+
w := RunDefaultWorker(ctx, b)
27+
28+
w.RegisterWorkflow(Workflow1)
29+
w.RegisterActivity(Activity1)
30+
31+
// This worker won't actually execute Activity2, but it still needs to be aware of its signature
32+
// since the workflow processed by this worker will schedule it.
33+
w.RegisterActivity(Activity2)
34+
35+
activityWorker := worker.NewActivityWorker(b, &worker.ActivityWorkerOptions{
36+
ActivityPollers: 1,
37+
MaxParallelActivityTasks: 1,
38+
ActivityQueues: []workflow.Queue{CustomActivityQueue},
39+
})
40+
41+
activityWorker.RegisterActivity(Activity2)
42+
43+
activityWorker.Start(ctx)
44+
45+
if err := w.Start(ctx); err != nil {
46+
panic("could not start worker")
47+
}
48+
49+
// Start workflow via client
50+
c := client.New(b)
51+
52+
runWorkflow(ctx, c)
53+
54+
cancel()
55+
56+
if err := w.WaitForCompletion(); err != nil {
57+
panic("could not stop worker" + err.Error())
58+
}
59+
60+
if err := activityWorker.WaitForCompletion(); err != nil {
61+
panic("could not stop activity worker" + err.Error())
62+
}
63+
}
64+
65+
func runWorkflow(ctx context.Context, c *client.Client) {
66+
wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
67+
InstanceID: uuid.NewString(),
68+
}, Workflow1, "Hello world"+uuid.NewString(), 42, Inputs{
69+
Msg: "",
70+
Times: 0,
71+
})
72+
if err != nil {
73+
log.Fatal(err)
74+
panic("could not start workflow")
75+
}
76+
77+
result, err := client.GetWorkflowResult[int](ctx, c, wf, time.Second*10)
78+
if err != nil {
79+
log.Fatal(err)
80+
}
81+
82+
log.Println("Workflow finished. Result:", result)
83+
}
84+
85+
func RunDefaultWorker(ctx context.Context, mb backend.Backend) *worker.Worker {
86+
w := worker.New(mb, nil)
87+
88+
w.RegisterWorkflow(Workflow1)
89+
90+
w.RegisterActivity(Activity1)
91+
92+
if err := w.Start(ctx); err != nil {
93+
panic("could not start worker")
94+
}
95+
96+
return w
97+
}

samples/queues/workflow.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package main
2+
3+
import (
4+
"context"
5+
6+
"github.com/cschleiden/go-workflows/activity"
7+
"github.com/cschleiden/go-workflows/workflow"
8+
)
9+
10+
type Inputs struct {
11+
Msg string
12+
Times int
13+
}
14+
15+
func Workflow1(ctx workflow.Context, msg string, times int, inputs Inputs) (int, error) {
16+
logger := workflow.Logger(ctx)
17+
logger.Info("Entering Workflow1", "msg", msg, "times", times, "inputs", inputs)
18+
defer logger.Info("Leaving Workflow1")
19+
20+
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
21+
if err != nil {
22+
panic("error getting activity 1 result")
23+
}
24+
logger.Info("R1 result", "r1", r1)
25+
26+
// Queue activity to separate queue
27+
r2, err := workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
28+
Queue: CustomActivityQueue,
29+
}, Activity2).Get(ctx)
30+
if err != nil {
31+
panic("error getting activity 2 result")
32+
}
33+
logger.Info("R2 result", "r2", r2)
34+
35+
return r1 + r2, nil
36+
}
37+
38+
func Activity1(ctx context.Context, a, b int) (int, error) {
39+
logger := activity.Logger(ctx)
40+
logger.Info("Entering Activity1")
41+
defer logger.Info("Leaving Activity1")
42+
43+
// time.Sleep(5 * time.Second)
44+
45+
return a + b, nil
46+
}
47+
48+
func Activity2(ctx context.Context) (int, error) {
49+
logger := activity.Logger(ctx)
50+
logger.Info("Entering Activity2")
51+
defer logger.Info("Leaving Activity2")
52+
53+
// time.Sleep(1 * time.Second)
54+
55+
return 12, nil
56+
}

0 commit comments

Comments
 (0)