|
| 1 | +package main |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "database/sql" |
| 6 | + "flag" |
| 7 | + "fmt" |
| 8 | + "log" |
| 9 | + "os" |
| 10 | + "sync" |
| 11 | + "time" |
| 12 | + |
| 13 | + "github.com/cschleiden/go-workflows/backend" |
| 14 | + "github.com/cschleiden/go-workflows/backend/mysql" |
| 15 | + "github.com/cschleiden/go-workflows/backend/redis" |
| 16 | + "github.com/cschleiden/go-workflows/backend/sqlite" |
| 17 | + "github.com/cschleiden/go-workflows/client" |
| 18 | + "github.com/cschleiden/go-workflows/worker" |
| 19 | + redisv8 "github.com/go-redis/redis/v8" |
| 20 | +) |
| 21 | + |
| 22 | +var b = flag.String("backend", "redis", "Backend to use. Supported backends are:\n- redis\n- mysql\n- sqlite\n") |
| 23 | +var timeout = flag.Duration("timeout", time.Second*30, "Timeout for the benchmark run") |
| 24 | +var scenario = flag.String("scenario", "basic", "Scenario to run. Support scenarios are:\n- basic\n") |
| 25 | +var runs = flag.Int("runs", 1, "Number of root workflows to start") |
| 26 | +var depth = flag.Int("depth", 2, "Depth of mid workflows") |
| 27 | +var fanOut = flag.Int("fanout", 2, "Number of child workflows to execute per root/mid workflow") |
| 28 | +var leafFanOut = flag.Int("leaffanout", 2, "Number of leaf workflows to execute per mid workflow") |
| 29 | +var activities = flag.Int("activities", 2, "Number of activities to execute per leaf workflow") |
| 30 | +var resultSize = flag.Int("resultsize", 100, "Size of activity result payload in bytes") |
| 31 | +var format = flag.String("format", "text", "Output format. Supported formats are:\n- text\n- csv\n") |
| 32 | +var cacheSize = flag.Int("cachesize", 128, "Size of the workflow executor cache") |
| 33 | + |
| 34 | +func main() { |
| 35 | + flag.Parse() |
| 36 | + |
| 37 | + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(*timeout).Add(time.Second*5)) |
| 38 | + defer cancel() |
| 39 | + |
| 40 | + mm := newMemMetrics() |
| 41 | + ba := getBackend(*b, backend.WithLogger(&nullLogger{}), backend.WithMetrics(mm)) |
| 42 | + |
| 43 | + wo := worker.DefaultWorkerOptions |
| 44 | + wo.WorkflowExecutorCacheSize = *cacheSize |
| 45 | + w := worker.New(ba, &wo) |
| 46 | + |
| 47 | + w.RegisterWorkflow(Root) |
| 48 | + w.RegisterWorkflow(Mid) |
| 49 | + w.RegisterWorkflow(Leaf) |
| 50 | + w.RegisterActivity(Activity) |
| 51 | + |
| 52 | + if err := w.Start(ctx); err != nil { |
| 53 | + panic(err) |
| 54 | + } |
| 55 | + |
| 56 | + c := client.New(ba) |
| 57 | + |
| 58 | + start := time.Now() |
| 59 | + wg := sync.WaitGroup{} |
| 60 | + for i := 0; i < *runs; i++ { |
| 61 | + i, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{ |
| 62 | + InstanceID: fmt.Sprintf("root-%d", i), |
| 63 | + }, Root, &MidInput{ |
| 64 | + FanOut: *fanOut, |
| 65 | + Depth: *depth, |
| 66 | + |
| 67 | + LeafFanOut: *leafFanOut, |
| 68 | + |
| 69 | + Activities: *activities, |
| 70 | + PayloadSizeBytes: *resultSize, |
| 71 | + }) |
| 72 | + if err != nil { |
| 73 | + panic(err) |
| 74 | + } |
| 75 | + |
| 76 | + wg.Add(1) |
| 77 | + go func() { |
| 78 | + defer wg.Done() |
| 79 | + err := c.WaitForWorkflowInstance(ctx, i, *timeout) |
| 80 | + if err != nil { |
| 81 | + panic(fmt.Errorf("Workflow instance %s failed: %w", i.InstanceID, err)) |
| 82 | + } |
| 83 | + }() |
| 84 | + } |
| 85 | + |
| 86 | + wg.Wait() |
| 87 | + |
| 88 | + end := time.Now() |
| 89 | + |
| 90 | + switch *format { |
| 91 | + case "text": |
| 92 | + log.Println("Ran", *runs, "root workflows in", end.Sub(start).Seconds(), "seconds") |
| 93 | + mm.Print() |
| 94 | + |
| 95 | + case "csv": |
| 96 | + fmt.Printf( |
| 97 | + "%s,%v,%s,%d,%d,%d,%d,%d,%d\n", |
| 98 | + *b, end.Sub(start).Seconds(), *scenario, *runs, *depth, *fanOut, *leafFanOut, *activities, *resultSize) |
| 99 | + } |
| 100 | +} |
| 101 | + |
| 102 | +func getBackend(b string, opt ...backend.BackendOption) backend.Backend { |
| 103 | + switch b { |
| 104 | + case "memory": |
| 105 | + return sqlite.NewInMemoryBackend(opt...) |
| 106 | + |
| 107 | + case "sqlite": |
| 108 | + os.Remove("bench.sqlite") |
| 109 | + |
| 110 | + return sqlite.NewSqliteBackend("bench.sqlite", opt...) |
| 111 | + |
| 112 | + case "mysql": |
| 113 | + db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", "root", "root")) |
| 114 | + if err != nil { |
| 115 | + panic(err) |
| 116 | + } |
| 117 | + |
| 118 | + if _, err := db.Exec("DROP DATABASE IF EXISTS bench"); err != nil { |
| 119 | + panic(fmt.Errorf("dropping database: %w", err)) |
| 120 | + } |
| 121 | + |
| 122 | + if _, err := db.Exec("CREATE DATABASE bench"); err != nil { |
| 123 | + panic(fmt.Errorf("creating database: %w", err)) |
| 124 | + } |
| 125 | + |
| 126 | + if err := db.Close(); err != nil { |
| 127 | + panic(err) |
| 128 | + } |
| 129 | + |
| 130 | + return mysql.NewMysqlBackend("localhost", 3306, "root", "root", "bench", opt...) |
| 131 | + |
| 132 | + case "redis": |
| 133 | + rclient := redisv8.NewUniversalClient(&redisv8.UniversalOptions{ |
| 134 | + Addrs: []string{"localhost:6379"}, |
| 135 | + Username: "", |
| 136 | + Password: "RedisPassw0rd", |
| 137 | + DB: 0, |
| 138 | + WriteTimeout: time.Second * 30, |
| 139 | + ReadTimeout: time.Second * 30, |
| 140 | + }) |
| 141 | + |
| 142 | + rclient.FlushAll(context.Background()).Result() |
| 143 | + |
| 144 | + b, err := redis.NewRedisBackend(rclient, redis.WithBackendOptions(opt...)) |
| 145 | + if err != nil { |
| 146 | + panic(err) |
| 147 | + } |
| 148 | + |
| 149 | + return b |
| 150 | + |
| 151 | + default: |
| 152 | + panic("unknown backend " + b) |
| 153 | + } |
| 154 | +} |
0 commit comments