Skip to content

Commit c4b8773

Browse files
authored
Merge pull request #63 from cschleiden/web-diagnostics
Add basic web ui to look at workflows and histories
2 parents 6126040 + 0e8e585 commit c4b8773

38 files changed

+29679
-28
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
*.sqlite
22
*.sqlite-journal
33
vendor
4-
plugin.so
4+
plugin.so
5+
6+
web/app/node_modules

.vscode/launch.json

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,45 +24,52 @@
2424
"type": "go",
2525
"request": "launch",
2626
"mode": "debug",
27-
"program": "${workspaceFolder}/samples/concurrent/concurrent.go"
27+
"program": "${workspaceFolder}/samples/concurrent"
2828
},
2929
{
3030
"name": "Launch timer sample",
3131
"type": "go",
3232
"request": "launch",
3333
"mode": "debug",
34-
"program": "${workspaceFolder}/samples/timer/timer.go"
34+
"program": "${workspaceFolder}/samples/timer"
3535
},
3636
{
3737
"name": "Launch signal sample",
3838
"type": "go",
3939
"request": "launch",
4040
"mode": "debug",
41-
"program": "${workspaceFolder}/samples/signal/signal.go"
41+
"program": "${workspaceFolder}/samples/signal"
4242
},
4343
{
4444
"name": "Launch cancellation sample",
4545
"type": "go",
4646
"request": "launch",
4747
"mode": "debug",
48-
"program": "${workspaceFolder}/samples/cancellation/cancellation.go"
48+
"program": "${workspaceFolder}/samples/cancellation"
4949
},
5050
{
5151
"name": "Launch subworkflow sample",
5252
"type": "go",
5353
"request": "launch",
5454
"mode": "debug",
55-
"program": "${workspaceFolder}/samples/subworkflow/subworkflow.go"
55+
"program": "${workspaceFolder}/samples/subworkflow"
5656
},
5757
{
5858
"name": "Launch scale/worker sample",
5959
"type": "go",
6060
"request": "launch",
6161
"mode": "debug",
62-
"program": "${workspaceFolder}/samples/scale/worker/main.go",
62+
"program": "${workspaceFolder}/samples/scale/worker",
6363
"args": [
6464
"-backend", "mysql",
6565
]
6666
},
67+
{
68+
"name": "Launch web sample",
69+
"type": "go",
70+
"request": "launch",
71+
"mode": "debug",
72+
"program": "${workspaceFolder}/samples/web"
73+
},
6774
]
6875
}

README.md

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ While there are implementations for other lanuages in the context of Azure Durab
116116

117117
The execution model for `go-workflows` follows closely the one created for Uber's [Cadence](https://cadenceworkflow.io) and which was then forked by the original creators for [Temporal.io](https://temporal.io).
118118

119-
TODO: describe in more detail here.
119+
See https://cschleiden.dev/blog/2022-05-02-go-workflows-part2/ for some more details.
120120

121121
### Supported backends
122122

@@ -513,7 +513,38 @@ logger := activity.Logger(ctx)
513513
514514
The returned `logger` implements the `Logger` interface, and already has the id of the activity, and the workflow instance and execution IDs set as default fields.
515515
516-
## Workflow Versioning
516+
517+
## Tools
518+
519+
### Analyzer
520+
521+
`/analyzer` contains a simple [golangci-lint](https://github.com/golangci/golangci-lint) based analyzer to spot common issues in workflow code.
522+
523+
### Diagnostics Web UI
524+
525+
For investigating workflows, the package includes a simple diagnostic web UI. You can serve it via:
526+
527+
```go
528+
m := http.NewServeMux()
529+
m.Handle("/diag/", http.StripPrefix("/diag", diag.NewServeMux(b)))
530+
go http.ListenAndServe(":3000", m)
531+
```
532+
533+
It provides a simple paginated list of workflow instances:
534+
535+
<img src="./docs/diag-list.png" width="700">
536+
537+
And a way to inspect the history of a workflow instance:
538+
539+
<img src="./docs/diag-details.png" width="700">
540+
541+
## FAQ
542+
543+
### How are releases versioned?
544+
545+
For now this library is in a pre-release state. There are no guarantees given regarding breaking changes between (pre)-releases.
546+
547+
### Workflow versioning
517548
518549
For now, I've intentionally left out versioning. Cadence, Temporal, and DTFx all support the concept of versions for workflows as well as activities. This is mostly required when you make changes to workflows and need to keep backwards compatibility with workflows that are being executed at the time of the upgrade.
519550

@@ -574,12 +605,6 @@ and only if a workflow instance was created with a version of `>= 2` will `Activ
574605

575606
This kind of check is understandable for simple changes, but it becomes hard and a source of bugs for more complicated workflows. Therefore for now versioning is not supported and the guidance is to rely on **side-by-side** deployments. See also Azure's [Durable Functions](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-versioning) documentation for the same topic.
576607
577-
## `ContinueAsNew`
608+
### `ContinueAsNew`
578609
579-
Both Temporal/Cadence and DTFx support `ContinueAsNew`. This essentially re-starts a running workflow as a new workflow with a new event history. This is needed for long running workflows where the history can become very large, negatively affecting performance. While `WorkflowInstance` supports an `InstanceID` and an `ExecutionID`, this feature is not yet implemented (and might not be).
580-
581-
## FAQ
582-
583-
### How are releases versioned?
584-
585-
For now this library is in a pre-release state. There are no guarantees given regarding breaking changes between (pre)-releases.
610+
Both Temporal/Cadence and DTFx support `ContinueAsNew`. This essentially re-starts a running workflow as a new workflow with a new event history. This is needed for long running workflows where the history can become very large, negatively affecting performance. While `WorkflowInstance` supports an `InstanceID` and an `ExecutionID`, this feature is not yet implemented (and might not be).

backend/redis/diagnostics.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
8+
"github.com/cschleiden/go-workflows/diag"
9+
"github.com/go-redis/redis/v8"
10+
)
11+
12+
var _ diag.Backend = (*redisBackend)(nil)
13+
14+
func (rb *redisBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID string, count int) ([]*diag.WorkflowInstanceRef, error) {
15+
max := "+inf"
16+
17+
if afterInstanceID != "" {
18+
scores, err := rb.rdb.ZMScore(ctx, instancesByCreation(), afterInstanceID).Result()
19+
if err != nil {
20+
return nil, fmt.Errorf("getting instance score for %v: %w", afterInstanceID, err)
21+
}
22+
23+
if len(scores) == 0 {
24+
rb.Logger().Error("could not find instance %v", "afterInstanceID", afterInstanceID)
25+
return nil, nil
26+
}
27+
28+
max = fmt.Sprintf("(%v", int64(scores[0]))
29+
}
30+
31+
result, err := rb.rdb.ZRangeArgs(ctx, redis.ZRangeArgs{
32+
Key: instancesByCreation(),
33+
Stop: max,
34+
Start: "-inf",
35+
ByScore: true,
36+
Rev: true,
37+
Count: int64(count),
38+
}).Result()
39+
if err != nil {
40+
return nil, fmt.Errorf("getting instances after %v: %w", max, err)
41+
}
42+
43+
instanceIDs := make([]string, 0)
44+
for _, r := range result {
45+
instanceID := r
46+
instanceIDs = append(instanceIDs, instanceKey(instanceID))
47+
}
48+
49+
instances, err := rb.rdb.MGet(ctx, instanceIDs...).Result()
50+
if err != nil {
51+
return nil, fmt.Errorf("getting instances: %w", err)
52+
}
53+
54+
var instanceRefs []*diag.WorkflowInstanceRef
55+
for _, instance := range instances {
56+
var state instanceState
57+
if err := json.Unmarshal([]byte(instance.(string)), &state); err != nil {
58+
return nil, fmt.Errorf("unmarshaling instance state: %w", err)
59+
}
60+
61+
instanceRefs = append(instanceRefs, &diag.WorkflowInstanceRef{
62+
Instance: state.Instance,
63+
CreatedAt: state.CreatedAt,
64+
CompletedAt: state.CompletedAt,
65+
State: state.State,
66+
})
67+
}
68+
69+
return instanceRefs, nil
70+
}
71+
72+
func (rb *redisBackend) GetWorkflowInstance(ctx context.Context, instanceID string) (*diag.WorkflowInstanceRef, error) {
73+
instance, err := readInstance(ctx, rb.rdb, instanceID)
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
return &diag.WorkflowInstanceRef{
79+
Instance: instance.Instance,
80+
CreatedAt: instance.CreatedAt,
81+
CompletedAt: instance.CompletedAt,
82+
State: instance.State,
83+
}, nil
84+
}

backend/redis/instance.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,12 @@ type instanceState struct {
119119
func createInstance(ctx context.Context, rdb redis.UniversalClient, instance *core.WorkflowInstance, ignoreDuplicate bool) error {
120120
key := instanceKey(instance.InstanceID)
121121

122+
createdAt := time.Now()
123+
122124
b, err := json.Marshal(&instanceState{
123125
Instance: instance,
124126
State: backend.WorkflowStateActive,
125-
CreatedAt: time.Now(),
127+
CreatedAt: createdAt,
126128
})
127129
if err != nil {
128130
return fmt.Errorf("marshaling instance state: %w", err)
@@ -148,6 +150,13 @@ func createInstance(ctx context.Context, rdb redis.UniversalClient, instance *co
148150
}
149151
}
150152

153+
if err := rdb.ZAdd(ctx, instancesByCreation(), &redis.Z{
154+
Member: instance.InstanceID,
155+
Score: float64(createdAt.UnixMilli()),
156+
}).Err(); err != nil {
157+
return fmt.Errorf("storing instance reference: %w", err)
158+
}
159+
151160
return nil
152161
}
153162

@@ -164,6 +173,8 @@ func updateInstance(ctx context.Context, rdb redis.UniversalClient, instanceID s
164173
return fmt.Errorf("updating instance: %w", err)
165174
}
166175

176+
// CreatedAt does not change, so skip updating the instancesByCreation() ZSET
177+
167178
return nil
168179
}
169180

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ func instanceKey(instanceID string) string {
88
return fmt.Sprintf("instance:%v", instanceID)
99
}
1010

11+
func instancesByCreation() string {
12+
return "instances-by-creation"
13+
}
14+
1115
func subInstanceKey(instanceID string) string {
1216
return fmt.Sprintf("sub-instance:%v", instanceID)
1317
}

backend/redis/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func WithBackendOptions(opts ...backend.BackendOption) RedisBackendOption {
3434
}
3535
}
3636

37-
func NewRedisBackend(address, username, password string, db int, opts ...RedisBackendOption) (backend.Backend, error) {
37+
func NewRedisBackend(address, username, password string, db int, opts ...RedisBackendOption) (*redisBackend, error) {
3838
client := redis.NewUniversalClient(&redis.UniversalOptions{
3939
Addrs: []string{address},
4040
Username: username,

backend/sqlite/diagnostics.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package sqlite
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"time"
7+
8+
"github.com/cschleiden/go-workflows/backend"
9+
"github.com/cschleiden/go-workflows/diag"
10+
"github.com/cschleiden/go-workflows/internal/core"
11+
)
12+
13+
var _ diag.Backend = (*sqliteBackend)(nil)
14+
15+
func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID string, count int) ([]*diag.WorkflowInstanceRef, error) {
16+
var err error
17+
tx, err := sb.db.BeginTx(ctx, nil)
18+
if err != nil {
19+
return nil, err
20+
}
21+
defer tx.Rollback()
22+
23+
var rows *sql.Rows
24+
if afterInstanceID != "" {
25+
rows, err = tx.QueryContext(
26+
ctx,
27+
`SELECT i.id, i.execution_id, i.created_at, i.completed_at
28+
FROM instances i
29+
INNER JOIN (SELECT id, created_at FROM instances WHERE id = ?) ii
30+
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.id < ii.id)
31+
ORDER BY i.created_at DESC, i.id DESC
32+
LIMIT ?`,
33+
afterInstanceID,
34+
count,
35+
)
36+
} else {
37+
rows, err = tx.QueryContext(
38+
ctx,
39+
`SELECT i.id, i.execution_id, i.created_at, i.completed_at
40+
FROM instances i
41+
ORDER BY i.created_at DESC, i.id DESC
42+
LIMIT ?`,
43+
count,
44+
)
45+
}
46+
if err != nil {
47+
return nil, err
48+
}
49+
50+
var instances []*diag.WorkflowInstanceRef
51+
52+
for rows.Next() {
53+
var id, executionID string
54+
var createdAt time.Time
55+
var completedAt *time.Time
56+
err = rows.Scan(&id, &executionID, &createdAt, &completedAt)
57+
if err != nil {
58+
return nil, err
59+
}
60+
61+
var state backend.WorkflowState
62+
if completedAt != nil {
63+
state = backend.WorkflowStateFinished
64+
}
65+
66+
instances = append(instances, &diag.WorkflowInstanceRef{
67+
Instance: core.NewWorkflowInstance(id, executionID),
68+
CreatedAt: createdAt,
69+
CompletedAt: completedAt,
70+
State: state,
71+
})
72+
}
73+
74+
return instances, nil
75+
}
76+
77+
func (sb *sqliteBackend) GetWorkflowInstance(ctx context.Context, instanceID string) (*diag.WorkflowInstanceRef, error) {
78+
tx, err := sb.db.BeginTx(ctx, nil)
79+
if err != nil {
80+
return nil, err
81+
}
82+
defer tx.Rollback()
83+
84+
res := tx.QueryRowContext(ctx, "SELECT id, execution_id, created_at, completed_at FROM instances WHERE id = ?", instanceID)
85+
86+
var id, executionID string
87+
var createdAt time.Time
88+
var completedAt *time.Time
89+
90+
err = res.Scan(&id, &executionID, &createdAt, &completedAt)
91+
if err != nil {
92+
if err == sql.ErrNoRows {
93+
return nil, nil
94+
}
95+
96+
return nil, err
97+
}
98+
99+
var state backend.WorkflowState
100+
if completedAt != nil {
101+
state = backend.WorkflowStateFinished
102+
}
103+
104+
return &diag.WorkflowInstanceRef{
105+
Instance: core.NewWorkflowInstance(id, executionID),
106+
CreatedAt: createdAt,
107+
CompletedAt: completedAt,
108+
State: state,
109+
}, nil
110+
}

backend/sqlite/sqlite.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
//go:embed schema.sql
2424
var schema string
2525

26-
func NewInMemoryBackend(opts ...backend.BackendOption) backend.Backend {
26+
func NewInMemoryBackend(opts ...backend.BackendOption) *sqliteBackend {
2727
b := newSqliteBackend("file::memory:", opts...)
2828

2929
b.db.SetMaxOpenConns(1)

0 commit comments

Comments
 (0)