Skip to content

Commit 71892aa

Browse files
committed
Wait until workflows are finished for scale sample
1 parent c6ea78b commit 71892aa

File tree

2 files changed

+28
-8
lines changed

2 files changed

+28
-8
lines changed

samples/scale/starter/main.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package main
33
import (
44
"context"
55
"log"
6+
"sync"
7+
"time"
68

79
"github.com/cschleiden/go-workflows/backend/mysql"
810
"github.com/cschleiden/go-workflows/client"
@@ -14,23 +16,37 @@ func main() {
1416
ctx := context.Background()
1517

1618
//b := sqlite.NewSqliteBackend("../scale.sqlite")
17-
b := mysql.NewMysqlBackend("localhost", 3306, "root", "SqlPassw0rd", "simple")
19+
b := mysql.NewMysqlBackend("localhost", 3306, "root", "SqlPassw0rd", "scale")
1820

1921
// Start workflow via client
2022
c := client.New(b)
2123

24+
wg := &sync.WaitGroup{}
25+
26+
now := time.Now()
27+
2228
for i := 0; i < 100; i++ {
23-
startWorkflow(ctx, c)
29+
wg.Add(1)
30+
31+
go startWorkflow(ctx, c, wg)
2432
}
33+
34+
wg.Wait()
35+
36+
log.Println("Finished in", time.Since(now))
2537
}
2638

27-
func startWorkflow(ctx context.Context, c client.Client) {
39+
func startWorkflow(ctx context.Context, c client.Client, wg *sync.WaitGroup) {
2840
wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
2941
InstanceID: uuid.NewString(),
3042
}, scale.Workflow1, "Hello world "+uuid.NewString())
3143
if err != nil {
3244
panic("could not start workflow")
3345
}
3446

35-
log.Println("Started workflow", wf.GetInstanceID())
47+
// log.Println("Started workflow", wf.GetInstanceID())
48+
49+
c.WaitForWorkflowInstance(ctx, wf, time.Second*30)
50+
51+
wg.Done()
3652
}

samples/scale/worker/main.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"log"
66
"os"
77
"os/signal"
8-
"time"
98

109
"github.com/cschleiden/go-workflows/backend"
1110
"github.com/cschleiden/go-workflows/backend/mysql"
@@ -17,7 +16,7 @@ func main() {
1716
ctx, cancel := context.WithCancel(context.Background())
1817

1918
//b := sqlite.NewSqliteBackend("../scale.sqlite?_busy_timeout=10000")
20-
b := mysql.NewMysqlBackend("localhost", 3306, "root", "SqlPassw0rd", "simple")
19+
b := mysql.NewMysqlBackend("localhost", 3306, "root", "SqlPassw0rd", "scale")
2120

2221
// Run worker
2322
go RunWorker(ctx, b)
@@ -28,11 +27,16 @@ func main() {
2827

2928
log.Println("Shutting down")
3029
cancel()
31-
time.Sleep(5 * time.Second)
3230
}
3331

3432
func RunWorker(ctx context.Context, mb backend.Backend) {
35-
w := worker.New(mb, nil)
33+
w := worker.New(mb, &worker.Options{
34+
WorkflowPollers: 2,
35+
MaxParallelWorkflowTasks: 100,
36+
ActivityPollers: 2,
37+
MaxParallelActivityTasks: 100,
38+
HeartbeatWorkflowTasks: false,
39+
})
3640

3741
w.RegisterWorkflow(scale.Workflow1)
3842

0 commit comments

Comments
 (0)