Skip to content

Commit 9008bc3

Browse files
authored
Merge branch 'main' into yaananth-patch-2
2 parents 31532ec + cf95b44 commit 9008bc3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+1355
-191
lines changed

README.md

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ func Workflow2(ctx workflow.Context, msg string) (string, error) {
314314
From a workflow, call `workflow.ExecuteActivity` to execute an activity. The call returns a `Future[T]` you can await to get the result or any error it might return.
315315

316316
```go
317-
r1, err := workflow.ExecuteActivity[int](ctx, Activity1, 35, 12, nil, "test").Get(ctx)
317+
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12, nil, "test").Get(ctx)
318318
if err != nil {
319319
panic("error getting activity 1 result")
320320
}
@@ -423,9 +423,86 @@ func SubWorkflow(ctx workflow.Context, msg string) (int, error) {
423423

424424
Similar to timer cancellation, you can pass a cancelable context to `CreateSubWorkflowInstance` and cancel the sub-workflow that way. Reacting to the cancellation is the same as canceling a workflow via the `Client`. See [Canceling workflows](#canceling-workflows) for more details.
425425

426+
### Error handling
427+
428+
#### Custom errors
429+
430+
Errors returned from activities and subworkflows need to be marshalled/unmarshalled by the library so they are wrapped in a `workflow.Error`. You can access the original type via the `err.Type` field. If a stacktrace was captured, you can access it via `err.Stack()`. Example (see also `samples/errors`):
431+
432+
```go
433+
func handleError(ctx workflow.Context, logger log.Logger, err error) {
434+
var werr *workflow.Error
435+
if errors.As(err, &werr) {
436+
switch werr.Type {
437+
case "CustomError": // This was a `type CustomError struct...` returned by an activity/subworkflow
438+
logger.Error("Custom error", "err", werr)
439+
return
440+
}
441+
442+
logger.Error("Generic workflow error", "err", werr, "stack", werr.Stack())
443+
return
444+
}
445+
446+
var perr *workflow.PanicError
447+
if errors.As(err, &perr) {
448+
// Activity/subworkflow ran into a panic
449+
logger.Error("Panic", "err", perr, "stack", perr.Stack())
450+
return
451+
}
452+
453+
logger.Error("Generic error", "err", err)
454+
}
455+
```
456+
457+
#### Panics
458+
459+
A panic in an activity will be captured by the library and made available as a `workflow.PanicError` in the calling workflow. Example:
460+
461+
462+
```go
463+
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, "test").Get(ctx)
464+
if err != nil {
465+
panic("error getting activity 1 result")
466+
}
467+
468+
var perr *workflow.PanicError
469+
if errors.As(err, &perr) {
470+
logger.Error("Panic", "err", perr, "stack", perr.Stack())
471+
return
472+
}
473+
```
474+
475+
#### Retries
476+
477+
With the default `DefaultActivityOptions`, Activities are retried up to three times when they return an error. If you want to keep automatic retries, but want to avoid them when hitting certain error types, you can wrap an error with `workflow.NewPermanentError`:
478+
479+
**Workflow**:
480+
481+
```go
482+
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, "test").Get(ctx)
483+
if err != nil {
484+
panic("error getting activity 1 result")
485+
}
486+
487+
log.Println(r1)
488+
```
489+
490+
**Activity**:
491+
492+
```go
493+
func Activity1(ctx context.Context, name string) (int, error) {
494+
if name == "test" {
495+
// No need to retry in this case, the activity will aways fail with the given inputs
496+
return 0, workflow.NewPermanentError(errors.New("test is not a valid name"))
497+
}
498+
499+
return http.Do("POST", "https://example.com", name)
500+
}
501+
```
502+
426503
### `ContinueAsNew`
427504

428-
```ContinueAsNew` allows you to restart workflow execution with different inputs. The purpose is to keep the history size small enough to avoid hitting size limits, running out of memory and impacting performance. It works by returning a special `error` from your workflow that contains the new inputs:
505+
`ContinueAsNew` allows you to restart workflow execution with different inputs. The purpose is to keep the history size small enough to avoid hitting size limits, running out of memory and impacting performance. It works by returning a special `error` from your workflow that contains the new inputs:
429506

430507
```go
431508
wf := func(ctx workflow.Context, run int) (int, error) {
@@ -756,6 +833,3 @@ and only if a workflow instance was created with a version of `>= 2` will `Activ
756833
757834
This kind of check is understandable for simple changes, but it becomes hard and a source of bugs for more complicated workflows. Therefore for now versioning is not supported and the guidance is to rely on **side-by-side** deployments. See also Azure's [Durable Functions](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-versioning) documentation for the same topic.
758835
759-
### `ContinueAsNew`
760-
761-
Both Temporal/Cadence and DTFx support `ContinueAsNew`. This essentially re-starts a running workflow as a new workflow with a new event history. This is needed for long running workflows where the history can become very large, negatively affecting performance. While `WorkflowInstance` supports an `InstanceID` and an `ExecutionID`, this feature is not yet implemented (and might not be).

backend/backend.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ type Backend interface {
6868
// ExtendActivityTask extends the lock of an activity task
6969
ExtendActivityTask(ctx context.Context, activityID string) error
7070

71+
// GetStats returns stats about the backend
72+
GetStats(ctx context.Context) (*Stats, error)
73+
7174
// Logger returns the configured logger for the backend
7275
Logger() log.Logger
7376

backend/mock_Backend.go

Lines changed: 31 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/mysql/stats.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package mysql
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
8+
"github.com/cschleiden/go-workflows/backend"
9+
)
10+
11+
func (b *mysqlBackend) GetStats(ctx context.Context) (*backend.Stats, error) {
12+
s := &backend.Stats{}
13+
14+
tx, err := b.db.BeginTx(ctx, &sql.TxOptions{
15+
Isolation: sql.LevelReadCommitted,
16+
})
17+
if err != nil {
18+
return nil, fmt.Errorf("failed to start transaction: %w", err)
19+
}
20+
defer tx.Rollback()
21+
22+
row := tx.QueryRowContext(
23+
ctx,
24+
"SELECT COUNT(*) FROM instances i WHERE i.completed_at IS NULL",
25+
)
26+
if err := row.Err(); err != nil {
27+
return nil, fmt.Errorf("failed to query active instances: %w", err)
28+
}
29+
30+
var activeInstances int64
31+
if err := row.Scan(&activeInstances); err != nil {
32+
return nil, fmt.Errorf("failed to scan active instances: %w", err)
33+
}
34+
35+
s.ActiveWorkflowInstances = activeInstances
36+
37+
// Get pending activities
38+
row = tx.QueryRowContext(
39+
ctx,
40+
"SELECT COUNT(*) FROM activities")
41+
if err := row.Err(); err != nil {
42+
return nil, fmt.Errorf("failed to query active activities: %w", err)
43+
}
44+
45+
var pendingActivities int64
46+
if err := row.Scan(&pendingActivities); err != nil {
47+
return nil, fmt.Errorf("failed to scan active activities: %w", err)
48+
}
49+
50+
s.PendingActivities = pendingActivities
51+
52+
return s, nil
53+
}

backend/redis/instance.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ func createInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work
160160
Score: float64(createdAt.UnixMilli()),
161161
})
162162

163+
p.SAdd(ctx, instancesActive(), instanceSegment(instance))
164+
163165
return nil
164166
}
165167

@@ -173,6 +175,10 @@ func updateInstanceP(ctx context.Context, p redis.Pipeliner, instance *core.Work
173175

174176
p.Set(ctx, key, string(b), 0)
175177

178+
if state.State != core.WorkflowInstanceStateActive {
179+
p.SRem(ctx, instancesActive(), instanceSegment(instance))
180+
}
181+
176182
// CreatedAt does not change, so skip updating the instancesByCreation() ZSET
177183

178184
return nil

backend/redis/keys.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ func instancesByCreation() string {
2929
return "instances-by-creation"
3030
}
3131

32+
func instancesActive() string {
33+
return "instances-active"
34+
}
35+
3236
func instancesExpiring() string {
3337
return "instances-expiring"
3438
}

backend/redis/queue.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,11 @@ func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue
4343
workerName: uuid.NewString(),
4444
}
4545

46-
// Create the consumer group
47-
_, err := rdb.XGroupCreateMkStream(context.Background(), tq.streamKey, tq.groupName, "0").Result()
48-
if err != nil {
49-
// Ugly, check since there is no UPSERT for consumer groups. Might replace with a script
50-
// using XINFO & XGROUP CREATE atomically
51-
if err.Error() != "BUSYGROUP Consumer Group name already exists" {
52-
return nil, fmt.Errorf("creating task queue: %w", err)
53-
}
54-
}
55-
5646
// Pre-load script
5747
cmds := map[string]*redis.StringCmd{
58-
"enqueueCmd": enqueueCmd.Load(context.Background(), rdb),
59-
"completeCmd": completeCmd.Load(context.Background(), rdb),
48+
"createGroupCmd": createGroupCmd.Load(context.Background(), rdb),
49+
"enqueueCmd": enqueueCmd.Load(context.Background(), rdb),
50+
"completeCmd": completeCmd.Load(context.Background(), rdb),
6051
}
6152

6253
for name, cmd := range cmds {
@@ -67,6 +58,12 @@ func newTaskQueue[T any](rdb redis.UniversalClient, tasktype string) (*taskQueue
6758
}
6859
}
6960

61+
// Create the consumer group
62+
err := createGroupCmd.Run(context.Background(), rdb, []string{tq.streamKey, tq.groupName}).Err()
63+
if err != nil {
64+
return nil, fmt.Errorf("creating task queue: %w", err)
65+
}
66+
7067
return tq, nil
7168
}
7269

@@ -77,6 +74,10 @@ func (q *taskQueue[T]) Keys() KeyInfo {
7774
}
7875
}
7976

77+
func (q *taskQueue[T]) Size(ctx context.Context, rdb redis.UniversalClient) (int64, error) {
78+
return rdb.XLen(ctx, q.streamKey).Result()
79+
}
80+
8081
// KEYS[1] = set
8182
// KEYS[2] = stream
8283
// ARGV[1] = caller provided id of the task
@@ -91,6 +92,34 @@ var enqueueCmd = redis.NewScript(
9192
return true
9293
`)
9394

95+
var createGroupCmd = redis.NewScript(`
96+
local streamKey = KEYS[1]
97+
local groupName = KEYS[2]
98+
local exists = false
99+
local res = redis.pcall('XINFO', 'GROUPS', streamKey)
100+
101+
if res and type(res) == 'table' then
102+
for _, groupInfo in ipairs(res) do
103+
if type(groupInfo) == 'table' then
104+
for i = 1, #groupInfo, 2 do
105+
if groupInfo[i] == 'name' and groupInfo[i+1] == groupName then
106+
exists = true
107+
break
108+
end
109+
end
110+
end
111+
if exists then break end
112+
end
113+
end
114+
115+
if not exists then
116+
redis.call('XGROUP', 'CREATE', streamKey, groupName, '$', 'MKSTREAM')
117+
end
118+
119+
return true
120+
`)
121+
122+
94123
func (q *taskQueue[T]) Enqueue(ctx context.Context, p redis.Pipeliner, id string, data *T) error {
95124
ds, err := json.Marshal(data)
96125
if err != nil {

backend/redis/stats.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/cschleiden/go-workflows/backend"
8+
)
9+
10+
func (rb *redisBackend) GetStats(ctx context.Context) (*backend.Stats, error) {
11+
var err error
12+
13+
s := &backend.Stats{}
14+
15+
// get workflow instances
16+
activeInstances, err := rb.rdb.SCard(ctx, instancesActive()).Result()
17+
if err != nil {
18+
return nil, fmt.Errorf("getting active instances: %w", err)
19+
}
20+
21+
s.ActiveWorkflowInstances = activeInstances
22+
23+
// get pending activities
24+
pendingActivities, err := rb.activityQueue.Size(ctx, rb.rdb)
25+
if err != nil {
26+
return nil, fmt.Errorf("getting active activities: %w", err)
27+
}
28+
29+
s.PendingActivities = pendingActivities
30+
31+
return s, nil
32+
}

backend/sqlite/sqlite.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ func (sb *sqliteBackend) Converter() converter.Converter {
8383
return sb.options.Converter
8484
}
8585

86-
func (b *sqliteBackend) ContextPropagators() []contextpropagation.ContextPropagator {
87-
return b.options.ContextPropagators
86+
func (sb *sqliteBackend) ContextPropagators() []contextpropagation.ContextPropagator {
87+
return sb.options.ContextPropagators
8888
}
8989

9090
func (sb *sqliteBackend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {

0 commit comments

Comments
 (0)